diff --git a/packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/quick-completion/wait-for-callback-quick-completion.test.ts b/packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/quick-completion/wait-for-callback-quick-completion.test.ts new file mode 100644 index 00000000..26775d42 --- /dev/null +++ b/packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/quick-completion/wait-for-callback-quick-completion.test.ts @@ -0,0 +1,37 @@ +import { InvocationType } from "@aws-sdk/client-lambda"; +import { handler } from "./wait-for-callback-quick-completion"; +import { createTests } from "../../../utils/test-helper"; + +createTests({ + handler, + invocationType: InvocationType.Event, + localRunnerConfig: { + skipTime: false, + }, + tests: (runner, { isCloud }) => { + it("should handle waitForCallback when callback completes before ", async () => { + const callbackOp = runner.getOperationByIndex(0); + + const executionPromise = runner.run({ + payload: isCloud + ? { + submitterDelay: 5, + } + : {}, + }); + + // only wait for started status + await callbackOp.waitForData(); + + await callbackOp.sendCallbackSuccess("{}"); + + const result = await executionPromise; + + expect(result.getResult()).toEqual({ + callbackResult: "{}", + success: true, + }); + expect(result.getInvocations().length).toBe(1); + }); + }, +}); diff --git a/packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/quick-completion/wait-for-callback-quick-completion.ts b/packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/quick-completion/wait-for-callback-quick-completion.ts new file mode 100644 index 00000000..0f8e3201 --- /dev/null +++ b/packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/quick-completion/wait-for-callback-quick-completion.ts @@ -0,0 +1,29 @@ +import { + DurableContext, + withDurableExecution, +} from "@aws/durable-execution-sdk-js"; +import { ExampleConfig } from "../../../types"; + +export const config: ExampleConfig = { + name: "Wait for Callback - Quick Completion", + description: + "Demonstrates waitForCallback invocation-level completion scenario", +}; + +export const handler = withDurableExecution( + async (event: { submitterDelay: number }, context: DurableContext) => { + const result = await context.waitForCallback(async () => { + if (event.submitterDelay) { + await new Promise((resolve) => + setTimeout(resolve, event.submitterDelay * 1000), + ); + } + return Promise.resolve(); + }); + + return { + callbackResult: result, + success: true, + }; + }, +); diff --git a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-central-termination.test.ts b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-central-termination.test.ts index ed7b918f..879b9a63 100644 --- a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-central-termination.test.ts +++ b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-central-termination.test.ts @@ -4,6 +4,7 @@ import { TerminationReason } from "../../termination-manager/types"; import { OperationLifecycleState, OperationSubType } from "../../types"; import { OperationType } from "@aws-sdk/client-lambda"; import { EventEmitter } from "events"; +import { hashId } from "../step-id-utils/step-id-utils"; jest.mock("../logger/logger"); @@ -199,6 +200,233 @@ describe("CheckpointManager - Centralized Termination", () => { await expect(promise).resolves.toBeUndefined(); }); + + describe("terminal status instant resolution", () => { + it.each([ + { status: "SUCCEEDED", operationStatus: "SUCCEEDED" }, + { status: "CANCELLED", operationStatus: "CANCELLED" }, + { status: "FAILED", operationStatus: "FAILED" }, + { status: "STOPPED", operationStatus: "STOPPED" }, + { status: "TIMED_OUT", operationStatus: "TIMED_OUT" }, + ])( + "should instantly resolve when status is $status", + async ({ operationStatus }) => { + const stepId = "step-1"; + + // Create operation in RETRY_WAITING state + checkpointManager.markOperationState( + stepId, + OperationLifecycleState.RETRY_WAITING, + { + metadata: { + stepId, + type: OperationType.STEP, + subType: OperationSubType.STEP, + }, + endTimestamp: new Date(Date.now() + 5000), + }, + ); + + // Set up stepData with terminal status + const hashedStepId = hashId(stepId); + (checkpointManager as any).stepData[hashedStepId] = { + Id: hashedStepId, + Status: operationStatus, + }; + + jest.clearAllTimers(); + + // Call waitForRetryTimer - should resolve immediately + const promise = checkpointManager.waitForRetryTimer(stepId); + + await expect(promise).resolves.toBeUndefined(); + + // Verify no polling was set up + const ops = checkpointManager.getAllOperations(); + const op = ops.get(stepId); + expect(op?.timer).toBeUndefined(); + expect(op?.resolver).toBeUndefined(); + expect(op?.pollCount).toBeUndefined(); + expect(op?.pollStartTime).toBeUndefined(); + + // Verify no timers were scheduled + expect(jest.getTimerCount()).toBe(0); + }, + ); + + it("should instantly resolve when status is terminal even with future endTimestamp", async () => { + const stepId = "step-1"; + + // Create operation with future endTimestamp + checkpointManager.markOperationState( + stepId, + OperationLifecycleState.RETRY_WAITING, + { + metadata: { + stepId, + type: OperationType.STEP, + subType: OperationSubType.STEP, + }, + endTimestamp: new Date(Date.now() + 10000), // 10 seconds in future + }, + ); + + // Set up stepData with terminal status + const hashedStepId = hashId(stepId); + (checkpointManager as any).stepData[hashedStepId] = { + Id: hashedStepId, + Status: "SUCCEEDED", + }; + + jest.clearAllTimers(); + + // Call waitForRetryTimer - should resolve immediately despite future endTimestamp + const promise = checkpointManager.waitForRetryTimer(stepId); + + await expect(promise).resolves.toBeUndefined(); + + // Verify no polling was set up + const ops = checkpointManager.getAllOperations(); + const op = ops.get(stepId); + expect(op?.timer).toBeUndefined(); + expect(op?.resolver).toBeUndefined(); + expect(op?.pollCount).toBeUndefined(); + expect(op?.pollStartTime).toBeUndefined(); + + // Verify no timers were scheduled + expect(jest.getTimerCount()).toBe(0); + }); + + it("should set up polling when status is not terminal", async () => { + const stepId = "step-1"; + + // Create operation in RETRY_WAITING state + checkpointManager.markOperationState( + stepId, + OperationLifecycleState.RETRY_WAITING, + { + metadata: { + stepId, + type: OperationType.STEP, + subType: OperationSubType.STEP, + }, + endTimestamp: new Date(Date.now() + 5000), + }, + ); + + // Set up stepData with non-terminal status + const hashedStepId = hashId(stepId); + (checkpointManager as any).stepData[hashedStepId] = { + Id: hashedStepId, + Status: "STARTED", // Non-terminal status + }; + + jest.clearAllTimers(); + + // Call waitForRetryTimer - should set up polling + const promise = checkpointManager.waitForRetryTimer(stepId); + + // Verify polling was set up + const ops = checkpointManager.getAllOperations(); + const op = ops.get(stepId); + expect(op?.timer).toBeDefined(); + expect(op?.resolver).toBeDefined(); + expect(op?.pollCount).toBe(0); + expect(op?.pollStartTime).toBeDefined(); + + // Verify timer was scheduled + expect(jest.getTimerCount()).toBeGreaterThan(0); + + // Clean up by resolving the operation + op?.resolver?.(); + await promise; + }); + + it("should set up polling when stepData is missing", async () => { + const stepId = "step-1"; + + // Create operation in RETRY_WAITING state + checkpointManager.markOperationState( + stepId, + OperationLifecycleState.RETRY_WAITING, + { + metadata: { + stepId, + type: OperationType.STEP, + subType: OperationSubType.STEP, + }, + endTimestamp: new Date(Date.now() + 5000), + }, + ); + + // Don't set up stepData - should be missing/undefined + + jest.clearAllTimers(); + + // Call waitForRetryTimer - should set up polling + const promise = checkpointManager.waitForRetryTimer(stepId); + + // Verify polling was set up + const ops = checkpointManager.getAllOperations(); + const op = ops.get(stepId); + expect(op?.timer).toBeDefined(); + expect(op?.resolver).toBeDefined(); + expect(op?.pollCount).toBe(0); + expect(op?.pollStartTime).toBeDefined(); + + // Verify timer was scheduled + expect(jest.getTimerCount()).toBeGreaterThan(0); + + // Clean up by resolving the operation + op?.resolver?.(); + await promise; + }); + + it("should set up polling when stepData status is undefined", async () => { + const stepId = "step-1"; + + // Create operation in RETRY_WAITING state + checkpointManager.markOperationState( + stepId, + OperationLifecycleState.RETRY_WAITING, + { + metadata: { + stepId, + type: OperationType.STEP, + subType: OperationSubType.STEP, + }, + endTimestamp: new Date(Date.now() + 5000), + }, + ); + + // Set up stepData without status + const hashedStepId = hashId(stepId); + (checkpointManager as any).stepData[hashedStepId] = { + Id: hashedStepId, + // Status is undefined + }; + + jest.clearAllTimers(); + + // Call waitForRetryTimer - should set up polling + const promise = checkpointManager.waitForRetryTimer(stepId); + + // Verify polling was set up + const ops = checkpointManager.getAllOperations(); + const op = ops.get(stepId); + expect(op?.timer).toBeDefined(); + expect(op?.resolver).toBeDefined(); + expect(op?.pollCount).toBe(0); + expect(op?.pollStartTime).toBeDefined(); + + // Verify timer was scheduled + expect(jest.getTimerCount()).toBeGreaterThan(0); + + // Clean up by resolving the operation + op?.resolver?.(); + await promise; + }); + }); }); describe("waitForStatusChange", () => { @@ -252,6 +480,229 @@ describe("CheckpointManager - Centralized Termination", () => { await expect(promise).resolves.toBeUndefined(); }); + + describe("terminal status instant resolution", () => { + it.each([ + { status: "SUCCEEDED", operationStatus: "SUCCEEDED" }, + { status: "CANCELLED", operationStatus: "CANCELLED" }, + { status: "FAILED", operationStatus: "FAILED" }, + { status: "STOPPED", operationStatus: "STOPPED" }, + { status: "TIMED_OUT", operationStatus: "TIMED_OUT" }, + ])( + "should instantly resolve when status is $status", + async ({ operationStatus }) => { + const stepId = "step-1"; + + // Create operation in IDLE_AWAITED state + checkpointManager.markOperationState( + stepId, + OperationLifecycleState.IDLE_AWAITED, + { + metadata: { + stepId, + type: OperationType.STEP, + subType: OperationSubType.WAIT, + }, + }, + ); + + // Set up stepData with terminal status + const hashedStepId = hashId(stepId); + (checkpointManager as any).stepData[hashedStepId] = { + Id: hashedStepId, + Status: operationStatus, + }; + + jest.clearAllTimers(); + + // Call waitForStatusChange - should resolve immediately + const promise = checkpointManager.waitForStatusChange(stepId); + + await expect(promise).resolves.toBeUndefined(); + + // Verify no polling was set up + const ops = checkpointManager.getAllOperations(); + const op = ops.get(stepId); + expect(op?.timer).toBeUndefined(); + expect(op?.resolver).toBeUndefined(); + expect(op?.pollCount).toBeUndefined(); + expect(op?.pollStartTime).toBeUndefined(); + + // Verify no timers were scheduled + expect(jest.getTimerCount()).toBe(0); + }, + ); + + it("should instantly resolve when status is terminal even with endTimestamp", async () => { + const stepId = "step-1"; + + // Create operation with future endTimestamp + checkpointManager.markOperationState( + stepId, + OperationLifecycleState.IDLE_AWAITED, + { + metadata: { + stepId, + type: OperationType.STEP, + subType: OperationSubType.WAIT, + }, + endTimestamp: new Date(Date.now() + 10000), // 10 seconds in future + }, + ); + + // Set up stepData with terminal status + const hashedStepId = hashId(stepId); + (checkpointManager as any).stepData[hashedStepId] = { + Id: hashedStepId, + Status: "SUCCEEDED", + }; + + jest.clearAllTimers(); + + // Call waitForStatusChange - should resolve immediately despite endTimestamp + const promise = checkpointManager.waitForStatusChange(stepId); + + await expect(promise).resolves.toBeUndefined(); + + // Verify no polling was set up + const ops = checkpointManager.getAllOperations(); + const op = ops.get(stepId); + expect(op?.timer).toBeUndefined(); + expect(op?.resolver).toBeUndefined(); + expect(op?.pollCount).toBeUndefined(); + expect(op?.pollStartTime).toBeUndefined(); + + // Verify no timers were scheduled + expect(jest.getTimerCount()).toBe(0); + }); + + it("should set up polling when status is not terminal", async () => { + const stepId = "step-1"; + + // Create operation in IDLE_AWAITED state + checkpointManager.markOperationState( + stepId, + OperationLifecycleState.IDLE_AWAITED, + { + metadata: { + stepId, + type: OperationType.STEP, + subType: OperationSubType.WAIT, + }, + }, + ); + + // Set up stepData with non-terminal status + const hashedStepId = hashId(stepId); + (checkpointManager as any).stepData[hashedStepId] = { + Id: hashedStepId, + Status: "STARTED", // Non-terminal status + }; + + jest.clearAllTimers(); + + // Call waitForStatusChange - should set up polling + const promise = checkpointManager.waitForStatusChange(stepId); + + // Verify polling was set up + const ops = checkpointManager.getAllOperations(); + const op = ops.get(stepId); + expect(op?.timer).toBeDefined(); + expect(op?.resolver).toBeDefined(); + expect(op?.pollCount).toBe(0); + expect(op?.pollStartTime).toBeDefined(); + + // Verify timer was scheduled + expect(jest.getTimerCount()).toBeGreaterThan(0); + + // Clean up by resolving the operation + op?.resolver?.(); + await promise; + }); + + it("should set up polling when stepData is missing", async () => { + const stepId = "step-1"; + + // Create operation in IDLE_AWAITED state + checkpointManager.markOperationState( + stepId, + OperationLifecycleState.IDLE_AWAITED, + { + metadata: { + stepId, + type: OperationType.STEP, + subType: OperationSubType.WAIT, + }, + }, + ); + + // Don't set up stepData - should be missing/undefined + + jest.clearAllTimers(); + + // Call waitForStatusChange - should set up polling + const promise = checkpointManager.waitForStatusChange(stepId); + + // Verify polling was set up + const ops = checkpointManager.getAllOperations(); + const op = ops.get(stepId); + expect(op?.timer).toBeDefined(); + expect(op?.resolver).toBeDefined(); + expect(op?.pollCount).toBe(0); + expect(op?.pollStartTime).toBeDefined(); + + // Verify timer was scheduled + expect(jest.getTimerCount()).toBeGreaterThan(0); + + // Clean up by resolving the operation + op?.resolver?.(); + await promise; + }); + + it("should set up polling when stepData status is undefined", async () => { + const stepId = "step-1"; + + // Create operation in IDLE_AWAITED state + checkpointManager.markOperationState( + stepId, + OperationLifecycleState.IDLE_AWAITED, + { + metadata: { + stepId, + type: OperationType.STEP, + subType: OperationSubType.WAIT, + }, + }, + ); + + // Set up stepData without status + const hashedStepId = hashId(stepId); + (checkpointManager as any).stepData[hashedStepId] = { + Id: hashedStepId, + // Status is undefined + }; + + jest.clearAllTimers(); + + // Call waitForStatusChange - should set up polling + const promise = checkpointManager.waitForStatusChange(stepId); + + // Verify polling was set up + const ops = checkpointManager.getAllOperations(); + const op = ops.get(stepId); + expect(op?.timer).toBeDefined(); + expect(op?.resolver).toBeDefined(); + expect(op?.pollCount).toBe(0); + expect(op?.pollStartTime).toBeDefined(); + + // Verify timer was scheduled + expect(jest.getTimerCount()).toBeGreaterThan(0); + + // Clean up by resolving the operation + op?.resolver?.(); + await promise; + }); + }); }); describe("termination cooldown", () => { diff --git a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts index 6d526e50..979458af 100644 --- a/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts +++ b/packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts @@ -2,6 +2,7 @@ import { CheckpointDurableExecutionRequest, OperationUpdate, Operation, + OperationStatus, } from "@aws-sdk/client-lambda"; import { DurableExecutionClient } from "../../types/durable-execution"; import { log } from "../logger/logger"; @@ -23,6 +24,14 @@ import { export const STEP_DATA_UPDATED_EVENT = "stepDataUpdated"; +const TERMINAL_STATUSES: OperationStatus[] = [ + OperationStatus.SUCCEEDED, + OperationStatus.CANCELLED, + OperationStatus.FAILED, + OperationStatus.STOPPED, + OperationStatus.TIMED_OUT, +]; + interface QueuedCheckpoint { stepId: string; data: Partial; @@ -494,6 +503,12 @@ export class CheckpointManager implements Checkpoint { ); } + // Resolve immediately if the step was completed already + const stepData = this.stepData[hashId(stepId)]; + if (stepData?.Status && TERMINAL_STATUSES.includes(stepData.Status)) { + return Promise.resolve(); + } + // Start timer with polling this.startTimerWithPolling(stepId, op.endTimestamp); @@ -515,6 +530,12 @@ export class CheckpointManager implements Checkpoint { ); } + // Resolve immediately if the step was completed already + const stepData = this.stepData[hashId(stepId)]; + if (stepData?.Status && TERMINAL_STATUSES.includes(stepData.Status)) { + return Promise.resolve(); + } + // Start timer with polling this.startTimerWithPolling(stepId, op.endTimestamp);