README.md (29 changes: 17 additions & 12 deletions)
@@ -6,16 +6,15 @@
[![Gem Version](https://badge.fury.io/rb/rapidflow.svg)](https://badge.fury.io/rb/rapidflow)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)

> **Note:** ⚠️ This library is at a very early stage of development. Interfaces and APIs
> may change without backward-compatibility guarantees in minor versions (0.[minor].[patch]).

RapidFlow is a lightweight, concurrent pipeline processor for Ruby that transforms data through multiple stages using Ruby threads.
Perfect for I/O-bound operations like web scraping, API calls, and data processing.

## Features

- 🚀 **Concurrent Processing** - Multiple workers per stage process items concurrently
- 🔄 **True Pipelining** - Different stages process different items simultaneously
- 📦 **Order Preservation** - Results returned in the same order items were pushed
- 🛡️ **Error Handling** - Captures exceptions without stopping the flow
@@ -89,7 +88,7 @@

Note that once you call `Batch#results`, it blocks until all processing is complete, and you can no
longer push items to the batch instance.
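
For orientation, here is a minimal end-to-end sketch assembled from the calls shown elsewhere in this README (`RapidFlow::Batch.build`, `stage`, `#results`); the `push` method name and the `fetch_page`/`parse_page` helpers are assumptions for illustration, not part of the documented API:

```ruby
require "rapidflow"

urls = ["https://example.com/a", "https://example.com/b"]  # sample inputs

# Two-stage pipeline: fetch, then parse (stage bodies are illustrative).
batch = RapidFlow::Batch.build do
  stage ->(url)  { fetch_page(url) },  workers: 4  # 'fetch_page' is a hypothetical helper
  stage ->(html) { parse_page(html) }, workers: 2  # 'parse_page' is a hypothetical helper
end

urls.each { |url| batch.push(url) }  # 'push' is assumed from the prose above

results = batch.results  # blocks until every item has passed through all stages
```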

The results are returned in the same order the original items were pushed. Each result is an array of
`[data, error]`. No error means the item was successfully processed through all stages.

```ruby
results.each_with_index do |(data, error), index|
  puts(error ? "Failed item #{index}: #{error.message}" : "Success: #{data}")
end
```

@@ -105,7 +104,7 @@

RapidFlow continues running even when errors occur, instead of stopping the entire pipeline.

When an item encounters an error at any stage, RapidFlow captures that error and moves the item to the
final results, skipping all remaining stages for that particular item.

Each result comes as a pair: `[data, error]`.
@@ -124,11 +123,17 @@

```ruby
results = batch.results

results.each_with_index do |(data, error), index|
  if error
    # 'data' holds the original input if the error occurred at the first stage;
    # otherwise it holds the output of the last stage completed before the error.
    # Either way it is preserved for debugging.
    puts "Data state before error: #{data}"

    puts "Failed to process #{urls[index]}: #{error.class} - #{error.message}"
    # Log the error, retry, or handle it gracefully

    puts "Error backtrace:"
    pp error.backtrace
    # Every Ruby Exception carries a backtrace
    # (https://docs.ruby-lang.org/en/master/Exception.html#method-i-backtrace),
    # which you can inspect for further debugging.
  else
    puts "Success: #{data}"
  end
end
```

@@ -374,7 +379,7 @@

```ruby
RapidFlow::Batch.new({ fn: lambda1, workers: 2 }, { fn: lambda2, workers: 2 })
```
For the best throughput, workers should be assigned based on the I/O-bound workload of each stage:

```ruby
# ❌ Same number of workers even though stages have different I/O load
RapidFlow::Batch.build do
  stage ->(x) { sleep(10); x },  workers: 4  # 10 seconds - SLOW! (assume a heavy, long-running I/O task)
  stage ->(x) { sleep(0.1); x }, workers: 4  # 0.1 seconds - fast
end
```

@@ -393,7 +398,7 @@
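
The balanced counterpart is collapsed in this diff view; as a hedged sketch of the idea (these worker counts are illustrative, not from the original), the slow stage gets proportionally more workers:

```ruby
# ✅ Worker counts sized roughly in proportion to each stage's I/O time
RapidFlow::Batch.build do
  stage ->(x) { sleep(10); x },  workers: 40  # slow stage gets many workers
  stage ->(x) { sleep(0.1); x }, workers: 1   # fast stage needs only one
end
```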

### Memory Considerations

- Each queue can grow unbounded; don't push millions of items without consuming results
- Workers hold items in memory during processing
- Memory usage ≈ (items in queues + items being processed) × item size
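
For a rough sense of scale: 100,000 queued items at about 1 KB each already account for roughly 100 MB before any results are consumed.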

Expand Down