diff --git a/.claude/skills/dspy-cli/SKILL.md b/.claude/skills/dspy-cli/SKILL.md
new file mode 100644
index 0000000..54de543
--- /dev/null
+++ b/.claude/skills/dspy-cli/SKILL.md
@@ -0,0 +1,271 @@
+---
+name: dspy-cli
+description: Reference for using dspy-cli to create, develop, test, and deploy DSPy projects. Use when working in a dspy-cli project or when the user needs help with dspy-cli commands, configuration, modules, gateways, or deployment.
+---
+
+# dspy-cli
+
+CLI tool for creating and serving [DSPy](https://dspy.ai) projects. Handles scaffolding, module discovery, and running a FastAPI server that exposes DSPy modules as HTTP endpoints.
+
+```bash
+dspy-cli new my-project                                  # Create project
+cd my-project
+dspy-cli g scaffold analyzer -m CoT -s "text -> summary" # Add program
+dspy-cli serve                                           # Dev server
+dspy-cli serve --no-reload --auth                        # Production
+```
+
+## Commands
+
+### `dspy-cli new [project_name]`
+
+Interactive if name omitted. Options:
+
+- `-p, --program-name` — Initial program name (default: derived from project)
+- `-s, --signature` — Inline signature (default: `"question -> answer"`)
+- `-m, --module-type` — Module type: `Predict`, `CoT`, `ReAct`, `PoT`, `Refine`, `MultiChainComparison`
+- `--model` — LiteLLM model string (e.g. `openai/gpt-4o`)
+- `--api-key` — Stored in `.env`
+
+### `dspy-cli g scaffold <name>`
+
+Generate signature + module. Alias: `generate scaffold`.
+
+- `-m, --module` — Module type (default: `Predict`)
+- `-s, --signature` — Inline signature
+
+Creates: `src/<package>/signatures/<name>.py` and `src/<package>/modules/<name>_<suffix>.py`
+
+### `dspy-cli g signature <name>`
+
+Signature file only. `-s` for inline signature.
+
+### `dspy-cli g module <name>`
+
+Module file only. `-m` for module type.
+
+### `dspy-cli g gateway <name>`
+
+- `-t, --type` — `api` (default) or `cron`
+- `-p, --path` — Custom HTTP path (API gateways)
+- `-s, --schedule` — Cron expression (default: `"0 * * * *"`)
+- `--public/--private` — Auth requirement (default: `--private`)
+
+### `dspy-cli serve`
+
+Alias: `s`. Start FastAPI server.
+
+- `--port` (8000), `--host` (0.0.0.0)
+- `--reload/--no-reload` (default: reload on)
+- `--auth/--no-auth` — Bearer token via `DSPY_API_KEY`
+- `--mcp` — Enable Model Context Protocol at `/mcp`
+- `--sync-workers` — Thread pool size (default: `min(32, cpu+4)`)
+- `--save-openapi/--no-save-openapi`, `--openapi-format json|yaml`
+- `--python` — Path to interpreter, `--system` — Use system Python
+- `--logs-dir` — Inference log directory (default: `./logs`)
+
+## Project Structure
+
+```
+my-project/
+├── src/my_project/
+│   ├── modules/        # DSPy modules (auto-discovered)
+│   ├── signatures/     # Signature definitions
+│   ├── gateways/       # API/Cron gateways (optional)
+│   ├── optimizers/     # Optimizer configs
+│   ├── metrics/        # Evaluation metrics
+│   └── utils/
+├── logs/               # JSONL inference logs (one per program)
+├── dspy.config.yaml    # Model registry + server config
+├── .env                # API keys (gitignored)
+├── pyproject.toml
+└── Dockerfile
+```
+
+**Naming:** project dirs use hyphens (`blog-categorizer`), packages use underscores (`blog_categorizer`), files lowercase, classes PascalCase.
+
+## Configuration
+
+### dspy.config.yaml
+
+```yaml
+app_id: my-project
+
+models:
+  default: my-model
+  registry:
+    my-model:
+      model: openai/gpt-4o-mini            # LiteLLM model string
+      api_key: ${{ env.OPENAI_API_KEY }}
+      # Optional settings:
+      # model_type: chat
+      # max_tokens: 1000
+      # temperature: 0.7
+      # cache: false
+    local:
+      model: openai/llama3
+      api_base: http://localhost:11434/v1  # Ollama, vLLM, etc.
+      api_key: not-needed
+
+program_models:                  # Per-program model overrides
+  ExpensiveProgram: claude-opus
+
+server:                          # All optional
+  sync_worker_threads: 16
+  max_concurrent_per_program: 20 # 429 when exceeded
+  cors_origins: "*"              # Or: ["https://app.example.com"]
+```
+
+### .env
+
+```bash
+OPENAI_API_KEY=sk-...
+ANTHROPIC_API_KEY=sk-ant-...
+DSPY_API_KEY=...        # For --auth mode
+DSPY_CORS_ORIGINS=*     # Overrides config
+```
+
+## Module Types
+
+| `-m` value | Suffix | DSPy class |
+|-----------|--------|------------|
+| `Predict` | `_predict` | `dspy.Predict` |
+| `CoT` | `_cot` | `dspy.ChainOfThought` |
+| `PoT` | `_pot` | `dspy.ProgramOfThought` |
+| `ReAct` | `_react` | `dspy.ReAct` |
+| `MultiChainComparison` | `_mcc` | `dspy.MultiChainComparison` |
+| `Refine` | `_refine` | `dspy.Refine` |
+
+## Signatures
+
+Pattern: `inputs -> outputs`. Types: `str`, `int`, `float`, `bool`, `list[str]`, `dict[str, int]`, `dspy.Image`.
+
+```
+"question -> answer"
+"text -> summary, sentiment: bool"
+"context: list[str], question -> answer, confidence: float"
+```
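+
+When you need per-field descriptions or docstring instructions, DSPy's class-based signatures also work — a minimal hand-written sketch (dspy-cli does not require this form):
+
+```python
+import dspy
+
+class Summarize(dspy.Signature):
+    """Summarize the text and classify its sentiment."""
+
+    text: str = dspy.InputField()
+    summary: str = dspy.OutputField()
+    sentiment: bool = dspy.OutputField(desc="True if the tone is positive")
+```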
+
+## Module Discovery
+
+Server scans `src/<package>/modules/*.py`. Valid modules must:
+1. Subclass `dspy.Module`
+2. Be defined in that file (not imported)
+3. Not start with `_`
+4. Have `forward()`
+
+No `pip install -e .` needed — uses `importlib.util.spec_from_file_location`.
+
+```python
+import dspy
+
+class QA(dspy.Module):
+    def __init__(self):
+        self.predict = dspy.Predict("question -> answer")
+
+    def forward(self, question: str) -> dspy.Prediction:
+        return self.predict(question=question)
+```
+
+### Async support
+
+Add `aforward()` to bypass the thread pool (runs on event loop directly):
+
+```python
+async def aforward(self, question: str) -> dspy.Prediction:
+    return await self.predict.acall(question=question)
+```
+
+`forward()` is required by DSPy's `__call__` (optimizers, notebooks, scripts depend on it). `aforward()` is server-only. If you keep both, logic must stay in sync.
+
+## Server Endpoints
+
+- `POST /{ProgramName}` — Execute program
+- `POST /{ProgramName}/{GatewayName}` — Execute through gateway
+- `GET /programs` — List programs
+- `GET /health/live` — Liveness (200 if running)
+- `GET /health/ready` — Readiness (200 when LMs init'd, 503 otherwise)
+- `GET /openapi.json` — OpenAPI spec
+- `GET /` — Web UI
+- `GET /api/metrics` — Metrics (`?sort_by=calls&order=desc`)
+- `GET /api/logs/{program_name}` — Inference logs
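+
+For example, calling the `QA` module above (default port, auth disabled):
+
+```bash
+curl -X POST http://localhost:8000/QA \
+  -H "Content-Type: application/json" \
+  -d '{"question": "What is DSPy?"}'
+```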
+
+## Gateways
+
+### API Gateway
+
+```python
+from dspy_cli.gateway import APIGateway
+
+class SlackWebhook(APIGateway):
+    path = "/webhooks/slack"
+    requires_auth = False
+
+    def to_pipeline_inputs(self, request):
+        return {"text": request["event"]["text"]}
+
+    def from_pipeline_output(self, output):
+        return {"text": output["response"]}
+
+class MyModule(dspy.Module):
+    gateway = SlackWebhook()
+```
+
+### Cron Gateway
+
+```python
+from dspy_cli.gateway import CronGateway
+
+class HourlyCheck(CronGateway):
+    schedule = "0 * * * *"
+    use_batch = True
+    num_threads = 4
+
+    async def get_pipeline_inputs(self):
+        return [{"text": item["content"], "_meta": {"id": item["id"]}}
+                for item in await fetch_pending()]
+
+    async def on_complete(self, inputs, output):
+        await update_record(inputs["_meta"]["id"], output)
+
+class MyModule(dspy.Module):
+    gateway = HourlyCheck()
+```
+
+## Concurrency
+
+- **Sync modules:** Bounded thread pool (`min(32, cpu+4)` default). `dspy.context()` propagates into workers.
+- **Async modules:** Event loop direct. No thread overhead.
+- **Backpressure:** Per-program semaphore (default 20). Queues up to 30s, then 429.
+
+## Auth
+
+`dspy-cli serve --auth` — requires `DSPY_API_KEY` env var. Auto-generates if unset.
+
+```bash
+curl -H "Authorization: Bearer $DSPY_API_KEY" http://localhost:8000/MyProgram \
+  -H "Content-Type: application/json" -d '{"question": "hello"}'
+```
+
+Web UI uses session cookies via `/login`.
+
+## Testing
+
+```bash
+uv sync --dev && pytest
+```
+
+```python
+import dspy
+from my_project.modules.qa_predict import QAPredict
+
+dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))
+result = QAPredict()(question="What is DSPy?")
+```
+
+## Deployment
+
+```bash
+docker build -t my-project .
+docker run -p 8000:8000 --env-file .env my-project
+```
+
+Generated Dockerfile: Python 3.11 slim, `uv sync`, serves with `--auth --no-reload`.
+
+Production flags: `--no-reload --auth --host 0.0.0.0 --sync-workers N`
diff --git a/.gitignore b/.gitignore
index ad76c5e..08e7880 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,4 +60,10 @@ mlruns
 mlartifacts
 
 # Documentation
-/site/
\ No newline at end of file
+/site/
+
+# Load test results
+tests/load/results/*.csv
+tests/load/results/*.html
+tests/load/fixture_project/openapi.json
+tests/load/fixture_project/logs/*.log
\ No newline at end of file
diff --git a/dspy-cli-performance-plan.md b/dspy-cli-performance-plan.md
new file mode 100644
index 0000000..6c37699
--- /dev/null
+++ b/dspy-cli-performance-plan.md
@@ -0,0 +1,1031 @@
+# dspy-cli Performance Implementation Plan
+
+## Overview
+
+This plan is organized into three phases. Phase 1 builds the testing infrastructure and establishes a benchmark against the current codebase — nothing gets changed yet. Phase 2 applies the quick wins that are low-risk and high-impact. Phase 3 implements the Django-style async routing and supporting infrastructure in the right order, with the test suite verifying each change.
+
+All tasks assume work is happening inside the `dspy-cli` repo. New test files go under `tests/`.
+
+---
+
+## Phase 1 — Testing Harness & Baseline Benchmark
+
+The goal of this phase is a repeatable, automated stress test that runs against a live `dspy-cli serve` instance with a mocked LLM backend. The mock backend is critical: it removes the upstream provider as a variable so you're measuring dspy-cli's own overhead, not OpenAI's response time.
+
+### Task 1.1 — Mock LLM Backend
+
+Create a minimal FastAPI server that speaks the OpenAI chat completions API format and returns immediately. This stands in for the real LLM during load tests.
+
+**File:** `tests/load/mock_lm_server.py`
+
+```python
+"""
+Minimal OpenAI-compatible mock server for load testing.
+Returns a canned response immediately with configurable delay.
+"""
+import asyncio
+import os
+import time
+import uvicorn
+from fastapi import FastAPI
+from pydantic import BaseModel
+from typing import Any
+
+app = FastAPI()
+
+MOCK_DELAY_MS = 50  # Simulate minimal LLM latency. Set via env var MOCK_DELAY_MS.
+
+class ChatRequest(BaseModel):
+    model: str
+    messages: list[dict[str, Any]]
+    max_tokens: int = 100
+    temperature: float = 1.0
+
+@app.post("/v1/chat/completions")
+async def chat(request: ChatRequest):
+    delay = float(os.environ.get("MOCK_DELAY_MS", MOCK_DELAY_MS)) / 1000
+    await asyncio.sleep(delay)
+    return {
+        "id": "mock-completion",
+        "object": "chat.completion",
+        "created": int(time.time()),
+        "model": request.model,
+        "choices": [{
+            "index": 0,
+            "message": {
+                "role": "assistant",
+                "content": '[[ ## answer ## ]]\nMock answer.\n\n[[ ## completed ## ]]'
+            },
+            "finish_reason": "stop"
+        }],
+        "usage": {
+            "prompt_tokens": 20,
+            "completion_tokens": 10,
+            "total_tokens": 30
+        }
+    }
+
+@app.get("/health")
+async def health():
+    return {"status": "ok"}
+
+if __name__ == "__main__":
+    uvicorn.run(app, host="0.0.0.0", port=9999)
+```
+
+**Notes:**
+- `MOCK_DELAY_MS=50` simulates a fast local model. Set to `500` to simulate a real API call.
+- `MOCK_DELAY_MS=0` tests pure server overhead with no LLM latency.
+- The response format matches the DSPy ChatAdapter's expected format exactly (`[[ ## field ## ]]` delimiters).
+
+---
+
+### Task 1.2 — Fixture DSPy CLI Project
+
+Create a minimal dspy-cli project used exclusively for load testing. It lives under `tests/load/fixture_project/` and is checked into the repo.
+
+**File:** `tests/load/fixture_project/dspy.config.yaml`
+
+```yaml
+app_id: load-test-app
+models:
+  default: mock:local
+  registry:
+    mock:local:
+      model: openai/mock-gpt
+      api_base: http://localhost:9999/v1
+      api_key: mock-key
+      model_type: chat
+      max_tokens: 100
+      temperature: 1.0
+```
+
+**File:** `tests/load/fixture_project/src/load_test_app/modules/simple_predict.py`
+
+```python
+import dspy
+
+class SimplePredict(dspy.Module):
+    """Single-predict module. Used to test sync fallback path."""
+    def __init__(self):
+        self.predict = dspy.Predict("question:str -> answer:str")
+
+    def forward(self, question: str) -> dspy.Prediction:
+        return self.predict(question=question)
+```
+
+**File:** `tests/load/fixture_project/src/load_test_app/modules/async_predict.py`
+
+```python
+import dspy
+
+class AsyncPredict(dspy.Module):
+    """Same as SimplePredict but with aforward. Used to test async path."""
+    def __init__(self):
+        self.predict = dspy.Predict("question:str -> answer:str")
+
+    def forward(self, question: str) -> dspy.Prediction:
+        return self.predict(question=question)
+
+    async def aforward(self, question: str) -> dspy.Prediction:
+        return await self.predict.acall(question=question)
+```
+
+Two modules let you directly compare sync vs async paths under identical load.
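+
+A quick manual smoke test of the fixture before wiring up the harness (hypothetical session; assumes the Task 1.1 mock server is already running on :9999):
+
+```bash
+cd tests/load/fixture_project
+dspy-cli serve --port 8000 --no-reload --system &
+curl -X POST http://localhost:8000/SimplePredict \
+  -H "Content-Type: application/json" \
+  -d '{"question": "ping"}'
+```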
+
+---
+
+### Task 1.3 — Load Test Script
+
+**File:** `tests/load/locustfile.py`
+
+```python
+"""
+Locust load test for dspy-cli.
+
+Run with:
+    locust -f tests/load/locustfile.py \
+        --host http://localhost:8000 \
+        --headless -u 50 -r 5 \
+        --run-time 60s \
+        --csv results/baseline
+"""
+import os
+from locust import HttpUser, task, between, events
+from locust.runners import MasterRunner
+
+
+QUESTION_PAYLOAD = {"question": "What is the capital of France?"}
+
+
+class SyncModuleUser(HttpUser):
+    """Hits the sync-fallback module (no aforward)."""
+    wait_time = between(0.01, 0.1)
+    weight = 1
+
+    @task
+    def call_simple_predict(self):
+        with self.client.post(
+            "/SimplePredict",
+            json=QUESTION_PAYLOAD,
+            catch_response=True
+        ) as response:
+            if response.status_code != 200:
+                response.failure(f"Got {response.status_code}: {response.text[:200]}")
+            elif "answer" not in response.json():
+                response.failure("Missing 'answer' in response")
+
+
+class AsyncModuleUser(HttpUser):
+    """Hits the native async module (has aforward)."""
+    wait_time = between(0.01, 0.1)
+    weight = 1
+
+    @task
+    def call_async_predict(self):
+        with self.client.post(
+            "/AsyncPredict",
+            json=QUESTION_PAYLOAD,
+            catch_response=True
+        ) as response:
+            if response.status_code != 200:
+                response.failure(f"Got {response.status_code}: {response.text[:200]}")
+            elif "answer" not in response.json():
+                response.failure("Missing 'answer' in response")
+
+
+@events.quitting.add_listener
+def on_quit(environment, **kwargs):
+    """Fail CI if error rate exceeds threshold."""
+    if environment.runner.stats.total.fail_ratio > 0.01:
+        print(f"ERROR: Failure rate {environment.runner.stats.total.fail_ratio:.1%} > 1%")
+        environment.process_exit_code = 1
+```
+
+---
+
+### Task 1.4 — Orchestration Script
+
+A single script that boots everything, runs the test, captures results, and tears down. This is what CI runs.
+
+**File:** `tests/load/run_benchmark.sh`
+
+```bash
+#!/usr/bin/env bash
+set -euo pipefail
+
+# Config
+RESULTS_DIR="tests/load/results"
+MOCK_PORT=9999
+SERVER_PORT=8000
+USERS=${USERS:-50}
+SPAWN_RATE=${SPAWN_RATE:-5}
+DURATION=${DURATION:-60s}
+LABEL=${LABEL:-"$(git rev-parse --short HEAD)"}
+
+mkdir -p "$RESULTS_DIR"
+
+# 1. Start mock LLM server
+echo "Starting mock LLM server on :$MOCK_PORT..."
+MOCK_DELAY_MS=50 python tests/load/mock_lm_server.py &
+MOCK_PID=$!
+sleep 1
+
+# 2. Start dspy-cli server against fixture project
+echo "Starting dspy-cli server on :$SERVER_PORT..."
+pushd tests/load/fixture_project
+dspy-cli serve --port $SERVER_PORT --no-reload --system &
+SERVER_PID=$!
+popd
+sleep 3
+
+# 3. Wait for server health
+echo "Waiting for server..."
+for i in {1..20}; do
+  if curl -sf http://localhost:$SERVER_PORT/programs > /dev/null; then
+    echo "Server ready."
+    break
+  fi
+  sleep 1
+done
+
+# 4. Run load test
+echo "Running load test (users=$USERS, duration=$DURATION)..."
+locust -f tests/load/locustfile.py \
+  --host http://localhost:$SERVER_PORT \
+  --headless \
+  -u $USERS -r $SPAWN_RATE \
+  --run-time $DURATION \
+  --csv "$RESULTS_DIR/$LABEL" \
+  --html "$RESULTS_DIR/$LABEL.html"
+
+# 5. Teardown
+kill $SERVER_PID $MOCK_PID 2>/dev/null || true
+wait $SERVER_PID $MOCK_PID 2>/dev/null || true
+
+echo "Results written to $RESULTS_DIR/$LABEL*.csv"
+echo "Done."
+```
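+
+Example invocations, using the environment overrides defined at the top of the script:
+
+```bash
+LABEL=baseline bash tests/load/run_benchmark.sh                 # default: 50 users, 60s
+USERS=200 SPAWN_RATE=20 DURATION=120s LABEL=stress \
+  bash tests/load/run_benchmark.sh                              # heavier saturation run
+```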
+
+---
+
+### Task 1.5 — Pytest Integration for CI
+
+Separate from the load test (which needs a running server), add a pytest-based integration test that verifies correctness under moderate concurrency. This runs in normal `pytest` without the locust dependency.
+
+**File:** `tests/integration/test_concurrent_requests.py`
+
+```python
+"""
+Concurrent correctness tests. Not a load test — verifies that responses
+are correct under concurrency, not just that the server survives.
+
+Requires a running dspy-cli server + mock LLM. Use the fixture in conftest.py.
+"""
+import asyncio
+import httpx
+import pytest
+
+BASE_URL = "http://localhost:8000"
+
+
+async def make_request(client: httpx.AsyncClient, endpoint: str, question: str):
+    response = await client.post(
+        f"{BASE_URL}/{endpoint}",
+        json={"question": question},
+        timeout=30.0
+    )
+    return response
+
+
+@pytest.mark.asyncio
+async def test_sync_module_concurrent_correctness():
+    """20 concurrent requests to sync module should all succeed with valid responses."""
+    async with httpx.AsyncClient() as client:
+        tasks = [
+            make_request(client, "SimplePredict", f"Question {i}")
+            for i in range(20)
+        ]
+        responses = await asyncio.gather(*tasks)
+
+    for i, r in enumerate(responses):
+        assert r.status_code == 200, f"Request {i} failed: {r.text}"
+        assert "answer" in r.json(), f"Request {i} missing 'answer': {r.json()}"
+
+
+@pytest.mark.asyncio
+async def test_async_module_concurrent_correctness():
+    """20 concurrent requests to async module should all succeed."""
+    async with httpx.AsyncClient() as client:
+        tasks = [
+            make_request(client, "AsyncPredict", f"Question {i}")
+            for i in range(20)
+        ]
+        responses = await asyncio.gather(*tasks)
+
+    for i, r in enumerate(responses):
+        assert r.status_code == 200, f"Request {i} failed: {r.text}"
+        assert "answer" in r.json(), f"Request {i} missing 'answer': {r.json()}"
+
+
+@pytest.mark.asyncio
+async def test_no_response_cross_contamination():
+    """
+    Verifies that concurrent requests don't bleed into each other's outputs.
+    Sends requests with distinct questions and checks that answers are independent.
+    This would catch ContextVar leakage or shared state bugs.
+    """
+    questions = [f"Unique question {i} xyzzy" for i in range(10)]
+
+    async with httpx.AsyncClient() as client:
+        tasks = [
+            make_request(client, "AsyncPredict", q)
+            for q in questions
+        ]
+        responses = await asyncio.gather(*tasks)
+
+    for r in responses:
+        assert r.status_code == 200
+        data = r.json()
+        assert "answer" in data
+        # Mock server returns the same canned response, but we're verifying
+        # there's no exception or empty response caused by state mixing.
+        assert data["answer"] != ""
+```
+
+**File:** `tests/integration/conftest.py`
+
+```python
+"""
+Starts mock LLM server and dspy-cli server as subprocess fixtures.
+Tests in this directory require these fixtures.
+"""
+import subprocess
+import time
+import httpx
+import pytest
+import os
+
+
+@pytest.fixture(scope="session", autouse=True)
+def mock_lm_server():
+    proc = subprocess.Popen(
+        ["python", "tests/load/mock_lm_server.py"],
+        env={**os.environ, "MOCK_DELAY_MS": "50"}
+    )
+    time.sleep(1)
+    yield proc
+    proc.terminate()
+    proc.wait()
+
+
+@pytest.fixture(scope="session", autouse=True)
+def dspy_cli_server(mock_lm_server):
+    proc = subprocess.Popen(
+        ["dspy-cli", "serve", "--port", "8000", "--no-reload", "--system"],
+        cwd="tests/load/fixture_project"
+    )
+    # Wait for server to be ready
+    for _ in range(20):
+        try:
+            httpx.get("http://localhost:8000/programs", timeout=1)
+            break
+        except Exception:
+            time.sleep(0.5)
+    yield proc
+    proc.terminate()
+    proc.wait()
+```
+
+---
+
+### Task 1.6 — Capture Baseline
+
+Run the benchmark script against `main` before any code changes and commit the CSV output.
+
+```bash
+LABEL="baseline" bash tests/load/run_benchmark.sh
+git add tests/load/results/baseline*.csv tests/load/results/baseline.html
+git commit -m "perf: capture baseline benchmark"
+```
+
+Key numbers to record from the CSV:
+
+| Metric | Where in CSV |
+|---|---|
+| Requests/sec (RPS) at 50 users | `_stats.csv` → `Requests/s` |
+| Median response time | `_stats.csv` → `50%` |
+| P95 response time | `_stats.csv` → `95%` |
+| Failure rate | `_stats.csv` → `Failure Count / Request Count` |
+| RPS at saturation (where failures start) | Increase `-u` until failure rate climbs |
+
+---
+
+## Phase 2 — Quick Wins (No Architecture Changes)
+
+These changes are safe, small, and can be shipped before the async routing work. Run the benchmark after each one.
+
+---
+
+### Task 2.1 — Disable Global History in Production
+
+**File:** `src/dspy_cli/server/runner.py` (or wherever `create_app` is called)
+
+Add this at server startup, before any request is handled:
+
+```python
+import dspy
+
+# GLOBAL_HISTORY is a plain list with no locking.
+# Under concurrent async requests, update_history() is a race condition.
+# We capture what we need in the JSONL logs; global history adds no value in production.
+dspy.settings.configure(disable_history=True)
+```
+
+This eliminates the most concrete data-race bug identified in the codebase. It should have no visible effect on behavior but will be measurable under concurrency stress as fewer intermittent errors.
+
+---
+
+### Task 2.2 — Disable Hot Reload in the Generated Dockerfile
+
+The generated Dockerfile from `dspy-cli new` currently produces a container that starts with `--reload` on by default (or inherits the default). Hot reload launches a filesystem watcher subprocess that restarts the server on any file change. In a container, that's a silent footgun.
+
+**File:** `src/dspy_cli/templates/code_templates/` (wherever the Dockerfile template lives)
+
+In the Dockerfile `CMD`:
+```dockerfile
+# Before
+CMD ["dspy-cli", "serve", "--host", "0.0.0.0"]
+
+# After
+CMD ["dspy-cli", "serve", "--host", "0.0.0.0", "--no-reload"]
+```
+
+Also update the serve command help text to make clear `--reload` is a development flag.
+
+---
+
+### Task 2.3 — Document `--no-reload` Prominently
+
+Audit the README and docs. Anywhere the Docker or production deployment is described, add an explicit note that `--reload` must be disabled. Low effort, prevents user mistakes.
+
+---
+
+## Phase 3 — Django-Style Async Routing
+
+This is the core work. The sequence below is ordered so that each task is testable in isolation and doesn't break the next task's assumptions.
+
+---
+
+### Task 3.1 — Bounded Executor Infrastructure
+
+Before changing the route creation logic, create the executor infrastructure it will depend on.
+
+**New file:** `src/dspy_cli/server/executor.py`
+
+```python
+"""
+Bounded thread pool executor for sync DSPy module execution.
+
+Why bounded: the natural backpressure limit for LLM calls is the upstream
+rate limit. A bounded executor makes this limit explicit and configurable
+rather than relying on Uvicorn's opaque thread pool default (40 threads).
+
+Default of 10 workers is conservative — tune up based on provider rate limits
+and measured concurrency in your environment.
+"""
+import asyncio
+import contextvars
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Callable
+
+_executor: ThreadPoolExecutor | None = None
+
+
+def get_executor(max_workers: int = 10) -> ThreadPoolExecutor:
+    """Return the process-wide bounded executor, creating it if needed."""
+    global _executor
+    if _executor is None:
+        _executor = ThreadPoolExecutor(
+            max_workers=max_workers,
+            thread_name_prefix="dspy-sync-worker"
+        )
+    return _executor
+
+
+def shutdown_executor():
+    """Gracefully shutdown the executor. Call on server shutdown."""
+    global _executor
+    if _executor is not None:
+        _executor.shutdown(wait=True)
+        _executor = None
+
+
+async def run_sync_in_executor(fn: Callable, *args: Any, **kwargs: Any) -> Any:
+    """
+    Run a sync callable in the bounded thread pool without blocking the event loop.
+
+    ContextVars (including dspy.context overrides) do not cross thread
+    boundaries on their own — run_in_executor does not snapshot the caller's
+    context. We copy it explicitly, the same way asyncio.to_thread() does,
+    so `with dspy.context(lm=request_lm)` set before calling this function
+    is visible inside `fn`.
+    """
+    loop = asyncio.get_running_loop()
+    executor = get_executor()
+
+    # run_in_executor doesn't accept kwargs; ctx.run carries both the
+    # context snapshot and the arguments into the worker thread.
+    ctx = contextvars.copy_context()
+    return await loop.run_in_executor(executor, lambda: ctx.run(fn, *args, **kwargs))
+```
+
+**Update server lifespan** in `app.py` to shut down the executor on server stop:
+
+```python
+from dspy_cli.server.executor import shutdown_executor
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # ... existing startup ...
+    yield
+    # ... existing shutdown ...
+    shutdown_executor()
+```
+
+**Add to config** (`dspy.config.yaml` schema and loader):
+
+```yaml
+server:
+  sync_worker_threads: 10   # Max concurrent sync module executions
+```
+
+---
+
+### Task 3.2 — Module Async Detection at Discovery Time
+
+Rather than re-checking `hasattr(instance, 'aforward')` on every request, detect it once at discovery time and store it on `DiscoveredModule`. This is also where the distinction between "has user-implemented aforward" vs "inherits aforward from base class" matters.
+
+**File:** `src/dspy_cli/discovery/module_finder.py`
+
+Add a field to `DiscoveredModule`:
+
+```python
+@dataclass
+class DiscoveredModule:
+    # ... existing fields ...
+    has_native_async: bool = False  # True only if user implemented aforward (not just inherited)
+```
+
+In the module discovery logic, after loading the class:
+
+```python
+def _has_user_implemented_aforward(cls) -> bool:
+    """
+    Returns True only if the module's own class (not a parent) defines aforward.
+
+    This is the important distinction: all dspy.Module subclasses inherit a base
+    aforward from Predict, but if the user hasn't overridden it in their Module,
+    their forward() logic doesn't run in the async path — only the inner predict
+    does. We need user-level aforward to trust the full async path.
+    """
+    # Check if 'aforward' is defined directly on this class (not inherited)
+    return "aforward" in cls.__dict__
+```
+
+The distinction matters because `SimplePredict` (no user `aforward`) still technically has `.aforward` via the `Predict` sub-module, but calling `acall()` on the outer module would still run sync `forward()` logic wrapping the async `predict`. You want to detect user intent, not just method existence.
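+
+A minimal illustration of the `cls.__dict__` check (hypothetical classes, not part of the codebase):
+
+```python
+import dspy
+
+class OnlySync(dspy.Module):          # no user aforward -> executor path
+    def forward(self, question: str): ...
+
+class NativeAsync(dspy.Module):       # defines its own aforward -> async path
+    def forward(self, question: str): ...
+    async def aforward(self, question: str): ...
+
+assert "aforward" not in OnlySync.__dict__     # any inherited aforward is ignored
+assert "aforward" in NativeAsync.__dict__      # trusted for instance.acall()
+```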
+
+---
+
+### Task 3.3 — Update `execute_pipeline` Dispatch Logic
+
+This is the core change. Replace the current `hasattr(instance, 'aforward')` check with `module.has_native_async`, and add the `run_sync_in_executor` fallback.
+
+**File:** `src/dspy_cli/server/execution.py`
+
+```python
+from dspy_cli.server.executor import run_sync_in_executor
+
+async def execute_pipeline(
+    *,
+    module: DiscoveredModule,
+    instance: dspy.Module,
+    lm: dspy.LM,
+    model_name: str,
+    program_name: str,
+    inputs: Dict[str, Any],
+    logs_dir: Path,
+) -> Dict[str, Any]:
+
+    start_time = time.time()
+    request_lm = lm.copy()
+
+    try:
+        logger.info(f"Executing {program_name} with inputs: {inputs}")
+
+        with dspy.context(lm=request_lm):
+            if module.has_native_async:
+                # Native async path: LM HTTP calls are awaited, event loop is free.
+                result = await instance.acall(**inputs)
+            else:
+                # Sync fallback: dispatch to bounded thread pool.
+                # The dspy.context ContextVar reaches the worker thread because
+                # run_sync_in_executor copies the current context (PEP 567).
+                result = await run_sync_in_executor(instance, **inputs)
+
+        # ... rest unchanged ...
+```
+
+**Write a test for the dispatch logic specifically:**
+
+**File:** `tests/unit/test_execution_dispatch.py`
+
+```python
+"""
+Tests that the right execution path is chosen based on module.has_native_async.
+Uses a mock module to avoid needing a real LM.
+"""
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+
+
+def test_sync_module_dispatches_to_executor(fixture_sync_module, fixture_lm):
+    """Sync modules should use run_sync_in_executor, not acall."""
+    with patch(
+        "dspy_cli.server.execution.run_sync_in_executor", new_callable=AsyncMock
+    ) as mock_executor:
+        mock_executor.return_value = {"answer": "ok"}
+        # ... call execute_pipeline and assert mock_executor was awaited
+
+
+def test_async_module_uses_acall(fixture_async_module, fixture_lm):
+    """Async modules should use instance.acall, not the executor."""
+    # ...
+```
+
+---
+
+### Task 3.4 — Update `execute_pipeline_batch` to Match
+
+The batch execution path in `execute_pipeline_batch` has its own dispatch. Apply the same `has_native_async` check there for consistency.
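+
+A sketch of the change, mirroring the Task 3.3 dispatch — `instance.batch` is always sync, so it always goes through the bounded executor:
+
+```python
+# Before: blocks the event loop for the whole batch
+batch_result = instance.batch(examples, **batch_kwargs)
+
+# After: runs in the bounded thread pool, event loop stays free
+batch_result = await run_sync_in_executor(instance.batch, examples, **batch_kwargs)
+```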
+
+---
+
+### Task 3.5 — Expose `sync_worker_threads` in Config and CLI
+
+Wire up the `sync_worker_threads` config value through the full stack:
+
+1. Read from `dspy.config.yaml` in the config loader
+2. Pass through `create_app()` → `get_executor(max_workers=...)`
+3. Add `--sync-workers N` CLI flag to `dspy-cli serve` as an override
+4. Log the value at startup: `"Sync executor: N threads for sync module dispatch"`
+
+This makes the limit visible and tunable without code changes.
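+
+Once wired, both knobs look like this (values illustrative; the CLI flag wins over config):
+
+```yaml
+# dspy.config.yaml
+server:
+  sync_worker_threads: 16
+```
+
+```bash
+dspy-cli serve --sync-workers 32
+```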
+
+---
+
+### Task 3.6 — Add `aforward` to `generate scaffold` Template
+
+When a user runs `dspy-cli generate scaffold mymodule -s "question -> answer"`, the generated module should include an `aforward` implementation by default.
+
+**File:** `src/dspy_cli/templates/code_templates/` (module template)
+
+```python
+# Generated template — before
+class {{ module_name }}(dspy.Module):
+    def __init__(self):
+        self.predict = dspy.{{ module_type }}("{{ signature }}")
+
+    def forward(self, {{ input_fields }}):
+        return self.predict({{ input_kwargs }})
+```
+
+```python
+# Generated template — after
+class {{ module_name }}(dspy.Module):
+    def __init__(self):
+        self.predict = dspy.{{ module_type }}("{{ signature }}")
+
+    def forward(self, {{ input_fields }}):
+        return self.predict({{ input_kwargs }})
+
+    async def aforward(self, {{ input_fields }}):
+        """
+        Async version of forward(). When present, dspy-cli routes requests
+        through the native async path (no thread pool). For complex modules
+        with custom logic between LLM calls, ensure all sub-module calls
+        use acall() not direct invocation, e.g.:
+            result = await self.predict.acall(...)
+        """
+        return await self.predict.acall({{ input_kwargs }})
+```
+
+Add a note in docs explaining when users need to do more than just call `acall` on a single predictor (multi-step modules, custom logic between calls).
+
+---
+
+### Task 3.7 — Fix JSONL Write Contention
+
+The concurrent write problem: multiple threads and async tasks writing to the same log file with no locking.
+
+**File:** `src/dspy_cli/server/logging.py`
+
+Replace direct file writes with a queue drained by a single background writer thread:
+
+```python
+import json
+import queue
+import threading
+from pathlib import Path
+
+_log_queue: queue.Queue = queue.Queue()
+_log_thread: threading.Thread | None = None
+
+
+def _log_writer_thread(logs_dir: Path):
+    """Single background thread that drains the log queue and writes to disk."""
+    open_files = {}
+    while True:
+        item = _log_queue.get()
+        if item is None:  # Shutdown signal
+            for f in open_files.values():
+                f.close()
+            return
+
+        program_name, entry_json = item
+        log_file = logs_dir / f"{program_name}.log"
+
+        if program_name not in open_files:
+            open_files[program_name] = open(log_file, "a", buffering=1)
+
+        open_files[program_name].write(entry_json + "\n")
+        _log_queue.task_done()
+
+
+def start_log_writer(logs_dir: Path):
+    """Start the background log writer. Call once at server startup."""
+    global _log_thread
+    _log_thread = threading.Thread(
+        target=_log_writer_thread,
+        args=(logs_dir,),
+        daemon=True,
+        name="dspy-log-writer"
+    )
+    _log_thread.start()
+
+
+def stop_log_writer():
+    """Drain the queue and shut down the log writer. Call on server shutdown."""
+    _log_queue.put(None)
+    if _log_thread:
+        _log_thread.join(timeout=5)
+
+
+def log_inference(*, logs_dir: Path, program_name: str, **fields):
+    """Enqueue a log entry. Non-blocking — returns immediately."""
+    entry = {"program": program_name, **fields}
+    _log_queue.put((program_name, json.dumps(entry)))
+```
+
+Update server lifespan in `app.py` to call `start_log_writer()` / `stop_log_writer()`.
+
+---
+
+### Task 3.8 — Fix Metrics Endpoint: In-Memory Accumulation
+
+The current `/api/metrics` endpoint reads and parses the entire JSONL file on every call. Replace the file-scan approach with in-memory running totals that are updated at write time and written to file only for durability.
+
+**File:** `src/dspy_cli/server/metrics.py`
+
+```python
+"""
+In-memory metrics accumulation with JSONL durability.
+
+Metrics are updated in-memory on every log_inference() call.
+The /api/metrics endpoint reads from memory, not from disk.
+JSONL files remain for persistence across restarts — on startup,
+metrics are reconstructed from the log file once, then maintained in memory.
+"""
+import threading
+from dataclasses import dataclass, field
+from typing import Dict, Any
+
+@dataclass
+class ProgramMetrics:
+    program: str
+    call_count: int = 0
+    success_count: int = 0
+    error_count: int = 0
+    _durations: list = field(default_factory=list, repr=False)
+    total_tokens: int = 0
+    # ... other fields
+
+    # Thread-safe: metrics are only written from the single log writer thread
+    # and read from the metrics endpoint. No locking needed as long as GIL
+    # protects the int/list updates (it does for CPython).
+
+    def record(self, duration_ms: float, success: bool, tokens: int):
+        self.call_count += 1
+        if success:
+            self.success_count += 1
+        else:
+            self.error_count += 1
+        self._durations.append(duration_ms)
+        self.total_tokens += tokens
+
+    @property
+    def avg_latency_ms(self):
+        if not self._durations:
+            return None
+        return sum(self._durations) / len(self._durations)
+
+    @property
+    def p95_latency_ms(self):
+        if not self._durations:
+            return None
+        sorted_d = sorted(self._durations)
+        return sorted_d[int(0.95 * (len(sorted_d) - 1))]
+
+
+# Process-wide metrics store: program_name -> ProgramMetrics
+_metrics_store: Dict[str, ProgramMetrics] = {}
+```
+
+Update `log_inference()` in the logging module to call `metrics_store[program].record(...)` after writing to the queue.
+
+---
+
+### Task 3.9 — Semaphore-Based Rate Limiting / Backpressure
+
+Add a per-program concurrency limit. When the semaphore is full, new requests wait briefly for a slot and then get a `429 Too Many Requests` with a `Retry-After` header rather than queuing indefinitely.
+
+**File:** `src/dspy_cli/server/routes.py`
+
+```python
+import asyncio
+from fastapi.responses import JSONResponse
+
+# Created once per program at route creation time
+program_semaphores: dict[str, asyncio.Semaphore] = {}
+
+def create_program_routes(app, module, lm, model_config, config, gateway=None):
+    max_concurrent = config.get("server", {}).get("max_concurrent_per_program", 20)
+    semaphore = asyncio.Semaphore(max_concurrent)
+    program_semaphores[module.name] = semaphore
+
+    async def run_program(request: request_model):
+        try:
+            # Bounded wait for a slot, then shed load.
+            await asyncio.wait_for(semaphore.acquire(), timeout=30.0)
+        except asyncio.TimeoutError:
+            return JSONResponse(
+                status_code=429,
+                content={"error": "Too many concurrent requests", "program": module.name},
+                headers={"Retry-After": "1"}
+            )
+        try:
+            return await execute_pipeline(...)
+        finally:
+            semaphore.release()
+```
+
+Expose `max_concurrent_per_program` in `dspy.config.yaml` and as a CLI flag.
+
+---
+
+### Task 3.10 — Multi-Worker Dockerfile
+
+Update the generated Dockerfile to use Gunicorn + Uvicorn workers for true multi-process parallelism.
+
+```dockerfile
+# Install gunicorn
+RUN pip install gunicorn
+
+# Replace single-process uvicorn with multi-worker gunicorn
+CMD gunicorn \
+    --worker-class uvicorn.workers.UvicornWorker \
+    --workers ${WORKERS:-4} \
+    --bind 0.0.0.0:8000 \
+    --timeout 120 \
+    --access-logfile - \
+    "dspy_cli.server.runner:create_gunicorn_app()"
+```
+
+This requires adding a `create_gunicorn_app()` factory function in `runner.py` that Gunicorn can import. The current `main()` entry point is not importable in the Gunicorn pattern.
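+
+A sketch of that factory (assumes it can reuse the env-var plumbing that `create_app_instance()` already provides):
+
+```python
+# src/dspy_cli/server/runner.py
+def create_gunicorn_app():
+    """Importable app factory for gunicorn's UvicornWorker.
+
+    Reads the same DSPY_CLI_* environment variables as the reload path,
+    so `docker run -e ...` configures every worker identically.
+    """
+    return create_app_instance()
+```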
+ """ + failed = [] + for name, lm in app.state.program_lms.items(): + if lm is None: + failed.append(name) + + if failed: + return JSONResponse( + status_code=503, + content={"status": "not ready", "failed_programs": failed} + ) + return {"status": "ready", "programs": len(app.state.program_lms)} +``` + +--- + +### Task 3.12 — Final Benchmark & Regression Gate + +Run the full benchmark suite and compare against baseline. + +```bash +LABEL="after-async-routing" bash tests/load/run_benchmark.sh +``` + +Add a CI gate script that reads the baseline CSV and the current CSV and fails if P95 latency has regressed or RPS has dropped: + +**File:** `tests/load/assert_benchmark.py` + +```python +""" +Compares two locust CSV result files and fails if performance has regressed. +Usage: python tests/load/assert_benchmark.py results/baseline_stats.csv results/current_stats.csv +""" +import sys +import csv + +def load_stats(path): + with open(path) as f: + reader = csv.DictReader(f) + for row in reader: + if row["Name"] == "Aggregated": + return { + "rps": float(row["Requests/s"]), + "p95": float(row["95%"]), + "failures": float(row["Failure Count"]) / max(float(row["Request Count"]), 1), + } + +baseline = load_stats(sys.argv[1]) +current = load_stats(sys.argv[2]) + +rps_change = (current["rps"] - baseline["rps"]) / baseline["rps"] +p95_change = (current["p95"] - baseline["p95"]) / baseline["p95"] +fail_change = current["failures"] - baseline["failures"] + +print(f"RPS: {baseline['rps']:.1f} → {current['rps']:.1f} ({rps_change:+.1%})") +print(f"P95 (ms): {baseline['p95']:.0f} → {current['p95']:.0f} ({p95_change:+.1%})") +print(f"Failures: {baseline['failures']:.1%} → {current['failures']:.1%}") + +errors = [] +if rps_change < -0.10: errors.append(f"RPS dropped {rps_change:.1%} (threshold: -10%)") +if p95_change > 0.20: errors.append(f"P95 increased {p95_change:.1%} (threshold: +20%)") +if current["failures"] > 0.01: errors.append(f"Failure rate {current['failures']:.1%} > 1%") + +if errors: + print("\nREGRESSION DETECTED:") + for e in errors: + print(f" ✗ {e}") + sys.exit(1) + +print("\nAll performance gates passed.") +``` + +--- + +## Summary: Task Order + +| # | Task | Phase | Risk | Impact | +|---|------|-------|------|--------| +| 1.1–1.6 | Testing harness + baseline | 1 | None | Unblocks everything | +| 2.1 | Disable global history | 2 | Very Low | Fixes race condition | +| 2.2 | Dockerfile `--no-reload` | 2 | Very Low | Fixes silent production footgun | +| 2.3 | Document `--no-reload` | 2 | None | Prevents user mistakes | +| 3.1 | Bounded executor infrastructure | 3 | Low | Foundation for 3.3 | +| 3.2 | `has_native_async` at discovery | 3 | Low | Foundation for 3.3 | +| 3.3 | Update `execute_pipeline` dispatch | 3 | Medium | Core async routing change | +| 3.4 | Update batch dispatch | 3 | Low | Consistency | +| 3.5 | Expose `sync_worker_threads` config | 3 | Low | Operability | +| 3.6 | `aforward` in `generate scaffold` | 3 | Low | New modules get async path free | +| 3.7 | JSONL write contention fix | 3 | Medium | Fixes concurrent write corruption | +| 3.8 | In-memory metrics accumulation | 3 | Medium | Eliminates O(n) metrics scan | +| 3.9 | Semaphore backpressure | 3 | Medium | Prevents cascade failure | +| 3.10 | Multi-worker Dockerfile | 3 | High | Depends on 3.8 decision re: shared metrics | +| 3.11 | Health check differentiation | 3 | Low | Required for k8s deployments | +| 3.12 | Final benchmark + CI gate | 3 | None | Locks in gains | diff --git a/src/dspy_cli/commands/serve.py 
+
+---
+
+## Summary: Task Order
+
+| # | Task | Phase | Risk | Impact |
+|---|------|-------|------|--------|
+| 1.1–1.6 | Testing harness + baseline | 1 | None | Unblocks everything |
+| 2.1 | Disable global history | 2 | Very Low | Fixes race condition |
+| 2.2 | Dockerfile `--no-reload` | 2 | Very Low | Fixes silent production footgun |
+| 2.3 | Document `--no-reload` | 2 | None | Prevents user mistakes |
+| 3.1 | Bounded executor infrastructure | 3 | Low | Foundation for 3.3 |
+| 3.2 | `has_native_async` at discovery | 3 | Low | Foundation for 3.3 |
+| 3.3 | Update `execute_pipeline` dispatch | 3 | Medium | Core async routing change |
+| 3.4 | Update batch dispatch | 3 | Low | Consistency |
+| 3.5 | Expose `sync_worker_threads` config | 3 | Low | Operability |
+| 3.6 | `aforward` in `generate scaffold` | 3 | Low | New modules get async path free |
+| 3.7 | JSONL write contention fix | 3 | Medium | Fixes concurrent write corruption |
+| 3.8 | In-memory metrics accumulation | 3 | Medium | Eliminates O(n) metrics scan |
+| 3.9 | Semaphore backpressure | 3 | Medium | Prevents cascade failure |
+| 3.10 | Multi-worker Dockerfile | 3 | High | Depends on 3.8 decision re: shared metrics |
+| 3.11 | Health check differentiation | 3 | Low | Required for k8s deployments |
+| 3.12 | Final benchmark + CI gate | 3 | None | Locks in gains |
diff --git a/src/dspy_cli/commands/serve.py b/src/dspy_cli/commands/serve.py
index 9c45798..a2e3aa2 100644
--- a/src/dspy_cli/commands/serve.py
+++ b/src/dspy_cli/commands/serve.py
@@ -68,7 +68,7 @@ def _exec_clean(target_python: Path, args: list[str]) -> NoReturn:
 @click.option(
     "--reload/--no-reload",
     default=True,
-    help="Enable auto-reload on file changes (default: enabled)",
+    help="Enable auto-reload on file changes (default: enabled). Use --no-reload for production.",
 )
 @click.option(
     "--save-openapi/--no-save-openapi",
@@ -102,7 +102,13 @@
     default=False,
     help="Enable API authentication via DSPY_API_KEY (default: disabled)",
 )
-def serve(port, host, logs_dir, reload, save_openapi, openapi_format, python, system, mcp, auth):
+@click.option(
+    "--sync-workers",
+    default=None,
+    type=click.IntRange(1, 200),
+    help="Number of threads for sync module execution (default: min(32, cpu+4), or server.sync_worker_threads in config)",
+)
+def serve(port, host, logs_dir, reload, save_openapi, openapi_format, python, system, mcp, auth, sync_workers):
     """Start an HTTP API server that exposes your DSPy programs.
 
     This command:
@@ -127,6 +133,7 @@ def serve(port, host, logs_dir, reload, save_openapi, openapi_format, python, sy
             openapi_format=openapi_format,
             mcp=mcp,
             auth=auth,
+            sync_workers=sync_workers,
         )
         return
@@ -192,6 +199,8 @@ def serve(port, host, logs_dir, reload, save_openapi, openapi_format, python, sy
             args.append("--mcp")
         if auth:
             args.append("--auth")
+        if sync_workers is not None:
+            args.extend(["--sync-workers", str(sync_workers)])
 
         _exec_clean(target_python, args)
     else:
@@ -205,4 +214,5 @@ def serve(port, host, logs_dir, reload, save_openapi, openapi_format, python, sy
             openapi_format=openapi_format,
             mcp=mcp,
             auth=auth,
+            sync_workers=sync_workers,
         )
diff --git a/src/dspy_cli/discovery/module_finder.py b/src/dspy_cli/discovery/module_finder.py
index 98a5a3e..73b6583 100644
--- a/src/dspy_cli/discovery/module_finder.py
+++ b/src/dspy_cli/discovery/module_finder.py
@@ -28,6 +28,7 @@ class DiscoveredModule:
     forward_input_fields: Optional[Dict[str, Any]] = None  # Input field types from forward() method
     forward_output_fields: Optional[Dict[str, Any]] = None  # Output field types from forward() method
     is_forward_typed: bool = False  # True if forward() has proper type annotations
+    has_native_async: bool = False  # True if the module has a user-implemented aforward()
     gateway_classes: List[Type["Gateway"]] = None  # Gateway classes if specified on module (supports list)
 
     # Backward compatibility: single gateway_class property
@@ -43,6 +44,27 @@ def instantiate(self, lm: dspy.LM | None = None) -> dspy.Module:
         return self.class_obj()
 
 
+def _has_user_implemented_aforward(cls: Type[dspy.Module]) -> bool:
+    """Check if a class defines its own aforward() (not inherited from dspy.Module).
+
+    This prevents false positives where a base class provides a default
+    aforward that just delegates to forward().
+
+    Args:
+        cls: The DSPy Module class to check.
+
+    Returns:
+        True if the class (not a parent) defines aforward.
+ """ + # Check if 'aforward' is defined directly on the class (not inherited) + if 'aforward' not in cls.__dict__: + return False + + # Extra guard: make sure it's actually callable + method = cls.__dict__['aforward'] + return callable(method) or isinstance(method, (staticmethod, classmethod)) + + def discover_modules( package_path: Path, package_name: str, @@ -135,6 +157,11 @@ def discover_modules( # Extract gateway classes if specified (supports single or list) gateway_classes = _extract_gateway_classes(obj) + # Detect user-implemented aforward + native_async = _has_user_implemented_aforward(obj) + if native_async: + logger.info(f"Module {name} has native async support (aforward)") + discovered.append( DiscoveredModule( name=name, @@ -144,6 +171,7 @@ def discover_modules( forward_input_fields=forward_info.get("inputs"), forward_output_fields=forward_info.get("outputs"), is_forward_typed=forward_info.get("is_typed", False), + has_native_async=native_async, gateway_classes=gateway_classes, ) ) diff --git a/src/dspy_cli/server/app.py b/src/dspy_cli/server/app.py index e017871..da349f6 100644 --- a/src/dspy_cli/server/app.py +++ b/src/dspy_cli/server/app.py @@ -6,6 +6,7 @@ from pathlib import Path from typing import Dict, List, Union from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse import dspy @@ -15,7 +16,8 @@ from dspy_cli.discovery import discover_modules from dspy_cli.discovery.gateway_finder import get_gateways_for_module, is_cron_gateway from dspy_cli.gateway import APIGateway, IdentityGateway -from dspy_cli.server.logging import setup_logging +from dspy_cli.server.executor import init_executor, shutdown_executor, DEFAULT_SYNC_WORKERS +from dspy_cli.server.logging import setup_logging, start_log_writer, stop_log_writer from dspy_cli.server.metrics import get_all_metrics, get_program_metrics_cached from dspy_cli.server.routes import create_program_routes from dspy_cli.server.scheduler import GatewayScheduler @@ -31,6 +33,7 @@ def create_app( logs_dir: Path, enable_ui: bool = True, enable_auth: bool = False, + sync_workers: int | None = None, ) -> FastAPI: """Create and configure the FastAPI application. 
@@ -41,6 +44,7 @@ def create_app( logs_dir: Directory for log files enable_ui: Whether to enable the web UI (always True, kept for compatibility) enable_auth: Whether to enable API authentication via DSPY_API_KEY + sync_workers: Number of threads for sync module execution (overrides config) Returns: Configured FastAPI application @@ -48,6 +52,14 @@ def create_app( # Setup logging setup_logging() + # Initialize bounded executor for sync module execution + # Priority: CLI flag > config file > default + worker_count = sync_workers or config.get("server", {}).get("sync_worker_threads") or DEFAULT_SYNC_WORKERS + init_executor(max_workers=worker_count) + + # Start background log writer + start_log_writer() + # Create FastAPI app app = FastAPI( title="DSPy API", @@ -184,6 +196,22 @@ def create_app( else: logger.warning(f"Unknown gateway type for {module.name}: {type(gateway)}") + # Health check endpoints + @app.get("/health/live") + async def liveness(): + """Liveness probe — returns 200 if the process is running.""" + return {"status": "alive"} + + @app.get("/health/ready") + async def readiness(): + """Readiness probe — returns 200 when all LM instances are initialized.""" + if not modules: + return JSONResponse(status_code=503, content={"status": "not_ready", "reason": "no modules discovered"}) + missing = [m.name for m in modules if m.name not in app.state.program_lms] + if missing: + return JSONResponse(status_code=503, content={"status": "not_ready", "reason": f"LMs not initialized: {missing}"}) + return {"status": "ready", "programs": len(modules)} + # Add programs list endpoint @app.get("/programs") async def list_programs(): @@ -319,19 +347,23 @@ async def lifespan(app: FastAPI): scheduler = getattr(app.state, "scheduler", None) if scheduler and scheduler.job_count > 0: scheduler.start() - + yield - + # Shutdown if scheduler and scheduler.job_count > 0: scheduler.shutdown() - + for shutdown_fn in getattr(app.state, "_gateway_shutdowns", []): try: shutdown_fn() except Exception as e: logger.warning(f"Gateway shutdown error: {e}") + # Shutdown executor and log writer + shutdown_executor() + stop_log_writer() + def _create_lm_instance(model_config: Dict) -> dspy.LM: """Create a DSPy LM instance from configuration. @@ -349,6 +381,7 @@ def _create_lm_instance(model_config: Dict) -> dspy.LM: max_tokens = model_config.get("max_tokens") api_key = model_config.get("api_key") api_base = model_config.get("api_base") + cache = model_config.get("cache") # Build kwargs kwargs = {} @@ -360,6 +393,8 @@ def _create_lm_instance(model_config: Dict) -> dspy.LM: kwargs["api_key"] = api_key if api_base is not None: kwargs["api_base"] = api_base + if cache is not None: + kwargs["cache"] = cache # Create and return LM instance return dspy.LM( @@ -379,7 +414,9 @@ def _configure_dspy_model(model_config: Dict): lm = _create_lm_instance(model_config) # Configure DSPy - dspy.settings.configure(lm=lm) + # Disable global history: it's an unprotected plain list that races under + # concurrent async/threaded requests. Inference logs capture everything we need. 
+    dspy.settings.configure(lm=lm, disable_history=True)
 
     model = model_config.get("model")
     model_type = model_config.get("model_type", "chat")
diff --git a/src/dspy_cli/server/execution.py b/src/dspy_cli/server/execution.py
index 8279392..a9f617f 100644
--- a/src/dspy_cli/server/execution.py
+++ b/src/dspy_cli/server/execution.py
@@ -1,5 +1,6 @@
 """Shared pipeline execution logic for HTTP, MCP, and gateways."""
 
+import asyncio
 import base64
 import logging
 import time
@@ -10,6 +11,7 @@
 import dspy
 
 from dspy_cli.discovery import DiscoveredModule
+from dspy_cli.server.executor import run_sync_in_executor
 from dspy_cli.server.logging import log_inference
 
 logger = logging.getLogger(__name__)
@@ -277,10 +279,10 @@ async def execute_pipeline(
         logger.info(f"Executing {program_name} with inputs: {inputs}")
 
         with dspy.context(lm=request_lm):
-            if hasattr(instance, 'aforward'):
+            if module.has_native_async:
                 result = await instance.acall(**inputs)
             else:
-                result = instance(**inputs)
+                result = await run_sync_in_executor(instance, **inputs)
 
         output = _normalize_output(result, module)
         duration_ms = (time.time() - start_time) * 1000
@@ -393,7 +395,11 @@ async def execute_pipeline_batch(
     if max_errors is not None:
         batch_kwargs["max_errors"] = max_errors
 
-    batch_result = instance.batch(examples, **batch_kwargs)
+    # batch() is always sync — run it in the bounded executor
+    # to avoid blocking the event loop
+    batch_result = await run_sync_in_executor(
+        instance.batch, examples, **batch_kwargs
+    )
 
     if isinstance(batch_result, tuple) and len(batch_result) == 3:
         successful, failed_examples, exceptions = batch_result
diff --git a/src/dspy_cli/server/executor.py b/src/dspy_cli/server/executor.py
new file mode 100644
index 0000000..664c5f2
--- /dev/null
+++ b/src/dspy_cli/server/executor.py
@@ -0,0 +1,78 @@
+"""Bounded thread pool executor for sync DSPy module execution."""
+
+import asyncio
+import contextvars
+import functools
+import logging
+import os
+from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Callable, Optional
+
+logger = logging.getLogger(__name__)
+
+_executor: Optional[ThreadPoolExecutor] = None
+
+# Match Python's default ThreadPoolExecutor size (same as asyncio.to_thread).
+# Users can override via --sync-workers or server.sync_worker_threads config.
+DEFAULT_SYNC_WORKERS = min(32, (os.cpu_count() or 1) + 4)
+
+
+def get_executor() -> Optional[ThreadPoolExecutor]:
+    """Return the current executor, or None if not initialized."""
+    return _executor
+
+
+def init_executor(max_workers: Optional[int] = None) -> ThreadPoolExecutor:
+    """Create and store a bounded ThreadPoolExecutor.
+
+    Args:
+        max_workers: Maximum number of worker threads.
+            Defaults to DEFAULT_SYNC_WORKERS.
+
+    Returns:
+        The initialized ThreadPoolExecutor.
+ """ + global _executor + if _executor is not None: + logger.warning("Executor already initialized, shutting down previous instance") + _executor.shutdown(wait=False) + + workers = max_workers or DEFAULT_SYNC_WORKERS + _executor = ThreadPoolExecutor(max_workers=workers, thread_name_prefix="dspy-sync") + logger.info(f"Initialized sync executor with {workers} worker threads") + return _executor + + +def shutdown_executor() -> None: + """Shut down the executor, waiting for pending work to complete.""" + global _executor + if _executor is not None: + logger.info("Shutting down sync executor") + _executor.shutdown(wait=True) + _executor = None + + +async def run_sync_in_executor(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: + """Run a sync callable in the bounded executor. + + Falls back to asyncio.to_thread() if no executor has been initialized + (e.g., during testing). + + Args: + fn: Synchronous callable to execute. + *args: Positional arguments for fn. + **kwargs: Keyword arguments for fn. + + Returns: + The return value of fn(*args, **kwargs). + """ + loop = asyncio.get_running_loop() + executor = _executor + + # Copy the current context so that contextvars (including dspy.context + # overrides like per-request LM) propagate into the worker thread. + # This mirrors what asyncio.to_thread() does internally. + ctx = contextvars.copy_context() + func_call = functools.partial(ctx.run, fn, *args, **kwargs) + + return await loop.run_in_executor(executor, func_call) diff --git a/src/dspy_cli/server/logging.py b/src/dspy_cli/server/logging.py index 0636430..a0f084f 100644 --- a/src/dspy_cli/server/logging.py +++ b/src/dspy_cli/server/logging.py @@ -2,12 +2,132 @@ import json import logging +import queue +import threading from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) +# Background writer state +_log_queue: queue.Queue = queue.Queue() +_writer_thread: Optional[threading.Thread] = None +_shutdown_event = threading.Event() + +_SENTINEL = None # Signals the writer thread to stop + + +def start_log_writer() -> None: + """Start the background log writer thread. + + Safe to call multiple times — restarts the writer if already stopped. + """ + global _writer_thread, _shutdown_event + + if _writer_thread is not None and _writer_thread.is_alive(): + return + + _shutdown_event.clear() + _writer_thread = threading.Thread( + target=_log_writer_loop, + name="log-writer", + daemon=True, + ) + _writer_thread.start() + logger.info("Background log writer started") + + +def stop_log_writer(timeout: float = 5.0) -> None: + """Stop the background log writer, flushing pending entries. + + Args: + timeout: Maximum seconds to wait for the writer to drain. + """ + global _writer_thread + + if _writer_thread is None or not _writer_thread.is_alive(): + return + + _shutdown_event.set() + _log_queue.put(_SENTINEL) + _writer_thread.join(timeout=timeout) + _writer_thread = None + logger.info("Background log writer stopped") + + +def _log_writer_loop() -> None: + """Drain the log queue and write entries to per-program files. + + Runs in a dedicated thread. Batches up to 50 entries per flush + for efficiency under high concurrency. 
+ """ + while True: + entries: List[tuple] = [] + + # Block for the first entry + try: + item = _log_queue.get(timeout=1.0) + except queue.Empty: + if _shutdown_event.is_set(): + break + continue + + if item is _SENTINEL: + # Drain any remaining entries before exiting + while not _log_queue.empty(): + try: + remaining = _log_queue.get_nowait() + if remaining is not _SENTINEL: + entries.append(remaining) + except queue.Empty: + break + _flush_entries(entries) + break + + entries.append(item) + + # Batch: grab up to 49 more without blocking + for _ in range(49): + try: + item = _log_queue.get_nowait() + if item is _SENTINEL: + _flush_entries(entries) + return + entries.append(item) + except queue.Empty: + break + + _flush_entries(entries) + + +def _flush_entries(entries: List[tuple]) -> None: + """Write a batch of log entries grouped by program to their files. + + Args: + entries: List of (logs_dir, program_name, log_entry_dict) tuples. + """ + if not entries: + return + + # Group by (logs_dir, program_name) to minimize file opens + grouped: Dict[tuple, List[str]] = {} + for logs_dir, program_name, log_entry in entries: + key = (str(logs_dir), program_name) + if key not in grouped: + grouped[key] = [] + grouped[key].append(json.dumps(log_entry)) + + for (logs_dir_str, program_name), lines in grouped.items(): + logs_dir = Path(logs_dir_str) + log_file = logs_dir / f"{program_name}.log" + try: + logs_dir.mkdir(exist_ok=True, parents=True) + with open(log_file, "a") as f: + f.write("\n".join(lines) + "\n") + except Exception as e: + logger.error(f"Failed to write inference log for {program_name}: {e}") + def log_inference( logs_dir: Path, @@ -23,8 +143,8 @@ def log_inference( ): """Log a DSPy inference trace to a per-program log file. - This creates a structured log entry suitable for use as training data, - capturing the full inference trace including inputs, outputs, and metadata. + This enqueues the entry for the background writer thread, so it + never blocks the calling thread or event loop. 
Args: logs_dir: Directory to write log files @@ -62,14 +182,7 @@ def log_inference( if lm_calls: log_entry["lm_calls"] = lm_calls - log_file = logs_dir / f"{program_name}.log" - - try: - logs_dir.mkdir(exist_ok=True, parents=True) - with open(log_file, "a") as f: - f.write(json.dumps(log_entry) + "\n") - except Exception as e: - logger.error(f"Failed to write inference log: {e}") + _log_queue.put((logs_dir, program_name, log_entry)) def setup_logging(log_level: str = "INFO"): diff --git a/src/dspy_cli/server/routes.py b/src/dspy_cli/server/routes.py index 4e0732a..3b5f689 100644 --- a/src/dspy_cli/server/routes.py +++ b/src/dspy_cli/server/routes.py @@ -1,10 +1,12 @@ """Dynamic route generation for DSPy programs.""" +import asyncio import logging from typing import Any, Dict import dspy from fastapi import FastAPI, HTTPException +from fastapi.responses import JSONResponse from pydantic import create_model from dspy_cli.discovery import DiscoveredModule @@ -12,6 +14,11 @@ from dspy_cli.gateway import APIGateway, IdentityGateway from dspy_cli.server.execution import _convert_dspy_types, execute_pipeline +DEFAULT_MAX_CONCURRENT = 20 + +# Per-program semaphores, keyed by program name +_program_semaphores: Dict[str, asyncio.Semaphore] = {} + logger = logging.getLogger(__name__) @@ -77,8 +84,22 @@ def create_program_routes( else: route_path = f"/{program_name}/{gateway.__class__.__name__}" + # Create or reuse per-program semaphore + max_concurrent = config.get("server", {}).get("max_concurrent_per_program", DEFAULT_MAX_CONCURRENT) + if program_name not in _program_semaphores: + _program_semaphores[program_name] = asyncio.Semaphore(max_concurrent) + sem = _program_semaphores[program_name] + async def run_program(request: request_model): """Execute the DSPy program with given inputs.""" + try: + await asyncio.wait_for(sem.acquire(), timeout=30.0) + except asyncio.TimeoutError: + return JSONResponse( + status_code=429, + content={"detail": f"Too many concurrent requests for '{program_name}'. Try again later."}, + ) + try: pipeline_inputs = gateway.to_pipeline_inputs(request) @@ -100,6 +121,8 @@ async def run_program(request: request_model): except Exception as e: raise HTTPException(status_code=500, detail=str(e)) + finally: + sem.release() # Initialize gateway lifecycle gateway.setup() diff --git a/src/dspy_cli/server/runner.py b/src/dspy_cli/server/runner.py index b1636c3..76cd85b 100644 --- a/src/dspy_cli/server/runner.py +++ b/src/dspy_cli/server/runner.py @@ -20,6 +20,7 @@ ENV_ENABLE_MCP = "DSPY_CLI_ENABLE_MCP" ENV_LOGS_DIR = "DSPY_CLI_LOGS_DIR" ENV_AUTH_ENABLED = "DSPY_CLI_AUTH_ENABLED" +ENV_SYNC_WORKERS = "DSPY_CLI_SYNC_WORKERS" def _maybe_mount_mcp(app, enable: bool, *, path: str = MCP_DEFAULT_PATH, notify=None) -> bool: @@ -86,6 +87,8 @@ def create_app_instance(): logs_dir = os.environ.get(ENV_LOGS_DIR, "./logs") enable_mcp = os.environ.get(ENV_ENABLE_MCP, "false").lower() == "true" enable_auth = os.environ.get(ENV_AUTH_ENABLED, "false").lower() == "true" + sync_workers_str = os.environ.get(ENV_SYNC_WORKERS) + sync_workers = int(sync_workers_str) if sync_workers_str else None # Validate project structure if not validate_project_structure(): @@ -118,6 +121,7 @@ def create_app_instance(): logs_dir=logs_path, enable_ui=True, enable_auth=enable_auth, + sync_workers=sync_workers, ) # Mount MCP if enabled @@ -135,6 +139,7 @@ def main( openapi_format: str = "json", mcp: bool = False, auth: bool = False, + sync_workers: int | None = None, ): """Main server execution logic. 
@@ -147,6 +152,7 @@ def main( openapi_format: Format for OpenAPI spec (json or yaml) mcp: Whether to enable MCP server at /mcp auth: Whether to enable API authentication + sync_workers: Number of threads for sync module execution """ click.echo("Starting DSPy API server...") click.echo() @@ -192,6 +198,7 @@ def main( logs_dir=logs_path, enable_ui=True, enable_auth=auth, + sync_workers=sync_workers, ) # Mount MCP if enabled @@ -275,6 +282,8 @@ def notify_cli(msg: str, level: str = "info"): os.environ[ENV_LOGS_DIR] = str(logs_path) os.environ[ENV_ENABLE_MCP] = str(mcp).lower() os.environ[ENV_AUTH_ENABLED] = str(auth).lower() + if sync_workers is not None: + os.environ[ENV_SYNC_WORKERS] = str(sync_workers) # Get project root and src directory for watching project_root = Path.cwd() @@ -318,6 +327,7 @@ def notify_cli(msg: str, level: str = "info"): parser.add_argument("--openapi-format", choices=["json", "yaml"], default="json") parser.add_argument("--mcp", action="store_true", help="Enable MCP server at /mcp") parser.add_argument("--auth", action="store_true", help="Enable API authentication") + parser.add_argument("--sync-workers", type=int, default=None, help="Number of sync worker threads") args = parser.parse_args() main( @@ -329,4 +339,5 @@ def notify_cli(msg: str, level: str = "info"): openapi_format=args.openapi_format, mcp=args.mcp, auth=args.auth, + sync_workers=args.sync_workers, ) diff --git a/src/dspy_cli/templates/README.md.template b/src/dspy_cli/templates/README.md.template index d502dd8..9f03dc7 100644 --- a/src/dspy_cli/templates/README.md.template +++ b/src/dspy_cli/templates/README.md.template @@ -50,6 +50,47 @@ curl -X POST http://localhost:8000/{program_name} \\ -d '{{"question": "your question here"}}' ``` +### Async Modules + +`dspy-cli serve` runs `forward()` in a thread pool so it doesn't block the +server. This handles concurrency well for most workloads. + +If the thread pool becomes a bottleneck, you can add `aforward()` to a module. +The server detects it at startup and calls it directly on the event loop, +bypassing the thread pool: + +```python +class MyModule(dspy.Module): + def __init__(self): + self.predict = dspy.Predict("question -> answer") + + def forward(self, question: str) -> dspy.Prediction: + return self.predict(question=question) + + async def aforward(self, question: str) -> dspy.Prediction: + return await self.predict.acall(question=question) +``` + +You _can_ delete `forward` and use `aforward` only — the server will work fine. +But be aware that DSPy's `__call__` dispatches to `forward`, so optimizers, +notebooks, scripts, and tests all depend on it. If you keep both, any logic in +`forward` must be replicated in `aforward` or the server will skip it. For most +projects the thread pool is fast enough and `forward` alone is the simpler +choice. + +## Production + +Always disable hot reload when deploying: +```bash +dspy-cli serve --no-reload --host 0.0.0.0 +``` + +Or use the included Dockerfile, which has `--no-reload` set by default: +```bash +docker build -t {project_name} . +docker run -p 8000:8000 --env-file .env {project_name} +``` + ## Testing Run tests: diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..d766f22 --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,54 @@ +""" +Starts mock LLM server and dspy-cli server as subprocess fixtures. +Tests in this directory require these fixtures. 
+""" +import subprocess +import time +import os +import sys + +import httpx +import pytest + + +MOCK_PORT = 9999 +SERVER_PORT = 8000 +FIXTURE_PROJECT = os.path.join(os.path.dirname(__file__), "..", "load", "fixture_project") + + +@pytest.fixture(scope="session", autouse=True) +def mock_lm_server(): + proc = subprocess.Popen( + [sys.executable, os.path.join(os.path.dirname(__file__), "..", "load", "mock_lm_server.py")], + env={**os.environ, "MOCK_DELAY_MS": "50", "MOCK_PORT": str(MOCK_PORT)}, + ) + # Wait for mock server to be ready + for _ in range(10): + try: + httpx.get(f"http://127.0.0.1:{MOCK_PORT}/health", timeout=1) + break + except Exception: + time.sleep(0.5) + yield proc + proc.terminate() + proc.wait() + + +@pytest.fixture(scope="session", autouse=True) +def dspy_cli_server(mock_lm_server): + proc = subprocess.Popen( + [sys.executable, "-m", "dspy_cli.server.runner", + "--port", str(SERVER_PORT), "--host", "127.0.0.1"], + cwd=FIXTURE_PROJECT, + ) + # Wait for server to be ready + for _ in range(30): + try: + resp = httpx.get(f"http://127.0.0.1:{SERVER_PORT}/programs", timeout=2) + if resp.status_code == 200: + break + except Exception: + time.sleep(1) + yield proc + proc.terminate() + proc.wait() diff --git a/tests/integration/test_concurrent_requests.py b/tests/integration/test_concurrent_requests.py new file mode 100644 index 0000000..5392152 --- /dev/null +++ b/tests/integration/test_concurrent_requests.py @@ -0,0 +1,76 @@ +""" +Concurrent correctness tests. Not a load test — verifies that responses +are correct under concurrency, not just that the server survives. + +Requires a running dspy-cli server + mock LLM. Uses the fixtures in conftest.py. +""" +import asyncio + +import httpx +import pytest + +BASE_URL = "http://127.0.0.1:8000" + + +async def make_request(client: httpx.AsyncClient, endpoint: str, question: str): + response = await client.post( + f"{BASE_URL}/{endpoint}", + json={"question": question}, + timeout=30.0 + ) + return response + + +@pytest.mark.asyncio +async def test_sync_module_concurrent_correctness(): + """20 concurrent requests to sync module should all succeed with valid responses.""" + async with httpx.AsyncClient() as client: + tasks = [ + make_request(client, "SimplePredict", f"Question {i}") + for i in range(20) + ] + responses = await asyncio.gather(*tasks) + + for i, r in enumerate(responses): + assert r.status_code == 200, f"Request {i} failed: {r.text}" + assert "answer" in r.json(), f"Request {i} missing 'answer': {r.json()}" + + +@pytest.mark.asyncio +async def test_async_module_concurrent_correctness(): + """20 concurrent requests to async module should all succeed.""" + async with httpx.AsyncClient() as client: + tasks = [ + make_request(client, "AsyncPredict", f"Question {i}") + for i in range(20) + ] + responses = await asyncio.gather(*tasks) + + for i, r in enumerate(responses): + assert r.status_code == 200, f"Request {i} failed: {r.text}" + assert "answer" in r.json(), f"Request {i} missing 'answer': {r.json()}" + + +@pytest.mark.asyncio +async def test_no_response_cross_contamination(): + """ + Verifies that concurrent requests don't bleed into each other's outputs. + Sends requests with distinct questions and checks that answers are independent. + This would catch ContextVar leakage or shared state bugs. 
+ """ + questions = [f"Unique question {i} xyzzy" for i in range(10)] + + async with httpx.AsyncClient() as client: + tasks = [ + make_request(client, "AsyncPredict", q) + for q in questions + ] + responses = await asyncio.gather(*tasks) + + for r in responses: + assert r.status_code == 200 + data = r.json() + assert "answer" in data + # Mock server returns the same canned response, but we're verifying + # there's no exception or empty response caused by state mixing. + assert data["answer"] != "" diff --git a/tests/load/assert_benchmark.py b/tests/load/assert_benchmark.py new file mode 100644 index 0000000..8b8bf53 --- /dev/null +++ b/tests/load/assert_benchmark.py @@ -0,0 +1,55 @@ +""" +Compares two locust CSV result files and fails if performance has regressed. +Usage: python tests/load/assert_benchmark.py results/baseline_stats.csv results/current_stats.csv +""" +import csv +import sys + + +def load_stats(path): + with open(path) as f: + reader = csv.DictReader(f) + for row in reader: + if row["Name"] == "Aggregated": + return { + "rps": float(row["Requests/s"]), + "p95": float(row["95%"]), + "failures": float(row["Failure Count"]) / max(float(row["Request Count"]), 1), + } + raise ValueError(f"No 'Aggregated' row found in {path}") + + +def main(): + if len(sys.argv) != 3: + print(f"Usage: {sys.argv[0]} ") + sys.exit(2) + + baseline = load_stats(sys.argv[1]) + current = load_stats(sys.argv[2]) + + rps_change = (current["rps"] - baseline["rps"]) / baseline["rps"] + p95_change = (current["p95"] - baseline["p95"]) / baseline["p95"] + + print(f"RPS: {baseline['rps']:.1f} -> {current['rps']:.1f} ({rps_change:+.1%})") + print(f"P95 (ms): {baseline['p95']:.0f} -> {current['p95']:.0f} ({p95_change:+.1%})") + print(f"Failures: {baseline['failures']:.1%} -> {current['failures']:.1%}") + + errors = [] + if rps_change < -0.10: + errors.append(f"RPS dropped {rps_change:.1%} (threshold: -10%)") + if p95_change > 0.20: + errors.append(f"P95 increased {p95_change:.1%} (threshold: +20%)") + if current["failures"] > 0.01: + errors.append(f"Failure rate {current['failures']:.1%} > 1%") + + if errors: + print("\nREGRESSION DETECTED:") + for e in errors: + print(f" x {e}") + sys.exit(1) + + print("\nAll performance gates passed.") + + +if __name__ == "__main__": + main() diff --git a/tests/load/fixture_project/dspy.config.yaml b/tests/load/fixture_project/dspy.config.yaml new file mode 100644 index 0000000..e8d1449 --- /dev/null +++ b/tests/load/fixture_project/dspy.config.yaml @@ -0,0 +1,24 @@ +app_id: load-test-app +models: + default: model-alpha + registry: + model-alpha: + model: openai/mock-alpha + api_base: http://127.0.0.1:9999/v1 + api_key: mock-key + model_type: chat + max_tokens: 100 + temperature: 1.0 + cache: false + model-beta: + model: openai/mock-beta + api_base: http://127.0.0.1:9999/v1 + api_key: mock-key + model_type: chat + max_tokens: 100 + temperature: 1.0 + cache: false + +program_models: + SimplePredict: model-alpha + AsyncPredict: model-beta diff --git a/tests/load/fixture_project/src/load_test_app/__init__.py b/tests/load/fixture_project/src/load_test_app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/load/fixture_project/src/load_test_app/modules/__init__.py b/tests/load/fixture_project/src/load_test_app/modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/load/fixture_project/src/load_test_app/modules/async_predict.py b/tests/load/fixture_project/src/load_test_app/modules/async_predict.py new file mode 100644 index 
0000000..82afbad --- /dev/null +++ b/tests/load/fixture_project/src/load_test_app/modules/async_predict.py @@ -0,0 +1,13 @@ +import dspy + + +class AsyncPredict(dspy.Module): + """Same as SimplePredict but with aforward. Used to test async path.""" + def __init__(self): + self.predict = dspy.Predict("question:str -> answer:str") + + def forward(self, question: str) -> dspy.Prediction: + return self.predict(question=question) + + async def aforward(self, question: str) -> dspy.Prediction: + return await self.predict.acall(question=question) diff --git a/tests/load/fixture_project/src/load_test_app/modules/simple_predict.py b/tests/load/fixture_project/src/load_test_app/modules/simple_predict.py new file mode 100644 index 0000000..7ed0d16 --- /dev/null +++ b/tests/load/fixture_project/src/load_test_app/modules/simple_predict.py @@ -0,0 +1,10 @@ +import dspy + + +class SimplePredict(dspy.Module): + """Single-predict module. Used to test sync fallback path.""" + def __init__(self): + self.predict = dspy.Predict("question:str -> answer:str") + + def forward(self, question: str) -> dspy.Prediction: + return self.predict(question=question) diff --git a/tests/load/locustfile.py b/tests/load/locustfile.py new file mode 100644 index 0000000..eba9076 --- /dev/null +++ b/tests/load/locustfile.py @@ -0,0 +1,78 @@ +""" +Locust load test for dspy-cli. + +Validates both throughput and correctness: each program must receive +responses routed through its configured model. The mock LLM server +echoes the model name back in the answer, so we can verify it. + +Single-scenario run: + locust -f tests/load/locustfile.py \ + --host http://localhost:8000 \ + --headless -u 100 -r 10 \ + --run-time 60s \ + --csv results/test + +Matrix run (preferred): + bash tests/load/run_matrix.sh +""" +import uuid +from locust import HttpUser, task, between, events + + +def unique_payload(): + """Generate a unique question per request to defeat any caching layer.""" + return {"question": f"What is the capital of France? [{uuid.uuid4().hex[:8]}]"} + + +class SyncModuleUser(HttpUser): + """Hits the sync-fallback module (no aforward). Expects model-alpha.""" + wait_time = between(0.01, 0.1) + weight = 1 + + @task + def call_simple_predict(self): + with self.client.post( + "/SimplePredict", + json=unique_payload(), + catch_response=True + ) as response: + if response.status_code != 200: + response.failure(f"Got {response.status_code}: {response.text[:200]}") + return + body = response.json() + answer = body.get("answer", "") + if "model=mock-alpha" not in answer: + response.failure( + f"SimplePredict model mismatch: expected mock-alpha, got: {answer[:100]}" + ) + + +class AsyncModuleUser(HttpUser): + """Hits the native async module (has aforward). 
Expects model-beta.""" + wait_time = between(0.01, 0.1) + weight = 1 + + @task + def call_async_predict(self): + with self.client.post( + "/AsyncPredict", + json=unique_payload(), + catch_response=True + ) as response: + if response.status_code != 200: + response.failure(f"Got {response.status_code}: {response.text[:200]}") + return + body = response.json() + answer = body.get("answer", "") + if "model=mock-beta" not in answer: + response.failure( + f"AsyncPredict model mismatch: expected mock-beta, got: {answer[:100]}" + ) + + +@events.quitting.add_listener +def on_quit(environment, **kwargs): + """Fail CI if error rate exceeds threshold.""" + if environment.runner.stats.total.fail_ratio > 0.01: + print(f"ERROR: Failure rate {environment.runner.stats.total.fail_ratio:.1%} > 1%") + environment.process_exit_code = 1 diff --git a/tests/load/mock_lm_server.py b/tests/load/mock_lm_server.py new file mode 100644 index 0000000..4f9282e --- /dev/null +++ b/tests/load/mock_lm_server.py @@ -0,0 +1,83 @@ +""" +Minimal OpenAI-compatible mock server for load testing. + +Echoes the requested model name back in the answer so load tests can +verify that per-program model routing is correct under concurrency. + +Environment variables: + MOCK_PORT - Port to listen on (default: 9999) + MOCK_DELAY_MS - Simulated LLM latency in ms (default: 50) + MOCK_ERROR_RATE - Fraction of requests that return 500 (0.0-1.0, default: 0.0) +""" +import asyncio +import os +import random +import time + +import uvicorn +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse + +app = FastAPI() + +MOCK_DELAY_MS = 50 + + +@app.post("/v1/chat/completions") +async def chat(request: Request): + """Accept any JSON body — no strict schema validation. + + LiteLLM sends varying extra fields (stream, n, tools, etc.) + depending on the call path. A strict Pydantic model rejects those. + + The response embeds the requested model name in the answer field + so callers can verify the correct model was routed. + """ + body = await request.json() + model = body.get("model", "unknown") + delay = float(os.environ.get("MOCK_DELAY_MS", MOCK_DELAY_MS)) / 1000 + await asyncio.sleep(delay) + + # Simulate LLM errors + error_rate = float(os.environ.get("MOCK_ERROR_RATE", "0.0")) + if error_rate > 0 and random.random() < error_rate: + return JSONResponse( + status_code=500, + content={ + "error": { + "message": "Mock LLM internal error", + "type": "server_error", + "code": "internal_error", + } + }, + ) + + return { + "id": "mock-completion", + "object": "chat.completion", + "created": int(time.time()), + "model": model, + "choices": [{ + "index": 0, + "message": { + "role": "assistant", + "content": f'[[ ## answer ## ]]\nmodel={model}\n\n[[ ## completed ## ]]' + }, + "finish_reason": "stop" + }], + "usage": { + "prompt_tokens": 20, + "completion_tokens": 10, + "total_tokens": 30 + } + } + + +@app.get("/health") +async def health(): + return {"status": "ok"} + + +if __name__ == "__main__": + port = int(os.environ.get("MOCK_PORT", 9999)) + uvicorn.run(app, host="127.0.0.1", port=port) diff --git a/tests/load/results/.gitkeep b/tests/load/results/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/load/results/baseline/baseline.html b/tests/load/results/baseline/baseline.html new file mode 100644 index 0000000..aee00a2 --- /dev/null +++ b/tests/load/results/baseline/baseline.html @@ -0,0 +1,124 @@ + + + + + + + + + + + Locust + + + + +
+ + + + + \ No newline at end of file diff --git a/tests/load/results/baseline/baseline_exceptions.csv b/tests/load/results/baseline/baseline_exceptions.csv new file mode 100644 index 0000000..5e0e870 --- /dev/null +++ b/tests/load/results/baseline/baseline_exceptions.csv @@ -0,0 +1 @@ +Count,Message,Traceback,Nodes diff --git a/tests/load/results/baseline/baseline_failures.csv b/tests/load/results/baseline/baseline_failures.csv new file mode 100644 index 0000000..f87ff75 --- /dev/null +++ b/tests/load/results/baseline/baseline_failures.csv @@ -0,0 +1 @@ +Method,Name,Error,Occurrences diff --git a/tests/load/results/baseline/baseline_stats.csv b/tests/load/results/baseline/baseline_stats.csv new file mode 100644 index 0000000..61cf3f0 --- /dev/null +++ b/tests/load/results/baseline/baseline_stats.csv @@ -0,0 +1,4 @@ +Type,Name,Request Count,Failure Count,Median Response Time,Average Response Time,Min Response Time,Max Response Time,Average Content Size,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100% +POST,/AsyncPredict,1774,0,1500.0,1464.5187883196566,504.5831250026822,1961.8753330141772,25.0,30.060674289707595,0.0,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000 +POST,/SimplePredict,1782,0,1500.0,1456.4669797631648,505.59662500745617,1914.0643750142772,25.0,30.196235391352275,0.0,1500,1500,1600,1600,1700,1800,1800,1900,1900,1900,1900 +,Aggregated,3556,0,1500.0,1460.4838268889293,504.5831250026822,1961.8753330141772,25.0,60.25690968105987,0.0,1500,1500,1600,1600,1700,1800,1800,1900,1900,2000,2000 diff --git a/tests/load/results/baseline/baseline_stats_history.csv b/tests/load/results/baseline/baseline_stats_history.csv new file mode 100644 index 0000000..595cf5a --- /dev/null +++ b/tests/load/results/baseline/baseline_stats_history.csv @@ -0,0 +1,57 @@ +Timestamp,User Count,Type,Name,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100%,Total Request Count,Total Failure Count,Total Median Response Time,Total Average Response Time,Total Min Response Time,Total Max Response Time,Total Average Content Size +1771779950,0,,Aggregated,0.000000,0.000000,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,0,0,0,0.0,0,0,0 +1771779951,10,,Aggregated,0.000000,0.000000,900,900,900,900,900,900,900,900,900,900,900,10,0,900.0,897.7971415937645,892.9550829925574,901.8138329847716,25.0 +1771779952,20,,Aggregated,0.000000,0.000000,540,540,900,900,900,900,900,900,900,900,900,30,0,540.0,647.297143030058,505.5432500084862,901.8138329847716,25.0 +1771779953,30,,Aggregated,0.000000,0.000000,520,540,540,540,900,900,900,900,900,900,900,80,0,520.0,569.7809229215636,504.72287498996593,901.8138329847716,25.0 +1771779954,40,,Aggregated,5.500000,0.000000,520,530,540,540,570,900,900,900,900,900,900,143,0,520.0,551.6170877248937,504.6017910062801,901.8138329847716,25.0 +1771779955,50,,Aggregated,16.666667,0.000000,530,540,570,610,730,780,900,900,900,900,900,206,0,530.0,571.1061693239138,504.5831250026822,901.8138329847716,25.0 +1771779956,60,,Aggregated,27.500000,0.000000,540,640,730,780,860,870,900,900,900,900,900,268,0,540.0,623.2233179690587,504.5831250026822,901.8138329847716,25.0 +1771779957,70,,Aggregated,35.200000,0.000000,620,820,860,880,920,940,950,950,970,970,970,335,0,620.0,680.5353191782852,504.5831250026822,967.1514159999788,25.0 +1771779958,80,,Aggregated,40.166667,0.000000,730,870,920,940,1000,1100,1100,1100,1200,1200,1200,397,0,730.0,737.7592675584768,504.5831250026822,1165.4350419994444,25.0 
+1771779959,90,,Aggregated,43.285714,0.000000,840,920,1000,1000,1200,1200,1300,1300,1400,1400,1400,460,0,840.0,803.0433590795956,504.5831250026822,1353.3677080122288,25.0 +1771779960,100,,Aggregated,45.750000,0.000000,860,1000,1100,1200,1300,1300,1400,1500,1500,1500,1500,523,0,860.0,867.1580765843877,504.5831250026822,1548.6039999814238,25.0 +1771779961,100,,Aggregated,46.777778,0.000000,910,1100,1200,1300,1400,1500,1600,1700,1700,1700,1700,590,0,910.0,941.7136149908124,504.5831250026822,1697.2984580206685,25.0 +1771779962,100,,Aggregated,48.700000,0.000000,940,1200,1300,1400,1500,1600,1700,1700,1900,1900,1900,646,0,940.0,999.4511639790976,504.5831250026822,1864.870916993823,25.0 +1771779963,100,,Aggregated,54.900000,0.000000,1000,1300,1400,1500,1600,1700,1700,1800,1900,1900,1900,705,0,1000.0,1048.5638348318814,504.5831250026822,1872.2402919956949,25.0 +1771779964,100,,Aggregated,59.400000,0.000000,1100,1400,1500,1500,1600,1700,1700,1800,1900,1900,1900,767,0,1100.0,1090.5636585417499,504.5831250026822,1872.2402919956949,25.0 +1771779965,100,,Aggregated,61.400000,0.000000,1200,1500,1500,1500,1600,1700,1700,1800,1900,1900,1900,821,0,1200.0,1120.3708278300835,504.5831250026822,1872.2402919956949,25.0 +1771779966,100,,Aggregated,60.700000,0.000000,1200,1500,1500,1500,1600,1700,1700,1800,1900,1900,1900,887,0,1200.0,1153.7092160152665,504.5831250026822,1872.2402919956949,25.0 +1771779967,100,,Aggregated,61.000000,0.000000,1300,1500,1500,1500,1600,1700,1700,1800,1900,1900,1900,947,0,1300.0,1178.0787344936589,504.5831250026822,1872.2402919956949,25.0 +1771779968,100,,Aggregated,60.900000,0.000000,1400,1500,1500,1500,1600,1700,1700,1800,1900,1900,1900,1009,0,1400.0,1200.274627414751,504.5831250026822,1872.2402919956949,25.0 +1771779969,100,,Aggregated,60.800000,0.000000,1500,1500,1500,1500,1600,1700,1700,1800,1900,1900,1900,1070,0,1400.0,1220.7163864648771,504.5831250026822,1872.2402919956949,25.0 +1771779970,100,,Aggregated,60.800000,0.000000,1500,1500,1500,1500,1700,1700,1700,1800,1900,1900,1900,1135,0,1500.0,1239.3945813659807,504.5831250026822,1872.2402919956949,25.0 +1771779971,100,,Aggregated,61.700000,0.000000,1500,1500,1500,1600,1700,1700,1800,1800,1900,2000,2000,1203,0,1500.0,1261.9729939521003,504.5831250026822,1954.3677080073394,25.0 +1771779972,100,,Aggregated,61.600000,0.000000,1500,1500,1600,1600,1700,1700,1800,1800,2000,2000,2000,1261,0,1500.0,1280.3413839963218,504.5831250026822,1961.8753330141772,25.0 +1771779973,100,,Aggregated,61.200000,0.000000,1500,1500,1600,1600,1700,1700,1800,1800,2000,2000,2000,1318,0,1500.0,1293.3326693925358,504.5831250026822,1961.8753330141772,25.0 +1771779974,100,,Aggregated,61.600000,0.000000,1500,1500,1600,1600,1700,1700,1800,1800,2000,2000,2000,1389,0,1500.0,1305.7114727698909,504.5831250026822,1961.8753330141772,25.0 +1771779975,100,,Aggregated,61.800000,0.000000,1500,1500,1600,1600,1700,1700,1800,1800,2000,2000,2000,1452,0,1500.0,1317.726899396883,504.5831250026822,1961.8753330141772,25.0 +1771779976,100,,Aggregated,62.800000,0.000000,1500,1500,1600,1600,1700,1700,1800,1900,2000,2000,2000,1510,0,1500.0,1326.7295880035022,504.5831250026822,1961.8753330141772,25.0 +1771779977,100,,Aggregated,61.700000,0.000000,1500,1500,1600,1600,1700,1700,1800,1900,2000,2000,2000,1558,0,1500.0,1334.0394992061865,504.5831250026822,1961.8753330141772,25.0 +1771779978,100,,Aggregated,60.600000,0.000000,1500,1500,1600,1600,1700,1700,1800,1900,2000,2000,2000,1615,0,1500.0,1342.3231836738341,504.5831250026822,1961.8753330141772,25.0 
+1771779979,100,,Aggregated,60.300000,0.000000,1500,1500,1600,1600,1700,1700,1800,1900,2000,2000,2000,1680,0,1500.0,1349.60965885839,504.5831250026822,1961.8753330141772,25.0 +1771779980,100,,Aggregated,61.100000,0.000000,1500,1500,1600,1600,1700,1800,1900,1900,2000,2000,2000,1748,0,1500.0,1357.9513788892025,504.5831250026822,1961.8753330141772,25.0 +1771779981,100,,Aggregated,61.200000,0.000000,1500,1500,1600,1600,1700,1800,1900,1900,2000,2000,2000,1806,0,1500.0,1365.0792076912057,504.5831250026822,1961.8753330141772,25.0 +1771779982,100,,Aggregated,60.500000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,1872,0,1500.0,1375.058709449989,504.5831250026822,1961.8753330141772,25.0 +1771779983,100,,Aggregated,61.500000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,1941,0,1500.0,1382.542747300114,504.5831250026822,1961.8753330141772,25.0 +1771779984,100,,Aggregated,62.500000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,1992,0,1500.0,1387.3855649731145,504.5831250026822,1961.8753330141772,25.0 +1771779985,100,,Aggregated,60.500000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,2056,0,1500.0,1392.5274837516133,504.5831250026822,1961.8753330141772,25.0 +1771779986,100,,Aggregated,60.600000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,2112,0,1500.0,1396.8452125635938,504.5831250026822,1961.8753330141772,25.0 +1771779987,100,,Aggregated,60.600000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,2181,0,1500.0,1404.8335369552951,504.5831250026822,1961.8753330141772,25.0 +1771779988,100,,Aggregated,62.200000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,2233,0,1500.0,1408.4474196603137,504.5831250026822,1961.8753330141772,25.0 +1771779989,100,,Aggregated,62.600000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,2302,0,1500.0,1412.901191598802,504.5831250026822,1961.8753330141772,25.0 +1771779990,100,,Aggregated,61.800000,0.000000,1500,1500,1600,1600,1700,1700,1800,1900,2000,2000,2000,2363,0,1500.0,1416.0940536316255,504.5831250026822,1961.8753330141772,25.0 +1771779991,100,,Aggregated,61.100000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,2419,0,1500.0,1419.2152034520461,504.5831250026822,1961.8753330141772,25.0 +1771779992,100,,Aggregated,60.500000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,2490,0,1500.0,1423.557135333945,504.5831250026822,1961.8753330141772,25.0 +1771779993,100,,Aggregated,60.400000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,2555,0,1500.0,1427.1213547051193,504.5831250026822,1961.8753330141772,25.0 +1771779994,100,,Aggregated,60.100000,0.000000,1500,1500,1600,1600,1700,1700,1800,1900,2000,2000,2000,2610,0,1500.0,1429.8955300055898,504.5831250026822,1961.8753330141772,25.0 +1771779995,100,,Aggregated,62.100000,0.000000,1500,1500,1600,1600,1700,1700,1800,1900,2000,2000,2000,2669,0,1500.0,1431.8687182864396,504.5831250026822,1961.8753330141772,25.0 +1771779996,100,,Aggregated,61.400000,0.000000,1500,1500,1600,1600,1700,1700,1800,1900,2000,2000,2000,2727,0,1500.0,1434.1564936528837,504.5831250026822,1961.8753330141772,25.0 +1771779997,100,,Aggregated,61.600000,0.000000,1500,1500,1600,1600,1700,1700,1800,1900,2000,2000,2000,2796,0,1500.0,1436.6255640008544,504.5831250026822,1961.8753330141772,25.0 +1771779998,100,,Aggregated,61.700000,0.000000,1500,1500,1600,1600,1700,1700,1800,1900,2000,2000,2000,2864,0,1500.0,1439.4161172846843,504.5831250026822,1961.8753330141772,25.0 
+1771779999,100,,Aggregated,62.100000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,2917,0,1500.0,1441.077310474922,504.5831250026822,1961.8753330141772,25.0 +1771780000,100,,Aggregated,61.500000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,2000,2000,2000,2979,0,1500.0,1443.0766145492926,504.5831250026822,1961.8753330141772,25.0 +1771780001,100,,Aggregated,62.700000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,1900,2000,2000,3045,0,1500.0,1445.2816717006253,504.5831250026822,1961.8753330141772,25.0 +1771780002,100,,Aggregated,63.500000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,1900,2000,2000,3110,0,1500.0,1446.941764709141,504.5831250026822,1961.8753330141772,25.0 +1771780003,100,,Aggregated,63.300000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,1900,2000,2000,3174,0,1500.0,1449.0915538347606,504.5831250026822,1961.8753330141772,25.0 +1771780004,100,,Aggregated,63.000000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,1900,2000,2000,3232,0,1500.0,1450.794850981613,504.5831250026822,1961.8753330141772,25.0 +1771780005,100,,Aggregated,62.900000,0.000000,1500,1500,1600,1600,1700,1800,1800,1900,1900,2000,2000,3301,0,1500.0,1453.0374849287298,504.5831250026822,1961.8753330141772,25.0 diff --git a/tests/load/results/broken_baseline_archive/baseline.html b/tests/load/results/broken_baseline_archive/baseline.html new file mode 100644 index 0000000..6c3a753 --- /dev/null +++ b/tests/load/results/broken_baseline_archive/baseline.html @@ -0,0 +1,124 @@ + + + + + + + + + + + Locust + + + + +
+ + + + + \ No newline at end of file diff --git a/tests/load/results/broken_baseline_archive/baseline_exceptions.csv b/tests/load/results/broken_baseline_archive/baseline_exceptions.csv new file mode 100644 index 0000000..5e0e870 --- /dev/null +++ b/tests/load/results/broken_baseline_archive/baseline_exceptions.csv @@ -0,0 +1 @@ +Count,Message,Traceback,Nodes diff --git a/tests/load/results/broken_baseline_archive/baseline_failures.csv b/tests/load/results/broken_baseline_archive/baseline_failures.csv new file mode 100644 index 0000000..f87ff75 --- /dev/null +++ b/tests/load/results/broken_baseline_archive/baseline_failures.csv @@ -0,0 +1 @@ +Method,Name,Error,Occurrences diff --git a/tests/load/results/broken_baseline_archive/baseline_stats.csv b/tests/load/results/broken_baseline_archive/baseline_stats.csv new file mode 100644 index 0000000..ff28efe --- /dev/null +++ b/tests/load/results/broken_baseline_archive/baseline_stats.csv @@ -0,0 +1,4 @@ +Type,Name,Request Count,Failure Count,Median Response Time,Average Response Time,Min Response Time,Max Response Time,Average Content Size,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100% +POST,/AsyncPredict,18,0,24000.0,27650.756740611945,13191.930792003404,53540.412749993266,25.0,0.32402148939554815,0.0,24000,30000,33000,35000,51000,54000,54000,54000,54000,54000,54000 +POST,/SimplePredict,107,0,5100.0,7328.775276467759,805.6632499938132,49931.275541996,25.0,1.9261277425179806,0.0,5100,6200,7200,7700,7700,23000,42000,46000,50000,50000,50000 +,Aggregated,125,0,5600.0,10255.14060730452,805.6632499938132,53540.412749993266,25.0,2.250149231913529,0.0,5600,7200,7700,13000,26000,39000,50000,51000,54000,54000,54000 diff --git a/tests/load/results/broken_baseline_archive/baseline_stats_history.csv b/tests/load/results/broken_baseline_archive/baseline_stats_history.csv new file mode 100644 index 0000000..3d89805 --- /dev/null +++ b/tests/load/results/broken_baseline_archive/baseline_stats_history.csv @@ -0,0 +1,57 @@ +Timestamp,User Count,Type,Name,Requests/s,Failures/s,50%,66%,75%,80%,90%,95%,98%,99%,99.9%,99.99%,100%,Total Request Count,Total Failure Count,Total Median Response Time,Total Average Response Time,Total Min Response Time,Total Max Response Time,Total Average Content Size +1771692981,0,,Aggregated,0.000000,0.000000,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,N/A,0,0,0,0.0,0,0,0 +1771692982,10,,Aggregated,0.000000,0.000000,810,810,810,810,810,810,810,810,810,810,810,1,0,805.6632499938132,805.6632499938132,805.6632499938132,805.6632499938132,25.0 +1771692983,20,,Aggregated,0.000000,0.000000,1300,1300,1300,1300,1300,1300,1300,1300,1300,1300,1300,2,0,810.0,1064.1067914984887,805.6632499938132,1322.5503330031643,25.0 +1771692984,30,,Aggregated,0.000000,0.000000,1500,1500,2400,2400,2400,2400,2400,2400,2400,2400,2400,4,0,1300.0,1489.394989246648,805.6632499938132,2360.5907909950474,25.0 +1771692985,40,,Aggregated,0.000000,0.000000,2000,2000,2400,2400,3400,3400,3400,3400,3400,3400,3400,6,0,1500.0,1887.3389926642024,805.6632499938132,3393.2706659979885,25.0 +1771692986,50,,Aggregated,1.333333,0.000000,2400,2500,2500,3400,4900,4900,4900,4900,4900,4900,4900,9,0,2400.0,2368.033569110493,805.6632499938132,4933.092083010706,25.0 +1771692987,60,,Aggregated,1.500000,0.000000,2500,2500,2500,2500,3400,4900,4900,4900,4900,4900,4900,11,0,2500.0,2392.910673908773,805.6632499938132,4933.092083010706,25.0 
+1771692988,70,,Aggregated,1.500000,0.000000,2500,2500,2500,2500,3400,4900,4900,4900,4900,4900,4900,11,0,2500.0,2392.910673908773,805.6632499938132,4933.092083010706,25.0 +1771692989,80,,Aggregated,1.200000,0.000000,2500,2500,2500,2600,3400,4900,4900,4900,4900,4900,4900,14,0,2500.0,2421.936240786246,805.6632499938132,4933.092083010706,25.0 +1771692990,90,,Aggregated,1.500000,0.000000,2500,2500,2600,2600,3400,4900,4900,4900,4900,4900,4900,16,0,2500.0,2437.714880000385,805.6632499938132,4933.092083010706,25.0 +1771692991,100,,Aggregated,1.500000,0.000000,2500,2500,2600,2600,3400,4900,4900,4900,4900,4900,4900,16,0,2500.0,2437.714880000385,805.6632499938132,4933.092083010706,25.0 +1771692992,100,,Aggregated,1.750000,0.000000,2500,2500,2500,2600,3400,4900,4900,4900,4900,4900,4900,19,0,2500.0,2448.9047542638696,805.6632499938132,4933.092083010706,25.0 +1771692993,100,,Aggregated,1.777778,0.000000,2500,2500,2600,3000,3400,4900,11000,11000,11000,11000,11000,22,0,2500.0,2873.953956318887,805.6632499938132,10636.83120800124,25.0 +1771692994,100,,Aggregated,1.777778,0.000000,2500,2500,2600,3000,3400,4900,11000,11000,11000,11000,11000,22,0,2500.0,2873.953956318887,805.6632499938132,10636.83120800124,25.0 +1771692995,100,,Aggregated,1.900000,0.000000,2500,3000,3100,4900,13000,13000,13000,13000,13000,13000,13000,29,0,2500.0,4314.252132070171,805.6632499938132,13194.241457997123,25.0 +1771692996,100,,Aggregated,2.000000,0.000000,2600,3000,3100,3400,13000,13000,13000,13000,13000,13000,13000,32,0,2600.0,4195.288412626269,805.6632499938132,13194.241457997123,25.0 +1771692997,100,,Aggregated,2.000000,0.000000,2600,3000,3100,3400,13000,13000,13000,13000,13000,13000,13000,32,0,2600.0,4195.288412626269,805.6632499938132,13194.241457997123,25.0 +1771692998,100,,Aggregated,2.600000,0.000000,3000,3100,3500,3600,13000,13000,16000,16000,16000,16000,16000,36,0,3000.0,4463.847347084487,805.6632499938132,15802.561499993317,25.0 +1771692999,100,,Aggregated,2.600000,0.000000,3000,3100,3500,3600,13000,13000,16000,16000,16000,16000,16000,36,0,3000.0,4463.847347084487,805.6632499938132,15802.561499993317,25.0 +1771693000,100,,Aggregated,2.100000,0.000000,3000,3500,4000,4900,13000,16000,19000,19000,19000,19000,19000,41,0,3000.0,5112.772894220742,805.6632499938132,18874.41945800674,25.0 +1771693001,100,,Aggregated,2.100000,0.000000,3000,3500,4000,4900,13000,16000,19000,19000,19000,19000,19000,41,0,3000.0,5112.772894220742,805.6632499938132,18874.41945800674,25.0 +1771693002,100,,Aggregated,2.100000,0.000000,3000,3500,4000,4900,13000,16000,19000,19000,19000,19000,19000,41,0,3000.0,5112.772894220742,805.6632499938132,18874.41945800674,25.0 +1771693003,100,,Aggregated,2.000000,0.000000,3100,4100,4600,11000,16000,19000,20000,20000,20000,20000,20000,47,0,3100.0,5722.24496712814,805.6632499938132,20464.040499995463,25.0 +1771693004,100,,Aggregated,2.000000,0.000000,3100,4100,4600,11000,16000,19000,20000,20000,20000,20000,20000,47,0,3100.0,5722.24496712814,805.6632499938132,20464.040499995463,25.0 +1771693005,100,,Aggregated,2.500000,0.000000,3500,4600,5100,13000,18000,20000,23000,23000,23000,23000,23000,53,0,3500.0,6329.993357661154,805.6632499938132,23048.565541001153,25.0 +1771693006,100,,Aggregated,2.500000,0.000000,3500,4600,5100,13000,18000,20000,23000,23000,23000,23000,23000,53,0,3500.0,6329.993357661154,805.6632499938132,23048.565541001153,25.0 +1771693007,100,,Aggregated,2.500000,0.000000,3500,4600,5100,13000,18000,20000,23000,23000,23000,23000,23000,53,0,3500.0,6329.993357661154,805.6632499938132,23048.565541001153,25.0 
+1771693008,100,,Aggregated,1.800000,0.000000,4000,5100,5100,13000,19000,23000,23000,26000,26000,26000,26000,59,0,4000.0,6553.381324136014,805.6632499938132,25625.104000006104,25.0 +1771693009,100,,Aggregated,1.800000,0.000000,4000,5100,5100,13000,19000,23000,23000,26000,26000,26000,26000,59,0,4000.0,6553.381324136014,805.6632499938132,25625.104000006104,25.0 +1771693010,100,,Aggregated,1.800000,0.000000,4000,5100,5100,13000,19000,23000,23000,26000,26000,26000,26000,59,0,4000.0,6553.381324136014,805.6632499938132,25625.104000006104,25.0 +1771693011,100,,Aggregated,2.700000,0.000000,4600,5100,5100,11000,18000,20000,23000,26000,26000,26000,26000,64,0,4100.0,6440.431621062771,805.6632499938132,25625.104000006104,25.0 +1771693012,100,,Aggregated,2.700000,0.000000,4600,5100,5100,11000,18000,20000,23000,26000,26000,26000,26000,64,0,4100.0,6440.431621062771,805.6632499938132,25625.104000006104,25.0 +1771693013,100,,Aggregated,2.300000,0.000000,4600,5100,5100,5200,18000,20000,23000,26000,26000,26000,26000,69,0,4600.0,6344.890783174365,805.6632499938132,25625.104000006104,25.0 +1771693014,100,,Aggregated,2.300000,0.000000,4600,5100,5100,5200,18000,20000,23000,26000,26000,26000,26000,69,0,4600.0,6344.890783174365,805.6632499938132,25625.104000006104,25.0 +1771693015,100,,Aggregated,2.300000,0.000000,4600,5100,5100,5200,18000,20000,23000,26000,26000,26000,26000,69,0,4600.0,6344.890783174365,805.6632499938132,25625.104000006104,25.0 +1771693016,100,,Aggregated,1.700000,0.000000,5100,5100,5100,5200,18000,23000,26000,33000,33000,33000,33000,75,0,5100.0,6622.444131626787,805.6632499938132,33370.41241599945,25.0 +1771693017,100,,Aggregated,1.700000,0.000000,5100,5100,5100,5200,18000,23000,26000,33000,33000,33000,33000,75,0,5100.0,6622.444131626787,805.6632499938132,33370.41241599945,25.0 +1771693018,100,,Aggregated,1.700000,0.000000,5100,5100,5100,5200,18000,23000,26000,33000,33000,33000,33000,75,0,5100.0,6622.444131626787,805.6632499938132,33370.41241599945,25.0 +1771693019,100,,Aggregated,2.200000,0.000000,5100,5100,5600,13000,20000,24000,33000,35000,35000,35000,35000,84,0,5100.0,7535.649337774105,805.6632499938132,35466.63120799349,25.0 +1771693020,100,,Aggregated,2.200000,0.000000,5100,5100,5600,13000,20000,24000,33000,35000,35000,35000,35000,84,0,5100.0,7535.649337774105,805.6632499938132,35466.63120799349,25.0 +1771693021,100,,Aggregated,2.200000,0.000000,5100,5100,5600,13000,20000,24000,33000,35000,35000,35000,35000,84,0,5100.0,7535.649337774105,805.6632499938132,35466.63120799349,25.0 +1771693022,100,,Aggregated,2.500000,0.000000,5100,5100,6100,13000,23000,24000,35000,39000,39000,39000,39000,90,0,5100.0,7802.457972189424,805.6632499938132,38562.959875009255,25.0 +1771693023,100,,Aggregated,2.500000,0.000000,5100,5100,6100,13000,23000,24000,35000,39000,39000,39000,39000,90,0,5100.0,7802.457972189424,805.6632499938132,38562.959875009255,25.0 +1771693024,100,,Aggregated,2.500000,0.000000,5100,5100,6100,13000,23000,24000,35000,39000,39000,39000,39000,90,0,5100.0,7802.457972189424,805.6632499938132,38562.959875009255,25.0 +1771693025,100,,Aggregated,2.600000,0.000000,5100,5600,6700,11000,23000,26000,39000,42000,42000,42000,42000,97,0,5100.0,8086.352363784021,805.6632499938132,42174.885790998815,25.0 +1771693026,100,,Aggregated,2.600000,0.000000,5100,5600,6700,11000,23000,26000,39000,42000,42000,42000,42000,97,0,5100.0,8086.352363784021,805.6632499938132,42174.885790998815,25.0 
+1771693027,100,,Aggregated,2.600000,0.000000,5100,5600,6700,11000,23000,26000,39000,42000,42000,42000,42000,97,0,5100.0,8086.352363784021,805.6632499938132,42174.885790998815,25.0 +1771693028,100,,Aggregated,2.600000,0.000000,5100,5600,6700,11000,23000,26000,39000,42000,42000,42000,42000,97,0,5100.0,8086.352363784021,805.6632499938132,42174.885790998815,25.0 +1771693029,100,,Aggregated,2.200000,0.000000,5100,6100,7200,13000,23000,33000,39000,42000,46000,46000,46000,105,0,5100.0,8647.202811067247,805.6632499938132,45795.427167002345,25.0 +1771693030,100,,Aggregated,2.200000,0.000000,5100,6100,7200,13000,23000,33000,39000,42000,46000,46000,46000,105,0,5100.0,8647.202811067247,805.6632499938132,45795.427167002345,25.0 +1771693031,100,,Aggregated,2.200000,0.000000,5100,6100,7200,13000,23000,33000,39000,42000,46000,46000,46000,105,0,5100.0,8647.202811067247,805.6632499938132,45795.427167002345,25.0 +1771693032,100,,Aggregated,2.200000,0.000000,5100,6100,7200,13000,23000,33000,39000,42000,46000,46000,46000,105,0,5100.0,8647.202811067247,805.6632499938132,45795.427167002345,25.0 +1771693033,100,,Aggregated,2.100000,0.000000,5100,6700,7700,13000,24000,39000,50000,50000,51000,51000,51000,116,0,5100.0,9853.008594060722,805.6632499938132,50939.16370799707,25.0 +1771693034,100,,Aggregated,2.100000,0.000000,5100,6700,7700,13000,24000,39000,50000,50000,51000,51000,51000,116,0,5100.0,9853.008594060722,805.6632499938132,50939.16370799707,25.0 +1771693035,100,,Aggregated,2.100000,0.000000,5100,6700,7700,13000,24000,39000,50000,50000,51000,51000,51000,116,0,5100.0,9853.008594060722,805.6632499938132,50939.16370799707,25.0 +1771693036,100,,Aggregated,2.100000,0.000000,5100,6700,7700,13000,24000,39000,50000,50000,51000,51000,51000,116,0,5100.0,9853.008594060722,805.6632499938132,50939.16370799707,25.0 diff --git a/tests/load/run_benchmark.sh b/tests/load/run_benchmark.sh new file mode 100755 index 0000000..553d0c3 --- /dev/null +++ b/tests/load/run_benchmark.sh @@ -0,0 +1,81 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Single-scenario benchmark runner. +# For multi-scenario matrix, use run_matrix.sh instead. + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" +RESULTS_DIR="$SCRIPT_DIR/results" +MOCK_PORT=${MOCK_PORT:-9999} +SERVER_PORT=${SERVER_PORT:-8000} +USERS=${USERS:-100} +SPAWN_RATE=${SPAWN_RATE:-10} +DURATION=${DURATION:-60s} +MOCK_DELAY_MS=${MOCK_DELAY_MS:-500} +LABEL=${LABEL:-"$(git -C "$REPO_ROOT" rev-parse --short HEAD)"} + +mkdir -p "$RESULTS_DIR" + +kill_port() { + # Kill ALL processes listening on a port, including children + lsof -ti:"$1" 2>/dev/null | xargs kill -9 2>/dev/null || true +} + +cleanup() { + echo "Cleaning up..." + kill_port $MOCK_PORT + kill_port $SERVER_PORT + sleep 1 +} +trap cleanup EXIT + +# 0. Ensure ports are free before starting +kill_port $MOCK_PORT +kill_port $SERVER_PORT +sleep 1 + +# 1. Start mock LLM server +echo "Starting mock LLM server on :$MOCK_PORT (delay=${MOCK_DELAY_MS}ms)..." +MOCK_DELAY_MS=$MOCK_DELAY_MS MOCK_PORT=$MOCK_PORT python "$SCRIPT_DIR/mock_lm_server.py" & +sleep 1 + +if ! curl -sf http://127.0.0.1:$MOCK_PORT/health > /dev/null; then + echo "ERROR: Mock LLM server failed to start" + exit 1 +fi +echo "Mock LLM server ready." + +# 2. Start dspy-cli server against fixture project +echo "Starting dspy-cli server on :$SERVER_PORT..." +pushd "$SCRIPT_DIR/fixture_project" > /dev/null +dspy-cli serve --port $SERVER_PORT --no-reload --no-save-openapi --system & +popd > /dev/null +sleep 3 + +# 3. 
Wait for server health +echo "Waiting for server..." +for i in {1..20}; do + if curl -sf http://127.0.0.1:$SERVER_PORT/programs > /dev/null; then + echo "Server ready." + break + fi + if [ $i -eq 20 ]; then + echo "ERROR: Server failed to start within 20s" + exit 1 + fi + sleep 1 +done + +# 4. Run load test +echo "Running load test (users=$USERS, delay=${MOCK_DELAY_MS}ms, duration=$DURATION)..." +locust -f "$SCRIPT_DIR/locustfile.py" \ + --host http://127.0.0.1:$SERVER_PORT \ + --headless \ + -u $USERS -r $SPAWN_RATE \ + --run-time $DURATION \ + --csv "$RESULTS_DIR/$LABEL" \ + --html "$RESULTS_DIR/$LABEL.html" + +echo "Results written to $RESULTS_DIR/$LABEL*.csv" +echo "Done." diff --git a/tests/load/run_matrix.sh b/tests/load/run_matrix.sh new file mode 100755 index 0000000..b7c4f21 --- /dev/null +++ b/tests/load/run_matrix.sh @@ -0,0 +1,156 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Matrix benchmark: runs multiple delay x user-count scenarios. +# +# Each scenario boots fresh servers to avoid cross-contamination. +# Results go to tests/load/results/