From 84c2847fe86b81f6862dc8bb36b6ce562327b950 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Wed, 3 Jun 2026 13:03:05 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Persist review-policy checkpoints in runtime state","authority":"XY-707"} --- .../src/agent/tracker_tool_bridge/review.rs | 149 ++++++++++--- .../src/agent/tracker_tool_bridge/tests.rs | 57 ++++- .../tests/review/policy.rs | 103 ++++++--- .../src/agent/tracker_tool_bridge/tools.rs | 53 +++-- apps/decodex/src/state/internal.rs | 203 +++++++++++++++++- apps/decodex/src/state/models.rs | 57 +++++ apps/decodex/src/state/store.rs | 154 +++++++++++++ apps/decodex/src/state/tests.rs | 158 +++++++++++--- docs/reference/operator-control-plane.md | 11 +- docs/spec/owned-lane-policy.md | 6 +- docs/spec/runtime.md | 15 +- 11 files changed, 842 insertions(+), 124 deletions(-) diff --git a/apps/decodex/src/agent/tracker_tool_bridge/review.rs b/apps/decodex/src/agent/tracker_tool_bridge/review.rs index e8fc7ec1..11d1fc07 100644 --- a/apps/decodex/src/agent/tracker_tool_bridge/review.rs +++ b/apps/decodex/src/agent/tracker_tool_bridge/review.rs @@ -13,7 +13,10 @@ use crate::{ TrackerToolBridge, }, prelude::eyre, - state::{self, ReviewHandoffMarker, ReviewOrchestrationMarker}, + state::{ + self, ReviewHandoffMarker, ReviewOrchestrationMarker, ReviewPolicyCheckpoint, + ReviewPolicyCheckpointInput, StateStore, + }, tracker::{ self, TrackerIssue, records::{self, LinearExecutionEventPublicProjection}, @@ -71,31 +74,40 @@ impl<'a> TrackerToolBridge<'a> { ) } - pub(super) fn cache_review_policy_state_best_effort( + pub(super) fn persist_review_policy_state( &self, review_context: &ReviewHandoffContext, review_policy_phase: ReviewPolicyPhase, review_policy_status: ReviewPolicyStatus, head_sha: &str, nonclean_rounds: i64, - ) { - if let Err(error) = state::write_run_review_policy_state( - &review_context.cwd, - &review_context.run_id, - review_context.attempt_number, - review_policy_phase.as_str(), - review_policy_status.as_str(), - head_sha, - nonclean_rounds, - ) { - tracing::warn!( - ?error, - issue = self.issue.identifier, - run_id = review_context.run_id, - worktree_path = %review_context.cwd.display(), - "Review policy marker write failed; continuing with runtime state." - ); - } + ) -> Result<(), String> { + let state_store = self.state_store.ok_or_else(|| { + format!( + "Runtime state store is required to persist review checkpoint state for issue `{}`.", + self.issue.identifier + ) + })?; + + state_store + .upsert_review_policy_checkpoint(ReviewPolicyCheckpointInput { + project_id: &review_context.service_id, + issue_id: &self.issue.id, + run_id: &review_context.run_id, + attempt_number: review_context.attempt_number, + phase: review_policy_phase.as_str(), + status: review_policy_status.as_str(), + head_sha, + nonclean_rounds, + }) + .map_err(|error| { + format!( + "Failed to persist review checkpoint state for issue `{}`: {error}", + self.issue.identifier + ) + })?; + + Ok(()) } pub(super) fn ensure_issue_scope(&self, scope: &ScopeArgs) -> Result<(), String> { @@ -874,37 +886,106 @@ impl<'a> TrackerToolBridge<'a> { let Some(current_phase) = ReviewPolicyPhase::for_mode(review_context.mode) else { return Ok(None); }; + let Some(state_store) = self.state_store else { + return Ok(None); + }; + + if let Some(checkpoint) = state_store.review_policy_checkpoint( + &review_context.service_id, + &self.issue.id, + &review_context.run_id, + review_context.attempt_number, + current_phase.as_str(), + )? { + return self.review_policy_state_from_checkpoint(checkpoint).map(Some); + } + + self.migrate_legacy_review_policy_marker(review_context, state_store, current_phase) + } + + fn review_policy_state_from_checkpoint( + &self, + checkpoint: ReviewPolicyCheckpoint, + ) -> crate::prelude::Result { + let phase = + ReviewPolicyPhase::parse(checkpoint.phase()).map_err(|error| eyre::eyre!(error))?; + let status = + ReviewPolicyStatus::parse(checkpoint.status()).map_err(|error| eyre::eyre!(error))?; + + Ok(ReviewPolicyState { + phase, + status, + head_sha: checkpoint.head_sha().to_owned(), + nonclean_rounds: checkpoint.nonclean_rounds(), + }) + } + + fn migrate_legacy_review_policy_marker( + &self, + review_context: &ReviewHandoffContext, + state_store: &StateStore, + current_phase: ReviewPolicyPhase, + ) -> crate::prelude::Result> { let Some(marker) = state::read_run_activity_marker_snapshot(&review_context.cwd)? else { return Ok(None); }; + + if marker.run_id() != review_context.run_id + || marker.attempt_number() != review_context.attempt_number + { + return Ok(None); + } + let Some(phase) = marker.review_policy_phase() else { return Ok(None); }; - let Some(status) = marker.review_policy_status() else { - eyre::bail!( - "Run activity marker for issue `{}` is missing `review_policy_status`.", - self.issue.identifier + let (Some(status), Some(head_sha), Some(nonclean_rounds)) = ( + marker.review_policy_status(), + marker.review_policy_head_sha(), + marker.review_policy_nonclean_rounds(), + ) else { + tracing::warn!( + issue = self.issue.identifier, + run_id = review_context.run_id, + "Legacy review policy marker is incomplete; ignoring marker policy fields." ); + + return Ok(None); }; - let Some(head_sha) = marker.review_policy_head_sha() else { - eyre::bail!( - "Run activity marker for issue `{}` is missing `review_policy_head_sha`.", - self.issue.identifier + let Ok(phase) = ReviewPolicyPhase::parse(phase) else { + tracing::warn!( + issue = self.issue.identifier, + run_id = review_context.run_id, + "Legacy review policy marker has an invalid phase; ignoring marker policy fields." ); + + return Ok(None); }; - let Some(nonclean_rounds) = marker.review_policy_nonclean_rounds() else { - eyre::bail!( - "Run activity marker for issue `{}` is missing `review_policy_nonclean_rounds`.", - self.issue.identifier + let Ok(status) = ReviewPolicyStatus::parse(status) else { + tracing::warn!( + issue = self.issue.identifier, + run_id = review_context.run_id, + "Legacy review policy marker has an invalid status; ignoring marker policy fields." ); + + return Ok(None); }; - let phase = ReviewPolicyPhase::parse(phase).map_err(|error| eyre::eyre!(error))?; - let status = ReviewPolicyStatus::parse(status).map_err(|error| eyre::eyre!(error))?; if phase != current_phase { return Ok(None); } + state_store.upsert_review_policy_checkpoint(ReviewPolicyCheckpointInput { + project_id: &review_context.service_id, + issue_id: &self.issue.id, + run_id: &review_context.run_id, + attempt_number: review_context.attempt_number, + phase: phase.as_str(), + status: status.as_str(), + head_sha, + nonclean_rounds, + })?; + Ok(Some(ReviewPolicyState { phase, status, diff --git a/apps/decodex/src/agent/tracker_tool_bridge/tests.rs b/apps/decodex/src/agent/tracker_tool_bridge/tests.rs index a57cdfa9..268bc8c5 100644 --- a/apps/decodex/src/agent/tracker_tool_bridge/tests.rs +++ b/apps/decodex/src/agent/tracker_tool_bridge/tests.rs @@ -25,7 +25,9 @@ use crate::{ }, config::InternalReviewMode, prelude::eyre, - state::{self, ReviewHandoffMarker, ReviewOrchestrationMarker, StateStore}, + state::{ + self, ReviewHandoffMarker, ReviewOrchestrationMarker, ReviewPolicyCheckpoint, StateStore, + }, tracker::{ IssueTracker, TrackerComment, TrackerIssue, TrackerLabel, TrackerState, TrackerTeam, privacy_classifier::{ @@ -575,6 +577,59 @@ fn bridge_state_store<'a>(bridge: &TrackerToolBridge<'a>) -> &'a StateStore { bridge.state_store.expect("test bridge should have a runtime state store") } +fn persisted_review_policy_checkpoint( + bridge: &TrackerToolBridge<'_>, + issue: &TrackerIssue, + review_context: &ReviewHandoffContext, +) -> ReviewPolicyCheckpoint { + let phase = match review_context.mode { + ReviewExecutionMode::Handoff => "handoff", + ReviewExecutionMode::Repair => "repair", + ReviewExecutionMode::Closeout => { + panic!("closeout does not support review checkpoints") + }, + }; + + bridge_state_store(bridge) + .review_policy_checkpoint( + &review_context.service_id, + &issue.id, + &review_context.run_id, + review_context.attempt_number, + phase, + ) + .expect("review policy checkpoint should read") + .expect("review policy checkpoint should exist") +} + +fn assert_review_policy_checkpoint_cleared( + bridge: &TrackerToolBridge<'_>, + issue: &TrackerIssue, + review_context: &ReviewHandoffContext, +) { + let phase = match review_context.mode { + ReviewExecutionMode::Handoff => "handoff", + ReviewExecutionMode::Repair => "repair", + ReviewExecutionMode::Closeout => { + panic!("closeout does not support review checkpoints") + }, + }; + + assert!( + bridge_state_store(bridge) + .review_policy_checkpoint( + &review_context.service_id, + &issue.id, + &review_context.run_id, + review_context.attempt_number, + phase, + ) + .expect("review policy checkpoint should read") + .is_none(), + "review policy checkpoint should be cleared after completion" + ); +} + fn persisted_review_handoff_marker( bridge: &TrackerToolBridge<'_>, issue: &TrackerIssue, diff --git a/apps/decodex/src/agent/tracker_tool_bridge/tests/review/policy.rs b/apps/decodex/src/agent/tracker_tool_bridge/tests/review/policy.rs index 750bb9d3..1f2fbb7c 100644 --- a/apps/decodex/src/agent/tracker_tool_bridge/tests/review/policy.rs +++ b/apps/decodex/src/agent/tracker_tool_bridge/tests/review/policy.rs @@ -1,3 +1,5 @@ +use state::ReviewPolicyCheckpointInput; + fn sample_review_repair_apply_inspectors( pr_url: &str, ) -> (FakePullRequestInspector, FakeLocalRepoInspector) { @@ -145,6 +147,8 @@ fn records_review_handoff_and_applies_it_after_validation() { assert!(response.success); + assert_review_policy_checkpoint_cleared(&bridge, &issue, &review_context); + bridge.apply_review_handoff().expect("review handoff should apply"); assert_eq!(tracker.state_updates.borrow().as_slice(), ["state-review"]); @@ -428,7 +432,7 @@ fn review_checkpoint_normalizes_matching_short_head_sha_to_full_head() { &tracker, &issue, &workflow, - review_context, + review_context.clone(), &pull_request_inspector, &local_repo_inspector, ); @@ -445,11 +449,9 @@ fn review_checkpoint_normalizes_matching_short_head_sha_to_full_head() { assert!(response.success); assert!(tracker.comments.borrow().is_empty()); - let marker = state::read_run_activity_marker_snapshot(temp_dir.path()) - .expect("marker snapshot should load") - .expect("marker snapshot should exist"); + let checkpoint = persisted_review_policy_checkpoint(&bridge, &issue, &review_context); - assert_eq!(marker.review_policy_head_sha(), Some(sample_local_repo().head_oid.as_str())); + assert_eq!(checkpoint.head_sha(), sample_local_repo().head_oid.as_str()); } #[test] @@ -492,13 +494,11 @@ fn review_checkpoint_findings_continue_until_budget_then_stop() { TurnCompletionStatus::Continue ); - let marker = state::read_run_activity_marker_snapshot(temp_dir.path()) - .expect("marker snapshot should load") - .expect("marker snapshot should exist"); + let checkpoint = persisted_review_policy_checkpoint(&bridge, &issue, &review_context); - assert_eq!(marker.review_policy_phase(), Some("handoff")); - assert_eq!(marker.review_policy_status(), Some("findings")); - assert_eq!(marker.review_policy_nonclean_rounds(), Some(expected_round)); + assert_eq!(checkpoint.phase(), "handoff"); + assert_eq!(checkpoint.status(), "findings"); + assert_eq!(checkpoint.nonclean_rounds(), expected_round); } let response = DynamicToolHandler::handle_call( @@ -542,7 +542,7 @@ fn review_checkpoint_clean_resets_nonclean_rounds_before_next_findings() { &tracker, &issue, &workflow, - review_context, + review_context.clone(), &pull_request_inspector, &local_repo_inspector, ); @@ -561,12 +561,10 @@ fn review_checkpoint_clean_resets_nonclean_rounds_before_next_findings() { assert!(response.success); } - let marker = state::read_run_activity_marker_snapshot(temp_dir.path()) - .expect("marker snapshot should load") - .expect("marker snapshot should exist"); + let checkpoint = persisted_review_policy_checkpoint(&bridge, &issue, &review_context); - assert_eq!(marker.review_policy_status(), Some("findings")); - assert_eq!(marker.review_policy_nonclean_rounds(), Some(1)); + assert_eq!(checkpoint.status(), "findings"); + assert_eq!(checkpoint.nonclean_rounds(), 1); assert_eq!( DynamicToolHandler::classify_turn_completion(&bridge, "continue") .expect("findings after a clean checkpoint should continue"), @@ -588,7 +586,7 @@ fn review_checkpoint_does_not_depend_on_tracker_comment_write() { &tracker, &issue, &workflow, - review_context, + review_context.clone(), &pull_request_inspector, &local_repo_inspector, ); @@ -605,11 +603,9 @@ fn review_checkpoint_does_not_depend_on_tracker_comment_write() { assert!(response.success); assert!(tracker.comments.borrow().is_empty()); - let marker = state::read_run_activity_marker_snapshot(temp_dir.path()) - .expect("marker snapshot should load") - .expect("marker snapshot should exist"); + let checkpoint = persisted_review_policy_checkpoint(&bridge, &issue, &review_context); - assert_eq!(marker.review_policy_nonclean_rounds(), Some(1)); + assert_eq!(checkpoint.nonclean_rounds(), 1); } #[test] @@ -679,7 +675,7 @@ fn review_checkpoint_phase_switch_resets_nonclean_rounds() { &tracker, &issue, &workflow, - repair_context, + repair_context.clone(), &pull_request_inspector, &local_repo_inspector, ); @@ -695,12 +691,10 @@ fn review_checkpoint_phase_switch_resets_nonclean_rounds() { assert!(response.success); - let marker = state::read_run_activity_marker_snapshot(temp_dir.path()) - .expect("marker snapshot should load") - .expect("marker snapshot should exist"); + let checkpoint = persisted_review_policy_checkpoint(&bridge, &issue, &repair_context); - assert_eq!(marker.review_policy_phase(), Some("repair")); - assert_eq!(marker.review_policy_nonclean_rounds(), Some(1)); + assert_eq!(checkpoint.phase(), "repair"); + assert_eq!(checkpoint.nonclean_rounds(), 1); } #[test] @@ -740,6 +734,57 @@ fn stale_review_checkpoint_for_old_head_does_not_stop_new_head() { ); } +#[test] +fn runtime_review_checkpoint_wins_over_stale_marker_policy_state() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let tracker = FakeTracker::new(); + let issue = sample_issue(); + let workflow = sample_workflow(); + let inspector = FakePullRequestInspector::new(vec![Ok(sample_pull_request())]); + let local_repo_inspector = + FakeLocalRepoInspector::new(vec![Ok(sample_local_repo()), Ok(sample_local_repo())]); + let review_context = sample_review_context_in(temp_dir.path()); + let bridge = TrackerToolBridge::with_review_handoff_for_test( + &tracker, + &issue, + &workflow, + review_context.clone(), + &inspector, + &local_repo_inspector, + ); + + bridge_state_store(&bridge) + .upsert_review_policy_checkpoint(ReviewPolicyCheckpointInput { + project_id: &review_context.service_id, + issue_id: &issue.id, + run_id: &review_context.run_id, + attempt_number: review_context.attempt_number, + phase: "handoff", + status: "clean", + head_sha: &sample_local_repo().head_oid, + nonclean_rounds: 0, + }) + .expect("runtime review checkpoint should persist"); + write_review_policy_checkpoint( + &review_context, + "handoff", + "blocked", + &sample_local_repo().head_oid, + 0, + ); + + let response = DynamicToolHandler::handle_call( + &bridge, + ISSUE_REVIEW_HANDOFF_TOOL_NAME, + serde_json::json!({ + "pr_url": "https://github.com/hack-ink/decodex/pull/48", + "summary": "Ready for review." + }), + ); + + assert!(response.success, "runtime store checkpoint should override stale marker state"); +} + #[test] fn review_handoff_requires_a_clean_checkpoint() { let temp_dir = TempDir::new().expect("tempdir should create"); @@ -1111,6 +1156,8 @@ fn review_repair_apply_persists_updated_handoff_marker_without_tracker_transitio assert!(response.success); assert!(finalize_response.success); + assert_review_policy_checkpoint_cleared(&bridge, &issue, &review_context); + DynamicToolHandler::validate_turn_completion(&bridge, "done") .expect("review repair completion should allow the turn to complete"); diff --git a/apps/decodex/src/agent/tracker_tool_bridge/tools.rs b/apps/decodex/src/agent/tracker_tool_bridge/tools.rs index 8b05e572..6b2b723b 100644 --- a/apps/decodex/src/agent/tracker_tool_bridge/tools.rs +++ b/apps/decodex/src/agent/tracker_tool_bridge/tools.rs @@ -894,13 +894,15 @@ impl<'a> TrackerToolBridge<'a> { Err(error) => return DynamicToolCallResponse::failure(error), }; - self.cache_review_policy_state_best_effort( + if let Err(error) = self.persist_review_policy_state( review_context, review_policy_phase, review_policy_status, &head_sha, nonclean_rounds, - ); + ) { + return DynamicToolCallResponse::failure(error); + } let message = self.review_checkpoint_success_message( review_policy_phase, @@ -980,25 +982,38 @@ impl<'a> TrackerToolBridge<'a> { review_context: &ReviewHandoffContext, tool_name: &str, ) -> Result<(), String> { - match state::clear_run_review_policy_state(&review_context.cwd) { - Ok(()) => Ok(()), - Err(error) if review_context.internal_review_checkpoint_enabled() => Err(format!( - "Failed to clear review policy state for issue `{}` after recording `{tool_name}`: {error}", + if let Some(state_store) = self.state_store { + state_store + .clear_review_policy_checkpoints_for_run_attempt( + &review_context.service_id, + &self.issue.id, + &review_context.run_id, + review_context.attempt_number, + ) + .map_err(|error| { + format!( + "Failed to clear review policy state for issue `{}` after recording `{tool_name}`: {error}", + self.issue.identifier + ) + })?; + } else if review_context.internal_review_checkpoint_enabled() { + return Err(format!( + "Runtime state store is required to clear review policy state for issue `{}` after recording `{tool_name}`.", self.issue.identifier - )), - Err(error) => { - tracing::warn!( - ?error, - issue = self.issue.identifier, - run_id = review_context.run_id, - tool_name, - worktree_path = %review_context.cwd.display(), - "Review policy state clear failed while internal review is disabled; continuing." - ); - - Ok(()) - }, + )); + } + if let Err(error) = state::clear_run_review_policy_state(&review_context.cwd) { + tracing::warn!( + ?error, + issue = self.issue.identifier, + run_id = review_context.run_id, + tool_name, + worktree_path = %review_context.cwd.display(), + "Legacy review policy marker clear failed; continuing after runtime state clear." + ); } + + Ok(()) } pub(super) fn handle_review_handoff(&self, arguments: Value) -> DynamicToolCallResponse { diff --git a/apps/decodex/src/state/internal.rs b/apps/decodex/src/state/internal.rs index ad6d1d5e..328883e9 100644 --- a/apps/decodex/src/state/internal.rs +++ b/apps/decodex/src/state/internal.rs @@ -129,6 +129,7 @@ struct StateData { private_execution_events: Vec, review_handoffs: HashMap, review_orchestrations: HashMap, + review_policy_checkpoints: HashMap, connector_backoffs: HashMap<(String, String), ConnectorBackoff>, dispatch_slot_configs: HashMap, issue_claim_guards: HashMap, @@ -147,6 +148,7 @@ impl StateData { self.private_execution_events = loaded.private_execution_events; self.review_handoffs = loaded.review_handoffs; self.review_orchestrations = loaded.review_orchestrations; + self.review_policy_checkpoints = loaded.review_policy_checkpoints; self.connector_backoffs = loaded.connector_backoffs; } @@ -343,6 +345,20 @@ CREATE TABLE IF NOT EXISTS linear_execution_events ( ); CREATE INDEX IF NOT EXISTS linear_execution_events_issue_idx ON linear_execution_events (service_id, issue_id, event_unix, recorded_at_unix); +"#, + )?; + self.bootstrap_review_schema()?; + self.bootstrap_run_control_channels_schema()?; + self.bootstrap_connector_backoffs_schema()?; + self.bootstrap_private_execution_events_schema()?; + self.record_schema_version()?; + + Ok(()) + } + + fn bootstrap_review_schema(&self) -> Result<()> { + self.connection.execute_batch( + r#" CREATE TABLE IF NOT EXISTS review_handoffs ( project_id TEXT NOT NULL, issue_id TEXT NOT NULL, @@ -376,12 +392,21 @@ CREATE TABLE IF NOT EXISTS review_orchestrations ( updated_at_unix INTEGER NOT NULL, PRIMARY KEY (project_id, issue_id, branch_name, run_id, attempt_number) ); +CREATE TABLE IF NOT EXISTS review_policy_checkpoints ( + project_id TEXT NOT NULL, + issue_id TEXT NOT NULL, + run_id TEXT NOT NULL, + attempt_number INTEGER NOT NULL, + phase TEXT NOT NULL, + status TEXT NOT NULL, + head_sha TEXT NOT NULL, + nonclean_rounds INTEGER NOT NULL, + updated_at TEXT NOT NULL, + updated_at_unix INTEGER NOT NULL, + PRIMARY KEY (project_id, issue_id, run_id, attempt_number, phase) +); "#, )?; - self.bootstrap_run_control_channels_schema()?; - self.bootstrap_connector_backoffs_schema()?; - self.bootstrap_private_execution_events_schema()?; - self.record_schema_version()?; Ok(()) } @@ -463,7 +488,7 @@ CREATE TABLE IF NOT EXISTS schema_meta ( value TEXT NOT NULL ); INSERT INTO schema_meta (key, value) -VALUES ('schema_version', '7') +VALUES ('schema_version', '8') ON CONFLICT(key) DO UPDATE SET value = excluded.value; "#, )?; @@ -484,6 +509,7 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; self.load_private_execution_events(&mut state)?; self.load_review_handoffs(&mut state)?; self.load_review_orchestrations(&mut state)?; + self.load_review_policy_checkpoints(&mut state)?; self.load_connector_backoffs(&mut state)?; Ok(state) @@ -522,6 +548,7 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; persist_private_execution_events(&transaction, state)?; persist_review_handoffs(&transaction, state)?; persist_review_orchestrations(&transaction, state)?; + persist_review_policy_checkpoints(&transaction, state)?; persist_connector_backoffs(&transaction, state)?; transaction.commit()?; @@ -757,6 +784,20 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; "UPDATE private_execution_events SET issue_id = ?2 WHERE issue_id = ?1", params![previous_issue_id, canonical_issue_id], )?; + transaction.execute( + "INSERT OR IGNORE INTO review_policy_checkpoints ( + project_id, issue_id, run_id, attempt_number, phase, status, head_sha, + nonclean_rounds, updated_at, updated_at_unix + ) + SELECT project_id, ?2, run_id, attempt_number, phase, status, head_sha, + nonclean_rounds, updated_at, updated_at_unix + FROM review_policy_checkpoints WHERE issue_id = ?1", + params![previous_issue_id, canonical_issue_id], + )?; + transaction.execute( + "DELETE FROM review_policy_checkpoints WHERE issue_id = ?1", + params![previous_issue_id], + )?; transaction.execute( "INSERT OR IGNORE INTO review_handoffs ( project_id, issue_id, branch_name, run_id, attempt_number, pr_url, @@ -803,6 +844,10 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; "DELETE FROM review_orchestrations WHERE issue_id = ?1", params![issue_id], )?; + transaction.execute( + "DELETE FROM review_policy_checkpoints WHERE issue_id = ?1", + params![issue_id], + )?; transaction.commit()?; Ok(()) @@ -816,6 +861,10 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; "DELETE FROM review_orchestrations WHERE issue_id = ?1", params![issue_id], )?; + transaction.execute( + "DELETE FROM review_policy_checkpoints WHERE issue_id = ?1", + params![issue_id], + )?; transaction.commit()?; Ok(()) @@ -842,11 +891,32 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; AND run_id = ?4 AND attempt_number = ?5", params![project_id, issue_id, branch_name, run_id, attempt_number], )?; + transaction.execute( + "DELETE FROM review_policy_checkpoints + WHERE project_id = ?1 AND issue_id = ?2 AND run_id = ?3 AND attempt_number = ?4", + params![project_id, issue_id, run_id, attempt_number], + )?; transaction.commit()?; Ok(()) } + fn delete_review_policy_checkpoints_for_run_attempt( + &mut self, + project_id: &str, + issue_id: &str, + run_id: &str, + attempt_number: i64, + ) -> Result<()> { + self.connection.execute( + "DELETE FROM review_policy_checkpoints + WHERE project_id = ?1 AND issue_id = ?2 AND run_id = ?3 AND attempt_number = ?4", + params![project_id, issue_id, run_id, attempt_number], + )?; + + Ok(()) + } + fn load_projects(&self, state: &mut StateData) -> Result<()> { let mut statement = self.connection.prepare( "SELECT service_id, config_path, repo_root, worktree_root, workflow_path, \ @@ -1437,6 +1507,44 @@ ON CONFLICT(key) DO UPDATE SET value = excluded.value; Ok(()) } + fn load_review_policy_checkpoints(&self, state: &mut StateData) -> Result<()> { + let mut statement = self.connection.prepare( + "SELECT project_id, issue_id, run_id, attempt_number, phase, status, head_sha, \ + nonclean_rounds, updated_at, updated_at_unix FROM review_policy_checkpoints", + )?; + let rows = statement.query_map([], |row| { + let project_id: String = row.get(0)?; + let issue_id: String = row.get(1)?; + let run_id: String = row.get(2)?; + let attempt_number: i64 = row.get(3)?; + let phase: String = row.get(4)?; + + Ok(( + ReviewPolicyKey::new(&project_id, &issue_id, &run_id, attempt_number, &phase), + ReviewPolicyRuntimeRecord { + project_id, + issue_id, + run_id, + attempt_number, + phase, + status: row.get(5)?, + head_sha: row.get(6)?, + nonclean_rounds: row.get(7)?, + updated_at: row.get(8)?, + updated_at_unix: row.get(9)?, + }, + )) + })?; + + for row in rows { + let (key, record) = row?; + + state.review_policy_checkpoints.insert(key, record); + } + + Ok(()) + } + fn load_connector_backoffs(&self, state: &mut StateData) -> Result<()> { let mut statement = self.connection.prepare( "SELECT project_id, connector, sync_phase, quota_class, reset_unix_epoch, \ @@ -1662,6 +1770,32 @@ impl ReviewOrchestrationKey { } } +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct ReviewPolicyKey { + project_id: String, + issue_id: String, + run_id: String, + attempt_number: i64, + phase: String, +} +impl ReviewPolicyKey { + fn new( + project_id: &str, + issue_id: &str, + run_id: &str, + attempt_number: i64, + phase: &str, + ) -> Self { + Self { + project_id: project_id.to_owned(), + issue_id: issue_id.to_owned(), + run_id: run_id.to_owned(), + attempt_number, + phase: phase.to_owned(), + } + } +} + #[derive(Clone, Debug)] struct ReviewHandoffRuntimeRecord { project_id: String, @@ -1684,6 +1818,36 @@ struct ReviewOrchestrationRuntimeRecord { updated_at_unix: i64, } +#[derive(Clone, Debug)] +struct ReviewPolicyRuntimeRecord { + project_id: String, + issue_id: String, + run_id: String, + attempt_number: i64, + phase: String, + status: String, + head_sha: String, + nonclean_rounds: i64, + updated_at: String, + updated_at_unix: i64, +} +impl ReviewPolicyRuntimeRecord { + fn as_public(&self) -> ReviewPolicyCheckpoint { + ReviewPolicyCheckpoint { + project_id: self.project_id.clone(), + issue_id: self.issue_id.clone(), + run_id: self.run_id.clone(), + attempt_number: self.attempt_number, + phase: self.phase.clone(), + status: self.status.clone(), + head_sha: self.head_sha.clone(), + nonclean_rounds: self.nonclean_rounds, + updated_at: self.updated_at.clone(), + updated_at_unix: self.updated_at_unix, + } + } +} + #[derive(Clone, Default)] struct RunActivityMarkerRecord { run_id: Option, @@ -2047,6 +2211,7 @@ pub(crate) fn clear_run_retry_schedule(worktree_path: &Path) -> Result<()> { Ok(()) } +#[cfg_attr(not(test), allow(dead_code))] pub(crate) fn write_run_review_policy_state( worktree_path: &Path, run_id: &str, @@ -2585,6 +2750,34 @@ fn persist_review_orchestrations(transaction: &Transaction<'_>, state: &StateDat Ok(()) } +fn persist_review_policy_checkpoints( + transaction: &Transaction<'_>, + state: &StateData, +) -> Result<()> { + for record in state.review_policy_checkpoints.values() { + transaction.execute( + "INSERT OR REPLACE INTO review_policy_checkpoints ( + project_id, issue_id, run_id, attempt_number, phase, status, head_sha, + nonclean_rounds, updated_at, updated_at_unix + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", + params![ + record.project_id, + record.issue_id, + record.run_id, + record.attempt_number, + record.phase, + record.status, + record.head_sha, + record.nonclean_rounds, + record.updated_at, + record.updated_at_unix, + ], + )?; + } + + Ok(()) +} + fn persist_connector_backoffs(transaction: &Transaction<'_>, state: &StateData) -> Result<()> { for record in state.connector_backoffs.values() { transaction.execute( diff --git a/apps/decodex/src/state/models.rs b/apps/decodex/src/state/models.rs index a54a4d16..15ab7051 100644 --- a/apps/decodex/src/state/models.rs +++ b/apps/decodex/src/state/models.rs @@ -492,6 +492,63 @@ pub struct PreacquiredLeaseGuards { pub dispatch_slot_index: usize, } +/// Latest runtime-owned review-policy checkpoint for one run phase. +#[derive(Clone, Debug, Eq, PartialEq)] +pub(crate) struct ReviewPolicyCheckpoint { + project_id: String, + issue_id: String, + run_id: String, + attempt_number: i64, + phase: String, + status: String, + head_sha: String, + nonclean_rounds: i64, + updated_at: String, + updated_at_unix: i64, +} +#[cfg_attr(not(test), allow(dead_code))] +impl ReviewPolicyCheckpoint { + pub(crate) fn project_id(&self) -> &str { + &self.project_id + } + + pub(crate) fn issue_id(&self) -> &str { + &self.issue_id + } + + pub(crate) fn run_id(&self) -> &str { + &self.run_id + } + + pub(crate) fn attempt_number(&self) -> i64 { + self.attempt_number + } + + pub(crate) fn phase(&self) -> &str { + &self.phase + } + + pub(crate) fn status(&self) -> &str { + &self.status + } + + pub(crate) fn head_sha(&self) -> &str { + &self.head_sha + } + + pub(crate) fn nonclean_rounds(&self) -> i64 { + self.nonclean_rounds + } + + pub(crate) fn updated_at(&self) -> &str { + &self.updated_at + } + + pub(crate) fn updated_at_unix(&self) -> i64 { + self.updated_at_unix + } +} + /// Foundation request for resolving a local run-control action. #[derive(Clone, Copy, Debug, Eq, PartialEq)] #[cfg_attr(not(test), allow(dead_code))] diff --git a/apps/decodex/src/state/store.rs b/apps/decodex/src/state/store.rs index 4c6f92f0..14eed24c 100644 --- a/apps/decodex/src/state/store.rs +++ b/apps/decodex/src/state/store.rs @@ -11,6 +11,18 @@ pub(crate) struct ConnectorBackoffInput<'a> { pub(crate) warning: &'a str, } +/// Input fields for recording the latest review-policy checkpoint. +pub(crate) struct ReviewPolicyCheckpointInput<'a> { + pub(crate) project_id: &'a str, + pub(crate) issue_id: &'a str, + pub(crate) run_id: &'a str, + pub(crate) attempt_number: i64, + pub(crate) phase: &'a str, + pub(crate) status: &'a str, + pub(crate) head_sha: &'a str, + pub(crate) nonclean_rounds: i64, +} + /// Local runtime store for leases, attempts, worktrees, protocol events, and private evidence. #[derive(Default)] pub struct StateStore { @@ -209,6 +221,11 @@ impl StateStore { previous_issue_id, canonical_issue_id, ); + retarget_review_policy_issue( + &mut state.review_policy_checkpoints, + previous_issue_id, + canonical_issue_id, + ); if let Some(guard) = state.issue_claim_guards.remove(previous_issue_id) { state.issue_claim_guards.entry(canonical_issue_id.to_owned()).or_insert(guard); @@ -1613,6 +1630,79 @@ impl StateStore { Ok(state.review_orchestrations.get(&key).map(|record| record.marker.clone())) } + /// Create or replace the latest review-policy checkpoint for one run phase. + pub(crate) fn upsert_review_policy_checkpoint( + &self, + input: ReviewPolicyCheckpointInput<'_>, + ) -> Result { + let now = timestamp_parts(); + let key = ReviewPolicyKey::new( + input.project_id, + input.issue_id, + input.run_id, + input.attempt_number, + input.phase, + ); + let record = ReviewPolicyRuntimeRecord { + project_id: input.project_id.to_owned(), + issue_id: input.issue_id.to_owned(), + run_id: input.run_id.to_owned(), + attempt_number: input.attempt_number, + phase: input.phase.to_owned(), + status: input.status.to_owned(), + head_sha: input.head_sha.to_owned(), + nonclean_rounds: input.nonclean_rounds, + updated_at: now.text, + updated_at_unix: now.unix, + }; + let mut state = self.lock()?; + + state.review_policy_checkpoints.insert(key, record.clone()); + self.persist_runtime_state_locked(&state)?; + + Ok(record.as_public()) + } + + /// Read the latest runtime-owned review-policy checkpoint for one run phase. + pub(crate) fn review_policy_checkpoint( + &self, + project_id: &str, + issue_id: &str, + run_id: &str, + attempt_number: i64, + phase: &str, + ) -> Result> { + let state = self.lock()?; + let key = ReviewPolicyKey::new(project_id, issue_id, run_id, attempt_number, phase); + + Ok(state.review_policy_checkpoints.get(&key).map(ReviewPolicyRuntimeRecord::as_public)) + } + + /// Clear review-policy checkpoints for one completed run attempt. + pub(crate) fn clear_review_policy_checkpoints_for_run_attempt( + &self, + project_id: &str, + issue_id: &str, + run_id: &str, + attempt_number: i64, + ) -> Result<()> { + let mut state = self.lock()?; + + state.review_policy_checkpoints.retain(|key, _record| { + key.project_id != project_id + || key.issue_id != issue_id + || key.run_id != run_id + || key.attempt_number != attempt_number + }); + + self.delete_review_policy_checkpoints_for_run_attempt_locked( + project_id, + issue_id, + run_id, + attempt_number, + ) + } + /// Remove retained review markers for one issue without clearing its worktree mapping. pub(crate) fn clear_review_markers(&self, issue_id: &str) -> Result<()> { let mut state = self.lock()?; @@ -1621,6 +1711,9 @@ impl StateStore { state .review_orchestrations .retain(|key, _record| key.issue_id != issue_id); + state + .review_policy_checkpoints + .retain(|key, _record| key.issue_id != issue_id); self.persist_runtime_state_locked(&state)?; self.delete_review_markers_locked(issue_id) @@ -1646,6 +1739,12 @@ impl StateStore { state.review_handoffs.remove(&handoff_key); state.review_orchestrations.remove(&orchestration_key); + state.review_policy_checkpoints.retain(|key, _record| { + key.project_id != project_id + || key.issue_id != issue_id + || key.run_id != orchestration_marker.run_id() + || key.attempt_number != orchestration_marker.attempt_number() + }); self.persist_runtime_state_locked(&state)?; self.delete_review_marker_identity_locked( @@ -1691,6 +1790,9 @@ impl StateStore { state .review_orchestrations .retain(|key, _record| key.issue_id != issue_id); + state + .review_policy_checkpoints + .retain(|key, _record| key.issue_id != issue_id); self.persist_runtime_state_locked(&state)?; self.delete_worktree_and_review_markers_locked(issue_id) @@ -1974,6 +2076,28 @@ impl StateStore { attempt_number, ) } + + fn delete_review_policy_checkpoints_for_run_attempt_locked( + &self, + project_id: &str, + issue_id: &str, + run_id: &str, + attempt_number: i64, + ) -> Result<()> { + let Some(sqlite) = self.sqlite.as_ref() else { + return Ok(()); + }; + let mut sqlite = sqlite + .lock() + .map_err(|_| eyre::eyre!("StateStore SQLite mutex is poisoned."))?; + + sqlite.delete_review_policy_checkpoints_for_run_attempt( + project_id, + issue_id, + run_id, + attempt_number, + ) + } } #[cfg_attr(not(test), allow(dead_code))] @@ -2074,6 +2198,36 @@ fn retarget_review_handoff_issue( } } +fn retarget_review_policy_issue( + records: &mut HashMap, + previous_issue_id: &str, + canonical_issue_id: &str, +) { + let previous_keys = records + .keys() + .filter(|key| key.issue_id == previous_issue_id) + .cloned() + .collect::>(); + + for key in previous_keys { + let Some(mut record) = records.remove(&key) else { + continue; + }; + + record.issue_id = canonical_issue_id.to_owned(); + + records + .entry(ReviewPolicyKey::new( + &key.project_id, + canonical_issue_id, + &key.run_id, + key.attempt_number, + &key.phase, + )) + .or_insert(record); + } +} + fn active_run_attempt_status(status: &str) -> bool { matches!(status, "starting" | "running") } diff --git a/apps/decodex/src/state/tests.rs b/apps/decodex/src/state/tests.rs index 80610a4d..a2e28c8b 100644 --- a/apps/decodex/src/state/tests.rs +++ b/apps/decodex/src/state/tests.rs @@ -17,7 +17,8 @@ use crate::{ ProjectRegistration, ProtocolActivityMarker, ProtocolActivitySummary, RUN_ACTIVITY_MARKER_FILE, RUN_CONTROL_ACTION_COMPLETED, RUN_CONTROL_ACTION_FAILED, RUN_CONTROL_ACTION_FALLBACK, RUN_CONTROL_ACTION_TIMED_OUT, RUN_OPERATION_REPO_GATE, - ReviewHandoffMarker, ReviewOrchestrationMarker, RunControlActionRequest, StateStore, + ReviewHandoffMarker, ReviewOrchestrationMarker, ReviewPolicyCheckpointInput, + RunControlActionRequest, StateStore, }, tracker::records::{LinearExecutionEventIdentity, LinearExecutionEventRecord}, }; @@ -62,6 +63,28 @@ fn sample_pub_101_review_orchestration() -> ReviewOrchestrationMarker { ) } +fn upsert_handoff_review_policy_checkpoint( + store: &StateStore, + issue_id: &str, + run_id: &str, + status: &str, + head_sha: &str, + nonclean_rounds: i64, +) { + store + .upsert_review_policy_checkpoint(ReviewPolicyCheckpointInput { + project_id: "pubfi", + issue_id, + run_id, + attempt_number: 1, + phase: "handoff", + status, + head_sha, + nonclean_rounds, + }) + .expect("review policy checkpoint should persist"); +} + #[test] fn review_markers_roundtrip_preserve_required_fields() { let store = StateStore::open_in_memory().expect("state store should open"); @@ -162,29 +185,8 @@ fn clear_review_markers_for_handoff_preserves_other_branches() { let temp_dir = TempDir::new().expect("tempdir should create"); let state_path = temp_dir.path().join("runtime.sqlite3"); let store = StateStore::open(&state_path).expect("state store should open"); - let removed_handoff = ReviewHandoffMarker::new( - "run-1", - 1, - "x/decodex-pub-101", - "https://github.com/hack-ink/decodex/pull/101", - "main", - "x/decodex-pub-101", - "08a20f7dfb9526e7421a5f095b1c6adec84e52d6", - ); - let removed_orchestration = ReviewOrchestrationMarker::new( - "run-1", - 1, - "x/decodex-pub-101", - "https://github.com/hack-ink/decodex/pull/101", - "08a20f7dfb9526e7421a5f095b1c6adec84e52d6", - "request_pending", - None, - None, - None, - 0, - 0, - None, - ); + let removed_handoff = sample_pub_101_review_handoff(); + let removed_orchestration = sample_pub_101_review_orchestration(); let kept_handoff = ReviewHandoffMarker::new( "run-2", 1, @@ -215,12 +217,32 @@ fn clear_review_markers_for_handoff_preserves_other_branches() { store .upsert_review_orchestration_marker("pubfi", "PUB-101", &removed_orchestration) .expect("removed orchestration marker should persist"); + + upsert_handoff_review_policy_checkpoint( + &store, + "PUB-101", + "run-1", + "findings", + "08a20f7dfb9526e7421a5f095b1c6adec84e52d6", + 2, + ); + store .upsert_review_handoff_marker("pubfi", "PUB-101", &kept_handoff) .expect("kept handoff marker should persist"); store .upsert_review_orchestration_marker("pubfi", "PUB-101", &kept_orchestration) .expect("kept orchestration marker should persist"); + + upsert_handoff_review_policy_checkpoint( + &store, + "PUB-101", + "run-2", + "clean", + "18a20f7dfb9526e7421a5f095b1c6adec84e52d6", + 0, + ); + store .clear_review_markers_for_handoff( "pubfi", @@ -250,6 +272,20 @@ fn clear_review_markers_for_handoff_preserves_other_branches() { .expect("kept orchestration marker should read"), Some(kept_orchestration) ); + assert!( + reopened + .review_policy_checkpoint("pubfi", "PUB-101", "run-1", 1, "handoff") + .expect("removed review policy checkpoint should read") + .is_none() + ); + + let kept_checkpoint = reopened + .review_policy_checkpoint("pubfi", "PUB-101", "run-2", 1, "handoff") + .expect("kept review policy checkpoint should read") + .expect("kept review policy checkpoint should exist"); + + assert_eq!(kept_checkpoint.status(), "clean"); + assert_eq!(kept_checkpoint.head_sha(), "18a20f7dfb9526e7421a5f095b1c6adec84e52d6"); } #[test] @@ -279,6 +315,56 @@ fn missing_review_markers_return_absent() { ); } +#[test] +fn review_policy_checkpoints_persist_reload_and_clear_for_run_attempt() { + let temp_dir = TempDir::new().expect("tempdir should create"); + let state_path = temp_dir.path().join("runtime.sqlite3"); + let store = StateStore::open(&state_path).expect("state store should open"); + let checkpoint = store + .upsert_review_policy_checkpoint(ReviewPolicyCheckpointInput { + project_id: "pubfi", + issue_id: "PUB-101", + run_id: "run-1", + attempt_number: 2, + phase: "handoff", + status: "findings", + head_sha: "abc123", + nonclean_rounds: 2, + }) + .expect("review policy checkpoint should persist"); + + assert_eq!(checkpoint.project_id(), "pubfi"); + assert_eq!(checkpoint.issue_id(), "PUB-101"); + assert_eq!(checkpoint.run_id(), "run-1"); + assert_eq!(checkpoint.attempt_number(), 2); + assert_eq!(checkpoint.phase(), "handoff"); + assert_eq!(checkpoint.status(), "findings"); + assert_eq!(checkpoint.head_sha(), "abc123"); + assert_eq!(checkpoint.nonclean_rounds(), 2); + assert!(!checkpoint.updated_at().is_empty()); + assert!(checkpoint.updated_at_unix() > 0); + + let reopened = StateStore::open(&state_path).expect("reopened state store should open"); + let reloaded = reopened + .review_policy_checkpoint("pubfi", "PUB-101", "run-1", 2, "handoff") + .expect("review policy checkpoint should read") + .expect("review policy checkpoint should exist"); + + assert_eq!(reloaded.status(), "findings"); + assert_eq!(reloaded.nonclean_rounds(), 2); + + reopened + .clear_review_policy_checkpoints_for_run_attempt("pubfi", "PUB-101", "run-1", 2) + .expect("review policy checkpoint should clear"); + + assert!( + reopened + .review_policy_checkpoint("pubfi", "PUB-101", "run-1", 2, "handoff") + .expect("cleared review policy checkpoint should read") + .is_none() + ); +} + #[test] fn persistent_review_markers_survive_stale_store_persist_and_are_visible() { let temp_dir = TempDir::new().expect("tempdir should create"); @@ -2061,6 +2147,16 @@ fn canonicalize_issue_identity_retargets_persistent_rows_without_cache_refresh() writer .upsert_review_orchestration_marker("pubfi", "PUB-101", &orchestration) .expect("orchestration marker should persist"); + + upsert_handoff_review_policy_checkpoint( + &writer, + "PUB-101", + "run-1", + "findings", + "08a20f7dfb9526e7421a5f095b1c6adec84e52d6", + 2, + ); + stale_store .canonicalize_issue_identity("PUB-101", "linear-id-101") .expect("identity should canonicalize from SQLite rows"); @@ -2111,6 +2207,20 @@ fn canonicalize_issue_identity_retargets_persistent_rows_without_cache_refresh() .expect("canonical orchestration should read"), Some(orchestration) ); + assert!( + reopened + .review_policy_checkpoint("pubfi", "PUB-101", "run-1", 1, "handoff") + .expect("old review policy checkpoint should read") + .is_none() + ); + + let canonical_checkpoint = reopened + .review_policy_checkpoint("pubfi", "linear-id-101", "run-1", 1, "handoff") + .expect("canonical review policy checkpoint should read") + .expect("canonical review policy checkpoint should exist"); + + assert_eq!(canonical_checkpoint.status(), "findings"); + assert_eq!(canonical_checkpoint.nonclean_rounds(), 2); } #[test] diff --git a/docs/reference/operator-control-plane.md b/docs/reference/operator-control-plane.md index d65381e2..05228653 100644 --- a/docs/reference/operator-control-plane.md +++ b/docs/reference/operator-control-plane.md @@ -121,12 +121,12 @@ work that needs full private payload values. | Surface | Owns | Does Not Own | | --- | --- | --- | -| Runtime SQLite DB | active leases, attempts, run-control channels, protocol events, private execution events, worktree mappings, retry state, retained PR state, phase timing, connector backoff, project registry | human backlog grooming or durable team-visible issue history | +| Runtime SQLite DB | active leases, attempts, run-control channels, protocol events, private execution events, worktree mappings, retry state, retained PR state, review-policy checkpoints, phase timing, connector backoff, project registry | human backlog grooming or durable team-visible issue history | | Central project config | `service_id`, repo root, worktree root, tracker/GitHub credential env-var names, enabled project registration | per-run state or issue ownership | | Project `WORKFLOW.md` | repo policy, validation gate, state names, retry/review policy | runtime ownership, queue labels, credentials, model overrides | | Linear | team-visible issue state, queue/active/manual-attention labels, coarse execution ledger comments, progress/failure/handoff/closeout summaries | high-frequency runtime truth, heartbeat, token pressure, raw attempts, private execution evidence, connector retry budgets | | GitHub | PR, checks, review comments, merge evidence, signed commit verification | queue selection or local lane ownership | -| `.decodex-run-activity` | short-lived child activity heartbeat for the active attempt, including same-boot and same-process-start liveness | durable ownership, review handoff identity, cleanup authority | +| `.decodex-run-activity` | short-lived child activity heartbeat for the active attempt, including same-boot and same-process-start liveness plus diagnostic protocol/child/account breadcrumbs | durable ownership, review handoff identity, review-policy checkpoint authority, cleanup authority | | `.decodex-run-control/` | local per-attempt control-channel marker files for active runtime-owned attempts | standalone ownership proof, public tracker history, or dashboard-authored lane mutation | ## Operator Dashboard Sections @@ -392,6 +392,9 @@ map to an operator decision. it is not a scheduler contract. - Missing child-agent activity means no breakdown was captured for that run, not that the lane is invalid. +- Review-policy checkpoint state comes from runtime SQLite, not marker files. Legacy + marker fields such as `review_policy_status` may explain an old worktree, but they + must not override the store-backed checkpoint row for handoff or repair gating. The dashboard should avoid pretending that every bucket has a fixed total budget. When a row is event-only or sub-second, the UI should present it as diagnostic event @@ -412,8 +415,8 @@ rate-limited, or unavailable. - Linear writes should stay coarse: one run-start ledger, material progress checkpoints, PR-ready/handoff, blocked/failed, landed, done, and cleanup summaries. Full structured execution evidence belongs in private runtime SQLite events. -- Fine-grained retry budgets, raw attempts, heartbeat, child buckets, token pressure, - recovery details, and process logs stay local. Logs are diagnostic text; private +- Fine-grained retry budgets, review-policy checkpoints, raw attempts, heartbeat, + child buckets, token pressure, recovery details, and process logs stay local. Logs are diagnostic text; private execution events are structured runtime evidence. - Completed lanes without Decodex Linear execution ledger records are reported as `missing` / `execution_ledger_missing`. Tracker terminal state, local attempt diff --git a/docs/spec/owned-lane-policy.md b/docs/spec/owned-lane-policy.md index 3b70d39f..8ee36721 100644 --- a/docs/spec/owned-lane-policy.md +++ b/docs/spec/owned-lane-policy.md @@ -111,7 +111,7 @@ This action requires: - an explicit retained review-handoff lineage marker is still present when the lane already crossed into post-review ownership - retained post-review re-entry can still prove the same PR lineage, even when the current repair attempt advances to a newer head on that same PR -- retained review-policy state from `.decodex-run-activity` still matches the current phase when convergence counting must continue across re-entry +- retained review-policy checkpoint state in runtime SQLite still matches the current phase when convergence counting must continue across re-entry ### `manual_intervention_required` @@ -174,8 +174,8 @@ For review-policy churn, the runtime counts only structured review checkpoints: - phase is `handoff` before the first PR-backed review handoff succeeds - phase is `repair` during retained review-repair runs -- every `findings` checkpoint increments `review_policy_nonclean_rounds` -- `clean` does not stop the lane and resets `review_policy_nonclean_rounds` to zero for the current phase +- every `findings` checkpoint increments the runtime checkpoint's non-clean round count +- `clean` does not stop the lane and resets that non-clean round count to zero for the current phase - recording `issue_review_handoff` or `issue_review_repair_complete` clears the retained review-policy state Review-policy stops are also the only review failures eligible for a future diff --git a/docs/spec/runtime.md b/docs/spec/runtime.md index 958eb596..456463f5 100644 --- a/docs/spec/runtime.md +++ b/docs/spec/runtime.md @@ -31,7 +31,7 @@ Defines: The runtime scope, source-of-truth boundaries, eligibility rules, lane ## Source of truth boundaries -- The Decodex runtime SQLite database is the single-machine source of truth for active leases, attempts, run-control channels, protocol events, private execution events, worktree mappings, retained PR state, retry state, phase timing, project registration, tracker cache, PR cache, and connector backoff. +- The Decodex runtime SQLite database is the single-machine source of truth for active leases, attempts, run-control channels, protocol events, private execution events, worktree mappings, retained PR state, review-policy checkpoints, retry state, phase timing, project registration, tracker cache, PR cache, and connector backoff. - Linear remains the team-visible tracker surface for issue lifecycle, queue/active/manual-attention labels, and coarse lifecycle summaries such as start, PR-ready, blocked, failed, landed, and done. - Versioned Linear execution event comments use the schema in [`linear-execution-ledger.md`](./linear-execution-ledger.md), but fine-grained runtime truth must not be rebuilt from comments every tick. @@ -55,6 +55,7 @@ mirror: | --- | --- | | Runtime SQLite `private_execution_events` | Structured private execution evidence for the local Decodex installation. This is where full checkpoint payloads, verification notes, local head evidence, and recovery detail belong. | | Runtime SQLite `run_control_channels` | Local control capability metadata for active run attempts. It records the project, issue, run id, attempt, transport, local channel path, channel status, and publish/update timestamps needed to route future control requests without bypassing active lease ownership. | +| Runtime SQLite `review_policy_checkpoints` | Latest bounded-review checkpoint state for one project, issue, run, attempt, and phase. This row is the authority for review handoff and retained repair gating. | | Agent evidence under `~/.codex/decodex/agent-evidence//` | Derived local handoff view for repair agents. It may reference private evidence readback commands and compact run capsules, but it is not scheduling authority and is not a public mirror. | | Logs under `~/.codex/decodex/logs/` and `.decodex-run-activity` | Diagnostic process and liveness signals. They may explain what a local process did, but they are not the structured execution ledger and must not be replayed as tracker state. | | Linear execution ledger comments | Low-frequency public projection for team-visible lifecycle state. They carry coarse start, progress phase, PR, handoff, failure, landing, closeout, and cleanup summaries only. | @@ -69,6 +70,7 @@ The following facts are local runtime truth and must not be rebuilt from Linear - active run-control channel metadata and local control audit events - protocol events, event counts, event timestamps, and thread/liveness hydration fields - private execution events carrying structured local evidence for an issue/run/attempt +- review-policy checkpoint state: current phase, normalized status, lane head, and consecutive non-clean round count - retry and backoff state: queued retry kind, due time, retry budget, and connector backoff - phase timing and operator activity summaries - retained worktree mappings, retained PR handoff identity, post-review phase, and cleanup or repair ownership @@ -280,7 +282,7 @@ When `codex.internal_review_mode = "loop"`, handoff and retained review-repair r - latest checkpoint `findings` with three or more consecutive non-clean rounds in the same phase: fail the turn through the human-required failure path - latest checkpoint `needs_architecture_review` or `blocked`: fail the turn through the human-required failure path -`decodex` persists this review-policy state in the retained lane's `.decodex-run-activity` marker using `review_policy_phase`, `review_policy_status`, `review_policy_head_sha`, and `review_policy_nonclean_rounds`. Recording `issue_review_handoff` or `issue_review_repair_complete` clears those fields. When `codex.internal_review_mode = "prompt"` or `"off"`, Decodex does not expose `issue_review_checkpoint`, does not require a clean checkpoint before review handoff or repair completion, and ignores stale review-policy state while classifying clean turn boundaries. +`decodex` persists this review-policy state in the runtime SQLite `review_policy_checkpoints` table keyed by project, issue, run, attempt, and phase. The stored row contains `phase`, `status`, `head_sha`, and `nonclean_rounds`, and it is the only authority used to require a current clean checkpoint before `issue_review_handoff` or `issue_review_repair_complete`. Legacy `.decodex-run-activity` marker fields with the same names may be parsed to seed a missing runtime row during compatibility recovery, but a runtime-store checkpoint always wins over stale marker values. Recording `issue_review_handoff` or `issue_review_repair_complete` clears the runtime checkpoint for that run attempt and only best-effort clears any legacy marker fields. When `codex.internal_review_mode = "prompt"` or `"off"`, Decodex does not expose `issue_review_checkpoint`, does not require a clean checkpoint before review handoff or repair completion, and ignores stale review-policy state while classifying clean turn boundaries. The review-policy human-required failure path is also the boundary for any later runtime-owned research escalation. The current runtime must not dispatch research from a @@ -384,14 +386,15 @@ The runtime database stores at least: - private execution events scoped by project, issue, run, and attempt - worktree mappings - retained PR and post-review state +- review-policy checkpoints - retry state and retry budgets - phase timing and operator activity summaries - tracker and PR cache rows needed to survive connector outages - typed connector health and external API backoff -For child supervision, the active lane may also carry a short-lived worktree heartbeat marker at `.worktrees//.decodex-run-activity`. That marker is advisory, keyed to the current `run_id` plus `attempt_number`, and exists so the control plane can observe child activity across process boundaries, surface active thread and protocol progress in operator status, and keep high-frequency telemetry out of Linear. When the marker records process liveness, it must pair `process_id` with both host boot identity (`host_boot_id`) and process start identity (`process_start_identity`). A marker from a previous boot, a marker missing either identity, a marker whose process start identity no longer matches the live PID, a marker whose PID has exited into an unreaped zombie state, or a marker observed while Decodex cannot read the current host or process identity must not be treated as a live process even if that PID currently exists. Operator snapshots expose `process_liveness_reason` so operators can distinguish stopped processes, previous-boot markers, and same-boot PID reuse from genuine live execution. The marker may also carry an additive `child_agent_activity` JSON summary for the current attempt; that summary is diagnostic state for operator snapshots, not durable scheduling authority. Operator snapshots must keep queue ownership separate from execution liveness: `active_lease` and `queue_lease_state` describe the local queue lease, while `execution_liveness` describes the observed process, app-server thread, or protocol marker that keeps an active lane visible. If a raw attempt is still `starting` after app-server thread, model, or protocol activity is observed, operator-facing `status` must report `running` and preserve the raw value in `attempt_status`. High-frequency heartbeat, child-agent buckets, token counts, idle ages, and other transient liveness details stay local/operator-only under the boundary defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md). +For child supervision, the active lane may also carry a short-lived worktree heartbeat marker at `.worktrees//.decodex-run-activity`. That marker is advisory, keyed to the current `run_id` plus `attempt_number`, and exists so the control plane can observe child activity across process boundaries, surface active thread and protocol progress in operator status, and keep high-frequency telemetry out of Linear. When the marker records process liveness, it must pair `process_id` with both host boot identity (`host_boot_id`) and process start identity (`process_start_identity`). A marker from a previous boot, a marker missing either identity, a marker whose process start identity no longer matches the live PID, a marker whose PID has exited into an unreaped zombie state, or a marker observed while Decodex cannot read the current host or process identity must not be treated as a live process even if that PID currently exists. Operator snapshots expose `process_liveness_reason` so operators can distinguish stopped processes, previous-boot markers, and same-boot PID reuse from genuine live execution. The marker may also carry additive `child_agent_activity`, protocol, account, and legacy review-policy JSON or scalar fields for operator diagnostics. Legacy review-policy marker fields are breadcrumbs only: review-policy gating belongs to the runtime store and must not be overwritten from marker values. Operator snapshots must keep queue ownership separate from execution liveness: `active_lease` and `queue_lease_state` describe the local queue lease, while `execution_liveness` describes the observed process, app-server thread, or protocol marker that keeps an active lane visible. If a raw attempt is still `starting` after app-server thread, model, or protocol activity is observed, operator-facing `status` must report `running` and preserve the raw value in `attempt_status`. High-frequency heartbeat, child-agent buckets, token counts, idle ages, and other transient liveness details stay local/operator-only under the boundary defined by [`linear-execution-ledger.md`](./linear-execution-ledger.md). Post-review ownership is stored in the runtime database. Retained handoff rows record the authoritative PR URL, branch lineage, validated PR head OID, run id, and attempt number that completed the `In Review` handoff. Retained orchestration rows record the current post-review phase for that exact handoff identity. If the matching database row is missing, post-review ownership must block as unresolved instead of rebinding from branch-name, current-head, Linear comments, or other heuristics. If a retained review marker exists but a stored handoff or orchestration head no longer matches a clean retained worktree and matching PR head, operator status must keep the marker PR URL visible when known and recovery diagnosis must report the concrete mismatched field before any explicit rebind refresh. When retained PR readback degrades but the handoff identity is still safe to preserve, operator-local status may expose a typed `readback_root_cause` diagnostic such as missing GitHub CLI, missing GitHub token, GitHub auth failure, API/read failure, parse/shape failure, or lineage validation failure while keeping public-safe warning reasons such as `pull_request_state_read_failed` stable. -The only source-tree marker that clean-source checks may ignore is the untracked `.decodex-run-activity` heartbeat marker. Review handoff, orchestration, retry, phase timing, and retained PR state belong in the Decodex runtime database, not in root-level or worktree-local review marker files. +The only source-tree marker that clean-source checks may ignore is the untracked `.decodex-run-activity` heartbeat marker. Durable review handoff, orchestration, review-policy checkpoints, retry, phase timing, and retained PR state belong in the Decodex runtime database, not in root-level or worktree-local review marker files. If the heartbeat marker carries similarly named fields for compatibility or operator diagnostics, those breadcrumb values cannot override runtime-store rows. ### Dispatch-slot handoff invariant @@ -465,7 +468,7 @@ After a process restart, recent-run history, active lease ownership, retained po - While the control plane is running an active lane, every poll tick must refresh cached tracker state for the leased issue before considering any new selection. - While the control plane is running an active lane, that child must keep the workflow snapshot it started with; project `WORKFLOW.md` reloads affect later decisions without restarting the in-flight child. - While the control plane is supervising an active child process, stall detection must consult the child-updated `.decodex-run-activity` marker for the current `run_id` plus `attempt_number` and the persisted runtime event journal. A retained marker only proves a live process when its PID is still alive on the current host boot and the process start identity still matches; after power loss, reboot, or same-boot PID reuse, recovery must clear the reconstructed lease and re-enter the retained lane through retry-style dispatch instead of preserving the old running state. -- Retry-style recovery prompts must tell the next agent to treat the current worktree, tracker state, protocol events, and marker files as durable truth, inspect the branch/diff/recent validation evidence first, and continue from partial work rather than assuming prior in-memory model/tool state survived. +- Retry-style recovery prompts must tell the next agent to treat the current worktree, tracker state, runtime-store records, and protocol events as durable truth, use marker files only as diagnostic liveness breadcrumbs, inspect the branch/diff/recent validation evidence first, and continue from partial work rather than assuming prior in-memory model/tool state survived. - While the control plane owns a queued retry entry, that queued claim must take priority over normal candidate selection for the affected project. - While the control plane is idle between lanes, it may reload the configured project `WORKFLOW.md` on each tick and immediately apply a newly valid document to future dispatch, retry, post-exit reconciliation, and prompt generation. - If that same configured `WORKFLOW.md` path becomes invalid after a successful load, the control plane must log the reload failure and keep the last known good document active instead of dropping the tick or clearing runtime policy. @@ -482,7 +485,7 @@ After a process restart, recent-run history, active lease ownership, retained po - A leased issue that is still in a configured startable state during early control-plane ticks must be treated as a lane that has not finished claiming tracker ownership yet, not as an immediate non-active interruption. - If a running attempt exceeds the app-server idle timeout with no recorded protocol activity, `decodex` must treat it as stalled, stop the active run, mark the attempt `stalled`, and converge the issue through the human-required failure path instead of silently retrying in this phase. - If the supervised child already exited before the next control-plane tick, stalled reconciliation must still inspect the just-finished lane using recorded protocol activity rather than skipping directly to generic failure handling. -- Operator status snapshots must expose structured liveness and wait-state fields derived from persisted run markers, including current phase, optional wait reason, current operation, last run/protocol/progress times, idle age, a soft `suspected_stall` signal, and any queued retry kind plus due time, so operators can distinguish active execution from continuation waits, retry backoff, early stall suspicion, and genuine hard stalls without inferring progress from filesystem churn. +- Operator status snapshots must expose structured liveness and wait-state fields derived from runtime records plus marker breadcrumbs, including current phase, optional wait reason, current operation, last run/protocol/progress times, idle age, a soft `suspected_stall` signal, and any queued retry kind plus due time, so operators can distinguish active execution from continuation waits, retry backoff, early stall suspicion, and genuine hard stalls without inferring progress from filesystem churn. - Operator status snapshots may expose an additive `child_agent_activity` object when app-server protocol events have produced one for the current run. The object must stay machine-readable and dashboard/CLI shared, and should describe dynamic observed buckets rather than a fixed workflow: current child bucket and elapsed time, bucket wall/event/tool counts, current/max/cumulative input tokens, cumulative output tokens, largest tool output, and warnings for repeated large outputs. Missing `child_agent_activity` means no child breakdown was captured; existing JSON consumers must continue to work without it. - If the agent Git credential preflight fails, operator status must report the retained lane as a credential failure requiring operator recovery, not as a still-running lane. - If retry budget or needs-attention recovery finds tracked changes in the retained worktree, operator status must report retained partial progress rather than only a generic retry-budget hold. The failure class may be `partial_progress_retained` when no more specific runtime error class is available. Operators should then inspect the patch, finish validation and PR handoff if it is useful, or reset the retained worktree explicitly.