---
title: "Stream Pattern 7: Error Handling in Streams"
id: stream-pattern-error-handling
skillLevel: advanced
applicationPatternId: streams
summary: Handle errors gracefully in streams with recovery strategies, resuming after failures, and maintaining stream integrity.
tags:
  - streams
  - error-handling
  - recovery
  - fault-tolerance
  - error-propagation
  - stream-resilience
rule:
  description: Use Stream error handlers to recover from failures, retry operations, and maintain stream integrity even when individual elements fail.
related:
  - error-handling-pattern-custom-strategies
  - error-handling-pattern-propagation
  - scheduling-pattern-advanced-retry-chains
author: effect_website
lessonOrder: 3
---

Guideline

Stream error handling enables resilience:

  • Continue on error: Skip failed element, process rest
  • Recover: Provide fallback value
  • Retry: Attempt failed element again
  • Aggregate: Collect errors alongside successful values
  • Terminate gracefully: Controlled shutdown
  • Propagate: Let errors flow upstream

Pattern: Stream.catchAll(), Stream.retry(), Stream.orElse(), Stream.runCollect()


Rationale

Errors in streams cause cascading failures:

Problem 1: Stream death

  • Process 10,000 records
  • Record #5000 has bad data
  • Stream crashes
  • 9,000 records not processed
  • Manual re-run needed

Problem 2: Silent data loss

  • Stream encounters error
  • Stops processing
  • Caller doesn't notice
  • Missing data goes undetected
  • Reports wrong numbers

Problem 3: No recovery visibility

  • Error happens
  • Is it retried? How many times?
  • Did it recover?
  • Silent guessing required

Problem 4: Downstream effects

  • Stream error affects all subscribers
  • Cascading failure
  • System becomes unavailable
  • All downstream blocked

Solutions:

Continue on error:

  • Skip failed element
  • Process rest of stream
  • Collect error for later
  • Partial success acceptable

Retry with backoff:

  • Transient error? Retry
  • Exponential backoff
  • Eventually give up
  • Move to next element

Error aggregation:

  • Collect all errors
  • Collect all successes
  • Report both
  • Analytics/debugging

Graceful termination:

  • Signal end of stream on error
  • Allow cleanup
  • Prevent resource leak
  • Controlled shutdown

Good Example

This example demonstrates stream error handling patterns.

import { Effect, Ref, Schedule, Stream } from "effect";

interface DataRecord {
  id: string;
  value: number;
}

interface ProcessingResult {
  successful: DataRecord[];
  failed: Array<{ id: string; error: string }>;
  retried: number;
}

const program = Effect.gen(function* () {
  console.log(`\n[STREAM ERROR HANDLING] Resilient stream processing\n`);

  // Example 1: Continue on error (skip failed, process rest)
  console.log(`[1] Continue processing despite errors:\n`);

  const processElement = (record: DataRecord): Effect.Effect<string, Error> =>
    Effect.gen(function* () {
      if (record.value < 0) {
        yield* Effect.fail(new Error(`Invalid value: ${record.value}`));
      }

      return `processed-${record.id}`;
    });

  const records = [
    { id: "rec1", value: 10 },
    { id: "rec2", value: -5 }, // Will fail
    { id: "rec3", value: 20 },
    { id: "rec4", value: -1 }, // Will fail
    { id: "rec5", value: 30 },
  ];

  const successfulProcessing = yield* Stream.fromIterable(records).pipe(
    Stream.mapEffect((record) =>
      processElement(record).pipe(
        Effect.map((result) => ({ success: true, result })),
        Effect.catchAll((error) =>
          Effect.gen(function* () {
            yield* Effect.log(`[ERROR] Record ${record.id} failed`);

            return { success: false, error };
          })
        )
      )
    ),
    Stream.runCollect
  );

  yield* Effect.log(
    `[RESULTS] ${Array.from(successfulProcessing).filter((r) => r.success).length}/${records.length} succeeded\n`
  );

  // Example 2: Recover with fallback value
  console.log(`[2] Providing fallback on error:\n`);

  const getData = (id: string): Effect.Effect<number, Error> =>
    id.includes("fail") ? Effect.fail(new Error("Data error")) : Effect.succeed(42);

  const recovered = yield* Stream.fromIterable(["ok1", "fail1", "ok2"]).pipe(
    Stream.mapEffect((id) =>
      getData(id).pipe(
        Effect.catchAll(() =>
          Effect.gen(function* () {
            yield* Effect.log(`[FALLBACK] Using default for ${id}`);

            return -1; // Fallback value
          })
        )
      )
    ),
    Stream.runCollect
  );

  yield* Effect.log(`[VALUES] ${Array.from(recovered).join(", ")}\n`);

  // Example 3: Collect errors alongside successes
  console.log(`[3] Collecting errors and successes:\n`);

  const results = yield* Ref.make<ProcessingResult>({
    successful: [],
    failed: [],
    retried: 0,
  });

  yield* Stream.fromIterable(records).pipe(
    Stream.mapEffect((record) =>
      processElement(record).pipe(
        Effect.tap(() =>
          Ref.update(results, (r) => ({
            ...r,
            successful: [...r.successful, record],
          }))
        ),
        Effect.catchAll((error) =>
          Ref.update(results, (r) => ({
            ...r,
            failed: [...r.failed, { id: record.id, error: error.message }],
          }))
        )
      )
    ),
    Stream.runDrain
  );

  const finalResults = yield* Ref.get(results);

  yield* Effect.log(
    `[AGGREGATE] ${finalResults.successful.length} succeeded, ${finalResults.failed.length} failed`
  );

  for (const failure of finalResults.failed) {
    yield* Effect.log(`  - ${failure.id}: ${failure.error}`);
  }

  // Example 4: Retry on error with backoff
  console.log(`\n[4] Retry with exponential backoff:\n`);

  let attemptCount = 0;

  const unreliableOperation = (id: string): Effect.Effect<string, Error> =>
    Effect.gen(function* () {
      attemptCount++;

      if (attemptCount <= 2) {
        yield* Effect.log(`[ATTEMPT ${attemptCount}] Failing for ${id}`);

        yield* Effect.fail(new Error("Temporary failure"));
      }

      yield* Effect.log(`[SUCCESS] Succeeded on attempt ${attemptCount}`);

      return `result-${id}`;
    });

  const retried = unreliableOperation("test").pipe(
    Effect.retry(
      Schedule.exponential("10 millis").pipe(
        Schedule.upTo("100 millis"),
        Schedule.intersect(Schedule.recurs(3))
      )
    ),
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        yield* Effect.log(`[EXHAUSTED] All retries failed`);

        return "fallback";
      })
    )
  );

  yield* retried;

  // Example 5: Error context in streams
  console.log(`\n[5] Propagating error context:\n`);

  interface StreamContext {
    batchId: string;
    timestamp: Date;
  }

  const processWithContext = (context: StreamContext) =>
    Stream.fromIterable([1, 2, -3, 4]).pipe(
      Stream.mapEffect((value) =>
        Effect.gen(function* () {
          if (value < 0) {
            yield* Effect.fail(
              new Error(
                `Negative value in batch ${context.batchId} at ${context.timestamp.toISOString()}`
              )
            );
          }

          return value * 2;
        })
      ),
      Stream.catchAll((error) =>
        // Stream.catchAll must return a Stream; Stream.execute runs
        // the logging effect and emits no elements.
        Stream.execute(Effect.log(`[CONTEXT ERROR] ${error.message}`))
      )
    );

  const context: StreamContext = {
    batchId: "batch-001",
    timestamp: new Date(),
  };

  yield* processWithContext(context).pipe(Stream.runDrain);

  // Example 6: Partial recovery (keep good data, log bad)
  console.log(`\n[6] Partial recovery strategy:\n`);

  const mixedQuality = [
    { id: "1", data: "good" },
    { id: "2", data: "bad" },
    { id: "3", data: "good" },
    { id: "4", data: "bad" },
    { id: "5", data: "good" },
  ];

  const processQuality = (record: { id: string; data: string }) =>
    record.data === "good"
      ? Effect.succeed(`valid-${record.id}`)
      : Effect.fail(new Error(`Invalid data for ${record.id}`));

  const partialResults = yield* Stream.fromIterable(mixedQuality).pipe(
    Stream.mapEffect((record) =>
      processQuality(record).pipe(
        Effect.catchAll((error) =>
          Effect.gen(function* () {
            yield* Effect.log(`[LOG] ${error.message}`);

            return null; // Skip this record
          })
        )
      )
    ),
    Stream.filter((result): result is string => result !== null),
    Stream.runCollect
  );

  yield* Effect.log(
    `[PARTIAL] Kept ${partialResults.length}/${mixedQuality.length} valid records\n`
  );

  // Example 7: Timeout handling in streams
  console.log(`[7] Timeout handling per element:\n`);

  const slowOperation = (id: string): Effect.Effect<string> =>
    Effect.gen(function* () {
      // Simulate slow operations
      if (id === "slow") {
        yield* Effect.sleep("200 millis");
      } else {
        yield* Effect.sleep("50 millis");
      }

      return `done-${id}`;
    });

  const withTimeout = yield* Stream.fromIterable(["fast1", "slow", "fast2"]).pipe(
    Stream.mapEffect((id) =>
      slowOperation(id).pipe(
        Effect.timeout("100 millis"),
        Effect.catchAll((error) =>
          Effect.gen(function* () {
            yield* Effect.log(`[TIMEOUT] Operation ${id} timed out`);

            return "timeout-fallback";
          })
        )
      )
    ),
    Stream.runCollect
  );

  yield* Effect.log(`[RESULTS] ${Array.from(withTimeout).join(", ")}\n`);

  // Example 8: Stream termination on critical error
  console.log(`[8] Terminating stream on critical error:\n`);

  const isCritical = (error: Error): boolean =>
    error.message.includes("CRITICAL");

  const terminateOnCritical = Stream.fromIterable([1, 2, 3]).pipe(
    Stream.mapEffect((value) =>
      value === 2
        ? Effect.fail(new Error("CRITICAL: System failure"))
        : Effect.succeed(value)
    ),
    Stream.catchAll((error) =>
      // Stream.catchAll must return a Stream, so we log via
      // Stream.execute and either re-fail or continue empty.
      isCritical(error)
        ? Stream.execute(Effect.log(`[CRITICAL] Terminating stream`)).pipe(
            Stream.concat(Stream.fail(error))
          )
        : Stream.execute(Effect.log(`[WARNING] Continuing despite error`))
    )
  );

  yield* terminateOnCritical.pipe(
    Stream.runCollect,
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        yield* Effect.log(`[STOPPED] Stream stopped: ${error.message}`);

        return [];
      })
    )
  );
});

Effect.runPromise(program);

Advanced: Error Recovery Strategies

Build sophisticated recovery pipelines:

interface RecoveryStrategy {
  canRecover: (error: Error) => boolean;
  recover: (error: Error) => Effect.Effect<unknown, Error>;
  priority: number;
}

const applyRecoveryStrategies = (
  stream: Stream.Stream<unknown, Error>,
  strategies: RecoveryStrategy[]
) =>
  stream.pipe(
    // Stream.catchAll must return a Stream; Stream.unwrap lets us
    // compute that Stream inside an Effect.
    Stream.catchAll((error) =>
      Stream.unwrap(
        Effect.gen(function* () {
          // Try strategies from highest to lowest priority
          // (copy before sorting to avoid mutating the input)
          const sorted = [...strategies].sort(
            (a, b) => b.priority - a.priority
          );

          for (const strategy of sorted) {
            if (strategy.canRecover(error)) {
              // If recovery itself fails, surface the original error
              const recovered = yield* strategy.recover(error).pipe(
                Effect.mapError(() => error)
              );

              return Stream.succeed(recovered);
            }
          }

          // No strategy applied: re-fail with the original error
          return Stream.fail(error);
        })
      )
    )
  );

When to Use This Pattern

Use error recovery when:

  • Stream processes many items
  • Some failures acceptable
  • Need partial success
  • Resilience matters
  • Want to continue processing

Use retry when:

  • Transient errors likely
  • Temporary failures common
  • Recovery expected
  • Backoff needed

⚠️ Trade-offs:

  • More error cases to handle
  • Complexity increases
  • Debugging harder
  • Performance impact

Error Handling Strategies

Strategy    When                 Trade-off
---------   ------------------   ----------------
Continue    Some errors OK       Partial data
Retry       Transient failures   Latency overhead
Aggregate   Need all errors      More memory
Terminate   Critical errors      Data loss
Fallback    Recovery possible    Approximation

See Also