Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/decodex-app/Sources/DecodexApp/AccountStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ final class AccountStore: ObservableObject {

let activity = OperatorRunActivitySnapshot(
activeRuns: activeRuns,
activeRunsComplete: payload.activeRunsComplete ?? true,
emittedAt: payload.emittedAt ?? Date()
)
if let operatorSnapshotUpdatedAt,
Expand Down
43 changes: 39 additions & 4 deletions apps/decodex-app/Sources/DecodexApp/OperatorSnapshotModels.swift
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,30 @@ struct OperatorSnapshotResponse: Decodable, Sendable {
activeRuns(for: account).count
}

func mergingRunActivity(_ activityRuns: [OperatorRunStatus]) -> OperatorSnapshotResponse {
func mergingRunActivity(
_ activityRuns: [OperatorRunStatus],
activeRunsComplete: Bool = true
) -> OperatorSnapshotResponse {
let activityRunsByID = activityRuns.reduce(into: [String: OperatorRunStatus]()) { runsByID, run in
runsByID[run.runID] = run
}
let snapshotRunsByID = activeRuns.reduce(into: [String: OperatorRunStatus]()) { runsByID, run in
runsByID[run.runID] = run
}
let mergedRuns = activityRuns.map { activityRun in
snapshotRunsByID[activityRun.runID]?.mergingActivity(activityRun) ?? activityRun
let mergedSnapshotRuns = activeRuns.compactMap { snapshotRun -> OperatorRunStatus? in
if let activityRun = activityRunsByID[snapshotRun.runID] {
return snapshotRun.mergingActivity(activityRun)
}
if activeRunsComplete || activityRuns.isEmpty {
return nil
}

return snapshotRun.shouldRetainDuringPartialRunActivity ? snapshotRun : nil
}
let newActivityRuns = activityRuns.filter { activityRun in
snapshotRunsByID[activityRun.runID] == nil
}
let mergedRuns = mergedSnapshotRuns + newActivityRuns
let activeCountsByProject = Dictionary(grouping: mergedRuns.compactMap(\.projectID)) { $0 }
.mapValues(\.count)
let mergedProjects = projects.map { project in
Expand Down Expand Up @@ -346,6 +363,14 @@ struct OperatorRunStatus: Decodable, Identifiable, Sendable {

func isAssigned(to account: CodexAccount) -> Bool {
self.account?.matches(account) == true
|| accounts.contains { $0.isSelected && $0.matches(account) }
}

var shouldRetainDuringPartialRunActivity: Bool {
activeLease == true
|| processAlive == true
|| status == "running"
|| phase == "executing"
}

func mergingActivity(_ activity: OperatorRunStatus) -> OperatorRunStatus {
Expand Down Expand Up @@ -540,6 +565,7 @@ struct OperatorDashboardSocketPayload: Decodable, Sendable {
let snapshotPublishedAtUnixEpoch: Int64?
let snapshot: OperatorSnapshotResponse?
let activeRuns: [OperatorRunStatus]?
let activeRunsComplete: Bool?

var emittedAt: Date? {
date(fromUnixEpoch: emittedAtUnixEpoch)
Expand All @@ -554,11 +580,13 @@ struct OperatorDashboardSocketPayload: Decodable, Sendable {
case snapshotPublishedAtUnixEpoch
case snapshot
case activeRuns
case activeRunsComplete
}
}

struct OperatorRunActivitySnapshot: Sendable {
let activeRuns: [OperatorRunStatus]
let activeRunsComplete: Bool
let emittedAt: Date

func shouldOverlay(snapshotPublishedAt: Date?) -> Bool {
Expand All @@ -570,7 +598,7 @@ struct OperatorRunActivitySnapshot: Sendable {
}

func merging(into snapshot: OperatorSnapshotResponse) -> OperatorSnapshotResponse {
snapshot.mergingRunActivity(activeRuns)
snapshot.mergingRunActivity(activeRuns, activeRunsComplete: activeRunsComplete)
}
}

Expand Down Expand Up @@ -663,6 +691,11 @@ struct OperatorChildAgentBucket: Decodable, Identifiable, Sendable {
struct OperatorRunAccountSummary: Decodable, Sendable {
let accountFingerprint: String
let email: String?
let status: String?

var isSelected: Bool {
status?.caseInsensitiveCompare("selected") == .orderedSame
}

func matches(_ account: CodexAccount) -> Bool {
if accountFingerprint.isEmpty == false, accountFingerprint == account.accountFingerprint {
Expand All @@ -679,6 +712,7 @@ struct OperatorRunAccountSummary: Decodable, Sendable {
case accountFingerprint = "account_fingerprint"
case email
case accountEmail = "account_email"
case status
}

init(from decoder: Decoder) throws {
Expand All @@ -687,6 +721,7 @@ struct OperatorRunAccountSummary: Decodable, Sendable {
accountFingerprint = try container.decodeIfPresent(String.self, forKey: .accountFingerprint) ?? ""
email = try container.decodeIfPresent(String.self, forKey: .email)
?? container.decodeIfPresent(String.self, forKey: .accountEmail)
status = try container.decodeIfPresent(String.self, forKey: .status)
}
}

Expand Down
164 changes: 164 additions & 0 deletions apps/decodex-app/Tests/DecodexAppTests/AccountModelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,46 @@ final class AccountModelTests: XCTestCase {
XCTAssertTrue(snapshot.activeRuns(for: poolOnlyAccount).isEmpty)
}

func testOperatorSnapshotAssignsSelectedAccountWhenPrimaryAccountIsMissing() throws {
let assignedAccount = makeAccount(
status: "available",
email: "copy@example.com",
accountFingerprint: "...123456"
)
let poolOnlyAccount = makeAccount(
status: "available",
email: "pool@example.com",
accountFingerprint: "...654321"
)
let payload = """
{
"active_runs": [
{
"run_id": "run-1",
"issue_identifier": "XY-689",
"accounts": [
{
"email": "copy@example.com",
"account_fingerprint": "...123456",
"status": "selected"
},
{
"email": "pool@example.com",
"account_fingerprint": "...654321",
"status": "available"
}
]
}
]
}
""".data(using: .utf8)!

let snapshot = try JSONDecoder().decode(OperatorSnapshotResponse.self, from: payload)

XCTAssertEqual(snapshot.activeRuns(for: assignedAccount).map(\.runID), ["run-1"])
XCTAssertTrue(snapshot.activeRuns(for: poolOnlyAccount).isEmpty)
}

func testOperatorRunActivityOverlayDoesNotReplaceNewerSnapshot() throws {
let account = makeAccount(
status: "available",
Expand Down Expand Up @@ -185,13 +225,136 @@ final class AccountModelTests: XCTestCase {
.activeRuns ?? []
let overlay = OperatorRunActivitySnapshot(
activeRuns: activity,
activeRunsComplete: true,
emittedAt: Date(timeIntervalSince1970: 10)
)

XCTAssertFalse(overlay.shouldOverlay(snapshotPublishedAt: Date(timeIntervalSince1970: 20)))
XCTAssertEqual(snapshot.activeRuns(for: account).map(\.runID), ["run-new"])
}

func testPartialRunActivityPreservesSnapshotActiveRuns() throws {
let account = makeAccount(
status: "available",
email: "copy@example.com",
accountFingerprint: "...123456"
)
let snapshotPayload = """
{
"active_runs": [
{
"run_id": "run-689",
"issue_identifier": "XY-689",
"active_lease": true,
"account": {
"email": "copy@example.com",
"account_fingerprint": "...123456"
}
},
{
"run_id": "run-690",
"issue_identifier": "XY-690",
"active_lease": true,
"account": {
"email": "copy@example.com",
"account_fingerprint": "...123456"
}
}
]
}
""".data(using: .utf8)!
let activityPayload = """
{
"activeRunsComplete": false,
"activeRuns": [
{
"run_id": "run-690",
"issue_identifier": "XY-690",
"account": {
"email": "copy@example.com",
"account_fingerprint": "...123456"
}
}
]
}
""".data(using: .utf8)!

let snapshot = try JSONDecoder().decode(OperatorSnapshotResponse.self, from: snapshotPayload)
let event = try JSONDecoder()
.decode(OperatorDashboardSocketPayload.self, from: activityPayload)
let overlay = OperatorRunActivitySnapshot(
activeRuns: event.activeRuns ?? [],
activeRunsComplete: event.activeRunsComplete ?? true,
emittedAt: Date(timeIntervalSince1970: 30)
)
let merged = overlay.merging(into: snapshot)

XCTAssertTrue(overlay.shouldOverlay(snapshotPublishedAt: Date(timeIntervalSince1970: 20)))
XCTAssertEqual(merged.activeRuns.map(\.runID), ["run-689", "run-690"])
XCTAssertEqual(merged.activeRuns(for: account).map(\.runID), ["run-689", "run-690"])
}

func testCompleteRunActivityReplacesSnapshotActiveRuns() throws {
let account = makeAccount(
status: "available",
email: "copy@example.com",
accountFingerprint: "...123456"
)
let snapshotPayload = """
{
"active_runs": [
{
"run_id": "run-689",
"issue_identifier": "XY-689",
"active_lease": true,
"account": {
"email": "copy@example.com",
"account_fingerprint": "...123456"
}
},
{
"run_id": "run-690",
"issue_identifier": "XY-690",
"active_lease": true,
"account": {
"email": "copy@example.com",
"account_fingerprint": "...123456"
}
}
]
}
""".data(using: .utf8)!
let activityPayload = """
{
"activeRunsComplete": true,
"activeRuns": [
{
"run_id": "run-690",
"issue_identifier": "XY-690",
"account": {
"email": "copy@example.com",
"account_fingerprint": "...123456"
}
}
]
}
""".data(using: .utf8)!

let snapshot = try JSONDecoder().decode(OperatorSnapshotResponse.self, from: snapshotPayload)
let event = try JSONDecoder()
.decode(OperatorDashboardSocketPayload.self, from: activityPayload)
let overlay = OperatorRunActivitySnapshot(
activeRuns: event.activeRuns ?? [],
activeRunsComplete: event.activeRunsComplete ?? true,
emittedAt: Date(timeIntervalSince1970: 30)
)
let merged = overlay.merging(into: snapshot)

XCTAssertTrue(overlay.shouldOverlay(snapshotPublishedAt: Date(timeIntervalSince1970: 20)))
XCTAssertEqual(merged.activeRuns.map(\.runID), ["run-690"])
XCTAssertEqual(merged.activeRuns(for: account).map(\.runID), ["run-690"])
}

func testNewerEmptyRunActivityClearsSnapshotRuns() throws {
let account = makeAccount(
status: "available",
Expand All @@ -215,6 +378,7 @@ final class AccountModelTests: XCTestCase {
let snapshot = try JSONDecoder().decode(OperatorSnapshotResponse.self, from: snapshotPayload)
let overlay = OperatorRunActivitySnapshot(
activeRuns: [],
activeRunsComplete: true,
emittedAt: Date(timeIntervalSince1970: 30)
)
let merged = overlay.merging(into: snapshot)
Expand Down
Loading