From 87e21729047178cc1baa55821d4672d32e7303bd Mon Sep 17 00:00:00 2001 From: Anthony Ting Date: Tue, 16 Dec 2025 13:33:55 -0800 Subject: [PATCH] fix(testing-sdk): support PENDING status reinvocation and execution failure --- ...-for-callback-multiple-invocations.test.ts | 3 + .../__tests__/execution-handlers.test.ts | 10 +- .../handlers/execution-handlers.ts | 5 +- .../checkpoint-manager-dirty.test.ts | 9 + .../__tests__/execution-manager.test.ts | 96 ++- .../storage/checkpoint-manager.ts | 4 + .../storage/execution-manager.ts | 12 +- .../worker-api/worker-api-response.ts | 7 +- .../__tests__/checkpoint-worker.test.ts | 10 +- ...tion-orchestrator-invocation-order.test.ts | 34 +- ...ion-orchestrator-pending-rejection.test.ts | 765 ++++++++++++++++++ .../test-execution-orchestrator.test.ts | 11 +- .../local/api-client/checkpoint-api-client.ts | 5 +- .../checkpoint-worker-api-client.ts | 5 +- .../__tests__/invocation-tracker.test.ts | 15 +- .../local/operations/invocation-tracker.ts | 5 +- .../local/test-execution-orchestrator.ts | 94 ++- 17 files changed, 1032 insertions(+), 58 deletions(-) create mode 100644 packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-pending-rejection.test.ts diff --git a/packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/multiple-invocations/wait-for-callback-multiple-invocations.test.ts b/packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/multiple-invocations/wait-for-callback-multiple-invocations.test.ts index 97d114b95..92e6bc4bc 100644 --- a/packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/multiple-invocations/wait-for-callback-multiple-invocations.test.ts +++ b/packages/aws-durable-execution-sdk-js-examples/src/examples/wait-for-callback/multiple-invocations/wait-for-callback-multiple-invocations.test.ts @@ -8,6 +8,9 @@ import { createTests } from "../../../utils/test-helper"; createTests({ handler, invocationType: InvocationType.Event, + localRunnerConfig: { + skipTime: false, + }, tests: (runner) => { it("should handle multiple invocations tracking with waitForCallback operations", async () => { // Get operations for verification diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/execution-handlers.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/execution-handlers.test.ts index f525e8884..7846f6e1c 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/execution-handlers.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/execution-handlers.test.ts @@ -141,7 +141,10 @@ describe("execution handlers", () => { Error: { Payload: undefined }, }, }; - completeInvocationSpy.mockReturnValue(mockEvent); + completeInvocationSpy.mockReturnValue({ + event: mockEvent, + hasDirtyOperations: false, + }); const errorObject: ErrorObject = { ErrorType: "TestError", @@ -160,7 +163,10 @@ describe("execution handlers", () => { invocationId, errorObject, ); - expect(result).toEqual(mockEvent); + expect(result).toEqual({ + event: mockEvent, + hasDirtyOperations: false, + }); }); }); }); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/execution-handlers.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/execution-handlers.ts index d7722f5aa..8aa41033f 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/execution-handlers.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/execution-handlers.ts @@ -1,4 +1,4 @@ -import { ErrorObject, Event } from "@aws-sdk/client-lambda"; +import { ErrorObject } from "@aws-sdk/client-lambda"; import { createExecutionId, ExecutionId, @@ -12,6 +12,7 @@ import { StartDurableExecutionRequest, StartInvocationRequest, } from "../worker-api/worker-api-request"; +import { CompleteInvocationResponse } from "../worker-api/worker-api-response"; /** * Starts a durable execution. Returns the data needed for the handler invocation event. @@ -42,6 +43,6 @@ export function processCompleteInvocation( invocationId: InvocationId, error: ErrorObject | undefined, executionManager: ExecutionManager, -): Event { +): CompleteInvocationResponse { return executionManager.completeInvocation(executionId, invocationId, error); } diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/checkpoint-manager-dirty.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/checkpoint-manager-dirty.test.ts index 0ac1349b6..6b42a30bf 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/checkpoint-manager-dirty.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/checkpoint-manager-dirty.test.ts @@ -36,6 +36,7 @@ describe("CheckpointManager dirty operation tracking", () => { const dirtyOperations = storage.getDirtyOperations(); expect(dirtyOperations).toEqual([]); + expect(storage.hasDirtyOperations()).toBe(false); }); it("should return dirty operations and clear the dirty set", () => { @@ -51,8 +52,11 @@ describe("CheckpointManager dirty operation tracking", () => { expect(firstCall).toHaveLength(1); expect(firstCall[0].Id).toBe("dirty-step"); + expect(storage.hasDirtyOperations()).toBe(false); + const secondCall = storage.getDirtyOperations(); expect(secondCall).toEqual([]); + expect(storage.hasDirtyOperations()).toBe(false); }); it("should return multiple dirty operations in correct order", () => { @@ -86,6 +90,7 @@ describe("CheckpointManager dirty operation tracking", () => { "second-op", "third-op", ]); + expect(storage.hasDirtyOperations()).toBe(false); }); }); @@ -100,16 +105,20 @@ describe("CheckpointManager dirty operation tracking", () => { storage.registerUpdate(stepUpdate); // Verify operation is dirty before invocation + expect(storage.hasDirtyOperations()).toBe(true); const dirtyBefore = storage.getDirtyOperations(); expect(dirtyBefore).toHaveLength(1); + expect(storage.hasDirtyOperations()).toBe(false); // Start new invocation should clear dirty set const invocationId = createInvocationId("test-invocation"); storage.startInvocation(invocationId); + expect(storage.hasDirtyOperations()).toBe(false); // Verify dirty set is cleared after invocation const dirtyAfter = storage.getDirtyOperations(); expect(dirtyAfter).toEqual([]); + expect(storage.hasDirtyOperations()).toBe(false); }); it("should return all operations from startInvocation", () => { diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/execution-manager.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/execution-manager.test.ts index f4ffb69b9..065d2c0f5 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/execution-manager.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/__tests__/execution-manager.test.ts @@ -4,6 +4,7 @@ import { OperationType, EventType, ErrorObject, + OperationAction, } from "@aws-sdk/client-lambda"; import { ExecutionManager, StartExecutionParams } from "../execution-manager"; import { CheckpointManager } from "../checkpoint-manager"; @@ -520,7 +521,10 @@ describe("execution-manager", () => { ); // Verify the result - expect(result).toBe(mockHistoryEvent); + expect(result).toEqual({ + event: mockHistoryEvent, + hasDirtyOperations: false, + }); // Verify CheckpointManager.completeInvocation was called expect(completeInvocationSpy).toHaveBeenCalledWith(invocationId); @@ -593,7 +597,10 @@ describe("execution-manager", () => { ); // Verify the result - expect(result).toBe(mockHistoryEvent); + expect(result).toEqual({ + event: mockHistoryEvent, + hasDirtyOperations: false, + }); // Verify CheckpointManager.completeInvocation was called expect(completeInvocationSpy).toHaveBeenCalledWith(invocationId); @@ -642,6 +649,91 @@ describe("execution-manager", () => { // Verify CheckpointManager.completeInvocation was called expect(completeInvocationSpy).toHaveBeenCalledWith(invocationId); }); + + it("should return hasDirtyOperations as true when checkpoint storage has dirty operations", () => { + // Create an execution first + const executionId = createExecutionId("test-execution-id"); + const invocationId = createInvocationId("test-invocation-id"); + executionManager.startExecution({ executionId, invocationId }); + + const storage = executionManager.getCheckpointsByExecution(executionId); + expect(storage).toBeDefined(); + + // Register an operation to make the storage dirty + const stepUpdate = { + Id: "test-step-operation", + Type: OperationType.STEP, + Action: OperationAction.START, + }; + storage!.registerUpdate(stepUpdate); + + // Mock the completeInvocation method on CheckpointManager + const mockTimestamps = { + startTimestamp: new Date("2023-01-01T00:00:00.000Z"), + endTimestamp: new Date("2023-01-01T00:01:00.000Z"), + }; + + const completeInvocationSpy = jest + .spyOn(storage!, "completeInvocation") + .mockReturnValue(mockTimestamps); + + // Mock the eventProcessor.createHistoryEvent method + const mockHistoryEvent = { + EventType: EventType.InvocationCompleted, + Timestamp: new Date(), + InvocationCompletedDetails: { + StartTimestamp: mockTimestamps.startTimestamp, + EndTimestamp: mockTimestamps.endTimestamp, + Error: { + Payload: undefined, + }, + RequestId: invocationId, + }, + }; + + const createHistoryEventSpy = jest + .spyOn(storage!.eventProcessor, "createHistoryEvent") + .mockReturnValue(mockHistoryEvent); + + // Mock hasDirtyOperations to return true + const hasDirtyOperationsSpy = jest + .spyOn(storage!, "hasDirtyOperations") + .mockReturnValue(true); + + // Call the method + const result = executionManager.completeInvocation( + executionId, + invocationId, + undefined, + ); + + // Verify the result shows dirty operations exist + expect(result).toEqual({ + event: mockHistoryEvent, + hasDirtyOperations: true, + }); + + // Verify CheckpointManager.completeInvocation was called + expect(completeInvocationSpy).toHaveBeenCalledWith(invocationId); + + // Verify hasDirtyOperations was called + expect(hasDirtyOperationsSpy).toHaveBeenCalled(); + + // Verify eventProcessor.createHistoryEvent was called with correct parameters + expect(createHistoryEventSpy).toHaveBeenCalledWith( + EventType.InvocationCompleted, + undefined, + "InvocationCompletedDetails", + { + StartTimestamp: mockTimestamps.startTimestamp, + EndTimestamp: mockTimestamps.endTimestamp, + Error: { + Payload: undefined, + }, + RequestId: invocationId, + }, + ); + }); }); }); }); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/checkpoint-manager.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/checkpoint-manager.ts index b4f66e53f..e9b14aec9 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/checkpoint-manager.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/checkpoint-manager.ts @@ -50,6 +50,10 @@ export class CheckpointManager { return this.operationDataMap; } + hasDirtyOperations(): boolean { + return !!this.dirtyOperationIds.size; + } + getDirtyOperations(): Operation[] { const dirtyOperations = Array.from(this.dirtyOperationIds).map((id) => { const operationEvents = this.operationDataMap.get(id); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/execution-manager.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/execution-manager.ts index f0d602c99..d18bcc2d5 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/execution-manager.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/storage/execution-manager.ts @@ -9,11 +9,12 @@ import { CallbackId, CheckpointToken } from "../utils/tagged-strings"; import { ExecutionId, InvocationId } from "../utils/tagged-strings"; import { decodeCallbackId } from "../utils/callback-id"; import { OperationEvents } from "../../test-runner/common/operations/operation-with-data"; -import { ErrorObject, EventType, Event } from "@aws-sdk/client-lambda"; +import { ErrorObject, EventType } from "@aws-sdk/client-lambda"; import { StartDurableExecutionRequest, StartInvocationRequest, } from "../worker-api/worker-api-request"; +import { CompleteInvocationResponse } from "../worker-api/worker-api-response"; export interface InvocationResult { checkpointToken: CheckpointToken; @@ -105,7 +106,7 @@ export class ExecutionManager { executionId: ExecutionId, invocationId: InvocationId, error: ErrorObject | undefined, - ): Event { + ): CompleteInvocationResponse { const checkpointStorage = this.executions.get(executionId); if (!checkpointStorage) { @@ -115,7 +116,7 @@ export class ExecutionManager { const { startTimestamp, endTimestamp } = checkpointStorage.completeInvocation(invocationId); - return checkpointStorage.eventProcessor.createHistoryEvent( + const event = checkpointStorage.eventProcessor.createHistoryEvent( EventType.InvocationCompleted, undefined, "InvocationCompletedDetails", @@ -128,6 +129,11 @@ export class ExecutionManager { RequestId: invocationId, }, ); + + return { + event, + hasDirtyOperations: checkpointStorage.hasDirtyOperations(), + }; } /** diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-api-response.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-api-response.ts index ecd80d99c..90804aaf8 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-api-response.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker-api/worker-api-response.ts @@ -16,10 +16,15 @@ export interface PollCheckpointDataResponse { operations: CheckpointOperation[]; } +export interface CompleteInvocationResponse { + event: Event; + hasDirtyOperations: boolean; +} + export interface WorkerApiResponseMapping { [ApiType.StartDurableExecution]: InvocationResult; [ApiType.StartInvocation]: InvocationResult; - [ApiType.CompleteInvocation]: Event; + [ApiType.CompleteInvocation]: CompleteInvocationResponse; [ApiType.UpdateCheckpointData]: Record; [ApiType.PollCheckpointData]: Promise; [ApiType.GetDurableExecutionState]: GetDurableExecutionStateResponse; diff --git a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker/__tests__/checkpoint-worker.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker/__tests__/checkpoint-worker.test.ts index 922ef715a..83373a7b1 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker/__tests__/checkpoint-worker.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/worker/__tests__/checkpoint-worker.test.ts @@ -306,7 +306,10 @@ describe("CheckpointWorker", () => { Timestamp: new Date(), Type: "InvocationComplete", }; - mockApiHandlerInstance.performApiCall.mockReturnValue(mockEvent); + mockApiHandlerInstance.performApiCall.mockReturnValue({ + event: mockEvent, + hasDirtyOperations: false, + }); messageHandler(command); @@ -318,7 +321,10 @@ describe("CheckpointWorker", () => { data: { type: ApiType.CompleteInvocation, requestId: "complete-request-456", - response: mockEvent, + response: { + event: mockEvent, + hasDirtyOperations: false, + }, }, }); }); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-invocation-order.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-invocation-order.test.ts index 4a85a048c..bb9fefee3 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-invocation-order.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-invocation-order.test.ts @@ -25,6 +25,7 @@ import { ILocalDurableTestRunnerFactory } from "../interfaces/durable-test-runne import { DurableApiClient } from "../../common/create-durable-api-client"; import { CheckpointApiClient } from "../api-client/checkpoint-api-client"; import { InvocationResult } from "../../../checkpoint-server/storage/execution-manager"; +import { CompleteInvocationResponse } from "../../../checkpoint-server/worker-api/worker-api-response"; // Mock dependencies jest.mock("../operations/local-operation-storage"); @@ -75,15 +76,20 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => { // Tracking arrays for call order verification let callOrder: string[]; let addHistoryEventSpy: jest.SpyInstance; - let completeInvocationSpy: jest.SpyInstance; - - const mockInvocationCompletedEvent: Event = { - EventType: EventType.InvocationCompleted, - InvocationCompletedDetails: { - RequestId: "invocation-request-id", - StartTimestamp: new Date(), - EndTimestamp: new Date(), + let completeInvocationSpy: jest.SpyInstance< + Promise + >; + + const mockInvocationCompletedEvent: CompleteInvocationResponse = { + event: { + EventType: EventType.InvocationCompleted, + InvocationCompletedDetails: { + RequestId: "invocation-request-id", + StartTimestamp: new Date(), + EndTimestamp: new Date(), + }, }, + hasDirtyOperations: false, }; const nonResolvingPromise = new Promise(() => { @@ -135,7 +141,10 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => { }), completeInvocation: jest.fn().mockImplementation(() => { callOrder.push("completeInvocation"); - return Promise.resolve(mockInvocationCompletedEvent); + return Promise.resolve({ + event: mockInvocationCompletedEvent, + hasDirtyOperations: false, + }); }), }; @@ -509,8 +518,11 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => { invocationCount++; callOrder.push(`completeInvocation${invocationCount}`); return Promise.resolve({ - ...mockInvocationCompletedEvent, - EventId: invocationCount, + event: { + ...mockInvocationCompletedEvent, + EventId: invocationCount, + }, + hasDirtyOperations: false, }); }); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-pending-rejection.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-pending-rejection.test.ts new file mode 100644 index 000000000..b68024583 --- /dev/null +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator-pending-rejection.test.ts @@ -0,0 +1,765 @@ +import { TestExecutionOrchestrator } from "../test-execution-orchestrator"; +import { LocalOperationStorage } from "../operations/local-operation-storage"; +import { + createCheckpointToken, + createExecutionId, +} from "../../../checkpoint-server/utils/tagged-strings"; +import { InvocationStatus } from "@aws/durable-execution-sdk-js"; +import { + Event, + EventType, + OperationAction, + OperationStatus, + OperationType, +} from "@aws-sdk/client-lambda"; +import { OperationWaitManager } from "../operations/operation-wait-manager"; +import { IndexedOperations } from "../../common/indexed-operations"; +import { OperationEvents } from "../../common/operations/operation-with-data"; +import { FunctionStorage } from "../operations/function-storage"; +import { ILocalDurableTestRunnerFactory } from "../interfaces/durable-test-runner-factory"; +import { DurableApiClient } from "../../common/create-durable-api-client"; +import { CheckpointApiClient } from "../api-client/checkpoint-api-client"; + +// Mock dependencies +jest.mock("../operations/local-operation-storage"); + +const mockInvoke = jest.fn(); + +jest.mock("../invoke-handler", () => ({ + InvokeHandler: jest.fn().mockImplementation(() => ({ + invoke: mockInvoke, + })), +})); + +/** + * Test suite focused on verifying that the TestExecutionOrchestrator correctly rejects + * executions with the error "Cannot return pending status with no operations" when a handler + * returns PENDING status but there are no pending operations or scheduled functions to justify + * continuing execution. + */ +describe("TestExecutionOrchestrator - Pending Status Rejection", () => { + const mockHandlerFunction = jest.fn(); + const mockExecutionId = createExecutionId("test-execution-id"); + const mockCheckpointToken = createCheckpointToken("test-checkpoint-token"); + + const mockExecutionOperationEvents: OperationEvents[] = [ + { + events: [ + { + Id: "execution-id", + EventId: 1, + EventType: EventType.ExecutionStarted, + }, + ], + operation: { + Id: "execution-id", + StartTimestamp: new Date(), + Status: OperationStatus.STARTED, + Type: OperationType.EXECUTION, + ExecutionDetails: {}, + }, + }, + ]; + + let orchestrator: TestExecutionOrchestrator; + let mockOperationStorage: jest.Mocked; + let checkpointApi: CheckpointApiClient; + let mockFunctionStorage: FunctionStorage; + let mockDurableApiClient: DurableApiClient; + + const mockInvocationCompletedEvent: Event = { + EventType: EventType.InvocationCompleted, + InvocationCompletedDetails: { + RequestId: "invocation-request-id", + StartTimestamp: new Date(), + EndTimestamp: new Date(), + }, + }; + + const nonResolvingPromise = new Promise(() => { + // never resolve + }); + + beforeEach(() => { + jest.clearAllMocks(); + mockInvoke.mockReset(); + + mockDurableApiClient = { + sendCallbackFailure: jest.fn(), + sendCallbackSuccess: jest.fn(), + sendCallbackHeartbeat: jest.fn(), + }; + + const indexedOperations = new IndexedOperations([]); + + // Mock OperationStorage + mockOperationStorage = new LocalOperationStorage( + new OperationWaitManager(indexedOperations), + indexedOperations, + mockDurableApiClient, + jest.fn(), + ) as jest.Mocked; + + mockOperationStorage.populateOperations = jest.fn(); + mockOperationStorage.addHistoryEvent = jest.fn(); + + checkpointApi = { + startDurableExecution: jest.fn().mockResolvedValue({ + executionId: mockExecutionId, + checkpointToken: mockCheckpointToken, + operationEvents: mockExecutionOperationEvents, + }), + pollCheckpointData: jest.fn().mockReturnValue(nonResolvingPromise), + updateCheckpointData: jest.fn().mockResolvedValue(undefined), + startInvocation: jest.fn().mockResolvedValue({ + checkpointToken: mockCheckpointToken, + executionId: mockExecutionId, + operationEvents: [], + }), + completeInvocation: jest.fn().mockResolvedValue({ + hasDirtyOperations: false, + event: mockInvocationCompletedEvent, + }), + }; + + // Create a mock factory for FunctionStorage + const mockFactory: ILocalDurableTestRunnerFactory = { + createRunner: jest.fn().mockReturnValue({ + run: jest.fn().mockResolvedValue({ + getStatus: () => "SUCCEEDED", + getResult: () => ({}), + }), + }), + }; + + mockFunctionStorage = new FunctionStorage(mockFactory); + + orchestrator = new TestExecutionOrchestrator( + mockHandlerFunction, + mockOperationStorage, + checkpointApi, + mockFunctionStorage, + { + enabled: false, + }, + ); + }); + + describe("Single operation type pending scenarios", () => { + it("should not reject when only invoke operation is pending", async () => { + const invokeOperationId = "pending-invoke-op"; + + // Set up complete polling sequence upfront + jest + .spyOn(checkpointApi, "pollCheckpointData") + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: invokeOperationId, + StartTimestamp: new Date(), + Status: OperationStatus.STARTED, + Type: OperationType.CHAINED_INVOKE, + ChainedInvokeDetails: {}, + }, + update: { + Id: invokeOperationId, + Type: OperationType.CHAINED_INVOKE, + Action: OperationAction.START, + ChainedInvokeOptions: { + FunctionName: "test-function", + }, + Payload: "{}", + }, + events: [], + }, + ], + }) + // Then complete the invoke operation + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: invokeOperationId, + StartTimestamp: new Date(), + Status: OperationStatus.SUCCEEDED, + Type: OperationType.CHAINED_INVOKE, + ChainedInvokeDetails: { + Result: JSON.stringify({ invoke: "success" }), + }, + }, + update: { + Id: invokeOperationId, + Type: OperationType.CHAINED_INVOKE, + Action: OperationAction.SUCCEED, + }, + events: [], + }, + ], + }) + .mockReturnValue(nonResolvingPromise); + + // Mock function storage to complete the invoke + jest.spyOn(mockFunctionStorage, "runHandler").mockResolvedValue({ + result: JSON.stringify({ invoke: "success" }), + error: undefined, + }); + + // Handler returns PENDING but there's a pending invoke operation + mockInvoke + .mockResolvedValueOnce({ + Status: InvocationStatus.PENDING, + }) + .mockResolvedValueOnce({ + Status: InvocationStatus.SUCCEEDED, + Result: JSON.stringify({ success: true }), + }); + + const executePromise = orchestrator.executeHandler({ + payload: { input: "test-pending-invoke" }, + }); + + const result = await executePromise; + + // Should succeed, not reject with pending status error + expect(result.status).toBe(OperationStatus.SUCCEEDED); + expect(result.result).toBe(JSON.stringify({ success: true })); + }); + + it("should not reject when only callback operation is pending", async () => { + const callbackOperationId = "pending-callback-op"; + + // Set up complete polling sequence upfront + jest + .spyOn(checkpointApi, "pollCheckpointData") + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: callbackOperationId, + StartTimestamp: new Date(), + Status: OperationStatus.STARTED, + Type: OperationType.CALLBACK, + CallbackDetails: {}, + }, + update: undefined, // No update for started callback operations + events: [], + }, + ], + }) + // Then complete the callback operation + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: callbackOperationId, + StartTimestamp: new Date(), + Status: OperationStatus.SUCCEEDED, + Type: OperationType.CALLBACK, + CallbackDetails: { Result: "{}" }, + }, + update: { + Id: callbackOperationId, + Type: OperationType.CALLBACK, + Action: OperationAction.SUCCEED, + }, + events: [], + }, + ], + }) + .mockReturnValue(nonResolvingPromise); + + // Handler returns PENDING but there's a pending callback operation + mockInvoke + .mockResolvedValueOnce({ + Status: InvocationStatus.PENDING, + }) + .mockResolvedValueOnce({ + Status: InvocationStatus.SUCCEEDED, + Result: JSON.stringify({ success: true }), + }); + + const executePromise = orchestrator.executeHandler({ + payload: { input: "test-pending-callback" }, + }); + + const result = await executePromise; + + // Should succeed, not reject with pending status error + expect(result.status).toBe(OperationStatus.SUCCEEDED); + expect(result.result).toBe(JSON.stringify({ success: true })); + }); + + it("should reject when handler returns PENDING with no operations", async () => { + // Set up empty polling response - no operations + jest + .spyOn(checkpointApi, "pollCheckpointData") + .mockReturnValue(nonResolvingPromise); + + // Handler returns PENDING but there are no pending operations or scheduled functions + mockInvoke.mockResolvedValue({ + Status: InvocationStatus.PENDING, + }); + + const executePromise = orchestrator.executeHandler({ + payload: { input: "test-no-operations" }, + }); + + await expect(executePromise).rejects.toEqual({ + error: { + ErrorType: "InvalidParameterValueException", + ErrorMessage: + "Cannot return PENDING status with no pending operations.", + }, + status: "FAILED", + }); + }); + + it("should not reject when there are pending operations, but dirty operations exist", async () => { + // Set up empty polling response - no operations + jest + .spyOn(checkpointApi, "pollCheckpointData") + .mockReturnValue(nonResolvingPromise); + + // Set up dirty operations + jest.spyOn(checkpointApi, "completeInvocation").mockResolvedValue({ + hasDirtyOperations: true, + event: mockInvocationCompletedEvent, + }); + + // Handler returns PENDING but there are no pending operations or scheduled functions + mockInvoke + .mockResolvedValue({ + Status: InvocationStatus.PENDING, + }) + .mockResolvedValueOnce({ + Status: InvocationStatus.SUCCEEDED, + Result: JSON.stringify({ success: true }), + }); + + const result = await orchestrator.executeHandler({ + payload: { input: "test-no-operations" }, + }); + + expect(result.status).toBe(OperationStatus.SUCCEEDED); + expect(result.result).toBe(JSON.stringify({ success: true })); + }); + }); + + describe("Scheduled function scenarios", () => { + it("should not reject when scheduled functions are pending", async () => { + const waitOperationId = "scheduled-wait-op"; + const futureTimestamp = new Date(Date.now() + 100); + + jest + .spyOn(checkpointApi, "pollCheckpointData") + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: waitOperationId, + StartTimestamp: new Date(), + Status: OperationStatus.STARTED, + Type: OperationType.WAIT, + WaitDetails: { + ScheduledEndTimestamp: futureTimestamp, + }, + }, + update: { + Id: waitOperationId, + Type: OperationType.WAIT, + Action: OperationAction.START, + }, + events: [], + }, + ], + }) + // Then complete the wait operation + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: waitOperationId, + StartTimestamp: new Date(), + Status: OperationStatus.SUCCEEDED, + Type: OperationType.WAIT, + WaitDetails: { ScheduledEndTimestamp: futureTimestamp }, + }, + update: { + Id: waitOperationId, + Type: OperationType.WAIT, + Action: OperationAction.SUCCEED, + }, + events: [], + }, + ], + }) + .mockReturnValue(nonResolvingPromise); + + // Handler returns PENDING but there's a scheduled wait operation + mockInvoke + .mockResolvedValueOnce({ + Status: InvocationStatus.PENDING, + }) + .mockResolvedValueOnce({ + Status: InvocationStatus.SUCCEEDED, + Result: JSON.stringify({ success: true }), + }); + + const executePromise = orchestrator.executeHandler({ + payload: { input: "test-scheduled-wait" }, + }); + + const result = await executePromise; + + expect(result.status).toBe(OperationStatus.SUCCEEDED); + expect(result.result).toBe(JSON.stringify({ success: true })); + }); + + it("should not immediately re-invoke when there are dirty operations but scheduled functions exist", async () => { + // Set up empty polling response - no operations + jest + .spyOn(checkpointApi, "pollCheckpointData") + .mockReturnValue(nonResolvingPromise); + + // Set up dirty operations + jest.spyOn(checkpointApi, "completeInvocation").mockResolvedValue({ + hasDirtyOperations: true, + event: mockInvocationCompletedEvent, + }); + + // Handler returns PENDING but there are no pending operations or scheduled functions + mockInvoke + .mockResolvedValue({ + Status: InvocationStatus.PENDING, + }) + .mockResolvedValueOnce({ + Status: InvocationStatus.SUCCEEDED, + Result: JSON.stringify({ success: true }), + }); + + const result = await orchestrator.executeHandler({ + payload: { input: "test-no-operations" }, + }); + + expect(result.status).toBe(OperationStatus.SUCCEEDED); + expect(result.result).toBe(JSON.stringify({ success: true })); + }); + }); + + describe("Operations completed scenarios", () => { + it("should reject when operations were created but are no longer pending", async () => { + const completedInvokeId = "completed-invoke-op"; + const completedCallbackId = "completed-callback-op"; + + // Mock polling to return already completed operations + jest + .spyOn(checkpointApi, "pollCheckpointData") + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: completedInvokeId, + StartTimestamp: new Date(), + Status: OperationStatus.SUCCEEDED, + Type: OperationType.CHAINED_INVOKE, + ChainedInvokeDetails: { Result: "{}" }, + }, + update: { + Id: completedInvokeId, + Type: OperationType.CHAINED_INVOKE, + Action: OperationAction.SUCCEED, + }, + events: [], + }, + { + operation: { + Id: completedCallbackId, + StartTimestamp: new Date(), + Status: OperationStatus.SUCCEEDED, + Type: OperationType.CALLBACK, + CallbackDetails: { Result: "{}" }, + }, + update: { + Id: completedCallbackId, + Type: OperationType.CALLBACK, + Action: OperationAction.SUCCEED, + }, + events: [], + }, + ], + }) + .mockReturnValue(nonResolvingPromise); + + // Handler returns PENDING, but all operations are already completed + mockInvoke.mockResolvedValue({ + Status: InvocationStatus.PENDING, + }); + + const executePromise = orchestrator.executeHandler({ + payload: { input: "test-completed-operations" }, + }); + + // The handler still returns PENDING with no actual pending work + await expect(executePromise).rejects.toEqual({ + error: { + ErrorType: "InvalidParameterValueException", + ErrorMessage: + "Cannot return PENDING status with no pending operations.", + }, + status: "FAILED", + }); + }); + + it("should reject when callback completes between invocations leaving nothing pending", async () => { + const callbackOperationId = "completing-callback-op"; + + // First polling returns pending callback + jest + .spyOn(checkpointApi, "pollCheckpointData") + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: callbackOperationId, + StartTimestamp: new Date(), + Status: OperationStatus.STARTED, + Type: OperationType.CALLBACK, + CallbackDetails: {}, + }, + update: undefined, + events: [], + }, + ], + }) + // Second polling returns completed callback + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: callbackOperationId, + StartTimestamp: new Date(), + Status: OperationStatus.SUCCEEDED, + Type: OperationType.CALLBACK, + CallbackDetails: { Result: "{}" }, + }, + update: { + Id: callbackOperationId, + Type: OperationType.CALLBACK, + Action: OperationAction.SUCCEED, + }, + events: [], + }, + ], + }) + .mockReturnValue(nonResolvingPromise); + + // Handler returns PENDING on both invocations + mockInvoke.mockResolvedValue({ + Status: InvocationStatus.PENDING, + }); + + const executePromise = orchestrator.executeHandler({ + payload: { input: "test-completing-callback" }, + }); + + // Should reject on the second invocation when callback completes but handler still returns PENDING + await expect(executePromise).rejects.toEqual({ + error: { + ErrorType: "InvalidParameterValueException", + ErrorMessage: + "Cannot return PENDING status with no pending operations.", + }, + status: "FAILED", + }); + }); + }); + + describe("Edge cases and error conditions", () => { + it("should handle mixed scenarios with some operations completing while others remain pending", async () => { + const invokeOperationId = "completing-invoke-op"; + const callbackOperationId = "pending-callback-op"; + + // First polling: both operations pending, then complete both + jest + .spyOn(checkpointApi, "pollCheckpointData") + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: invokeOperationId, + StartTimestamp: new Date(), + Status: OperationStatus.STARTED, + Type: OperationType.CHAINED_INVOKE, + ChainedInvokeDetails: {}, + }, + update: { + Id: invokeOperationId, + Type: OperationType.CHAINED_INVOKE, + Action: OperationAction.START, + ChainedInvokeOptions: { + FunctionName: "test-function", + }, + Payload: "{}", + }, + events: [], + }, + { + operation: { + Id: callbackOperationId, + StartTimestamp: new Date(), + Status: OperationStatus.STARTED, + Type: OperationType.CALLBACK, + CallbackDetails: {}, + }, + update: undefined, + events: [], + }, + ], + }) + // Complete callback to finish execution + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: callbackOperationId, + StartTimestamp: new Date(), + Status: OperationStatus.SUCCEEDED, + Type: OperationType.CALLBACK, + CallbackDetails: { Result: "{}" }, + }, + update: { + Id: callbackOperationId, + Type: OperationType.CALLBACK, + Action: OperationAction.SUCCEED, + }, + events: [], + }, + ], + }) + .mockReturnValue(nonResolvingPromise); + + // Mock function storage to complete invoke quickly + jest.spyOn(mockFunctionStorage, "runHandler").mockResolvedValue({ + result: JSON.stringify({ invoke: "completed" }), + error: undefined, + }); + + // Handler returns PENDING - should be valid with callback still pending + mockInvoke + .mockResolvedValueOnce({ + Status: InvocationStatus.PENDING, + }) + .mockResolvedValueOnce({ + Status: InvocationStatus.SUCCEEDED, + Result: JSON.stringify({ success: true }), + }); + + const executePromise = orchestrator.executeHandler({ + payload: { input: "test-mixed-completion" }, + }); + + const result = await executePromise; + + // Should succeed - callback was still pending when first PENDING was returned + expect(result.status).toBe(OperationStatus.SUCCEEDED); + expect(result.result).toBe(JSON.stringify({ success: true })); + }); + + it("should properly track pending operations across multiple polling cycles", async () => { + const firstCallbackId = "first-callback-op"; + const secondCallbackId = "second-callback-op"; + + // First polling: one callback, then first completes and second starts, then second completes + jest + .spyOn(checkpointApi, "pollCheckpointData") + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: firstCallbackId, + StartTimestamp: new Date(), + Status: OperationStatus.STARTED, + Type: OperationType.CALLBACK, + CallbackDetails: {}, + }, + update: undefined, + events: [], + }, + ], + }) + // Second polling: first completes, second starts + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: firstCallbackId, + StartTimestamp: new Date(), + Status: OperationStatus.SUCCEEDED, + Type: OperationType.CALLBACK, + CallbackDetails: { Result: "{}" }, + }, + update: { + Id: firstCallbackId, + Type: OperationType.CALLBACK, + Action: OperationAction.SUCCEED, + }, + events: [], + }, + { + operation: { + Id: secondCallbackId, + StartTimestamp: new Date(), + Status: OperationStatus.STARTED, + Type: OperationType.CALLBACK, + CallbackDetails: {}, + }, + update: undefined, + events: [], + }, + ], + }) + // Complete second callback + .mockResolvedValueOnce({ + operations: [ + { + operation: { + Id: secondCallbackId, + StartTimestamp: new Date(), + Status: OperationStatus.SUCCEEDED, + Type: OperationType.CALLBACK, + CallbackDetails: { Result: "{}" }, + }, + update: { + Id: secondCallbackId, + Type: OperationType.CALLBACK, + Action: OperationAction.SUCCEED, + }, + events: [], + }, + ], + }) + .mockReturnValue(nonResolvingPromise); + + // Handler returns PENDING on both invocations (valid in both cases) + mockInvoke + .mockResolvedValueOnce({ + Status: InvocationStatus.PENDING, + }) + .mockResolvedValueOnce({ + Status: InvocationStatus.SUCCEEDED, + Result: JSON.stringify({ success: true }), + }); + + const executePromise = orchestrator.executeHandler({ + payload: { input: "test-multiple-polling-cycles" }, + }); + + const result = await executePromise; + + // Should succeed - there was always at least one pending operation + expect(result.status).toBe(OperationStatus.SUCCEEDED); + expect(result.result).toBe(JSON.stringify({ success: true })); + }); + }); +}); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator.test.ts index 30f11fe6f..fa34533c4 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/__tests__/test-execution-orchestrator.test.ts @@ -904,9 +904,14 @@ describe("TestExecutionOrchestrator", () => { ) .mockRejectedValue(new Error("Not implemented")); - mockInvoke.mockResolvedValue({ - Status: InvocationStatus.PENDING, - }); + mockInvoke + .mockResolvedValueOnce({ + Status: InvocationStatus.PENDING, + }) + .mockResolvedValueOnce({ + Status: InvocationStatus.SUCCEEDED, + Result: JSON.stringify({ success: true }), + }); const executePromise = orchestrator.executeHandler(); diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-api-client.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-api-client.ts index d84b770dc..4da3aef48 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-api-client.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-api-client.ts @@ -1,4 +1,4 @@ -import { ErrorObject, Operation, Event } from "@aws-sdk/client-lambda"; +import { ErrorObject, Operation } from "@aws-sdk/client-lambda"; import { CheckpointOperation } from "../../../checkpoint-server/storage/checkpoint-manager"; import { InvocationResult } from "../../../checkpoint-server/storage/execution-manager"; import { SerializedCheckpointOperation } from "../../../checkpoint-server/types/operation-event"; @@ -10,6 +10,7 @@ import { StartDurableExecutionRequest, StartInvocationRequest, } from "../../../checkpoint-server/worker-api/worker-api-request"; +import { CompleteInvocationResponse } from "../../../checkpoint-server/worker-api/worker-api-response"; export interface SerializedPollCheckpointResponse { operations: SerializedCheckpointOperation[]; @@ -62,5 +63,5 @@ export interface CheckpointApiClient { executionId: ExecutionId, invocationId: InvocationId, error: ErrorObject | undefined, - ): Promise; + ): Promise; } diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-worker-api-client.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-worker-api-client.ts index 5eb65914c..17e36ce2f 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-worker-api-client.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/api-client/checkpoint-worker-api-client.ts @@ -1,4 +1,4 @@ -import { Operation, ErrorObject, Event } from "@aws-sdk/client-lambda"; +import { Operation, ErrorObject } from "@aws-sdk/client-lambda"; import { CheckpointOperation } from "../../../checkpoint-server/storage/checkpoint-manager"; import { InvocationResult } from "../../../checkpoint-server/storage/execution-manager"; import { @@ -12,6 +12,7 @@ import { StartDurableExecutionRequest, StartInvocationRequest, } from "../../../checkpoint-server/worker-api/worker-api-request"; +import { CompleteInvocationResponse } from "../../../checkpoint-server/worker-api/worker-api-response"; export class CheckpointWorkerApiClient implements CheckpointApiClient { constructor(private readonly workerManager: CheckpointWorkerManager) {} @@ -76,7 +77,7 @@ export class CheckpointWorkerApiClient implements CheckpointApiClient { executionId: ExecutionId, invocationId: InvocationId, error: ErrorObject | undefined, - ): Promise { + ): Promise { return this.workerManager.sendApiRequest(ApiType.CompleteInvocation, { executionId, invocationId, diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/__tests__/invocation-tracker.test.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/__tests__/invocation-tracker.test.ts index e91cc82b8..8c4084a7e 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/__tests__/invocation-tracker.test.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/__tests__/invocation-tracker.test.ts @@ -19,14 +19,17 @@ describe("InvocationTracker", () => { startInvocation: jest.fn(), completeInvocation: jest.fn((_executionId, invocationId, error) => Promise.resolve({ - InvocationCompletedDetails: { - StartTimestamp: new Date(), - EndTimestamp: new Date(), - Error: { - Payload: error, + event: { + InvocationCompletedDetails: { + StartTimestamp: new Date(), + EndTimestamp: new Date(), + Error: { + Payload: error, + }, + RequestId: invocationId, }, - RequestId: invocationId, }, + hasDirtyOperations: false, }), ), }; diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/invocation-tracker.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/invocation-tracker.ts index b3615ff2a..f6a84a185 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/invocation-tracker.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/operations/invocation-tracker.ts @@ -1,10 +1,11 @@ -import { ErrorObject, Event } from "@aws-sdk/client-lambda"; +import { ErrorObject } from "@aws-sdk/client-lambda"; import { createInvocationId, ExecutionId, InvocationId, } from "../../../checkpoint-server/utils/tagged-strings"; import { CheckpointApiClient } from "../api-client/checkpoint-api-client"; +import { CompleteInvocationResponse } from "../../../checkpoint-server/worker-api/worker-api-response"; /** * Manages tracking of invocations in local runner. @@ -49,7 +50,7 @@ export class InvocationTracker { executionId: ExecutionId, invocationId: InvocationId, error: ErrorObject | undefined, - ): Promise { + ): Promise { if (!this.invocations.has(invocationId)) { throw new Error(`Invocation with ID ${invocationId} not found`); } diff --git a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/test-execution-orchestrator.ts b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/test-execution-orchestrator.ts index d77c447ee..5026468f7 100644 --- a/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/test-execution-orchestrator.ts +++ b/packages/aws-durable-execution-sdk-js-testing/src/test-runner/local/test-execution-orchestrator.ts @@ -46,6 +46,7 @@ export class TestExecutionOrchestrator { private invokeHandlerInstance: InvokeHandler; private invocationTracker: InvocationTracker; private readonly scheduler: Scheduler; + private readonly pendingOperations = new Set(); constructor( private handlerFunction: DurableLambdaHandler, @@ -294,18 +295,21 @@ export class TestExecutionOrchestrator { ); } + const operationId = update.Id; + if (operationId === undefined) { + throw new Error("Missing operation id"); + } + + this.pendingOperations.add(operationId); + const { result, error } = await this.functionStorage.runHandler( functionName, update.Payload, ); - if (update.Id === undefined) { - throw new Error("Missing operation id"); - } - await this.checkpointApi.updateCheckpointData({ executionId, - operationId: update.Id, + operationId: operationId, operationData: { // todo: handle other operation types as well Status: result ? OperationStatus.SUCCEEDED : OperationStatus.FAILED, @@ -321,6 +325,11 @@ export class TestExecutionOrchestrator { (err) => { this.executionState.rejectWith(err); }, + undefined, + () => { + this.pendingOperations.delete(operationId); + return Promise.resolve(); + }, ); } @@ -444,7 +453,13 @@ export class TestExecutionOrchestrator { operation: Operation, executionId: ExecutionId, ): void { + const operationId = operation.Id; + if (!operationId) { + throw new Error("Missing operation id"); + } + if (operation.Status === OperationStatus.STARTED) { + this.pendingOperations.add(operationId); return; } @@ -453,6 +468,11 @@ export class TestExecutionOrchestrator { (err) => { this.executionState.rejectWith(err); }, + undefined, + () => { + this.pendingOperations.delete(operationId); + return Promise.resolve(); + }, ); } @@ -587,24 +607,59 @@ export class TestExecutionOrchestrator { value, ); - this.operationStorage.addHistoryEvent( + const { event, hasDirtyOperations } = await this.invocationTracker.completeInvocation( executionId, invocationId, "Error" in value ? value.Error : undefined, - ), - ); + ); + + this.operationStorage.addHistoryEvent(event); if (value.Status === InvocationStatus.SUCCEEDED) { this.executionState.resolveWith({ result: value.Result, status: OperationStatus.SUCCEEDED, }); - } else if (value.Status === InvocationStatus.FAILED) { + return; + } + + if (value.Status === InvocationStatus.FAILED) { this.executionState.resolveWith({ error: value.Error, status: OperationStatus.FAILED, }); + return; + } + + if (!this.scheduler.hasScheduledFunction() && hasDirtyOperations) { + defaultLogger.debug( + "Re-invoking handler since invocation completed with pending dirty operations", + ); + // Re-invoke the handler if its last checkpoint was not synchronized + // with the backend and there are pending dirty operations when the + // invocation completed. + this.scheduler.scheduleFunction( + () => this.invokeHandler(executionId), + (err) => { + this.executionState.rejectWith(err); + }, + ); + return; + } + + if ( + !this.scheduler.hasScheduledFunction() && + !this.pendingOperations.size + ) { + this.executionState.rejectWith({ + error: { + ErrorType: "InvalidParameterValueException", + ErrorMessage: + "Cannot return PENDING status with no pending operations.", + }, + status: ExecutionStatus.FAILED, + }); } } catch (err) { defaultLogger.debug( @@ -612,18 +667,17 @@ export class TestExecutionOrchestrator { err, ); - this.operationStorage.addHistoryEvent( - await this.invocationTracker.completeInvocation( - executionId, - invocationId, - { - ErrorMessage: err instanceof Error ? err.message : undefined, - ErrorType: err instanceof Error ? err.name : undefined, - StackTrace: - err instanceof Error ? err.stack?.split("\n") : undefined, - }, - ), + const { event } = await this.invocationTracker.completeInvocation( + executionId, + invocationId, + { + ErrorMessage: err instanceof Error ? err.message : undefined, + ErrorType: err instanceof Error ? err.name : undefined, + StackTrace: err instanceof Error ? err.stack?.split("\n") : undefined, + }, ); + + this.operationStorage.addHistoryEvent(event); this.executionState.rejectWith(err); } }