
πŸš€ Cryptocurrency Data Feeder


A high-performance, actor-based cryptocurrency data pipeline built in Rust for real-time market analysis and historical data processing.

🎯 Overview

The Cryptocurrency Data Feeder is a robust, production-ready system designed to collect, process, and analyze cryptocurrency market data from Binance. Built with Rust's performance and safety guarantees, it provides real-time technical analysis indicators and comprehensive historical data management.

Perfect for:

  • πŸ“Š Quantitative trading and research
  • πŸ“ˆ Real-time market monitoring and alerts
  • πŸ” Technical analysis and backtesting
  • πŸ“‰ Market data infrastructure for trading systems

✨ Key Features

πŸ—οΈ Actor-Based Architecture

  • Concurrent Processing: Kameo framework for fault-tolerant, concurrent operations
  • Modular Design: Independent actors for historical data, WebSocket streaming, technical analysis
  • Scalable: Handle multiple symbols and timeframes simultaneously

πŸ“Š Smart Data Management

  • Intelligent Path Selection: Automatically chooses optimal data sources (monthly vs daily)
  • Gap Detection: Automatically identifies and fills missing data periods, including post-reconnection gap detection
  • LMDB Storage: Lightning-fast memory-mapped database for primary storage
  • PostgreSQL Support: Optional relational database integration
  • Data Integrity: SHA256 checksums and certified range tracking
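
The gap-detection idea above reduces to a scan over sorted candle open times: any two consecutive candles more than one interval apart bound a missing range. A minimal sketch (function name is illustrative; the real actor also tracks certified ranges and SHA256-checked downloads):

```rust
/// Report gaps in a sorted run of candle open times (milliseconds).
/// Each gap is the half-open range [first missing open, next present open).
/// Illustrative sketch only; certified-range bookkeeping is omitted.
fn find_gaps(opens_ms: &[i64], interval_ms: i64) -> Vec<(i64, i64)> {
    opens_ms
        .windows(2)
        .filter(|w| w[1] - w[0] > interval_ms)
        .map(|w| (w[0] + interval_ms, w[1]))
        .collect()
}
```

Each returned range can then be backfilled from the Binance REST API and written to LMDB, which is also how post-reconnection recovery works: rerun the scan over the window the WebSocket was down.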

πŸ“‘ Real-time Processing

  • WebSocket Streaming: Live market data from Binance Futures
  • Multi-timeframe Analysis: 1m, 5m, 15m, 1h, 4h technical indicators
  • Kafka Integration: Real-time publishing of analysis results
  • Auto-reconnection: Robust connection management with automatic post-reconnection gap detection and recovery

🎯 Technical Analysis

  • EMA Indicators: 21- and 89-period exponential moving averages
  • Trend Analysis: Multi-timeframe trend detection using EMA crossovers
  • Volume Analysis: Maximum volume tracking with trend correlation
  • Volume Profiles: Daily volume distribution analysis with POC, VWAP, and value area calculation
  • Volume Profile Reprocessing: Intelligent reprocessing with three modes:
    • missing_days_only (default): Gap detection and targeted reprocessing
    • today_only: Current day reprocessing for quick updates
    • reprocess_whole_history: Complete historical data reprocessing
  • Real-time Alerts: Instant indicator updates via Kafka
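
As a sketch of the EMA and crossover-trend math behind these indicators (the seeding and warm-up details of the crate's own implementation may differ):

```rust
/// EMA with smoothing factor k = 2 / (period + 1), seeded with the
/// simple average of the first `period` values. Returns None until
/// enough history exists.
fn ema(values: &[f64], period: usize) -> Option<f64> {
    if period == 0 || values.len() < period {
        return None;
    }
    let k = 2.0 / (period as f64 + 1.0);
    let seed = values[..period].iter().sum::<f64>() / period as f64;
    Some(values[period..].iter().fold(seed, |prev, &v| v * k + prev * (1.0 - k)))
}

/// Crossover-style trend call, mirroring the Buy/Sell/Neutral values
/// seen in the published indicator payload.
fn trend(fast_ema: f64, slow_ema: f64) -> &'static str {
    if fast_ema > slow_ema {
        "Buy"
    } else if fast_ema < slow_ema {
        "Sell"
    } else {
        "Neutral"
    }
}
```

With the default periods, trend(ema(&closes, 21)?, ema(&closes, 89)?) is the shape of the per-timeframe trend fields in the Kafka output.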

πŸ”§ Production Ready

  • Zero Warnings Policy: Clean, maintainable codebase
  • Comprehensive Testing: Unit, integration, and end-to-end tests
  • Performance Optimized: 10-100x faster than equivalent Python implementations
  • Error Resilience: Production-grade error handling and recovery

πŸ›οΈ Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   Binance API   β”‚    β”‚   WebSocket      β”‚    β”‚   Kafka Broker  β”‚
β”‚  (Historical)   β”‚    β”‚ (Real-time Data) β”‚    β”‚  (Indicators)   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
          β”‚                      β”‚                       β”‚
          β–Ό                      β–Ό                       β–²
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Historical      β”‚    β”‚ WebSocket        β”‚    β”‚ Kafka           β”‚
β”‚ Actor           β”‚    β”‚ Actor            β”‚    β”‚ Actor           β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
          β”‚                      β”‚                       β”‚
          β–Ό                      β–Ό                       β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”
β”‚                    TimeFrame Actor                      β”‚       β”‚
β”‚            (Aggregates to multiple timeframes)         β”‚       β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β”‚
                          β–Ό                                       β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”
β”‚                   Indicator Actor                               β”‚ β”‚
β”‚      (EMA, Trend Analysis, Volume Tracking)                    β”‚ β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”˜
                          β”‚                                       β”‚
                          β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                          
                          β–Ό
                β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                β”‚      LMDB       β”‚    β”‚   PostgreSQL    β”‚
                β”‚   (Primary)     β”‚    β”‚   (Optional)    β”‚
                β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
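
The TimeFrame Actor's aggregation step can be sketched as rolling sorted 1-minute candles up into a higher timeframe (the tuple layout here is illustrative, not the crate's real candle struct):

```rust
/// Illustrative candle: (open_time_ms, open, high, low, close, volume).
type Candle = (i64, f64, f64, f64, f64, f64);

/// Aggregate sorted 1-minute candles into `timeframe_secs` buckets.
fn aggregate(minute_candles: &[Candle], timeframe_secs: i64) -> Vec<Candle> {
    let tf_ms = timeframe_secs * 1000;
    let mut out: Vec<Candle> = Vec::new();
    for &(t, o, h, l, c, v) in minute_candles {
        let bucket = (t / tf_ms) * tf_ms; // align to the timeframe boundary
        match out.last_mut() {
            Some(last) if last.0 == bucket => {
                last.2 = last.2.max(h); // high: running max
                last.3 = last.3.min(l); // low: running min
                last.4 = c;             // close: latest close wins
                last.5 += v;            // volume: sums
            }
            _ => out.push((bucket, o, h, l, c, v)),
        }
    }
    out
}
```

The Indicator Actor then consumes one such stream per configured timeframe (60, 300, 900, 3600, 14400 seconds by default).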

πŸŽ›οΈ Feature Flags

This application uses Rust feature flags to enable/disable functionality at compile time, allowing you to create lightweight binaries with only the components you need.

Available Features

| Feature | Description | Default | Dependencies |
|---|---|---|---|
| kafka | Real-time indicator publishing to Kafka | ❌ | librdkafka |
| postgres | PostgreSQL database integration | ❌ | PostgreSQL client libs |
| volume_profile | Daily volume profile analysis | ❌ | None |
| volume_profile_reprocessing | Volume profile reprocessing utilities | ❌ | volume_profile |

Feature Combinations

# Minimal build (LMDB storage only)
cargo build --release --no-default-features

# Kafka integration only  
cargo build --release --no-default-features --features="kafka"

# Full featured build
cargo build --release --features="kafka,postgres,volume_profile,volume_profile_reprocessing"

# PostgreSQL + Volume Profile + Reprocessing (no Kafka)
cargo build --release --no-default-features --features="postgres,volume_profile,volume_profile_reprocessing"

# Volume Profile with Reprocessing only
cargo build --release --no-default-features --features="volume_profile,volume_profile_reprocessing"

Configuration Validation

The application performs comprehensive validation to ensure your config.toml matches the enabled features and contains valid values:

βœ… Enhanced Validation Features

  • Feature Dependency Checking: Validates that required feature dependencies are enabled
  • Configuration Sanitization: Automatically removes sections for disabled features with warnings
  • Value Validation: Checks that enabled features have valid configuration values
  • Migration Warnings: Alerts for potentially problematic feature combinations
  • Detailed Error Messages: Provides specific remediation steps for configuration issues

πŸ” Validation Types

Feature Alignment Validation:

  • ❌ Error: Feature enabled in config but not compiled in Cargo.toml
  • ⚠️ Warning: Configuration section present for disabled feature (automatically removed)
  • βœ… Success: Configuration matches compiled features

Feature Dependency Validation:

  • ❌ Error: volume_profile_reprocessing enabled without volume_profile feature
  • βœ… Success: All feature dependencies satisfied

Configuration Value Validation:

  • ❌ Error: PostgreSQL enabled but host/database/username empty
  • ❌ Error: Kafka enabled but no bootstrap servers configured
  • ⚠️ Warning: Volume profile target price levels < 10 (may reduce accuracy)
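
A minimal sketch of the alignment and dependency checks above, assuming an illustrative config shape (these are not the crate's real config types); the cfg! macro lets the validator see which features were compiled in:

```rust
/// Illustrative config shape; the real crate deserializes config.toml
/// into richer types.
struct RuntimeConfig {
    kafka_enabled: bool,
    volume_profile_enabled: bool,
    volume_profile_reprocessing_enabled: bool,
}

fn validate_features(cfg: &RuntimeConfig) -> Result<(), String> {
    // Feature alignment: config may not enable what wasn't compiled in.
    if cfg.kafka_enabled && !cfg!(feature = "kafka") {
        return Err("Kafka enabled in config.toml but built without --features=\"kafka\"".into());
    }
    // Feature dependency: reprocessing needs the base volume_profile feature.
    if cfg.volume_profile_reprocessing_enabled && !cfg.volume_profile_enabled {
        return Err("'volume_profile_reprocessing' requires 'volume_profile'".into());
    }
    Ok(())
}
```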

πŸ› οΈ Common Configuration Errors & Fixes

| Error | Fix |
|---|---|
| "PostgreSQL is enabled but host is empty" | Set database.host = "localhost" |
| "Kafka is enabled but no bootstrap servers are configured" | Set kafka.bootstrap_servers = ["localhost:9092"] |
| "Feature 'volume_profile_reprocessing' requires 'volume_profile'" | Add "volume_profile" to Cargo.toml features |
| "Volume profile historical_days cannot be 0" | Set volume_profile.historical_days = 30 |

πŸ“‹ Configuration Checklist

Before running the application:

  1. Enable Features: Add required features to Cargo.toml
[features]
default = []
postgres = ["tokio-postgres", "deadpool-postgres"]
kafka = ["rdkafka"]
volume_profile = []
volume_profile_reprocessing = ["volume_profile"]
  2. Configure Sections: Only configure sections for enabled features
  3. Validate Settings: Ensure all required fields have valid values
  4. Test Configuration: Run with the --check-config flag to validate without starting

πŸš€ Quick Start

Prerequisites

  • Rust: 1.70+ (install from rustup.rs)
  • System Requirements:
    • 4GB+ RAM (recommended 8GB)
    • 10GB+ disk space for historical data
    • Network access to Binance API

Installation

  1. Clone the repository:
git clone https://github.com/siqueiraa/data_feeder.git
cd data_feeder
  2. Build the project:
# Choose your build based on needed features:

# Minimal build (LMDB only)
cargo build --release --no-default-features

# With Kafka support
cargo build --release --no-default-features --features="kafka"

# Full featured (recommended for development)
cargo build --release --features="kafka,postgres,volume_profile"
  3. Copy and configure settings:
cp config.toml.example config.toml
# Edit config.toml with your settings
  4. Run the application:
cargo run --release

Basic Configuration

Edit config.toml to customize your setup:

[application]
symbols = ["BTCUSDT", "ETHUSDT"]  # Trading pairs to monitor
timeframes = [60, 300, 900, 3600, 14400]  # 1m, 5m, 15m, 1h, 4h
enable_technical_analysis = true

# Kafka section (only processed if 'kafka' feature enabled)
[kafka]
enabled = true  # Enable real-time indicator publishing
bootstrap_servers = ["your-kafka-broker:9092"]
topic_prefix = "ta_"  # Results in "ta_data" topic

# PostgreSQL section (only processed if 'postgres' feature enabled)
[database]
enabled = false  # Optional PostgreSQL integration

# Volume Profile section (only processed if 'volume_profile' feature enabled)
[volume_profile]
enabled = true
historical_days = 60

πŸ“Š Data Output

Kafka Message Format

Technical analysis indicators are published to Kafka in JSON format:

{
  "symbol": "BTCUSDT",
  "timestamp": 1674123456789,
  "close_5m": 23450.50,
  "close_15m": 23455.25,
  "close_60m": 23460.75,
  "close_4h": 23470.00,
  "ema21_1min": 23445.30,
  "ema89_1min": 23440.15,
  "ema89_5min": 23435.80,
  "ema89_15min": 23430.45,
  "ema89_1h": 23425.20,
  "ema89_4h": 23420.10,
  "trend_1min": "Buy",
  "trend_5min": "Neutral",
  "trend_15min": "Sell",
  "trend_1h": "Buy",
  "trend_4h": "Neutral",
  "max_volume": 1250000.0,
  "max_volume_price": 23445.30,
  "max_volume_time": "2023-01-19T14:30:00Z",
  "max_volume_trend": "Buy"
}

Message Routing

  • Topic: {topic_prefix}data (e.g., ta_data)
  • Key: Symbol name (e.g., BTCUSDT) for partitioning
  • Timestamp: 1-minute candle close time in milliseconds
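
These routing rules can be made concrete with two small helpers (hypothetical names; the crate's Kafka actor is the real producer, and how it derives the close time may differ):

```rust
/// Topic and partition key per the routing rules above:
/// topic is "{topic_prefix}data" and the key is the symbol.
fn topic_and_key(topic_prefix: &str, symbol: &str) -> (String, String) {
    (format!("{topic_prefix}data"), symbol.to_string())
}

/// Close time (ms) of the 1-minute candle containing an event timestamp:
/// the open aligned down to the minute, plus one minute.
fn one_min_close_ms(event_ts_ms: i64) -> i64 {
    const MINUTE_MS: i64 = 60_000;
    (event_ts_ms / MINUTE_MS) * MINUTE_MS + MINUTE_MS
}
```

Keying by symbol means all messages for one pair land on the same partition, so consumers see each symbol's indicator updates in order.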

πŸ› οΈ Development

Project Structure

src/
β”œβ”€β”€ main.rs                    # Application entry point
β”œβ”€β”€ historical/                # Historical data processing
β”‚   β”œβ”€β”€ actor.rs              # Main historical data actor
β”‚   β”œβ”€β”€ utils.rs              # Data processing utilities
β”‚   └── structs.rs            # Data structures
β”œβ”€β”€ websocket/                 # Real-time data streaming
β”‚   β”œβ”€β”€ actor.rs              # WebSocket connection manager
β”‚   β”œβ”€β”€ connection.rs         # Connection handling
β”‚   └── binance/              # Binance-specific implementations
β”œβ”€β”€ technical_analysis/        # Technical indicators
β”‚   β”œβ”€β”€ actors/               # Analysis actors
β”‚   β”œβ”€β”€ structs.rs            # Indicator data structures
β”‚   └── utils.rs              # Calculation utilities
β”œβ”€β”€ kafka/                     # Kafka integration
β”‚   β”œβ”€β”€ actor.rs              # Kafka producer actor
β”‚   └── errors.rs             # Error handling
└── postgres/                  # PostgreSQL integration
    β”œβ”€β”€ actor.rs              # Database actor
    └── errors.rs             # Database errors

Building from Source

# Development build
cargo build

# Release build (optimized)
cargo build --release

# Run tests
cargo test

# Run with logging
RUST_LOG=info cargo run

# Check code quality
cargo check
cargo clippy

# Test feature combinations
./test_feature_combinations.sh

# Build specific feature combinations
cargo check --no-default-features --features="kafka"
cargo check --features="kafka,postgres,volume_profile"

Running Tests

# All tests
cargo test

# Integration tests only
cargo test --test kafka_integration_test

# End-to-end tests
cargo test --test e2e_kafka_test

# With output
cargo test -- --nocapture

⚑ Performance

Benchmarks

  • Historical Data Processing: 10,000+ candles/second
  • Real-time Processing: <1ms latency for indicator updates
  • Memory Usage: ~50MB base + ~10MB per symbol
  • Kafka Throughput: 5,000+ messages/second

Optimization Tips

  1. Increase Batch Sizes: For historical processing, use larger batch sizes
  2. Tune LMDB: Adjust map sizes based on data volume
  3. Kafka Configuration: Optimize producer settings for throughput
  4. Symbol Limits: Monitor memory usage with large symbol sets

πŸ”§ Configuration Reference

Application Settings

| Setting | Default | Description |
|---|---|---|
| symbols | ["BTCUSDT"] | Trading pairs to process |
| timeframes | [60] | Timeframes in seconds |
| storage_path | "lmdb_data" | Local storage directory |
| enable_technical_analysis | true | Enable indicator calculation |

Kafka Settings

| Setting | Default | Description |
|---|---|---|
| enabled | false | Enable Kafka publishing |
| bootstrap_servers | ["localhost:9092"] | Kafka brokers |
| topic_prefix | "ta_" | Topic name prefix |
| acks | "all" | Acknowledgment level |

Technical Analysis Settings

| Setting | Default | Description |
|---|---|---|
| min_history_days | 60 | Minimum history for indicators |
| ema_periods | [21, 89] | EMA periods to calculate |
| volume_lookback_days | 60 | Volume analysis window |

Volume Profile Settings

| Setting | Default | Description |
|---|---|---|
| enabled | true | Enable volume profile calculation |
| price_increment_mode | "Fixed" | Price bucketing mode: "Fixed" or "Adaptive" |
| fixed_price_increment | 0.01 | Fixed price increment when mode is "Fixed" |
| min_price_increment | 0.001 | Minimum price increment for "Adaptive" mode |
| max_price_increment | 1.0 | Maximum price increment for "Adaptive" mode |
| update_frequency | "EveryCandle" | Update frequency: "EveryCandle", "Every5Candles", or "Every10Candles" |
| batch_size | 1 | Number of profiles to batch before storage |
| value_area_percentage | 70.0 | Percentage of volume to include in value area calculation |
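
As a sketch of the statistics these settings feed, over fixed price buckets of (price, volume) pairs (simplified relative to the real actor: the value-area expansion around the POC is omitted here):

```rust
/// Point of Control (bucket with the most volume) and VWAP
/// (volume-weighted average price) over a day's price buckets.
fn poc_and_vwap(buckets: &[(f64, f64)]) -> Option<(f64, f64)> {
    if buckets.is_empty() {
        return None;
    }
    // POC: the price bucket holding the most volume.
    let poc = buckets
        .iter()
        .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap())
        .map(|&(p, _)| p)?;
    // VWAP: sum(price * volume) / sum(volume) across all buckets.
    let total: f64 = buckets.iter().map(|&(_, v)| v).sum();
    let vwap = buckets.iter().map(|&(p, v)| p * v).sum::<f64>() / total;
    Some((poc, vwap))
}
```

The value area then grows outward from the POC, adding the higher-volume neighbor on each step, until value_area_percentage (70% by default) of total volume is covered.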

❗ Troubleshooting

Common Issues

Feature Flag Configuration Errors:

# Error: Kafka enabled in config but feature disabled
# Solution: Either disable in config or rebuild with kafka feature
cargo build --release --features="kafka"

# Error: Missing dependencies for features
# Solution: Install required system dependencies
# For Kafka: install librdkafka-dev
# For PostgreSQL: install libpq-dev

Connection Errors:

# Check network connectivity
curl -I https://fapi.binance.com/fapi/v1/ping

# Verify Kafka broker
telnet your-kafka-broker 9092

High Memory Usage:

  • Reduce number of symbols
  • Decrease volume_lookback_days
  • Adjust LMDB map sizes

Missing Data:

  • Check gap detection logs
  • Verify Binance API limits
  • Review start_date configuration

Logging

# Enable debug logging
RUST_LOG=debug cargo run

# Module-specific logging
RUST_LOG=data_feeder::kafka=debug cargo run

# Save logs to file
RUST_LOG=info cargo run 2>&1 | tee app.log

🀝 Contributing

We welcome contributions! Please see our Contributing Guidelines for details.

Quick Contribution Guide

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Run tests (cargo test)
  5. Commit changes (git commit -m 'Add amazing feature')
  6. Push to branch (git push origin feature/amazing-feature)
  7. Open a Pull Request

Code Standards

  • Zero Warnings: cargo check must show no warnings
  • Testing: Add tests for new features
  • Documentation: Update docs for public APIs
  • Performance: Consider performance impact of changes

πŸ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.

πŸ™ Acknowledgments


⭐ If this project helps you, please give it a star!

For questions, issues, or feature requests, please open an issue.
