A high-performance, actor-based cryptocurrency data pipeline built in Rust for real-time market analysis and historical data processing.
The Cryptocurrency Data Feeder is a robust, production-ready system designed to collect, process, and analyze cryptocurrency market data from Binance. Built with Rust's performance and safety guarantees, it provides real-time technical analysis indicators and comprehensive historical data management.
Perfect for:
- Quantitative trading and research
- Real-time market monitoring and alerts
- Technical analysis and backtesting
- Market data infrastructure for trading systems
- Concurrent Processing: Kameo framework for fault-tolerant, concurrent operations
- Modular Design: Independent actors for historical data, WebSocket streaming, technical analysis
- Scalable: Handle multiple symbols and timeframes simultaneously
- Intelligent Path Selection: Automatically chooses optimal data sources (monthly vs daily)
- Gap Detection: Automatically identifies and fills missing data periods, including post-reconnection gap detection
- LMDB Storage: Lightning-fast memory-mapped database for primary storage
- PostgreSQL Support: Optional relational database integration
- Data Integrity: SHA256 checksums and certified range tracking
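The gap-detection idea above can be sketched in a few lines. This is an illustrative, hypothetical helper (not the crate's actual API) that scans sorted candle open timestamps for missing intervals:

```rust
/// Illustrative sketch (hypothetical helper, not the crate's actual API):
/// scan sorted candle open timestamps (ms) for missing intervals and
/// return each gap as an inclusive (first_missing, last_missing) range.
fn find_gaps(timestamps: &[i64], interval_ms: i64) -> Vec<(i64, i64)> {
    let mut gaps = Vec::new();
    for pair in timestamps.windows(2) {
        let expected_next = pair[0] + interval_ms;
        if pair[1] > expected_next {
            gaps.push((expected_next, pair[1] - interval_ms));
        }
    }
    gaps
}
```

For 1-minute candles (`interval_ms = 60_000`), `find_gaps(&[0, 60_000, 240_000], 60_000)` reports the missing range `(120_000, 180_000)`, which a backfill step would then fetch.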
- WebSocket Streaming: Live market data from Binance Futures
- Multi-timeframe Analysis: 1m, 5m, 15m, 1h, 4h technical indicators
- Kafka Integration: Real-time publishing of analysis results
- Auto-reconnection: Robust connection management with automatic post-reconnection gap detection and recovery
- EMA Indicators: 21- and 89-period exponential moving averages
- Trend Analysis: Multi-timeframe trend detection using EMA crossovers
- Volume Analysis: Maximum volume tracking with trend correlation
- Volume Profiles: Daily volume distribution analysis with POC, VWAP, and value area calculation
- Volume Profile Reprocessing: Intelligent reprocessing with three modes:
  - `missing_days_only` (default): Gap detection and targeted reprocessing
  - `today_only`: Current day reprocessing for quick updates
  - `reprocess_whole_history`: Complete historical data reprocessing
- Real-time Alerts: Instant indicator updates via Kafka
- Zero Warnings Policy: Clean, maintainable codebase
- Comprehensive Testing: Unit, integration, and end-to-end tests
- Performance Optimized: 10-100x faster than equivalent Python implementations
- Error Resilience: Production-grade error handling and recovery
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Binance API   │    │    WebSocket     │    │  Kafka Broker   │
│  (Historical)   │    │ (Real-time Data) │    │  (Indicators)   │
└────────┬────────┘    └────────┬─────────┘    └────────┬────────┘
         │                      │                       │
         ▼                      ▼                       ▲
┌─────────────────┐    ┌──────────────────┐    ┌────────┴────────┐
│   Historical    │    │    WebSocket     │    │      Kafka      │
│      Actor      │    │      Actor       │    │      Actor      │
└────────┬────────┘    └────────┬─────────┘    └────────┬────────┘
         │                      │                       │
         ▼                      ▼                       │
┌───────────────────────────────────────────────────────┼────────┐
│                     TimeFrame Actor                   │        │
│          (Aggregates to multiple timeframes)          │        │
└──────────────────────────┬────────────────────────────┼────────┘
                           ▼                            │
┌───────────────────────────────────────────────────────┼──┐
│                    Indicator Actor                    │  │
│         (EMA, Trend Analysis, Volume Tracking)        │  │
└──────────────────────────┬────────────────────────────┼──┘
                           │                            │
                           ├────────────────────────────┘
                           ▼
┌─────────────────┐    ┌─────────────────┐
│      LMDB       │    │   PostgreSQL    │
│    (Primary)    │    │   (Optional)    │
└─────────────────┘    └─────────────────┘
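The project itself builds this pipeline on Kameo actors; the message flow in the diagram can be approximated with a std-only sketch using channels and threads (all names here are illustrative, not the crate's real types):

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical candle message; the real actors exchange richer types.
struct Candle {
    close: f64,
}

/// Minimal std-only sketch of the message flow above: a "TimeFrame" stage
/// forwards closes to an "Indicator" stage over channels, and we count how
/// many updates arrive at the end of the pipeline.
fn run_pipeline(closes: Vec<f64>) -> usize {
    let (tf_tx, tf_rx) = mpsc::channel::<Candle>();
    let (ind_tx, ind_rx) = mpsc::channel::<f64>();

    // "TimeFrame actor": would aggregate 1m candles into higher timeframes;
    // here it simply forwards each close price downstream.
    let timeframe = thread::spawn(move || {
        for candle in tf_rx {
            ind_tx.send(candle.close).unwrap();
        }
    });

    // "Indicator actor": would compute EMAs etc.; here it counts updates.
    let indicator = thread::spawn(move || ind_rx.iter().count());

    for close in closes {
        tf_tx.send(Candle { close }).unwrap();
    }
    drop(tf_tx); // closing the channel lets both stages shut down cleanly

    timeframe.join().unwrap();
    indicator.join().unwrap()
}
```

The actor framework adds supervision and fault tolerance on top of this basic ownership-per-stage pattern.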
This application uses Rust feature flags to enable/disable functionality at compile time, allowing you to create lightweight binaries with only the components you need.
| Feature | Description | Default | Dependencies |
|---|---|---|---|
| `kafka` | Real-time indicator publishing to Kafka | ✅ | librdkafka |
| `postgres` | PostgreSQL database integration | ✅ | PostgreSQL client libs |
| `volume_profile` | Daily volume profile analysis | ✅ | None |
| `volume_profile_reprocessing` | Volume profile reprocessing utilities | ✅ | `volume_profile` |
# Minimal build (LMDB storage only)
cargo build --release --no-default-features
# Kafka integration only
cargo build --release --no-default-features --features="kafka"
# Full featured build
cargo build --release --features="kafka,postgres,volume_profile,volume_profile_reprocessing"
# PostgreSQL + Volume Profile + Reprocessing (no Kafka)
cargo build --release --no-default-features --features="postgres,volume_profile,volume_profile_reprocessing"
# Volume Profile with Reprocessing only
cargo build --release --no-default-features --features="volume_profile,volume_profile_reprocessing"

The application performs comprehensive validation to ensure your config.toml matches the enabled features and contains valid values:
- Feature Dependency Checking: Validates that required feature dependencies are enabled
- Configuration Sanitization: Automatically removes sections for disabled features with warnings
- Value Validation: Checks that enabled features have valid configuration values
- Migration Warnings: Alerts for potentially problematic feature combinations
- Detailed Error Messages: Provides specific remediation steps for configuration issues
Feature Alignment Validation:
- ❌ Error: Feature enabled in config but not compiled in Cargo.toml
- ⚠️ Warning: Configuration section present for disabled feature (automatically removed)
- ✅ Success: Configuration matches compiled features

Feature Dependency Validation:
- ❌ Error: `volume_profile_reprocessing` enabled without the `volume_profile` feature
- ✅ Success: All feature dependencies satisfied

Configuration Value Validation:
- ❌ Error: PostgreSQL enabled but host/database/username empty
- ❌ Error: Kafka enabled but no bootstrap servers configured
- ⚠️ Warning: Volume profile target price levels < 10 (may reduce accuracy)
| Error | Fix |
|---|---|
| "PostgreSQL is enabled but host is empty" | Set `database.host = "localhost"` |
| "Kafka is enabled but no bootstrap servers are configured" | Set `kafka.bootstrap_servers = ["localhost:9092"]` |
| "Feature 'volume_profile_reprocessing' requires 'volume_profile'" | Add `"volume_profile"` to Cargo.toml features |
| "Volume profile historical_days cannot be 0" | Set `volume_profile.historical_days = 30` |
Before running the application:

- Enable Features: Add required features to `Cargo.toml`:

[features]
default = []
postgres = ["tokio-postgres", "deadpool-postgres"]
kafka = ["rdkafka"]
volume_profile = []
volume_profile_reprocessing = ["volume_profile"]

- Configure Sections: Only configure sections for enabled features
- Validate Settings: Ensure all required fields have valid values
- Test Configuration: Run with the `--check-config` flag to validate without starting
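The validation rules above boil down to a few simple checks. This is a hypothetical mirror of that logic (struct, parameters, and error strings are illustrative, not the crate's actual API):

```rust
/// Hypothetical mirror of the startup checks described above; the struct
/// and error strings are illustrative, not the crate's actual API.
struct RuntimeConfig {
    kafka_enabled: bool,
    kafka_bootstrap_servers: Vec<String>,
    postgres_enabled: bool,
    postgres_host: String,
}

fn validate(
    cfg: &RuntimeConfig,
    kafka_compiled: bool,
    postgres_compiled: bool,
) -> Result<(), String> {
    // Feature alignment: config must not enable features that weren't compiled in.
    if cfg.kafka_enabled && !kafka_compiled {
        return Err("Kafka enabled in config but the 'kafka' feature was not compiled".into());
    }
    if cfg.postgres_enabled && !postgres_compiled {
        return Err("PostgreSQL enabled in config but the 'postgres' feature was not compiled".into());
    }
    // Value validation: enabled features need usable settings.
    if cfg.kafka_enabled && cfg.kafka_bootstrap_servers.is_empty() {
        return Err("Kafka is enabled but no bootstrap servers are configured".into());
    }
    if cfg.postgres_enabled && cfg.postgres_host.is_empty() {
        return Err("PostgreSQL is enabled but host is empty".into());
    }
    Ok(())
}
```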
- Rust: 1.70+ (install from rustup.rs)
- System Requirements:
- 4GB+ RAM (recommended 8GB)
- 10GB+ disk space for historical data
- Network access to Binance API
- Clone the repository:
git clone https://github.com/siqueiraa/data_feeder.git
cd data_feeder

- Build the project:
# Choose your build based on needed features:
# Minimal build (LMDB only)
cargo build --release --no-default-features
# With Kafka support
cargo build --release --no-default-features --features="kafka"
# Full featured (recommended for development)
cargo build --release --features="kafka,postgres,volume_profile"

- Copy and configure settings:
cp config.toml.example config.toml
# Edit config.toml with your settings

- Run the application:

cargo run --release

Edit config.toml to customize your setup:
[application]
symbols = ["BTCUSDT", "ETHUSDT"] # Trading pairs to monitor
timeframes = [60, 300, 900, 3600, 14400] # 1m, 5m, 15m, 1h, 4h
enable_technical_analysis = true
# Kafka section (only processed if 'kafka' feature enabled)
[kafka]
enabled = true # Enable real-time indicator publishing
bootstrap_servers = ["your-kafka-broker:9092"]
topic_prefix = "ta_" # Results in "ta_data" topic
# PostgreSQL section (only processed if 'postgres' feature enabled)
[database]
enabled = false # Optional PostgreSQL integration
# Volume Profile section (only processed if 'volume_profile' feature enabled)
[volume_profile]
enabled = true
historical_days = 60

Technical analysis indicators are published to Kafka in JSON format:
{
"symbol": "BTCUSDT",
"timestamp": 1674123456789,
"close_5m": 23450.50,
"close_15m": 23455.25,
"close_60m": 23460.75,
"close_4h": 23470.00,
"ema21_1min": 23445.30,
"ema89_1min": 23440.15,
"ema89_5min": 23435.80,
"ema89_15min": 23430.45,
"ema89_1h": 23425.20,
"ema89_4h": 23420.10,
"trend_1min": "Buy",
"trend_5min": "Neutral",
"trend_15min": "Sell",
"trend_1h": "Buy",
"trend_4h": "Neutral",
"max_volume": 1250000.0,
"max_volume_price": 23445.30,
"max_volume_time": "2023-01-19T14:30:00Z",
"max_volume_trend": "Buy"
}

- Topic: `{topic_prefix}data` (e.g., `ta_data`)
- Key: Symbol name (e.g., `BTCUSDT`) for partitioning
- Timestamp: 1-minute candle close time in milliseconds
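The topic/key conventions above can be expressed as small helpers. These are hypothetical illustrations of the naming and candle-alignment rules, not the crate's actual functions:

```rust
/// Hypothetical helper showing how the destination topic is derived.
fn topic_name(prefix: &str) -> String {
    format!("{}data", prefix) // "ta_" -> "ta_data"
}

/// Floor a millisecond timestamp to the open of its 1-minute candle.
/// (The published `timestamp` field is the candle *close* time; the exact
/// close-time convention is up to the implementation.)
fn candle_open_ms(ts_ms: i64) -> i64 {
    ts_ms - ts_ms.rem_euclid(60_000)
}
```

Keying each message by symbol means all updates for one trading pair land on the same partition, preserving per-symbol ordering for consumers.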
src/
βββ main.rs # Application entry point
βββ historical/ # Historical data processing
β βββ actor.rs # Main historical data actor
β βββ utils.rs # Data processing utilities
β βββ structs.rs # Data structures
βββ websocket/ # Real-time data streaming
β βββ actor.rs # WebSocket connection manager
β βββ connection.rs # Connection handling
β βββ binance/ # Binance-specific implementations
βββ technical_analysis/ # Technical indicators
β βββ actors/ # Analysis actors
β βββ structs.rs # Indicator data structures
β βββ utils.rs # Calculation utilities
βββ kafka/ # Kafka integration
β βββ actor.rs # Kafka producer actor
β βββ errors.rs # Error handling
βββ postgres/ # PostgreSQL integration
βββ actor.rs # Database actor
βββ errors.rs # Database errors
# Development build
cargo build
# Release build (optimized)
cargo build --release
# Run tests
cargo test
# Run with logging
RUST_LOG=info cargo run
# Check code quality
cargo check
cargo clippy
# Test feature combinations
./test_feature_combinations.sh
# Build specific feature combinations
cargo check --no-default-features --features="kafka"
cargo check --features="kafka,postgres,volume_profile"

# All tests
cargo test
# Integration tests only
cargo test --test kafka_integration_test
# End-to-end tests
cargo test --test e2e_kafka_test
# With output
cargo test -- --nocapture

- Historical Data Processing: 10,000+ candles/second
- Real-time Processing: <1ms latency for indicator updates
- Memory Usage: ~50MB base + ~10MB per symbol
- Kafka Throughput: 5,000+ messages/second
- Increase Batch Sizes: For historical processing, use larger batch sizes
- Tune LMDB: Adjust map sizes based on data volume
- Kafka Configuration: Optimize producer settings for throughput
- Symbol Limits: Monitor memory usage with large symbol sets
| Setting | Default | Description |
|---|---|---|
| `symbols` | `["BTCUSDT"]` | Trading pairs to process |
| `timeframes` | `[60]` | Timeframes in seconds |
| `storage_path` | `"lmdb_data"` | Local storage directory |
| `enable_technical_analysis` | `true` | Enable indicator calculation |
| Setting | Default | Description |
|---|---|---|
| `enabled` | `false` | Enable Kafka publishing |
| `bootstrap_servers` | `["localhost:9092"]` | Kafka brokers |
| `topic_prefix` | `"ta_"` | Topic name prefix |
| `acks` | `"all"` | Acknowledgment level |
| Setting | Default | Description |
|---|---|---|
| `min_history_days` | `60` | Minimum history for indicators |
| `ema_periods` | `[21, 89]` | EMA periods to calculate |
| `volume_lookback_days` | `60` | Volume analysis window |
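The EMA and crossover-trend logic configured here follows a standard formulation. A minimal sketch, assuming the conventional smoothing factor `alpha = 2 / (period + 1)` and first-value seeding (the crate's exact seeding/warm-up may differ):

```rust
/// Standard exponential moving average with alpha = 2 / (period + 1),
/// seeded with the first value. The crate's exact warm-up may differ.
fn ema(values: &[f64], period: usize) -> Vec<f64> {
    let alpha = 2.0 / (period as f64 + 1.0);
    let mut out = Vec::with_capacity(values.len());
    let mut prev = match values.first() {
        Some(&v) => v,
        None => return out,
    };
    for &v in values {
        prev = alpha * v + (1.0 - alpha) * prev;
        out.push(prev);
    }
    out
}

/// EMA-crossover trend rule as described above: fast EMA(21) above slow
/// EMA(89) reads as "Buy", below as "Sell", equal as "Neutral".
fn trend(ema21: f64, ema89: f64) -> &'static str {
    if ema21 > ema89 {
        "Buy"
    } else if ema21 < ema89 {
        "Sell"
    } else {
        "Neutral"
    }
}
```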
| Setting | Default | Description |
|---|---|---|
| `enabled` | `true` | Enable volume profile calculation |
| `price_increment_mode` | `"Fixed"` | Price bucketing mode: "Fixed" or "Adaptive" |
| `fixed_price_increment` | `0.01` | Fixed price increment when mode is "Fixed" |
| `min_price_increment` | `0.001` | Minimum price increment for "Adaptive" mode |
| `max_price_increment` | `1.0` | Maximum price increment for "Adaptive" mode |
| `update_frequency` | `"EveryCandle"` | Update frequency: "EveryCandle", "Every5Candles", or "Every10Candles" |
| `batch_size` | `1` | Number of profiles to batch before storage |
| `value_area_percentage` | `70.0` | Percentage of volume to include in the value area calculation |
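To make the fixed-increment bucketing concrete, here is an illustrative sketch of computing the POC (price of the highest-volume bucket) and VWAP from (price, volume) pairs; the crate's actual bucketing and value-area algorithm may differ:

```rust
use std::collections::HashMap;

/// Illustrative fixed-increment bucketing for (price, volume) pairs,
/// returning the POC (price of the highest-volume bucket) and the VWAP.
/// The crate's actual bucketing and value-area algorithm may differ.
fn poc_and_vwap(trades: &[(f64, f64)], increment: f64) -> (f64, f64) {
    let mut buckets: HashMap<i64, f64> = HashMap::new();
    let (mut price_volume, mut total_volume) = (0.0, 0.0);
    for &(price, volume) in trades {
        // Map each price onto its fixed-width bucket.
        let key = (price / increment).floor() as i64;
        *buckets.entry(key).or_insert(0.0) += volume;
        price_volume += price * volume;
        total_volume += volume;
    }
    let poc_key = buckets
        .iter()
        .max_by(|a, b| a.1.partial_cmp(b.1).unwrap())
        .map(|(&k, _)| k)
        .expect("at least one trade");
    (poc_key as f64 * increment, price_volume / total_volume)
}
```

The value area would then be found by expanding outward from the POC bucket until the configured percentage (70% by default) of total volume is covered.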
Feature Flag Configuration Errors:
# Error: Kafka enabled in config but feature disabled
# Solution: Either disable in config or rebuild with kafka feature
cargo build --release --features="kafka"
# Error: Missing dependencies for features
# Solution: Install required system dependencies
# For Kafka: install librdkafka-dev
# For PostgreSQL: install libpq-dev

Connection Errors:
# Check network connectivity
curl -I https://fapi.binance.com/fapi/v1/ping
# Verify Kafka broker
telnet your-kafka-broker 9092

High Memory Usage:
- Reduce number of symbols
- Decrease `volume_lookback_days`
- Adjust LMDB map sizes
Missing Data:
- Check gap detection logs
- Verify Binance API limits
- Review the `start_date` configuration
# Enable debug logging
RUST_LOG=debug cargo run
# Module-specific logging
RUST_LOG=data_feeder::kafka=debug cargo run
# Save logs to file
RUST_LOG=info cargo run 2>&1 | tee app.log

We welcome contributions! Please see our Contributing Guidelines for details.
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Make your changes
- Run tests (`cargo test`)
- Commit changes (`git commit -m 'Add amazing feature'`)
- Push to branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
- Zero Warnings: `cargo check` must show no warnings
- Testing: Add tests for new features
- Documentation: Update docs for public APIs
- Performance: Consider performance impact of changes
This project is licensed under the MIT License - see the LICENSE file for details.
- Binance API for market data
- Kameo for the actor framework
- rdkafka for Kafka integration
- LMDB for high-performance storage
⭐ If this project helps you, please give it a star!
For questions, issues, or feature requests, please open an issue.