Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/dspy_cli/server/routes.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
"""Dynamic route generation for DSPy programs."""

import asyncio
import logging
from typing import Any, Dict

import dspy
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import create_model

from dspy_cli.discovery import DiscoveredModule
from dspy_cli.discovery.gateway_finder import get_gateway_for_module
from dspy_cli.gateway import APIGateway, IdentityGateway
from dspy_cli.server.execution import _convert_dspy_types, execute_pipeline

DEFAULT_MAX_CONCURRENT = 20
_program_semaphores: Dict[str, asyncio.Semaphore] = {}

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -77,8 +82,21 @@ def create_program_routes(
else:
route_path = f"/{program_name}/{gateway.__class__.__name__}"

max_concurrent = config.get("server", {}).get("max_concurrent_per_program", DEFAULT_MAX_CONCURRENT)
if program_name not in _program_semaphores:
_program_semaphores[program_name] = asyncio.Semaphore(max_concurrent)
sem = _program_semaphores[program_name]
Comment on lines +86 to +88
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep concurrency semaphores per app instance

Using the module-level _program_semaphores cache here makes semaphore state global across all create_app() calls in the same Python process, so apps with the same program_name end up sharing a single limiter and the first app’s max_concurrent_per_program setting is reused by later apps. This can produce unexpected 429s (cross-app backpressure) and ignores updated concurrency config in test/integration setups or embedded multi-app deployments; storing semaphores on app.state (or rebuilding them per app) avoids this bleed-through.

Useful? React with 👍 / 👎.


async def run_program(request: request_model):
"""Execute the DSPy program with given inputs."""
try:
await asyncio.wait_for(sem.acquire(), timeout=30.0)
except asyncio.TimeoutError:
return JSONResponse(
status_code=429,
content={"detail": f"Too many concurrent requests for '{program_name}'. Try again later."},
)

try:
pipeline_inputs = gateway.to_pipeline_inputs(request)

Expand All @@ -100,6 +118,8 @@ async def run_program(request: request_model):

except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
finally:
sem.release()

# Initialize gateway lifecycle
gateway.setup()
Expand Down