Skip to content

Commit faecbea

Browse files
pranaygpclaude
andcommitted
Fix performance regression: reuse validation reads
The validation logic was causing redundant file/DB reads: - For run events: read run for validation, then read again for update - For step events: read step for validation, then read again for update This fix reuses the validation reads: - world-local: reuse currentRun and validatedStep in event handlers - world-postgres: fetch startedAt in validation read, reuse for step_started Before: 2-3 reads per event After: 1 read per event 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 736cecd commit faecbea

File tree

2 files changed

+120
-105
lines changed

2 files changed

+120
-105
lines changed

packages/world-local/src/storage.ts

Lines changed: 105 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,8 @@ export function createStorage(basedir: string): Storage {
397397
}
398398

399399
// Step-related event validation (ordering and terminal state)
400+
// Store existingStep so we can reuse it later (avoid double read)
401+
let validatedStep: Step | null = null;
400402
const stepEvents = [
401403
'step_started',
402404
'step_completed',
@@ -410,27 +412,27 @@ export function createStorage(basedir: string): Storage {
410412
'steps',
411413
`${stepCompositeKey}.json`
412414
);
413-
const existingStep = await readJSON(stepPath, StepSchema);
415+
validatedStep = await readJSON(stepPath, StepSchema);
414416

415417
// Event ordering: step must exist before these events
416-
if (!existingStep) {
418+
if (!validatedStep) {
417419
throw new WorkflowAPIError(
418420
`Step "${data.correlationId}" not found`,
419421
{ status: 404 }
420422
);
421423
}
422424

423425
// Step terminal state validation
424-
if (isStepTerminal(existingStep.status)) {
426+
if (isStepTerminal(validatedStep.status)) {
425427
throw new WorkflowAPIError(
426-
`Cannot modify step in terminal state "${existingStep.status}"`,
428+
`Cannot modify step in terminal state "${validatedStep.status}"`,
427429
{ status: 409 }
428430
);
429431
}
430432

431433
// On terminal runs: only allow completing/failing in-progress steps
432434
if (currentRun && isRunTerminal(currentRun.status)) {
433-
if (existingStep.status !== 'running') {
435+
if (validatedStep.status !== 'running') {
434436
throw new WorkflowAPIError(
435437
`Cannot modify non-running step on run in terminal state "${currentRun.status}"`,
436438
{ status: 409 }
@@ -498,40 +500,48 @@ export function createStorage(basedir: string): Storage {
498500
const runPath = path.join(basedir, 'runs', `${effectiveRunId}.json`);
499501
await writeJSON(runPath, run);
500502
} else if (data.eventType === 'run_started') {
501-
const runPath = path.join(basedir, 'runs', `${effectiveRunId}.json`);
502-
const existingRun = await readJSON(runPath, WorkflowRunSchema);
503-
if (existingRun) {
503+
// Reuse currentRun from validation (already read above)
504+
if (currentRun) {
505+
const runPath = path.join(
506+
basedir,
507+
'runs',
508+
`${effectiveRunId}.json`
509+
);
504510
run = {
505-
runId: existingRun.runId,
506-
deploymentId: existingRun.deploymentId,
507-
workflowName: existingRun.workflowName,
508-
executionContext: existingRun.executionContext,
509-
input: existingRun.input,
510-
createdAt: existingRun.createdAt,
511-
expiredAt: existingRun.expiredAt,
511+
runId: currentRun.runId,
512+
deploymentId: currentRun.deploymentId,
513+
workflowName: currentRun.workflowName,
514+
executionContext: currentRun.executionContext,
515+
input: currentRun.input,
516+
createdAt: currentRun.createdAt,
517+
expiredAt: currentRun.expiredAt,
512518
status: 'running',
513519
output: undefined,
514520
error: undefined,
515521
completedAt: undefined,
516-
startedAt: existingRun.startedAt ?? now,
522+
startedAt: currentRun.startedAt ?? now,
517523
updatedAt: now,
518524
};
519525
await writeJSON(runPath, run, { overwrite: true });
520526
}
521527
} else if (data.eventType === 'run_completed' && 'eventData' in data) {
522528
const completedData = data.eventData as { output?: any };
523-
const runPath = path.join(basedir, 'runs', `${effectiveRunId}.json`);
524-
const existingRun = await readJSON(runPath, WorkflowRunSchema);
525-
if (existingRun) {
529+
// Reuse currentRun from validation (already read above)
530+
if (currentRun) {
531+
const runPath = path.join(
532+
basedir,
533+
'runs',
534+
`${effectiveRunId}.json`
535+
);
526536
run = {
527-
runId: existingRun.runId,
528-
deploymentId: existingRun.deploymentId,
529-
workflowName: existingRun.workflowName,
530-
executionContext: existingRun.executionContext,
531-
input: existingRun.input,
532-
createdAt: existingRun.createdAt,
533-
expiredAt: existingRun.expiredAt,
534-
startedAt: existingRun.startedAt,
537+
runId: currentRun.runId,
538+
deploymentId: currentRun.deploymentId,
539+
workflowName: currentRun.workflowName,
540+
executionContext: currentRun.executionContext,
541+
input: currentRun.input,
542+
createdAt: currentRun.createdAt,
543+
expiredAt: currentRun.expiredAt,
544+
startedAt: currentRun.startedAt,
535545
status: 'completed',
536546
output: completedData.output,
537547
error: undefined,
@@ -546,18 +556,22 @@ export function createStorage(basedir: string): Storage {
546556
error: any;
547557
errorCode?: string;
548558
};
549-
const runPath = path.join(basedir, 'runs', `${effectiveRunId}.json`);
550-
const existingRun = await readJSON(runPath, WorkflowRunSchema);
551-
if (existingRun) {
559+
// Reuse currentRun from validation (already read above)
560+
if (currentRun) {
561+
const runPath = path.join(
562+
basedir,
563+
'runs',
564+
`${effectiveRunId}.json`
565+
);
552566
run = {
553-
runId: existingRun.runId,
554-
deploymentId: existingRun.deploymentId,
555-
workflowName: existingRun.workflowName,
556-
executionContext: existingRun.executionContext,
557-
input: existingRun.input,
558-
createdAt: existingRun.createdAt,
559-
expiredAt: existingRun.expiredAt,
560-
startedAt: existingRun.startedAt,
567+
runId: currentRun.runId,
568+
deploymentId: currentRun.deploymentId,
569+
workflowName: currentRun.workflowName,
570+
executionContext: currentRun.executionContext,
571+
input: currentRun.input,
572+
createdAt: currentRun.createdAt,
573+
expiredAt: currentRun.expiredAt,
574+
startedAt: currentRun.startedAt,
561575
status: 'failed',
562576
output: undefined,
563577
error: {
@@ -575,18 +589,22 @@ export function createStorage(basedir: string): Storage {
575589
await deleteAllHooksForRun(basedir, effectiveRunId);
576590
}
577591
} else if (data.eventType === 'run_cancelled') {
578-
const runPath = path.join(basedir, 'runs', `${effectiveRunId}.json`);
579-
const existingRun = await readJSON(runPath, WorkflowRunSchema);
580-
if (existingRun) {
592+
// Reuse currentRun from validation (already read above)
593+
if (currentRun) {
594+
const runPath = path.join(
595+
basedir,
596+
'runs',
597+
`${effectiveRunId}.json`
598+
);
581599
run = {
582-
runId: existingRun.runId,
583-
deploymentId: existingRun.deploymentId,
584-
workflowName: existingRun.workflowName,
585-
executionContext: existingRun.executionContext,
586-
input: existingRun.input,
587-
createdAt: existingRun.createdAt,
588-
expiredAt: existingRun.expiredAt,
589-
startedAt: existingRun.startedAt,
600+
runId: currentRun.runId,
601+
deploymentId: currentRun.deploymentId,
602+
workflowName: currentRun.workflowName,
603+
executionContext: currentRun.executionContext,
604+
input: currentRun.input,
605+
createdAt: currentRun.createdAt,
606+
expiredAt: currentRun.expiredAt,
607+
startedAt: currentRun.startedAt,
590608
status: 'cancelled',
591609
output: undefined,
592610
error: undefined,
@@ -630,38 +648,38 @@ export function createStorage(basedir: string): Storage {
630648
} else if (data.eventType === 'step_started') {
631649
// step_started: Increments attempt, sets status to 'running'
632650
// Sets startedAt only on the first start (not updated on retries)
633-
const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`;
634-
const stepPath = path.join(
635-
basedir,
636-
'steps',
637-
`${stepCompositeKey}.json`
638-
);
639-
const existingStep = await readJSON(stepPath, StepSchema);
640-
if (existingStep) {
651+
// Reuse validatedStep from validation (already read above)
652+
if (validatedStep) {
653+
const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`;
654+
const stepPath = path.join(
655+
basedir,
656+
'steps',
657+
`${stepCompositeKey}.json`
658+
);
641659
step = {
642-
...existingStep,
660+
...validatedStep,
643661
status: 'running',
644662
// Only set startedAt on the first start
645-
startedAt: existingStep.startedAt ?? now,
663+
startedAt: validatedStep.startedAt ?? now,
646664
// Increment attempt counter on every start
647-
attempt: existingStep.attempt + 1,
665+
attempt: validatedStep.attempt + 1,
648666
updatedAt: now,
649667
};
650668
await writeJSON(stepPath, step, { overwrite: true });
651669
}
652670
} else if (data.eventType === 'step_completed' && 'eventData' in data) {
653671
// step_completed: Terminal state with output
672+
// Reuse validatedStep from validation (already read above)
654673
const completedData = data.eventData as { result: any };
655-
const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`;
656-
const stepPath = path.join(
657-
basedir,
658-
'steps',
659-
`${stepCompositeKey}.json`
660-
);
661-
const existingStep = await readJSON(stepPath, StepSchema);
662-
if (existingStep) {
674+
if (validatedStep) {
675+
const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`;
676+
const stepPath = path.join(
677+
basedir,
678+
'steps',
679+
`${stepCompositeKey}.json`
680+
);
663681
step = {
664-
...existingStep,
682+
...validatedStep,
665683
status: 'completed',
666684
output: completedData.result,
667685
completedAt: now,
@@ -671,18 +689,18 @@ export function createStorage(basedir: string): Storage {
671689
}
672690
} else if (data.eventType === 'step_failed' && 'eventData' in data) {
673691
// step_failed: Terminal state with error
692+
// Reuse validatedStep from validation (already read above)
674693
const failedData = data.eventData as {
675694
error: any;
676695
stack?: string;
677696
};
678-
const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`;
679-
const stepPath = path.join(
680-
basedir,
681-
'steps',
682-
`${stepCompositeKey}.json`
683-
);
684-
const existingStep = await readJSON(stepPath, StepSchema);
685-
if (existingStep) {
697+
if (validatedStep) {
698+
const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`;
699+
const stepPath = path.join(
700+
basedir,
701+
'steps',
702+
`${stepCompositeKey}.json`
703+
);
686704
const error = {
687705
message:
688706
typeof failedData.error === 'string'
@@ -691,7 +709,7 @@ export function createStorage(basedir: string): Storage {
691709
stack: failedData.stack,
692710
};
693711
step = {
694-
...existingStep,
712+
...validatedStep,
695713
status: 'failed',
696714
error,
697715
completedAt: now,
@@ -701,21 +719,21 @@ export function createStorage(basedir: string): Storage {
701719
}
702720
} else if (data.eventType === 'step_retrying' && 'eventData' in data) {
703721
// step_retrying: Sets status back to 'pending', records error
722+
// Reuse validatedStep from validation (already read above)
704723
const retryData = data.eventData as {
705724
error: any;
706725
stack?: string;
707726
retryAfter?: Date;
708727
};
709-
const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`;
710-
const stepPath = path.join(
711-
basedir,
712-
'steps',
713-
`${stepCompositeKey}.json`
714-
);
715-
const existingStep = await readJSON(stepPath, StepSchema);
716-
if (existingStep) {
728+
if (validatedStep) {
729+
const stepCompositeKey = `${effectiveRunId}-${data.correlationId}`;
730+
const stepPath = path.join(
731+
basedir,
732+
'steps',
733+
`${stepCompositeKey}.json`
734+
);
717735
step = {
718-
...existingStep,
736+
...validatedStep,
719737
status: 'pending',
720738
error: {
721739
message:

packages/world-postgres/src/storage.ts

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,9 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
279279
}
280280

281281
// Step-related event validation (ordering and terminal state)
282+
// Fetch status + startedAt so we can reuse for step_started (avoid double read)
283+
let validatedStep: { status: string; startedAt: Date | null } | null =
284+
null;
282285
const stepEvents = [
283286
'step_started',
284287
'step_completed',
@@ -287,7 +290,10 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
287290
];
288291
if (stepEvents.includes(data.eventType) && data.correlationId) {
289292
const [existingStep] = await drizzle
290-
.select({ status: Schema.steps.status })
293+
.select({
294+
status: Schema.steps.status,
295+
startedAt: Schema.steps.startedAt,
296+
})
291297
.from(Schema.steps)
292298
.where(
293299
and(
@@ -297,24 +303,26 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
297303
)
298304
.limit(1);
299305

306+
validatedStep = existingStep ?? null;
307+
300308
// Event ordering: step must exist before these events
301-
if (!existingStep) {
309+
if (!validatedStep) {
302310
throw new WorkflowAPIError(`Step "${data.correlationId}" not found`, {
303311
status: 404,
304312
});
305313
}
306314

307315
// Step terminal state validation
308-
if (isStepTerminal(existingStep.status)) {
316+
if (isStepTerminal(validatedStep.status)) {
309317
throw new WorkflowAPIError(
310-
`Cannot modify step in terminal state "${existingStep.status}"`,
318+
`Cannot modify step in terminal state "${validatedStep.status}"`,
311319
{ status: 409 }
312320
);
313321
}
314322

315323
// On terminal runs: only allow completing/failing in-progress steps
316324
if (currentRun && isRunTerminal(currentRun.status)) {
317-
if (existingStep.status !== 'running') {
325+
if (validatedStep.status !== 'running') {
318326
throw new WorkflowAPIError(
319327
`Cannot modify non-running step on run in terminal state "${currentRun.status}"`,
320328
{ status: 409 }
@@ -491,20 +499,9 @@ export function createEventsStorage(drizzle: Drizzle): Storage['events'] {
491499

492500
// Handle step_started event: increment attempt, set status to 'running'
493501
// Sets startedAt (maps to startedAt) only on first start
502+
// Reuse validatedStep from validation (already read above)
494503
if (data.eventType === 'step_started') {
495-
// First, get the current step to check attempt
496-
const [existingStep] = await drizzle
497-
.select()
498-
.from(Schema.steps)
499-
.where(
500-
and(
501-
eq(Schema.steps.runId, effectiveRunId),
502-
eq(Schema.steps.stepId, data.correlationId!)
503-
)
504-
)
505-
.limit(1);
506-
507-
const isFirstStart = !existingStep?.startedAt;
504+
const isFirstStart = !validatedStep?.startedAt;
508505

509506
const [stepValue] = await drizzle
510507
.update(Schema.steps)

0 commit comments

Comments
 (0)