From 8bb156cd1c5e5e2304a1ec2cbbb37cd350981d83 Mon Sep 17 00:00:00 2001
From: isaacbmiller
Date: Sat, 28 Feb 2026 15:00:37 -0500
Subject: [PATCH] test: add load testing, stress tests, and integration test
 infrastructure

Add a complete performance testing harness:

- Mock OpenAI-compatible LLM server (configurable delay, error rate)
- Fixture dspy-cli project with sync and async modules
- Locust load test with model-routing correctness checks
- Benchmark runner (single scenario) and matrix runner (delay x users)
- Benchmark comparison script (assert_benchmark.py)
- Stress tests: backpressure, error storms, log writer integrity
- Integration tests: concurrent correctness with pytest + httpx
- .gitignore entries for generated results

The integration tests require a running server (not part of normal pytest
runs). The load/stress tests are standalone scripts.

Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
---
 .gitignore                                         |   8 +-
 tests/integration/conftest.py                      |  54 ++++
 tests/integration/test_concurrent_requests.py      |  76 ++++++
 tests/load/assert_benchmark.py                     |  55 ++++
 tests/load/fixture_project/dspy.config.yaml        |  24 ++
 .../src/load_test_app/__init__.py                  |   0
 .../src/load_test_app/modules/__init__.py          |   0
 .../load_test_app/modules/async_predict.py         |  13 +
 .../load_test_app/modules/simple_predict.py        |  10 +
 tests/load/locustfile.py                           |  78 ++++++
 tests/load/mock_lm_server.py                       |  83 ++++++
 tests/load/results/.gitkeep                        |   0
 tests/load/run_benchmark.sh                        |  81 ++++++
 tests/load/run_matrix.sh                           | 156 +++++++++++
 tests/load/run_stress_tests.sh                     | 248 ++++++++++++++++++
 tests/load/stress_backpressure.py                  | 152 +++++++++++
 tests/load/stress_error_storm.py                   | 170 ++++++++++++
 tests/load/stress_log_integrity.py                 | 170 ++++++++++++
 18 files changed, 1377 insertions(+), 1 deletion(-)
 create mode 100644 tests/integration/conftest.py
 create mode 100644 tests/integration/test_concurrent_requests.py
 create mode 100644 tests/load/assert_benchmark.py
 create mode 100644 tests/load/fixture_project/dspy.config.yaml
 create mode 100644 tests/load/fixture_project/src/load_test_app/__init__.py
 create mode 100644 tests/load/fixture_project/src/load_test_app/modules/__init__.py
 create mode 100644 tests/load/fixture_project/src/load_test_app/modules/async_predict.py
 create mode 100644 tests/load/fixture_project/src/load_test_app/modules/simple_predict.py
 create mode 100644 tests/load/locustfile.py
 create mode 100644 tests/load/mock_lm_server.py
 create mode 100644 tests/load/results/.gitkeep
 create mode 100755 tests/load/run_benchmark.sh
 create mode 100755 tests/load/run_matrix.sh
 create mode 100755 tests/load/run_stress_tests.sh
 create mode 100644 tests/load/stress_backpressure.py
 create mode 100644 tests/load/stress_error_storm.py
 create mode 100644 tests/load/stress_log_integrity.py

diff --git a/.gitignore b/.gitignore
index ad76c5e..08e7880 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,4 +60,10 @@ mlruns
 mlartifacts
 
 # Documentation
-/site/
\ No newline at end of file
+/site/
+
+# Load test results
+tests/load/results/*.csv
+tests/load/results/*.html
+tests/load/fixture_project/openapi.json
+tests/load/fixture_project/logs/*.log
\ No newline at end of file
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
new file mode 100644
index 0000000..d766f22
--- /dev/null
+++ b/tests/integration/conftest.py
@@ -0,0 +1,54 @@
+"""
+Starts mock LLM server and dspy-cli server as subprocess fixtures.
+Tests in this directory require these fixtures.
+"""
+import subprocess
+import time
+import os
+import sys
+
+import httpx
+import pytest
+
+
+MOCK_PORT = 9999
+SERVER_PORT = 8000
+FIXTURE_PROJECT = os.path.join(os.path.dirname(__file__), "..", "load", "fixture_project")
+
+
+@pytest.fixture(scope="session", autouse=True)
+def mock_lm_server():
+    proc = subprocess.Popen(
+        [sys.executable, os.path.join(os.path.dirname(__file__), "..", "load", "mock_lm_server.py")],
+        env={**os.environ, "MOCK_DELAY_MS": "50", "MOCK_PORT": str(MOCK_PORT)},
+    )
+    # Wait for mock server to be ready
+    for _ in range(10):
+        try:
+            httpx.get(f"http://127.0.0.1:{MOCK_PORT}/health", timeout=1)
+            break
+        except Exception:
+            time.sleep(0.5)
+    yield proc
+    proc.terminate()
+    proc.wait()
+
+
+@pytest.fixture(scope="session", autouse=True)
+def dspy_cli_server(mock_lm_server):
+    proc = subprocess.Popen(
+        [sys.executable, "-m", "dspy_cli.server.runner",
+         "--port", str(SERVER_PORT), "--host", "127.0.0.1"],
+        cwd=FIXTURE_PROJECT,
+    )
+    # Wait for server to be ready
+    for _ in range(30):
+        try:
+            resp = httpx.get(f"http://127.0.0.1:{SERVER_PORT}/programs", timeout=2)
+            if resp.status_code == 200:
+                break
+        except Exception:
+            time.sleep(1)
+    yield proc
+    proc.terminate()
+    proc.wait()
diff --git a/tests/integration/test_concurrent_requests.py b/tests/integration/test_concurrent_requests.py
new file mode 100644
index 0000000..5392152
--- /dev/null
+++ b/tests/integration/test_concurrent_requests.py
@@ -0,0 +1,76 @@
+"""
+Concurrent correctness tests. Not a load test — verifies that responses
+are correct under concurrency, not just that the server survives.
+
+Requires a running dspy-cli server + mock LLM. Uses the fixtures in conftest.py.
+"""
+import asyncio
+
+import httpx
+import pytest
+
+BASE_URL = "http://127.0.0.1:8000"
+
+
+async def make_request(client: httpx.AsyncClient, endpoint: str, question: str):
+    response = await client.post(
+        f"{BASE_URL}/{endpoint}",
+        json={"question": question},
+        timeout=30.0
+    )
+    return response
+
+
+@pytest.mark.asyncio
+async def test_sync_module_concurrent_correctness():
+    """20 concurrent requests to sync module should all succeed with valid responses."""
+    async with httpx.AsyncClient() as client:
+        tasks = [
+            make_request(client, "SimplePredict", f"Question {i}")
+            for i in range(20)
+        ]
+        responses = await asyncio.gather(*tasks)
+
+    for i, r in enumerate(responses):
+        assert r.status_code == 200, f"Request {i} failed: {r.text}"
+        assert "answer" in r.json(), f"Request {i} missing 'answer': {r.json()}"
+
+
+@pytest.mark.asyncio
+async def test_async_module_concurrent_correctness():
+    """20 concurrent requests to async module should all succeed."""
+    async with httpx.AsyncClient() as client:
+        tasks = [
+            make_request(client, "AsyncPredict", f"Question {i}")
+            for i in range(20)
+        ]
+        responses = await asyncio.gather(*tasks)
+
+    for i, r in enumerate(responses):
+        assert r.status_code == 200, f"Request {i} failed: {r.text}"
+        assert "answer" in r.json(), f"Request {i} missing 'answer': {r.json()}"
+
+
+@pytest.mark.asyncio
+async def test_no_response_cross_contamination():
+    """
+    Verifies that concurrent requests don't bleed into each other's outputs.
+    Sends requests with distinct questions and checks that answers are independent.
+    This would catch ContextVar leakage or shared state bugs.
+    """
+    questions = [f"Unique question {i} xyzzy" for i in range(10)]
+
+    async with httpx.AsyncClient() as client:
+        tasks = [
+            make_request(client, "AsyncPredict", q)
+            for q in questions
+        ]
+        responses = await asyncio.gather(*tasks)
+
+    for r in responses:
+        assert r.status_code == 200
+        data = r.json()
+        assert "answer" in data
+        # Mock server returns the same canned response, but we're verifying
+        # there's no exception or empty response caused by state mixing.
+        assert data["answer"] != ""
diff --git a/tests/load/assert_benchmark.py b/tests/load/assert_benchmark.py
new file mode 100644
index 0000000..8b8bf53
--- /dev/null
+++ b/tests/load/assert_benchmark.py
@@ -0,0 +1,55 @@
+"""
+Compares two locust CSV result files and fails if performance has regressed.
+Usage: python tests/load/assert_benchmark.py results/baseline_stats.csv results/current_stats.csv
+"""
+import csv
+import sys
+
+
+def load_stats(path):
+    with open(path) as f:
+        reader = csv.DictReader(f)
+        for row in reader:
+            if row["Name"] == "Aggregated":
+                return {
+                    "rps": float(row["Requests/s"]),
+                    "p95": float(row["95%"]),
+                    "failures": float(row["Failure Count"]) / max(float(row["Request Count"]), 1),
+                }
+    raise ValueError(f"No 'Aggregated' row found in {path}")
+
+
+def main():
+    if len(sys.argv) != 3:
+        print(f"Usage: {sys.argv[0]} <baseline_stats.csv> <current_stats.csv>")
+        sys.exit(2)
+
+    baseline = load_stats(sys.argv[1])
+    current = load_stats(sys.argv[2])
+
+    rps_change = (current["rps"] - baseline["rps"]) / baseline["rps"]
+    p95_change = (current["p95"] - baseline["p95"]) / baseline["p95"]
+
+    print(f"RPS: {baseline['rps']:.1f} -> {current['rps']:.1f} ({rps_change:+.1%})")
+    print(f"P95 (ms): {baseline['p95']:.0f} -> {current['p95']:.0f} ({p95_change:+.1%})")
+    print(f"Failures: {baseline['failures']:.1%} -> {current['failures']:.1%}")
+
+    errors = []
+    if rps_change < -0.10:
+        errors.append(f"RPS dropped {rps_change:.1%} (threshold: -10%)")
+    if p95_change > 0.20:
+        errors.append(f"P95 increased {p95_change:.1%} (threshold: +20%)")
+    if current["failures"] > 0.01:
+        errors.append(f"Failure rate {current['failures']:.1%} > 1%")
+
+    if errors:
+        print("\nREGRESSION DETECTED:")
+        for e in errors:
+            print(f"  x {e}")
+        sys.exit(1)
+
+    print("\nAll performance gates passed.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/load/fixture_project/dspy.config.yaml b/tests/load/fixture_project/dspy.config.yaml
new file mode 100644
index 0000000..e8d1449
--- /dev/null
+++ b/tests/load/fixture_project/dspy.config.yaml
@@ -0,0 +1,24 @@
+app_id: load-test-app
+models:
+  default: model-alpha
+  registry:
+    model-alpha:
+      model: openai/mock-alpha
+      api_base: http://127.0.0.1:9999/v1
+      api_key: mock-key
+      model_type: chat
+      max_tokens: 100
+      temperature: 1.0
+      cache: false
+    model-beta:
+      model: openai/mock-beta
+      api_base: http://127.0.0.1:9999/v1
+      api_key: mock-key
+      model_type: chat
+      max_tokens: 100
+      temperature: 1.0
+      cache: false
+
+program_models:
+  SimplePredict: model-alpha
+  AsyncPredict: model-beta
diff --git a/tests/load/fixture_project/src/load_test_app/__init__.py b/tests/load/fixture_project/src/load_test_app/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/load/fixture_project/src/load_test_app/modules/__init__.py b/tests/load/fixture_project/src/load_test_app/modules/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/load/fixture_project/src/load_test_app/modules/async_predict.py b/tests/load/fixture_project/src/load_test_app/modules/async_predict.py
new file mode 100644
index 0000000..82afbad
--- /dev/null
+++ b/tests/load/fixture_project/src/load_test_app/modules/async_predict.py
@@ -0,0 +1,13 @@
+import dspy
+
+
+class AsyncPredict(dspy.Module):
+    """Same as SimplePredict but with aforward. Used to test async path."""
+    def __init__(self):
+        self.predict = dspy.Predict("question:str -> answer:str")
+
+    def forward(self, question: str) -> dspy.Prediction:
+        return self.predict(question=question)
+
+    async def aforward(self, question: str) -> dspy.Prediction:
+        return await self.predict.acall(question=question)
diff --git a/tests/load/fixture_project/src/load_test_app/modules/simple_predict.py b/tests/load/fixture_project/src/load_test_app/modules/simple_predict.py
new file mode 100644
index 0000000..7ed0d16
--- /dev/null
+++ b/tests/load/fixture_project/src/load_test_app/modules/simple_predict.py
@@ -0,0 +1,10 @@
+import dspy
+
+
+class SimplePredict(dspy.Module):
+    """Single-predict module. Used to test sync fallback path."""
+    def __init__(self):
+        self.predict = dspy.Predict("question:str -> answer:str")
+
+    def forward(self, question: str) -> dspy.Prediction:
+        return self.predict(question=question)
diff --git a/tests/load/locustfile.py b/tests/load/locustfile.py
new file mode 100644
index 0000000..eba9076
--- /dev/null
+++ b/tests/load/locustfile.py
@@ -0,0 +1,78 @@
+"""
+Locust load test for dspy-cli.
+
+Validates both throughput and correctness: each program must receive
+responses routed through its configured model. The mock LLM server
+echoes the model name back in the answer, so we can verify it.
+
+Single-scenario run:
+    locust -f tests/load/locustfile.py \
+        --host http://localhost:8000 \
+        --headless -u 100 -r 10 \
+        --run-time 60s \
+        --csv results/test
+
+Matrix run (preferred):
+    bash tests/load/run_matrix.sh
+"""
+import uuid
+from locust import HttpUser, task, between, events
+
+
+def unique_payload():
+    """Generate a unique question per request to defeat any caching layer."""
+    return {"question": f"What is the capital of France? [{uuid.uuid4().hex[:8]}]"}
+
+
+class SyncModuleUser(HttpUser):
+    """Hits the sync-fallback module (no aforward). Expects model-alpha."""
+    wait_time = between(0.01, 0.1)
+    weight = 1
+
+    @task
+    def call_simple_predict(self):
+        with self.client.post(
+            "/SimplePredict",
+            json=unique_payload(),
+            catch_response=True
+        ) as response:
+            if response.status_code != 200:
+                response.failure(f"Got {response.status_code}: {response.text[:200]}")
+                return
+            body = response.json()
+            answer = body.get("answer", "")
+            if "model=mock-alpha" not in answer:
+                response.failure(
+                    f"SimplePredict model mismatch: expected mock-alpha, got: {answer[:100]}"
+                )
+
+
+class AsyncModuleUser(HttpUser):
+    """Hits the native async module (has aforward). Expects model-beta."""
+    wait_time = between(0.01, 0.1)
+    weight = 1
+
+    @task
+    def call_async_predict(self):
+        with self.client.post(
+            "/AsyncPredict",
+            json=unique_payload(),
+            catch_response=True
+        ) as response:
+            if response.status_code != 200:
+                response.failure(f"Got {response.status_code}: {response.text[:200]}")
+                return
+            body = response.json()
+            answer = body.get("answer", "")
+            if "model=mock-beta" not in answer:
+                response.failure(
+                    f"AsyncPredict model mismatch: expected mock-beta, got: {answer[:100]}"
+                )
+
+
+@events.quitting.add_listener
+def on_quit(environment, **kwargs):
+    """Fail CI if error rate exceeds threshold."""
+    if environment.runner.stats.total.fail_ratio > 0.01:
+        print(f"ERROR: Failure rate {environment.runner.stats.total.fail_ratio:.1%} > 1%")
+        environment.process_exit_code = 1
diff --git a/tests/load/mock_lm_server.py b/tests/load/mock_lm_server.py
new file mode 100644
index 0000000..4f9282e
--- /dev/null
+++ b/tests/load/mock_lm_server.py
@@ -0,0 +1,83 @@
+"""
+Minimal OpenAI-compatible mock server for load testing.
+
+Echoes the requested model name back in the answer so load tests can
+verify that per-program model routing is correct under concurrency.
+
+Environment variables:
+    MOCK_PORT       - Port to listen on (default: 9999)
+    MOCK_DELAY_MS   - Simulated LLM latency in ms (default: 50)
+    MOCK_ERROR_RATE - Fraction of requests that return 500 (0.0-1.0, default: 0.0)
+"""
+import asyncio
+import os
+import random
+import time
+
+import uvicorn
+from fastapi import FastAPI, Request
+from fastapi.responses import JSONResponse
+
+app = FastAPI()
+
+MOCK_DELAY_MS = 50
+
+
+@app.post("/v1/chat/completions")
+async def chat(request: Request):
+    """Accept any JSON body — no strict schema validation.
+
+    LiteLLM sends varying extra fields (stream, n, tools, etc.)
+    depending on the call path. A strict Pydantic model rejects those.
+
+    The response embeds the requested model name in the answer field
+    so callers can verify the correct model was routed.
+    """
+    body = await request.json()
+    model = body.get("model", "unknown")
+    delay = float(os.environ.get("MOCK_DELAY_MS", MOCK_DELAY_MS)) / 1000
+    await asyncio.sleep(delay)
+
+    # Simulate LLM errors
+    error_rate = float(os.environ.get("MOCK_ERROR_RATE", "0.0"))
+    if error_rate > 0 and random.random() < error_rate:
+        return JSONResponse(
+            status_code=500,
+            content={
+                "error": {
+                    "message": "Mock LLM internal error",
+                    "type": "server_error",
+                    "code": "internal_error",
+                }
+            },
+        )
+
+    return {
+        "id": "mock-completion",
+        "object": "chat.completion",
+        "created": int(time.time()),
+        "model": model,
+        "choices": [{
+            "index": 0,
+            "message": {
+                "role": "assistant",
+                "content": f'[[ ## answer ## ]]\nmodel={model}\n\n[[ ## completed ## ]]'
+            },
+            "finish_reason": "stop"
+        }],
+        "usage": {
+            "prompt_tokens": 20,
+            "completion_tokens": 10,
+            "total_tokens": 30
+        }
+    }
+
+
+@app.get("/health")
+async def health():
+    return {"status": "ok"}
+
+
+if __name__ == "__main__":
+    port = int(os.environ.get("MOCK_PORT", 9999))
+    uvicorn.run(app, host="127.0.0.1", port=port)
diff --git a/tests/load/results/.gitkeep b/tests/load/results/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/tests/load/run_benchmark.sh b/tests/load/run_benchmark.sh
new file mode 100755
index 0000000..553d0c3
--- /dev/null
+++ b/tests/load/run_benchmark.sh
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# Single-scenario benchmark runner.
+# For multi-scenario matrix, use run_matrix.sh instead.
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
+RESULTS_DIR="$SCRIPT_DIR/results"
+MOCK_PORT=${MOCK_PORT:-9999}
+SERVER_PORT=${SERVER_PORT:-8000}
+USERS=${USERS:-100}
+SPAWN_RATE=${SPAWN_RATE:-10}
+DURATION=${DURATION:-60s}
+MOCK_DELAY_MS=${MOCK_DELAY_MS:-500}
+LABEL=${LABEL:-"$(git -C "$REPO_ROOT" rev-parse --short HEAD)"}
+
+mkdir -p "$RESULTS_DIR"
+
+kill_port() {
+    # Kill ALL processes listening on a port, including children
+    lsof -ti:"$1" 2>/dev/null | xargs kill -9 2>/dev/null || true
+}
+
+cleanup() {
+    echo "Cleaning up..."
+    kill_port $MOCK_PORT
+    kill_port $SERVER_PORT
+    sleep 1
+}
+trap cleanup EXIT
+
+# 0. Ensure ports are free before starting
+kill_port $MOCK_PORT
+kill_port $SERVER_PORT
+sleep 1
+
+# 1. Start mock LLM server
+echo "Starting mock LLM server on :$MOCK_PORT (delay=${MOCK_DELAY_MS}ms)..."
+MOCK_DELAY_MS=$MOCK_DELAY_MS MOCK_PORT=$MOCK_PORT python "$SCRIPT_DIR/mock_lm_server.py" &
+sleep 1
+
+if ! curl -sf http://127.0.0.1:$MOCK_PORT/health > /dev/null; then
+    echo "ERROR: Mock LLM server failed to start"
+    exit 1
+fi
+echo "Mock LLM server ready."
+
+# 2. Start dspy-cli server against fixture project
+echo "Starting dspy-cli server on :$SERVER_PORT..."
+pushd "$SCRIPT_DIR/fixture_project" > /dev/null
+dspy-cli serve --port $SERVER_PORT --no-reload --no-save-openapi --system &
+popd > /dev/null
+sleep 3
+
+# 3. Wait for server health
+echo "Waiting for server..."
+for i in {1..20}; do
+    if curl -sf http://127.0.0.1:$SERVER_PORT/programs > /dev/null; then
+        echo "Server ready."
+        break
+    fi
+    if [ $i -eq 20 ]; then
+        echo "ERROR: Server failed to start within 20s"
+        exit 1
+    fi
+    sleep 1
+done
+
+# 4. Run load test
+echo "Running load test (users=$USERS, delay=${MOCK_DELAY_MS}ms, duration=$DURATION)..."
+locust -f "$SCRIPT_DIR/locustfile.py" \
+    --host http://127.0.0.1:$SERVER_PORT \
+    --headless \
+    -u $USERS -r $SPAWN_RATE \
+    --run-time $DURATION \
+    --csv "$RESULTS_DIR/$LABEL" \
+    --html "$RESULTS_DIR/$LABEL.html"
+
+echo "Results written to $RESULTS_DIR/$LABEL*.csv"
+echo "Done."
diff --git a/tests/load/run_matrix.sh b/tests/load/run_matrix.sh
new file mode 100755
index 0000000..b7c4f21
--- /dev/null
+++ b/tests/load/run_matrix.sh
@@ -0,0 +1,156 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# Matrix benchmark: runs multiple delay x user-count scenarios.
+#
+# Each scenario boots fresh servers to avoid cross-contamination.
+# Results go to tests/load/results/