DefInd is a high-performance, Python-native DeFi log fetcher designed for efficient extraction and processing of Ethereum blockchain event data. Built specifically for DeFi protocols, it provides resumable, concurrent data fetching with dynamic column projections and universal storage format.
- π High Performance: Async/concurrent fetching with configurable parallelism
- π Resumable Operations: Live manifest system for fault-tolerant data collection
- π Dynamic Projections: Any registry key automatically becomes a column
- πΎ Universal Storage: Efficient Parquet-based sharding with bounded memory
- π― DeFi-Optimized: Pre-built support for CL pools, Gauge, and VFAT events
- β‘ Smart Filtering: Fast zero-word filtering and automatic interval splitting
- π‘οΈ Robust Error Handling: Automatic retries and graceful failure recovery
defind/
βββ core/ # Data models and constants
βββ clients/ # RPC client for Ethereum nodes
βββ decoding/ # Event decoding with dynamic projections
βββ storage/ # Universal shards and live manifests
βββ orchestration/ # Streaming fetch-decode-write pipeline
- EventLog: Raw RPC log records
- UniversalDynColumns: Dynamic columnar buffer for any projection
- EventRegistry: Configurable event decoding specifications
- ShardAggregator: Bounded-memory Parquet writer
- LiveManifest: Resumable operation tracking
# Clone the repository
git clone <repository-url>
cd define
# Install dependencies
pip install -e .Refer to scripts in the examples/ directory.
The easiest way of building an event registry is to create it from an ABI file:
from defind.abi_events import make_event_registry_from_abi
ABI = EXAMPLES_ROOT / "abi" / "aerodrome_clpool_abi.json"
registry = make_event_registry_from_abi(ABI)DefInd comes with pre-built registries for common DeFi protocols:
from defind.decoding.registries import make_clpool_registry
registry = make_clpool_registry()
# Supports: Mint, Burn, Collect, CollectFees eventsfrom defind.decoding.registries import make_gauge_registry
registry = make_gauge_registry()
# Supports: Deposit, Withdraw, ClaimRewards eventsfrom defind.decoding.registries import make_vfat_registry
registry = make_vfat_registry()
# Supports: Deploy and other VFAT-specific eventsfrom defind.decoding.specs import EventSpec, TopicFieldSpec, DataFieldSpec
# Define custom event specification
custom_spec = EventSpec(
topic0="0x...",
name="CustomEvent",
topic_fields=[
TopicFieldSpec("user", 1, "address"),
TopicFieldSpec("amount", 2, "uint256"),
],
data_fields=[
DataFieldSpec("timestamp", 0, "uint256"),
],
projection={
"user_address": ProjectionRefs.TopicRef(name="user"), # Output column "user_address" will be filled with "user" indexed event input
"token_amount": ProjectionRefs.TopicRef(name="amount"),# Output column t"oken_amount" will be filled with "amount" indexed event input
"event_time": ProjectionRefs.DataRef(name="timestamp"), # Output column "event_time" will be filled with "timestamp" unindexed event input
"custom_field": ProjectionRefs.Constant(value="my_custom_value"), # Output column "custom_field" will be filled with constant "my_custom_value" for all "CustomEvent"
}
)rows_per_shard: Number of rows per Parquet file (default: 250,000)batch_decode_rows: Batch size for decoding (default: 50,000)concurrency: Max parallel RPC requests (default: 16)
# High-throughput configuration
result = await fetch_decode(
# ... other params
rows_per_shard=500_000, # Larger shards
batch_decode_rows=100_000, # Larger batches
concurrency=32, # More parallelism
)DefInd produces Parquet files with a universal schema:
block_number: Block number (int64)block_timestamp: Block timestamp (int64)tx_hash: Transaction hash (string)log_index: Log index within transaction (int32)contract: Contract address (string)event: Event name (string)
Any key in your event registry's projection mapping becomes a column automatically, stored as strings for safety with large integers.
src/defind/
βββ __init__.py # Public API exports
βββ core/
β βββ models.py # Core data models
β βββ constants.py # Event topic constants
βββ clients/
β βββ rpc.py # Ethereum RPC client
βββ decoding/
β βββ specs.py # Event specifications
β βββ decoder.py # Generic event decoder
β βββ registries.py # Pre-built registries
β βββ utils.py # Decoding utilities
βββ storage/
β βββ shards.py # Universal shard writer
β βββ manifest.py # Live manifest system
βββ orchestration/
βββ orchestrator.py # Main streaming pipeline
βββ utils.py # Block range utilities
- httpx: Async HTTP client for RPC calls
- pyarrow: Columnar data processing and Parquet I/O
- pandas: Data manipulation and analysis
- eth-abi: Ethereum ABI encoding/decoding
- eth-utils: Ethereum utility functions
- rich: Beautiful terminal output
- click: CLI framework
DefInd is optimized for high-throughput data extraction:
- Concurrent Processing: Configurable parallelism for RPC requests
- Bounded Memory: Streaming processing with configurable batch sizes
- Efficient Storage: Columnar Parquet format with compression
- Smart Resumption: Skip already-processed ranges automatically
- Fast Filtering: Early zero-word detection to skip empty events
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
Built for the DeFi community with β€οΈ by youssefGha98