From cfb741854ac355bc8c763989c06d1ffaa5813566 Mon Sep 17 00:00:00 2001 From: Roberdan Date: Sat, 4 Apr 2026 16:12:53 +0200 Subject: [PATCH 1/2] =?UTF-8?q?fix:=20resolve=205=20known=20issues=20?= =?UTF-8?q?=E2=80=94=20spawner=20exit,=20zombie=20reaper,=20depgraph=20wir?= =?UTF-8?q?ing,=20observatory=20sink,=20inference=20backend?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Spawner: add --max-turns 50 to claude -p so agents exit reliably 2. Reaper: clean zombie agents stuck in spawning stage for >1h 3. Wire DepgraphExtension in main.rs with manifests from all extensions 4. Observatory: new timeline sink subscribes to EventBus and persists domain events to obs_timeline 5. Inference: add real HTTP backend (Ollama/OpenAI-compatible) with echo fallback, POST /api/inference/complete endpoint Co-Authored-By: Claude Opus 4.6 (1M context) --- daemon/Cargo.lock | 4 + .../convergio-agent-runtime/src/reaper.rs | 64 ++++++++ .../convergio-agent-runtime/src/spawner.rs | 3 + daemon/crates/convergio-inference/Cargo.toml | 1 + .../crates/convergio-inference/src/backend.rs | 149 ++++++++++++++++++ daemon/crates/convergio-inference/src/lib.rs | 1 + .../crates/convergio-inference/src/router.rs | 60 +++++-- .../crates/convergio-inference/src/routes.rs | 33 +++- .../crates/convergio-observatory/Cargo.toml | 2 + .../crates/convergio-observatory/src/ext.rs | 11 +- .../crates/convergio-observatory/src/lib.rs | 1 + .../crates/convergio-observatory/src/sink.rs | 74 +++++++++ daemon/src/main.rs | 10 +- 13 files changed, 398 insertions(+), 15 deletions(-) create mode 100644 daemon/crates/convergio-inference/src/backend.rs create mode 100644 daemon/crates/convergio-observatory/src/sink.rs diff --git a/daemon/Cargo.lock b/daemon/Cargo.lock index 76e5ad8a..4734d57b 100644 --- a/daemon/Cargo.lock +++ b/daemon/Cargo.lock @@ -592,6 +592,7 @@ dependencies = [ "convergio-ipc", "convergio-telemetry", "convergio-types", + "reqwest", "rusqlite", "serde", 
"serde_json", @@ -723,11 +724,13 @@ dependencies = [ "axum", "chrono", "convergio-db", + "convergio-ipc", "convergio-telemetry", "convergio-types", "rusqlite", "serde", "serde_json", + "tokio", "tracing", ] @@ -763,6 +766,7 @@ dependencies = [ "serde", "serde_json", "thiserror 1.0.69", + "tokio", "tracing", ] diff --git a/daemon/crates/convergio-agent-runtime/src/reaper.rs b/daemon/crates/convergio-agent-runtime/src/reaper.rs index 28566043..33b02b67 100644 --- a/daemon/crates/convergio-agent-runtime/src/reaper.rs +++ b/daemon/crates/convergio-agent-runtime/src/reaper.rs @@ -14,6 +14,7 @@ use crate::types::RuntimeResult; #[derive(Debug, Clone, Default)] pub struct ReaperReport { pub stale_agents_reaped: usize, + pub zombie_spawning_reaped: usize, pub expired_delegations_returned: usize, pub orphan_scopes_cleaned: usize, } @@ -41,6 +42,18 @@ pub fn reap_cycle(pool: &ConnPool) -> RuntimeResult { report.stale_agents_reaped += 1; } + // 1b. Reap zombie agents stuck in 'spawning' for over 1 hour + let zombies = conn.execute( + "UPDATE art_agents SET stage = 'reaped', updated_at = datetime('now') \ + WHERE stage = 'spawning' \ + AND created_at < datetime('now', '-1 hour')", + [], + )?; + if zombies > 0 { + tracing::warn!(count = zombies, "reaped zombie spawning agents (>1h old)"); + } + report.zombie_spawning_reaped = zombies; + // 2. 
Auto-return expired delegations let expired = crate::delegation::find_expired(&conn)?; for deleg in &expired { @@ -62,11 +75,13 @@ pub fn reap_cycle(pool: &ConnPool) -> RuntimeResult { report.orphan_scopes_cleaned = cleaned; if report.stale_agents_reaped > 0 + || report.zombie_spawning_reaped > 0 || report.expired_delegations_returned > 0 || report.orphan_scopes_cleaned > 0 { tracing::info!( stale = report.stale_agents_reaped, + zombies = report.zombie_spawning_reaped, expired = report.expired_delegations_returned, scopes = report.orphan_scopes_cleaned, "reaper cycle complete" @@ -111,6 +126,55 @@ mod tests { assert_eq!(report.expired_delegations_returned, 0); } + #[test] + fn reap_cycle_reaps_zombie_spawning_agents() { + let pool = convergio_db::pool::create_memory_pool().unwrap(); + { + let conn = pool.get().unwrap(); + for m in crate::schema::migrations() { + conn.execute_batch(m.up).unwrap(); + } + // Insert a zombie agent stuck in spawning for 2 hours + conn.execute( + "INSERT INTO art_agents (id, agent_name, org_id, node, stage, created_at) \ + VALUES ('zombie-1', 'ghost', 'test-org', 'n1', 'spawning', \ + datetime('now', '-2 hours'))", + [], + ) + .unwrap(); + // Insert a fresh spawning agent (should NOT be reaped) + conn.execute( + "INSERT INTO art_agents (id, agent_name, org_id, node, stage, created_at) \ + VALUES ('fresh-1', 'alive', 'test-org', 'n1', 'spawning', \ + datetime('now', '-5 minutes'))", + [], + ) + .unwrap(); + } + + let report = reap_cycle(&pool).unwrap(); + assert_eq!(report.zombie_spawning_reaped, 1); + + let conn = pool.get().unwrap(); + let zombie_stage: String = conn + .query_row( + "SELECT stage FROM art_agents WHERE id = 'zombie-1'", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(zombie_stage, "reaped"); + + let fresh_stage: String = conn + .query_row( + "SELECT stage FROM art_agents WHERE id = 'fresh-1'", + [], + |r| r.get(0), + ) + .unwrap(); + assert_eq!(fresh_stage, "spawning"); + } + #[test] fn 
reap_cycle_reaps_stale_agents() { let pool = convergio_db::pool::create_memory_pool().unwrap(); diff --git a/daemon/crates/convergio-agent-runtime/src/spawner.rs b/daemon/crates/convergio-agent-runtime/src/spawner.rs index e92b951f..bd2f8c32 100644 --- a/daemon/crates/convergio-agent-runtime/src/spawner.rs +++ b/daemon/crates/convergio-agent-runtime/src/spawner.rs @@ -77,6 +77,9 @@ pub fn spawn_process( let mut cmd = Command::new(&claude_bin); cmd.args(["--dangerously-skip-permissions"]); cmd.args(["--model", model]); + // Learning: claude -p sometimes hangs after completing its task. + // --max-turns caps conversation turns so the process exits reliably. + cmd.args(["--max-turns", "50"]); cmd.args(["-p", "Leggi TASK.md per le istruzioni. Poi inizia."]); cmd.current_dir(workspace); for (k, v) in env_vars { diff --git a/daemon/crates/convergio-inference/Cargo.toml b/daemon/crates/convergio-inference/Cargo.toml index 4352d242..87e47a85 100644 --- a/daemon/crates/convergio-inference/Cargo.toml +++ b/daemon/crates/convergio-inference/Cargo.toml @@ -17,3 +17,4 @@ serde_json = { workspace = true } chrono = { workspace = true } rusqlite = { workspace = true } axum = { workspace = true } +reqwest = { workspace = true } diff --git a/daemon/crates/convergio-inference/src/backend.rs b/daemon/crates/convergio-inference/src/backend.rs new file mode 100644 index 00000000..74cfe62f --- /dev/null +++ b/daemon/crates/convergio-inference/src/backend.rs @@ -0,0 +1,149 @@ +//! HTTP backend — calls real model APIs (Ollama, OpenAI-compatible). +//! +//! Ollama exposes an OpenAI-compatible endpoint at /v1/chat/completions. +//! Cloud providers (Anthropic, OpenAI) also follow this format. 
+ +use std::time::Instant; + +use serde::{Deserialize, Serialize}; + +use crate::types::{InferenceResponse, ModelEndpoint, ModelProvider}; + +#[derive(Serialize)] +struct ChatRequest<'a> { + model: &'a str, + messages: Vec<ChatMessage<'a>>, + max_tokens: u32, + stream: bool, +} + +#[derive(Serialize)] +struct ChatMessage<'a> { + role: &'a str, + content: &'a str, +} + +#[derive(Deserialize)] +struct ChatResponse { + choices: Vec<Choice>, + usage: Option<Usage>, +} + +#[derive(Deserialize)] +struct Choice { + message: ChoiceMessage, +} + +#[derive(Deserialize)] +struct ChoiceMessage { + content: String, +} + +#[derive(Deserialize)] +struct Usage { + #[serde(default)] + total_tokens: u32, +} + +/// Call a model endpoint and return the real response. +/// Falls back to echo mode if the endpoint is unreachable. +pub async fn call_model( + endpoint: &ModelEndpoint, + prompt: &str, + max_tokens: u32, +) -> Result<InferenceResponse, String> { + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(120)) + .build() + .map_err(|e| format!("http client: {e}"))?; + + // Ollama model name: strip provider prefix if present + let model_name = endpoint + .name + .split('/') + .last() + .unwrap_or(&endpoint.name); + + let url = format!("{}/v1/chat/completions", endpoint.url.trim_end_matches('/')); + + let body = ChatRequest { + model: model_name, + messages: vec![ChatMessage { + role: "user", + content: prompt, + }], + max_tokens, + stream: false, + }; + + let start = Instant::now(); + + let mut req = client.post(&url).json(&body); + + // Cloud providers need auth headers (loaded from daemon env file) + if endpoint.provider == ModelProvider::Cloud { + if let Ok(key) = std::env::var("CONVERGIO_ANTHROPIC_TOKEN") { + req = req + .header("x-api-key", &key) + .header("anthropic-version", "2023-06-01"); + } else if let Ok(key) = std::env::var("CONVERGIO_OPENAI_TOKEN") { + req = req.header("Authorization", format!("Bearer {key}")); + } + } + + let resp = req.send().await.map_err(|e| format!("request: {e}"))?; + let
latency_ms = start.elapsed().as_millis() as u64; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(format!("model returned {status}: {body}")); + } + + let chat: ChatResponse = resp + .json() + .await + .map_err(|e| format!("parse response: {e}"))?; + + let content = chat + .choices + .into_iter() + .next() + .map(|c| c.message.content) + .unwrap_or_default(); + + let tokens_used = chat + .usage + .map(|u| u.total_tokens) + .unwrap_or(max_tokens); + + let cost = (tokens_used as f64 / 1000.0) * endpoint.cost_per_1k_input; + + Ok(InferenceResponse { + content, + model_used: endpoint.name.clone(), + latency_ms, + tokens_used, + cost, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::{InferenceTier, ModelProvider}; + + #[test] + fn model_name_strips_prefix() { + let name = "ollama/llama3.2"; + let stripped = name.split('/').last().unwrap_or(name); + assert_eq!(stripped, "llama3.2"); + } + + #[test] + fn model_name_no_prefix() { + let name = "llama3.2"; + let stripped = name.split('/').last().unwrap_or(name); + assert_eq!(stripped, "llama3.2"); + } +} diff --git a/daemon/crates/convergio-inference/src/lib.rs b/daemon/crates/convergio-inference/src/lib.rs index a2a53a33..ade6ee12 100644 --- a/daemon/crates/convergio-inference/src/lib.rs +++ b/daemon/crates/convergio-inference/src/lib.rs @@ -3,6 +3,7 @@ //! Implements Extension: provides semantic model routing that replaces //! static fallback chains with intelligent, budget-aware selection. 
+pub mod backend; pub mod budget; pub mod classifier; pub mod ext; diff --git a/daemon/crates/convergio-inference/src/router.rs b/daemon/crates/convergio-inference/src/router.rs index 74240969..c36724d0 100644 --- a/daemon/crates/convergio-inference/src/router.rs +++ b/daemon/crates/convergio-inference/src/router.rs @@ -49,35 +49,73 @@ impl ModelRouter { self.models.keys().cloned().collect() } - /// Route a request, applying semantic classification and budget awareness. - /// Returns Err when no healthy model covers the effective tier. + /// Route and call a real model backend. Falls back to echo if unreachable. + pub async fn route_real( + &self, + request: &InferenceRequest, + budget_downgrade: bool, + ) -> Result<(InferenceResponse, RoutingDecision), String> { + let decision = self.make_decision(request, budget_downgrade)?; + let endpoint = self.models.get(&decision.selected_model); + + let response = match endpoint { + Some(ep) if !ep.url.is_empty() => { + match crate::backend::call_model(ep, &request.prompt, request.max_tokens).await { + Ok(resp) => resp, + Err(e) => { + tracing::warn!( + model = decision.selected_model.as_str(), + error = e.as_str(), + "model call failed, returning echo" + ); + self.echo_response(request, &decision) + } + } + } + _ => self.echo_response(request, &decision), + }; + Ok((response, decision)) + } + + /// Route without calling a real backend (echo mode for tests/dry-run). 
pub fn route( &self, request: &InferenceRequest, budget_downgrade: bool, ) -> Result<(InferenceResponse, RoutingDecision), String> { - let classified_tier = classifier::classify(request); + let decision = self.make_decision(request, budget_downgrade)?; + Ok((self.echo_response(request, &decision), decision)) + } + fn make_decision( + &self, + request: &InferenceRequest, + budget_downgrade: bool, + ) -> Result<RoutingDecision, String> { + let classified_tier = classifier::classify(request); let effective_tier = if budget_downgrade { budget::downgrade_tier(classified_tier.clone()) } else { - classified_tier.clone() + classified_tier }; + self.select(&effective_tier, &request.constraints, budget_downgrade) + } - let decision = self.select(&effective_tier, &request.constraints, budget_downgrade)?; - - let response = InferenceResponse { + fn echo_response( + &self, + request: &InferenceRequest, + decision: &RoutingDecision, + ) -> InferenceResponse { + InferenceResponse { content: format!( - "[routed to {}] {}", + "[echo:{}] {}", decision.selected_model, &request.prompt ), model_used: decision.selected_model.clone(), latency_ms: 0, tokens_used: request.max_tokens, cost: 0.0, - }; - - Ok((response, decision)) + } } /// Build a routing decision for the given tier and constraints.
diff --git a/daemon/crates/convergio-inference/src/routes.rs b/daemon/crates/convergio-inference/src/routes.rs index ac644667..b50a536d 100644 --- a/daemon/crates/convergio-inference/src/routes.rs +++ b/daemon/crates/convergio-inference/src/routes.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use axum::extract::{Query, State}; use axum::response::Json; -use axum::routing::get; +use axum::routing::{get, post}; use axum::Router; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; @@ -59,6 +59,7 @@ pub fn inference_routes(state: Arc<AppState>) -> Router { Router::new() .route("/api/inference/costs", get(handle_costs)) .route("/api/inference/routing-decision", get(handle_routing)) + .route("/api/inference/complete", post(handle_complete)) .with_state(state) } @@ -141,3 +142,33 @@ async fn handle_routing( })), } } + +/// POST /api/inference/complete — real model inference call. +async fn handle_complete( + State(state): State<Arc<AppState>>, + Json(request): Json<InferenceRequest>, +) -> Json<serde_json::Value> { + let should_downgrade = { + let conn = state.pool.get().ok(); + conn.map(|c| { + budget::should_downgrade( + &c, + &request.agent_id, + &budget::BudgetConfig::default(), + ) + .unwrap_or(false) + }) + .unwrap_or(false) + }; + + let router = state.router.read().await; + match router.route_real(&request, should_downgrade).await { + Ok((resp, decision)) => Json(serde_json::json!({ + "response": resp, + "decision": decision, + })), + Err(e) => Json(serde_json::json!({ + "error": { "code": "INFERENCE_FAILED", "message": e } + })), + } +} diff --git a/daemon/crates/convergio-observatory/Cargo.toml b/daemon/crates/convergio-observatory/Cargo.toml index 51906820..c1971b67 100644 --- a/daemon/crates/convergio-observatory/Cargo.toml +++ b/daemon/crates/convergio-observatory/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] convergio-types = { path = "../convergio-types" } convergio-db = { path = "../convergio-db" } +convergio-ipc = { path = "../convergio-ipc" } convergio-telemetry = { path =
"../convergio-telemetry" } tracing = { workspace = true } serde = { workspace = true } @@ -15,3 +16,4 @@ serde_json = { workspace = true } chrono = { workspace = true } rusqlite = { workspace = true } axum = { workspace = true } +tokio = { workspace = true } diff --git a/daemon/crates/convergio-observatory/src/ext.rs b/daemon/crates/convergio-observatory/src/ext.rs index 78f96c2d..25e9665a 100644 --- a/daemon/crates/convergio-observatory/src/ext.rs +++ b/daemon/crates/convergio-observatory/src/ext.rs @@ -80,7 +80,16 @@ impl Extension for ObservatoryExtension { Some(crate::routes::observatory_routes(self.state())) } - fn on_start(&self, _ctx: &AppContext) -> ExtResult<()> { + fn on_start(&self, ctx: &AppContext) -> ExtResult<()> { + // Subscribe to EventBus and persist domain events to timeline + if let Some(bus) = ctx.get::>() { + crate::sink::spawn_timeline_sink(self.pool.clone(), bus.clone()); + tracing::info!("observatory: timeline sink started"); + } else { + tracing::warn!( + "observatory: no EventBus in context, timeline sink disabled" + ); + } tracing::info!("observatory: extension started"); Ok(()) } diff --git a/daemon/crates/convergio-observatory/src/lib.rs b/daemon/crates/convergio-observatory/src/lib.rs index 8df58144..6bb9dc5b 100644 --- a/daemon/crates/convergio-observatory/src/lib.rs +++ b/daemon/crates/convergio-observatory/src/lib.rs @@ -12,6 +12,7 @@ pub mod routes; pub mod routes_webhook; pub mod schema; pub mod search; +pub mod sink; pub mod timeline; pub mod types; diff --git a/daemon/crates/convergio-observatory/src/sink.rs b/daemon/crates/convergio-observatory/src/sink.rs new file mode 100644 index 00000000..40c3b074 --- /dev/null +++ b/daemon/crates/convergio-observatory/src/sink.rs @@ -0,0 +1,74 @@ +//! EventBus → timeline persistence sink. +//! +//! Subscribes to the IPC EventBus and persists every domain event +//! to the obs_timeline table. Without this, events only flow to SSE +//! clients and are lost when nobody is listening. 
+ +use std::sync::Arc; + +use convergio_db::pool::ConnPool; +use convergio_ipc::sse::EventBus; + +use crate::timeline::{self, NewEvent}; +use crate::types::EventSource; + +/// Spawn a background task that subscribes to the EventBus and +/// writes every event to obs_timeline. +pub fn spawn_timeline_sink( + pool: ConnPool, + bus: Arc<EventBus>, +) -> tokio::task::JoinHandle<()> { + let mut rx = bus.subscribe(); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(event) => { + let source = classify_source(&event.event_type); + let conn = match pool.get() { + Ok(c) => c, + Err(e) => { + tracing::warn!("timeline sink: pool error: {e}"); + continue; + } + }; + let new_evt = NewEvent { + source: &source, + event_type: &event.event_type, + actor: &event.from, + org_id: None, + node_id: None, + summary: &event.content, + details_json: None, + }; + if let Err(e) = timeline::record_event(&conn, &new_evt) { + tracing::warn!( + event_type = event.event_type.as_str(), + "timeline sink: write failed: {e}" + ); + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!(dropped = n, "timeline sink lagged"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + tracing::info!("timeline sink: bus closed, stopping"); + break; + } + } + } + }) +} + +fn classify_source(event_type: &str) -> EventSource { + match event_type { + t if t.starts_with("plan_") || t.starts_with("task_") => { + EventSource::Orchestrator + } + t if t.starts_with("agent_") || t.starts_with("delegation_") => { + EventSource::Agent + } + t if t.starts_with("budget_") => EventSource::Billing, + t if t.starts_with("health_") => EventSource::System, + _ => EventSource::System, + } +} diff --git a/daemon/src/main.rs b/daemon/src/main.rs index 51272ec7..2d6e0ac6 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -120,10 +120,16 @@ async fn main() { // 5.
Register extensions (shared EventBus for domain events → SSE) let event_bus = Arc::new(convergio_ipc::sse::EventBus::new(1024)); - let extensions = register_extensions(pool.clone(), Arc::clone(&event_bus)); + let mut extensions = register_extensions(pool.clone(), Arc::clone(&event_bus)); + + // 5b. Wire depgraph with manifests from all registered extensions + let manifests: Vec<_> = extensions.iter().map(|e| e.manifest()).collect(); + extensions.push(Arc::new(convergio_depgraph::DepgraphExtension::new(manifests))); + let mut ctx = AppContext::new(); - let sink: Arc = event_bus; + let sink: Arc = event_bus.clone(); ctx.insert(sink); + ctx.insert(Arc::clone(&event_bus)); // 6. Extension migrations { From 7154d4c3c39fb9567a0c0292a00dbc1283cec627 Mon Sep 17 00:00:00 2001 From: Roberdan Date: Sat, 4 Apr 2026 16:30:29 +0200 Subject: [PATCH 2/2] fix(monitor): use waitpid instead of kill(0) to detect zombie agents kill(pid, 0) returns success for zombie processes, so the monitor never detects agent completion. waitpid(WNOHANG) properly reaps zombies and returns the exit code. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/spawn_monitor.rs | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/daemon/crates/convergio-agent-runtime/src/spawn_monitor.rs b/daemon/crates/convergio-agent-runtime/src/spawn_monitor.rs index 05a6c2aa..c05d242a 100644 --- a/daemon/crates/convergio-agent-runtime/src/spawn_monitor.rs +++ b/daemon/crates/convergio-agent-runtime/src/spawn_monitor.rs @@ -80,13 +80,28 @@ pub fn monitor_agent( } /// Poll until process exits. Returns exit code or -1. +/// Uses waitpid(WNOHANG) to detect zombie processes that kill(0) misses. 
async fn wait_for_exit(pid: u32) -> i32 { loop { tokio::time::sleep(std::time::Duration::from_secs(5)).await; - // Check if process is still alive - let alive = unsafe { libc::kill(pid as i32, 0) } == 0; - if !alive { - return -1; // Process gone, can't get exit code from detached child + // Try waitpid first — catches zombies that kill(0) reports as alive + let mut status: i32 = 0; + let result = + unsafe { libc::waitpid(pid as i32, &mut status, libc::WNOHANG) }; + if result > 0 { + // Process exited (or was zombie, now reaped) + if libc::WIFEXITED(status) { + return libc::WEXITSTATUS(status); + } + return -1; + } + // waitpid returns 0 = still running, -1 = not our child + if result < 0 { + // ECHILD: not our child process (already reaped or not a child) + let alive = unsafe { libc::kill(pid as i32, 0) } == 0; + if !alive { + return -1; + } } } }