Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 41 additions & 32 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ Note that Once you call `Batch#results`, it will block the batch until all proce
longer push items to the batch instance.

The results are returned in the same order as the original items were pushed. Each result is an array of
`[data, error]`. If an error happened for an item during processing, `error` will be present with the Error instance.
Otherwise `data` represent the final data that was successfully processed and `error` will be `nil`.
`[data, error]`. No error means, the item successfully were processed through the stages.

```ruby
results.each_with_index do |(data, error), index|
Expand All @@ -102,6 +101,46 @@ results.each_with_index do |(data, error), index|
end
```

## Error Handling

RapidFlow continues running even when errors occur, instead of stopping the entire pipeline.

When an item encounters an error at any stage, RapidFlow captures that error and immediately moves the item to the
final results—skipping all remaining stages for that particular item.

Each result comes as a pair: `[data, error]`.
- If processing failed: `error` contains the Error instance, and `data` holds whatever transformed data existed
from the last successful stage (or original input data if the error occurred at the first stage).
- If processing succeeded: `data` contains the fully processed result, and `error` is `nil`.

```ruby
batch = RapidFlow::Batch.new(
{ fn: ->(url) { HTTP.get(url).body } }, # May raise network errors
{ fn: ->(body) { JSON.parse(body) } } # May raise JSON parse errors
)

urls.each { |url| batch.push(url) }
results = batch.results

results.each_with_index do |(data, error), index|
if error
# Original input is preserved in 'data' for debugging
puts "Failed to process #{urls[index]}: #{error.class} - #{error.message}"
# Log error, retry, or handle gracefully
# As any Exception contains the backtrace(https://docs.ruby-lang.org/en/master/Exception.html#method-i-backtrace),
# for further debugging, you can refer to backtrace.
else
puts "Success: #{data}"
end
end
```

**Error behavior:**
- Exceptions are caught and returned with results
- The transformed data from the previous stage is preserved when an error occurs
- Errors in early stages skip remaining stages until they reach the result queue
- Other items continue processing (errors don't stop the batch)

## Usage Examples

### Web Scraping Pipeline
Expand Down Expand Up @@ -218,36 +257,6 @@ urls.each { |url| fetcher.push(url) }
pages = fetcher.results
```

## Error Handling

RapidFlow captures exceptions without stopping the pipeline:

```ruby
batch = RapidFlow::Batch.new(
{ fn: ->(url) { HTTP.get(url).body } }, # May raise network errors
{ fn: ->(body) { JSON.parse(body) } } # May raise JSON parse errors
)

urls.each { |url| batch.push(url) }
results = batch.results

results.each_with_index do |(data, error), index|
if error
# Original input is preserved in 'data' for debugging
puts "Failed to process #{urls[index]}: #{error.class} - #{error.message}"
# Log error, retry, or handle gracefully
else
puts "Success: #{data}"
end
end
```

**Error behavior:**
- Exceptions are caught and returned with results
- The original data is preserved when an error occurs
- Errors in early stages passed down to remaining stages until they reach the result queue
- Other items continue processing (errors don't stop the batch)

## Architecture

RapidFlow uses a multi-stage pipeline architecture with concurrent workers at each stage.
Expand Down
2 changes: 1 addition & 1 deletion lib/rapidflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
require_relative "rapidflow/pipeline"
require_relative "rapidflow/work_item"
require_relative "rapidflow/stage"
require_relative "rapidflow/builder_builder"
require_relative "rapidflow/batch_builder"
require_relative "rapidflow/batch"

module RapidFlow
Expand Down
File renamed without changes.
68 changes: 68 additions & 0 deletions test/rapidflow/batch/error_handling_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,73 @@ def test_all_items_fail
assert_equal "Always fails", error.message
end
end

def test_no_method_error_in_first_lamda_function
batch = Batch.build do
# invalid stage - calling invalid method
stage ->(data) { data.foobar }

# valid stage
stage ->(data) { data.upcase }

# valid stage
stage ->(data) { data + '!' }
end

batch.push("hello")

results = batch.results

assert_equal 1, results.length
assert_equal "hello", results.first[0] # preserved the original input as the error happened in the first stage
assert_instance_of NoMethodError, results.first[1]

expected_error_message = case RUBY_VERSION
when /^3.4/
"undefined method 'foobar' for an instance of String"
when /^3.3/
"undefined method `foobar' for an instance of String"
when /^3.2/
"undefined method `foobar' for \"hello\":String"
else
raise "Unexpected ruby version: #{RUBY_VERSION}"
end

assert_equal expected_error_message, results.first[1].message
end

def test_no_method_error_in_mid_lamda_function
batch = Batch.build do
# valid stage
stage ->(data) { data.upcase }

# invalid stage - calling invalid method
stage ->(data) { data.foobar }

# valid stage
stage ->(data) { data + '!' }
end

batch.push("hello")

results = batch.results

assert_equal 1, results.length
assert_equal "HELLO", results.first[0]
assert_instance_of NoMethodError, results.first[1]

expected_error_message = case RUBY_VERSION
when /^3.4/
"undefined method 'foobar' for an instance of String"
when /^3.3/
"undefined method `foobar' for an instance of String"
when /^3.2/
"undefined method `foobar' for \"HELLO\":String"
else
raise "Unexpected ruby version: #{RUBY_VERSION}"
end

assert_equal expected_error_message, results.first[1].message
end
end
end