From d67e6a19eecbbaeff78b9a6de36b197e6d0c785c Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Thu, 28 May 2026 11:44:58 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Handle Linear connector backoff and retained review ownership drift","authority":"XY-611"} --- apps/decodex/src/orchestrator/daemon.rs | 8 + apps/decodex/src/orchestrator/entrypoints.rs | 416 ++++++++++++++++-- apps/decodex/src/orchestrator/status.rs | 407 +++++++++++++++-- .../tests/operator/status/publishing.rs | 147 ++++++- .../tests/review_landing/status_rows.rs | 4 + apps/decodex/src/recovery.rs | 144 +++++- apps/decodex/src/state/internal.rs | 105 ++++- apps/decodex/src/state/models.rs | 60 +++ apps/decodex/src/state/store.rs | 77 ++++ apps/decodex/src/state/tests.rs | 51 ++- docs/runbook/recover-review-handoff.md | 13 +- docs/spec/post-review-lifecycle.md | 12 +- 12 files changed, 1336 insertions(+), 108 deletions(-) diff --git a/apps/decodex/src/orchestrator/daemon.rs b/apps/decodex/src/orchestrator/daemon.rs index 10059692..ad3747dd 100644 --- a/apps/decodex/src/orchestrator/daemon.rs +++ b/apps/decodex/src/orchestrator/daemon.rs @@ -202,6 +202,14 @@ where if !warnings.is_empty() { hydrate_history_lanes_from_local_ledger(project, state_store, &mut snapshot)?; } + if warnings.contains(&TRACKER_RATE_LIMIT_WARNING) { + let review_state_inspector = GhPullRequestReviewStateInspector { + github_token_env_var: Some(project.github().token_env_var().to_owned()), + }; + + snapshot.post_review_lanes = + build_degraded_post_review_lane_statuses(project, state_store, &review_state_inspector)?; + } for warning in warnings { add_operator_snapshot_warning(&mut snapshot, warning); diff --git a/apps/decodex/src/orchestrator/entrypoints.rs b/apps/decodex/src/orchestrator/entrypoints.rs index bd1a4774..f0679f59 100644 --- a/apps/decodex/src/orchestrator/entrypoints.rs +++ b/apps/decodex/src/orchestrator/entrypoints.rs @@ -1,3 +1,5 @@ +use state::{ConnectorBackoff, ConnectorBackoffInput}; + 
use crate::runtime; struct ControlPlaneProjectTick { @@ -5,27 +7,34 @@ struct ControlPlaneProjectTick { project_status: Option, } +struct ConnectorBackoffStatusParts<'a> { + project_id: &'a str, + connector: &'a str, + sync_phase: &'a str, + quota_class: &'a str, + reset_unix_epoch: i64, + reset_source: &'a str, + warning: &'a str, +} + impl TrackerConnectorBackoff { fn to_operator_status( &self, project_id: &str, now_unix_epoch: i64, ) -> OperatorConnectorBackoffStatus { - OperatorConnectorBackoffStatus { - project_id: project_id.to_owned(), - connector: String::from("linear"), - sync_phase: self.sync_phase.to_owned(), - quota_class: String::from("linear_graphql_api"), - reset_at: format_optional_unix_timestamp(Some(self.reset_unix_epoch)) - .unwrap_or_else(|| self.reset_unix_epoch.to_string()), - reset_unix_epoch: self.reset_unix_epoch, - reset_source: self.reset_source.to_owned(), - retry_after_seconds: self.reset_unix_epoch.saturating_sub(now_unix_epoch).max(0), - next_action: String::from( - "Wait for the reset window; keep monitoring local running lanes.", - ), - warning: String::from(TRACKER_RATE_LIMIT_WARNING), - } + operator_connector_backoff_status( + ConnectorBackoffStatusParts { + project_id, + connector: "linear", + sync_phase: self.sync_phase, + quota_class: "linear_graphql_api", + reset_unix_epoch: self.reset_unix_epoch, + reset_source: self.reset_source, + warning: TRACKER_RATE_LIMIT_WARNING, + }, + now_unix_epoch, + ) } } @@ -45,6 +54,17 @@ pub(crate) fn run_once(request: RunOnceRequest<'_>) -> Result<()> { runtime::register_project_config(&state_store, &config_path, true)?; + let config = ServiceConfig::from_path(&config_path)?; + let workflow = load_configured_cycle_workflow(&config, request.preferred_workflow_snapshot)?; + + if let Some(status) = + active_stored_tracker_backoff_status(&state_store, config.service_id())? 
+ { + print!("{}", render_tracker_backoff_cli_message("run", &status)); + + return Ok(()); + } + let preferred_run_identity = match (request.preferred_run_id, request.preferred_attempt_number) { (Some(run_id), Some(attempt_number)) => @@ -61,18 +81,37 @@ pub(crate) fn run_once(request: RunOnceRequest<'_>) -> Result<()> { eyre::bail!("queue explanation does not accept a preferred issue."); } - let config = ServiceConfig::from_path(&config_path)?; - let workflow = load_configured_cycle_workflow(&config, request.preferred_workflow_snapshot)?; let tracker = LinearClient::new(config.tracker().resolve_api_key()?)?; - let queued_candidates = - build_queued_candidate_statuses(&tracker, &config, &workflow, &state_store)?; + let queued_candidates = match build_queued_candidate_statuses( + &tracker, + &config, + &workflow, + &state_store, + ) { + Ok(queued_candidates) => queued_candidates, + Err(error) => { + let Some(backoff) = + tracker_rate_limit_backoff(&error, Instant::now(), "queue_explain") + else { + return Err(error); + }; + let status = backoff + .to_operator_status(config.service_id(), OffsetDateTime::now_utc().unix_timestamp()); + + persist_tracker_backoff_state(&state_store, config.service_id(), &backoff); + + print!("{}", render_tracker_backoff_cli_message("run", &status)); + + return Ok(()); + }, + }; print!("{}", render_queue_explain(&config, &queued_candidates)); return Ok(()); } - if let Some(summary) = run_configured_cycle(RunCycleRequest { + let run_summary = match run_configured_cycle(RunCycleRequest { config_path: &config_path, state_store: &state_store, dry_run: request.dry_run, @@ -87,14 +126,33 @@ pub(crate) fn run_once(request: RunOnceRequest<'_>) -> Result<()> { preferred_run_identity, preferred_retry_budget_base: request.preferred_retry_budget_base, preferred_workflow_snapshot: request.preferred_workflow_snapshot, - })? 
{ + }) { + Ok(summary) => summary, + Err(error) => { + let Some(backoff) = tracker_rate_limit_backoff(&error, Instant::now(), "run_cycle") + else { + return Err(error); + }; + let status = backoff + .to_operator_status(config.service_id(), OffsetDateTime::now_utc().unix_timestamp()); + + persist_tracker_backoff_state(&state_store, config.service_id(), &backoff); + + print!("{}", render_tracker_backoff_cli_message("run", &status)); + + return Ok(()); + }, + }; + + if let Some(summary) = run_summary { + clear_tracker_backoff_state_best_effort(&state_store, config.service_id()); + println!("{}", format_run_once_summary(&summary, request.dry_run)); return Ok(()); } - let config = ServiceConfig::from_path(&config_path)?; - let workflow = load_configured_cycle_workflow(&config, request.preferred_workflow_snapshot)?; + clear_tracker_backoff_state_best_effort(&state_store, config.service_id()); println!("{}", format_no_eligible_issue_message(&config, &workflow)); @@ -218,10 +276,21 @@ pub(crate) fn print_status( }; let config = ServiceConfig::from_path(&config_path)?; let workflow = WorkflowDocument::from_path(config.workflow_path())?; - let tracker = LinearClient::new(config.tracker().resolve_api_key()?)?; runtime::register_project_config(&state_store, &config_path, true)?; + if let Some(status) = + active_stored_tracker_backoff_status(&state_store, config.service_id())? 
+ { + let snapshot = + build_operator_status_snapshot_for_tracker_backoff(&config, &state_store, limit, &status)?; + + print_operator_status_snapshot(&snapshot, json)?; + + return Ok(()); + } + + let tracker = LinearClient::new(config.tracker().resolve_api_key()?)?; let recovered_state = recover_runtime_state_from_tracker_and_worktrees( &tracker, &config, @@ -234,6 +303,28 @@ pub(crate) fn print_status( Ok(recovered_state) => hydrate_status_snapshot_state(&config, &state_store, recovered_state)?, Err(error) => { + if let Some(backoff) = + tracker_rate_limit_backoff(&error, Instant::now(), "runtime_recovery") + { + let status = backoff.to_operator_status( + config.service_id(), + OffsetDateTime::now_utc().unix_timestamp(), + ); + + persist_tracker_backoff_state(&state_store, config.service_id(), &backoff); + + let snapshot = build_operator_status_snapshot_for_tracker_backoff( + &config, + &state_store, + limit, + &status, + )?; + + print_operator_status_snapshot(&snapshot, json)?; + + return Ok(()); + } + let warning = runtime_recovery_warning("runtime_recovery_unavailable", &error); tracing::warn!( @@ -245,8 +336,30 @@ pub(crate) fn print_status( }, } - let mut snapshot = - build_live_operator_status_snapshot(&tracker, &config, &workflow, &state_store, limit)?; + let mut snapshot = match build_live_operator_status_snapshot( + &tracker, + &config, + &workflow, + &state_store, + limit, + ) { + Ok(snapshot) => snapshot, + Err(error) => { + let Some(backoff) = + tracker_rate_limit_backoff(&error, Instant::now(), "operator_status_refresh") + else { + return Err(error); + }; + let status = backoff.to_operator_status( + config.service_id(), + OffsetDateTime::now_utc().unix_timestamp(), + ); + + persist_tracker_backoff_state(&state_store, config.service_id(), &backoff); + + build_operator_status_snapshot_for_tracker_backoff(&config, &state_store, limit, &status)? 
+ }, + }; for warning in snapshot_warnings { add_operator_snapshot_warning(&mut snapshot, &warning); @@ -254,12 +367,16 @@ pub(crate) fn print_status( refresh_operator_project_summary(&mut snapshot); - if json { - println!("{}", serde_json::to_string_pretty(&snapshot)?); - } else { - print!("{}", render_operator_status(&snapshot)); + if !snapshot + .connector_backoffs + .iter() + .any(|backoff| backoff.connector == "linear") + { + clear_tracker_backoff_state_best_effort(&state_store, config.service_id()); } + print_operator_status_snapshot(&snapshot, json)?; + Ok(()) } @@ -354,6 +471,174 @@ pub(crate) fn print_private_evidence(request: EvidenceRequest<'_>) -> Result<()> Ok(()) } +fn print_operator_status_snapshot( + snapshot: &OperatorStatusSnapshot, + json: bool, +) -> Result<()> { + if json { + println!("{}", serde_json::to_string_pretty(snapshot)?); + } else { + print!("{}", render_operator_status(snapshot)); + } + + Ok(()) +} + +fn operator_connector_backoff_status( + parts: ConnectorBackoffStatusParts<'_>, + now_unix_epoch: i64, +) -> OperatorConnectorBackoffStatus { + OperatorConnectorBackoffStatus { + project_id: parts.project_id.to_owned(), + connector: parts.connector.to_owned(), + sync_phase: parts.sync_phase.to_owned(), + quota_class: parts.quota_class.to_owned(), + reset_at: format_optional_unix_timestamp(Some(parts.reset_unix_epoch)) + .unwrap_or_else(|| parts.reset_unix_epoch.to_string()), + reset_unix_epoch: parts.reset_unix_epoch, + reset_source: parts.reset_source.to_owned(), + retry_after_seconds: parts.reset_unix_epoch.saturating_sub(now_unix_epoch).max(0), + next_action: String::from( + "Wait for the reset window; keep monitoring local running lanes.", + ), + warning: parts.warning.to_owned(), + } +} + +fn connector_backoff_record_to_operator_status( + backoff: &ConnectorBackoff, + now_unix_epoch: i64, +) -> OperatorConnectorBackoffStatus { + operator_connector_backoff_status( + ConnectorBackoffStatusParts { + project_id: backoff.project_id(), + 
connector: backoff.connector(), + sync_phase: backoff.sync_phase(), + quota_class: backoff.quota_class(), + reset_unix_epoch: backoff.reset_unix_epoch(), + reset_source: backoff.reset_source(), + warning: backoff.warning(), + }, + now_unix_epoch, + ) +} + +fn active_stored_tracker_backoff_status( + state_store: &StateStore, + project_id: &str, +) -> Result> { + let Some(backoff) = state_store.connector_backoff(project_id, "linear")? else { + return Ok(None); + }; + let now_unix_epoch = OffsetDateTime::now_utc().unix_timestamp(); + + if backoff.reset_unix_epoch() <= now_unix_epoch { + state_store.clear_connector_backoff(project_id, "linear")?; + + return Ok(None); + } + + Ok(Some(connector_backoff_record_to_operator_status(&backoff, now_unix_epoch))) +} + +fn active_stored_tracker_backoff_status_best_effort( + state_store: &StateStore, + project_id: &str, +) -> Option { + match active_stored_tracker_backoff_status(state_store, project_id) { + Ok(status) => status, + Err(error) => { + let _ = error; + + tracing::warn!( + project_id = project_id, + "Failed to read persisted tracker backoff; sensitive runtime details were withheld." + ); + + None + }, + } +} + +fn persist_tracker_backoff_state( + state_store: &StateStore, + project_id: &str, + backoff: &TrackerConnectorBackoff, +) { + if let Err(error) = state_store.upsert_connector_backoff(ConnectorBackoffInput { + project_id, + connector: "linear", + sync_phase: backoff.sync_phase, + quota_class: "linear_graphql_api", + reset_unix_epoch: backoff.reset_unix_epoch, + reset_source: backoff.reset_source, + warning: TRACKER_RATE_LIMIT_WARNING, + }) { + let _ = error; + + tracing::warn!( + project_id = project_id, + "Failed to persist tracker backoff; sensitive runtime details were withheld." 
+ ); + } +} + +fn clear_tracker_backoff_state_best_effort(state_store: &StateStore, project_id: &str) { + if let Err(error) = state_store.clear_connector_backoff(project_id, "linear") { + let _ = error; + + tracing::warn!( + project_id = project_id, + "Failed to clear persisted tracker backoff; sensitive runtime details were withheld." + ); + } +} + +fn render_tracker_backoff_cli_message( + command: &str, + status: &OperatorConnectorBackoffStatus, +) -> String { + format!( + "Linear connector is in backoff for project `{}`; `{}` skipped tracker reads for `{}` until {} (retry_after_seconds={}).\n", + status.project_id, + command, + status.sync_phase, + status.reset_at, + status.retry_after_seconds + ) +} + +fn build_operator_status_snapshot_for_tracker_backoff( + project: &ServiceConfig, + state_store: &StateStore, + limit: usize, + status: &OperatorConnectorBackoffStatus, +) -> Result { + let review_state_inspector = GhPullRequestReviewStateInspector { + github_token_env_var: Some(project.github().token_env_var().to_owned()), + }; + let mut snapshot = build_operator_status_snapshot_with_account_mode( + project, + state_store, + limit, + AccountActivityMode::Snapshot, + )?; + + hydrate_history_lanes_from_local_ledger(project, state_store, &mut snapshot)?; + + snapshot.post_review_lanes = + build_degraded_post_review_lane_statuses(project, state_store, &review_state_inspector)?; + + add_operator_snapshot_warning(&mut snapshot, TRACKER_RATE_LIMIT_WARNING); + + snapshot.connector_backoffs.push(status.clone()); + + add_operator_snapshot_warning(&mut snapshot, "external_observer_status_skipped"); + refresh_operator_project_summary(&mut snapshot); + + Ok(snapshot) +} + fn publish_operator_snapshot( operator_state_endpoint: &OperatorStateEndpoint, snapshot: &OperatorStatusSnapshot, @@ -642,7 +927,29 @@ fn run_control_plane_project_tick( if tracker_backoff_active(runtime, Instant::now()) { snapshot_warnings.push(TRACKER_RATE_LIMIT_WARNING); - return 
control_plane_project_local_snapshot(project, state_store, runtime, snapshot_warnings); + let connector_backoffs = active_connector_backoff_statuses(project.service_id(), runtime); + + return control_plane_project_local_snapshot( + project, + state_store, + runtime, + snapshot_warnings, + &connector_backoffs, + ); + } + + if let Some(connector_backoff) = + active_stored_tracker_backoff_status_best_effort(state_store, project.service_id()) + { + snapshot_warnings.push(TRACKER_RATE_LIMIT_WARNING); + + return control_plane_project_local_snapshot( + project, + state_store, + runtime, + snapshot_warnings, + slice::from_ref(&connector_backoff), + ); } match load_daemon_tick_context(project.config_path(), &mut runtime.workflow_cache) { @@ -678,17 +985,21 @@ fn tracker_backoff_active(runtime: &mut ProjectDaemonRuntime, now: Instant) -> b fn remember_tracker_backoff( runtime: &mut ProjectDaemonRuntime, + state_store: &StateStore, + project_id: &str, error: &Report, now: Instant, sync_phase: &'static str, -) -> bool { - let Some(backoff) = tracker_rate_limit_backoff(error, now, sync_phase) else { - return false; - }; +) -> Option { + let backoff = tracker_rate_limit_backoff(error, now, sync_phase)?; + let status = + backoff.to_operator_status(project_id, OffsetDateTime::now_utc().unix_timestamp()); + + persist_tracker_backoff_state(state_store, project_id, &backoff); runtime.tracker_backoff = Some(backoff); - true + Some(status) } fn tracker_rate_limit_backoff( @@ -745,6 +1056,7 @@ fn control_plane_project_local_snapshot( state_store: &StateStore, runtime: &mut ProjectDaemonRuntime, snapshot_warnings: &mut Vec<&'static str>, + connector_backoffs: &[OperatorConnectorBackoffStatus], ) -> ControlPlaneProjectTick { match load_daemon_tick_context(project.config_path(), &mut runtime.workflow_cache) { Ok(context) => match build_operator_state_snapshot_for_publish( @@ -754,7 +1066,7 @@ fn control_plane_project_local_snapshot( state_store, DEFAULT_OPERATOR_DASHBOARD_RUN_LIMIT, 
snapshot_warnings, - &active_connector_backoff_statuses(project.service_id(), runtime), + connector_backoffs, ) { Ok(snapshot) => { write_agent_evidence_best_effort(&snapshot, AgentEvidenceSource::ServeTick); @@ -820,7 +1132,14 @@ fn control_plane_project_snapshot( &mut runtime.retry_queue, context, ) { - if remember_tracker_backoff(runtime, &error, Instant::now(), "control_plane_tick") { + if let Some(connector_backoff) = remember_tracker_backoff( + runtime, + state_store, + project.service_id(), + &error, + Instant::now(), + "control_plane_tick", + ) { snapshot_warnings.push(TRACKER_RATE_LIMIT_WARNING); return control_plane_project_local_snapshot( @@ -828,6 +1147,7 @@ fn control_plane_project_snapshot( state_store, runtime, snapshot_warnings, + slice::from_ref(&connector_backoff), ); } @@ -851,7 +1171,11 @@ fn control_plane_project_snapshot( &[], ) { Ok(snapshot) => { - runtime.tracker_backoff = None; + if !operator_snapshot_has_linear_backoff(&snapshot) { + runtime.tracker_backoff = None; + + clear_tracker_backoff_state_best_effort(state_store, project.service_id()); + } write_agent_evidence_best_effort(&snapshot, AgentEvidenceSource::ServeTick); @@ -865,7 +1189,14 @@ fn control_plane_project_snapshot( } }, Err(error) => { - if remember_tracker_backoff(runtime, &error, Instant::now(), "operator_snapshot_refresh") { + if let Some(connector_backoff) = remember_tracker_backoff( + runtime, + state_store, + project.service_id(), + &error, + Instant::now(), + "operator_snapshot_refresh", + ) { snapshot_warnings.push(TRACKER_RATE_LIMIT_WARNING); return control_plane_project_local_snapshot( @@ -873,6 +1204,7 @@ fn control_plane_project_snapshot( state_store, runtime, snapshot_warnings, + slice::from_ref(&connector_backoff), ); } @@ -893,6 +1225,10 @@ fn control_plane_project_snapshot( } } +fn operator_snapshot_has_linear_backoff(snapshot: &OperatorStatusSnapshot) -> bool { + snapshot.connector_backoffs.iter().any(|backoff| backoff.connector == "linear") +} + fn 
complete_project_status( project: &ProjectRegistration, mut status: OperatorProjectStatus, diff --git a/apps/decodex/src/orchestrator/status.rs b/apps/decodex/src/orchestrator/status.rs index debef46a..aca17ebe 100644 --- a/apps/decodex/src/orchestrator/status.rs +++ b/apps/decodex/src/orchestrator/status.rs @@ -19,6 +19,18 @@ enum ExternalReviewRequestCiGate { ManualAttention(&'static str), } +#[derive(Clone, Copy)] +enum AccountActivityMode { + Probe, + Snapshot, +} + +enum TrackerObserverOutcome { + Ok, + Unavailable, + RateLimited(TrackerConnectorBackoff), +} + struct PostReviewOrchestrationStatus { phase: ReviewOrchestrationPhase, request_acknowledged: bool, @@ -96,6 +108,15 @@ struct OperatorRunProtocolSummary { event_count: i64, } +struct LiveOperatorStatusObserverContext<'a, T> { + tracker: &'a T, + project: &'a ServiceConfig, + workflow: &'a WorkflowDocument, + state_store: &'a StateStore, + review_state_inspector: &'a GhPullRequestReviewStateInspector, + hydrate_history_ledger: bool, +} + struct PostReviewLaneBuildContext<'a, I> { project: &'a ServiceConfig, workflow: &'a WorkflowDocument, @@ -123,12 +144,6 @@ struct WorktreeOwnership { reason: String, } -#[derive(Clone, Copy)] -enum AccountActivityMode { - Probe, - Snapshot, -} - pub(crate) fn ensure_project_has_no_merged_worktree_cleanup_debt( project: &ServiceConfig, ) -> crate::prelude::Result<()> { @@ -312,57 +327,237 @@ where )?; hydrate_history_lanes_from_local_ledger(project, state_store, &mut snapshot)?; - - if hydrate_operator_run_rows_from_tracker( - tracker, - project, + hydrate_live_operator_external_observers( + LiveOperatorStatusObserverContext { + tracker, + project, + workflow, + state_store, + review_state_inspector: &review_state_inspector, + hydrate_history_ledger, + }, &mut snapshot, - ) { - add_operator_snapshot_warning(&mut snapshot, "run_issue_metadata_unavailable"); + )?; + refresh_worktree_ownership( + &mut snapshot, + 
Some(workflow.frontmatter().tracker().resolved_completed_state()), + ); + refresh_operator_project_summary(&mut snapshot); + + Ok(snapshot) +} + +fn hydrate_live_operator_external_observers( + context: LiveOperatorStatusObserverContext<'_, T>, + snapshot: &mut OperatorStatusSnapshot, +) -> crate::prelude::Result<()> +where + T: IssueTracker, +{ + let mut paused = + pause_operator_snapshot_for_stored_tracker_backoff(&context, snapshot)?; + + if !paused { + paused = apply_tracker_observer_outcome( + hydrate_operator_run_rows_from_tracker(context.tracker, context.project, snapshot), + snapshot, + context.state_store, + context.project, + "run_issue_metadata_unavailable", + ); + } + if !paused && context.hydrate_history_ledger { + paused = apply_tracker_observer_outcome( + hydrate_history_lanes_from_linear_ledger(context.tracker, context.project, snapshot), + snapshot, + context.state_store, + context.project, + "execution_ledger_status_unavailable", + ); + } + if !paused { + paused = hydrate_queued_candidate_status_observer(&context, snapshot); } - if hydrate_history_ledger && hydrate_history_lanes_from_linear_ledger(tracker, project, &mut snapshot) { - add_operator_snapshot_warning(&mut snapshot, "execution_ledger_status_unavailable"); + if !paused { + paused = hydrate_post_review_lane_status_observer(&context, snapshot)?; } + if paused { + if snapshot.post_review_lanes.is_empty() { + snapshot.post_review_lanes = build_degraded_post_review_lane_statuses( + context.project, + context.state_store, + context.review_state_inspector, + )?; + } + + add_operator_snapshot_warning(snapshot, "external_observer_status_skipped"); + } + + Ok(()) +} + +fn pause_operator_snapshot_for_stored_tracker_backoff( + context: &LiveOperatorStatusObserverContext<'_, T>, + snapshot: &mut OperatorStatusSnapshot, +) -> crate::prelude::Result { + let Some(backoff) = + active_stored_tracker_backoff_status(context.state_store, context.project.service_id())? 
+ else { + return Ok(false); + }; + + add_tracker_backoff_to_operator_snapshot(snapshot, &backoff); + + Ok(true) +} + +fn apply_tracker_observer_outcome( + outcome: TrackerObserverOutcome, + snapshot: &mut OperatorStatusSnapshot, + state_store: &StateStore, + project: &ServiceConfig, + unavailable_warning: &'static str, +) -> bool { + match outcome { + TrackerObserverOutcome::Ok => false, + TrackerObserverOutcome::Unavailable => { + add_operator_snapshot_warning(snapshot, unavailable_warning); + + false + }, + TrackerObserverOutcome::RateLimited(backoff) => { + pause_operator_snapshot_for_rate_limit(snapshot, state_store, project, &backoff); + + true + }, + } +} + +fn pause_operator_snapshot_for_rate_limit( + snapshot: &mut OperatorStatusSnapshot, + state_store: &StateStore, + project: &ServiceConfig, + backoff: &TrackerConnectorBackoff, +) { + persist_tracker_backoff_state(state_store, project.service_id(), backoff); - match build_queued_candidate_statuses(tracker, project, workflow, state_store) { - Ok(queued_candidates) => snapshot.queued_candidates = queued_candidates, + let backoff = backoff.to_operator_status( + project.service_id(), + OffsetDateTime::now_utc().unix_timestamp(), + ); + + add_tracker_backoff_to_operator_snapshot(snapshot, &backoff); +} + +fn hydrate_queued_candidate_status_observer( + context: &LiveOperatorStatusObserverContext<'_, T>, + snapshot: &mut OperatorStatusSnapshot, +) -> bool +where + T: IssueTracker, +{ + match build_queued_candidate_statuses( + context.tracker, + context.project, + context.workflow, + context.state_store, + ) { + Ok(queued_candidates) => { + snapshot.queued_candidates = queued_candidates; + + false + }, Err(error) => { - let _ = error; + let Some(backoff) = + tracker_rate_limit_backoff(&error, Instant::now(), "queued_candidate_status") + else { + let _ = error; - tracing::warn!( - "Skipped queued candidate status while publishing an operator snapshot; sensitive runtime details were withheld." 
+ tracing::warn!( + "Skipped queued candidate status while publishing an operator snapshot; sensitive runtime details were withheld." + ); + + add_operator_snapshot_warning(snapshot, "queued_candidate_status_unavailable"); + + return false; + }; + + pause_operator_snapshot_for_rate_limit( + snapshot, + context.state_store, + context.project, + &backoff, ); - add_operator_snapshot_warning(&mut snapshot, "queued_candidate_status_unavailable"); + true }, } +} + +fn hydrate_post_review_lane_status_observer( + context: &LiveOperatorStatusObserverContext<'_, T>, + snapshot: &mut OperatorStatusSnapshot, +) -> crate::prelude::Result +where + T: IssueTracker, +{ match build_post_review_lane_statuses_and_hydrate_worktrees( - tracker, - project, - workflow, - state_store, - &review_state_inspector, - &mut snapshot, + context.tracker, + context.project, + context.workflow, + context.state_store, + context.review_state_inspector, + snapshot, ) { - Ok(post_review_lanes) => snapshot.post_review_lanes = post_review_lanes, + Ok(post_review_lanes) => { + snapshot.post_review_lanes = post_review_lanes; + + Ok(false) + }, Err(error) => { - let _ = error; + let Some(backoff) = + tracker_rate_limit_backoff(&error, Instant::now(), "post_review_lane_status") + else { + let _ = error; - tracing::warn!( - "Skipped post-review lane status while publishing an operator snapshot; sensitive runtime details were withheld." + tracing::warn!( + "Skipped post-review lane status while publishing an operator snapshot; sensitive runtime details were withheld." 
+ ); + + add_operator_snapshot_warning(snapshot, "post_review_lane_status_unavailable"); + + return Ok(false); + }; + + pause_operator_snapshot_for_rate_limit( + snapshot, + context.state_store, + context.project, + &backoff, ); - add_operator_snapshot_warning(&mut snapshot, "post_review_lane_status_unavailable"); + snapshot.post_review_lanes = build_degraded_post_review_lane_statuses( + context.project, + context.state_store, + context.review_state_inspector, + )?; + + Ok(true) }, } +} - refresh_worktree_ownership( - &mut snapshot, - Some(workflow.frontmatter().tracker().resolved_completed_state()), - ); - refresh_operator_project_summary(&mut snapshot); +fn add_tracker_backoff_to_operator_snapshot( + snapshot: &mut OperatorStatusSnapshot, + backoff: &OperatorConnectorBackoffStatus, +) { + add_operator_snapshot_warning(snapshot, TRACKER_RATE_LIMIT_WARNING); - Ok(snapshot) + if !snapshot.connector_backoffs.iter().any(|existing| { + existing.project_id == backoff.project_id && existing.connector == backoff.connector + }) { + snapshot.connector_backoffs.push(backoff.clone()); + } } fn hydrate_history_lanes_from_local_ledger( @@ -956,14 +1151,14 @@ fn hydrate_operator_run_rows_from_tracker( tracker: &T, project: &ServiceConfig, snapshot: &mut OperatorStatusSnapshot, -) -> bool +) -> TrackerObserverOutcome where T: IssueTracker, { let issue_ids = operator_snapshot_run_issue_ids(snapshot); if issue_ids.is_empty() { - return false; + return TrackerObserverOutcome::Ok; } match tracker.refresh_issues(&issue_ids) { @@ -984,9 +1179,15 @@ where hydrate_operator_snapshot_run_rows(snapshot, &metadata_by_issue_id); - false + TrackerObserverOutcome::Ok }, Err(error) => { + if let Some(backoff) = + tracker_rate_limit_backoff(&error, Instant::now(), "run_issue_metadata") + { + return TrackerObserverOutcome::RateLimited(backoff); + } + let _ = error; tracing::warn!( @@ -994,7 +1195,7 @@ where "Skipped tracker issue metadata hydration for operator run rows; sensitive tracker details 
were withheld." ); - true + TrackerObserverOutcome::Unavailable }, } } @@ -1146,7 +1347,7 @@ fn hydrate_history_lanes_from_linear_ledger( tracker: &T, project: &ServiceConfig, snapshot: &mut OperatorStatusSnapshot, -) -> bool +) -> TrackerObserverOutcome where T: IssueTracker, { @@ -1163,6 +1364,14 @@ where lane.ledger_outcome = operator_history_ledger_outcome(&records); }, Err(error) => { + if let Some(backoff) = + tracker_rate_limit_backoff(&error, Instant::now(), "execution_ledger_status") + { + lane.ledger_outcome = unavailable_history_ledger_outcome(); + + return TrackerObserverOutcome::RateLimited(backoff); + } + let _ = error; tracing::warn!( @@ -1176,7 +1385,11 @@ where } } - unavailable + if unavailable { + TrackerObserverOutcome::Unavailable + } else { + TrackerObserverOutcome::Ok + } } fn hydrate_history_lane_from_ledger_records( @@ -1972,6 +2185,89 @@ where .collect()) } +fn build_degraded_post_review_lane_statuses( + project: &ServiceConfig, + state_store: &StateStore, + review_state_inspector: &I, +) -> crate::prelude::Result> +where + I: PullRequestReviewStateInspector, +{ + let mut lanes = Vec::new(); + + for worktree in state_store.list_worktrees(project.service_id())? { + let Some(review_handoff) = state_store.review_handoff_marker( + project.service_id(), + worktree.issue_id(), + worktree.branch_name(), + )? 
else { + continue; + }; + let issue_identifier = retained_issue_identifier_from_worktree(&worktree); + let review_state = review_state_inspector + .inspect_review_state(worktree.worktree_path(), review_handoff.pr_url()) + .ok(); + let ( + pr_head_sha, + pr_state, + review_decision, + mergeable, + check_state, + unresolved_review_threads, + ) = match review_state { + Some(review_state) => ( + Some(review_state.head_ref_oid), + Some(review_state.state), + review_state.review_decision, + Some(review_state.mergeable), + review_state.status_check_rollup_state, + Some(review_state.unresolved_review_threads), + ), + None => ( + Some(review_handoff.pr_head_oid().to_owned()), + None, + None, + None, + None, + None, + ), + }; + + lanes.push(OperatorPostReviewLaneStatus { + issue_id: worktree.issue_id().to_owned(), + issue_identifier, + issue_state: String::from("tracker_readback_degraded"), + branch_name: worktree.branch_name().to_owned(), + worktree_path: relative_worktree_path_for_path(project, worktree.worktree_path()), + classification: String::from("wait_for_review"), + reason: String::from("tracker_issue_readback_degraded"), + pr_url: Some(review_handoff.pr_url().to_owned()), + pr_head_sha, + pr_state, + review_decision, + mergeable, + check_state, + unresolved_review_threads, + readback_warning: Some(String::from("tracker_issue_readback_degraded")), + }); + } + + lanes.sort_by(|left, right| left.issue_identifier.cmp(&right.issue_identifier)); + + Ok(lanes) +} + +fn retained_issue_identifier_from_worktree(worktree: &WorktreeMapping) -> String { + worktree + .worktree_path() + .file_name() + .and_then(|name| name.to_str()) + .map(str::trim) + .filter(|name| !name.is_empty()) + .unwrap_or_else(|| worktree.issue_id()) + .to_ascii_uppercase() +} + fn build_post_review_lane_statuses_from_worktree_issues( project: &ServiceConfig, workflow: &WorkflowDocument, @@ -2107,6 +2403,13 @@ where ); } + apply_active_ownership_warning_to_post_review_lane( + context.project, + 
context.success_state, + &snapshot, + &mut classification, + ); + Ok(Some(post_review_lane_status_from_classification( context.project, &snapshot, @@ -2114,6 +2417,24 @@ where ))) } +fn apply_active_ownership_warning_to_post_review_lane( + project: &ServiceConfig, + success_state: &str, + snapshot: &PostReviewLaneSnapshot, + classification: &mut PostReviewLaneClassification, +) { + if snapshot.review_handoff.is_none() + || snapshot.issue.state.name != success_state + || !snapshot.issue.labels_complete + || snapshot.issue.has_label(&tracker::automation_active_label(project.service_id())) + { + return; + } + if classification.readback_warning.is_none() { + classification.readback_warning = Some(String::from("active_ownership_label_missing")); + } +} + fn post_review_lane_status_from_classification( project: &ServiceConfig, snapshot: &PostReviewLaneSnapshot, diff --git a/apps/decodex/src/orchestrator/tests/operator/status/publishing.rs b/apps/decodex/src/orchestrator/tests/operator/status/publishing.rs index d876a79f..6aea5475 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/publishing.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/publishing.rs @@ -1,3 +1,5 @@ +use crate::state::ConnectorBackoffInput; + #[test] fn live_operator_status_snapshot_degrades_when_post_review_status_refresh_fails() { let (_temp_dir, config, workflow) = temp_project_layout(); @@ -29,6 +31,72 @@ fn live_operator_status_snapshot_degrades_when_post_review_status_refresh_fails( assert!(snapshot.post_review_lanes.is_empty()); } +#[test] +fn live_operator_status_snapshot_preserves_retained_handoff_during_linear_backoff() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue("In Review", &[]); + let tracker = FakeTracker::with_refresh_error( + vec![issue.clone()], + "Linear connector is rate limited: Rate limit exceeded. 
Only 2500 requests are allowed per 1 hour.", + ); + let branch_name = "x/pubfi-pub-101"; + let pr_url = "https://github.com/hack-ink/pubfi-mono-v2/pull/101"; + let head_sha = "1111111111111111111111111111111111111111"; + let worktree_path = config.worktree_root().join(&issue.identifier); + let handoff = ReviewHandoffMarker::new( + "pub-101-attempt-1", + 1, + branch_name, + pr_url, + "main", + branch_name, + head_sha, + ); + + state_store + .upsert_worktree( + TEST_SERVICE_ID, + &issue.id, + branch_name, + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + state_store + .upsert_review_handoff_marker(TEST_SERVICE_ID, &issue.id, &handoff) + .expect("review handoff should record"); + + let snapshot = orchestrator::build_live_operator_status_snapshot( + &tracker, + &config, + &workflow, + &state_store, + 10, + ) + .expect("snapshot should degrade instead of failing"); + + assert!(snapshot.warnings.contains(&String::from(orchestrator::TRACKER_RATE_LIMIT_WARNING))); + assert!(snapshot.warnings.contains(&String::from("external_observer_status_skipped"))); + assert_eq!(snapshot.connector_backoffs.len(), 1); + assert_eq!(snapshot.connector_backoffs[0].sync_phase, "post_review_lane_status"); + assert_eq!(snapshot.post_review_lanes.len(), 1); + assert_eq!(snapshot.post_review_lanes[0].issue_identifier, "PUB-101"); + assert_eq!(snapshot.post_review_lanes[0].issue_state, "tracker_readback_degraded"); + assert_eq!(snapshot.post_review_lanes[0].reason, "tracker_issue_readback_degraded"); + assert_eq!(snapshot.post_review_lanes[0].pr_url.as_deref(), Some(pr_url)); + assert_eq!(snapshot.post_review_lanes[0].pr_head_sha.as_deref(), Some(head_sha)); + assert_eq!( + snapshot.post_review_lanes[0].readback_warning.as_deref(), + Some("tracker_issue_readback_degraded") + ); + assert!( + state_store + .connector_backoff(TEST_SERVICE_ID, "linear") + .expect("connector backoff should read") + .is_some() + ); +} + #[test] fn 
operator_state_snapshot_publish_skips_external_observers_after_tick_failure() { let (_temp_dir, config, workflow) = temp_project_layout(); @@ -64,7 +132,12 @@ fn operator_state_snapshot_publish_skips_external_observers_after_tick_failure() fn operator_state_snapshot_reports_tracker_rate_limit_as_backoff() { let (_temp_dir, config, workflow) = temp_project_layout(); let state_store = StateStore::open_in_memory().expect("state store should open"); - let tracker = FakeTracker::new(vec![sample_issue("Todo", &[])]); + let issue = sample_issue("In Review", &[]); + let tracker = FakeTracker::new(vec![issue.clone()]); + let branch_name = "x/pubfi-pub-101"; + let pr_url = "https://github.com/hack-ink/pubfi-mono-v2/pull/101"; + let head_sha = "1111111111111111111111111111111111111111"; + let worktree_path = config.worktree_root().join(&issue.identifier); let reset_unix_epoch = OffsetDateTime::now_utc().unix_timestamp() + 60; let error = eyre::eyre!( "Linear connector is rate limited until `{reset_unix_epoch}`: API rate limit exceeded" @@ -76,6 +149,31 @@ fn operator_state_snapshot_reports_tracker_rate_limit_as_backoff() { ) .expect("rate limit should create backoff") .to_operator_status(config.service_id(), reset_unix_epoch - 15); + + state_store + .upsert_worktree( + TEST_SERVICE_ID, + &issue.id, + branch_name, + &worktree_path.display().to_string(), + ) + .expect("worktree should record"); + state_store + .upsert_review_handoff_marker( + TEST_SERVICE_ID, + &issue.id, + &ReviewHandoffMarker::new( + "pub-101-attempt-1", + 1, + branch_name, + pr_url, + "main", + branch_name, + head_sha, + ), + ) + .expect("review handoff should record"); + let snapshot = orchestrator::build_operator_state_snapshot_for_publish( &tracker, &config, @@ -113,6 +211,11 @@ fn operator_state_snapshot_reports_tracker_rate_limit_as_backoff() { assert_eq!(snapshot_json["connector_backoffs"][0]["retry_after_seconds"], 15); assert_ne!(snapshot_json["connector_backoffs"][0]["reset_at"], Value::Null); 
assert_ne!(snapshot_json["connector_backoffs"][0]["next_action"], Value::Null); + assert_eq!(snapshot.post_review_lanes.len(), 1); + assert_eq!(snapshot.post_review_lanes[0].issue_identifier, "PUB-101"); + assert_eq!(snapshot.post_review_lanes[0].reason, "tracker_issue_readback_degraded"); + assert_eq!(snapshot.post_review_lanes[0].pr_url.as_deref(), Some(pr_url)); + assert_eq!(snapshot.post_review_lanes[0].pr_head_sha.as_deref(), Some(head_sha)); assert_eq!(snapshot.projects[0].connector_state, "backoff"); assert!( tracker.label_queries.borrow().is_empty(), @@ -120,6 +223,48 @@ fn operator_state_snapshot_reports_tracker_rate_limit_as_backoff() { ); } +#[test] +fn live_operator_status_snapshot_honors_persisted_tracker_backoff_without_linear_reads() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let tracker = FakeTracker::new(vec![sample_issue("Todo", &[])]); + let reset_unix_epoch = OffsetDateTime::now_utc().unix_timestamp() + 60; + + state_store + .upsert_connector_backoff(ConnectorBackoffInput { + project_id: config.service_id(), + connector: "linear", + sync_phase: "run_cycle", + quota_class: "linear_graphql_api", + reset_unix_epoch, + reset_source: "local_default", + warning: TRACKER_RATE_LIMIT_WARNING, + }) + .expect("connector backoff should persist"); + + let snapshot = orchestrator::build_live_operator_status_snapshot( + &tracker, + &config, + &workflow, + &state_store, + 10, + ) + .expect("snapshot should use local backoff state"); + + assert!(snapshot.warnings.contains(&String::from(orchestrator::TRACKER_RATE_LIMIT_WARNING))); + assert!(snapshot.warnings.contains(&String::from("external_observer_status_skipped"))); + assert_eq!(snapshot.connector_backoffs.len(), 1); + assert_eq!(snapshot.projects[0].connector_state, "backoff"); + assert!( + tracker.label_queries.borrow().is_empty(), + "persisted backoff should skip queued-label reads" + ); + assert!( + 
tracker.comment_queries.borrow().is_empty(), + "persisted backoff should skip execution-ledger reads" + ); +} + #[test] fn operator_state_snapshot_publish_does_not_derive_history_outcome_without_execution_ledger() { let (_temp_dir, config, workflow) = temp_project_layout(); diff --git a/apps/decodex/src/orchestrator/tests/review_landing/status_rows.rs b/apps/decodex/src/orchestrator/tests/review_landing/status_rows.rs index 5c3e1b2e..c5c73de6 100644 --- a/apps/decodex/src/orchestrator/tests/review_landing/status_rows.rs +++ b/apps/decodex/src/orchestrator/tests/review_landing/status_rows.rs @@ -64,6 +64,10 @@ fn build_post_review_lane_statuses_reports_ready_to_land() { assert_eq!(lanes[0].classification, "ready_to_land"); assert_eq!(lanes[0].reason, "external_review_passed_strict"); assert_eq!(lanes[0].pr_url.as_deref(), Some(pr_url)); + assert_eq!( + lanes[0].readback_warning.as_deref(), + Some("active_ownership_label_missing") + ); } #[test] diff --git a/apps/decodex/src/recovery.rs b/apps/decodex/src/recovery.rs index f6d33c02..06ddc7cb 100644 --- a/apps/decodex/src/recovery.rs +++ b/apps/decodex/src/recovery.rs @@ -7,6 +7,7 @@ use std::{ process::Command, }; +use color_eyre::Report; use serde::Serialize; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; @@ -17,8 +18,8 @@ use crate::{ pull_request::PullRequestLandingState, runtime, state::{ - RUN_ACTIVITY_MARKER_FILE, ReviewHandoffMarker, ReviewOrchestrationMarker, StateStore, - WorktreeMapping, + ConnectorBackoffInput, RUN_ACTIVITY_MARKER_FILE, ReviewHandoffMarker, + ReviewOrchestrationMarker, StateStore, WorktreeMapping, }, tracker::{ self, IssueTracker, TrackerIssue, @@ -32,11 +33,14 @@ use crate::{ const MISSING_HANDOFF_REASON: &str = "missing_review_handoff_record"; const ORPHANED_REVIEW_HANDOFF_CLASSIFICATION: &str = "orphaned_review_handoff"; const REVIEW_HANDOFF_BOUND_CLASSIFICATION: &str = "review_handoff_bound"; +const REVIEW_HANDOFF_OWNERSHIP_DRIFT_CLASSIFICATION: &str = 
"review_handoff_ownership_drift"; const REVIEW_HANDOFF_REBIND_REQUIRED_CLASSIFICATION: &str = "review_handoff_rebind_required"; const REVIEW_HANDOFF_UNVERIFIED_CLASSIFICATION: &str = "review_handoff_unverified"; const REVIEW_HANDOFF_MISMATCH_CLASSIFICATION: &str = "review_handoff_mismatch"; const REVIEW_HANDOFF_REBIND_EVENT: &str = "review_handoff_rebind"; const REBOUND_ORCHESTRATION_PHASE: &str = "request_pending"; +const LINEAR_CONNECTOR_BACKOFF_WARNING: &str = "tracker_rate_limited"; +const LINEAR_CONNECTOR_BACKOFF_SECS: i64 = 15 * 60; /// Read-only retained review handoff diagnostic request. #[derive(Debug)] @@ -171,9 +175,32 @@ pub(crate) fn run_review_handoff_diagnose( request: &ReviewHandoffDiagnoseRequest, ) -> Result<()> { let context = load_recovery_context(config_path)?; - let diagnostics = match request.issue.as_deref() { - Some(issue_identifier) => vec![diagnose_issue(&context, issue_identifier)?], - None => diagnose_all_retained_review_worktrees(&context)?, + + if let Some(message) = active_recovery_tracker_backoff_message(&context)? 
{ + println!("{message}"); + + return Ok(()); + } + + let diagnostics = match match request.issue.as_deref() { + Some(issue_identifier) => + diagnose_issue(&context, issue_identifier).map(|diagnostic| vec![diagnostic]), + None => diagnose_all_retained_review_worktrees(&context), + } { + Ok(diagnostics) => diagnostics, + Err(error) => { + if let Some(message) = remember_recovery_tracker_backoff_message( + &context, + &error, + "review_handoff_recovery", + ) { + println!("{message}"); + + return Ok(()); + } + + return Err(error); + }, }; let report = ReviewHandoffRecoveryReport { project_id: context.config.service_id().to_owned(), @@ -239,6 +266,88 @@ fn load_recovery_context(config_path: Option<&Path>) -> Result Ok(RecoveryContext { config, workflow, state_store, tracker }) } +fn active_recovery_tracker_backoff_message(context: &RecoveryContext) -> Result> { + let Some(backoff) = + context.state_store.connector_backoff(context.config.service_id(), "linear")? + else { + return Ok(None); + }; + let now_unix_epoch = OffsetDateTime::now_utc().unix_timestamp(); + + if backoff.reset_unix_epoch() <= now_unix_epoch { + context.state_store.clear_connector_backoff(context.config.service_id(), "linear")?; + + return Ok(None); + } + + Ok(Some(recovery_tracker_backoff_message( + context.config.service_id(), + backoff.sync_phase(), + backoff.reset_unix_epoch(), + backoff.reset_unix_epoch().saturating_sub(now_unix_epoch), + ))) +} + +fn remember_recovery_tracker_backoff_message( + context: &RecoveryContext, + error: &Report, + sync_phase: &str, +) -> Option { + let message = format!("{error:#}"); + + if !message.contains("Linear connector is rate limited") { + return None; + } + + let now_unix_epoch = OffsetDateTime::now_utc().unix_timestamp(); + let (reset_unix_epoch, reset_source) = + match parse_recovery_rate_limit_reset_unix_epoch(&message) { + Some(reset) if reset > now_unix_epoch => (reset, "linear"), + _ => (now_unix_epoch.saturating_add(LINEAR_CONNECTOR_BACKOFF_SECS), 
"local_default"), + }; + + if let Err(store_error) = context.state_store.upsert_connector_backoff(ConnectorBackoffInput { + project_id: context.config.service_id(), + connector: "linear", + sync_phase, + quota_class: "linear_graphql_api", + reset_unix_epoch, + reset_source, + warning: LINEAR_CONNECTOR_BACKOFF_WARNING, + }) { + let _ = store_error; + + tracing::warn!( + project_id = context.config.service_id(), + "Failed to persist recovery tracker backoff; sensitive runtime details were withheld." + ); + } + + Some(recovery_tracker_backoff_message( + context.config.service_id(), + sync_phase, + reset_unix_epoch, + reset_unix_epoch.saturating_sub(now_unix_epoch), + )) +} + +fn parse_recovery_rate_limit_reset_unix_epoch(message: &str) -> Option { + let reset = message.split("rate limited until `").nth(1)?.split('`').next()?; + + reset.parse().ok() +} + +fn recovery_tracker_backoff_message( + service_id: &str, + sync_phase: &str, + reset_unix_epoch: i64, + retry_after_seconds: i64, +) -> String { + format!( + "Linear connector is in backoff for project `{service_id}`; recovery skipped tracker reads for `{sync_phase}` until unix_epoch={reset_unix_epoch} (retry_after_seconds={retry_after_seconds})." 
+ ) +} + fn resolve_recovery_config_path( config_path: Option<&Path>, state_store: &StateStore, @@ -434,6 +543,20 @@ fn diagnostic_binding(request: HandoffDiagnosticRequest<'_>) -> HandoffBindingDi return diagnostic; } + if request.active_label_present == Some(false) { + return HandoffBindingDiagnostic { + classification: String::from(REVIEW_HANDOFF_OWNERSHIP_DRIFT_CLASSIFICATION), + reason: String::from("active_ownership_label_missing"), + pr_base_ref, + pr_head_oid, + mismatched_field: Some(String::from("issue.labels")), + next_action: bound_handoff_next_action( + request.service_id, + request.active_label_present, + ), + }; + } + HandoffBindingDiagnostic { classification: String::from(REVIEW_HANDOFF_BOUND_CLASSIFICATION), reason: String::from("review_handoff_record_present"), @@ -618,7 +741,7 @@ fn missing_handoff_next_action(service_id: &str, issue_identifier: &str) -> Stri fn bound_handoff_next_action(service_id: &str, active_label_present: Option) -> String { if active_label_present == Some(false) { return format!( - "Restore explicit lane ownership with label `{}`, then continue the existing post-review lifecycle.", + "Restore explicit lane ownership with label `{}`, then rerun `decodex recover review-handoff diagnose ` and continue the existing post-review lifecycle.", tracker::automation_active_label(service_id) ); } @@ -1218,8 +1341,8 @@ mod tests { use crate::{ pull_request::PullRequestLandingState, recovery::{ - REVIEW_HANDOFF_BOUND_CLASSIFICATION, REVIEW_HANDOFF_REBIND_EVENT, - REVIEW_HANDOFF_REBIND_REQUIRED_CLASSIFICATION, + REVIEW_HANDOFF_BOUND_CLASSIFICATION, REVIEW_HANDOFF_OWNERSHIP_DRIFT_CLASSIFICATION, + REVIEW_HANDOFF_REBIND_EVENT, REVIEW_HANDOFF_REBIND_REQUIRED_CLASSIFICATION, }, state::{ReviewHandoffMarker, ReviewOrchestrationMarker, StateStore, WorktreeMapping}, tracker::records::{self, LinearExecutionEventIdentity, LinearExecutionEventRecord}, @@ -1483,8 +1606,9 @@ mod tests { active_label_present: Some(false), }); - 
assert_eq!(diagnostic.classification, REVIEW_HANDOFF_BOUND_CLASSIFICATION); - assert_eq!(diagnostic.reason, "review_handoff_record_present"); + assert_eq!(diagnostic.classification, REVIEW_HANDOFF_OWNERSHIP_DRIFT_CLASSIFICATION); + assert_eq!(diagnostic.reason, "active_ownership_label_missing"); + assert_eq!(diagnostic.mismatched_field.as_deref(), Some("issue.labels")); assert!(diagnostic.next_action.contains("decodex:active:pubfi")); assert!(diagnostic.next_action.contains("Restore explicit lane ownership")); } diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index 5037a41c..f002e93e 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -101,6 +101,7 @@ struct StateData { private_execution_events: Vec, review_handoffs: HashMap, review_orchestrations: HashMap, + connector_backoffs: HashMap<(String, String), ConnectorBackoff>, dispatch_slot_configs: HashMap, issue_claim_guards: HashMap, dispatch_slot_guards: HashMap, @@ -117,6 +118,7 @@ impl StateData { self.private_execution_events = loaded.private_execution_events; self.review_handoffs = loaded.review_handoffs; self.review_orchestrations = loaded.review_orchestrations; + self.connector_backoffs = loaded.connector_backoffs; } fn replace_project_run_state(&mut self, loaded: Self) { @@ -336,12 +338,34 @@ CREATE TABLE IF NOT EXISTS review_orchestrations ( ); "#, )?; + self.bootstrap_connector_backoffs_schema()?; self.bootstrap_private_execution_events_schema()?; self.record_schema_version()?; Ok(()) } + fn bootstrap_connector_backoffs_schema(&self) -> Result<()> { + self.connection.execute_batch( + r#" +CREATE TABLE IF NOT EXISTS connector_backoffs ( + project_id TEXT NOT NULL, + connector TEXT NOT NULL, + sync_phase TEXT NOT NULL, + quota_class TEXT NOT NULL, + reset_unix_epoch INTEGER NOT NULL, + reset_source TEXT NOT NULL, + warning TEXT NOT NULL, + updated_at TEXT NOT NULL, + updated_at_unix INTEGER NOT NULL, + PRIMARY KEY (project_id, 
connector) +); +"#, + )?; + + Ok(()) + } + fn bootstrap_private_execution_events_schema(&self) -> Result<()> { self.connection.execute_batch( r#" @@ -374,7 +398,7 @@ CREATE TABLE IF NOT EXISTS schema_meta ( value TEXT NOT NULL ); INSERT INTO schema_meta (key, value) -VALUES ('schema_version', '5') +VALUES ('schema_version', '6') ON CONFLICT(key) DO UPDATE SET value = excluded.value; "#, )?; @@ -394,6 +418,7 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; self.load_private_execution_events(&mut state)?; self.load_review_handoffs(&mut state)?; self.load_review_orchestrations(&mut state)?; + self.load_connector_backoffs(&mut state)?; Ok(state) } @@ -429,6 +454,7 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; persist_private_execution_events(&transaction, state)?; persist_review_handoffs(&transaction, state)?; persist_review_orchestrations(&transaction, state)?; + persist_connector_backoffs(&transaction, state)?; transaction.commit()?; @@ -436,8 +462,23 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; } fn delete_project(&mut self, service_id: &str) -> Result<()> { - self.connection - .execute("DELETE FROM projects WHERE service_id = ?1", params![service_id])?; + let transaction = self.connection.transaction()?; + + transaction.execute("DELETE FROM projects WHERE service_id = ?1", params![service_id])?; + transaction.execute( + "DELETE FROM connector_backoffs WHERE project_id = ?1", + params![service_id], + )?; + transaction.commit()?; + + Ok(()) + } + + fn delete_connector_backoff(&self, project_id: &str, connector: &str) -> Result<()> { + self.connection.execute( + "DELETE FROM connector_backoffs WHERE project_id = ?1 AND connector = ?2", + params![project_id, connector], + )?; Ok(()) } @@ -1229,6 +1270,40 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; Ok(()) } + + fn load_connector_backoffs(&self, state: &mut StateData) -> Result<()> { + let mut statement = self.connection.prepare( + "SELECT project_id, connector, sync_phase, 
quota_class, reset_unix_epoch, \ + reset_source, warning, updated_at, updated_at_unix FROM connector_backoffs", + )?; + let rows = statement.query_map([], |row| { + let project_id: String = row.get(0)?; + let connector: String = row.get(1)?; + + Ok(( + (project_id.clone(), connector.clone()), + ConnectorBackoff { + project_id, + connector, + sync_phase: row.get(2)?, + quota_class: row.get(3)?, + reset_unix_epoch: row.get(4)?, + reset_source: row.get(5)?, + warning: row.get(6)?, + updated_at: row.get(7)?, + updated_at_unix: row.get(8)?, + }, + )) + })?; + + for row in rows { + let (key, record) = row?; + + state.connector_backoffs.insert(key, record); + } + + Ok(()) + } } struct TimestampParts { @@ -2286,6 +2361,30 @@ fn persist_review_orchestrations(transaction: &Transaction<'_>, state: &StateDat Ok(()) } +fn persist_connector_backoffs(transaction: &Transaction<'_>, state: &StateData) -> Result<()> { + for record in state.connector_backoffs.values() { + transaction.execute( + "INSERT OR REPLACE INTO connector_backoffs ( + project_id, connector, sync_phase, quota_class, reset_unix_epoch, + reset_source, warning, updated_at, updated_at_unix + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + params![ + record.project_id, + record.connector, + record.sync_phase, + record.quota_class, + record.reset_unix_epoch, + record.reset_source, + record.warning, + record.updated_at, + record.updated_at_unix, + ], + )?; + } + + Ok(()) +} + fn dispatch_slot_lock_path(root: &Path, slot_index: usize) -> PathBuf { root.join(format!("{DISPATCH_SLOT_LOCK_FILE_PREFIX}.{slot_index}.lock")) } diff --git a/apps/decodex/src/state/models.rs b/apps/decodex/src/state/models.rs index c4e2bdc0..03b3b6e8 100644 --- a/apps/decodex/src/state/models.rs +++ b/apps/decodex/src/state/models.rs @@ -258,6 +258,66 @@ impl WorktreeMapping { } } +/// Project-scoped external connector backoff retained in the runtime store. 
+#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ConnectorBackoff { + project_id: String, + connector: String, + sync_phase: String, + quota_class: String, + reset_unix_epoch: i64, + reset_source: String, + warning: String, + updated_at: String, + updated_at_unix: i64, +} +impl ConnectorBackoff { + /// Local project identifier affected by this connector backoff. + pub fn project_id(&self) -> &str { + &self.project_id + } + + /// Connector name, such as `linear`. + pub fn connector(&self) -> &str { + &self.connector + } + + /// Runtime phase that last observed the connector backoff. + pub fn sync_phase(&self) -> &str { + &self.sync_phase + } + + /// Quota class backing the pause. + pub fn quota_class(&self) -> &str { + &self.quota_class + } + + /// Unix epoch when Decodex may retry the connector. + pub fn reset_unix_epoch(&self) -> i64 { + self.reset_unix_epoch + } + + /// Source for the reset time. + pub fn reset_source(&self) -> &str { + &self.reset_source + } + + /// Snapshot warning represented by this backoff. + pub fn warning(&self) -> &str { + &self.warning + } + + /// Timestamp when Decodex stored the backoff. + pub fn updated_at(&self) -> &str { + &self.updated_at + } + + /// Unix timestamp when Decodex stored the backoff. + pub fn updated_at_unix(&self) -> i64 { + self.updated_at_unix + } +} + /// Unix file-descriptor handoff for a daemon-planned lease adopted by a child process. pub struct PreacquiredLeaseGuards { /// The inherited issue-claim lock fd that keeps one issue single-owned across processes. diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index 80fe3de8..72a5c2c9 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -1,5 +1,16 @@ use crate::workflow::WorkflowConcurrencyLimit; +/// Input fields for recording a project-scoped external connector backoff. 
+pub(crate) struct ConnectorBackoffInput<'a> { + pub(crate) project_id: &'a str, + pub(crate) connector: &'a str, + pub(crate) sync_phase: &'a str, + pub(crate) quota_class: &'a str, + pub(crate) reset_unix_epoch: i64, + pub(crate) reset_source: &'a str, + pub(crate) warning: &'a str, +} + /// Shared dispatch-slot capacity for one project. #[derive(Clone, Copy)] pub(crate) enum DispatchSlotLimit { @@ -130,6 +141,61 @@ impl StateStore { self.persist_runtime_state_locked(&state) } + /// Create or replace a project-scoped external connector backoff. + pub(crate) fn upsert_connector_backoff( + &self, + input: ConnectorBackoffInput<'_>, + ) -> Result { + let now = timestamp_parts(); + let record = ConnectorBackoff { + project_id: input.project_id.to_owned(), + connector: input.connector.to_owned(), + sync_phase: input.sync_phase.to_owned(), + quota_class: input.quota_class.to_owned(), + reset_unix_epoch: input.reset_unix_epoch, + reset_source: input.reset_source.to_owned(), + warning: input.warning.to_owned(), + updated_at: now.text, + updated_at_unix: now.unix, + }; + let mut state = self.lock()?; + + state.connector_backoffs.insert( + (input.project_id.to_owned(), input.connector.to_owned()), + record.clone(), + ); + self.persist_runtime_state_locked(&state)?; + + Ok(record) + } + + /// Read a project-scoped connector backoff from the runtime store. + pub(crate) fn connector_backoff( + &self, + project_id: &str, + connector: &str, + ) -> Result> { + let state = self.lock()?; + + Ok(state + .connector_backoffs + .get(&(project_id.to_owned(), connector.to_owned())) + .cloned()) + } + + /// Clear a project-scoped connector backoff from the runtime store. 
+ pub(crate) fn clear_connector_backoff( + &self, + project_id: &str, + connector: &str, + ) -> Result<()> { + let mut state = self.lock()?; + + state.connector_backoffs.remove(&(project_id.to_owned(), connector.to_owned())); + + self.delete_connector_backoff_locked(project_id, connector) + } + /// Configure the shared cross-process dispatch-slot root for one project. pub(crate) fn configure_dispatch_slot_root( &self, @@ -1454,6 +1520,17 @@ impl StateStore { sqlite.delete_project(service_id) } + fn delete_connector_backoff_locked(&self, project_id: &str, connector: &str) -> Result<()> { + let Some(sqlite) = self.sqlite.as_ref() else { + return Ok(()); + }; + let sqlite = sqlite + .lock() + .map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?; + + sqlite.delete_connector_backoff(project_id, connector) + } + fn upsert_run_attempt_locked(&self, attempt: &RunAttemptRecord) -> Result<()> { let Some(sqlite) = self.sqlite.as_ref() else { return Ok(()); diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index b3774c4f..e625bd64 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -13,9 +13,10 @@ use tempfile::TempDir; use crate::{ state::{ self, ChildAgentActivitySummary, CodexAccountActivitySummary, CodexAccountMarker, - DispatchSlotLimit, EffectiveRuntimeMarker, PreacquiredLeaseGuards, ProjectRegistration, - ProtocolActivityMarker, ProtocolActivitySummary, RUN_ACTIVITY_MARKER_FILE, - RUN_OPERATION_REPO_GATE, ReviewHandoffMarker, ReviewOrchestrationMarker, StateStore, + ConnectorBackoffInput, DispatchSlotLimit, EffectiveRuntimeMarker, PreacquiredLeaseGuards, + ProjectRegistration, ProtocolActivityMarker, ProtocolActivitySummary, + RUN_ACTIVITY_MARKER_FILE, RUN_OPERATION_REPO_GATE, ReviewHandoffMarker, + ReviewOrchestrationMarker, StateStore, }, tracker::records::{LinearExecutionEventIdentity, LinearExecutionEventRecord}, }; @@ -111,6 +112,50 @@ fn review_markers_roundtrip_preserve_required_fields() 
{ assert_eq!(restored_orchestration, orchestration); } +#[test] +fn connector_backoff_roundtrip_and_clear_from_runtime_store() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let state_path = temp_dir.path().join("runtime.sqlite3"); + let store = StateStore::open(&state_path).expect("state store should open"); + + store + .upsert_connector_backoff(ConnectorBackoffInput { + project_id: "pubfi", + connector: "linear", + sync_phase: "post_review_lane_status", + quota_class: "linear_graphql_api", + reset_unix_epoch: 1_777_392_000, + reset_source: "linear", + warning: "tracker_rate_limited", + }) + .expect("connector backoff should persist"); + + let reopened = StateStore::open(&state_path).expect("state store should reopen"); + let backoff = reopened + .connector_backoff("pubfi", "linear") + .expect("connector backoff should read") + .expect("connector backoff should exist"); + + assert_eq!(backoff.project_id(), "pubfi"); + assert_eq!(backoff.connector(), "linear"); + assert_eq!(backoff.sync_phase(), "post_review_lane_status"); + assert_eq!(backoff.quota_class(), "linear_graphql_api"); + assert_eq!(backoff.reset_unix_epoch(), 1_777_392_000); + assert_eq!(backoff.reset_source(), "linear"); + assert_eq!(backoff.warning(), "tracker_rate_limited"); + + reopened.clear_connector_backoff("pubfi", "linear").expect("connector backoff should clear"); + + let reopened = StateStore::open(&state_path).expect("state store should reopen again"); + + assert!( + reopened + .connector_backoff("pubfi", "linear") + .expect("connector backoff should read after clear") + .is_none() + ); +} + #[test] fn clear_review_markers_for_handoff_preserves_other_branches() { let temp_dir = TempDir::new().expect("tempdir should create"); diff --git a/docs/runbook/recover-review-handoff.md b/docs/runbook/recover-review-handoff.md index 45963689..cc13374c 100644 --- a/docs/runbook/recover-review-handoff.md +++ b/docs/runbook/recover-review-handoff.md @@ -79,12 +79,13 @@ repair 
required, or blocked for a different concrete reason. ## Active Ownership Recovery -If diagnosis reports `classification: review_handoff_bound` but -`active_label_present: false`, do not run rebind just to restore ownership. First -verify the issue is still meant to continue the retained post-review lifecycle for this -service, then restore the issue to the workflow success state and add -`decodex:active:<service_id>`. If the issue still has `decodex:needs-attention`, clear -that label only after the recorded blocker has been repaired. +If diagnosis reports `classification: review_handoff_ownership_drift`, +`reason: active_ownership_label_missing`, and `active_label_present: false`, do not run +rebind just to restore ownership. First verify the issue is still meant to continue the +retained post-review lifecycle for this service, then restore the issue to the workflow +success state and add `decodex:active:<service_id>`. If the issue still has +`decodex:needs-attention`, clear that label only after the recorded blocker has been +repaired. After restoring explicit ownership, rerun: diff --git a/docs/spec/post-review-lifecycle.md b/docs/spec/post-review-lifecycle.md index ab622dbc..dcf7b3ed 100644 --- a/docs/spec/post-review-lifecycle.md +++ b/docs/spec/post-review-lifecycle.md @@ -78,6 +78,13 @@ marker PR URL, and marker head SHA, and it may expose `readback_warning = "pull_request_state_read_failed"` until the next successful PR state refresh. +When Linear issue metadata readback is degraded by connector backoff, operator status +must still keep locally retained handoff rows visible with the marker PR URL and head +SHA and must mark the row as tracker-readback degraded instead of presenting the PR or +code state as failed. If the handoff is bound but `decodex:active:<service_id>` is +missing, recovery diagnosis must classify the state as ownership drift and give the +single label repair action instead of recommending rebind.
+ The supported operator recovery surface is `decodex recover review-handoff`. This is a break-glass recovery path for orphaned retained review lanes and stale retained marker heads after explicit manual repair or rebase. It is not part of the normal automation @@ -86,8 +93,9 @@ success path. - `diagnose` is read-only. It reports the project, issue, branch, worktree, local head, active automation label, existing PR URL when present, stored handoff head, stored orchestration head, PR base/head when readable, and the missing or mismatched marker - reason. A diagnostic may report a bound marker, a missing marker, an unverified PR - read, or a concrete field mismatch that requires explicit rebind. + reason. A diagnostic may report a bound marker, active ownership drift, a missing + marker, an unverified PR read, or a concrete field mismatch that requires explicit + rebind. - `rebind` is mutating and requires an explicit issue identifier plus PR URL. It must validate the configured project, tracker issue, success-state compatibility, active automation ownership, retained worktree branch, clean worktree, PR repository, PR base,