diff --git a/README.md b/README.md
index 7d70287..3a119e9 100644
--- a/README.md
+++ b/README.md
@@ -6,16 +6,15 @@
 [![Gem Version](https://badge.fury.io/rb/rapidflow.svg)](https://badge.fury.io/rb/rapidflow)
 [![License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)
 
-> **⚠️ This library is at a very early stage of development. The interfaces and APIs
-> may change without backward compatibility guarantees.
+> Note: ⚠️ This library is at a very early stage of development. The interfaces and APIs
+> may change without backward compatibility guarantees in minor versions (0.[minor version].[patch]).
 
-RapidFlow is a lightweight, concurrent pipeline processor for Ruby that transforms data through multiple stages like
-items moving through stages in a rapid flow. Perfect for I/O-bound operations like web scraping, API calls,
-and data processing.
+RapidFlow is a lightweight, concurrent pipeline processor for Ruby that transforms data through multiple stages using Ruby Threads.
+Perfect for I/O-bound operations like web scraping, API calls, and data processing.
 
 ## Features
 
-- 🚀 **Concurrent Processing** - Multiple workers per stage process items in parallel
+- 🚀 **Concurrent Processing** - Multiple workers per stage process items concurrently
 - 🔄 **True Pipelining** - Different stages process different items simultaneously
 - 📦 **Order Preservation** - Results returned in the same order items were pushed
 - 🛡️ **Error Handling** - Captures exceptions without stopping the flow
@@ -89,7 +88,7 @@ Note that Once you call `Batch#results`, it will block the batch until all proce
 longer push items to the batch instance.
 
 The results are returned in the same order as the original items were pushed. Each result is an array of
-`[data, error]`. No error means, the item successfully were processed through the stages.
+`[data, error]`. No error means the item successfully were processed through the stages.
 
 ```ruby
 results.each_with_index do |(data, error), index|
@@ -105,7 +104,7 @@ end
 
 RapidFlow continues running even when errors occur, instead of stopping the entire pipeline.
 
-When an item encounters an error at any stage, RapidFlow captures that error and immediately moves the item to the
+When an item encounters an error at any stage, RapidFlow captures that error and moves the item to the
 final results—skipping all remaining stages for that particular item.
 
 Each result comes as a pair: `[data, error]`.
@@ -124,11 +123,17 @@ results = batch.results
 
 results.each_with_index do |(data, error), index|
   if error
-    # Original input is preserved in 'data' for debugging
+    # Original input if error happened at first stage. Otherwise, transformed data from the previous stage before the error happened
+    # It is preserved in 'data' for debugging if needed.
+    puts "Data state before error #{data}"
+
     puts "Failed to process #{urls[index]}: #{error.class} - #{error.message}"
     # Log error, retry, or handle gracefully
+
+    puts "Error backtrace: "
+    pp error.backtrace
     # As any Exception contains the backtrace(https://docs.ruby-lang.org/en/master/Exception.html#method-i-backtrace),
-    # for further debugging, you can refer to backtrace.
+    # for further debugging, you can look into backtrace.
   else
     puts "Success: #{data}"
   end
@@ -374,7 +379,7 @@ RapidFlow::Batch.new({ fn: lambda1, workers: 2 }, { fn: lambda2, workers: 2 })
 For the best throughput, workers should be assigned based on the I/O-bound workload of each stage:
 
 ```ruby
-# ❌ Same number of workers even though stages have different I/O duration
+# ❌ Same number of workers even though stages have different I/O load
 RapidFlow::Batch.build do
   stage ->(x) { sleep(10); x }, workers: 4 # 10 seconds - SLOW! (Assume a heavy or long-running I/O task)
   stage ->(x) { sleep(0.1); x }, workers: 4 # 0.1 seconds - fast
@@ -393,7 +398,7 @@ end
 
 ### Memory Considerations
 
-- Each queue can grow unbounded - don't push millions of items without consuming results
+- Each queue can grow unbounded—don't push millions of items without consuming results
 - Workers hold items in memory during processing
 - Memory usage ≈ (items in queues + items being processed) × item size
 