diff --git a/src/dspy_cli/server/app.py b/src/dspy_cli/server/app.py index 471a562..6496d22 100644 --- a/src/dspy_cli/server/app.py +++ b/src/dspy_cli/server/app.py @@ -1,17 +1,24 @@ -"""FastAPI application factory.""" +"""FastAPI application factory with deferred initialization. + +The app shell (health endpoints, CORS, auth) is created immediately so the +server port binds fast. Heavy work (module discovery, LM creation, route +registration) is deferred to the ASGI lifespan startup, which runs *after* +the socket is listening. This eliminates false "not listening" warnings from +container orchestrators like Fly.io and Kubernetes. +""" import logging import os from contextlib import asynccontextmanager +from dataclasses import dataclass from pathlib import Path -from typing import Dict, List, Union +from typing import Dict, List, Optional, Union + +from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse - import dspy -from fastapi import FastAPI - from dspy_cli.config import get_model_config, get_program_model from dspy_cli.discovery import discover_modules from dspy_cli.discovery.gateway_finder import get_gateways_for_module, is_cron_gateway @@ -26,6 +33,17 @@ logger = logging.getLogger(__name__) +@dataclass +class _AppInitParams: + """Parameters stored on app.state for deferred initialization.""" + config: Dict + package_path: Path + package_name: str + logs_dir: Path + enable_auth: bool + sync_workers: Optional[int] + + def create_app( config: Dict, package_path: Path, @@ -37,6 +55,10 @@ def create_app( ) -> FastAPI: """Create and configure the FastAPI application. + Returns a lightweight app shell with health endpoints. Heavy initialization + (module discovery, LM creation, route registration) is deferred to the + ASGI lifespan so the server port binds quickly. + Args: config: Loaded configuration dictionary package_path: Path to the modules package @@ -49,30 +71,80 @@ def create_app( Returns: Configured FastAPI application """ - # Setup logging setup_logging() - # Initialize bounded executor for sync module execution - worker_count = sync_workers or config.get("server", {}).get("sync_worker_threads") or DEFAULT_SYNC_WORKERS - init_executor(max_workers=worker_count) + # Configure default DSPy model synchronously (before async lifespan). + # dspy.settings.configure() must be called outside async tasks due to DSPy's + # ownership guard, and it's lightweight (no I/O). + default_model_alias = config["models"]["default"] + default_model_config = get_model_config(config, default_model_alias) + _configure_dspy_model(default_model_config) + logger.info(f"Configured default model: {default_model_alias}") + + init_params = _AppInitParams( + config=config, + package_path=package_path, + package_name=package_name, + logs_dir=logs_dir, + enable_auth=enable_auth, + sync_workers=sync_workers, + ) - # Create FastAPI app app = FastAPI( title="DSPy API", description="Automatically generated API for DSPy programs", version="0.1.0", - lifespan=lifespan, + lifespan=_make_lifespan(init_params), ) - # Configure CORS if enabled (env var takes precedence over config file) + # Pre-init state + app.state._ready = False + app.state.modules = [] + app.state.program_lms = {} + app.state.logs_dir = logs_dir + app.state.metrics_cache = {} + + # CORS (lightweight, no deferred init needed) + _setup_cors(app, config) + + # Health endpoints (available as soon as the port binds) + @app.get("/health/live") + async def liveness(): + """Liveness probe -- returns 200 if the process is running.""" + return {"status": "alive"} + + @app.get("/health/ready") + async def readiness(): + """Readiness probe -- returns 200 when all LM instances are initialized.""" + if not app.state._ready: + return JSONResponse(status_code=503, content={"status": "starting"}) + modules = app.state.modules + if not modules: + return JSONResponse(status_code=503, content={"status": "not_ready", "reason": "no modules discovered"}) + missing = [m.name for m in modules if m.name not in app.state.program_lms] + if missing: + return JSONResponse(status_code=503, content={"status": "not_ready", "reason": f"LMs not initialized: {missing}"}) + return {"status": "ready", "programs": len(modules)} + + # Auth middleware (early setup with mutable open_paths so lifespan can add gateway paths) + if enable_auth: + _setup_auth_middleware(app) + + return app + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _setup_cors(app: FastAPI, config: Dict): + """Configure CORS middleware (lightweight, runs immediately).""" cors_origins: Union[str, List[str], None] = os.environ.get("DSPY_CORS_ORIGINS") if cors_origins is None: cors_origins = config.get("server", {}).get("cors_origins") if cors_origins: - if cors_origins == "*" or cors_origins == ["*"]: - # Wildcard mode - no credentials allowed app.add_middleware( CORSMiddleware, allow_origins=["*"], @@ -82,7 +154,6 @@ def create_app( ) logger.info("CORS enabled for all origins (wildcard mode)") else: - # Specific origins - allow credentials origins = ( cors_origins if isinstance(cors_origins, list) @@ -97,13 +168,102 @@ def create_app( ) logger.info(f"CORS enabled for origins: {origins}") - # Store logs directory and metrics cache in app state - app.state.logs_dir = logs_dir - app.state.metrics_cache = {} + +def _setup_auth_middleware(app: FastAPI): + """Set up auth middleware with default open paths. + + The open_paths set is mutable so the lifespan can add gateway public + paths after module discovery. + """ + from dspy_cli.server.auth import ( + DEFAULT_OPEN_PATHS, + AuthMiddleware, + create_auth_routes, + generate_token, + get_api_token, + ) + + token = get_api_token() + if not token: + token = generate_token() + os.environ["DSPY_API_KEY"] = token + logger.warning("=" * 60) + logger.warning("DSPY_API_KEY not set. Generated temporary token:") + logger.warning(f" {token}") + logger.warning("Set DSPY_API_KEY as an environment secret for a persistent token.") + logger.warning("=" * 60) + + auth_router = create_auth_routes(token) + app.include_router(auth_router) + + # Mutable set -- _deferred_init will add gateway public paths later + open_paths = set(DEFAULT_OPEN_PATHS) + app.state._auth_open_paths = open_paths + + app.add_middleware(AuthMiddleware, token=token, open_paths=open_paths) + logger.info("Authentication enabled") + + +# --------------------------------------------------------------------------- +# Lifespan & deferred initialization +# --------------------------------------------------------------------------- + +def _make_lifespan(params: _AppInitParams): + """Create the ASGI lifespan context manager. + + Heavy initialization runs here, *after* the server socket is bound. + """ + @asynccontextmanager + async def lifespan(app: FastAPI): + # --- Startup (port is already listening at this point) --- + _deferred_init(app, params) + + # Run any callbacks registered by runner.py (MCP, OpenAPI save, etc.) + for cb in getattr(app.state, "_on_ready_callbacks", []): + try: + cb() + except Exception as e: + logger.warning(f"on_ready callback error: {e}") + + scheduler = getattr(app.state, "scheduler", None) + if scheduler and scheduler.job_count > 0: + scheduler.start() + + yield + + # --- Shutdown --- + if scheduler and scheduler.job_count > 0: + scheduler.shutdown() + + for shutdown_fn in getattr(app.state, "_gateway_shutdowns", []): + try: + shutdown_fn() + except Exception as e: + logger.warning(f"Gateway shutdown error: {e}") + + shutdown_executor() + + return lifespan + + +def _deferred_init(app: FastAPI, params: _AppInitParams): + """Heavy initialization: module discovery, LM creation, route registration. + + Called from the lifespan startup, after the server port is already bound. + """ + config = params.config + + # Initialize bounded executor for sync module execution + worker_count = ( + params.sync_workers + or config.get("server", {}).get("sync_worker_threads") + or DEFAULT_SYNC_WORKERS + ) + init_executor(max_workers=worker_count) # Discover modules - logger.info(f"Discovering modules in {package_path}") - modules = discover_modules(package_path, package_name) + logger.info(f"Discovering modules in {params.package_path}") + modules = discover_modules(params.package_path, params.package_name) if not modules: logger.warning("No DSPy modules discovered!") @@ -115,49 +275,32 @@ def create_app( duplicate_set = set(duplicates) error_msg = f"Error: Duplicate module names found: {', '.join(sorted(duplicate_set))}" logger.error(error_msg) - logger.error("Each module must have a unique class name.") raise ValueError(error_msg) - # Configure default model - default_model_alias = config["models"]["default"] - default_model_config = get_model_config(config, default_model_alias) - _configure_dspy_model(default_model_config) - - logger.info(f"Configured default model: {default_model_alias}") - - # Create LM instances for each program and store them - app.state.program_lms = {} + # Create LM instances for each program (dspy.settings already configured in create_app) for module in modules: - # Get model for this program (could be overridden) model_alias = get_program_model(config, module.name) model_config = get_model_config(config, model_alias) - - # Create LM instance for this program lm = _create_lm_instance(model_config) app.state.program_lms[module.name] = lm - logger.info(f"Created LM for program: {module.name} (model: {model_alias})") # Initialize scheduler for cron gateways - scheduler = GatewayScheduler(logs_dir) + scheduler = GatewayScheduler(params.logs_dir) app.state.scheduler = scheduler # Track registered API paths to detect conflicts - registered_paths: Dict[str, str] = {} # path -> "module.gateway" for error messages + registered_paths: Dict[str, str] = {} # Create routes for each discovered module for module in modules: - # Get the LM instance for this program lm = app.state.program_lms[module.name] model_alias = get_program_model(config, module.name) model_config = get_model_config(config, model_alias) - - # Get all gateways for this module and route by type gateways = get_gateways_for_module(module) for gateway in gateways: if is_cron_gateway(gateway): - # Register with scheduler instead of creating HTTP route scheduler.register_cron_gateway( module=module, gateway=gateway, @@ -166,7 +309,6 @@ def create_app( ) logger.info(f"Registered cron gateway: {module.name} ({gateway.__class__.__name__}, schedule: {gateway.schedule})") elif isinstance(gateway, APIGateway): - # Calculate the route path (same logic as routes.py) if gateway.path: route_path = gateway.path elif isinstance(gateway, IdentityGateway): @@ -174,7 +316,6 @@ def create_app( else: route_path = f"/{module.name}/{gateway.__class__.__name__}" - # Check for path conflicts gateway_id = f"{module.name}.{gateway.__class__.__name__}" if route_path in registered_paths: existing = registered_paths[route_path] @@ -192,22 +333,6 @@ def create_app( else: logger.warning(f"Unknown gateway type for {module.name}: {type(gateway)}") - # Health check endpoints - @app.get("/health/live") - async def liveness(): - """Liveness probe -- returns 200 if the process is running.""" - return {"status": "alive"} - - @app.get("/health/ready") - async def readiness(): - """Readiness probe -- returns 200 when all LM instances are initialized.""" - if not modules: - return JSONResponse(status_code=503, content={"status": "not_ready", "reason": "no modules discovered"}) - missing = [m.name for m in modules if m.name not in app.state.program_lms] - if missing: - return JSONResponse(status_code=503, content={"status": "not_ready", "reason": f"LMs not initialized: {missing}"}) - return {"status": "ready", "programs": len(modules)} - # Add programs list endpoint @app.get("/programs") async def list_programs(): @@ -215,29 +340,20 @@ async def list_programs(): programs = [] for module in modules: model_alias = get_program_model(config, module.name) - - program_info = { + programs.append({ "name": module.name, "model": model_alias, "endpoint": f"/{module.name}", - } - - programs.append(program_info) - + }) return {"programs": programs} # Add metrics endpoints @app.get("/api/metrics") async def list_metrics(sort_by: str = "calls", order: str = "desc"): - """Get aggregated metrics for all programs. - - Args: - sort_by: Sort key (name, calls, latency, cost, tokens, last_call) - order: Sort order (asc, desc) - """ + """Get aggregated metrics for all programs.""" program_names = [m.name for m in modules] metrics_list = get_all_metrics( - logs_dir, + params.logs_dir, program_names, app.state.metrics_cache, sort_by=sort_by, @@ -253,40 +369,28 @@ async def program_metrics(program_name: str): raise HTTPException(status_code=404, detail=f"Program '{program_name}' not found") metrics = get_program_metrics_cached( - logs_dir, + params.logs_dir, program_name, app.state.metrics_cache, ) return {"metrics": metrics.to_dict()} - # Store modules in app state for access by routes + # Store modules and config in app state app.state.modules = modules app.state.config = config - # Enhance OpenAPI metadata with DSPy-specific information + # Enhance OpenAPI metadata app_id = config.get("app_id", "DSPy API") app_description = config.get("description", "Automatically generated API for DSPy programs") - - # Create program-to-model mapping program_models = {module.name: get_program_model(config, module.name) for module in modules} - - # Create DSPy extensions extensions = create_openapi_extensions(config, modules, program_models) - - enhance_openapi_metadata( - app, - title=app_id, - description=app_description, - extensions=extensions - ) - + enhance_openapi_metadata(app, title=app_id, description=app_description, extensions=extensions) logger.info("Enhanced OpenAPI metadata with DSPy configuration") - # Register UI routes (always enabled) + # Register UI routes from fastapi.staticfiles import StaticFiles from dspy_cli.server.ui import create_ui_routes - # Mount static files static_dir = Path(__file__).parent.parent / "templates" / "ui" / "static" if static_dir.exists(): app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") @@ -294,81 +398,27 @@ async def program_metrics(program_name: str): else: logger.warning(f"Static directory not found: {static_dir}") - # Create UI routes - create_ui_routes(app, modules, config, logs_dir, auth_enabled=enable_auth) + create_ui_routes(app, modules, config, params.logs_dir, auth_enabled=params.enable_auth) logger.info("UI routes registered") - # Setup authentication if enabled - if enable_auth: - from dspy_cli.server.auth import ( - DEFAULT_OPEN_PATHS, - AuthMiddleware, - create_auth_routes, - generate_token, - get_api_token, - ) - - token = get_api_token() - if not token: - # Auto-generate a token and log it (Jupyter-style) - token = generate_token() - import os as os_module - os_module.environ["DSPY_API_KEY"] = token - logger.warning("=" * 60) - logger.warning("DSPY_API_KEY not set. Generated temporary token:") - logger.warning(f" {token}") - logger.warning("Set DSPY_API_KEY as an environment secret for a persistent token.") - logger.warning("=" * 60) - - # Add auth routes (login/logout) - auth_router = create_auth_routes(token) - app.include_router(auth_router) - - # Combine default open paths with gateway public paths (requires_auth=False) - open_paths = set(DEFAULT_OPEN_PATHS) + # Update auth open paths with gateway public paths discovered during route creation + if params.enable_auth and hasattr(app.state, "_auth_open_paths"): if hasattr(app.state, "public_paths"): - open_paths.update(app.state.public_paths) + app.state._auth_open_paths.update(app.state.public_paths) - # Add auth middleware (must be added after routes) - app.add_middleware(AuthMiddleware, token=token, open_paths=open_paths) - logger.info("Authentication enabled") + # Reset cached OpenAPI schema since routes were added dynamically + app.openapi_schema = None - return app + app.state._ready = True + logger.info("Application initialization complete") -@asynccontextmanager -async def lifespan(app: FastAPI): - """Lifespan context manager for startup/shutdown events.""" - # Startup - scheduler = getattr(app.state, "scheduler", None) - if scheduler and scheduler.job_count > 0: - scheduler.start() - - yield - - # Shutdown - if scheduler and scheduler.job_count > 0: - scheduler.shutdown() - - for shutdown_fn in getattr(app.state, "_gateway_shutdowns", []): - try: - shutdown_fn() - except Exception as e: - logger.warning(f"Gateway shutdown error: {e}") - - shutdown_executor() - +# --------------------------------------------------------------------------- +# LM helpers (unchanged) +# --------------------------------------------------------------------------- def _create_lm_instance(model_config: Dict) -> dspy.LM: - """Create a DSPy LM instance from configuration. - - Args: - model_config: Model configuration dictionary - - Returns: - Configured LM instance - """ - # Extract configuration + """Create a DSPy LM instance from configuration.""" model = model_config.get("model") model_type = model_config.get("model_type", "chat") temperature = model_config.get("temperature") @@ -377,7 +427,6 @@ def _create_lm_instance(model_config: Dict) -> dspy.LM: api_base = model_config.get("api_base") cache = model_config.get("cache") - # Build kwargs kwargs = {} if temperature is not None: kwargs["temperature"] = temperature @@ -390,25 +439,12 @@ def _create_lm_instance(model_config: Dict) -> dspy.LM: if cache is not None: kwargs["cache"] = cache - # Create and return LM instance - return dspy.LM( - model=model, - model_type=model_type, - **kwargs - ) + return dspy.LM(model=model, model_type=model_type, **kwargs) def _configure_dspy_model(model_config: Dict): - """Configure DSPy with a language model. - - Args: - model_config: Model configuration dictionary - """ - # Create LM instance + """Configure DSPy with a language model.""" lm = _create_lm_instance(model_config) - - # Disable global history: it's an unprotected plain list that races under - # concurrent async/threaded requests. Inference logs capture everything we need. dspy.settings.configure(lm=lm, disable_history=True) model = model_config.get("model") diff --git a/src/dspy_cli/server/runner.py b/src/dspy_cli/server/runner.py index c6aac83..2b5ba17 100644 --- a/src/dspy_cli/server/runner.py +++ b/src/dspy_cli/server/runner.py @@ -21,6 +21,8 @@ ENV_LOGS_DIR = "DSPY_CLI_LOGS_DIR" ENV_AUTH_ENABLED = "DSPY_CLI_AUTH_ENABLED" ENV_SYNC_WORKERS = "DSPY_CLI_SYNC_WORKERS" +ENV_SAVE_OPENAPI = "DSPY_CLI_SAVE_OPENAPI" +ENV_OPENAPI_FORMAT = "DSPY_CLI_OPENAPI_FORMAT" def _maybe_mount_mcp(app, enable: bool, *, path: str = MCP_DEFAULT_PATH, notify=None) -> bool: @@ -89,6 +91,8 @@ def create_app_instance(): enable_auth = os.environ.get(ENV_AUTH_ENABLED, "false").lower() == "true" sync_workers_str = os.environ.get(ENV_SYNC_WORKERS) sync_workers = int(sync_workers_str) if sync_workers_str else None + save_openapi = os.environ.get(ENV_SAVE_OPENAPI, "true").lower() == "true" + openapi_format = os.environ.get(ENV_OPENAPI_FORMAT, "json") # Validate project structure if not validate_project_structure(): @@ -124,8 +128,20 @@ def create_app_instance(): sync_workers=sync_workers, ) - # Mount MCP if enabled - _maybe_mount_mcp(app, enable_mcp) + # Register post-init callbacks (run by the lifespan after module discovery) + def _on_ready(): + _maybe_mount_mcp(app, enable_mcp) + if save_openapi: + try: + spec = generate_openapi_spec(app) + spec_filename = f"openapi.{openapi_format}" + spec_path = Path.cwd() / spec_filename + save_openapi_spec(spec, spec_path, format=openapi_format) + logger.info("OpenAPI spec saved: %s", spec_filename) + except Exception as e: + logger.warning("Could not save OpenAPI spec: %s", e) + + app.state._on_ready_callbacks = [_on_ready] return app @@ -199,62 +215,27 @@ def main( enable_auth=auth, sync_workers=sync_workers, ) - - # Mount MCP if enabled - def notify_cli(msg: str, level: str = "info"): - color = "green" if level == "info" else "yellow" - click.echo(click.style(msg, fg=color)) - - _maybe_mount_mcp(app, mcp, notify=notify_cli) - except Exception as e: click.echo(click.style(f"Error creating application: {e}", fg="red")) raise click.Abort() - click.echo() - click.echo(click.style("Discovered Programs:", fg="cyan", bold=True)) - click.echo() + # Register post-init callbacks (run by the lifespan after module discovery) + def _on_ready(): + _maybe_mount_mcp(app, mcp) + if save_openapi: + try: + spec = generate_openapi_spec(app) + spec_filename = f"openapi.{openapi_format}" + spec_path = Path.cwd() / spec_filename + save_openapi_spec(spec, spec_path, format=openapi_format) + logger.info("OpenAPI spec saved: %s", spec_filename) + except Exception as e: + logger.warning("Could not save OpenAPI spec: %s", e) + + app.state._on_ready_callbacks = [_on_ready] - if hasattr(app.state, "modules") and app.state.modules: - for module in app.state.modules: - click.echo(f" • {module.name}") - click.echo(f" POST /{module.name}") - else: - click.echo(click.style(" No programs discovered", fg="yellow")) - click.echo() - click.echo("Make sure your DSPy modules:") - click.echo(" 1. Are in src//modules/") - click.echo(" 2. Subclass dspy.Module") - click.echo(" 3. Are not named with a leading underscore") - click.echo(" 4. If you are using external dependencies:") - from dspy_cli.utils.venv import venv_activate_command - click.echo(f" - Ensure your venv is activated ({venv_activate_command()})") - click.echo(" - Make sure you have dspy-cli as a local dependency") - click.echo(" - Install them using pip install -e .") - - click.echo() - click.echo(click.style("Additional Endpoints:", fg="cyan", bold=True)) - click.echo() - click.echo(" GET /programs - List all programs and their schemas") - click.echo(" GET /openapi.json - OpenAPI specification") - click.echo(" GET / - Web UI for interactive testing") - if mcp: - click.echo(" POST /mcp - Model Context Protocol server") click.echo() - # Generate and save OpenAPI spec if requested - if save_openapi: - try: - spec = generate_openapi_spec(app) - spec_filename = f"openapi.{openapi_format}" - spec_path = Path.cwd() / spec_filename - save_openapi_spec(spec, spec_path, format=openapi_format) - click.echo(click.style(f"✓ OpenAPI spec saved: {spec_filename}", fg="green")) - click.echo() - except Exception as e: - click.echo(click.style(f"Warning: Could not save OpenAPI spec: {e}", fg="yellow")) - click.echo() - host_string = "localhost" if host == "0.0.0.0" else host click.echo(click.style("=" * 60, fg="cyan")) click.echo(click.style(f"Server starting on http://{host_string}:{port}", fg="green", bold=True)) @@ -282,6 +263,8 @@ def notify_cli(msg: str, level: str = "info"): os.environ[ENV_LOGS_DIR] = str(logs_path) os.environ[ENV_ENABLE_MCP] = str(mcp).lower() os.environ[ENV_AUTH_ENABLED] = str(auth).lower() + os.environ[ENV_SAVE_OPENAPI] = str(save_openapi).lower() + os.environ[ENV_OPENAPI_FORMAT] = openapi_format if sync_workers is not None: os.environ[ENV_SYNC_WORKERS] = str(sync_workers) diff --git a/tests/gateway/test_gateway_routes.py b/tests/gateway/test_gateway_routes.py index e47ad03..e4b4c9a 100644 --- a/tests/gateway/test_gateway_routes.py +++ b/tests/gateway/test_gateway_routes.py @@ -184,9 +184,10 @@ def test_custom_gateway_path(self, gateway_project, test_config): enable_ui=False ) - routes = [r.path for r in app.routes if hasattr(r, "path")] - assert "/webhooks/process" in routes - assert "/WebhookProcessor" not in routes + with TestClient(app): + routes = [r.path for r in app.routes if hasattr(r, "path")] + assert "/webhooks/process" in routes + assert "/WebhookProcessor" not in routes def test_custom_gateway_input_transform(self, gateway_project, test_config): """Gateway should transform webhook payload to pipeline inputs.""" @@ -246,8 +247,9 @@ def test_no_gateway_uses_module_name_path(self, identity_gateway_project, test_c enable_ui=False ) - routes = [r.path for r in app.routes if hasattr(r, "path")] - assert "/Echo" in routes + with TestClient(app): + routes = [r.path for r in app.routes if hasattr(r, "path")] + assert "/Echo" in routes def test_identity_gateway_passthrough(self, identity_gateway_project, test_config): """IdentityGateway should pass inputs/outputs unchanged.""" diff --git a/tests/test_serve_integration.py b/tests/test_serve_integration.py index 27e4955..3937779 100644 --- a/tests/test_serve_integration.py +++ b/tests/test_serve_integration.py @@ -103,21 +103,22 @@ def test_create_app_discovers_modules(temp_project, test_config): enable_ui=False ) - # Verify Echo module was discovered - assert hasattr(app.state, "modules") - module_names = [m.name for m in app.state.modules] - assert "Echo" in module_names - - # Verify POST /Echo route exists - routes = [r for r in app.routes] - echo_route = None - for route in routes: - if hasattr(route, "path") and route.path == "/Echo": - echo_route = route - break - - assert echo_route is not None, "POST /Echo route not found" - assert "POST" in echo_route.methods + with TestClient(app): + # Verify Echo module was discovered + assert hasattr(app.state, "modules") + module_names = [m.name for m in app.state.modules] + assert "Echo" in module_names + + # Verify POST /Echo route exists + routes = [r for r in app.routes] + echo_route = None + for route in routes: + if hasattr(route, "path") and route.path == "/Echo": + echo_route = route + break + + assert echo_route is not None, "POST /Echo route not found" + assert "POST" in echo_route.methods def test_openapi_spec_generation(temp_project, test_config): @@ -130,17 +131,18 @@ def test_openapi_spec_generation(temp_project, test_config): enable_ui=False ) - # Generate spec - spec = generate_openapi_spec(app) - - # Verify basic structure - assert "openapi" in spec - assert "paths" in spec - assert "/Echo" in spec["paths"] - assert "post" in spec["paths"]["/Echo"] - - # Verify other standard endpoints - assert "/programs" in spec["paths"] + with TestClient(app): + # Generate spec (routes registered during lifespan) + spec = generate_openapi_spec(app) + + # Verify basic structure + assert "openapi" in spec + assert "paths" in spec + assert "/Echo" in spec["paths"] + assert "post" in spec["paths"]["/Echo"] + + # Verify other standard endpoints + assert "/programs" in spec["paths"] def test_save_openapi_spec_json(temp_project, test_config): @@ -153,17 +155,17 @@ def test_save_openapi_spec_json(temp_project, test_config): enable_ui=False ) - spec = generate_openapi_spec(app) - output_path = temp_project["root"] / "openapi.json" - - save_openapi_spec(spec, output_path, format="json") - - assert output_path.exists() - - # Verify it's valid JSON - import json - content = json.loads(output_path.read_text()) - assert "/Echo" in content["paths"] + with TestClient(app): + spec = generate_openapi_spec(app) + output_path = temp_project["root"] / "openapi.json" + + save_openapi_spec(spec, output_path, format="json") + + assert output_path.exists() + + import json + content = json.loads(output_path.read_text()) + assert "/Echo" in content["paths"] def test_save_openapi_spec_yaml(temp_project, test_config): @@ -176,17 +178,17 @@ def test_save_openapi_spec_yaml(temp_project, test_config): enable_ui=False ) - spec = generate_openapi_spec(app) - output_path = temp_project["root"] / "openapi.yaml" - - save_openapi_spec(spec, output_path, format="yaml") - - assert output_path.exists() - - # Verify it's valid YAML - import yaml - content = yaml.safe_load(output_path.read_text()) - assert "/Echo" in content["paths"] + with TestClient(app): + spec = generate_openapi_spec(app) + output_path = temp_project["root"] / "openapi.yaml" + + save_openapi_spec(spec, output_path, format="yaml") + + assert output_path.exists() + + import yaml + content = yaml.safe_load(output_path.read_text()) + assert "/Echo" in content["paths"] def test_runner_main_no_reload(temp_project, test_config, monkeypatch): @@ -196,7 +198,7 @@ def test_runner_main_no_reload(temp_project, test_config, monkeypatch): # Mock load_config to return test config monkeypatch.setattr("dspy_cli.server.runner.load_config", lambda: test_config) - # Mock uvicorn.run to avoid starting server + # Mock uvicorn.run to trigger the lifespan (via TestClient) so _on_ready runs calls = [] def fake_run(app_or_str, **kw): calls.append({ @@ -206,6 +208,10 @@ def fake_run(app_or_str, **kw): "reload": kw.get("reload", False), "factory": kw.get("factory", False) }) + # Simulate startup so _on_ready callbacks fire (e.g. OpenAPI save) + if not isinstance(app_or_str, str): + with TestClient(app_or_str): + pass monkeypatch.setattr("uvicorn.run", fake_run) @@ -226,7 +232,7 @@ def fake_run(app_or_str, **kw): assert calls[0]["port"] == 1234 assert calls[0]["reload"] is False - # Verify OpenAPI was saved + # Verify OpenAPI was saved (by _on_ready callback during lifespan) assert (temp_project["root"] / "openapi.json").exists()