Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import { createTests } from "../../../utils/test-helper";
createTests({
handler,
invocationType: InvocationType.Event,
localRunnerConfig: {
skipTime: false,
},
tests: (runner) => {
it("should handle multiple invocations tracking with waitForCallback operations", async () => {
// Get operations for verification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ describe("execution handlers", () => {
Error: { Payload: undefined },
},
};
completeInvocationSpy.mockReturnValue(mockEvent);
completeInvocationSpy.mockReturnValue({
event: mockEvent,
hasDirtyOperations: false,
});

const errorObject: ErrorObject = {
ErrorType: "TestError",
Expand All @@ -160,7 +163,10 @@ describe("execution handlers", () => {
invocationId,
errorObject,
);
expect(result).toEqual(mockEvent);
expect(result).toEqual({
event: mockEvent,
hasDirtyOperations: false,
});
});
});
});
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ErrorObject, Event } from "@aws-sdk/client-lambda";
import { ErrorObject } from "@aws-sdk/client-lambda";
import {
createExecutionId,
ExecutionId,
Expand All @@ -12,6 +12,7 @@ import {
StartDurableExecutionRequest,
StartInvocationRequest,
} from "../worker-api/worker-api-request";
import { CompleteInvocationResponse } from "../worker-api/worker-api-response";

/**
* Starts a durable execution. Returns the data needed for the handler invocation event.
Expand Down Expand Up @@ -42,6 +43,6 @@ export function processCompleteInvocation(
invocationId: InvocationId,
error: ErrorObject | undefined,
executionManager: ExecutionManager,
): Event {
): CompleteInvocationResponse {
return executionManager.completeInvocation(executionId, invocationId, error);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ describe("CheckpointManager dirty operation tracking", () => {
const dirtyOperations = storage.getDirtyOperations();

expect(dirtyOperations).toEqual([]);
expect(storage.hasDirtyOperations()).toBe(false);
});

it("should return dirty operations and clear the dirty set", () => {
Expand All @@ -51,8 +52,11 @@ describe("CheckpointManager dirty operation tracking", () => {
expect(firstCall).toHaveLength(1);
expect(firstCall[0].Id).toBe("dirty-step");

expect(storage.hasDirtyOperations()).toBe(false);

const secondCall = storage.getDirtyOperations();
expect(secondCall).toEqual([]);
expect(storage.hasDirtyOperations()).toBe(false);
});

it("should return multiple dirty operations in correct order", () => {
Expand Down Expand Up @@ -86,6 +90,7 @@ describe("CheckpointManager dirty operation tracking", () => {
"second-op",
"third-op",
]);
expect(storage.hasDirtyOperations()).toBe(false);
});
});

Expand All @@ -100,16 +105,20 @@ describe("CheckpointManager dirty operation tracking", () => {
storage.registerUpdate(stepUpdate);

// Verify operation is dirty before invocation
expect(storage.hasDirtyOperations()).toBe(true);
const dirtyBefore = storage.getDirtyOperations();
expect(dirtyBefore).toHaveLength(1);
expect(storage.hasDirtyOperations()).toBe(false);

// Start new invocation should clear dirty set
const invocationId = createInvocationId("test-invocation");
storage.startInvocation(invocationId);
expect(storage.hasDirtyOperations()).toBe(false);

// Verify dirty set is cleared after invocation
const dirtyAfter = storage.getDirtyOperations();
expect(dirtyAfter).toEqual([]);
expect(storage.hasDirtyOperations()).toBe(false);
});

it("should return all operations from startInvocation", () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
OperationType,
EventType,
ErrorObject,
OperationAction,
} from "@aws-sdk/client-lambda";
import { ExecutionManager, StartExecutionParams } from "../execution-manager";
import { CheckpointManager } from "../checkpoint-manager";
Expand Down Expand Up @@ -520,7 +521,10 @@ describe("execution-manager", () => {
);

// Verify the result
expect(result).toBe(mockHistoryEvent);
expect(result).toEqual({
event: mockHistoryEvent,
hasDirtyOperations: false,
});

// Verify CheckpointManager.completeInvocation was called
expect(completeInvocationSpy).toHaveBeenCalledWith(invocationId);
Expand Down Expand Up @@ -593,7 +597,10 @@ describe("execution-manager", () => {
);

// Verify the result
expect(result).toBe(mockHistoryEvent);
expect(result).toEqual({
event: mockHistoryEvent,
hasDirtyOperations: false,
});

// Verify CheckpointManager.completeInvocation was called
expect(completeInvocationSpy).toHaveBeenCalledWith(invocationId);
Expand Down Expand Up @@ -642,6 +649,91 @@ describe("execution-manager", () => {
// Verify CheckpointManager.completeInvocation was called
expect(completeInvocationSpy).toHaveBeenCalledWith(invocationId);
});

it("should return hasDirtyOperations as true when checkpoint storage has dirty operations", () => {
// Create an execution first
const executionId = createExecutionId("test-execution-id");
const invocationId = createInvocationId("test-invocation-id");
executionManager.startExecution({ executionId, invocationId });

const storage = executionManager.getCheckpointsByExecution(executionId);
expect(storage).toBeDefined();

// Register an operation to make the storage dirty
const stepUpdate = {
Id: "test-step-operation",
Type: OperationType.STEP,
Action: OperationAction.START,
};
storage!.registerUpdate(stepUpdate);

// Mock the completeInvocation method on CheckpointManager
const mockTimestamps = {
startTimestamp: new Date("2023-01-01T00:00:00.000Z"),
endTimestamp: new Date("2023-01-01T00:01:00.000Z"),
};

const completeInvocationSpy = jest
.spyOn(storage!, "completeInvocation")
.mockReturnValue(mockTimestamps);

// Mock the eventProcessor.createHistoryEvent method
const mockHistoryEvent = {
EventType: EventType.InvocationCompleted,
Timestamp: new Date(),
InvocationCompletedDetails: {
StartTimestamp: mockTimestamps.startTimestamp,
EndTimestamp: mockTimestamps.endTimestamp,
Error: {
Payload: undefined,
},
RequestId: invocationId,
},
};

const createHistoryEventSpy = jest
.spyOn(storage!.eventProcessor, "createHistoryEvent")
.mockReturnValue(mockHistoryEvent);

// Mock hasDirtyOperations to return true
const hasDirtyOperationsSpy = jest
.spyOn(storage!, "hasDirtyOperations")
.mockReturnValue(true);

// Call the method
const result = executionManager.completeInvocation(
executionId,
invocationId,
undefined,
);

// Verify the result shows dirty operations exist
expect(result).toEqual({
event: mockHistoryEvent,
hasDirtyOperations: true,
});

// Verify CheckpointManager.completeInvocation was called
expect(completeInvocationSpy).toHaveBeenCalledWith(invocationId);

// Verify hasDirtyOperations was called
expect(hasDirtyOperationsSpy).toHaveBeenCalled();

// Verify eventProcessor.createHistoryEvent was called with correct parameters
expect(createHistoryEventSpy).toHaveBeenCalledWith(
EventType.InvocationCompleted,
undefined,
"InvocationCompletedDetails",
{
StartTimestamp: mockTimestamps.startTimestamp,
EndTimestamp: mockTimestamps.endTimestamp,
Error: {
Payload: undefined,
},
RequestId: invocationId,
},
);
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ export class CheckpointManager {
return this.operationDataMap;
}

hasDirtyOperations(): boolean {
return !!this.dirtyOperationIds.size;
}

getDirtyOperations(): Operation[] {
const dirtyOperations = Array.from(this.dirtyOperationIds).map((id) => {
const operationEvents = this.operationDataMap.get(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import { CallbackId, CheckpointToken } from "../utils/tagged-strings";
import { ExecutionId, InvocationId } from "../utils/tagged-strings";
import { decodeCallbackId } from "../utils/callback-id";
import { OperationEvents } from "../../test-runner/common/operations/operation-with-data";
import { ErrorObject, EventType, Event } from "@aws-sdk/client-lambda";
import { ErrorObject, EventType } from "@aws-sdk/client-lambda";
import {
StartDurableExecutionRequest,
StartInvocationRequest,
} from "../worker-api/worker-api-request";
import { CompleteInvocationResponse } from "../worker-api/worker-api-response";

export interface InvocationResult {
checkpointToken: CheckpointToken;
Expand Down Expand Up @@ -105,7 +106,7 @@ export class ExecutionManager {
executionId: ExecutionId,
invocationId: InvocationId,
error: ErrorObject | undefined,
): Event {
): CompleteInvocationResponse {
const checkpointStorage = this.executions.get(executionId);

if (!checkpointStorage) {
Expand All @@ -115,7 +116,7 @@ export class ExecutionManager {
const { startTimestamp, endTimestamp } =
checkpointStorage.completeInvocation(invocationId);

return checkpointStorage.eventProcessor.createHistoryEvent(
const event = checkpointStorage.eventProcessor.createHistoryEvent(
EventType.InvocationCompleted,
undefined,
"InvocationCompletedDetails",
Expand All @@ -128,6 +129,11 @@ export class ExecutionManager {
RequestId: invocationId,
},
);

return {
event,
hasDirtyOperations: checkpointStorage.hasDirtyOperations(),
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ export interface PollCheckpointDataResponse {
operations: CheckpointOperation[];
}

export interface CompleteInvocationResponse {
event: Event;
hasDirtyOperations: boolean;
}

export interface WorkerApiResponseMapping {
[ApiType.StartDurableExecution]: InvocationResult;
[ApiType.StartInvocation]: InvocationResult;
[ApiType.CompleteInvocation]: Event;
[ApiType.CompleteInvocation]: CompleteInvocationResponse;
[ApiType.UpdateCheckpointData]: Record<string, never>;
[ApiType.PollCheckpointData]: Promise<PollCheckpointDataResponse>;
[ApiType.GetDurableExecutionState]: GetDurableExecutionStateResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,10 @@ describe("CheckpointWorker", () => {
Timestamp: new Date(),
Type: "InvocationComplete",
};
mockApiHandlerInstance.performApiCall.mockReturnValue(mockEvent);
mockApiHandlerInstance.performApiCall.mockReturnValue({
event: mockEvent,
hasDirtyOperations: false,
});

messageHandler(command);

Expand All @@ -318,7 +321,10 @@ describe("CheckpointWorker", () => {
data: {
type: ApiType.CompleteInvocation,
requestId: "complete-request-456",
response: mockEvent,
response: {
event: mockEvent,
hasDirtyOperations: false,
},
},
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { ILocalDurableTestRunnerFactory } from "../interfaces/durable-test-runne
import { DurableApiClient } from "../../common/create-durable-api-client";
import { CheckpointApiClient } from "../api-client/checkpoint-api-client";
import { InvocationResult } from "../../../checkpoint-server/storage/execution-manager";
import { CompleteInvocationResponse } from "../../../checkpoint-server/worker-api/worker-api-response";

// Mock dependencies
jest.mock("../operations/local-operation-storage");
Expand Down Expand Up @@ -75,15 +76,20 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => {
// Tracking arrays for call order verification
let callOrder: string[];
let addHistoryEventSpy: jest.SpyInstance;
let completeInvocationSpy: jest.SpyInstance;

const mockInvocationCompletedEvent: Event = {
EventType: EventType.InvocationCompleted,
InvocationCompletedDetails: {
RequestId: "invocation-request-id",
StartTimestamp: new Date(),
EndTimestamp: new Date(),
let completeInvocationSpy: jest.SpyInstance<
Promise<CompleteInvocationResponse>
>;

const mockInvocationCompletedEvent: CompleteInvocationResponse = {
event: {
EventType: EventType.InvocationCompleted,
InvocationCompletedDetails: {
RequestId: "invocation-request-id",
StartTimestamp: new Date(),
EndTimestamp: new Date(),
},
},
hasDirtyOperations: false,
};

const nonResolvingPromise = new Promise<never>(() => {
Expand Down Expand Up @@ -135,7 +141,10 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => {
}),
completeInvocation: jest.fn().mockImplementation(() => {
callOrder.push("completeInvocation");
return Promise.resolve(mockInvocationCompletedEvent);
return Promise.resolve({
event: mockInvocationCompletedEvent,
hasDirtyOperations: false,
});
}),
};

Expand Down Expand Up @@ -509,8 +518,11 @@ describe("TestExecutionOrchestrator - Invocation History Ordering", () => {
invocationCount++;
callOrder.push(`completeInvocation${invocationCount}`);
return Promise.resolve({
...mockInvocationCompletedEvent,
EventId: invocationCount,
event: {
...mockInvocationCompletedEvent,
EventId: invocationCount,
},
hasDirtyOperations: false,
});
});

Expand Down
Loading
Loading