Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion e2e/k8s/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,26 @@ func createStoredK8sApplyWithTask(t *testing.T, dsn string, apply *storage.Apply
task.CompletedAt = &now
}

// Dual-write the apply_operations row alongside the apply, mirroring the
// production apply-create path. Every apply owns exactly one operation
// whose state tracks the parent, so the operator's operation-claim loop can
// claim and drive this apply; CreateWithTasksAndOperations links the task to
// the operation via apply_operation_id.
operation := &storage.ApplyOperation{
Deployment: apply.Deployment,
State: apply.State,
CreatedAt: now,
UpdatedAt: now,
}
Comment thread
Kiran01bm marked this conversation as resolved.
if !state.IsState(apply.State, state.Apply.Pending) {
operation.StartedAt = &now
}
if state.IsTerminalApplyState(apply.State) && !state.IsState(apply.State, state.Apply.Stopped) {
operation.CompletedAt = &now
}

store := mysqlstore.New(db)
_, err = store.Applies().CreateWithTasks(t.Context(), apply, []*storage.Task{task})
_, err = store.Applies().CreateWithTasksAndOperations(t.Context(), apply, []*storage.Task{task}, []*storage.ApplyOperation{operation})
require.NoError(t, err)
return apply.ApplyIdentifier
}
Expand Down
14 changes: 14 additions & 0 deletions e2e/k8s/operator_fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ func markApplyHeartbeatStale(t *testing.T, dsn, applyID, storageName string) {
rowsAffected, err := result.RowsAffected()
require.NoError(t, err)
require.Equal(t, int64(1), rowsAffected, "expected to mark one %s apply heartbeat stale", storageName)

// With operation-level claiming, a running apply is reclaimed via
// FindNextApplyOperation's stale-heartbeat clause, which keys off
// apply_operations.updated_at (not applies.updated_at). Age the apply's
// operation rows too so the operator can re-lease without waiting out the
// production staleness window. Operation rows are optional here (e.g. the
// data-plane apply has none), so we don't assert a row count.
_, err = db.ExecContext(t.Context(),
`UPDATE apply_operations ao
JOIN applies a ON ao.apply_id = a.id
SET ao.updated_at = NOW() - INTERVAL 2 MINUTE
WHERE a.apply_identifier = ?`,
applyID)
require.NoError(t, err)
}

func markDataPlaneHeartbeatStale(t *testing.T, applyID string) {
Expand Down
Loading