diff --git a/src/dspy_cli/server/app.py b/src/dspy_cli/server/app.py
index e017871..8ac3de8 100644
--- a/src/dspy_cli/server/app.py
+++ b/src/dspy_cli/server/app.py
@@ -15,7 +15,7 @@
 from dspy_cli.discovery import discover_modules
 from dspy_cli.discovery.gateway_finder import get_gateways_for_module, is_cron_gateway
 from dspy_cli.gateway import APIGateway, IdentityGateway
-from dspy_cli.server.logging import setup_logging
+from dspy_cli.server.logging import setup_logging, start_log_writer, stop_log_writer
 from dspy_cli.server.metrics import get_all_metrics, get_program_metrics_cached
 from dspy_cli.server.routes import create_program_routes
 from dspy_cli.server.scheduler import GatewayScheduler
@@ -47,6 +47,7 @@ def create_app(
     """
     # Setup logging
     setup_logging()
+    start_log_writer()
 
     # Create FastAPI app
     app = FastAPI(
@@ -332,6 +333,8 @@ async def lifespan(app: FastAPI):
         except Exception as e:
             logger.warning(f"Gateway shutdown error: {e}")
 
+    stop_log_writer()
+
 
 def _create_lm_instance(model_config: Dict) -> dspy.LM:
     """Create a DSPy LM instance from configuration.
diff --git a/src/dspy_cli/server/logging.py b/src/dspy_cli/server/logging.py
index 0636430..73e3c7b 100644
--- a/src/dspy_cli/server/logging.py
+++ b/src/dspy_cli/server/logging.py
@@ -2,12 +2,111 @@
 
 import json
 import logging
+import queue
+import threading
 from datetime import datetime, timezone
 from pathlib import Path
 from typing import Any, Dict, List, Optional
 
 logger = logging.getLogger(__name__)
 
+_log_queue: queue.Queue = queue.Queue()
+_writer_thread: Optional[threading.Thread] = None
+_shutdown_event = threading.Event()
+_SENTINEL = object()
+
+
+def start_log_writer() -> None:
+    """Start the background log writer thread."""
+    global _writer_thread
+
+    if _writer_thread is not None and _writer_thread.is_alive():
+        return
+
+    _shutdown_event.clear()
+    _writer_thread = threading.Thread(target=_log_writer_loop, name="log-writer", daemon=True)
+    _writer_thread.start()
+
+
+def stop_log_writer(timeout: float = 5.0) -> None:
+    """Stop the background log writer, flushing pending entries."""
+    global _writer_thread
+
+    if _writer_thread is None or not _writer_thread.is_alive():
+        return
+
+    _shutdown_event.set()
+    _log_queue.put(_SENTINEL)
+    _writer_thread.join(timeout=timeout)
+    _writer_thread = None
+
+
+def _log_writer_loop() -> None:
+    """Drain the queue and batch-write entries to per-program log files."""
+    while True:
+        entries: list = []
+
+        try:
+            item = _log_queue.get(timeout=1.0)
+        except queue.Empty:
+            if _shutdown_event.is_set():
+                break
+            continue
+
+        if item is _SENTINEL:
+            _drain_remaining(entries)
+            _flush_entries(entries)
+            break
+
+        entries.append(item)
+
+        # Batch up to 49 more without blocking
+        for _ in range(49):
+            try:
+                item = _log_queue.get_nowait()
+                if item is _SENTINEL:
+                    _flush_entries(entries)
+                    return
+                entries.append(item)
+            except queue.Empty:
+                break
+
+        _flush_entries(entries)
+
+
+def _drain_remaining(entries: list) -> None:
+    """Drain any remaining items from the queue."""
+    while not _log_queue.empty():
+        try:
+            item = _log_queue.get_nowait()
+            if item is not _SENTINEL:
+                entries.append(item)
+        except queue.Empty:
+            break
+
+
+def _flush_entries(entries: list) -> None:
+    """Write a batch of log entries grouped by program to their files."""
+    if not entries:
+        return
+
+    grouped: Dict[tuple, List[str]] = {}
+    for logs_dir, program_name, log_entry in entries:
+        key = (str(logs_dir), program_name)
+        if key not in grouped:
+            grouped[key] = []
+        grouped[key].append(json.dumps(log_entry))
+
+    for (logs_dir_str, program_name), lines in grouped.items():
+        log_dir = Path(logs_dir_str)
+        log_file = log_dir / f"{program_name}.log"
+        try:
+            log_dir.mkdir(exist_ok=True, parents=True)
+            with open(log_file, "a") as f:
+                f.write("\n".join(lines) + "\n")
+        except Exception as e:
+            logger.error(f"Failed to write inference log for {program_name}: {e}")
+
 
 def log_inference(
     logs_dir: Path,
@@ -21,22 +120,9 @@
     cost_usd: Optional[float] = None,
     lm_calls: Optional[List[Dict[str, Any]]] = None,
 ):
-    """Log a DSPy inference trace to a per-program log file.
-
-    This creates a structured log entry suitable for use as training data,
-    capturing the full inference trace including inputs, outputs, and metadata.
-
-    Args:
-        logs_dir: Directory to write log files
-        program_name: Name of the DSPy program
-        model: Model identifier (e.g., 'anthropic/claude-sonnet-4-5')
-        inputs: Input fields passed to the program
-        outputs: Output fields from the program
-        duration_ms: Execution duration in milliseconds
-        error: Optional error message if inference failed
-        tokens: Optional token counts {"prompt_tokens": int, "completion_tokens": int, "total_tokens": int}
-        cost_usd: Optional total cost in USD for this inference
-        lm_calls: Optional list of LM calls made during inference (for compound programs)
+    """Enqueue an inference log entry for the background writer.
+
+    Never blocks the calling thread or event loop.
     """
     log_entry = {
         "timestamp": datetime.now(timezone.utc).isoformat(),
@@ -62,14 +148,7 @@
     if lm_calls:
         log_entry["lm_calls"] = lm_calls
 
-    log_file = logs_dir / f"{program_name}.log"
-
-    try:
-        logs_dir.mkdir(exist_ok=True, parents=True)
-        with open(log_file, "a") as f:
-            f.write(json.dumps(log_entry) + "\n")
-    except Exception as e:
-        logger.error(f"Failed to write inference log: {e}")
+    _log_queue.put((logs_dir, program_name, log_entry))
 
 
 def setup_logging(log_level: str = "INFO"):
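
A minimal sketch of how the new queue-backed writer might be exercised end to end, using the helpers introduced in this diff (start_log_writer, log_inference, stop_log_writer). The program name, example inputs, and temporary-directory setup are illustrative assumptions, not part of the change:

    from pathlib import Path
    from tempfile import TemporaryDirectory

    from dspy_cli.server.logging import log_inference, start_log_writer, stop_log_writer

    start_log_writer()

    with TemporaryDirectory() as tmp:
        logs_dir = Path(tmp)
        # Enqueue one entry; this returns immediately and the writer thread does the file I/O.
        log_inference(
            logs_dir=logs_dir,
            program_name="example_program",  # hypothetical program name
            model="anthropic/claude-sonnet-4-5",
            inputs={"question": "2 + 2?"},
            outputs={"answer": "4"},
            duration_ms=42,
        )
        # stop_log_writer flushes the queue, so the log file exists before tmp is removed.
        stop_log_writer()
        print((logs_dir / "example_program.log").read_text())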