From 4677be8c27b131e53a45ba93f015f2ea6b0b1b22 Mon Sep 17 00:00:00 2001 From: Kiran Muddukrishna Date: Fri, 19 Jun 2026 13:15:20 +1000 Subject: [PATCH 1/2] test(k8s): seed apply_operations in the stored-apply fixture The remote-failure E2E fixture created an apply with tasks but no apply_operations row, so the operation-claim operator could not claim it once operation-level claiming becomes the default. Dual-write one operation per apply, mirroring the production apply-create path. --- e2e/k8s/k8s_test.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/e2e/k8s/k8s_test.go b/e2e/k8s/k8s_test.go index 20f74f8e..426d6a91 100644 --- a/e2e/k8s/k8s_test.go +++ b/e2e/k8s/k8s_test.go @@ -418,8 +418,26 @@ func createStoredK8sApplyWithTask(t *testing.T, dsn string, apply *storage.Apply task.CompletedAt = &now } + // Dual-write the apply_operations row alongside the apply, mirroring the + // production apply-create path. Every apply owns exactly one operation + // whose state tracks the parent, so the operator's operation-claim loop can + // claim and drive this apply; CreateWithTasksAndOperations links the task to + // the operation via apply_operation_id. + operation := &storage.ApplyOperation{ + Deployment: apply.Deployment, + State: apply.State, + CreatedAt: now, + UpdatedAt: now, + } + if !state.IsState(apply.State, state.Apply.Pending) { + operation.StartedAt = &now + } + if state.IsTerminalApplyState(apply.State) && !state.IsState(apply.State, state.Apply.Stopped) { + operation.CompletedAt = &now + } + store := mysqlstore.New(db) - _, err = store.Applies().CreateWithTasks(t.Context(), apply, []*storage.Task{task}) + _, err = store.Applies().CreateWithTasksAndOperations(t.Context(), apply, []*storage.Task{task}, []*storage.ApplyOperation{operation}) require.NoError(t, err) return apply.ApplyIdentifier } From 715a1799efcbb9a3c19111885b7b580c971a102d Mon Sep 17 00:00:00 2001 From: Kiran Muddukrishna Date: Fri, 19 Jun 2026 13:28:53 +1000 Subject: [PATCH 2/2] test(k8s): age apply_operations rows in markApplyHeartbeatStale Operation-level claiming reclaims a running apply via FindNextApplyOperation's stale-heartbeat clause, which keys off apply_operations.updated_at. The helper only aged applies.updated_at, so the operator had nothing claimable for ~1m and the stale-recovery E2E polls could stall. --- e2e/k8s/operator_fixtures_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/e2e/k8s/operator_fixtures_test.go b/e2e/k8s/operator_fixtures_test.go index ed8e8f41..65752bc1 100644 --- a/e2e/k8s/operator_fixtures_test.go +++ b/e2e/k8s/operator_fixtures_test.go @@ -55,6 +55,20 @@ func markApplyHeartbeatStale(t *testing.T, dsn, applyID, storageName string) { rowsAffected, err := result.RowsAffected() require.NoError(t, err) require.Equal(t, int64(1), rowsAffected, "expected to mark one %s apply heartbeat stale", storageName) + + // With operation-level claiming, a running apply is reclaimed via + // FindNextApplyOperation's stale-heartbeat clause, which keys off + // apply_operations.updated_at (not applies.updated_at). Age the apply's + // operation rows too so the operator can re-lease without waiting out the + // production staleness window. Operation rows are optional here (e.g. the + // data-plane apply has none), so we don't assert a row count. + _, err = db.ExecContext(t.Context(), + `UPDATE apply_operations ao + JOIN applies a ON ao.apply_id = a.id + SET ao.updated_at = NOW() - INTERVAL 2 MINUTE + WHERE a.apply_identifier = ?`, + applyID) + require.NoError(t, err) } func markDataPlaneHeartbeatStale(t *testing.T, applyID string) {