From 464ac5f432459f8223f84f3dae6f41f676531ec1 Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 10:53:26 +0000 Subject: [PATCH 1/2] Fix file name for batch builder --- lib/rapidflow.rb | 2 +- lib/rapidflow/{builder_builder.rb => batch_builder.rb} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename lib/rapidflow/{builder_builder.rb => batch_builder.rb} (100%) diff --git a/lib/rapidflow.rb b/lib/rapidflow.rb index 51c01cb..5f5fd68 100644 --- a/lib/rapidflow.rb +++ b/lib/rapidflow.rb @@ -6,7 +6,7 @@ require_relative "rapidflow/pipeline" require_relative "rapidflow/work_item" require_relative "rapidflow/stage" -require_relative "rapidflow/builder_builder" +require_relative "rapidflow/batch_builder" require_relative "rapidflow/batch" module RapidFlow diff --git a/lib/rapidflow/builder_builder.rb b/lib/rapidflow/batch_builder.rb similarity index 100% rename from lib/rapidflow/builder_builder.rb rename to lib/rapidflow/batch_builder.rb From d38d7b2619cf791d3be2c351c379e15320bb72c4 Mon Sep 17 00:00:00 2001 From: Sinaru Gunawardena Date: Tue, 11 Nov 2025 12:10:59 +0000 Subject: [PATCH 2/2] Add more error handling tests and update README.md for error handling --- README.md | 73 ++++++++++++--------- test/rapidflow/batch/error_handling_test.rb | 68 +++++++++++++++++++ 2 files changed, 109 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index d05007b..1b2cca0 100644 --- a/README.md +++ b/README.md @@ -89,8 +89,7 @@ Note that Once you call `Batch#results`, it will block the batch until all proce longer push items to the batch instance. The results are returned in the same order as the original items were pushed. Each result is an array of -`[data, error]`. If an error happened for an item during processing, `error` will be present with the Error instance. -Otherwise `data` represent the final data that was successfully processed and `error` will be `nil`. +`[data, error]`. 
No error means the item was successfully processed through all the stages. ```ruby results.each_with_index do |(data, error), index| @@ -102,6 +101,46 @@ results.each_with_index do |(data, error), index| end ``` +## Error Handling + +RapidFlow continues running even when errors occur, instead of stopping the entire pipeline. + +When an item encounters an error at any stage, RapidFlow captures that error and immediately moves the item to the +final results—skipping all remaining stages for that particular item. + +Each result comes as a pair: `[data, error]`. +- If processing failed: `error` contains the Error instance, and `data` holds whatever transformed data existed +from the last successful stage (or the original input data if the error occurred at the first stage). +- If processing succeeded: `data` contains the fully processed result, and `error` is `nil`. + +```ruby +batch = RapidFlow::Batch.new( + { fn: ->(url) { HTTP.get(url).body } }, # May raise network errors + { fn: ->(body) { JSON.parse(body) } } # May raise JSON parse errors +) + +urls.each { |url| batch.push(url) } +results = batch.results + +results.each_with_index do |(data, error), index| + if error + # 'data' holds the output of the last successful stage (the original input if the first stage failed) + puts "Failed to process #{urls[index]}: #{error.class} - #{error.message}" + # Log error, retry, or handle gracefully + # Every Exception carries a backtrace (https://docs.ruby-lang.org/en/master/Exception.html#method-i-backtrace), + # which you can inspect for further debugging. 
+ else + puts "Success: #{data}" + end +end +``` + +**Error behavior:** +- Exceptions are caught and returned with results +- The data from the last successful stage (or the original input, if the first stage failed) is preserved when an error occurs +- An item that fails in an early stage skips the remaining stages and goes straight to the result queue +- Other items continue processing (errors don't stop the batch) + ## Usage Examples ### Web Scraping Pipeline @@ -218,36 +257,6 @@ urls.each { |url| fetcher.push(url) } pages = fetcher.results ``` -## Error Handling - -RapidFlow captures exceptions without stopping the pipeline: - -```ruby -batch = RapidFlow::Batch.new( - { fn: ->(url) { HTTP.get(url).body } }, # May raise network errors - { fn: ->(body) { JSON.parse(body) } } # May raise JSON parse errors -) - -urls.each { |url| batch.push(url) } -results = batch.results - -results.each_with_index do |(data, error), index| - if error - # Original input is preserved in 'data' for debugging - puts "Failed to process #{urls[index]}: #{error.class} - #{error.message}" - # Log error, retry, or handle gracefully - else - puts "Success: #{data}" - end -end -``` - -**Error behavior:** -- Exceptions are caught and returned with results -- The original data is preserved when an error occurs -- Errors in early stages passed down to remaining stages until they reach the result queue -- Other items continue processing (errors don't stop the batch) - ## Architecture RapidFlow uses a multi-stage pipeline architecture with concurrent workers at each stage. 
diff --git a/test/rapidflow/batch/error_handling_test.rb b/test/rapidflow/batch/error_handling_test.rb
index af50a77..2cc9856 100644
--- a/test/rapidflow/batch/error_handling_test.rb
+++ b/test/rapidflow/batch/error_handling_test.rb
@@ -139,5 +139,73 @@ def test_all_items_fail
         assert_equal "Always fails", error.message
       end
     end
+
+    # A NoMethodError raised by the first stage is captured rather than
+    # propagated; the result carries the untouched input alongside the error.
+    def test_no_method_error_in_first_lamda_function
+      batch = Batch.build do
+        # invalid stage - calling invalid method
+        stage ->(data) { data.foobar }
+
+        # valid stage
+        stage ->(data) { data.upcase }
+
+        # valid stage
+        stage ->(data) { data + '!' }
+      end
+
+      batch.push("hello")
+
+      results = batch.results
+
+      assert_equal 1, results.length
+      assert_equal "hello", results.first[0] # preserved the original input as the error happened in the first stage
+      assert_instance_of NoMethodError, results.first[1]
+
+      # The wording of NoMethodError#message differs between Ruby releases:
+      #   3.4  undefined method 'foobar' for an instance of String
+      #   3.3  undefined method `foobar' for an instance of String
+      #   3.2  undefined method `foobar' for "hello":String
+      # Match only the parts common to all of them, so the test does not have
+      # to enumerate interpreter versions or fail on ones it has never seen.
+      assert_match(
+        /\Aundefined method [`']foobar' for .*String/,
+        results.first[1].message
+      )
+    end
+
+    # A NoMethodError raised by a middle stage is captured rather than
+    # propagated; the result carries the previous stage's output and the error.
+    def test_no_method_error_in_mid_lamda_function
+      batch = Batch.build do
+        # valid stage
+        stage ->(data) { data.upcase }
+
+        # invalid stage - calling invalid method
+        stage ->(data) { data.foobar }
+
+        # valid stage
+        stage ->(data) { data + '!' }
+      end
+
+      batch.push("hello")
+
+      results = batch.results
+
+      assert_equal 1, results.length
+      assert_equal "HELLO", results.first[0]
+      assert_instance_of NoMethodError, results.first[1]
+
+      # The wording of NoMethodError#message differs between Ruby releases:
+      #   3.4  undefined method 'foobar' for an instance of String
+      #   3.3  undefined method `foobar' for an instance of String
+      #   3.2  undefined method `foobar' for "HELLO":String
+      # Match only the parts common to all of them, so the test does not have
+      # to enumerate interpreter versions or fail on ones it has never seen.
+      assert_match(
+        /\Aundefined method [`']foobar' for .*String/,
+        results.first[1].message
+      )
+    end
   end
 end