| title | Stream Pattern 7: Error Handling in Streams | ||||||
|---|---|---|---|---|---|---|---|
| id | stream-pattern-error-handling | ||||||
| skillLevel | advanced | ||||||
| applicationPatternId | streams | ||||||
| summary | Handle errors gracefully in streams with recovery strategies, resuming after failures, and maintaining stream integrity. | ||||||
| tags |
|
||||||
| rule |
|
||||||
| related |
|
||||||
| author | effect_website | ||||||
| lessonOrder | 3 |
Stream error handling enables resilience:
- Continue on error: Skip failed element, process rest
- Recover: Provide fallback value
- Retry: Attempt failed element again
- Aggregate: Collect errors alongside successful values
- Terminate gracefully: Controlled shutdown
- Propagate: Let errors flow downstream to the consumer
Pattern: Stream.catchAll(), Stream.retry(), Stream.orElse(), Stream.runCollect()
Errors in streams cause cascading failures:
Problem 1: Stream death
- Process 10,000 records
- Record #5000 has bad data
- Stream crashes
- Remaining 5,000 records not processed
- Manual re-run needed
Problem 2: Silent data loss
- Stream encounters error
- Stops processing
- Caller doesn't notice
- Missing data goes undetected
- Reports wrong numbers
Problem 3: No recovery visibility
- Error happens
- Is it retried? How many times?
- Did it recover?
- Silent guessing required
Problem 4: Downstream effects
- Stream error affects all subscribers
- Cascading failure
- System becomes unavailable
- All downstream blocked
Solutions:
Continue on error:
- Skip failed element
- Process rest of stream
- Collect error for later
- Partial success acceptable
Retry with backoff:
- Transient error? Retry
- Exponential backoff
- Eventually give up
- Move to next element
Error aggregation:
- Collect all errors
- Collect all successes
- Report both
- Analytics/debugging
Graceful termination:
- Signal end of stream on error
- Allow cleanup
- Prevent resource leak
- Controlled shutdown
This example demonstrates stream error handling patterns.
import { Effect, Ref, Schedule, Stream } from "effect";
/** One input record fed through the example streams. */
interface DataRecord {
  /** Identifier echoed in log lines and failure reports. */
  id: string;
  /** Numeric payload; the examples treat negative values as invalid. */
  value: number;
}
/** Running tally kept while a stream is processed (used by example 3). */
interface ProcessingResult {
  /** Records that were processed without error. */
  successful: DataRecord[];
  /** One entry per failed record: its id and the error message. */
  failed: { id: string; error: string }[];
  /** Retry counter; the visible example only ever initializes it to 0. */
  retried: number;
}
/**
 * Demonstration program that runs eight stream error-handling examples in
 * sequence: continue-on-error, fallback values, error aggregation, retry with
 * backoff, error context, partial recovery, per-element timeouts, and
 * termination on a critical error.
 *
 * Every recoverable failure is handled inside the example that produces it,
 * so the program as a whole is not expected to fail with a typed error.
 */
const program = Effect.gen(function* () {
  console.log(`\n[STREAM ERROR HANDLING] Resilient stream processing\n`);

  // Example 1: Continue on error (skip failed, process rest)
  console.log(`[1] Continue processing despite errors:\n`);

  // The error channel is declared explicitly (`Error`) so the compiler forces
  // callers to handle the failure; negative values are rejected.
  const processElement = (record: DataRecord): Effect.Effect<string, Error> =>
    Effect.gen(function* () {
      if (record.value < 0) {
        return yield* Effect.fail(new Error(`Invalid value: ${record.value}`));
      }
      return `processed-${record.id}`;
    });

  const records: DataRecord[] = [
    { id: "rec1", value: 10 },
    { id: "rec2", value: -5 }, // Will fail
    { id: "rec3", value: 20 },
    { id: "rec4", value: -1 }, // Will fail
    { id: "rec5", value: 30 },
  ];

  // Each element's failure is converted into a value, so the stream itself
  // never fails and every record is visited.
  const successfulProcessing = yield* Stream.fromIterable(records).pipe(
    Stream.mapEffect((record) =>
      processElement(record).pipe(
        Effect.map((result) => ({ success: true, result })),
        Effect.catchAll((error) =>
          Effect.gen(function* () {
            yield* Effect.log(`[ERROR] Record ${record.id} failed`);
            return { success: false, error };
          })
        )
      )
    ),
    Stream.runCollect
  );

  // `Stream.runCollect` yields a Chunk: it is Iterable but has no Array
  // methods, so copy it into an array before filtering.
  const succeededCount = Array.from(successfulProcessing).filter(
    (r) => r.success
  ).length;
  yield* Effect.log(
    `[RESULTS] ${succeededCount}/${records.length} succeeded\n`
  );

  // Example 2: Recover with fallback value
  console.log(`[2] Providing fallback on error:\n`);

  const getData = (id: string): Effect.Effect<number, Error> =>
    id.includes("fail")
      ? Effect.fail(new Error("Data error"))
      : Effect.succeed(42);

  const recovered = yield* Stream.fromIterable(["ok1", "fail1", "ok2"]).pipe(
    Stream.mapEffect((id) =>
      getData(id).pipe(
        Effect.catchAll(() =>
          Effect.gen(function* () {
            yield* Effect.log(`[FALLBACK] Using default for ${id}`);
            return -1; // Fallback value
          })
        )
      )
    ),
    Stream.runCollect
  );

  // Chunk has no `.join`; go through an array.
  yield* Effect.log(`[VALUES] ${Array.from(recovered).join(", ")}\n`);

  // Example 3: Collect errors alongside successes
  console.log(`[3] Collecting errors and successes:\n`);

  const results = yield* Ref.make<ProcessingResult>({
    successful: [],
    failed: [],
    retried: 0,
  });

  yield* Stream.fromIterable(records).pipe(
    Stream.mapEffect((record) =>
      processElement(record).pipe(
        // Ref.update is the direct form of "modify and return nothing".
        Effect.tap(() =>
          Ref.update(results, (r) => ({
            ...r,
            successful: [...r.successful, record],
          }))
        ),
        Effect.catchAll((error) =>
          Ref.update(results, (r) => ({
            ...r,
            failed: [...r.failed, { id: record.id, error: error.message }],
          }))
        )
      )
    ),
    Stream.runDrain
  );

  const finalResults = yield* Ref.get(results);
  yield* Effect.log(
    `[AGGREGATE] ${finalResults.successful.length} succeeded, ${finalResults.failed.length} failed`
  );
  for (const failure of finalResults.failed) {
    yield* Effect.log(` - ${failure.id}: ${failure.error}`);
  }

  // Example 4: Retry on error with backoff
  console.log(`\n[4] Retry with exponential backoff:\n`);

  // Shared counter so the operation fails on its first two invocations and
  // succeeds on the third.
  let attemptCount = 0;
  const unreliableOperation = (id: string): Effect.Effect<string, Error> =>
    Effect.gen(function* () {
      attemptCount++;
      if (attemptCount <= 2) {
        yield* Effect.log(`[ATTEMPT ${attemptCount}] Failing for ${id}`);
        return yield* Effect.fail(new Error("Temporary failure"));
      }
      yield* Effect.log(`[SUCCESS] Succeeded on attempt ${attemptCount}`);
      return `result-${id}`;
    });

  const retried = unreliableOperation("test").pipe(
    Effect.retry(
      Schedule.exponential("10 millis").pipe(
        Schedule.upTo("100 millis"),
        // `Schedule.recurs(3)` is a schedule, not a combinator; intersect it
        // with the backoff so retries stop when either limit is reached.
        Schedule.intersect(Schedule.recurs(3))
      )
    ),
    Effect.catchAll(() =>
      Effect.gen(function* () {
        yield* Effect.log(`[EXHAUSTED] All retries failed`);
        return "fallback";
      })
    )
  );
  yield* retried;

  // Example 5: Error context in streams
  console.log(`\n[5] Propagating error context:\n`);

  interface StreamContext {
    batchId: string;
    timestamp: Date;
  }

  const processWithContext = (context: StreamContext) =>
    Stream.fromIterable([1, 2, -3, 4]).pipe(
      Stream.mapEffect((value) =>
        Effect.gen(function* () {
          if (value < 0) {
            return yield* Effect.fail(
              new Error(
                `Negative value in batch ${context.batchId} at ${context.timestamp.toISOString()}`
              )
            );
          }
          return value * 2;
        })
      ),
      // Stream.catchAll must return a Stream. Stream.unwrap lifts the
      // "log, then pick a replacement stream" effect into one.
      Stream.catchAll((error) =>
        Stream.unwrap(
          Effect.gen(function* () {
            yield* Effect.log(`[CONTEXT ERROR] ${error.message}`);
            return Stream.empty;
          })
        )
      )
    );

  const context: StreamContext = {
    batchId: "batch-001",
    timestamp: new Date(),
  };
  yield* processWithContext(context).pipe(Stream.runDrain);

  // Example 6: Partial recovery (keep good data, log bad)
  console.log(`\n[6] Partial recovery strategy:\n`);

  const mixedQuality = [
    { id: "1", data: "good" },
    { id: "2", data: "bad" },
    { id: "3", data: "good" },
    { id: "4", data: "bad" },
    { id: "5", data: "good" },
  ];

  const processQuality = (record: { id: string; data: string }) =>
    record.data === "good"
      ? Effect.succeed(`valid-${record.id}`)
      : Effect.fail(new Error(`Invalid data for ${record.id}`));

  const partialResults = yield* Stream.fromIterable(mixedQuality).pipe(
    Stream.mapEffect((record) =>
      processQuality(record).pipe(
        Effect.catchAll((error) =>
          Effect.gen(function* () {
            yield* Effect.log(`[LOG] ${error.message}`);
            return null; // Skip this record
          })
        )
      )
    ),
    // Drop the null placeholders left by failed records.
    Stream.filter((result) => result !== null),
    Stream.runCollect
  );
  yield* Effect.log(
    `[PARTIAL] Kept ${partialResults.length}/${mixedQuality.length} valid records\n`
  );

  // Example 7: Timeout handling in streams
  console.log(`[7] Timeout handling per element:\n`);

  const slowOperation = (id: string): Effect.Effect<string> =>
    Effect.gen(function* () {
      // Simulate slow operations
      if (id === "slow") {
        yield* Effect.sleep("200 millis");
      } else {
        yield* Effect.sleep("50 millis");
      }
      return `done-${id}`;
    });

  const withTimeout = yield* Stream.fromIterable(["fast1", "slow", "fast2"]).pipe(
    Stream.mapEffect((id) =>
      slowOperation(id).pipe(
        Effect.timeout("100 millis"),
        // The only typed failure here is the timeout itself.
        Effect.catchAll(() =>
          Effect.gen(function* () {
            yield* Effect.log(`[TIMEOUT] Operation ${id} timed out`);
            return "timeout-fallback";
          })
        )
      )
    ),
    Stream.runCollect
  );
  yield* Effect.log(`[RESULTS] ${Array.from(withTimeout).join(", ")}\n`);

  // Example 8: Stream termination on critical error
  console.log(`[8] Terminating stream on critical error:\n`);

  const isCritical = (error: Error): boolean =>
    error.message.includes("CRITICAL");

  const terminateOnCritical = Stream.fromIterable([1, 2, 3]).pipe(
    Stream.mapEffect((value) =>
      value === 2
        ? Effect.fail(new Error("CRITICAL: System failure"))
        : Effect.succeed(value)
    ),
    // Critical errors are re-raised so the consumer sees them; anything else
    // ends the stream quietly.
    Stream.catchAll((error) =>
      Stream.unwrap(
        Effect.gen(function* () {
          if (isCritical(error)) {
            yield* Effect.log(`[CRITICAL] Terminating stream`);
            return Stream.fail(error);
          }
          yield* Effect.log(`[WARNING] Continuing despite error`);
          return Stream.empty;
        })
      )
    )
  );

  yield* terminateOnCritical.pipe(
    Stream.runCollect,
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        yield* Effect.log(`[STOPPED] Stream stopped: ${error.message}`);
        return [];
      })
    )
  );
});
Effect.runPromise(program);

Build sophisticated recovery pipelines:
/** A pluggable rule for recovering from a stream failure. */
interface RecoveryStrategy {
  /** Returns true when this strategy is able to handle the given error. */
  canRecover: (error: Error) => boolean;
  /**
   * Produces an effect yielding a replacement value for the failure.
   * May return null — presumably meaning no recovery is available; confirm
   * with callers.
   */
  recover: (error: Error, element: unknown) => Effect.Effect<unknown> | null;
  /** Ordering key; strategies with a higher priority are tried first. */
  priority: number;
}
/**
 * Wraps a stream so that, when it fails, the first applicable recovery
 * strategy (highest `priority` first) supplies a single replacement element.
 *
 * @param stream - Source stream whose failures should be recovered.
 * @param strategies - Candidate strategies; the array is not modified.
 * @returns A stream that emits the source elements and, on failure, either
 *   the recovered value or the original error when no strategy applies.
 */
const applyRecoveryStrategies = (
  stream: Stream.Stream<unknown, Error>,
  strategies: RecoveryStrategy[]
) =>
  stream.pipe(
    Stream.catchAll((error) => {
      // Sort a copy: Array.prototype.sort works in place and would reorder
      // the caller's array.
      const byPriority = [...strategies].sort(
        (a, b) => b.priority - a.priority
      );
      for (const strategy of byPriority) {
        if (!strategy.canRecover(error)) {
          continue;
        }
        const recovery = strategy.recover(error, null);
        // A null result means this strategy declined after all; move on to
        // the next candidate instead of dereferencing null.
        if (recovery === null) {
          continue;
        }
        // Stream.catchAll needs a Stream, so lift the recovery effect into a
        // one-element stream.
        return Stream.fromEffect(recovery);
      }
      // Nothing could recover: surface the original error unchanged.
      return Stream.fail(error);
    })
  );

✅ Use error recovery when:
- Stream processes many items
- Some failures acceptable
- Need partial success
- Resilience matters
- Want to continue processing
✅ Use retry when:
- Transient errors likely
- Temporary failures common
- Recovery expected
- Backoff needed
⚠️ Trade-offs:
- More error cases to handle
- Complexity increases
- Debugging harder
- Performance impact
| Strategy | When | Trade-off |
|---|---|---|
| Continue | Some errors OK | Partial data |
| Retry | Transient failures | Latency overhead |
| Aggregate | Need all errors | More memory |
| Terminate | Critical errors | Data loss |
| Fallback | Recovery possible | Approximation |
- Error Handling Pattern 2: Propagation - Error chains
- Error Handling Pattern 3: Custom Strategies - Custom errors
- Scheduling Pattern 5: Advanced Retries - Retry patterns
- Stream Pattern 6: Resource Management - Safe cleanup