Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 115 additions & 34 deletions apps/decodex/src/agent/tracker_tool_bridge/review.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ use crate::{
TrackerToolBridge,
},
prelude::eyre,
state::{self, ReviewHandoffMarker, ReviewOrchestrationMarker},
state::{
self, ReviewHandoffMarker, ReviewOrchestrationMarker, ReviewPolicyCheckpoint,
ReviewPolicyCheckpointInput, StateStore,
},
tracker::{
self, TrackerIssue,
records::{self, LinearExecutionEventPublicProjection},
Expand Down Expand Up @@ -71,31 +74,40 @@ impl<'a> TrackerToolBridge<'a> {
)
}

pub(super) fn cache_review_policy_state_best_effort(
pub(super) fn persist_review_policy_state(
&self,
review_context: &ReviewHandoffContext,
review_policy_phase: ReviewPolicyPhase,
review_policy_status: ReviewPolicyStatus,
head_sha: &str,
nonclean_rounds: i64,
) {
if let Err(error) = state::write_run_review_policy_state(
&review_context.cwd,
&review_context.run_id,
review_context.attempt_number,
review_policy_phase.as_str(),
review_policy_status.as_str(),
head_sha,
nonclean_rounds,
) {
tracing::warn!(
?error,
issue = self.issue.identifier,
run_id = review_context.run_id,
worktree_path = %review_context.cwd.display(),
"Review policy marker write failed; continuing with runtime state."
);
}
) -> Result<(), String> {
let state_store = self.state_store.ok_or_else(|| {
format!(
"Runtime state store is required to persist review checkpoint state for issue `{}`.",
self.issue.identifier
)
})?;

state_store
.upsert_review_policy_checkpoint(ReviewPolicyCheckpointInput {
project_id: &review_context.service_id,
issue_id: &self.issue.id,
run_id: &review_context.run_id,
attempt_number: review_context.attempt_number,
phase: review_policy_phase.as_str(),
status: review_policy_status.as_str(),
head_sha,
nonclean_rounds,
})
.map_err(|error| {
format!(
"Failed to persist review checkpoint state for issue `{}`: {error}",
self.issue.identifier
)
})?;

Ok(())
}

pub(super) fn ensure_issue_scope(&self, scope: &ScopeArgs) -> Result<(), String> {
Expand Down Expand Up @@ -874,37 +886,106 @@ impl<'a> TrackerToolBridge<'a> {
let Some(current_phase) = ReviewPolicyPhase::for_mode(review_context.mode) else {
return Ok(None);
};
let Some(state_store) = self.state_store else {
return Ok(None);
};

if let Some(checkpoint) = state_store.review_policy_checkpoint(
&review_context.service_id,
&self.issue.id,
&review_context.run_id,
review_context.attempt_number,
current_phase.as_str(),
)? {
return self.review_policy_state_from_checkpoint(checkpoint).map(Some);
}

self.migrate_legacy_review_policy_marker(review_context, state_store, current_phase)
}

fn review_policy_state_from_checkpoint(
&self,
checkpoint: ReviewPolicyCheckpoint,
) -> crate::prelude::Result<ReviewPolicyState> {
let phase =
ReviewPolicyPhase::parse(checkpoint.phase()).map_err(|error| eyre::eyre!(error))?;
let status =
ReviewPolicyStatus::parse(checkpoint.status()).map_err(|error| eyre::eyre!(error))?;

Ok(ReviewPolicyState {
phase,
status,
head_sha: checkpoint.head_sha().to_owned(),
nonclean_rounds: checkpoint.nonclean_rounds(),
})
}

fn migrate_legacy_review_policy_marker(
&self,
review_context: &ReviewHandoffContext,
state_store: &StateStore,
current_phase: ReviewPolicyPhase,
) -> crate::prelude::Result<Option<ReviewPolicyState>> {
let Some(marker) = state::read_run_activity_marker_snapshot(&review_context.cwd)? else {
return Ok(None);
};

if marker.run_id() != review_context.run_id
|| marker.attempt_number() != review_context.attempt_number
{
return Ok(None);
}

let Some(phase) = marker.review_policy_phase() else {
return Ok(None);
};
let Some(status) = marker.review_policy_status() else {
eyre::bail!(
"Run activity marker for issue `{}` is missing `review_policy_status`.",
self.issue.identifier
let (Some(status), Some(head_sha), Some(nonclean_rounds)) = (
marker.review_policy_status(),
marker.review_policy_head_sha(),
marker.review_policy_nonclean_rounds(),
) else {
tracing::warn!(
issue = self.issue.identifier,
run_id = review_context.run_id,
"Legacy review policy marker is incomplete; ignoring marker policy fields."
);

return Ok(None);
};
let Some(head_sha) = marker.review_policy_head_sha() else {
eyre::bail!(
"Run activity marker for issue `{}` is missing `review_policy_head_sha`.",
self.issue.identifier
let Ok(phase) = ReviewPolicyPhase::parse(phase) else {
tracing::warn!(
issue = self.issue.identifier,
run_id = review_context.run_id,
"Legacy review policy marker has an invalid phase; ignoring marker policy fields."
);

return Ok(None);
};
let Some(nonclean_rounds) = marker.review_policy_nonclean_rounds() else {
eyre::bail!(
"Run activity marker for issue `{}` is missing `review_policy_nonclean_rounds`.",
self.issue.identifier
let Ok(status) = ReviewPolicyStatus::parse(status) else {
tracing::warn!(
issue = self.issue.identifier,
run_id = review_context.run_id,
"Legacy review policy marker has an invalid status; ignoring marker policy fields."
);

return Ok(None);
};
let phase = ReviewPolicyPhase::parse(phase).map_err(|error| eyre::eyre!(error))?;
let status = ReviewPolicyStatus::parse(status).map_err(|error| eyre::eyre!(error))?;

if phase != current_phase {
return Ok(None);
}

state_store.upsert_review_policy_checkpoint(ReviewPolicyCheckpointInput {
project_id: &review_context.service_id,
issue_id: &self.issue.id,
run_id: &review_context.run_id,
attempt_number: review_context.attempt_number,
phase: phase.as_str(),
status: status.as_str(),
head_sha,
nonclean_rounds,
})?;

Ok(Some(ReviewPolicyState {
phase,
status,
Expand Down
57 changes: 56 additions & 1 deletion apps/decodex/src/agent/tracker_tool_bridge/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use crate::{
},
config::InternalReviewMode,
prelude::eyre,
state::{self, ReviewHandoffMarker, ReviewOrchestrationMarker, StateStore},
state::{
self, ReviewHandoffMarker, ReviewOrchestrationMarker, ReviewPolicyCheckpoint, StateStore,
},
tracker::{
IssueTracker, TrackerComment, TrackerIssue, TrackerLabel, TrackerState, TrackerTeam,
privacy_classifier::{
Expand Down Expand Up @@ -575,6 +577,59 @@ fn bridge_state_store<'a>(bridge: &TrackerToolBridge<'a>) -> &'a StateStore {
bridge.state_store.expect("test bridge should have a runtime state store")
}

fn persisted_review_policy_checkpoint(
bridge: &TrackerToolBridge<'_>,
issue: &TrackerIssue,
review_context: &ReviewHandoffContext,
) -> ReviewPolicyCheckpoint {
let phase = match review_context.mode {
ReviewExecutionMode::Handoff => "handoff",
ReviewExecutionMode::Repair => "repair",
ReviewExecutionMode::Closeout => {
panic!("closeout does not support review checkpoints")
},
};

bridge_state_store(bridge)
.review_policy_checkpoint(
&review_context.service_id,
&issue.id,
&review_context.run_id,
review_context.attempt_number,
phase,
)
.expect("review policy checkpoint should read")
.expect("review policy checkpoint should exist")
}

fn assert_review_policy_checkpoint_cleared(
bridge: &TrackerToolBridge<'_>,
issue: &TrackerIssue,
review_context: &ReviewHandoffContext,
) {
let phase = match review_context.mode {
ReviewExecutionMode::Handoff => "handoff",
ReviewExecutionMode::Repair => "repair",
ReviewExecutionMode::Closeout => {
panic!("closeout does not support review checkpoints")
},
};

assert!(
bridge_state_store(bridge)
.review_policy_checkpoint(
&review_context.service_id,
&issue.id,
&review_context.run_id,
review_context.attempt_number,
phase,
)
.expect("review policy checkpoint should read")
.is_none(),
"review policy checkpoint should be cleared after completion"
);
}

fn persisted_review_handoff_marker(
bridge: &TrackerToolBridge<'_>,
issue: &TrackerIssue,
Expand Down
Loading