ruff check preview (#25653)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Author: Asuka Minato
Date: 2025-09-16 13:58:12 +09:00
Committed by: GitHub
Parent: a0c7713494
Commit: bdd85b36a4

42 changed files with 224 additions and 342 deletions

@@ -7,29 +7,28 @@ measuring key metrics like connection rate, event throughput, and time to first
"""
import json
import time
import logging
import os
import random
import statistics
import sys
import threading
import os
import logging
import statistics
from pathlib import Path
import time
from collections import deque
from dataclasses import asdict, dataclass
from datetime import datetime
from dataclasses import dataclass, asdict
from locust import HttpUser, task, between, events, constant
from typing import TypedDict, Literal, TypeAlias
from pathlib import Path
from typing import Literal, TypeAlias, TypedDict
import requests.exceptions
from locust import HttpUser, between, constant, events, task

# Add the stress-test directory to path to import common modules
sys.path.insert(0, str(Path(__file__).parent))
from common.config_helper import ConfigHelper  # type: ignore[import-not-found]

# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Configuration from environment
@@ -54,6 +53,7 @@ ErrorType: TypeAlias = Literal[
class ErrorCounts(TypedDict):
"""Error count tracking"""
connection_error: int
timeout: int
invalid_json: int
@@ -65,6 +65,7 @@ class ErrorCounts(TypedDict):
class SSEEvent(TypedDict):
"""Server-Sent Event structure"""
data: str
event: str
id: str | None
@@ -72,11 +73,13 @@ class SSEEvent(TypedDict):
class WorkflowInputs(TypedDict):
"""Workflow input structure"""
question: str
class WorkflowRequestData(TypedDict):
"""Workflow request payload"""
inputs: WorkflowInputs
response_mode: Literal["streaming"]
user: str
@@ -84,6 +87,7 @@ class WorkflowRequestData(TypedDict):
class ParsedEventData(TypedDict, total=False):
"""Parsed event data from SSE stream"""
event: str
task_id: str
workflow_run_id: str
@@ -93,6 +97,7 @@ class ParsedEventData(TypedDict, total=False):
class LocustStats(TypedDict):
"""Locust statistics structure"""
total_requests: int
total_failures: int
avg_response_time: float
@@ -102,6 +107,7 @@ class LocustStats(TypedDict):
class ReportData(TypedDict):
"""JSON report structure"""
timestamp: str
duration_seconds: float
metrics: dict[str, object] # Metrics as dict for JSON serialization
@@ -154,7 +160,7 @@ class MetricsTracker:
self.total_connections = 0
self.total_events = 0
self.start_time = time.time()

# Enhanced metrics with memory limits
self.max_samples = 10000 # Prevent unbounded growth
self.ttfe_samples: deque[float] = deque(maxlen=self.max_samples)
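The deque(maxlen=...) above is what enforces the 10,000-sample cap: once the deque is full, each append silently evicts the oldest sample, so memory stays bounded. A minimal sketch of that behavior, with illustrative values:

import collections

samples: collections.deque[float] = collections.deque(maxlen=3)  # keep only the 3 newest samples
for v in (10.0, 20.0, 30.0, 40.0):
    samples.append(v)
print(list(samples))  # [20.0, 30.0, 40.0] -- the oldest sample was evicted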
@@ -233,9 +239,7 @@ class MetricsTracker:
max_ttfe = max(self.ttfe_samples)
p50_ttfe = statistics.median(self.ttfe_samples)
if len(self.ttfe_samples) >= 2:
quantiles = statistics.quantiles(
self.ttfe_samples, n=20, method="inclusive"
)
quantiles = statistics.quantiles(self.ttfe_samples, n=20, method="inclusive")
p95_ttfe = quantiles[18] # 19th of 19 quantiles = 95th percentile
else:
p95_ttfe = max_ttfe
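The quantiles change above only reflows the call; the logic is unchanged. As the inline comment notes, statistics.quantiles with n=20 returns 19 cut points, so index 18 is the 95th percentile. A minimal stdlib check with illustrative data:

import statistics

data = list(range(1, 101))  # illustrative samples 1..100
cuts = statistics.quantiles(data, n=20, method="inclusive")
assert len(cuts) == 19  # n=20 yields 19 cut points
print(cuts[18])  # 95.05 -- the 95th percentile under the inclusive method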
@@ -255,9 +259,7 @@ class MetricsTracker:
if durations
else 0
)
events_per_stream_avg = (
statistics.mean(events_per_stream) if events_per_stream else 0
)
events_per_stream_avg = statistics.mean(events_per_stream) if events_per_stream else 0
# Calculate inter-event latency statistics
all_inter_event_times = []
@@ -268,32 +270,20 @@ class MetricsTracker:
inter_event_latency_avg = statistics.mean(all_inter_event_times)
inter_event_latency_p50 = statistics.median(all_inter_event_times)
inter_event_latency_p95 = (
statistics.quantiles(
all_inter_event_times, n=20, method="inclusive"
)[18]
statistics.quantiles(all_inter_event_times, n=20, method="inclusive")[18]
if len(all_inter_event_times) >= 2
else max(all_inter_event_times)
)
else:
inter_event_latency_avg = inter_event_latency_p50 = (
inter_event_latency_p95
) = 0
inter_event_latency_avg = inter_event_latency_p50 = inter_event_latency_p95 = 0
else:
stream_duration_avg = stream_duration_p50 = stream_duration_p95 = (
events_per_stream_avg
) = 0
inter_event_latency_avg = inter_event_latency_p50 = (
inter_event_latency_p95
) = 0
stream_duration_avg = stream_duration_p50 = stream_duration_p95 = events_per_stream_avg = 0
inter_event_latency_avg = inter_event_latency_p50 = inter_event_latency_p95 = 0
# Also calculate overall average rates
total_elapsed = current_time - self.start_time
overall_conn_rate = (
self.total_connections / total_elapsed if total_elapsed > 0 else 0
)
overall_event_rate = (
self.total_events / total_elapsed if total_elapsed > 0 else 0
)
overall_conn_rate = self.total_connections / total_elapsed if total_elapsed > 0 else 0
overall_event_rate = self.total_events / total_elapsed if total_elapsed > 0 else 0
return MetricsSnapshot(
active_connections=self.active_connections,
@@ -389,7 +379,7 @@ class DifyWorkflowUser(HttpUser):
# Load questions from file or use defaults
if QUESTIONS_FILE and os.path.exists(QUESTIONS_FILE):
with open(QUESTIONS_FILE, "r") as f:
with open(QUESTIONS_FILE) as f:
self.questions = [line.strip() for line in f if line.strip()]
else:
self.questions = [
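Besides reflowing, this hunk drops the explicit "r" argument: open() already defaults to read-only text mode, which is the redundancy ruff's redundant-open-modes fix removes. A minimal equivalent, with a hypothetical file name:

# open() defaults to mode="r", so passing "r" explicitly is redundant
with open("questions.txt") as f:  # hypothetical file name
    questions = [line.strip() for line in f if line.strip()]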
@@ -451,18 +441,13 @@ class DifyWorkflowUser(HttpUser):
try:
# Validate response
if response.status_code >= 400:
error_type: ErrorType = (
"http_4xx" if response.status_code < 500 else "http_5xx"
)
error_type: ErrorType = "http_4xx" if response.status_code < 500 else "http_5xx"
metrics.record_error(error_type)
response.failure(f"HTTP {response.status_code}")
return
content_type = response.headers.get("Content-Type", "")
if (
"text/event-stream" not in content_type
and "application/json" not in content_type
):
if "text/event-stream" not in content_type and "application/json" not in content_type:
logger.error(f"Expected text/event-stream, got: {content_type}")
metrics.record_error("invalid_response")
response.failure(f"Invalid content type: {content_type}")
@@ -473,10 +458,13 @@ class DifyWorkflowUser(HttpUser):
for line in response.iter_lines(decode_unicode=True):
# Check if runner is stopping
if getattr(self.environment.runner, 'state', '') in ('stopping', 'stopped'):
if getattr(self.environment.runner, "state", "") in (
"stopping",
"stopped",
):
logger.debug("Runner stopping, breaking streaming loop")
break
if line is not None:
bytes_received += len(line.encode("utf-8"))
@@ -489,9 +477,7 @@ class DifyWorkflowUser(HttpUser):
# Track inter-event timing
if last_event_time:
inter_event_times.append(
(current_time - last_event_time) * 1000
)
inter_event_times.append((current_time - last_event_time) * 1000)
last_event_time = current_time
if first_event_time is None:
@@ -512,15 +498,11 @@ class DifyWorkflowUser(HttpUser):
parsed_event: ParsedEventData = json.loads(event_data)
# Check for terminal events
if parsed_event.get("event") in TERMINAL_EVENTS:
logger.debug(
f"Received terminal event: {parsed_event.get('event')}"
)
logger.debug(f"Received terminal event: {parsed_event.get('event')}")
request_success = True
break
except json.JSONDecodeError as e:
logger.debug(
f"JSON decode error: {e} for data: {event_data[:100]}"
)
logger.debug(f"JSON decode error: {e} for data: {event_data[:100]}")
metrics.record_error("invalid_json")
except Exception as e:
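For context, the try/except above guards the SSE data parsing: each "data:" payload is decoded as JSON and its event field is checked against the terminal set that ends the stream. A self-contained sketch of that check; the TERMINAL_EVENTS values are assumed here, the real set is defined elsewhere in this file:

import json

TERMINAL_EVENTS = {"workflow_finished", "error"}  # assumed values for illustration

def is_terminal(event_data: str) -> bool:
    # True when an SSE data payload carries an event that ends the stream
    try:
        parsed = json.loads(event_data)
    except json.JSONDecodeError:
        return False
    return parsed.get("event") in TERMINAL_EVENTS

print(is_terminal('{"event": "workflow_finished"}'))  # True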
@@ -583,16 +565,18 @@ def on_test_start(environment: object, **kwargs: object) -> None:
# Periodic stats reporting
def report_stats() -> None:
if not hasattr(environment, 'runner'):
if not hasattr(environment, "runner"):
return
runner = environment.runner
while hasattr(runner, 'state') and runner.state not in ["stopped", "stopping"]:
while hasattr(runner, "state") and runner.state not in ["stopped", "stopping"]:
time.sleep(5) # Report every 5 seconds
if hasattr(runner, 'state') and runner.state == "running":
if hasattr(runner, "state") and runner.state == "running":
stats = metrics.get_stats()
# Only log on master node in distributed mode
is_master = not getattr(environment.runner, "worker_id", None) if hasattr(environment, 'runner') else True
is_master = (
not getattr(environment.runner, "worker_id", None) if hasattr(environment, "runner") else True
)
if is_master:
# Clear previous lines and show updated stats
logger.info("\n" + "=" * 80)
@@ -623,15 +607,15 @@ def on_test_start(environment: object, **kwargs: object) -> None:
logger.info(
f"{'(TTFE in ms)':<25} {stats.ttfe_avg:>15.1f} {stats.ttfe_p50:>10.1f} {stats.ttfe_p95:>10.1f} {stats.ttfe_min:>10.1f} {stats.ttfe_max:>10.1f}"
)
logger.info(f"{'Window Samples':<25} {stats.ttfe_samples:>15,d} (last {min(10000, stats.ttfe_total_samples):,d} samples)")
logger.info(
f"{'Window Samples':<25} {stats.ttfe_samples:>15,d} (last {min(10000, stats.ttfe_total_samples):,d} samples)"
)
logger.info(f"{'Total Samples':<25} {stats.ttfe_total_samples:>15,d}")
# Inter-event latency
if stats.inter_event_latency_avg > 0:
logger.info("-" * 80)
logger.info(
f"{'INTER-EVENT LATENCY':<25} {'AVG':>15} {'P50':>10} {'P95':>10}"
)
logger.info(f"{'INTER-EVENT LATENCY':<25} {'AVG':>15} {'P50':>10} {'P95':>10}")
logger.info(
f"{'(ms between events)':<25} {stats.inter_event_latency_avg:>15.1f} {stats.inter_event_latency_p50:>10.1f} {stats.inter_event_latency_p95:>10.1f}"
)
@@ -647,9 +631,9 @@ def on_test_start(environment: object, **kwargs: object) -> None:
logger.info("=" * 80)
# Show Locust stats summary
if hasattr(environment, 'stats') and hasattr(environment.stats, 'total'):
if hasattr(environment, "stats") and hasattr(environment.stats, "total"):
total = environment.stats.total
if hasattr(total, 'num_requests') and total.num_requests > 0:
if hasattr(total, "num_requests") and total.num_requests > 0:
logger.info(
f"{'LOCUST STATS':<25} {'Requests':>12} {'Fails':>8} {'Avg (ms)':>12} {'Min':>8} {'Max':>8}"
)
@@ -687,21 +671,15 @@ def on_test_stop(environment: object, **kwargs: object) -> None:
logger.info("")
logger.info("EVENTS")
logger.info(f" {'Total Events Received:':<30} {stats.total_events:>10,d}")
logger.info(
f" {'Average Throughput:':<30} {stats.overall_event_rate:>10.2f} events/s"
)
logger.info(
f" {'Final Rate (10s window):':<30} {stats.event_rate:>10.2f} events/s"
)
logger.info(f" {'Average Throughput:':<30} {stats.overall_event_rate:>10.2f} events/s")
logger.info(f" {'Final Rate (10s window):':<30} {stats.event_rate:>10.2f} events/s")
logger.info("")
logger.info("STREAM METRICS")
logger.info(f" {'Avg Stream Duration:':<30} {stats.stream_duration_avg:>10.1f} ms")
logger.info(f" {'P50 Stream Duration:':<30} {stats.stream_duration_p50:>10.1f} ms")
logger.info(f" {'P95 Stream Duration:':<30} {stats.stream_duration_p95:>10.1f} ms")
logger.info(
f" {'Avg Events per Stream:':<30} {stats.events_per_stream_avg:>10.1f}"
)
logger.info(f" {'Avg Events per Stream:':<30} {stats.events_per_stream_avg:>10.1f}")
logger.info("")
logger.info("INTER-EVENT LATENCY")
@@ -716,7 +694,9 @@ def on_test_stop(environment: object, **kwargs: object) -> None:
logger.info(f" {'95th Percentile:':<30} {stats.ttfe_p95:>10.1f} ms")
logger.info(f" {'Minimum:':<30} {stats.ttfe_min:>10.1f} ms")
logger.info(f" {'Maximum:':<30} {stats.ttfe_max:>10.1f} ms")
logger.info(f" {'Window Samples:':<30} {stats.ttfe_samples:>10,d} (last {min(10000, stats.ttfe_total_samples):,d})")
logger.info(
f" {'Window Samples:':<30} {stats.ttfe_samples:>10,d} (last {min(10000, stats.ttfe_total_samples):,d})"
)
logger.info(f" {'Total Samples:':<30} {stats.ttfe_total_samples:>10,d}")
# Error summary
@@ -730,7 +710,7 @@ def on_test_stop(environment: object, **kwargs: object) -> None:
logger.info("=" * 80 + "\n")
# Export machine-readable report (only on master node)
is_master = not getattr(environment.runner, 'worker_id', None) if hasattr(environment, 'runner') else True
is_master = not getattr(environment.runner, "worker_id", None) if hasattr(environment, "runner") else True
if is_master:
export_json_report(stats, test_duration, environment)
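The worker_id probe above is how the script avoids duplicate reports in distributed runs: worker runners carry a worker_id while the master's runner leaves it unset, so a falsy value marks the master. A condensed sketch under the same assumption the script makes:

# Mirrors the script's probe: defensive getattr calls so it also works
# when running single-node, where environment.runner may lack worker_id.
runner = getattr(environment, "runner", None)
is_master = not getattr(runner, "worker_id", None)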
@@ -746,9 +726,9 @@ def export_json_report(stats: MetricsSnapshot, duration: float, environment: obj
# Access environment.stats.total attributes safely
locust_stats: LocustStats | None = None
if hasattr(environment, 'stats') and hasattr(environment.stats, 'total'):
if hasattr(environment, "stats") and hasattr(environment.stats, "total"):
total = environment.stats.total
if hasattr(total, 'num_requests') and total.num_requests > 0:
if hasattr(total, "num_requests") and total.num_requests > 0:
locust_stats = LocustStats(
total_requests=total.num_requests,
total_failures=total.num_failures,