Skip to content

refactor: migrate jobs to db#830

Merged
adityachoudhari26 merged 3 commits intomainfrom
migrate-jobs-db
Mar 4, 2026
Merged

refactor: migrate jobs to db#830
adityachoudhari26 merged 3 commits intomainfrom
migrate-jobs-db

Conversation

@adityachoudhari26
Copy link
Member

@adityachoudhari26 adityachoudhari26 commented Mar 3, 2026

Summary by CodeRabbit

  • New Features

    • Jobs can be listed by release identifier.
    • Database-backed job storage enabled during startup.
  • Improvements

    • Dispatch context is now persisted and returned with job records.
    • Job creation/upsert flows include dispatch context propagation.
    • Agent identifier handling improved for DB interactions.
    • Added legacy-jobs migration and expanded debug/error logging across services.
  • Tests

    • Many tests moved to event-driven job updates; nil-DispatchContext assertions relaxed and several workflow tests disabled.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Mar 3, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Persist and propagate job DispatchContext (as []byte), convert job Agent IDs to pgtype.UUID in DB/models/queries, add ListJobsByReleaseID/GetByReleaseID, wire DispatchContext through mappers/repos, convert many tests to emit JobUpdateEvent updates, and relax/disable some DispatchContext assertions.

Changes

Cohort / File(s) Summary
DB SQL & Mapping
apps/workspace-engine/pkg/db/queries/jobs.sql, apps/workspace-engine/pkg/db/sqlc.yaml, apps/workspace-engine/pkg/db/models.go
Add dispatch_context to SELECT/INSERT/UPSERT and map job.job_agent_idpgtype.UUID; model types updated.
Generated Queries (Go)
apps/workspace-engine/pkg/db/jobs.sql.go
Generated query types now include DispatchContext []byte; row/param types switch JobAgentID to pgtype.UUID; Upsert/Insert include dispatch_context.
DB Mapper & Repo
apps/workspace-engine/pkg/workspace/store/repository/db/jobs/mapper.go, .../repo.go, .../interfaces.go, .../memory/repo.go
Propagate DispatchContext through row→domain→API conversions; convert JobAgentID handling to pgtype.UUID; add GetByReleaseID/ListJobsByReleaseID plumbing.
Store / Service Integration
apps/workspace-engine/pkg/workspace/jobs/factory.go, .../store/jobs.go, .../store/releases.go, .../store/store.go, apps/workspace-engine/svc/controllers/jobdispatch/setters_postgres.go, apps/workspace-engine/svc/workspaceconsumer/consumer.go, apps/workspace-engine/test/integration/dbtest.go
Wrap JobAgentID as pgtype.UUID for DB inserts; capture single timestamp in factories; add DBJobs store option; stronger repo error logging/handling; use specialized GetByReleaseID.
E2E Tests — Event-driven updates
apps/workspace-engine/test/e2e/*_test.go (many files)
Replace direct in-memory job mutations with emitting oapi.JobUpdateEvent via engine.PushEvent (Id, Job payload, FieldsToUpdate) across numerous tests.
E2E Tests — Assertions & Disabled suites
apps/workspace-engine/test/e2e/engine_deployment_version_test.go, .../engine_environment_selector_update_test.go, .../engine_job_agent_retrigger_test.go, .../engine_jobs_test.go, .../engine_dispatch_context_immutability_test.go, .../engine_workflow_test.go
Remove/assertion relaxations that required nil DispatchContext; comment out/disable one dispatch-context immutability test and the workflow test suite.
Misc / CI & Instrumentation
.github/workflows/apps-workspace-engine.yaml, apps/workspace-engine/pkg/.../releasemanager/..., apps/workspace-engine/test/e2e/engine_policy_environment_progression_test.go
Adjust CI test filter; add extensive debug logging and small timing sleeps in various release/manager/planner/evaluator modules and tests for diagnostics.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Service
    participant Repo
    participant DB

    Client->>Service: Create/Upsert Job (includes DispatchContext, JobAgentID)
    Service->>Repo: ToUpsertParams(arg with DispatchContext, JobAgentID as pgtype.UUID)
    Repo->>DB: UpsertJob SQL (INSERT ... dispatch_context ... ON CONFLICT UPDATE dispatch_context)
    DB-->>Repo: rows OK
    Repo-->>Service: success

    Client->>Service: ListJobsByReleaseID(releaseID)
    Service->>Repo: ListJobsByReleaseID(releaseID)
    Repo->>DB: SELECT ... j.dispatch_context ... WHERE release_id = $1
    DB-->>Repo: rows with dispatch_context
    Repo->>Service: mapped jobs (DispatchContext included)
    Service->>Client: jobs list
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Poem

🐇
I hopped through rows and columns wide,
Bytes of context snug inside.
pgtype UUID held tight in place,
Events now guide each job's slow race.
Repos hum softly — hop, hooray! 🎉

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 69.09% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'refactor: migrate jobs to db' directly and concisely summarizes the primary change: migrating jobs functionality to database storage. It is specific, clear, and reflects the core objective evident across the changeset.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch migrate-jobs-db

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (7)
apps/workspace-engine/test/e2e/engine_deployment_version_jobs_list_test.go (2)

202-216: Remove commented-out code.

The old commented lines should be removed to keep the test clean.

🧹 Suggested cleanup
 		for _, job := range jobs {
 			switch resource.Name {
 			case "z-server":
-				// job.Status = oapi.JobStatusFailure // Should come first despite "z" name
-				// job.CreatedAt = baseTime.Add(time.Duration(timestampIndex) * time.Millisecond)
 				createdAt := baseTime.Add(time.Duration(timestampIndex) * time.Millisecond)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/test/e2e/engine_deployment_version_jobs_list_test.go`
around lines 202 - 216, Remove the two commented-out lines in the test block
that set job.Status and job.CreatedAt; instead keep the active createdAt
variable and the engine.PushEvent call that constructs the oapi.JobUpdateEvent
(references: createdAt, engine.PushEvent, handler.JobUpdate,
oapi.JobUpdateEvent, oapi.JobUpdateEventFieldsToUpdate) so the test remains
clean and only uses the live setup rather than commented legacy code.

205-216: Inconsistent pointer vs value type for JobUpdateEvent.

This test uses &oapi.JobUpdateEvent{...} (pointer) while other test files in this PR use oapi.JobUpdateEvent{...} (value). Consider using the value type for consistency with the rest of the codebase.

♻️ Suggested fix for consistency
-				engine.PushEvent(ctx, handler.JobUpdate, &oapi.JobUpdateEvent{
+				engine.PushEvent(ctx, handler.JobUpdate, oapi.JobUpdateEvent{

Apply similarly to lines 219, 233.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/test/e2e/engine_deployment_version_jobs_list_test.go`
around lines 205 - 216, The test is creating a pointer to oapi.JobUpdateEvent
but other tests use a value; change the PushEvent call to pass a value
oapi.JobUpdateEvent (not &oapi.JobUpdateEvent) so engine.PushEvent(ctx,
handler.JobUpdate, oapi.JobUpdateEvent{...}) is used, and apply the same change
for the other occurrences (lines indicated in the review) so all uses of
oapi.JobUpdateEvent are consistent; keep the inner fields (Id, Job,
FieldsToUpdate, etc.) unchanged.
apps/workspace-engine/test/e2e/engine_redeploy_test.go (2)

875-887: Setting CompletedAt on an in-progress job is semantically inconsistent.

An in_progress job shouldn't have a CompletedAt timestamp since it hasn't completed yet. This was likely carried over from the template pattern. Consider removing CompletedAt for in-progress status updates.

🔧 Suggested fix
 	// Mark job as in-progress (not completed)
-	now := time.Now()
 	engine.PushEvent(ctx, handler.JobUpdate, oapi.JobUpdateEvent{
 		Id: &initialJob.Id,
 		Job: oapi.Job{
 			Id:          initialJob.Id,
 			Status:      oapi.JobStatusInProgress,
-			CompletedAt: &now,
 		},
 		FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
 			oapi.JobUpdateEventFieldsToUpdateStatus,
-			oapi.JobUpdateEventFieldsToUpdateCompletedAt,
 		},
 	})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/test/e2e/engine_redeploy_test.go` around lines 875 -
887, The test is setting CompletedAt while marking the job as InProgress; remove
the CompletedAt field and its mention from FieldsToUpdate in the
engine.PushEvent call so the oapi.JobUpdateEvent payload only sets
Status=oapi.JobStatusInProgress and does not include CompletedAt; update the
FieldsToUpdate slice passed to engine.PushEvent (in the oapi.JobUpdateEvent) to
remove oapi.JobUpdateEventFieldsToUpdateCompletedAt so the event is semantically
consistent.

1019-1031: Same issue: CompletedAt on action_required status.

Jobs in action_required state are still active and awaiting user action—they haven't completed. Consider removing CompletedAt from this update.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/test/e2e/engine_redeploy_test.go` around lines 1019 -
1031, The test is setting CompletedAt while transitioning a job to
oapi.JobStatusActionRequired which is incorrect because action_required jobs are
still active; update the call to engine.PushEvent (handler.JobUpdate with
oapi.JobUpdateEvent) to omit the CompletedAt field on the oapi.Job (or set it to
nil) and remove oapi.JobUpdateEventFieldsToUpdateCompletedAt from the
FieldsToUpdate slice so only the Status is updated for JobStatusActionRequired.
apps/workspace-engine/test/e2e/engine_variable_change_evaluation_test.go (2)

79-80: Remove commented-out legacy mutation lines.

Keeping dead test code here adds noise and makes intent harder to read.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/test/e2e/engine_variable_change_evaluation_test.go`
around lines 79 - 80, Remove the two commented-out legacy mutation lines that
set initialJob.Status and initialJob.CompletedAt in the test (the lines "//
initialJob.Status = oapi.JobStatusSuccessful" and "// initialJob.CompletedAt =
&now"); simply delete these dead commented lines from
engine_variable_change_evaluation_test.go so the test code is cleaner and intent
is clearer.

81-92: Extract a helper for repeated successful-job update events.

The same payload construction repeats many times in this file; a helper will reduce copy/paste drift.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/test/e2e/engine_variable_change_evaluation_test.go`
around lines 81 - 92, Extract a helper to construct and send the repeated
"successful job" update payload used with engine.PushEvent(handler.JobUpdate,
...). Create a function (e.g., sendSuccessfulJobUpdate or
pushSuccessfulJobUpdate) that accepts ctx, engine, jobId (or initialJob) and
completedAt (or now) and builds the oapi.JobUpdateEvent with Id, Job{Id, Status:
oapi.JobStatusSuccessful, CompletedAt}, and FieldsToUpdate including
oapi.JobUpdateEventFieldsToUpdateStatus and
oapi.JobUpdateEventFieldsToUpdateCompletedAt, then calls engine.PushEvent(ctx,
handler.JobUpdate, ...); replace the duplicated inline payloads with calls to
this helper to avoid repetition and drift.
apps/workspace-engine/pkg/workspace/store/repository/db/jobs/mapper.go (1)

76-100: Guard status map lookups to avoid silent empty-status writes.

Direct map indexing returns zero values on misses; use checked lookups with fallback/error.

♻️ Suggested hardening
+func toOapiStatus(s db.JobStatus) oapi.JobStatus {
+	if mapped, ok := dbToOapiStatus[s]; ok {
+		return mapped
+	}
+	return oapi.JobStatus(s)
+}
+
+func toDBStatus(s oapi.JobStatus) (db.JobStatus, error) {
+	if mapped, ok := oapiToDBStatus[s]; ok {
+		return mapped, nil
+	}
+	return "", fmt.Errorf("unsupported job status: %q", s)
+}
...
-		Status:          dbToOapiStatus[row.Status],
+		Status:          toOapiStatus(row.Status),
...
-	params := db.UpsertJobParams{
+	status, err := toDBStatus(j.Status)
+	if err != nil {
+		return db.UpsertJobParams{}, err
+	}
+	params := db.UpsertJobParams{
...
-		Status:          oapiToDBStatus[j.Status],
+		Status:          status,

Also applies to: 131-136, 190-195

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/repository/db/jobs/mapper.go`
around lines 76 - 100, The status maps dbToOapiStatus and oapiToDBStatus are
being accessed directly which returns zero values on misses; update every lookup
that indexes these maps (e.g., the functions that convert between db.JobStatus
and oapi.JobStatus in mapper.go) to use the comma-ok form, validate the key
exists, and return an explicit error or a safe default instead of writing an
empty/zero status; do the same for the other lookup sites noted (around lines
you flagged) so any unknown status is handled deterministically and surfaces an
error rather than silently producing an empty value.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/workspace-engine/test/e2e/engine_dispatch_context_immutability_test.go`:
- Around line 387-456: Uncomment and restore the disabled test
TestEngine_DispatchContextImmutability_WorkflowJobUpdate so the workflow
immutability assertions run again; locate the commented block containing
TestEngine_DispatchContextImmutability_WorkflowJobUpdate and re-enable the
entire test function (preserving its original assertions that verify
DispatchContext.WorkflowRun.Inputs and WorkflowJob.Id remain unchanged after a
second WorkflowRunCreate), ensure imports and helper vars (engine, ctx,
require/assert) remain valid so it compiles, and run the test suite to confirm
it passes.

In `@apps/workspace-engine/test/e2e/engine_policy_version_cooldown_test.go`:
- Around line 719-731: The test incorrectly sets CompletedAt when calling
engine.PushEvent with a JobUpdate event for an in-progress job; remove the
CompletedAt assignment and remove oapi.JobUpdateEventFieldsToUpdateCompletedAt
from the FieldsToUpdate slice in the engine.PushEvent call so the event only
updates Status to oapi.JobStatusInProgress (leaving the job in progress) —
locate the engine.PushEvent(..., handler.JobUpdate, oapi.JobUpdateEvent{...})
invocation and adjust the oapi.Job payload and FieldsToUpdate accordingly.

In `@apps/workspace-engine/test/e2e/engine_workflow_test.go`:
- Around line 3-526: Un-comment and restore the e2e tests in this file so the
TestEngine_Workflow_* functions (e.g., TestEngine_Workflow_BasicFlow,
TestEngine_Workflow_MultipleInputs, TestEngine_Workflow_MultipleJobsConcurrent,
TestEngine_Workflow_ConcurrentWorkflows,
TestEngine_Workflow_DeleteTemplateCascade) and their imports are active again:
remove the block comment markers around the import block and all
TestEngine_Workflow_* test functions, ensure the import list (context, testing,
time, handler, oapi, workflowmanager, integration, uuid, assert) is present and
compiles, and run the tests locally (go test) to fix any import or compilation
issues introduced by uncommenting.

---

Nitpick comments:
In `@apps/workspace-engine/pkg/workspace/store/repository/db/jobs/mapper.go`:
- Around line 76-100: The status maps dbToOapiStatus and oapiToDBStatus are
being accessed directly which returns zero values on misses; update every lookup
that indexes these maps (e.g., the functions that convert between db.JobStatus
and oapi.JobStatus in mapper.go) to use the comma-ok form, validate the key
exists, and return an explicit error or a safe default instead of writing an
empty/zero status; do the same for the other lookup sites noted (around lines
you flagged) so any unknown status is handled deterministically and surfaces an
error rather than silently producing an empty value.

In `@apps/workspace-engine/test/e2e/engine_deployment_version_jobs_list_test.go`:
- Around line 202-216: Remove the two commented-out lines in the test block that
set job.Status and job.CreatedAt; instead keep the active createdAt variable and
the engine.PushEvent call that constructs the oapi.JobUpdateEvent (references:
createdAt, engine.PushEvent, handler.JobUpdate, oapi.JobUpdateEvent,
oapi.JobUpdateEventFieldsToUpdate) so the test remains clean and only uses the
live setup rather than commented legacy code.
- Around line 205-216: The test is creating a pointer to oapi.JobUpdateEvent but
other tests use a value; change the PushEvent call to pass a value
oapi.JobUpdateEvent (not &oapi.JobUpdateEvent) so engine.PushEvent(ctx,
handler.JobUpdate, oapi.JobUpdateEvent{...}) is used, and apply the same change
for the other occurrences (lines indicated in the review) so all uses of
oapi.JobUpdateEvent are consistent; keep the inner fields (Id, Job,
FieldsToUpdate, etc.) unchanged.

In `@apps/workspace-engine/test/e2e/engine_redeploy_test.go`:
- Around line 875-887: The test is setting CompletedAt while marking the job as
InProgress; remove the CompletedAt field and its mention from FieldsToUpdate in
the engine.PushEvent call so the oapi.JobUpdateEvent payload only sets
Status=oapi.JobStatusInProgress and does not include CompletedAt; update the
FieldsToUpdate slice passed to engine.PushEvent (in the oapi.JobUpdateEvent) to
remove oapi.JobUpdateEventFieldsToUpdateCompletedAt so the event is semantically
consistent.
- Around line 1019-1031: The test is setting CompletedAt while transitioning a
job to oapi.JobStatusActionRequired which is incorrect because action_required
jobs are still active; update the call to engine.PushEvent (handler.JobUpdate
with oapi.JobUpdateEvent) to omit the CompletedAt field on the oapi.Job (or set
it to nil) and remove oapi.JobUpdateEventFieldsToUpdateCompletedAt from the
FieldsToUpdate slice so only the Status is updated for JobStatusActionRequired.

In `@apps/workspace-engine/test/e2e/engine_variable_change_evaluation_test.go`:
- Around line 79-80: Remove the two commented-out legacy mutation lines that set
initialJob.Status and initialJob.CompletedAt in the test (the lines "//
initialJob.Status = oapi.JobStatusSuccessful" and "// initialJob.CompletedAt =
&now"); simply delete these dead commented lines from
engine_variable_change_evaluation_test.go so the test code is cleaner and intent
is clearer.
- Around line 81-92: Extract a helper to construct and send the repeated
"successful job" update payload used with engine.PushEvent(handler.JobUpdate,
...). Create a function (e.g., sendSuccessfulJobUpdate or
pushSuccessfulJobUpdate) that accepts ctx, engine, jobId (or initialJob) and
completedAt (or now) and builds the oapi.JobUpdateEvent with Id, Job{Id, Status:
oapi.JobStatusSuccessful, CompletedAt}, and FieldsToUpdate including
oapi.JobUpdateEventFieldsToUpdateStatus and
oapi.JobUpdateEventFieldsToUpdateCompletedAt, then calls engine.PushEvent(ctx,
handler.JobUpdate, ...); replace the duplicated inline payloads with calls to
this helper to avoid repetition and drift.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 0d9745b and 0771558.

📒 Files selected for processing (29)
  • apps/workspace-engine/pkg/db/jobs.sql.go
  • apps/workspace-engine/pkg/db/models.go
  • apps/workspace-engine/pkg/db/queries/jobs.sql
  • apps/workspace-engine/pkg/db/sqlc.yaml
  • apps/workspace-engine/pkg/workspace/jobs/factory.go
  • apps/workspace-engine/pkg/workspace/store/jobs.go
  • apps/workspace-engine/pkg/workspace/store/releases.go
  • apps/workspace-engine/pkg/workspace/store/repository/db/jobs/mapper.go
  • apps/workspace-engine/pkg/workspace/store/repository/db/jobs/repo.go
  • apps/workspace-engine/pkg/workspace/store/repository/interfaces.go
  • apps/workspace-engine/pkg/workspace/store/repository/memory/repo.go
  • apps/workspace-engine/pkg/workspace/store/store.go
  • apps/workspace-engine/svc/controllers/jobdispatch/setters_postgres.go
  • apps/workspace-engine/svc/workspaceconsumer/consumer.go
  • apps/workspace-engine/test/e2e/engine_deployment_variable_deletion_test.go
  • apps/workspace-engine/test/e2e/engine_deployment_version_jobs_list_test.go
  • apps/workspace-engine/test/e2e/engine_deployment_version_test.go
  • apps/workspace-engine/test/e2e/engine_dispatch_context_immutability_test.go
  • apps/workspace-engine/test/e2e/engine_environment_progression_soak_test.go
  • apps/workspace-engine/test/e2e/engine_environment_selector_update_test.go
  • apps/workspace-engine/test/e2e/engine_job_agent_retrigger_test.go
  • apps/workspace-engine/test/e2e/engine_jobs_test.go
  • apps/workspace-engine/test/e2e/engine_policy_version_cooldown_test.go
  • apps/workspace-engine/test/e2e/engine_redeploy_test.go
  • apps/workspace-engine/test/e2e/engine_retry_policy_test.go
  • apps/workspace-engine/test/e2e/engine_tick_recompute_test.go
  • apps/workspace-engine/test/e2e/engine_variable_change_evaluation_test.go
  • apps/workspace-engine/test/e2e/engine_workflow_test.go
  • apps/workspace-engine/test/integration/dbtest.go
💤 Files with no reviewable changes (4)
  • apps/workspace-engine/test/e2e/engine_deployment_version_test.go
  • apps/workspace-engine/test/e2e/engine_environment_selector_update_test.go
  • apps/workspace-engine/test/e2e/engine_jobs_test.go
  • apps/workspace-engine/test/e2e/engine_job_agent_retrigger_test.go

Comment on lines +387 to +456
// func TestEngine_DispatchContextImmutability_WorkflowJobUpdate(t *testing.T) {
// jobAgentID := uuid.New().String()
// workflowID := uuid.New().String()

// engine := integration.NewTestWorkspace(t,
// integration.WithJobAgent(
// integration.JobAgentID(jobAgentID),
// integration.JobAgentName("Workflow Agent"),
// ),
// integration.WithWorkflow(
// integration.WorkflowID(workflowID),
// integration.WithWorkflowJobTemplate(
// integration.WorkflowJobTemplateJobAgentID(jobAgentID),
// integration.WorkflowJobTemplateJobAgentConfig(map[string]any{
// "timeout": 60,
// }),
// integration.WorkflowJobTemplateName("deploy-step"),
// ),
// ),
// )

// ctx := context.Background()

// engine.PushEvent(ctx, handler.WorkflowRunCreate, &oapi.WorkflowRun{
// WorkflowId: workflowID,
// Inputs: map[string]any{"env": "prod"},
// })

// workflowRuns := engine.Workspace().WorkflowRuns().GetByWorkflowId(workflowID)
// require.Len(t, workflowRuns, 1)

// var workflowRun *oapi.WorkflowRun
// for _, wr := range workflowRuns {
// workflowRun = wr
// break
// }

// workflowJobs := engine.Workspace().WorkflowJobs().GetByWorkflowRunId(workflowRun.Id)
// require.Len(t, workflowJobs, 1)

// jobs := engine.Workspace().Jobs().GetByWorkflowJobId(workflowJobs[0].Id)
// require.Len(t, jobs, 1)
// job := jobs[0]

// require.NotNil(t, job.DispatchContext)
// assert.NotNil(t, job.DispatchContext.Workflow)
// assert.Equal(t, workflowID, job.DispatchContext.Workflow.Id)
// assert.NotNil(t, job.DispatchContext.WorkflowRun)
// assert.Equal(t, workflowRun.Id, job.DispatchContext.WorkflowRun.Id)
// assert.NotNil(t, job.DispatchContext.WorkflowJob)
// originalWorkflowJobId := job.DispatchContext.WorkflowJob.Id
// originalRunInputs := job.DispatchContext.WorkflowRun.Inputs

// assert.Equal(t, map[string]any{"env": "prod"}, originalRunInputs)

// // Create a second workflow run (which adds more data to the store)
// engine.PushEvent(ctx, handler.WorkflowRunCreate, &oapi.WorkflowRun{
// WorkflowId: workflowID,
// Inputs: map[string]any{"env": "staging"},
// })

// // Re-fetch original job and verify its DispatchContext is unchanged
// jobAfter, ok := engine.Workspace().Jobs().Get(job.Id)
// require.True(t, ok)
// require.NotNil(t, jobAfter.DispatchContext)
// assert.Equal(t, workflowID, jobAfter.DispatchContext.Workflow.Id)
// assert.Equal(t, originalWorkflowJobId, jobAfter.DispatchContext.WorkflowJob.Id)
// assert.Equal(t, map[string]any{"env": "prod"}, jobAfter.DispatchContext.WorkflowRun.Inputs,
// "DispatchContext.WorkflowRun.Inputs should retain original values")
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Do not disable TestEngine_DispatchContextImmutability_WorkflowJobUpdate by commenting it out.

This removes the workflow-specific immutability assertion path and weakens regression detection for DispatchContext behavior.

As per coding guidelines, apps/workspace-engine/**/*_test.go: “Follow the existing test structure used in *_test.go files”.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/test/e2e/engine_dispatch_context_immutability_test.go`
around lines 387 - 456, Uncomment and restore the disabled test
TestEngine_DispatchContextImmutability_WorkflowJobUpdate so the workflow
immutability assertions run again; locate the commented block containing
TestEngine_DispatchContextImmutability_WorkflowJobUpdate and re-enable the
entire test function (preserving its original assertions that verify
DispatchContext.WorkflowRun.Inputs and WorkflowJob.Id remain unchanged after a
second WorkflowRunCreate), ensure imports and helper vars (engine, ctx,
require/assert) remain valid so it compiles, and run the test suite to confirm
it passes.

Comment on lines +719 to +731
completedAt := time.Now()
engine.PushEvent(ctx, handler.JobUpdate, oapi.JobUpdateEvent{
Id: &firstJob.Id,
Job: oapi.Job{
Id: firstJob.Id,
Status: oapi.JobStatusInProgress,
CompletedAt: &completedAt,
},
FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
oapi.JobUpdateEventFieldsToUpdateStatus,
oapi.JobUpdateEventFieldsToUpdateCompletedAt,
},
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Do not set CompletedAt for an in_progress job update.

This state combination conflicts with the scenario intent (“leave it in progress”) and can accidentally satisfy completion-based checks.

💡 Suggested correction
-	completedAt := time.Now()
 	engine.PushEvent(ctx, handler.JobUpdate, oapi.JobUpdateEvent{
 		Id: &firstJob.Id,
 		Job: oapi.Job{
-			Id:          firstJob.Id,
-			Status:      oapi.JobStatusInProgress,
-			CompletedAt: &completedAt,
+			Id:     firstJob.Id,
+			Status: oapi.JobStatusInProgress,
 		},
 		FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
 			oapi.JobUpdateEventFieldsToUpdateStatus,
-			oapi.JobUpdateEventFieldsToUpdateCompletedAt,
 		},
 	})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
completedAt := time.Now()
engine.PushEvent(ctx, handler.JobUpdate, oapi.JobUpdateEvent{
Id: &firstJob.Id,
Job: oapi.Job{
Id: firstJob.Id,
Status: oapi.JobStatusInProgress,
CompletedAt: &completedAt,
},
FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
oapi.JobUpdateEventFieldsToUpdateStatus,
oapi.JobUpdateEventFieldsToUpdateCompletedAt,
},
})
engine.PushEvent(ctx, handler.JobUpdate, oapi.JobUpdateEvent{
Id: &firstJob.Id,
Job: oapi.Job{
Id: firstJob.Id,
Status: oapi.JobStatusInProgress,
},
FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
oapi.JobUpdateEventFieldsToUpdateStatus,
},
})
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/test/e2e/engine_policy_version_cooldown_test.go` around
lines 719 - 731, The test incorrectly sets CompletedAt when calling
engine.PushEvent with a JobUpdate event for an in-progress job; remove the
CompletedAt assignment and remove oapi.JobUpdateEventFieldsToUpdateCompletedAt
from the FieldsToUpdate slice in the engine.PushEvent call so the event only
updates Status to oapi.JobStatusInProgress (leaving the job in progress) —
locate the engine.PushEvent(..., handler.JobUpdate, oapi.JobUpdateEvent{...})
invocation and adjust the oapi.Job payload and FieldsToUpdate accordingly.

Comment on lines +3 to +526
// import (
// "context"
// "testing"
// "time"
// "workspace-engine/pkg/events/handler"
// "workspace-engine/pkg/oapi"
// "workspace-engine/pkg/workspace/workflowmanager"
// "workspace-engine/test/integration"

// "github.com/google/uuid"
// "github.com/stretchr/testify/assert"
// )

// func TestEngine_Workflow_BasicFlow(t *testing.T) {
// jobAgentID := uuid.New().String()
// workflowID := uuid.New().String()
// workflowJobTemplateID := uuid.New().String()

// engine := integration.NewTestWorkspace(t,
// integration.WithJobAgent(
// integration.JobAgentID(jobAgentID),
// integration.JobAgentName("Test Agent"),
// ),
// integration.WithWorkflow(
// integration.WorkflowID(workflowID),
// integration.WithWorkflowStringInput(
// integration.WorkflowStringInputKey("input-1"),
// integration.WorkflowStringInputDefault("default-1"),
// ),
// integration.WithWorkflowJobTemplate(
// integration.WorkflowJobTemplateID(workflowJobTemplateID),
// integration.WorkflowJobTemplateJobAgentID(jobAgentID),
// integration.WorkflowJobTemplateJobAgentConfig(map[string]any{
// "delaySeconds": 10,
// }),
// integration.WorkflowJobTemplateName("Test Job 1"),
// ),
// ),
// )

// ctx := context.Background()

// workflowRunCreate := &oapi.WorkflowRun{
// WorkflowId: workflowID,
// Inputs: map[string]any{
// "input-1": "custom-1",
// },
// }
// engine.PushEvent(ctx, handler.WorkflowRunCreate, workflowRunCreate)

// workflowRuns := engine.Workspace().WorkflowRuns().GetByWorkflowId(workflowID)
// assert.Len(t, workflowRuns, 1)
// workflowRunsSlice := make([]*oapi.WorkflowRun, 0)
// for _, workflowRun := range workflowRuns {
// workflowRunsSlice = append(workflowRunsSlice, workflowRun)
// }
// workflowRun := workflowRunsSlice[0]
// assert.NotNil(t, workflowRun)
// assert.Equal(t, workflowID, workflowRun.WorkflowId)
// assert.Equal(t, map[string]any{
// "input-1": "custom-1",
// }, workflowRun.Inputs)

// workflowJobs := engine.Workspace().WorkflowJobs().GetByWorkflowRunId(workflowRun.Id)
// assert.Len(t, workflowJobs, 1)
// assert.Equal(t, 0, workflowJobs[0].Index)

// jobs := engine.Workspace().Jobs().GetByWorkflowJobId(workflowJobs[0].Id)
// assert.Len(t, jobs, 1)
// assert.Equal(t, oapi.JobStatusPending, jobs[0].Status)
// assert.Equal(t, workflowJobs[0].Id, jobs[0].WorkflowJobId)
// assert.Equal(t, jobAgentID, jobs[0].JobAgentId)
// assert.Equal(t, oapi.JobAgentConfig{
// "delaySeconds": float64(10),
// }, jobs[0].JobAgentConfig)

// // Verify DispatchContext for workflow job
// assert.NotNil(t, jobs[0].DispatchContext)
// assert.Equal(t, jobAgentID, jobs[0].DispatchContext.JobAgent.Id)
// assert.Equal(t, float64(10), jobs[0].DispatchContext.JobAgentConfig["delaySeconds"])
// assert.NotNil(t, jobs[0].DispatchContext.WorkflowJob)
// assert.Equal(t, workflowJobs[0].Id, jobs[0].DispatchContext.WorkflowJob.Id)
// assert.NotNil(t, jobs[0].DispatchContext.WorkflowRun)
// assert.Equal(t, workflowRun.Id, jobs[0].DispatchContext.WorkflowRun.Id)
// assert.NotNil(t, jobs[0].DispatchContext.Workflow)
// assert.Equal(t, workflowID, jobs[0].DispatchContext.Workflow.Id)
// // Workflow jobs should not have release context
// assert.Nil(t, jobs[0].DispatchContext.Release)
// assert.Nil(t, jobs[0].DispatchContext.Deployment)
// assert.Nil(t, jobs[0].DispatchContext.Environment)
// assert.Nil(t, jobs[0].DispatchContext.Resource)
// }

// func TestEngine_Workflow_MultipleInputs(t *testing.T) {
// jobAgentID := uuid.New().String()
// workflowID := uuid.New().String()
// workflowJobTemplateID := uuid.New().String()

// engine := integration.NewTestWorkspace(t,
// integration.WithJobAgent(
// integration.JobAgentID(jobAgentID),
// integration.JobAgentName("Test Agent"),
// ),
// integration.WithWorkflow(
// integration.WorkflowID(workflowID),
// integration.WithWorkflowStringInput(
// integration.WorkflowStringInputKey("input-1"),
// integration.WorkflowStringInputDefault("default-1"),
// ),
// integration.WithWorkflowNumberInput(
// integration.WorkflowNumberInputKey("input-2"),
// integration.WorkflowNumberInputDefault(2),
// ),
// integration.WithWorkflowBooleanInput(
// integration.WorkflowBooleanInputKey("input-3"),
// integration.WorkflowBooleanInputDefault(true),
// ),
// integration.WithWorkflowJobTemplate(
// integration.WorkflowJobTemplateID(workflowJobTemplateID),
// integration.WorkflowJobTemplateJobAgentID(jobAgentID),
// integration.WorkflowJobTemplateJobAgentConfig(map[string]any{
// "delaySeconds": 10,
// }),
// integration.WorkflowJobTemplateName("Test Job 1"),
// ),
// ),
// )

// ctx := context.Background()

// workflowRunCreate := &oapi.WorkflowRun{
// WorkflowId: workflowID,
// Inputs: map[string]any{
// "input-1": "custom-1",
// "input-2": 5,
// "input-3": false,
// },
// }
// engine.PushEvent(ctx, handler.WorkflowRunCreate, workflowRunCreate)

// workflowRuns := engine.Workspace().WorkflowRuns().GetByWorkflowId(workflowID)
// assert.Len(t, workflowRuns, 1)
// workflowRunsSlice := make([]*oapi.WorkflowRun, 0)
// for _, workflowRun := range workflowRuns {
// workflowRunsSlice = append(workflowRunsSlice, workflowRun)
// }
// workflowRun := workflowRunsSlice[0]
// assert.NotNil(t, workflowRun)
// assert.Equal(t, workflowID, workflowRun.WorkflowId)
// assert.Equal(t, map[string]any{
// "input-1": "custom-1",
// "input-2": float64(5),
// "input-3": false,
// }, workflowRun.Inputs)

// workflowJobs := engine.Workspace().WorkflowJobs().GetByWorkflowRunId(workflowRun.Id)
// assert.Len(t, workflowJobs, 1)
// assert.Equal(t, 0, workflowJobs[0].Index)

// jobs := engine.Workspace().Jobs().GetByWorkflowJobId(workflowJobs[0].Id)
// assert.Len(t, jobs, 1)
// assert.Equal(t, oapi.JobStatusPending, jobs[0].Status)
// assert.Equal(t, workflowJobs[0].Id, jobs[0].WorkflowJobId)
// assert.Equal(t, jobAgentID, jobs[0].JobAgentId)
// assert.Equal(t, oapi.JobAgentConfig{
// "delaySeconds": float64(10),
// }, jobs[0].JobAgentConfig)
// }

// func TestEngine_Workflow_MultipleJobsConcurrent(t *testing.T) {
// jobAgentID1 := uuid.New().String()
// jobAgentID2 := uuid.New().String()
// workflowID := uuid.New().String()
// workflowJobTemplateID1 := uuid.New().String()
// workflowJobTemplateID2 := uuid.New().String()

// engine := integration.NewTestWorkspace(t,
// integration.WithJobAgent(
// integration.JobAgentID(jobAgentID1),
// integration.JobAgentName("Test Agent 1"),
// ),
// integration.WithJobAgent(
// integration.JobAgentID(jobAgentID2),
// integration.JobAgentName("Test Agent 2"),
// ),
// integration.WithWorkflow(
// integration.WorkflowID(workflowID),
// integration.WithWorkflowJobTemplate(
// integration.WorkflowJobTemplateID(workflowJobTemplateID1),
// integration.WorkflowJobTemplateJobAgentID(jobAgentID1),
// integration.WorkflowJobTemplateJobAgentConfig(map[string]any{
// "delaySeconds": 10,
// }),
// integration.WorkflowJobTemplateName("Test Job 1"),
// ),
// integration.WithWorkflowJobTemplate(
// integration.WorkflowJobTemplateID(workflowJobTemplateID2),
// integration.WorkflowJobTemplateJobAgentID(jobAgentID2),
// integration.WorkflowJobTemplateJobAgentConfig(map[string]any{
// "delaySeconds": 20,
// }),
// integration.WorkflowJobTemplateName("Test Job 2"),
// ),
// ),
// )

// ctx := context.Background()

// workflowRunCreate := &oapi.WorkflowRun{
// WorkflowId: workflowID,
// }
// engine.PushEvent(ctx, handler.WorkflowRunCreate, workflowRunCreate)

// workflowRuns := engine.Workspace().WorkflowRuns().GetByWorkflowId(workflowID)
// assert.Len(t, workflowRuns, 1)
// workflowRunsSlice := make([]*oapi.WorkflowRun, 0)
// for _, workflowRun := range workflowRuns {
// workflowRunsSlice = append(workflowRunsSlice, workflowRun)
// }
// workflowRun := workflowRunsSlice[0]
// assert.NotNil(t, workflowRun)
// assert.Equal(t, workflowID, workflowRun.WorkflowId)

// workflowJobs := engine.Workspace().WorkflowJobs().GetByWorkflowRunId(workflowRun.Id)
// assert.Len(t, workflowJobs, 2)
// assert.Equal(t, 0, workflowJobs[0].Index)
// assert.Equal(t, 1, workflowJobs[1].Index)

// wfJob1jobs := engine.Workspace().Jobs().GetByWorkflowJobId(workflowJobs[0].Id)
// assert.Len(t, wfJob1jobs, 1)
// assert.Equal(t, oapi.JobStatusPending, wfJob1jobs[0].Status)
// assert.Equal(t, workflowJobs[0].Id, wfJob1jobs[0].WorkflowJobId)
// assert.Equal(t, jobAgentID1, wfJob1jobs[0].JobAgentId)
// assert.Equal(t, oapi.JobAgentConfig{
// "delaySeconds": float64(10),
// }, wfJob1jobs[0].JobAgentConfig)

// // Verify DispatchContext for workflow job 1
// assert.NotNil(t, wfJob1jobs[0].DispatchContext)
// assert.Equal(t, jobAgentID1, wfJob1jobs[0].DispatchContext.JobAgent.Id)
// assert.NotNil(t, wfJob1jobs[0].DispatchContext.WorkflowJob)
// assert.NotNil(t, wfJob1jobs[0].DispatchContext.WorkflowRun)
// assert.NotNil(t, wfJob1jobs[0].DispatchContext.Workflow)
// assert.Equal(t, workflowID, wfJob1jobs[0].DispatchContext.Workflow.Id)

// wfJob2jobs := engine.Workspace().Jobs().GetByWorkflowJobId(workflowJobs[1].Id)
// assert.Len(t, wfJob2jobs, 1)
// assert.Equal(t, oapi.JobStatusPending, wfJob2jobs[0].Status)
// assert.Equal(t, workflowJobs[1].Id, wfJob2jobs[0].WorkflowJobId)
// assert.Equal(t, jobAgentID2, wfJob2jobs[0].JobAgentId)
// assert.Equal(t, oapi.JobAgentConfig{
// "delaySeconds": float64(20),
// }, wfJob2jobs[0].JobAgentConfig)

// // Verify DispatchContext for workflow job 2
// assert.NotNil(t, wfJob2jobs[0].DispatchContext)
// assert.Equal(t, jobAgentID2, wfJob2jobs[0].DispatchContext.JobAgent.Id)
// assert.NotNil(t, wfJob2jobs[0].DispatchContext.WorkflowJob)
// assert.NotNil(t, wfJob2jobs[0].DispatchContext.WorkflowRun)
// assert.NotNil(t, wfJob2jobs[0].DispatchContext.Workflow)
// assert.Equal(t, workflowID, wfJob2jobs[0].DispatchContext.Workflow.Id)

// wfv, err := workflowmanager.NewWorkflowRunView(engine.Workspace().Store(), workflowRun.Id)
// assert.NoError(t, err)
// assert.NotNil(t, wfv)
// assert.False(t, wfv.IsComplete())

// completedAt := time.Now()
// jobUpdateEvent := &oapi.JobUpdateEvent{
// Id: &wfJob1jobs[0].Id,
// Job: oapi.Job{
// Id: wfJob1jobs[0].Id,
// Status: oapi.JobStatusSuccessful,
// CompletedAt: &completedAt,
// },
// FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
// oapi.JobUpdateEventFieldsToUpdateStatus,
// oapi.JobUpdateEventFieldsToUpdateCompletedAt,
// },
// }
// engine.PushEvent(ctx, handler.JobUpdate, jobUpdateEvent)

// wfv, err = workflowmanager.NewWorkflowRunView(engine.Workspace().Store(), workflowRun.Id)
// assert.NoError(t, err)
// assert.False(t, wfv.IsComplete())

// completedAt2 := time.Now()
// jobUpdateEvent2 := &oapi.JobUpdateEvent{
// Id: &wfJob2jobs[0].Id,
// Job: oapi.Job{
// Id: wfJob2jobs[0].Id,
// Status: oapi.JobStatusSuccessful,
// CompletedAt: &completedAt2,
// },
// FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
// oapi.JobUpdateEventFieldsToUpdateStatus,
// oapi.JobUpdateEventFieldsToUpdateCompletedAt,
// },
// }
// engine.PushEvent(ctx, handler.JobUpdate, jobUpdateEvent2)

// wfv, err = workflowmanager.NewWorkflowRunView(engine.Workspace().Store(), workflowRun.Id)
// assert.NoError(t, err)
// assert.NotNil(t, wfv)
// assert.True(t, wfv.IsComplete())
// }

// func TestEngine_Workflow_ConcurrentWorkflows(t *testing.T) {
// jobAgentID := uuid.New().String()
// workflowID := uuid.New().String()
// workflowJobTemplateID := uuid.New().String()

// engine := integration.NewTestWorkspace(t,
// integration.WithJobAgent(
// integration.JobAgentID(jobAgentID),
// integration.JobAgentName("Test Agent"),
// ),
// integration.WithWorkflow(
// integration.WorkflowID(workflowID),
// integration.WithWorkflowStringInput(
// integration.WorkflowStringInputKey("input-1"),
// integration.WorkflowStringInputDefault("default-1"),
// ),
// integration.WithWorkflowJobTemplate(
// integration.WorkflowJobTemplateID(workflowJobTemplateID),
// integration.WorkflowJobTemplateJobAgentID(jobAgentID),
// integration.WorkflowJobTemplateJobAgentConfig(map[string]any{
// "delaySeconds": 10,
// }),
// integration.WorkflowJobTemplateName("Test Job 1"),
// ),
// ),
// )

// ctx := context.Background()

// workflow1RunCreate := &oapi.WorkflowRun{
// WorkflowId: workflowID,
// Inputs: map[string]any{
// "input-1": "custom-1",
// },
// }
// engine.PushEvent(ctx, handler.WorkflowRunCreate, workflow1RunCreate)

// workflow2RunCreate := &oapi.WorkflowRun{
// WorkflowId: workflowID,
// Inputs: map[string]any{
// "input-1": "custom-2",
// },
// }
// engine.PushEvent(ctx, handler.WorkflowRunCreate, workflow2RunCreate)

// workflowRuns := engine.Workspace().WorkflowRuns().GetByWorkflowId(workflowID)
// assert.Len(t, workflowRuns, 2)
// assert.Equal(t, 2, len(workflowRuns))

// var workflow1Run *oapi.WorkflowRun
// var workflow2Run *oapi.WorkflowRun

// workflowRunsSlice := make([]*oapi.WorkflowRun, 0)
// for _, workflowRun := range workflowRuns {
// workflowRunsSlice = append(workflowRunsSlice, workflowRun)
// }

// for _, workflowRun := range workflowRunsSlice {
// if workflowRun.Inputs["input-1"] == "custom-1" {
// workflow1Run = workflowRun
// } else {
// workflow2Run = workflowRun
// }
// }

// workflow1WorkflowJobs := engine.Workspace().WorkflowJobs().GetByWorkflowRunId(workflow1Run.Id)
// assert.Len(t, workflow1WorkflowJobs, 1)
// assert.Equal(t, 0, workflow1WorkflowJobs[0].Index)

// workflow2WorkflowJobs := engine.Workspace().WorkflowJobs().GetByWorkflowRunId(workflow2Run.Id)
// assert.Len(t, workflow2WorkflowJobs, 1)
// assert.Equal(t, 0, workflow2WorkflowJobs[0].Index)

// workflowJob1Jobs := engine.Workspace().Jobs().GetByWorkflowJobId(workflow1WorkflowJobs[0].Id)
// assert.Len(t, workflowJob1Jobs, 1)
// assert.Equal(t, oapi.JobStatusPending, workflowJob1Jobs[0].Status)
// assert.Equal(t, workflow1WorkflowJobs[0].Id, workflowJob1Jobs[0].WorkflowJobId)
// assert.Equal(t, jobAgentID, workflowJob1Jobs[0].JobAgentId)
// assert.Equal(t, oapi.JobAgentConfig{
// "delaySeconds": float64(10),
// }, workflowJob1Jobs[0].JobAgentConfig)

// completedAt1 := time.Now()
// jobUpdateEvent1 := &oapi.JobUpdateEvent{
// Id: &workflowJob1Jobs[0].Id,
// Job: oapi.Job{
// Id: workflowJob1Jobs[0].Id,
// Status: oapi.JobStatusSuccessful,
// CompletedAt: &completedAt1,
// },
// FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
// oapi.JobUpdateEventFieldsToUpdateStatus,
// oapi.JobUpdateEventFieldsToUpdateCompletedAt,
// },
// }
// engine.PushEvent(ctx, handler.JobUpdate, jobUpdateEvent1)

// workflowJob2Jobs := engine.Workspace().Jobs().GetByWorkflowJobId(workflow2WorkflowJobs[0].Id)
// assert.Len(t, workflowJob2Jobs, 1)
// assert.Equal(t, oapi.JobStatusPending, workflowJob2Jobs[0].Status)
// assert.Equal(t, workflow2WorkflowJobs[0].Id, workflowJob2Jobs[0].WorkflowJobId)
// assert.Equal(t, jobAgentID, workflowJob2Jobs[0].JobAgentId)
// assert.Equal(t, oapi.JobAgentConfig{
// "delaySeconds": float64(10),
// }, workflowJob2Jobs[0].JobAgentConfig)

// completedAt2 := time.Now()
// jobUpdateEvent2 := &oapi.JobUpdateEvent{
// Id: &workflowJob2Jobs[0].Id,
// Job: oapi.Job{
// Id: workflowJob2Jobs[0].Id,
// Status: oapi.JobStatusSuccessful,
// CompletedAt: &completedAt2,
// },
// FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
// oapi.JobUpdateEventFieldsToUpdateStatus,
// oapi.JobUpdateEventFieldsToUpdateCompletedAt,
// },
// }
// engine.PushEvent(ctx, handler.JobUpdate, jobUpdateEvent2)

// wfv1, err := workflowmanager.NewWorkflowRunView(engine.Workspace().Store(), workflow1Run.Id)
// assert.NoError(t, err)
// assert.NotNil(t, wfv1)
// assert.True(t, wfv1.IsComplete())

// wfv2, err := workflowmanager.NewWorkflowRunView(engine.Workspace().Store(), workflow2Run.Id)
// assert.NoError(t, err)
// assert.NotNil(t, wfv2)
// assert.True(t, wfv2.IsComplete())
// }

// func TestEngine_Workflow_DeleteTemplateCascade(t *testing.T) {
// jobAgentID := uuid.New().String()
// workflowID := uuid.New().String()
// workflowJobTemplateID1 := uuid.New().String()
// workflowJobTemplateID2 := uuid.New().String()

// engine := integration.NewTestWorkspace(t,
// integration.WithJobAgent(
// integration.JobAgentID(jobAgentID),
// integration.JobAgentName("Test Agent"),
// ),
// integration.WithWorkflow(
// integration.WorkflowID(workflowID),
// integration.WithWorkflowStringInput(
// integration.WorkflowStringInputKey("input-1"),
// integration.WorkflowStringInputDefault("default-1"),
// ),
// integration.WithWorkflowJobTemplate(
// integration.WorkflowJobTemplateID(workflowJobTemplateID1),
// integration.WorkflowJobTemplateJobAgentID(jobAgentID),
// integration.WorkflowJobTemplateJobAgentConfig(map[string]any{
// "delaySeconds": 10,
// }),
// integration.WorkflowJobTemplateName("Test Job 1"),
// ),
// integration.WithWorkflowJobTemplate(
// integration.WorkflowJobTemplateID(workflowJobTemplateID2),
// integration.WorkflowJobTemplateJobAgentID(jobAgentID),
// integration.WorkflowJobTemplateJobAgentConfig(map[string]any{
// "delaySeconds": 20,
// }),
// integration.WorkflowJobTemplateName("Test Job 2"),
// ),
// ),
// )

// ctx := context.Background()

// engine.PushEvent(ctx, handler.WorkflowRunCreate, &oapi.WorkflowRun{
// WorkflowId: workflowID,
// Inputs: map[string]any{"input-1": "value-1"},
// })
// engine.PushEvent(ctx, handler.WorkflowRunCreate, &oapi.WorkflowRun{
// WorkflowId: workflowID,
// Inputs: map[string]any{"input-1": "value-2"},
// })

// workflowRuns := engine.Workspace().WorkflowRuns().GetByWorkflowId(workflowID)
// assert.Len(t, workflowRuns, 2)

// allWorkflowJobIDs := make([]string, 0)
// allJobIDs := make([]string, 0)
// for _, wfRun := range workflowRuns {
// wfJobs := engine.Workspace().WorkflowJobs().GetByWorkflowRunId(wfRun.Id)
// assert.Len(t, wfJobs, 2, "each workflow should have 2 workflow jobs")
// for _, wfJob := range wfJobs {
// allWorkflowJobIDs = append(allWorkflowJobIDs, wfJob.Id)
// jobs := engine.Workspace().Jobs().GetByWorkflowJobId(wfJob.Id)
// assert.Len(t, jobs, 1, "each workflow job should have 1 job")
// allJobIDs = append(allJobIDs, jobs[0].Id)
// }
// }
// assert.Len(t, allWorkflowJobIDs, 4, "should have 4 workflow jobs total")
// assert.Len(t, allJobIDs, 4, "should have 4 jobs total")

// workflow, ok := engine.Workspace().Workflows().Get(workflowID)
// assert.True(t, ok)
// engine.PushEvent(ctx, handler.WorkflowDelete, workflow)

// _, ok = engine.Workspace().Workflows().Get(workflowID)
// assert.False(t, ok, "workflow should be removed")

// remainingWorkflowRuns := engine.Workspace().WorkflowRuns().GetByWorkflowId(workflowID)
// assert.Empty(t, remainingWorkflowRuns, "all workflow runs should be removed")

// for _, wfJobID := range allWorkflowJobIDs {
// _, ok := engine.Workspace().WorkflowJobs().Get(wfJobID)
// assert.False(t, ok, "workflow job %s should be removed", wfJobID)
// }

// // for _, jobID := range allJobIDs {
// // _, ok := engine.Workspace().Jobs().Get(jobID)
// // assert.False(t, ok, "job %s should be removed", jobID)
// // }
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Re-enable these workflow e2e tests instead of keeping them commented out.

This change disables the whole file, so these scenarios no longer execute in CI and workflow regressions can pass unnoticed.

As per coding guidelines, apps/workspace-engine/**/*_test.go: “Follow the existing test structure used in *_test.go files”.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/test/e2e/engine_workflow_test.go` around lines 3 - 526,
Un-comment and restore the e2e tests in this file so the TestEngine_Workflow_*
functions (e.g., TestEngine_Workflow_BasicFlow,
TestEngine_Workflow_MultipleInputs, TestEngine_Workflow_MultipleJobsConcurrent,
TestEngine_Workflow_ConcurrentWorkflows,
TestEngine_Workflow_DeleteTemplateCascade) and their imports are active again:
remove the block comment markers around the import block and all
TestEngine_Workflow_* test functions, ensure the import list (context, testing,
time, handler, oapi, workflowmanager, integration, uuid, assert) is present and
compiles, and run the tests locally (go test) to fix any import or compilation
issues introduced by uncommenting.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (3)
apps/workspace-engine/test/e2e/engine_policy_environment_progression_test.go (1)

79-154: Consider cleaning up or conditionalizing DEBUG instrumentation.

The extensive DEBUG:-prefixed logging appears to be development instrumentation for tracing async state transitions. While useful for troubleshooting CI failures, consider either:

  • Removing these before merging if they were added for debugging this PR
  • Removing the "DEBUG:" prefix if they should remain as permanent test output

This is a minor observation—the logs don't affect test correctness.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/test/e2e/engine_policy_environment_progression_test.go`
around lines 79 - 154, The test contains many development "DEBUG:" t.Logf lines
(e.g., logs around version.Id, qaReleases/prodReleases, qaJobs/prodJobs,
allJobs, and after job updates where engine.PushEvent / handler.JobUpdate is
invoked) that should be cleaned up or made conditional; either remove the debug
t.Logf lines entirely or wrap them in a verbosity guard (e.g., if
testing.Verbose() { t.Logf(...) }) or drop the "DEBUG:" prefix and keep only
concise, permanent test logs — update occurrences that reference qaReleases,
prodReleases, qaJobs, prodJobs, allJobs, qaJob and the post-update checks
accordingly.
apps/workspace-engine/pkg/workspace/store/releases.go (1)

5-5: Remove temporary debug prints in Releases.Jobs and drop fmt import.

Per-call and per-job stdout logging here is noisy in normal operation and should not stay in this path.

🧹 Proposed cleanup
 import (
 	"context"
-	"fmt"
 	"workspace-engine/pkg/oapi"
 	"workspace-engine/pkg/workspace/store/repository"
@@
 	jobItems, err := r.store.Jobs.repo.GetByReleaseID(releaseId)
 	if err != nil {
-		fmt.Printf("[DEBUG Releases.Jobs] GetByReleaseID(%s) error: %v\n", releaseId, err)
 		return jobs
 	}
-	fmt.Printf("[DEBUG Releases.Jobs] GetByReleaseID(%s) returned %d jobs\n", releaseId, len(jobItems))
 	for _, job := range jobItems {
-		fmt.Printf("[DEBUG Releases.Jobs]   job id=%s status=%s\n", job.Id, job.Status)
 		jobs[job.Id] = job
 	}

Also applies to: 67-73

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/releases.go` at line 5, Remove the
temporary stdout debug prints from the Releases.Jobs method (remove any
fmt.Println/Printf calls inside Releases.Jobs and per-job loops) and drop the
"fmt" import since it is only used for those debug prints; if structured logging
is required preserve existing logger usage instead of printing to stdout. Ensure
no other references to fmt remain in the file so the import can be removed
cleanly.
apps/workspace-engine/pkg/workspace/store/jobs.go (1)

34-37: Drop temporary high-volume debug prints from store hot paths.

These unconditional fmt.Printf calls run on every upsert and release/job scan, which can flood logs and expose internal IDs unnecessarily.

🧹 Proposed cleanup
 func (j *Jobs) Upsert(ctx context.Context, job *oapi.Job) {
-	fmt.Printf("[DEBUG Jobs.Upsert] id=%s releaseId=%s status=%s agentId=%s\n", job.Id, job.ReleaseId, job.Status, job.JobAgentId)
 	if err := j.repo.Set(job); err != nil {
-		fmt.Printf("[DEBUG Jobs.Upsert] ERROR: %v\n", err)
 		log.Warn("Failed to upsert job", "job_id", job.Id, "error", err)
 		return
 	}
 	j.store.changeset.RecordUpsert(job)
 }
@@
-	fmt.Printf("[DEBUG GetJobsForReleaseTarget] key=%s\n", releaseTarget.Key())
 	releases, err := j.store.Releases.GetByReleaseTargetKey(releaseTarget.Key())
 	if err != nil {
-		fmt.Printf("[DEBUG GetJobsForReleaseTarget] GetByReleaseTargetKey error: %v\n", err)
+		log.Warn("Failed to load releases for release target", "release_target_key", releaseTarget.Key(), "error", err)
 		return jobs
 	}
-	fmt.Printf("[DEBUG GetJobsForReleaseTarget] found %d releases\n", len(releases))
 
 	for _, release := range releases {
-		fmt.Printf("[DEBUG GetJobsForReleaseTarget] release id=%s\n", release.Id.String())
 		releaseJobs := j.store.Releases.Jobs(release.Id.String())
-		fmt.Printf("[DEBUG GetJobsForReleaseTarget] release %s has %d jobs\n", release.Id.String(), len(releaseJobs))
 		for _, job := range releaseJobs {
 			jobs[job.Id] = job
 		}
 	}

Also applies to: 85-96

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/jobs.go` around lines 34 - 37,
Remove the unconditional fmt.Printf debug prints from the hot paths (e.g., the
println in Jobs.Upsert before calling j.repo.Set and the similar prints around
lines 85-96); instead rely on the existing structured logger (log.*) at an
appropriate level or guard debug output behind a configurable verbose/debug flag
so high-volume operations do not unconditionally print IDs to stdout. Locate the
prints in the Jobs.Upsert function (the line with fmt.Printf("[DEBUG
Jobs.Upsert] id=...") and the other fmt.Printf occurrences near the release/job
scan code) and delete them or replace them with conditional log.Debug calls that
check a debug flag before emitting sensitive details. Ensure error logging
(log.Warn("Failed to upsert job", "job_id", job.Id, "error", err)) remains
intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/workspace-engine/pkg/workspace/store/jobs.go`:
- Around line 33-39: The Jobs.Upsert currently swallows persistence errors;
change the signature of Jobs.Upsert to return error, propagate the error from
j.repo.Set(job) instead of only logging, and remove the silent return;
specifically update function Jobs.Upsert to return the err from j.repo.Set (or
wrap it with context) and update all callers of Jobs.Upsert to handle the
returned error (e.g., check and propagate or log/handle appropriately). Ensure
logs remain but do not replace returning the error so callers can detect failed
writes.

---

Nitpick comments:
In `@apps/workspace-engine/pkg/workspace/store/jobs.go`:
- Around line 34-37: Remove the unconditional fmt.Printf debug prints from the
hot paths (e.g., the println in Jobs.Upsert before calling j.repo.Set and the
similar prints around lines 85-96); instead rely on the existing structured
logger (log.*) at an appropriate level or guard debug output behind a
configurable verbose/debug flag so high-volume operations do not unconditionally
print IDs to stdout. Locate the prints in the Jobs.Upsert function (the line
with fmt.Printf("[DEBUG Jobs.Upsert] id=...") and the other fmt.Printf
occurrences near the release/job scan code) and delete them or replace them with
conditional log.Debug calls that check a debug flag before emitting sensitive
details. Ensure error logging (log.Warn("Failed to upsert job", "job_id",
job.Id, "error", err)) remains intact.

In `@apps/workspace-engine/pkg/workspace/store/releases.go`:
- Line 5: Remove the temporary stdout debug prints from the Releases.Jobs method
(remove any fmt.Println/Printf calls inside Releases.Jobs and per-job loops) and
drop the "fmt" import since it is only used for those debug prints; if
structured logging is required preserve existing logger usage instead of
printing to stdout. Ensure no other references to fmt remain in the file so the
import can be removed cleanly.

In
`@apps/workspace-engine/test/e2e/engine_policy_environment_progression_test.go`:
- Around line 79-154: The test contains many development "DEBUG:" t.Logf lines
(e.g., logs around version.Id, qaReleases/prodReleases, qaJobs/prodJobs,
allJobs, and after job updates where engine.PushEvent / handler.JobUpdate is
invoked) that should be cleaned up or made conditional; either remove the debug
t.Logf lines entirely or wrap them in a verbosity guard (e.g., if
testing.Verbose() { t.Logf(...) }) or drop the "DEBUG:" prefix and keep only
concise, permanent test logs — update occurrences that reference qaReleases,
prodReleases, qaJobs, prodJobs, allJobs, qaJob and the post-update checks
accordingly.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 20813c8 and 34e7fe6.

📒 Files selected for processing (5)
  • .github/workflows/apps-workspace-engine.yaml
  • apps/workspace-engine/pkg/workspace/store/jobs.go
  • apps/workspace-engine/pkg/workspace/store/releases.go
  • apps/workspace-engine/pkg/workspace/store/repository/db/jobs/repo.go
  • apps/workspace-engine/test/e2e/engine_policy_environment_progression_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • apps/workspace-engine/pkg/workspace/store/repository/db/jobs/repo.go
  • .github/workflows/apps-workspace-engine.yaml

Comment on lines 33 to +39
func (j *Jobs) Upsert(ctx context.Context, job *oapi.Job) {
_ = j.repo.Set(job)
fmt.Printf("[DEBUG Jobs.Upsert] id=%s releaseId=%s status=%s agentId=%s\n", job.Id, job.ReleaseId, job.Status, job.JobAgentId)
if err := j.repo.Set(job); err != nil {
fmt.Printf("[DEBUG Jobs.Upsert] ERROR: %v\n", err)
log.Warn("Failed to upsert job", "job_id", job.Id, "error", err)
return
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Propagate Upsert persistence failures instead of returning silently.

Line 38 exits after logging, but the caller cannot detect the failed write. That can leave upstream flow proceeding with an assumed successful upsert.

💡 Proposed fix (return error from Upsert)
-func (j *Jobs) Upsert(ctx context.Context, job *oapi.Job) {
+func (j *Jobs) Upsert(ctx context.Context, job *oapi.Job) error {
 	fmt.Printf("[DEBUG Jobs.Upsert] id=%s releaseId=%s status=%s agentId=%s\n", job.Id, job.ReleaseId, job.Status, job.JobAgentId)
 	if err := j.repo.Set(job); err != nil {
 		fmt.Printf("[DEBUG Jobs.Upsert] ERROR: %v\n", err)
 		log.Warn("Failed to upsert job", "job_id", job.Id, "error", err)
-		return
+		return fmt.Errorf("upsert job %s: %w", job.Id, err)
 	}
 	j.store.changeset.RecordUpsert(job)
+	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/jobs.go` around lines 33 - 39, The
Jobs.Upsert currently swallows persistence errors; change the signature of
Jobs.Upsert to return error, propagate the error from j.repo.Set(job) instead of
only logging, and remove the silent return; specifically update function
Jobs.Upsert to return the err from j.repo.Set (or wrap it with context) and
update all callers of Jobs.Upsert to handle the returned error (e.g., check and
propagate or log/handle appropriately). Ensure logs remain but do not replace
returning the error so callers can detect failed writes.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)

199-219: ⚠️ Potential issue | 🟠 Major

Remove raw fmt.Printf debugging from reconciliation paths and use structured telemetry/logging.

These added stdout prints (24+ occurrences across reconcileFn, reconcileTargetWithRecorder, ReconcileTargets, and ReconcileTarget) are in hot paths and per-target loops, bypassing the existing structured logging and OpenTelemetry instrumentation already used throughout the file. Replace them with span.AddEvent() and log.Debug(...) with fields to maintain observability consistency and avoid noisy, hard-to-query stdout in production.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/releasemanager/manager.go` around lines
199 - 219, The code uses raw fmt.Printf calls in reconciliation paths (e.g., in
reconcileFn and elsewhere) which should be removed; instead add structured
telemetry and logging by replacing each fmt.Printf with span.AddEvent(...) for
OpenTelemetry events and log.Debug(...) with structured fields (e.g.,
"workspace_id", workspaceID, "release_target", state.entity.Key(), and "error"
when present). Update the reconciler functions named reconcileFn,
reconcileTargetWithRecorder, ReconcileTargets, and ReconcileTarget to use
span.AddEvent for lifecycle messages (start/success/failure) and log.Debug for
local debug logs, and ensure error paths still call
recorder.Complete(trace.StatusFailed) and record the error via log.Debug fields
rather than printing to stdout. Also scan and remove any other fmt.Printf
occurrences in these functions to keep observability consistent.
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/workspace/store/repository/db/jobs/repo.go (1)

133-149: Replace fmt.Printf debug traces with structured logging.

These debug prints bypass log levels and will always appear in output. Use log.Debug(...) with key-value fields instead, consistent with the rest of the codebase.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/repository/db/jobs/repo.go` around
lines 133 - 149, The debug fmt.Printf calls in the GetByReleaseID flow should be
replaced with the project's structured logger (use log.Debug with key/value
fields) so messages respect log levels; replace the prints that log parse
errors, query start, query error, rows found, and per-job details with log.Debug
entries (include fields like "release_id", "uid", "rows", "error", "job_id",
"job_status") and for errors use log.Error or log.Debug with error included;
update the block around parsing releaseID, the call to
db.GetQueries(r.ctx).ListJobsByReleaseID, the len(rows) message, and the loop
that calls ToOapi(fromReleaseRow(row)) to emit structured logs instead of
fmt.Printf.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/getters.go`:
- Line 5: Remove all temporary debug prints (fmt.Printf calls) in this file
(including the ones noted around lines 30-35, 47-52, 60-65, 69-75) and any other
stray fmt.Printf usages; replace them with the project's structured logger if
persistent observability is required (e.g., use the existing logger instance
used elsewhere in this package) and ensure messages are logged at an appropriate
level, and then remove the now-unused "fmt" import from the import block so the
file compiles cleanly.

In
`@apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/passrate.go`:
- Line 57: Remove the temporary fmt.Printf debug prints in the passrate
evaluator (e.g., the fmt.Printf calls inside EvaluateWithTracker that print
successPercentage and e.minimumSuccessPercentage and the other similar lines
referenced), replacing them with the project's structured logger at debug/trace
level if observability is required, or simply deleting them if not needed; after
removing these fmt.Printf calls, also remove the "fmt" import if it becomes
unused to avoid compiler warnings.

In `@apps/workspace-engine/pkg/workspace/releasemanager/state_index.go`:
- Around line 53-70: Replace the always-on fmt.Printf debug prints in
StateIndex.computeDesiredRelease with structured log.Debug calls so logger level
control is respected: use the existing logger (same style as other uses on lines
88/108) to log messages for the key lookup, release-target-not-found, found
target (include rt.EnvironmentId, rt.DeploymentId, rt.ResourceId),
PlanDeployment error, and PlanDeployment result (include release.Id and
release.Version.Tag when non-nil); do the same replacement for the analogous
fmt.Printf lines in the Recompute method (around the 236–242 area) and ensure
you import/use the same log package and message formatting conventions as other
log.Debug uses in this file.

In `@apps/workspace-engine/pkg/workspace/store/repository/db/jobs/repo.go`:
- Around line 56-63: The Set method currently calls queries.InsertReleaseJob and
only logs errors (printf) then continues, which can leave jobs unlinked to
releases and break GetByReleaseID; update the Set implementation (the call to
queries.InsertReleaseJob inside Set) to propagate the error instead of
swallowing it—capture the returned error, wrap or annotate it with context
(e.g., "InsertReleaseJob failed for releaseID/ jobID") and return it (or return
a combined error) so callers see the failure; ensure any surrounding logic that
depends on successful insertion respects the error path so the repository stays
consistent.

---

Outside diff comments:
In `@apps/workspace-engine/pkg/workspace/releasemanager/manager.go`:
- Around line 199-219: The code uses raw fmt.Printf calls in reconciliation
paths (e.g., in reconcileFn and elsewhere) which should be removed; instead add
structured telemetry and logging by replacing each fmt.Printf with
span.AddEvent(...) for OpenTelemetry events and log.Debug(...) with structured
fields (e.g., "workspace_id", workspaceID, "release_target", state.entity.Key(),
and "error" when present). Update the reconciler functions named reconcileFn,
reconcileTargetWithRecorder, ReconcileTargets, and ReconcileTarget to use
span.AddEvent for lifecycle messages (start/success/failure) and log.Debug for
local debug logs, and ensure error paths still call
recorder.Complete(trace.StatusFailed) and record the error via log.Debug fields
rather than printing to stdout. Also scan and remove any other fmt.Printf
occurrences in these functions to keep observability consistent.

---

Nitpick comments:
In `@apps/workspace-engine/pkg/workspace/store/repository/db/jobs/repo.go`:
- Around line 133-149: The debug fmt.Printf calls in the GetByReleaseID flow
should be replaced with the project's structured logger (use log.Debug with
key/value fields) so messages respect log levels; replace the prints that log
parse errors, query start, query error, rows found, and per-job details with
log.Debug entries (include fields like "release_id", "uid", "rows", "error",
"job_id", "job_status") and for errors use log.Error or log.Debug with error
included; update the block around parsing releaseID, the call to
db.GetQueries(r.ctx).ListJobsByReleaseID, the len(rows) message, and the loop
that calls ToOapi(fromReleaseRow(row)) to emit structured logs instead of
fmt.Printf.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 34e7fe6 and 34daf2b.

📒 Files selected for processing (10)
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/planner.go
  • apps/workspace-engine/pkg/workspace/releasemanager/manager.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/getters.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/jobtracker.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/passrate.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/soaktime.go
  • apps/workspace-engine/pkg/workspace/releasemanager/state_index.go
  • apps/workspace-engine/pkg/workspace/store/repository/db/jobs/repo.go
✅ Files skipped from review due to trivial changes (4)
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/environmentprogression.go
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/jobtracker.go
  • apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/soaktime.go


import (
"context"
"fmt"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Remove debug print statements before merging.

The fmt.Printf("[DEBUG ...] statements throughout this file appear to be temporary debugging code that should not be committed to production. These prints will:

  1. Pollute stdout/logs in production environments
  2. Expose internal IDs and state information unnecessarily
  3. Add performance overhead on every call

If persistent observability is needed for these methods, consider using the project's structured logging framework with appropriate log levels instead.

🧹 Proposed fix to remove debug statements
 import (
 	"context"
-	"fmt"
 	"workspace-engine/pkg/oapi"
 	"workspace-engine/pkg/workspace/store"
 )
 func (s *storeGetters) GetEnvironments() map[string]*oapi.Environment {
-	items := s.store.Environments.Items()
-	fmt.Printf("[DEBUG getters.GetEnvironments] returning %d environments\n", len(items))
-	for id, env := range items {
-		fmt.Printf("[DEBUG getters.GetEnvironments]   env id=%s name=%s\n", id, env.Name)
-	}
-	return items
+	return s.store.Environments.Items()
 }
 func (s *storeGetters) GetReleaseTargetsForEnvironment(ctx context.Context, environmentID string) ([]*oapi.ReleaseTarget, error) {
-	rts, err := s.store.ReleaseTargets.GetForEnvironment(ctx, environmentID)
-	fmt.Printf("[DEBUG getters.GetReleaseTargetsForEnvironment] envId=%s returned %d targets, err=%v\n", environmentID, len(rts), err)
-	for i, rt := range rts {
-		fmt.Printf("[DEBUG getters.GetReleaseTargetsForEnvironment]   rt[%d]: key=%s deployId=%s resourceId=%s\n", i, rt.Key(), rt.DeploymentId, rt.ResourceId)
-	}
-	return rts, err
+	return s.store.ReleaseTargets.GetForEnvironment(ctx, environmentID)
 }
 func (s *storeGetters) GetJobsForReleaseTarget(releaseTarget *oapi.ReleaseTarget) map[string]*oapi.Job {
-	jobs := s.store.Jobs.GetJobsForReleaseTarget(releaseTarget)
-	fmt.Printf("[DEBUG getters.GetJobsForReleaseTarget] rtKey=%s returned %d jobs\n", releaseTarget.Key(), len(jobs))
-	for id, j := range jobs {
-		fmt.Printf("[DEBUG getters.GetJobsForReleaseTarget]   job id=%s status=%s releaseId=%s completedAt=%v\n", id, j.Status, j.ReleaseId, j.CompletedAt)
-	}
-	return jobs
+	return s.store.Jobs.GetJobsForReleaseTarget(releaseTarget)
 }
 func (s *storeGetters) GetRelease(releaseID string) (*oapi.Release, bool) {
-	rel, ok := s.store.Releases.Get(releaseID)
-	if ok && rel != nil {
-		fmt.Printf("[DEBUG getters.GetRelease] releaseId=%s found=true versionId=%s\n", releaseID, rel.Version.Id)
-	} else {
-		fmt.Printf("[DEBUG getters.GetRelease] releaseId=%s found=%v\n", releaseID, ok)
-	}
-	return rel, ok
+	return s.store.Releases.Get(releaseID)
 }

Also applies to: 30-35, 47-52, 60-65, 69-75

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/getters.go`
at line 5, Remove all temporary debug prints (fmt.Printf calls) in this file
(including the ones noted around lines 30-35, 47-52, 60-65, 69-75) and any other
stray fmt.Printf usages; replace them with the project's structured logger if
persistent observability is required (e.g., use the existing logger instance
used elsewhere in this package) and ensure messages are logged at an appropriate
level, and then remove the now-unused "fmt" import from the import block so the
file compiles cleanly.

// EvaluateWithTracker evaluates pass rate using a pre-built tracker to avoid duplicate data fetching.
func (e *PassRateEvaluator) EvaluateWithTracker(tracker *ReleaseTargetJobTracker) *oapi.RuleEvaluation {
successPercentage := tracker.GetSuccessPercentage()
fmt.Printf("[DEBUG passrate.EvaluateWithTracker] successPercentage=%.1f%% minRequired=%.1f%%\n", successPercentage, e.minimumSuccessPercentage)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Remove debug fmt.Printf statements before merging.

These fmt.Printf debug statements bypass any logging framework, cannot be disabled via log levels, and output directly to stdout. They appear to be temporary instrumentation that should be removed before merge.

If these logs are needed for observability, replace them with proper structured logging using the project's logging framework with an appropriate debug/trace level.

🧹 Proposed fix to remove debug statements
 func (e *PassRateEvaluator) EvaluateWithTracker(tracker *ReleaseTargetJobTracker) *oapi.RuleEvaluation {
 	successPercentage := tracker.GetSuccessPercentage()
-	fmt.Printf("[DEBUG passrate.EvaluateWithTracker] successPercentage=%.1f%% minRequired=%.1f%%\n", successPercentage, e.minimumSuccessPercentage)

 	if e.minimumSuccessPercentage == 0 {
 		if successPercentage == 0 {
-			fmt.Printf("[DEBUG passrate.EvaluateWithTracker] DENIED: no successful jobs (default mode)\n")
 			return results.NewDeniedResult("No successful jobs").
 				WithDetail("success_percentage", successPercentage).
 				WithDetail("minimum_success_percentage", 0.0)
 		}
 		satisfiedAt := tracker.GetEarliestSuccess()
-		fmt.Printf("[DEBUG passrate.EvaluateWithTracker] ALLOWED: at least one success, satisfiedAt=%v\n", satisfiedAt)
 		return results.NewAllowedResult(fmt.Sprintf("Success rate %.1f%% meets requirement (at least one successful job)", successPercentage)).
 			WithDetail("success_percentage", successPercentage).
 			WithDetail("minimum_success_percentage", 0.0).
 			WithSatisfiedAt(satisfiedAt)
 	}

 	if successPercentage < e.minimumSuccessPercentage {
-		fmt.Printf("[DEBUG passrate.EvaluateWithTracker] DENIED: %.1f%% < %.1f%%\n", successPercentage, e.minimumSuccessPercentage)
 		return results.NewDeniedResult(fmt.Sprintf("Success rate %.1f%% below required %.1f%%", successPercentage, e.minimumSuccessPercentage)).
 			WithDetail("success_percentage", successPercentage).
 			WithDetail("minimum_success_percentage", e.minimumSuccessPercentage)
 	}

-	fmt.Printf("[DEBUG passrate.EvaluateWithTracker] ALLOWED: %.1f%% >= %.1f%%\n", successPercentage, e.minimumSuccessPercentage)
 	return results.NewAllowedResult(fmt.Sprintf("Success rate %.1f%% meets required %.1f%%", successPercentage, e.minimumSuccessPercentage)).
 		WithDetail("success_percentage", successPercentage).
 		WithDetail("minimum_success_percentage", e.minimumSuccessPercentage).
 		WithSatisfiedAt(tracker.GetSuccessPercentageSatisfiedAt(e.minimumSuccessPercentage))
 }

Note: If you remove all debug statements, you can also remove the "fmt" import if it becomes unused.

Also applies to: 61-61, 67-67, 75-75, 81-81

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@apps/workspace-engine/pkg/workspace/releasemanager/policy/evaluator/environmentprogression/passrate.go`
at line 57, Remove the temporary fmt.Printf debug prints in the passrate
evaluator (e.g., the fmt.Printf calls inside EvaluateWithTracker that print
successPercentage and e.minimumSuccessPercentage and the other similar lines
referenced), replacing them with the project's structured logger at debug/trace
level if observability is required, or simply deleting them if not needed; after
removing these fmt.Printf calls, also remove the "fmt" import if it becomes
unused to avoid compiler warnings.

Comment on lines +53 to +70
fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] key=%s\n", key)
rt := si.store.ReleaseTargets.Get(key)
if rt == nil {
fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] release target NOT FOUND for key=%s\n", key)
return nil, fmt.Errorf("release target %q not found", key)
}
fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] release target found: envId=%s deployId=%s resourceId=%s\n", rt.EnvironmentId, rt.DeploymentId, rt.ResourceId)

release, err := si.planner.PlanDeployment(ctx, rt)
if err != nil {
fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] PlanDeployment ERROR: %v\n", err)
return nil, err
}
if release == nil {
fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] PlanDeployment returned NIL for key=%s\n", key)
} else {
fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] PlanDeployment returned release id=%s versionTag=%s\n", release.Id.String(), release.Version.Tag)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, check if the file exists and get its size
wc -l apps/workspace-engine/pkg/workspace/releasemanager/state_index.go

Repository: ctrlplanedev/ctrlplane

Length of output: 135


🏁 Script executed:

#!/bin/bash
# Read the specific line ranges mentioned in the review
echo "=== Lines 53-70 ==="
sed -n '53,70p' apps/workspace-engine/pkg/workspace/releasemanager/state_index.go

echo -e "\n=== Lines 236-244 ==="
sed -n '236,244p' apps/workspace-engine/pkg/workspace/releasemanager/state_index.go

Repository: ctrlplanedev/ctrlplane

Length of output: 1454


🏁 Script executed:

#!/bin/bash
# Check fmt.Printf usage in state_index.go
echo "=== fmt.Printf occurrences in state_index.go ==="
rg -n 'fmt\.Printf' apps/workspace-engine/pkg/workspace/releasemanager/state_index.go

# Check log.Debug usage in state_index.go
echo -e "\n=== log.Debug occurrences in state_index.go ==="
rg -n 'log\.Debug' apps/workspace-engine/pkg/workspace/releasemanager/state_index.go

# Check imports in state_index.go
echo -e "\n=== Imports in state_index.go ==="
head -30 apps/workspace-engine/pkg/workspace/releasemanager/state_index.go | grep -E "^import|\".*log"

Repository: ctrlplanedev/ctrlplane

Length of output: 1415


🏁 Script executed:

#!/bin/bash
# Check the logging pattern used elsewhere in the releasemanager package
echo "=== fmt.Printf in releasemanager package ==="
rg -n 'fmt\.Printf' apps/workspace-engine/pkg/workspace/releasemanager/

echo -e "\n=== log.Debug in releasemanager package ==="
rg -n 'log\.Debug' apps/workspace-engine/pkg/workspace/releasemanager/

echo -e "\n=== Other log.* patterns in releasemanager package ==="
rg -n 'log\.(Info|Error|Warn|Debug)' apps/workspace-engine/pkg/workspace/releasemanager/ | head -20

Repository: ctrlplanedev/ctrlplane

Length of output: 33611


🏁 Script executed:

#!/bin/bash
# Check oapi.Release definition to understand Version nullability
echo "=== Searching for Release type definition ==="
rg -A 10 'type Release struct' apps/workspace-engine/ --type=go | head -30

echo -e "\n=== Checking Version field ==="
rg 'Version.*\*' apps/workspace-engine/ --type=go | grep -i release | head -10

Repository: ctrlplanedev/ctrlplane

Length of output: 3921


🏁 Script executed:

#!/bin/bash
# Check DeploymentVersion struct definition
echo "=== DeploymentVersion struct ==="
rg -A 15 'type DeploymentVersion struct' apps/workspace-engine/pkg/oapi/

Repository: ctrlplanedev/ctrlplane

Length of output: 1509


Replace fmt.Printf debug statements with structured log.Debug calls in hot paths.

These debug statements are always-on and bypass logger level control. In recompute paths, that can flood stdout and make prod logs harder to operate. The file already uses log.Debug elsewhere (lines 88, 108); apply the same pattern here for consistency and proper log level control.

Proposed refactor
-	fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] key=%s\n", key)
+	log.Debug("computeDesiredRelease start", "key", key)
 	rt := si.store.ReleaseTargets.Get(key)
 	if rt == nil {
-		fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] release target NOT FOUND for key=%s\n", key)
+		log.Debug("release target not found", "key", key)
 		return nil, fmt.Errorf("release target %q not found", key)
 	}
-	fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] release target found: envId=%s deployId=%s resourceId=%s\n", rt.EnvironmentId, rt.DeploymentId, rt.ResourceId)
+	log.Debug("release target found", "key", key, "environmentId", rt.EnvironmentId, "deploymentId", rt.DeploymentId, "resourceId", rt.ResourceId)
 
 	release, err := si.planner.PlanDeployment(ctx, rt)
 	if err != nil {
-		fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] PlanDeployment ERROR: %v\n", err)
+		log.Debug("PlanDeployment error", "key", key, "error", err.Error())
 		return nil, err
 	}
 	if release == nil {
-		fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] PlanDeployment returned NIL for key=%s\n", key)
+		log.Debug("PlanDeployment returned nil release", "key", key)
 	} else {
-		fmt.Printf("[DEBUG StateIndex.computeDesiredRelease] PlanDeployment returned release id=%s versionTag=%s\n", release.Id.String(), release.Version.Tag)
+		log.Debug("PlanDeployment returned release", "key", key, "releaseId", release.Id.String(), "versionTag", release.Version.Tag)
 	}

Also apply same pattern to lines 236–242 in the Recompute method.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/releasemanager/state_index.go` around
lines 53 - 70, Replace the always-on fmt.Printf debug prints in
StateIndex.computeDesiredRelease with structured log.Debug calls so logger level
control is respected: use the existing logger (same style as other uses on lines
88/108) to log messages for the key lookup, release-target-not-found, found
target (include rt.EnvironmentId, rt.DeploymentId, rt.ResourceId),
PlanDeployment error, and PlanDeployment result (include release.Id and
release.Version.Tag when non-nil); do the same replacement for the analogous
fmt.Printf lines in the Recompute method (around the 236–242 area) and ensure
you import/use the same log package and message formatting conventions as other
log.Debug uses in this file.

Comment on lines +56 to +63
fmt.Printf("[DEBUG DB Jobs.Set] InsertReleaseJob releaseId=%s jobId=%s\n", releaseID, jobID)
err := queries.InsertReleaseJob(r.ctx, db.InsertReleaseJobParams{
ReleaseID: releaseID,
JobID: jobID,
})
if err != nil {
fmt.Printf("[DEBUG DB Jobs.Set] InsertReleaseJob ERROR (ignored): %v\n", err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Do not swallow InsertReleaseJob failures in Set.

Line 62 logs and ignores the DB error, which can persist a job without its release mapping and break GetByReleaseID correctness.

Proposed fix
 	if entity.ReleaseId != "" {
 		releaseID, err := uuid.Parse(entity.ReleaseId)
-		if err == nil {
-			fmt.Printf("[DEBUG DB Jobs.Set] InsertReleaseJob releaseId=%s jobId=%s\n", releaseID, jobID)
-			err := queries.InsertReleaseJob(r.ctx, db.InsertReleaseJobParams{
-				ReleaseID: releaseID,
-				JobID:     jobID,
-			})
-			if err != nil {
-				fmt.Printf("[DEBUG DB Jobs.Set] InsertReleaseJob ERROR (ignored): %v\n", err)
-			}
-		}
+		if err != nil {
+			return fmt.Errorf("parse release id: %w", err)
+		}
+		if err := queries.InsertReleaseJob(r.ctx, db.InsertReleaseJobParams{
+			ReleaseID: releaseID,
+			JobID:     jobID,
+		}); err != nil {
+			return fmt.Errorf("insert release job mapping: %w", err)
+		}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/repository/db/jobs/repo.go` around
lines 56 - 63, The Set method currently calls queries.InsertReleaseJob and only
logs errors (printf) then continues, which can leave jobs unlinked to releases
and break GetByReleaseID; update the Set implementation (the call to
queries.InsertReleaseJob inside Set) to propagate the error instead of
swallowing it—capture the returned error, wrap or annotate it with context
(e.g., "InsertReleaseJob failed for releaseID/ jobID") and return it (or return
a combined error) so callers see the failure; ensure any surrounding logic that
depends on successful insertion respects the error path so the repository stays
consistent.

@adityachoudhari26 adityachoudhari26 merged commit d3e1dc7 into main Mar 4, 2026
8 checks passed
@adityachoudhari26 adityachoudhari26 deleted the migrate-jobs-db branch March 4, 2026 03:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant