8 changes: 7 additions & 1 deletion .gitignore
@@ -60,4 +60,10 @@ mlruns
mlartifacts

# Documentation
/site/
/site/

# Load test results
tests/load/results/*.csv
tests/load/results/*.html
tests/load/fixture_project/openapi.json
tests/load/fixture_project/logs/*.log
54 changes: 54 additions & 0 deletions tests/integration/conftest.py
@@ -0,0 +1,54 @@
"""
Starts the mock LLM server and the dspy-cli server as session-scoped subprocess fixtures.
Tests in this directory require these fixtures.
"""
import subprocess
import time
import os
import sys

import httpx
import pytest


MOCK_PORT = 9999
SERVER_PORT = 8000
FIXTURE_PROJECT = os.path.join(os.path.dirname(__file__), "..", "load", "fixture_project")


@pytest.fixture(scope="session", autouse=True)
def mock_lm_server():
proc = subprocess.Popen(
[sys.executable, os.path.join(os.path.dirname(__file__), "..", "load", "mock_lm_server.py")],
env={**os.environ, "MOCK_DELAY_MS": "50", "MOCK_PORT": str(MOCK_PORT)},
)
# Wait for mock server to be ready
for _ in range(10):
try:
httpx.get(f"http://127.0.0.1:{MOCK_PORT}/health", timeout=1)
break
except Exception:
time.sleep(0.5)
yield proc
proc.terminate()
proc.wait()


@pytest.fixture(scope="session", autouse=True)
def dspy_cli_server(mock_lm_server):
proc = subprocess.Popen(
[sys.executable, "-m", "dspy_cli.server.runner",
"--port", str(SERVER_PORT), "--host", "127.0.0.1"],
cwd=FIXTURE_PROJECT,
)
# Wait for server to be ready
for _ in range(30):
try:
resp = httpx.get(f"http://127.0.0.1:{SERVER_PORT}/programs", timeout=2)
if resp.status_code == 200:
break
except Exception:
time.sleep(1)
yield proc
proc.terminate()
proc.wait()
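
Both readiness loops above fall through silently if the server never comes up, which later surfaces as opaque connection errors inside tests. A shared helper could fail fast instead; a minimal sketch (the wait_for_http name and the pytest.fail behavior are suggestions, not part of this PR):

def wait_for_http(url: str, attempts: int = 30, delay: float = 1.0) -> None:
    """Poll url until it responds; fail the session if it never does."""
    for _ in range(attempts):
        try:
            httpx.get(url, timeout=2)
            return
        except httpx.HTTPError:
            time.sleep(delay)
    pytest.fail(f"Server at {url} did not become ready after {attempts} attempts")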
76 changes: 76 additions & 0 deletions tests/integration/test_concurrent_requests.py
@@ -0,0 +1,76 @@
"""
Concurrent correctness tests. Not a load test — verifies that responses
are correct under concurrency, not just that the server survives.

Requires a running dspy-cli server + mock LLM. Uses the fixtures in conftest.py.
"""
import asyncio

import httpx
import pytest

BASE_URL = "http://127.0.0.1:8000"


async def make_request(client: httpx.AsyncClient, endpoint: str, question: str):
response = await client.post(
f"{BASE_URL}/{endpoint}",
json={"question": question},
timeout=30.0
)
return response


@pytest.mark.asyncio
async def test_sync_module_concurrent_correctness():
"""20 concurrent requests to sync module should all succeed with valid responses."""
async with httpx.AsyncClient() as client:
tasks = [
make_request(client, "SimplePredict", f"Question {i}")
for i in range(20)
]
responses = await asyncio.gather(*tasks)

for i, r in enumerate(responses):
assert r.status_code == 200, f"Request {i} failed: {r.text}"
assert "answer" in r.json(), f"Request {i} missing 'answer': {r.json()}"


@pytest.mark.asyncio
async def test_async_module_concurrent_correctness():
"""20 concurrent requests to async module should all succeed."""
async with httpx.AsyncClient() as client:
tasks = [
make_request(client, "AsyncPredict", f"Question {i}")
for i in range(20)
]
responses = await asyncio.gather(*tasks)

for i, r in enumerate(responses):
assert r.status_code == 200, f"Request {i} failed: {r.text}"
assert "answer" in r.json(), f"Request {i} missing 'answer': {r.json()}"


@pytest.mark.asyncio
async def test_no_response_cross_contamination():
"""
    Verifies that concurrent requests don't bleed into each other's outputs.
    Sends requests with distinct questions and checks that every response is
    well-formed. Against the canned mock this mainly guards against exceptions
    or empty responses caused by ContextVar leakage or shared-state bugs.
"""
questions = [f"Unique question {i} xyzzy" for i in range(10)]

async with httpx.AsyncClient() as client:
tasks = [
make_request(client, "AsyncPredict", q)
for q in questions
]
responses = await asyncio.gather(*tasks)

for r in responses:
assert r.status_code == 200
data = r.json()
assert "answer" in data
# Mock server returns the same canned response, but we're verifying
# there's no exception or empty response caused by state mixing.
assert data["answer"] != ""
55 changes: 55 additions & 0 deletions tests/load/assert_benchmark.py
@@ -0,0 +1,55 @@
"""
Compares the 'Aggregated' rows of two Locust CSV stats files and fails if performance has regressed.
Usage: python tests/load/assert_benchmark.py results/baseline_stats.csv results/current_stats.csv
"""
import csv
import sys


def load_stats(path):
with open(path) as f:
reader = csv.DictReader(f)
for row in reader:
if row["Name"] == "Aggregated":
return {
"rps": float(row["Requests/s"]),
"p95": float(row["95%"]),
"failures": float(row["Failure Count"]) / max(float(row["Request Count"]), 1),
}
raise ValueError(f"No 'Aggregated' row found in {path}")


def main():
if len(sys.argv) != 3:
print(f"Usage: {sys.argv[0]} <baseline_stats.csv> <current_stats.csv>")
sys.exit(2)

baseline = load_stats(sys.argv[1])
current = load_stats(sys.argv[2])

rps_change = (current["rps"] - baseline["rps"]) / baseline["rps"]
p95_change = (current["p95"] - baseline["p95"]) / baseline["p95"]
Review comment on lines +30 to +31:

P2: Handle zero baseline metrics before delta math

The benchmark assertion computes percentage deltas by dividing by baseline rps and p95 without guarding for zero. If the baseline run produced zero throughput or zero latency (for example after an unsuccessful run that still writes an Aggregated row), this raises ZeroDivisionError and aborts the comparison instead of surfacing a clear regression/failure result.


print(f"RPS: {baseline['rps']:.1f} -> {current['rps']:.1f} ({rps_change:+.1%})")
print(f"P95 (ms): {baseline['p95']:.0f} -> {current['p95']:.0f} ({p95_change:+.1%})")
print(f"Failures: {baseline['failures']:.1%} -> {current['failures']:.1%}")

errors = []
if rps_change < -0.10:
errors.append(f"RPS dropped {rps_change:.1%} (threshold: -10%)")
if p95_change > 0.20:
errors.append(f"P95 increased {p95_change:.1%} (threshold: +20%)")
if current["failures"] > 0.01:
errors.append(f"Failure rate {current['failures']:.1%} > 1%")

if errors:
print("\nREGRESSION DETECTED:")
for e in errors:
print(f" x {e}")
sys.exit(1)

print("\nAll performance gates passed.")


if __name__ == "__main__":
main()
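
One way to address the zero-baseline division flagged in the review comment above, sketched under the assumption that exiting with a clear message is the desired behavior (the pct_change helper is hypothetical, not part of this PR):

def pct_change(current_value: float, baseline_value: float) -> float:
    """Relative change, treating a zero baseline as an explicit failure."""
    if baseline_value == 0:
        sys.exit(f"Baseline metric is zero (current={current_value}); rerun the baseline.")
    return (current_value - baseline_value) / baseline_value

# Then in main():
# rps_change = pct_change(current["rps"], baseline["rps"])
# p95_change = pct_change(current["p95"], baseline["p95"])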
24 changes: 24 additions & 0 deletions tests/load/fixture_project/dspy.config.yaml
@@ -0,0 +1,24 @@
app_id: load-test-app
models:
default: model-alpha
registry:
model-alpha:
model: openai/mock-alpha
api_base: http://127.0.0.1:9999/v1
api_key: mock-key
model_type: chat
max_tokens: 100
temperature: 1.0
cache: false
model-beta:
model: openai/mock-beta
api_base: http://127.0.0.1:9999/v1
api_key: mock-key
model_type: chat
max_tokens: 100
temperature: 1.0
cache: false

program_models:
SimplePredict: model-alpha
AsyncPredict: model-beta
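
Given this registry, the locustfile below asserts that /SimplePredict answers mention mock-alpha and /AsyncPredict answers mention mock-beta. A quick manual spot check against a running fixture server (assuming the request shape used throughout these tests):

import httpx

resp = httpx.post("http://127.0.0.1:8000/SimplePredict",
                  json={"question": "ping"}, timeout=30)
print(resp.json())  # the answer should contain "model=mock-alpha"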
Empty file.
Empty file.
@@ -0,0 +1,13 @@
import dspy


class AsyncPredict(dspy.Module):
"""Same as SimplePredict but with aforward. Used to test async path."""
    def __init__(self):
        super().__init__()
        self.predict = dspy.Predict("question:str -> answer:str")

def forward(self, question: str) -> dspy.Prediction:
return self.predict(question=question)

async def aforward(self, question: str) -> dspy.Prediction:
return await self.predict.acall(question=question)
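
A minimal sketch of exercising both paths outside the server, assuming a DSPy version where Module.acall is available (the PR's own use of predict.acall implies one) and an LM pointed at the mock:

import asyncio
import dspy

dspy.configure(lm=dspy.LM("openai/mock-beta", api_base="http://127.0.0.1:9999/v1",
                          api_key="mock-key", model_type="chat", cache=False))

module = AsyncPredict()
print(module(question="sync path"))                      # dispatches to forward()
print(asyncio.run(module.acall(question="async path")))  # dispatches to aforward()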
@@ -0,0 +1,10 @@
import dspy


class SimplePredict(dspy.Module):
"""Single-predict module. Used to test sync fallback path."""
    def __init__(self):
        super().__init__()
        self.predict = dspy.Predict("question:str -> answer:str")

def forward(self, question: str) -> dspy.Prediction:
return self.predict(question=question)
78 changes: 78 additions & 0 deletions tests/load/locustfile.py
@@ -0,0 +1,78 @@
"""
Locust load test for dspy-cli.

Validates both throughput and correctness: each program must receive
responses routed through its configured model. The mock LLM server
echoes the model name back in the answer, so we can verify it.

Single-scenario run:
locust -f tests/load/locustfile.py \
--host http://localhost:8000 \
--headless -u 100 -r 10 \
--run-time 60s \
--csv results/test

Matrix run (preferred):
bash tests/load/run_matrix.sh
"""
import uuid
from locust import HttpUser, task, between, events


def unique_payload():
"""Generate a unique question per request to defeat any caching layer."""
return {"question": f"What is the capital of France? [{uuid.uuid4().hex[:8]}]"}


class SyncModuleUser(HttpUser):
"""Hits the sync-fallback module (no aforward). Expects model-alpha."""
wait_time = between(0.01, 0.1)
weight = 1

@task
def call_simple_predict(self):
with self.client.post(
"/SimplePredict",
json=unique_payload(),
catch_response=True
) as response:
if response.status_code != 200:
response.failure(f"Got {response.status_code}: {response.text[:200]}")
return
body = response.json()
answer = body.get("answer", "")
if "model=mock-alpha" not in answer:
response.failure(
f"SimplePredict model mismatch: expected mock-alpha, got: {answer[:100]}"
)


class AsyncModuleUser(HttpUser):
"""Hits the native async module (has aforward). Expects model-beta."""
wait_time = between(0.01, 0.1)
weight = 1

@task
def call_async_predict(self):
with self.client.post(
"/AsyncPredict",
json=unique_payload(),
catch_response=True
) as response:
if response.status_code != 200:
response.failure(f"Got {response.status_code}: {response.text[:200]}")
return
body = response.json()
answer = body.get("answer", "")
if "model=mock-beta" not in answer:
response.failure(
f"AsyncPredict model mismatch: expected mock-beta, got: {answer[:100]}"
)


@events.quitting.add_listener
def on_quit(environment, **kwargs):
"""Fail CI if error rate exceeds threshold."""
if environment.runner.stats.total.fail_ratio > 0.01:
print(f"ERROR: Failure rate {environment.runner.stats.total.fail_ratio:.1%} > 1%")
environment.process_exit_code = 1
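
The mock LLM server itself (tests/load/mock_lm_server.py) is not part of this diff. For context, a minimal OpenAI-compatible stand-in that echoes the requested model back might look like the sketch below; everything except the model=<name> echo and the MOCK_DELAY_MS/MOCK_PORT env vars (both referenced elsewhere in this PR) is an assumption:

# Hypothetical sketch, not the PR's actual mock server.
import asyncio
import os
import time

from fastapi import FastAPI  # assumes a FastAPI-based mock

app = FastAPI()
DELAY_MS = int(os.environ.get("MOCK_DELAY_MS", "0"))

@app.get("/health")
async def health():
    return {"status": "ok"}

@app.post("/v1/chat/completions")
async def chat(body: dict):
    await asyncio.sleep(DELAY_MS / 1000)  # simulate LLM latency
    model = body.get("model", "unknown")
    # Field markers shaped for DSPy's chat adapter (assumed), with the
    # model name echoed so the locust tasks can verify routing.
    content = f"[[ ## answer ## ]]\nParis. model={model}\n\n[[ ## completed ## ]]"
    return {
        "id": "mock", "object": "chat.completion", "created": int(time.time()),
        "model": model,
        "choices": [{"index": 0, "finish_reason": "stop",
                     "message": {"role": "assistant", "content": content}}],
        "usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2},
    }

# The real script presumably serves itself on MOCK_PORT when run directly,
# e.g. via uvicorn.run(app, port=int(os.environ["MOCK_PORT"])).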