diff --git a/apps/decodex/src/orchestrator.rs b/apps/decodex/src/orchestrator.rs index baee2af4..444a7a10 100644 --- a/apps/decodex/src/orchestrator.rs +++ b/apps/decodex/src/orchestrator.rs @@ -69,6 +69,7 @@ pub(crate) const EXTERNAL_REVIEW_MERGE_VISIBILITY_TIMEOUT_SECS: i64 = 15 * 60; const CONTINUATION_RETRY_DELAY_MS: u64 = 1_000; const FAILURE_RETRY_BASE_DELAY_MS: u64 = 10_000; +const RECOVERABLE_WORKTREE_SKIP_TTL: Duration = Duration::from_secs(10 * 60); const AGENT_GIT_ASKPASS_PREFIX: &str = ".decodex-git-askpass-"; const CONTINUATION_PENDING_RUN_STATUS: &str = "continuation_pending"; const TERMINAL_GUARDED_RUN_STATUS: &str = "terminal_guarded"; diff --git a/apps/decodex/src/orchestrator/daemon.rs b/apps/decodex/src/orchestrator/daemon.rs index ad3747dd..df3ef770 100644 --- a/apps/decodex/src/orchestrator/daemon.rs +++ b/apps/decodex/src/orchestrator/daemon.rs @@ -13,6 +13,7 @@ struct DaemonTickRuntimeContext<'a, T, I> { workflow: &'a WorkflowDocument, worktree_manager: &'a WorktreeManager, review_state_inspector: &'a I, + recoverable_worktree_skip_cache: Option<&'a mut RecoverableWorktreeSkipCache>, } fn load_daemon_tick_context( @@ -73,6 +74,7 @@ fn run_daemon_tick( state_store: &StateStore, active_children: &mut Vec, retry_queue: &mut RetryQueue, + recoverable_worktree_skip_cache: &mut RecoverableWorktreeSkipCache, context: &DaemonTickContext, ) -> Result<()> { let review_state_inspector = GhPullRequestReviewStateInspector { @@ -90,6 +92,7 @@ fn run_daemon_tick( workflow: &context.workflow, worktree_manager: &context.worktree_manager, review_state_inspector: &review_state_inspector, + recoverable_worktree_skip_cache: Some(recoverable_worktree_skip_cache), }, ) } @@ -99,7 +102,7 @@ fn run_daemon_tick_with_review_state_inspector( state_store: &StateStore, active_children: &mut Vec, retry_queue: &mut RetryQueue, - context: DaemonTickRuntimeContext<'_, T, I>, + mut context: DaemonTickRuntimeContext<'_, T, I>, ) -> Result<()> where T: IssueTracker, @@ -116,12 +119,15 @@ where )?; if active_children.is_empty() { + let recoverable_worktree_skip_cache = context.recoverable_worktree_skip_cache.as_deref_mut(); + recover_and_reconcile_idle_daemon_state( context.tracker, context.project, context.workflow, state_store, context.worktree_manager, + recoverable_worktree_skip_cache, )?; } @@ -162,15 +168,17 @@ fn recover_and_reconcile_idle_daemon_state( workflow: &WorkflowDocument, state_store: &StateStore, worktree_manager: &WorktreeManager, + recoverable_worktree_skip_cache: Option<&mut RecoverableWorktreeSkipCache>, ) -> Result<()> where T: IssueTracker, { - let _ = recover_runtime_state_from_tracker_and_worktrees( + let _ = recover_runtime_state_from_tracker_and_worktrees_with_skip_cache( tracker, project, workflow, state_store, + recoverable_worktree_skip_cache, )?; reconcile_project_state(tracker, project, workflow, state_store, worktree_manager) diff --git a/apps/decodex/src/orchestrator/entrypoints.rs b/apps/decodex/src/orchestrator/entrypoints.rs index f0679f59..56f1d2ba 100644 --- a/apps/decodex/src/orchestrator/entrypoints.rs +++ b/apps/decodex/src/orchestrator/entrypoints.rs @@ -1130,6 +1130,7 @@ fn control_plane_project_snapshot( state_store, &mut runtime.active_children, &mut runtime.retry_queue, + &mut runtime.recoverable_worktree_skip_cache, context, ) { if let Some(connector_backoff) = remember_tracker_backoff( diff --git a/apps/decodex/src/orchestrator/status.rs b/apps/decodex/src/orchestrator/status.rs index d9e8119c..48dd3db3 100644 --- a/apps/decodex/src/orchestrator/status.rs +++ b/apps/decodex/src/orchestrator/status.rs @@ -42,6 +42,19 @@ enum AccountActivityMode { Snapshot, } +#[derive(Clone, Copy)] +enum RunIssueMetadataHydration { + AllRows, + ActiveRowsOnly, +} + +#[derive(Clone, Copy)] +struct LiveOperatorStatusSnapshotOptions { + hydrate_history_ledger: bool, + run_issue_metadata_hydration: RunIssueMetadataHydration, + account_activity_mode: AccountActivityMode, +} + enum TrackerObserverOutcome { Ok, Unavailable, @@ -132,6 +145,7 @@ struct LiveOperatorStatusObserverContext<'a, T> { state_store: &'a StateStore, review_state_inspector: &'a GhPullRequestReviewStateInspector, hydrate_history_ledger: bool, + run_issue_metadata_hydration: RunIssueMetadataHydration, } struct PostReviewLaneBuildContext<'a, I> { @@ -289,8 +303,11 @@ where workflow, state_store, limit, - true, - AccountActivityMode::Probe, + LiveOperatorStatusSnapshotOptions { + hydrate_history_ledger: true, + run_issue_metadata_hydration: RunIssueMetadataHydration::AllRows, + account_activity_mode: AccountActivityMode::Probe, + }, ) } @@ -310,8 +327,11 @@ where workflow, state_store, limit, - false, - AccountActivityMode::Snapshot, + LiveOperatorStatusSnapshotOptions { + hydrate_history_ledger: false, + run_issue_metadata_hydration: RunIssueMetadataHydration::ActiveRowsOnly, + account_activity_mode: AccountActivityMode::Snapshot, + }, ) } @@ -321,8 +341,7 @@ fn build_live_operator_status_snapshot_with_history_ledger( workflow: &WorkflowDocument, state_store: &StateStore, limit: usize, - hydrate_history_ledger: bool, - account_activity_mode: AccountActivityMode, + options: LiveOperatorStatusSnapshotOptions, ) -> crate::prelude::Result where T: IssueTracker, @@ -340,7 +359,7 @@ where project, state_store, limit, - account_activity_mode, + options.account_activity_mode, )?; hydrate_history_lanes_from_local_ledger(project, state_store, &mut snapshot)?; @@ -351,7 +370,8 @@ where workflow, state_store, review_state_inspector: &review_state_inspector, - hydrate_history_ledger, + hydrate_history_ledger: options.hydrate_history_ledger, + run_issue_metadata_hydration: options.run_issue_metadata_hydration, }, &mut snapshot, )?; @@ -376,7 +396,12 @@ where if !paused { paused = apply_tracker_observer_outcome( - hydrate_operator_run_rows_from_tracker(context.tracker, context.project, snapshot), + hydrate_operator_run_rows_from_tracker( + context.tracker, + context.project, + snapshot, + context.run_issue_metadata_hydration, + ), snapshot, context.state_store, context.project, @@ -1171,11 +1196,12 @@ fn hydrate_operator_run_rows_from_tracker( tracker: &T, project: &ServiceConfig, snapshot: &mut OperatorStatusSnapshot, + hydration: RunIssueMetadataHydration, ) -> TrackerObserverOutcome where T: IssueTracker, { - let issue_ids = operator_snapshot_run_issue_ids(snapshot); + let issue_ids = operator_snapshot_run_issue_ids(snapshot, hydration); if issue_ids.is_empty() { return TrackerObserverOutcome::Ok; @@ -1220,17 +1246,26 @@ where } } -fn operator_snapshot_run_issue_ids(snapshot: &OperatorStatusSnapshot) -> Vec { +fn operator_snapshot_run_issue_ids( + snapshot: &OperatorStatusSnapshot, + hydration: RunIssueMetadataHydration, +) -> Vec { let mut issue_ids = BTreeSet::new(); - for run in snapshot.active_runs.iter().chain(snapshot.recent_runs.iter()) { + for run in &snapshot.active_runs { append_operator_run_issue_id(&mut issue_ids, run); } - for lane in &snapshot.history_lanes { - append_operator_run_issue_id(&mut issue_ids, &lane.latest_run); - for attempt in &lane.attempts { - append_operator_run_issue_id(&mut issue_ids, attempt); + if matches!(hydration, RunIssueMetadataHydration::AllRows) { + for run in &snapshot.recent_runs { + append_operator_run_issue_id(&mut issue_ids, run); + } + for lane in &snapshot.history_lanes { + append_operator_run_issue_id(&mut issue_ids, &lane.latest_run); + + for attempt in &lane.attempts { + append_operator_run_issue_id(&mut issue_ids, attempt); + } } } @@ -3669,6 +3704,25 @@ fn recover_runtime_state_from_tracker_and_worktrees( workflow: &WorkflowDocument, state_store: &StateStore, ) -> crate::prelude::Result +where + T: IssueTracker, +{ + recover_runtime_state_from_tracker_and_worktrees_with_skip_cache( + tracker, + project, + workflow, + state_store, + None, + ) +} + +fn recover_runtime_state_from_tracker_and_worktrees_with_skip_cache( + tracker: &T, + project: &ServiceConfig, + workflow: &WorkflowDocument, + state_store: &StateStore, + mut recoverable_worktree_skip_cache: Option<&mut RecoverableWorktreeSkipCache>, +) -> crate::prelude::Result where T: IssueTracker, { @@ -3686,7 +3740,11 @@ where } } - let mut issues = tracker.refresh_issues(&issue_ids)?; + let mut issues = if issue_ids.is_empty() && recoverable_worktree_skip_cache.is_some() { + Vec::new() + } else { + tracker.refresh_issues(&issue_ids)? + }; let mut known_identifiers = issues.iter().map(|issue| issue.identifier.to_ascii_uppercase()).collect::>(); @@ -3697,6 +3755,7 @@ where &issue_identifier, &mut known_identifiers, &mut issues, + recoverable_worktree_skip_cache.as_deref_mut(), )?; } @@ -3824,6 +3883,7 @@ fn append_recoverable_tracker_issue( issue_identifier: &str, known_identifiers: &mut BTreeSet, issues: &mut Vec, + mut recoverable_worktree_skip_cache: Option<&mut RecoverableWorktreeSkipCache>, ) -> crate::prelude::Result<()> where T: IssueTracker, @@ -3834,7 +3894,24 @@ where return Ok(()); } + let now = Instant::now(); + + if let Some(cache) = recoverable_worktree_skip_cache.as_deref_mut() + && cache.is_suppressed(&canonical_identifier, now) + { + tracing::debug!( + issue = canonical_identifier, + "Skipped retained worktree tracker lookup because a recent recovery probe already found no service ownership." + ); + + return Ok(()); + } + let Some(issue) = tracker.get_issue_by_identifier(issue_identifier)? else { + if let Some(cache) = recoverable_worktree_skip_cache { + cache.remember(&canonical_identifier, now); + } + return Ok(()); }; @@ -3846,6 +3923,10 @@ where "Skipping retained worktree recovery because the tracker issue is not explicitly owned by this service." ); + if let Some(cache) = recoverable_worktree_skip_cache { + cache.remember(&canonical_identifier, now); + } + return Ok(()); } diff --git a/apps/decodex/src/orchestrator/tests.rs b/apps/decodex/src/orchestrator/tests.rs index 54b54e83..4bd02dda 100644 --- a/apps/decodex/src/orchestrator/tests.rs +++ b/apps/decodex/src/orchestrator/tests.rs @@ -135,8 +135,10 @@ struct FakeTracker { identifier_lookup_issues: Option>, issues_by_label: HashMap>, team_label_ids_by_name: HashMap<(String, String), String>, + identifier_queries: RefCell>, refresh_snapshots: RefCell>>, refresh_error: RefCell>, + refresh_queries: RefCell>>, label_queries: RefCell>, comment_queries: RefCell>, comments: RefCell>, @@ -168,8 +170,10 @@ impl FakeTracker { identifier_lookup_issues: None, issues_by_label: HashMap::new(), team_label_ids_by_name: HashMap::new(), + identifier_queries: RefCell::new(Vec::new()), refresh_snapshots: RefCell::new(refresh_snapshots), refresh_error: RefCell::new(None), + refresh_queries: RefCell::new(Vec::new()), label_queries: RefCell::new(Vec::new()), comment_queries: RefCell::new(Vec::new()), comments: RefCell::new(Vec::new()), @@ -258,6 +262,8 @@ impl IssueTracker for FakeTracker { } fn get_issue_by_identifier(&self, issue_identifier: &str) -> Result> { + self.identifier_queries.borrow_mut().push(issue_identifier.to_owned()); + let issues = self.identifier_lookup_issues.as_ref().unwrap_or(&self.listed_issues); Ok(issues @@ -267,6 +273,8 @@ impl IssueTracker for FakeTracker { } fn refresh_issues(&self, issue_ids: &[String]) -> Result> { + self.refresh_queries.borrow_mut().push(issue_ids.to_vec()); + if let Some(message) = self.refresh_error.borrow_mut().take() { return Err(Report::msg(message)); } diff --git a/apps/decodex/src/orchestrator/tests/operator/status/publishing.rs b/apps/decodex/src/orchestrator/tests/operator/status/publishing.rs index 6aea5475..78e54e30 100644 --- a/apps/decodex/src/orchestrator/tests/operator/status/publishing.rs +++ b/apps/decodex/src/orchestrator/tests/operator/status/publishing.rs @@ -336,6 +336,117 @@ fn operator_state_snapshot_publish_does_not_derive_history_outcome_without_execu ); } +#[test] +fn operator_state_snapshot_publish_skips_terminal_run_metadata_refresh() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue_with_sort_fields( + "issue-1", + "XY-355", + "Done", + &[], + Some(3), + "2026-04-29T10:11:00Z", + ); + let tracker = FakeTracker::with_refresh_error( + vec![issue.clone()], + "Linear connector is rate limited: Rate limit exceeded. Only 2500 requests are allowed per 1 hour.", + ); + + state_store + .upsert_worktree( + TEST_SERVICE_ID, + &issue.id, + "y/decodex-xy-355", + &config.worktree_root().join(&issue.identifier).display().to_string(), + ) + .expect("worktree should remember project ownership"); + state_store + .record_run_attempt("xy-355-attempt-1", &issue.id, 1, "succeeded") + .expect("successful attempt should record"); + state_store + .clear_worktree(&issue.id) + .expect("completed lane cleanup should clear local worktree"); + + let snapshot = orchestrator::build_operator_state_snapshot_for_publish( + &tracker, + &config, + &workflow, + &state_store, + 10, + &[], + &[], + ) + .expect("terminal-only publish should avoid Linear metadata refresh"); + + assert_eq!(snapshot.history_lanes.len(), 1); + assert!( + !snapshot + .warnings + .iter() + .any(|warning| warning == orchestrator::TRACKER_RATE_LIMIT_WARNING), + "terminal-only publish should not enter backoff from run metadata" + ); + assert!( + tracker.refresh_queries.borrow().is_empty(), + "control-plane publish should not refresh terminal recent/history run metadata" + ); +} + +#[test] +fn operator_state_snapshot_publish_still_refreshes_active_run_metadata() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let issue = sample_issue_with_sort_fields( + "issue-1", + "XY-355", + "In Progress", + &[], + Some(3), + "2026-04-29T10:11:00Z", + ); + let tracker = FakeTracker::new(vec![issue.clone()]); + let worktree_path = config.worktree_root().join(&issue.identifier); + + state_store + .record_run_attempt("xy-355-attempt-1", &issue.id, 1, "running") + .expect("running attempt should record"); + state_store + .upsert_lease(TEST_SERVICE_ID, &issue.id, "xy-355-attempt-1", "In Progress") + .expect("active lease should record"); + state_store + .upsert_worktree( + TEST_SERVICE_ID, + &issue.id, + "y/decodex-xy-355", + &worktree_path.display().to_string(), + ) + .expect("worktree should remember project ownership"); + + let snapshot = orchestrator::build_operator_state_snapshot_for_publish( + &tracker, + &config, + &workflow, + &state_store, + 10, + &[], + &[], + ) + .expect("active publish should build"); + let refresh_queries = tracker.refresh_queries.borrow(); + + assert!( + refresh_queries + .iter() + .any(|query| query.len() == 1 && query.first() == Some(&issue.id)), + "active publish should still refresh the active run issue metadata" + ); + assert_eq!(snapshot.active_runs.len(), 1); + assert_eq!(snapshot.active_runs[0].issue_identifier.as_deref(), Some("XY-355")); + assert_eq!(snapshot.active_runs[0].title.as_deref(), Some("Implement orchestration")); + assert_eq!(snapshot.active_runs[0].author.as_deref(), Some("Yvette")); +} + #[test] fn operator_state_snapshot_publish_reads_local_completed_ledger_details_without_comment_replay() { let (_temp_dir, config, workflow) = temp_project_layout(); diff --git a/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs b/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs index d2669cc0..cbd831b9 100644 --- a/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs +++ b/apps/decodex/src/orchestrator/tests/recovery/runtime_reentry.rs @@ -1,3 +1,5 @@ +use orchestrator::RecoverableWorktreeSkipCache; + #[test] fn exited_child_reconciliation_detects_stalled_failed_runs_from_protocol_idle() { let (_temp_dir, config, workflow) = temp_project_layout(); @@ -1090,6 +1092,49 @@ fn run_project_once_recovers_worktree_when_identifier_lookup_labels_are_truncate assert_eq!(summary.worktree_path, expected_worktree); } +#[test] +fn recovery_skip_cache_suppresses_repeated_unowned_worktree_lookup() { + let (_temp_dir, config, workflow) = temp_project_layout(); + let issue = sample_issue("Todo", &[]); + let tracker = FakeTracker::new(Vec::new()).with_identifier_lookup_issues(vec![issue.clone()]); + let state_store = StateStore::open_in_memory().expect("state store should open"); + let worktree_path = config.worktree_root().join(&issue.identifier); + let mut skip_cache = RecoverableWorktreeSkipCache::default(); + + fs::create_dir_all(&worktree_path).expect("stale worktree directory should exist"); + + let first = orchestrator::recover_runtime_state_from_tracker_and_worktrees_with_skip_cache( + &tracker, + &config, + &workflow, + &state_store, + Some(&mut skip_cache), + ) + .expect("first recovery probe should succeed"); + let second = orchestrator::recover_runtime_state_from_tracker_and_worktrees_with_skip_cache( + &tracker, + &config, + &workflow, + &state_store, + Some(&mut skip_cache), + ) + .expect("cached recovery probe should succeed"); + let identifier_queries = tracker.identifier_queries.borrow(); + + assert!(first.active_issues.is_empty()); + assert!(second.active_issues.is_empty()); + assert_eq!(identifier_queries.len(), 1); + assert_eq!(identifier_queries[0], issue.identifier); + assert!( + tracker.refresh_queries.borrow().is_empty(), + "empty known issue sets should not call tracker refresh" + ); + assert!( + tracker.label_queries.borrow().is_empty(), + "complete issue labels should not need server confirmation" + ); +} + #[test] fn live_run_skips_issue_that_becomes_ineligible_after_worktree_prepare() { let (_temp_dir, config, workflow) = temp_project_layout(); @@ -1195,6 +1240,7 @@ fn idle_daemon_recovery_reconstructs_completed_closeout_worktree_mapping() { &workflow, &state_store, &worktree_manager, + None, ) .expect("idle daemon recovery should succeed"); diff --git a/apps/decodex/src/orchestrator/tests/retry/scheduling.rs b/apps/decodex/src/orchestrator/tests/retry/scheduling.rs index e37acd05..0459d5ae 100644 --- a/apps/decodex/src/orchestrator/tests/retry/scheduling.rs +++ b/apps/decodex/src/orchestrator/tests/retry/scheduling.rs @@ -1370,6 +1370,7 @@ fn daemon_tick_reconciles_ready_retained_review_lane_before_dry_run_planning() { review_state_inspector: &FakePullRequestReviewStateInspector::new(vec![Ok( review_state, )]), + recoverable_worktree_skip_cache: None, }, ); @@ -1487,6 +1488,7 @@ fn daemon_tick_clears_terminal_mapping_without_worktree_before_retained_land() { &head_oid, ), )]), + recoverable_worktree_skip_cache: None, }, ) .expect("daemon tick should not fail on stale terminal worktree state"); diff --git a/apps/decodex/src/orchestrator/types.rs b/apps/decodex/src/orchestrator/types.rs index 2361bf76..fd68de5c 100644 --- a/apps/decodex/src/orchestrator/types.rs +++ b/apps/decodex/src/orchestrator/types.rs @@ -512,6 +512,30 @@ impl RetryQueue { } } +#[derive(Default)] +struct RecoverableWorktreeSkipCache { + entries: HashMap, +} +impl RecoverableWorktreeSkipCache { + fn is_suppressed(&mut self, issue_identifier: &str, now: Instant) -> bool { + self.retain_active(now); + + self.entries.get(&issue_identifier.to_ascii_uppercase()).is_some_and(|until| *until > now) + } + + fn remember(&mut self, issue_identifier: &str, now: Instant) { + self.retain_active(now); + self.entries.insert( + issue_identifier.to_ascii_uppercase(), + now + RECOVERABLE_WORKTREE_SKIP_TTL, + ); + } + + fn retain_active(&mut self, now: Instant) { + self.entries.retain(|_, until| *until > now); + } +} + struct DaemonTickContext { config: ServiceConfig, workflow: WorkflowDocument, @@ -525,6 +549,7 @@ struct ProjectDaemonRuntime { retry_queue: RetryQueue, tracker_backoff: Option, workflow_cache: Option, + recoverable_worktree_skip_cache: RecoverableWorktreeSkipCache, } #[derive(Clone, Debug)]