Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/dspy_cli/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from dspy_cli.discovery import discover_modules
from dspy_cli.discovery.gateway_finder import get_gateways_for_module, is_cron_gateway
from dspy_cli.gateway import APIGateway, IdentityGateway
from dspy_cli.server.logging import setup_logging
from dspy_cli.server.logging import setup_logging, start_log_writer, stop_log_writer
from dspy_cli.server.metrics import get_all_metrics, get_program_metrics_cached
from dspy_cli.server.routes import create_program_routes
from dspy_cli.server.scheduler import GatewayScheduler
Expand Down Expand Up @@ -47,6 +47,7 @@ def create_app(
"""
# Setup logging
setup_logging()
start_log_writer()

# Create FastAPI app
app = FastAPI(
Expand Down Expand Up @@ -332,6 +333,8 @@ async def lifespan(app: FastAPI):
except Exception as e:
logger.warning(f"Gateway shutdown error: {e}")

stop_log_writer()


def _create_lm_instance(model_config: Dict) -> dspy.LM:
"""Create a DSPy LM instance from configuration.
Expand Down
127 changes: 103 additions & 24 deletions src/dspy_cli/server/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,111 @@

import json
import logging
import queue
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Queue of (logs_dir, program_name, log_entry) tuples: produced by
# log_inference(), consumed by the background writer thread.
_log_queue: queue.Queue = queue.Queue()
# Background writer thread; created lazily by start_log_writer().
_writer_thread: Optional[threading.Thread] = None
# Set by stop_log_writer() to tell the writer loop to exit when idle.
_shutdown_event = threading.Event()
# Unique marker enqueued at shutdown to wake the writer and stop it.
_SENTINEL = object()


def start_log_writer() -> None:
    """Spawn the daemon thread that drains the inference-log queue.

    Idempotent: calling this while a writer is already running is a no-op.
    """
    global _writer_thread

    already_running = _writer_thread is not None and _writer_thread.is_alive()
    if already_running:
        return

    _shutdown_event.clear()
    worker = threading.Thread(
        target=_log_writer_loop,
        name="log-writer",
        daemon=True,
    )
    _writer_thread = worker
    worker.start()


def stop_log_writer(timeout: float = 5.0) -> None:
    """Shut down the background writer, flushing entries still queued.

    Args:
        timeout: Maximum seconds to wait for the writer thread to exit.
    """
    global _writer_thread

    thread = _writer_thread
    if thread is None or not thread.is_alive():
        return

    # Signal shutdown, then enqueue the sentinel so a blocked get() wakes up.
    _shutdown_event.set()
    _log_queue.put(_SENTINEL)
    thread.join(timeout=timeout)
    _writer_thread = None


def _log_writer_loop() -> None:
    """Drain the queue and batch-write entries to per-program log files.

    Runs until a sentinel is received, or until the shutdown event is set
    while the queue is idle. Flushes are guarded so that an unexpected
    exception from one bad batch can never kill the writer thread and
    silently drop all subsequent logs.
    """
    while True:
        entries: list = []

        try:
            item = _log_queue.get(timeout=1.0)
        except queue.Empty:
            # Idle: exit only once shutdown has been requested.
            if _shutdown_event.is_set():
                break
            continue

        if item is _SENTINEL:
            # Final flush: also pick up anything enqueued after the sentinel.
            _drain_remaining(entries)
            _safe_flush(entries)
            break

        entries.append(item)

        # Batch up to 49 more without blocking, to amortize file opens.
        for _ in range(49):
            try:
                item = _log_queue.get_nowait()
                if item is _SENTINEL:
                    # Mirror the outer sentinel path: drain stragglers so the
                    # "flush pending entries" shutdown contract holds here too.
                    _drain_remaining(entries)
                    _safe_flush(entries)
                    return
                entries.append(item)
            except queue.Empty:
                break

        _safe_flush(entries)


def _safe_flush(entries: list) -> None:
    """Flush one batch, ensuring no exception escapes into the writer loop."""
    try:
        _flush_entries(entries)
    except Exception:
        # _flush_entries already handles expected I/O errors; this is a
        # last-resort guard keeping the daemon thread alive.
        logger.exception("Unexpected error flushing inference log batch")


def _drain_remaining(entries: list) -> None:
    """Move every item still sitting in the queue into *entries*.

    Sentinel markers are discarded; all other items are appended in order.
    """
    while not _log_queue.empty():
        try:
            pending = _log_queue.get_nowait()
        except queue.Empty:
            break
        if pending is not _SENTINEL:
            entries.append(pending)


def _flush_entries(entries: list) -> None:
"""Write a batch of log entries grouped by program to their files."""
if not entries:
return

grouped: Dict[tuple, List[str]] = {}
for logs_dir, program_name, log_entry in entries:
key = (str(logs_dir), program_name)
if key not in grouped:
grouped[key] = []
grouped[key].append(json.dumps(log_entry))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Catch serialization errors before they kill log writer

_flush_entries() now calls json.dumps(log_entry) outside the try block, so any non-JSON-serializable value in inputs, outputs, or lm_calls raises TypeError and terminates the background thread; after that, all subsequent inference logs are silently dropped because nothing drains _log_queue. The previous synchronous path caught this exception and continued serving requests, so this is a regression that can break metrics/training log collection for the rest of the process after a single malformed payload.

Useful? React with 👍 / 👎.


for (logs_dir_str, program_name), lines in grouped.items():
log_dir = Path(logs_dir_str)
log_file = log_dir / f"{program_name}.log"
try:
log_dir.mkdir(exist_ok=True, parents=True)
with open(log_file, "a") as f:
f.write("\n".join(lines) + "\n")
except Exception as e:
logger.error(f"Failed to write inference log for {program_name}: {e}")


def log_inference(
logs_dir: Path,
Expand All @@ -21,22 +120,9 @@ def log_inference(
cost_usd: Optional[float] = None,
lm_calls: Optional[List[Dict[str, Any]]] = None,
):
"""Log a DSPy inference trace to a per-program log file.

This creates a structured log entry suitable for use as training data,
capturing the full inference trace including inputs, outputs, and metadata.

Args:
logs_dir: Directory to write log files
program_name: Name of the DSPy program
model: Model identifier (e.g., 'anthropic/claude-sonnet-4-5')
inputs: Input fields passed to the program
outputs: Output fields from the program
duration_ms: Execution duration in milliseconds
error: Optional error message if inference failed
tokens: Optional token counts {"prompt_tokens": int, "completion_tokens": int, "total_tokens": int}
cost_usd: Optional total cost in USD for this inference
lm_calls: Optional list of LM calls made during inference (for compound programs)
"""Enqueue an inference log entry for the background writer.

Never blocks the calling thread or event loop.
"""
log_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
Expand All @@ -62,14 +148,7 @@ def log_inference(
if lm_calls:
log_entry["lm_calls"] = lm_calls

log_file = logs_dir / f"{program_name}.log"

try:
logs_dir.mkdir(exist_ok=True, parents=True)
with open(log_file, "a") as f:
f.write(json.dumps(log_entry) + "\n")
except Exception as e:
logger.error(f"Failed to write inference log: {e}")
_log_queue.put((logs_dir, program_name, log_entry))


def setup_logging(log_level: str = "INFO"):
Expand Down