From 16e07e27d002139fbe0ad6502c689cbbceb316d9 Mon Sep 17 00:00:00 2001 From: Yvette Carlisle Date: Tue, 2 Jun 2026 14:29:17 +0800 Subject: [PATCH] {"schema":"decodex/commit/1","summary":"Stabilize account run capsules","authority":"manual"} --- .../Sources/DecodexApp/AccountStore.swift | 1 + .../DecodexApp/OperatorSnapshotModels.swift | 43 ++++- .../DecodexAppTests/AccountModelTests.swift | 164 ++++++++++++++++++ .../src/orchestrator/operator_dashboard.html | 32 +++- .../decodex/src/orchestrator/operator_http.rs | 6 + .../tests/operator/status/dashboard.rs | 13 +- .../tests/operator/status/http.rs | 6 + docs/reference/operator-control-plane.md | 5 +- 8 files changed, 260 insertions(+), 10 deletions(-) diff --git a/apps/decodex-app/Sources/DecodexApp/AccountStore.swift b/apps/decodex-app/Sources/DecodexApp/AccountStore.swift index 5db28fb8..f08b2baf 100644 --- a/apps/decodex-app/Sources/DecodexApp/AccountStore.swift +++ b/apps/decodex-app/Sources/DecodexApp/AccountStore.swift @@ -234,6 +234,7 @@ final class AccountStore: ObservableObject { let activity = OperatorRunActivitySnapshot( activeRuns: activeRuns, + activeRunsComplete: payload.activeRunsComplete ?? true, emittedAt: payload.emittedAt ?? Date() ) if let operatorSnapshotUpdatedAt, diff --git a/apps/decodex-app/Sources/DecodexApp/OperatorSnapshotModels.swift b/apps/decodex-app/Sources/DecodexApp/OperatorSnapshotModels.swift index 5c1100ee..cb148242 100644 --- a/apps/decodex-app/Sources/DecodexApp/OperatorSnapshotModels.swift +++ b/apps/decodex-app/Sources/DecodexApp/OperatorSnapshotModels.swift @@ -80,13 +80,30 @@ struct OperatorSnapshotResponse: Decodable, Sendable { activeRuns(for: account).count } - func mergingRunActivity(_ activityRuns: [OperatorRunStatus]) -> OperatorSnapshotResponse { + func mergingRunActivity( + _ activityRuns: [OperatorRunStatus], + activeRunsComplete: Bool = true + ) -> OperatorSnapshotResponse { + let activityRunsByID = activityRuns.reduce(into: [String: OperatorRunStatus]()) { runsByID, run in + runsByID[run.runID] = run + } let snapshotRunsByID = activeRuns.reduce(into: [String: OperatorRunStatus]()) { runsByID, run in runsByID[run.runID] = run } - let mergedRuns = activityRuns.map { activityRun in - snapshotRunsByID[activityRun.runID]?.mergingActivity(activityRun) ?? activityRun + let mergedSnapshotRuns = activeRuns.compactMap { snapshotRun -> OperatorRunStatus? in + if let activityRun = activityRunsByID[snapshotRun.runID] { + return snapshotRun.mergingActivity(activityRun) + } + if activeRunsComplete || activityRuns.isEmpty { + return nil + } + + return snapshotRun.shouldRetainDuringPartialRunActivity ? snapshotRun : nil + } + let newActivityRuns = activityRuns.filter { activityRun in + snapshotRunsByID[activityRun.runID] == nil } + let mergedRuns = mergedSnapshotRuns + newActivityRuns let activeCountsByProject = Dictionary(grouping: mergedRuns.compactMap(\.projectID)) { $0 } .mapValues(\.count) let mergedProjects = projects.map { project in @@ -346,6 +363,14 @@ struct OperatorRunStatus: Decodable, Identifiable, Sendable { func isAssigned(to account: CodexAccount) -> Bool { self.account?.matches(account) == true + || accounts.contains { $0.isSelected && $0.matches(account) } + } + + var shouldRetainDuringPartialRunActivity: Bool { + activeLease == true + || processAlive == true + || status == "running" + || phase == "executing" } func mergingActivity(_ activity: OperatorRunStatus) -> OperatorRunStatus { @@ -540,6 +565,7 @@ struct OperatorDashboardSocketPayload: Decodable, Sendable { let snapshotPublishedAtUnixEpoch: Int64? let snapshot: OperatorSnapshotResponse? let activeRuns: [OperatorRunStatus]? + let activeRunsComplete: Bool? var emittedAt: Date? { date(fromUnixEpoch: emittedAtUnixEpoch) @@ -554,11 +580,13 @@ struct OperatorDashboardSocketPayload: Decodable, Sendable { case snapshotPublishedAtUnixEpoch case snapshot case activeRuns + case activeRunsComplete } } struct OperatorRunActivitySnapshot: Sendable { let activeRuns: [OperatorRunStatus] + let activeRunsComplete: Bool let emittedAt: Date func shouldOverlay(snapshotPublishedAt: Date?) -> Bool { @@ -570,7 +598,7 @@ struct OperatorRunActivitySnapshot: Sendable { } func merging(into snapshot: OperatorSnapshotResponse) -> OperatorSnapshotResponse { - snapshot.mergingRunActivity(activeRuns) + snapshot.mergingRunActivity(activeRuns, activeRunsComplete: activeRunsComplete) } } @@ -663,6 +691,11 @@ struct OperatorChildAgentBucket: Decodable, Identifiable, Sendable { struct OperatorRunAccountSummary: Decodable, Sendable { let accountFingerprint: String let email: String? + let status: String? + + var isSelected: Bool { + status?.caseInsensitiveCompare("selected") == .orderedSame + } func matches(_ account: CodexAccount) -> Bool { if accountFingerprint.isEmpty == false, accountFingerprint == account.accountFingerprint { @@ -679,6 +712,7 @@ struct OperatorRunAccountSummary: Decodable, Sendable { case accountFingerprint = "account_fingerprint" case email case accountEmail = "account_email" + case status } init(from decoder: Decoder) throws { @@ -687,6 +721,7 @@ struct OperatorRunAccountSummary: Decodable, Sendable { accountFingerprint = try container.decodeIfPresent(String.self, forKey: .accountFingerprint) ?? "" email = try container.decodeIfPresent(String.self, forKey: .email) ?? container.decodeIfPresent(String.self, forKey: .accountEmail) + status = try container.decodeIfPresent(String.self, forKey: .status) } } diff --git a/apps/decodex-app/Tests/DecodexAppTests/AccountModelTests.swift b/apps/decodex-app/Tests/DecodexAppTests/AccountModelTests.swift index 13064eb1..67d602fa 100644 --- a/apps/decodex-app/Tests/DecodexAppTests/AccountModelTests.swift +++ b/apps/decodex-app/Tests/DecodexAppTests/AccountModelTests.swift @@ -144,6 +144,46 @@ final class AccountModelTests: XCTestCase { XCTAssertTrue(snapshot.activeRuns(for: poolOnlyAccount).isEmpty) } + func testOperatorSnapshotAssignsSelectedAccountWhenPrimaryAccountIsMissing() throws { + let assignedAccount = makeAccount( + status: "available", + email: "copy@example.com", + accountFingerprint: "...123456" + ) + let poolOnlyAccount = makeAccount( + status: "available", + email: "pool@example.com", + accountFingerprint: "...654321" + ) + let payload = """ + { + "active_runs": [ + { + "run_id": "run-1", + "issue_identifier": "XY-689", + "accounts": [ + { + "email": "copy@example.com", + "account_fingerprint": "...123456", + "status": "selected" + }, + { + "email": "pool@example.com", + "account_fingerprint": "...654321", + "status": "available" + } + ] + } + ] + } + """.data(using: .utf8)! + + let snapshot = try JSONDecoder().decode(OperatorSnapshotResponse.self, from: payload) + + XCTAssertEqual(snapshot.activeRuns(for: assignedAccount).map(\.runID), ["run-1"]) + XCTAssertTrue(snapshot.activeRuns(for: poolOnlyAccount).isEmpty) + } + func testOperatorRunActivityOverlayDoesNotReplaceNewerSnapshot() throws { let account = makeAccount( status: "available", @@ -185,6 +225,7 @@ final class AccountModelTests: XCTestCase { .activeRuns ?? [] let overlay = OperatorRunActivitySnapshot( activeRuns: activity, + activeRunsComplete: true, emittedAt: Date(timeIntervalSince1970: 10) ) @@ -192,6 +233,128 @@ final class AccountModelTests: XCTestCase { XCTAssertEqual(snapshot.activeRuns(for: account).map(\.runID), ["run-new"]) } + func testPartialRunActivityPreservesSnapshotActiveRuns() throws { + let account = makeAccount( + status: "available", + email: "copy@example.com", + accountFingerprint: "...123456" + ) + let snapshotPayload = """ + { + "active_runs": [ + { + "run_id": "run-689", + "issue_identifier": "XY-689", + "active_lease": true, + "account": { + "email": "copy@example.com", + "account_fingerprint": "...123456" + } + }, + { + "run_id": "run-690", + "issue_identifier": "XY-690", + "active_lease": true, + "account": { + "email": "copy@example.com", + "account_fingerprint": "...123456" + } + } + ] + } + """.data(using: .utf8)! + let activityPayload = """ + { + "activeRunsComplete": false, + "activeRuns": [ + { + "run_id": "run-690", + "issue_identifier": "XY-690", + "account": { + "email": "copy@example.com", + "account_fingerprint": "...123456" + } + } + ] + } + """.data(using: .utf8)! + + let snapshot = try JSONDecoder().decode(OperatorSnapshotResponse.self, from: snapshotPayload) + let event = try JSONDecoder() + .decode(OperatorDashboardSocketPayload.self, from: activityPayload) + let overlay = OperatorRunActivitySnapshot( + activeRuns: event.activeRuns ?? [], + activeRunsComplete: event.activeRunsComplete ?? true, + emittedAt: Date(timeIntervalSince1970: 30) + ) + let merged = overlay.merging(into: snapshot) + + XCTAssertTrue(overlay.shouldOverlay(snapshotPublishedAt: Date(timeIntervalSince1970: 20))) + XCTAssertEqual(merged.activeRuns.map(\.runID), ["run-689", "run-690"]) + XCTAssertEqual(merged.activeRuns(for: account).map(\.runID), ["run-689", "run-690"]) + } + + func testCompleteRunActivityReplacesSnapshotActiveRuns() throws { + let account = makeAccount( + status: "available", + email: "copy@example.com", + accountFingerprint: "...123456" + ) + let snapshotPayload = """ + { + "active_runs": [ + { + "run_id": "run-689", + "issue_identifier": "XY-689", + "active_lease": true, + "account": { + "email": "copy@example.com", + "account_fingerprint": "...123456" + } + }, + { + "run_id": "run-690", + "issue_identifier": "XY-690", + "active_lease": true, + "account": { + "email": "copy@example.com", + "account_fingerprint": "...123456" + } + } + ] + } + """.data(using: .utf8)! + let activityPayload = """ + { + "activeRunsComplete": true, + "activeRuns": [ + { + "run_id": "run-690", + "issue_identifier": "XY-690", + "account": { + "email": "copy@example.com", + "account_fingerprint": "...123456" + } + } + ] + } + """.data(using: .utf8)! + + let snapshot = try JSONDecoder().decode(OperatorSnapshotResponse.self, from: snapshotPayload) + let event = try JSONDecoder() + .decode(OperatorDashboardSocketPayload.self, from: activityPayload) + let overlay = OperatorRunActivitySnapshot( + activeRuns: event.activeRuns ?? [], + activeRunsComplete: event.activeRunsComplete ?? true, + emittedAt: Date(timeIntervalSince1970: 30) + ) + let merged = overlay.merging(into: snapshot) + + XCTAssertTrue(overlay.shouldOverlay(snapshotPublishedAt: Date(timeIntervalSince1970: 20))) + XCTAssertEqual(merged.activeRuns.map(\.runID), ["run-690"]) + XCTAssertEqual(merged.activeRuns(for: account).map(\.runID), ["run-690"]) + } + func testNewerEmptyRunActivityClearsSnapshotRuns() throws { let account = makeAccount( status: "available", @@ -215,6 +378,7 @@ final class AccountModelTests: XCTestCase { let snapshot = try JSONDecoder().decode(OperatorSnapshotResponse.self, from: snapshotPayload) let overlay = OperatorRunActivitySnapshot( activeRuns: [], + activeRunsComplete: true, emittedAt: Date(timeIntervalSince1970: 30) ) let merged = overlay.merging(into: snapshot) diff --git a/apps/decodex/src/orchestrator/operator_dashboard.html b/apps/decodex/src/orchestrator/operator_dashboard.html index 307d9555..4d1db468 100644 --- a/apps/decodex/src/orchestrator/operator_dashboard.html +++ b/apps/decodex/src/orchestrator/operator_dashboard.html @@ -3860,6 +3860,8 @@

Run History

let dashboardSocket = null; let dashboardSocketReconnectTimer = null; let dashboardLiveActiveRuns = []; + let dashboardLiveRunActivitySeen = false; + let dashboardLiveActiveRunsComplete = true; let dashboardLiveAccounts = null; let dashboardLiveAccountControl = null; let dashboardStreamState = { @@ -10173,7 +10175,11 @@

${escapeHtml(worktree.branch_name)}

return lookup; } - function mergeDashboardActiveRuns(snapshot, activeRunRows) { + function dashboardRunShouldRetainDuringPartialActivity(run) { + return run?.active_lease === true || run?.process_alive === true || runCountsAsRunning(run); + } + + function mergeDashboardActiveRuns(snapshot, activeRunRows, activeRunsComplete = true) { const snapshotActiveRuns = snapshot.active_runs || []; const snapshotRunLookup = dashboardRunLookup(snapshotActiveRuns); const seenSnapshotRuns = new Set(); @@ -10192,7 +10198,11 @@

${escapeHtml(worktree.branch_name)}

} for (const snapshotRun of snapshotActiveRuns) { - if (!seenSnapshotRuns.has(snapshotRun)) { + if ( + !activeRunsComplete && + !seenSnapshotRuns.has(snapshotRun) && + dashboardRunShouldRetainDuringPartialActivity(snapshotRun) + ) { mergedRuns.push(snapshotRun); } } @@ -10204,7 +10214,14 @@

${escapeHtml(worktree.branch_name)}

const activeRunRows = activeRuns .filter((run) => run && typeof run === "object") .map((run) => ({ ...run })); - const mergedActiveRuns = mergeDashboardActiveRuns(snapshot, activeRunRows); + const activeRunsComplete = + activityPayload.activeRunsComplete !== false && + activityPayload.activeRunScope !== "filtered"; + const mergedActiveRuns = mergeDashboardActiveRuns( + snapshot, + activeRunRows, + activeRunsComplete, + ); const activeById = new Map( mergedActiveRuns .filter((run) => run.run_id) @@ -10276,6 +10293,7 @@

${escapeHtml(worktree.branch_name)}

} if ( + !dashboardLiveRunActivitySeen && !dashboardLiveActiveRuns.length && !dashboardLiveAccounts && !dashboardLiveAccountControl @@ -10286,6 +10304,7 @@

${escapeHtml(worktree.branch_name)}

return mergeDashboardRunActivity(snapshot, dashboardLiveActiveRuns, { accountControl: dashboardLiveAccountControl, accounts: dashboardLiveAccounts, + activeRunsComplete: dashboardLiveActiveRunsComplete, }); } @@ -10303,6 +10322,9 @@

${escapeHtml(worktree.branch_name)}

dashboardLiveActiveRuns = payload.activeRuns .filter((run) => run && typeof run === "object") .map((run) => ({ ...run })); + dashboardLiveRunActivitySeen = true; + dashboardLiveActiveRunsComplete = + payload.activeRunsComplete !== false && payload.activeRunScope !== "filtered"; dashboardLiveAccounts = Array.isArray(payload.accounts) && payload.accounts.length ? payload.accounts.filter(Boolean).map((account) => ({ ...account })) : null; @@ -10466,6 +10488,8 @@

${escapeHtml(worktree.branch_name)}

}; dashboardSocket.onclose = () => { dashboardLiveActiveRuns = []; + dashboardLiveRunActivitySeen = false; + dashboardLiveActiveRunsComplete = true; dashboardLiveAccounts = null; dashboardLiveAccountControl = null; updateDashboardStreamState({ @@ -10476,6 +10500,8 @@

${escapeHtml(worktree.branch_name)}

}; dashboardSocket.onerror = () => { dashboardLiveActiveRuns = []; + dashboardLiveRunActivitySeen = false; + dashboardLiveActiveRunsComplete = true; dashboardLiveAccounts = null; dashboardLiveAccountControl = null; updateDashboardStreamState({ diff --git a/apps/decodex/src/orchestrator/operator_http.rs b/apps/decodex/src/orchestrator/operator_http.rs index 89ab6497..e833dba2 100644 --- a/apps/decodex/src/orchestrator/operator_http.rs +++ b/apps/decodex/src/orchestrator/operator_http.rs @@ -511,6 +511,8 @@ fn build_operator_run_activity_event(state_store: &StateStore) -> Result Result