diff --git a/e2e/k8s/k8s_test.go b/e2e/k8s/k8s_test.go index 20f74f8e..426d6a91 100644 --- a/e2e/k8s/k8s_test.go +++ b/e2e/k8s/k8s_test.go @@ -418,8 +418,26 @@ func createStoredK8sApplyWithTask(t *testing.T, dsn string, apply *storage.Apply task.CompletedAt = &now } + // Dual-write the apply_operations row alongside the apply, mirroring the + // production apply-create path. Every apply owns exactly one operation + // whose state tracks the parent, so the operator's operation-claim loop can + // claim and drive this apply; CreateWithTasksAndOperations links the task to + // the operation via apply_operation_id. + operation := &storage.ApplyOperation{ + Deployment: apply.Deployment, + State: apply.State, + CreatedAt: now, + UpdatedAt: now, + } + if !state.IsState(apply.State, state.Apply.Pending) { + operation.StartedAt = &now + } + if state.IsTerminalApplyState(apply.State) && !state.IsState(apply.State, state.Apply.Stopped) { + operation.CompletedAt = &now + } + store := mysqlstore.New(db) - _, err = store.Applies().CreateWithTasks(t.Context(), apply, []*storage.Task{task}) + _, err = store.Applies().CreateWithTasksAndOperations(t.Context(), apply, []*storage.Task{task}, []*storage.ApplyOperation{operation}) require.NoError(t, err) return apply.ApplyIdentifier } diff --git a/e2e/k8s/operator_fixtures_test.go b/e2e/k8s/operator_fixtures_test.go index ed8e8f41..65752bc1 100644 --- a/e2e/k8s/operator_fixtures_test.go +++ b/e2e/k8s/operator_fixtures_test.go @@ -55,6 +55,20 @@ func markApplyHeartbeatStale(t *testing.T, dsn, applyID, storageName string) { rowsAffected, err := result.RowsAffected() require.NoError(t, err) require.Equal(t, int64(1), rowsAffected, "expected to mark one %s apply heartbeat stale", storageName) + + // With operation-level claiming, a running apply is reclaimed via + // FindNextApplyOperation's stale-heartbeat clause, which keys off + // apply_operations.updated_at (not applies.updated_at). Age the apply's + // operation rows too so the operator can re-lease without waiting out the + // production staleness window. Operation rows are optional here (e.g. the + // data-plane apply has none), so we don't assert a row count. + _, err = db.ExecContext(t.Context(), + `UPDATE apply_operations ao + JOIN applies a ON ao.apply_id = a.id + SET ao.updated_at = NOW() - INTERVAL 2 MINUTE + WHERE a.apply_identifier = ?`, + applyID) + require.NoError(t, err) } func markDataPlaneHeartbeatStale(t *testing.T, applyID string) {