Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 49 additions & 8 deletions apps/decodex/src/orchestrator/operator_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ const OPERATOR_DASHBOARD_LOGO_ICO: &[u8] =
const OPERATOR_DASHBOARD_LOGO_TOUCH_PNG: &[u8] =
include_bytes!(concat!(env!("CARGO_MANIFEST_DIR"), "/src/orchestrator/assets/logo-touch.png"));
const OPERATOR_HTTP_READ_TIMEOUT: Duration = Duration::from_millis(250);
const DASHBOARD_RUN_ACTIVITY_FINGERPRINT_VOLATILE_FIELDS: &[&str] = &[
"idle_for_seconds",
"protocol_idle_for_seconds",
"current_elapsed_seconds",
"wall_seconds",
];

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum OperatorRequestRoute {
Expand Down Expand Up @@ -534,27 +540,62 @@ fn build_operator_run_activity_event(
active_runs.extend(project_active_runs);
}

let fingerprint_payload = json!({
let fingerprint_payload = dashboard_run_activity_fingerprint_payload(
&account_control,
&accounts,
&active_runs,
);
let fingerprint = serde_json::to_vec(&fingerprint_payload)?;
let payload = json!({
"emittedAtUnixEpoch": now_unix_epoch,
"accountControl": &account_control,
"accounts": &accounts,
"activeRuns": &active_runs,
"activeRunsComplete": true,
"activeRunScope": "complete",
});
let fingerprint = serde_json::to_vec(&fingerprint_payload)?;
let payload = json!({
"emittedAtUnixEpoch": now_unix_epoch,

Ok(DashboardRunActivityEvent {
fingerprint,
event: DashboardBroadcastEvent { event_type: "runActivity", payload },
})
}

fn dashboard_run_activity_fingerprint_payload(
account_control: &OperatorCodexAccountControlStatus,
accounts: &[CodexAccountActivitySummary],
active_runs: &[OperatorRunStatus],
) -> Value {
let mut fingerprint_payload = json!({
"accountControl": account_control,
"accounts": accounts,
"activeRuns": active_runs,
"activeRunsComplete": true,
"activeRunScope": "complete",
});

Ok(DashboardRunActivityEvent {
fingerprint,
event: DashboardBroadcastEvent { event_type: "runActivity", payload },
})
strip_dashboard_run_activity_volatile_fields(&mut fingerprint_payload);

fingerprint_payload
}

fn strip_dashboard_run_activity_volatile_fields(value: &mut Value) {
match value {
Value::Object(object) => {
for field in DASHBOARD_RUN_ACTIVITY_FINGERPRINT_VOLATILE_FIELDS {
object.remove(*field);
}
for child in object.values_mut() {
strip_dashboard_run_activity_volatile_fields(child);
}
},
Value::Array(values) => {
for child in values {
strip_dashboard_run_activity_volatile_fields(child);
}
},
_ => {},
}
}

fn dashboard_current_snapshot_event_payload(
Expand Down
84 changes: 84 additions & 0 deletions apps/decodex/src/orchestrator/tests/operator/status/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,90 @@ fn operator_dashboard_run_activity_event_summarizes_active_runs() {
assert_eq!(data["activeRuns"][0]["protocol_activity"]["waiting_reason"], "model");
assert_eq!(data["activeRuns"][0]["account"]["account_fingerprint"], "acct-1");
assert_eq!(data["activeRuns"][0]["accounts"][0]["account_fingerprint"], "acct-1");
assert!(data["activeRuns"][0].get("idle_for_seconds").is_some());
assert!(data["activeRuns"][0].get("protocol_idle_for_seconds").is_some());
assert!(fingerprint["activeRuns"][0].get("idle_for_seconds").is_none());
assert!(fingerprint["activeRuns"][0].get("protocol_idle_for_seconds").is_none());
}

#[test]
fn operator_dashboard_run_activity_fingerprint_ignores_volatile_timing_fields() {
let mut first = serde_json::json!({
"accountControl": {
"mode": "balanced",
"account_selector": null,
},
"accounts": [],
"activeRuns": [
{
"run_id": "run-1",
"status": "running",
"phase": "executing",
"idle_for_seconds": 4,
"protocol_idle_for_seconds": 3,
"child_agent_activity": {
"current_bucket": "model",
"current_elapsed_seconds": 2,
"buckets": [
{
"bucket": "model",
"wall_seconds": 2,
"event_count": 7,
},
],
},
},
],
"activeRunsComplete": true,
"activeRunScope": "complete",
});
let mut second = serde_json::json!({
"accountControl": {
"mode": "balanced",
"account_selector": null,
},
"accounts": [],
"activeRuns": [
{
"run_id": "run-1",
"status": "running",
"phase": "executing",
"idle_for_seconds": 5,
"protocol_idle_for_seconds": 4,
"child_agent_activity": {
"current_bucket": "model",
"current_elapsed_seconds": 3,
"buckets": [
{
"bucket": "model",
"wall_seconds": 3,
"event_count": 7,
},
],
},
},
],
"activeRunsComplete": true,
"activeRunScope": "complete",
});

orchestrator::strip_dashboard_run_activity_volatile_fields(&mut first);
orchestrator::strip_dashboard_run_activity_volatile_fields(&mut second);

assert_eq!(first, second);
assert_eq!(first["activeRuns"][0]["run_id"], "run-1");
assert_eq!(first["activeRuns"][0]["child_agent_activity"]["buckets"][0]["event_count"], 7);
assert!(first["activeRuns"][0].get("idle_for_seconds").is_none());
assert!(
first["activeRuns"][0]["child_agent_activity"]
.get("current_elapsed_seconds")
.is_none()
);
assert!(
first["activeRuns"][0]["child_agent_activity"]["buckets"][0]
.get("wall_seconds")
.is_none()
);
}

#[test]
Expand Down