Skip to content

Commit 8ffcf85

Browse files
authored
fix(sdk): correct FAILURE_TOLERANCE_EXCEEDED behavior in concurrent operations (#359)
Fix bug where tolerance checks occurred after ALL_COMPLETED check, causing incorrect completion reasons when failure thresholds were exceeded. Changes: - Fix logic ordering in getCompletionReason to check tolerance before completion - Add failure-threshold-exceeded examples for map and parallel operations - Create tests validating FAILURE_TOLERANCE_EXCEEDED behavior
1 parent a18dfe8 commit 8ffcf85

File tree

27 files changed

+1712
-53
lines changed

27 files changed

+1712
-53
lines changed

packages/aws-durable-execution-sdk-js-examples/jest.config.integration.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const defaultPreset = createDefaultPreset();
66
module.exports = {
77
...defaultPreset,
88
testMatch: ["**/src/examples/**/*.test.ts"],
9-
testTimeout: 90000,
9+
testTimeout: 120000,
1010
testNamePattern: "cloud",
1111
bail: true,
1212
};

packages/aws-durable-execution-sdk-js-examples/src/examples/force-checkpointing/invoke/force-checkpointing-invoke.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,6 @@ createTests({
6060
// Verify operations were tracked
6161
const operations = execution.getOperations();
6262
expect(operations.length).toBeGreaterThan(0);
63-
}, 30000); // 30 second timeout
63+
}, 60000); // 60 second timeout
6464
},
6565
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { handler } from "./map-failure-threshold-exceeded-count";
2+
import { createTests } from "../../../utils/test-helper";
3+
import { OperationStatus } from "@aws/durable-execution-sdk-js-testing";
4+
5+
createTests({
6+
name: "Map failure threshold exceeded count",
7+
functionName: "map-failure-threshold-exceeded-count",
8+
handler,
9+
tests: (runner) => {
10+
it("should return FAILURE_TOLERANCE_EXCEEDED when failure count exceeds threshold", async () => {
11+
const execution = await runner.run();
12+
const result = execution.getResult() as any;
13+
14+
expect(result.completionReason).toBe("FAILURE_TOLERANCE_EXCEEDED");
15+
expect(result.successCount).toBe(2); // Items 4 and 5 succeed
16+
expect(result.failureCount).toBe(3); // Items 1, 2, 3 fail (exceeds threshold of 2)
17+
expect(result.totalCount).toBe(5);
18+
19+
// Verify individual operation statuses
20+
[
21+
{ name: "process-0", status: OperationStatus.FAILED },
22+
{ name: "process-1", status: OperationStatus.FAILED },
23+
{ name: "process-2", status: OperationStatus.FAILED },
24+
{ name: "process-3", status: OperationStatus.SUCCEEDED },
25+
{ name: "process-4", status: OperationStatus.SUCCEEDED },
26+
].forEach(({ name, status }) => {
27+
expect(runner.getOperation(name)?.getStatus()).toBe(status);
28+
});
29+
});
30+
},
31+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import {
2+
DurableContext,
3+
withDurableExecution,
4+
createRetryStrategy,
5+
} from "@aws/durable-execution-sdk-js";
6+
import { ExampleConfig } from "../../../types";
7+
8+
export const config: ExampleConfig = {
9+
name: "Map failure threshold exceeded count",
10+
description: "Map operation where failure count exceeds tolerance threshold",
11+
};
12+
13+
export const handler = withDurableExecution(
14+
async (event: any, context: DurableContext) => {
15+
const items = [1, 2, 3, 4, 5];
16+
17+
const result = await context.map(
18+
"failure-threshold-items",
19+
items,
20+
async (ctx: DurableContext, item: number, index: number) => {
21+
return await ctx.step(
22+
`process-${index}`,
23+
async () => {
24+
if (item <= 3) {
25+
throw new Error(`Item ${item} failed`);
26+
}
27+
return item * 2;
28+
},
29+
{ retryStrategy: createRetryStrategy({ maxAttempts: 2 }) },
30+
);
31+
},
32+
{
33+
completionConfig: {
34+
toleratedFailureCount: 2, // Allow only 2 failures, but we'll have 3
35+
},
36+
},
37+
);
38+
39+
await context.wait({ seconds: 1 });
40+
41+
return {
42+
completionReason: result.completionReason,
43+
successCount: result.successCount,
44+
failureCount: result.failureCount,
45+
totalCount: result.totalCount,
46+
};
47+
},
48+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { handler } from "./map-failure-threshold-exceeded-percentage";
2+
import { createTests } from "../../../utils/test-helper";
3+
import { OperationStatus } from "@aws/durable-execution-sdk-js-testing";
4+
5+
createTests({
6+
name: "Map failure threshold exceeded percentage",
7+
functionName: "map-failure-threshold-exceeded-percentage",
8+
handler,
9+
tests: (runner) => {
10+
it("should return FAILURE_TOLERANCE_EXCEEDED when failure percentage exceeds threshold", async () => {
11+
const execution = await runner.run();
12+
const result = execution.getResult() as any;
13+
14+
expect(result.completionReason).toBe("FAILURE_TOLERANCE_EXCEEDED");
15+
expect(result.successCount).toBe(2); // Items 4 and 5 succeed
16+
expect(result.failureCount).toBe(3); // Items 1, 2, 3 fail (60% > 50% threshold)
17+
expect(result.totalCount).toBe(5);
18+
19+
// Verify individual operation statuses
20+
[
21+
{ name: "process-0", status: OperationStatus.FAILED },
22+
{ name: "process-1", status: OperationStatus.FAILED },
23+
{ name: "process-2", status: OperationStatus.FAILED },
24+
{ name: "process-3", status: OperationStatus.SUCCEEDED },
25+
{ name: "process-4", status: OperationStatus.SUCCEEDED },
26+
].forEach(({ name, status }) => {
27+
expect(runner.getOperation(name)?.getStatus()).toBe(status);
28+
});
29+
});
30+
},
31+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import {
2+
DurableContext,
3+
withDurableExecution,
4+
createRetryStrategy,
5+
} from "@aws/durable-execution-sdk-js";
6+
import { ExampleConfig } from "../../../types";
7+
8+
export const config: ExampleConfig = {
9+
name: "Map failure threshold exceeded percentage",
10+
description:
11+
"Map operation where failure percentage exceeds tolerance threshold",
12+
};
13+
14+
export const handler = withDurableExecution(
15+
async (event: any, context: DurableContext) => {
16+
const items = [1, 2, 3, 4, 5];
17+
18+
const result = await context.map(
19+
"failure-threshold-items",
20+
items,
21+
async (ctx: DurableContext, item: number, index: number) => {
22+
return await ctx.step(
23+
`process-${index}`,
24+
async () => {
25+
if (item <= 3) {
26+
throw new Error(`Item ${item} failed`);
27+
}
28+
return item * 2;
29+
},
30+
{ retryStrategy: createRetryStrategy({ maxAttempts: 2 }) },
31+
);
32+
},
33+
{
34+
completionConfig: {
35+
toleratedFailurePercentage: 50, // Allow 50% failures, but we'll have 60% (3/5)
36+
},
37+
},
38+
);
39+
40+
await context.wait({ seconds: 1 });
41+
42+
return {
43+
completionReason: result.completionReason,
44+
successCount: result.successCount,
45+
failureCount: result.failureCount,
46+
totalCount: result.totalCount,
47+
};
48+
},
49+
);
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { handler } from "./map-min-successful";
2+
import { createTests } from "../../../utils/test-helper";
3+
import { OperationStatus } from "@aws/durable-execution-sdk-js-testing";
4+
5+
createTests({
6+
name: "Map minSuccessful",
7+
functionName: "map-min-successful",
8+
handler,
9+
tests: (runner) => {
10+
it("should complete early when minSuccessful is reached", async () => {
11+
const execution = await runner.run();
12+
const result = execution.getResult() as any;
13+
14+
// Assert overall results
15+
expect(result.successCount).toBe(2);
16+
expect(result.completionReason).toBe("MIN_SUCCESSFUL_REACHED");
17+
expect(result.results).toHaveLength(2);
18+
expect(result.totalCount).toBe(5);
19+
20+
// Get the map operation from history to verify individual item results
21+
// Get the map operation result
22+
const mapResult = runner.getOperation("min-successful-items");
23+
24+
// Get individual map item operations
25+
const item0 = runner.getOperation("process-0");
26+
const item1 = runner.getOperation("process-1");
27+
const item2 = runner.getOperation("process-2");
28+
const item3 = runner.getOperation("process-3");
29+
const item4 = runner.getOperation("process-4");
30+
31+
// First two items should succeed (items 1 and 2 process fastest due to timeout)
32+
expect(item0?.getStatus()).toBe(OperationStatus.SUCCEEDED);
33+
expect(item1?.getStatus()).toBe(OperationStatus.SUCCEEDED);
34+
35+
// TODO: Re-enable these assertions when we find the root cause of the cloud timing issue
36+
// where remaining items show SUCCEEDED instead of STARTED
37+
// Remaining items should be in STARTED state (not completed)
38+
// expect(item2?.getStatus()).toBe(OperationStatus.STARTED);
39+
// expect(item3?.getStatus()).toBe(OperationStatus.STARTED);
40+
// expect(item4?.getStatus()).toBe(OperationStatus.STARTED);
41+
42+
// Verify the results array matches
43+
expect(result.results).toEqual(["Item 1 processed", "Item 2 processed"]);
44+
});
45+
},
46+
});
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import {
2+
DurableContext,
3+
withDurableExecution,
4+
} from "@aws/durable-execution-sdk-js";
5+
import { ExampleConfig } from "../../../types";
6+
import { log } from "../../../utils/logger";
7+
8+
export const config: ExampleConfig = {
9+
name: "Map minSuccessful",
10+
description: "Map operation with minSuccessful completion config",
11+
};
12+
13+
export const handler = withDurableExecution(
14+
async (event: any, context: DurableContext) => {
15+
const items = [1, 2, 3, 4, 5];
16+
17+
log(`Processing ${items.length} items with minSuccessful: 2`);
18+
19+
const results = await context.map(
20+
"min-successful-items",
21+
items,
22+
async (ctx, item, index) => {
23+
return await ctx.step(`process-${index}`, async () => {
24+
// Simulate processing time
25+
await new Promise((resolve) => setTimeout(resolve, 100 * item));
26+
return `Item ${item} processed`;
27+
});
28+
},
29+
{
30+
completionConfig: {
31+
minSuccessful: 2,
32+
},
33+
},
34+
);
35+
36+
await context.wait({ seconds: 1 });
37+
38+
log(`Completed with ${results.successCount} successes`);
39+
log(`Completion reason: ${results.completionReason}`);
40+
41+
return {
42+
successCount: results.successCount,
43+
totalCount: results.totalCount,
44+
completionReason: results.completionReason,
45+
results: results.getResults(),
46+
};
47+
},
48+
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import { handler } from "./map-tolerated-failure-count";
2+
import { createTests } from "../../../utils/test-helper";
3+
import { OperationStatus } from "@aws/durable-execution-sdk-js-testing";
4+
5+
createTests({
6+
name: "Map toleratedFailureCount",
7+
functionName: "map-tolerated-failure-count",
8+
handler,
9+
tests: (runner) => {
10+
it("should complete when failure tolerance is reached", async () => {
11+
const execution = await runner.run();
12+
const result = execution.getResult() as any;
13+
14+
// Assert overall results
15+
expect(result.failureCount).toBe(2);
16+
expect(result.successCount).toBe(3);
17+
expect(result.completionReason).toBe("ALL_COMPLETED");
18+
expect(result.hasFailure).toBe(true);
19+
expect(result.totalCount).toBe(5);
20+
21+
// Verify individual operation statuses
22+
[
23+
{ name: "process-0", status: OperationStatus.SUCCEEDED },
24+
{ name: "process-1", status: OperationStatus.FAILED },
25+
{ name: "process-2", status: OperationStatus.SUCCEEDED },
26+
{ name: "process-3", status: OperationStatus.FAILED },
27+
{ name: "process-4", status: OperationStatus.SUCCEEDED },
28+
].forEach(({ name, status }) => {
29+
expect(runner.getOperation(name)?.getStatus()).toBe(status);
30+
});
31+
});
32+
},
33+
});
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import {
2+
DurableContext,
3+
withDurableExecution,
4+
retryPresets,
5+
} from "@aws/durable-execution-sdk-js";
6+
import { ExampleConfig } from "../../../types";
7+
import { log } from "../../../utils/logger";
8+
9+
export const config: ExampleConfig = {
10+
name: "Map toleratedFailureCount",
11+
description: "Map operation with toleratedFailureCount completion config",
12+
};
13+
14+
export const handler = withDurableExecution(
15+
async (event: any, context: DurableContext) => {
16+
const items = [1, 2, 3, 4, 5];
17+
18+
log(`Processing ${items.length} items with toleratedFailureCount: 2`);
19+
20+
const results = await context.map(
21+
"failure-count-items",
22+
items,
23+
async (ctx, item, index) => {
24+
return await ctx.step(
25+
`process-${index}`,
26+
async () => {
27+
// Items 2 and 4 will fail
28+
if (item === 2 || item === 4) {
29+
throw new Error(`Processing failed for item ${item}`);
30+
}
31+
return `Item ${item} processed`;
32+
},
33+
{ retryStrategy: retryPresets.noRetry },
34+
);
35+
},
36+
{
37+
completionConfig: {
38+
toleratedFailureCount: 2,
39+
},
40+
},
41+
);
42+
43+
await context.wait({ seconds: 1 });
44+
45+
log(`Completed with ${results.failureCount} failures`);
46+
log(`Completion reason: ${results.completionReason}`);
47+
48+
return {
49+
successCount: results.successCount,
50+
failureCount: results.failureCount,
51+
totalCount: results.totalCount,
52+
completionReason: results.completionReason,
53+
hasFailure: results.hasFailure,
54+
};
55+
},
56+
);

0 commit comments

Comments
 (0)