Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/decodex/src/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub(crate) const EXTERNAL_REVIEW_MERGE_VISIBILITY_TIMEOUT_SECS: i64 = 15 * 60;

const CONTINUATION_RETRY_DELAY_MS: u64 = 1_000;
const FAILURE_RETRY_BASE_DELAY_MS: u64 = 10_000;
const RECOVERABLE_WORKTREE_SKIP_TTL: Duration = Duration::from_secs(10 * 60);
const AGENT_GIT_ASKPASS_PREFIX: &str = ".decodex-git-askpass-";
const CONTINUATION_PENDING_RUN_STATUS: &str = "continuation_pending";
const TERMINAL_GUARDED_RUN_STATUS: &str = "terminal_guarded";
Expand Down
12 changes: 10 additions & 2 deletions apps/decodex/src/orchestrator/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ struct DaemonTickRuntimeContext<'a, T, I> {
workflow: &'a WorkflowDocument,
worktree_manager: &'a WorktreeManager,
review_state_inspector: &'a I,
recoverable_worktree_skip_cache: Option<&'a mut RecoverableWorktreeSkipCache>,
}

fn load_daemon_tick_context(
Expand Down Expand Up @@ -73,6 +74,7 @@ fn run_daemon_tick(
state_store: &StateStore,
active_children: &mut Vec<DaemonRunChild>,
retry_queue: &mut RetryQueue,
recoverable_worktree_skip_cache: &mut RecoverableWorktreeSkipCache,
context: &DaemonTickContext,
) -> Result<()> {
let review_state_inspector = GhPullRequestReviewStateInspector {
Expand All @@ -90,6 +92,7 @@ fn run_daemon_tick(
workflow: &context.workflow,
worktree_manager: &context.worktree_manager,
review_state_inspector: &review_state_inspector,
recoverable_worktree_skip_cache: Some(recoverable_worktree_skip_cache),
},
)
}
Expand All @@ -99,7 +102,7 @@ fn run_daemon_tick_with_review_state_inspector<T, I>(
state_store: &StateStore,
active_children: &mut Vec<DaemonRunChild>,
retry_queue: &mut RetryQueue,
context: DaemonTickRuntimeContext<'_, T, I>,
mut context: DaemonTickRuntimeContext<'_, T, I>,
) -> Result<()>
where
T: IssueTracker,
Expand All @@ -116,12 +119,15 @@ where
)?;

if active_children.is_empty() {
let recoverable_worktree_skip_cache = context.recoverable_worktree_skip_cache.as_deref_mut();

recover_and_reconcile_idle_daemon_state(
context.tracker,
context.project,
context.workflow,
state_store,
context.worktree_manager,
recoverable_worktree_skip_cache,
)?;
}

Expand Down Expand Up @@ -162,15 +168,17 @@ fn recover_and_reconcile_idle_daemon_state<T>(
workflow: &WorkflowDocument,
state_store: &StateStore,
worktree_manager: &WorktreeManager,
recoverable_worktree_skip_cache: Option<&mut RecoverableWorktreeSkipCache>,
) -> Result<()>
where
T: IssueTracker,
{
let _ = recover_runtime_state_from_tracker_and_worktrees(
let _ = recover_runtime_state_from_tracker_and_worktrees_with_skip_cache(
tracker,
project,
workflow,
state_store,
recoverable_worktree_skip_cache,
)?;

reconcile_project_state(tracker, project, workflow, state_store, worktree_manager)
Expand Down
1 change: 1 addition & 0 deletions apps/decodex/src/orchestrator/entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,7 @@ fn control_plane_project_snapshot(
state_store,
&mut runtime.active_children,
&mut runtime.retry_queue,
&mut runtime.recoverable_worktree_skip_cache,
context,
) {
if let Some(connector_backoff) = remember_tracker_backoff(
Expand Down
115 changes: 98 additions & 17 deletions apps/decodex/src/orchestrator/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ enum AccountActivityMode {
Snapshot,
}

#[derive(Clone, Copy)]
enum RunIssueMetadataHydration {
AllRows,
ActiveRowsOnly,
}

#[derive(Clone, Copy)]
struct LiveOperatorStatusSnapshotOptions {
hydrate_history_ledger: bool,
run_issue_metadata_hydration: RunIssueMetadataHydration,
account_activity_mode: AccountActivityMode,
}

enum TrackerObserverOutcome {
Ok,
Unavailable,
Expand Down Expand Up @@ -132,6 +145,7 @@ struct LiveOperatorStatusObserverContext<'a, T> {
state_store: &'a StateStore,
review_state_inspector: &'a GhPullRequestReviewStateInspector,
hydrate_history_ledger: bool,
run_issue_metadata_hydration: RunIssueMetadataHydration,
}

struct PostReviewLaneBuildContext<'a, I> {
Expand Down Expand Up @@ -289,8 +303,11 @@ where
workflow,
state_store,
limit,
true,
AccountActivityMode::Probe,
LiveOperatorStatusSnapshotOptions {
hydrate_history_ledger: true,
run_issue_metadata_hydration: RunIssueMetadataHydration::AllRows,
account_activity_mode: AccountActivityMode::Probe,
},
)
}

Expand All @@ -310,8 +327,11 @@ where
workflow,
state_store,
limit,
false,
AccountActivityMode::Snapshot,
LiveOperatorStatusSnapshotOptions {
hydrate_history_ledger: false,
run_issue_metadata_hydration: RunIssueMetadataHydration::ActiveRowsOnly,
account_activity_mode: AccountActivityMode::Snapshot,
},
)
}

Expand All @@ -321,8 +341,7 @@ fn build_live_operator_status_snapshot_with_history_ledger<T>(
workflow: &WorkflowDocument,
state_store: &StateStore,
limit: usize,
hydrate_history_ledger: bool,
account_activity_mode: AccountActivityMode,
options: LiveOperatorStatusSnapshotOptions,
) -> crate::prelude::Result<OperatorStatusSnapshot>
where
T: IssueTracker,
Expand All @@ -340,7 +359,7 @@ where
project,
state_store,
limit,
account_activity_mode,
options.account_activity_mode,
)?;

hydrate_history_lanes_from_local_ledger(project, state_store, &mut snapshot)?;
Expand All @@ -351,7 +370,8 @@ where
workflow,
state_store,
review_state_inspector: &review_state_inspector,
hydrate_history_ledger,
hydrate_history_ledger: options.hydrate_history_ledger,
run_issue_metadata_hydration: options.run_issue_metadata_hydration,
},
&mut snapshot,
)?;
Expand All @@ -376,7 +396,12 @@ where

if !paused {
paused = apply_tracker_observer_outcome(
hydrate_operator_run_rows_from_tracker(context.tracker, context.project, snapshot),
hydrate_operator_run_rows_from_tracker(
context.tracker,
context.project,
snapshot,
context.run_issue_metadata_hydration,
),
snapshot,
context.state_store,
context.project,
Expand Down Expand Up @@ -1171,11 +1196,12 @@ fn hydrate_operator_run_rows_from_tracker<T>(
tracker: &T,
project: &ServiceConfig,
snapshot: &mut OperatorStatusSnapshot,
hydration: RunIssueMetadataHydration,
) -> TrackerObserverOutcome
where
T: IssueTracker,
{
let issue_ids = operator_snapshot_run_issue_ids(snapshot);
let issue_ids = operator_snapshot_run_issue_ids(snapshot, hydration);

if issue_ids.is_empty() {
return TrackerObserverOutcome::Ok;
Expand Down Expand Up @@ -1220,17 +1246,26 @@ where
}
}

fn operator_snapshot_run_issue_ids(snapshot: &OperatorStatusSnapshot) -> Vec<String> {
fn operator_snapshot_run_issue_ids(
snapshot: &OperatorStatusSnapshot,
hydration: RunIssueMetadataHydration,
) -> Vec<String> {
let mut issue_ids = BTreeSet::new();

for run in snapshot.active_runs.iter().chain(snapshot.recent_runs.iter()) {
for run in &snapshot.active_runs {
append_operator_run_issue_id(&mut issue_ids, run);
}
for lane in &snapshot.history_lanes {
append_operator_run_issue_id(&mut issue_ids, &lane.latest_run);

for attempt in &lane.attempts {
append_operator_run_issue_id(&mut issue_ids, attempt);
if matches!(hydration, RunIssueMetadataHydration::AllRows) {
for run in &snapshot.recent_runs {
append_operator_run_issue_id(&mut issue_ids, run);
}
for lane in &snapshot.history_lanes {
append_operator_run_issue_id(&mut issue_ids, &lane.latest_run);

for attempt in &lane.attempts {
append_operator_run_issue_id(&mut issue_ids, attempt);
}
}
}

Expand Down Expand Up @@ -3669,6 +3704,25 @@ fn recover_runtime_state_from_tracker_and_worktrees<T>(
workflow: &WorkflowDocument,
state_store: &StateStore,
) -> crate::prelude::Result<RecoveredRuntimeState>
where
T: IssueTracker,
{
recover_runtime_state_from_tracker_and_worktrees_with_skip_cache(
tracker,
project,
workflow,
state_store,
None,
)
}

fn recover_runtime_state_from_tracker_and_worktrees_with_skip_cache<T>(
tracker: &T,
project: &ServiceConfig,
workflow: &WorkflowDocument,
state_store: &StateStore,
mut recoverable_worktree_skip_cache: Option<&mut RecoverableWorktreeSkipCache>,
) -> crate::prelude::Result<RecoveredRuntimeState>
where
T: IssueTracker,
{
Expand All @@ -3686,7 +3740,11 @@ where
}
}

let mut issues = tracker.refresh_issues(&issue_ids)?;
let mut issues = if issue_ids.is_empty() && recoverable_worktree_skip_cache.is_some() {
Vec::new()
} else {
tracker.refresh_issues(&issue_ids)?
};
let mut known_identifiers =
issues.iter().map(|issue| issue.identifier.to_ascii_uppercase()).collect::<BTreeSet<_>>();

Expand All @@ -3697,6 +3755,7 @@ where
&issue_identifier,
&mut known_identifiers,
&mut issues,
recoverable_worktree_skip_cache.as_deref_mut(),
)?;
}

Expand Down Expand Up @@ -3824,6 +3883,7 @@ fn append_recoverable_tracker_issue<T>(
issue_identifier: &str,
known_identifiers: &mut BTreeSet<String>,
issues: &mut Vec<TrackerIssue>,
mut recoverable_worktree_skip_cache: Option<&mut RecoverableWorktreeSkipCache>,
) -> crate::prelude::Result<()>
where
T: IssueTracker,
Expand All @@ -3834,7 +3894,24 @@ where
return Ok(());
}

let now = Instant::now();

if let Some(cache) = recoverable_worktree_skip_cache.as_deref_mut()
&& cache.is_suppressed(&canonical_identifier, now)
{
tracing::debug!(
issue = canonical_identifier,
"Skipped retained worktree tracker lookup because a recent recovery probe already found no service ownership."
);

return Ok(());
}

let Some(issue) = tracker.get_issue_by_identifier(issue_identifier)? else {
if let Some(cache) = recoverable_worktree_skip_cache {
cache.remember(&canonical_identifier, now);
}

return Ok(());
};

Expand All @@ -3846,6 +3923,10 @@ where
"Skipping retained worktree recovery because the tracker issue is not explicitly owned by this service."
);

if let Some(cache) = recoverable_worktree_skip_cache {
cache.remember(&canonical_identifier, now);
}

return Ok(());
}

Expand Down
8 changes: 8 additions & 0 deletions apps/decodex/src/orchestrator/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ struct FakeTracker {
identifier_lookup_issues: Option<Vec<TrackerIssue>>,
issues_by_label: HashMap<String, Vec<TrackerIssue>>,
team_label_ids_by_name: HashMap<(String, String), String>,
identifier_queries: RefCell<Vec<String>>,
refresh_snapshots: RefCell<Vec<Vec<TrackerIssue>>>,
refresh_error: RefCell<Option<String>>,
refresh_queries: RefCell<Vec<Vec<String>>>,
label_queries: RefCell<Vec<String>>,
comment_queries: RefCell<Vec<String>>,
comments: RefCell<Vec<String>>,
Expand Down Expand Up @@ -168,8 +170,10 @@ impl FakeTracker {
identifier_lookup_issues: None,
issues_by_label: HashMap::new(),
team_label_ids_by_name: HashMap::new(),
identifier_queries: RefCell::new(Vec::new()),
refresh_snapshots: RefCell::new(refresh_snapshots),
refresh_error: RefCell::new(None),
refresh_queries: RefCell::new(Vec::new()),
label_queries: RefCell::new(Vec::new()),
comment_queries: RefCell::new(Vec::new()),
comments: RefCell::new(Vec::new()),
Expand Down Expand Up @@ -258,6 +262,8 @@ impl IssueTracker for FakeTracker {
}

fn get_issue_by_identifier(&self, issue_identifier: &str) -> Result<Option<TrackerIssue>> {
self.identifier_queries.borrow_mut().push(issue_identifier.to_owned());

let issues = self.identifier_lookup_issues.as_ref().unwrap_or(&self.listed_issues);

Ok(issues
Expand All @@ -267,6 +273,8 @@ impl IssueTracker for FakeTracker {
}

fn refresh_issues(&self, issue_ids: &[String]) -> Result<Vec<TrackerIssue>> {
self.refresh_queries.borrow_mut().push(issue_ids.to_vec());

if let Some(message) = self.refresh_error.borrow_mut().take() {
return Err(Report::msg(message));
}
Expand Down
Loading