Builds a minimal reference data pipeline that keeps a bi-temporal history of change. It tracks both event/valid time (when a fact was true) and knowledge/transaction time (when we learned about the fact) by applying Slowly Changing Dimension Type 2 (SCD2) rules for idempotent merges into SQLite, preserving history instead of overwriting rows. As-of snapshot queries recover what the system believed at any point in time, e.g. after backfills or corrections, and an event-time watermark guards against duplicate processing. The workflow is orchestrated as extract → transform → load stages with Apache Airflow (Local Executor) or via a CLI runner.
- **Data source:** Synthetic CSV (`data/synthetic_refdata.csv`) with a few securities out of order to show late arrivals.
- **Pipeline orchestration:** `dags/reference_data_pipeline.py` defines three Airflow tasks (extract, transform, load) that run sequentially under the Local Executor.
- **Merge engine:** `bitemporal/merge_logic.py` wraps SQLite transactions to enforce SCD2 semantics, including overlap checks, knowledge timeline updates, and watermark persistence.
- **Utilities:** a CLI runner (`scripts/run_pipeline.py`) for quick demos and a snapshot query helper (`scripts/query_as_of.py`).
- **Tests:** pytest suites verifying merge behaviour, as-of queries, and watermark controls.
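To make the SCD2 idea concrete, here is a minimal sketch of the merge pattern, assuming a simplified schema and a hypothetical `scd2_merge` helper (this is not the actual `bitemporal/merge_logic.py` implementation): instead of updating a row in place, the current version's knowledge interval is closed and a new version is inserted, so earlier beliefs stay queryable.

```python
import sqlite3

def scd2_merge(conn, security_id, attributes, knowledge_time):
    """Hypothetical SCD2 merge: close the old knowledge interval, insert a new version."""
    row = conn.execute(
        "SELECT rowid, attributes FROM reference_data "
        "WHERE security_id = ? AND knowledge_to IS NULL",
        (security_id,),
    ).fetchone()
    if row is not None:
        if row[1] == attributes:
            return "noop"  # idempotent: identical payload, nothing to do
        # Close the knowledge interval of the superseded version.
        conn.execute(
            "UPDATE reference_data SET knowledge_to = ? WHERE rowid = ?",
            (knowledge_time, row[0]),
        )
    conn.execute(
        "INSERT INTO reference_data "
        "(security_id, attributes, knowledge_from, knowledge_to) "
        "VALUES (?, ?, ?, NULL)",
        (security_id, attributes, knowledge_time),
    )
    return "merged"

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE reference_data (security_id TEXT, attributes TEXT, "
    "knowledge_from TEXT, knowledge_to TEXT)"
)
print(scd2_merge(conn, "SEC1", '{"status": "active"}', "2025-01-01"))   # merged
print(scd2_merge(conn, "SEC1", '{"status": "active"}', "2025-01-02"))   # noop (idempotent)
print(scd2_merge(conn, "SEC1", '{"status": "retired"}', "2025-02-01"))  # merged
```

Rerunning the same payload is a no-op, which is the property that makes the pipeline safe to re-execute.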
reference-data-bitemporal-demo/
├── bitemporal/ # Merge logic package (initialise DB, merge, query)
├── dags/ # Airflow DAG orchestrating the ETL pipeline
├── data/ # Synthetic CSV data
├── scripts/ # CLI helpers (pipeline runner, as-of query)
├── tests/ # Pytest suites covering core behaviours
├── pyproject.toml # Project metadata & optional dependencies
└── requirements.txt # Dev/test dependency pin stub
Requirements:
- Python 3.9+
- Optional: Apache Airflow ≥ 2.9 (install with `pip install -e .[airflow]`)
Setup steps:
    cd reference-data-bitemporal-demo
    python -m venv .venv && source .venv/bin/activate
    pip install -e .[dev]

This installs pytest and, optionally, Airflow with the Local Executor.
The quickest way to see the merge logic in action is via the CLI runner:
    cd reference-data-bitemporal-demo
    python scripts/run_pipeline.py

This reads `data/synthetic_refdata.csv`, applies a consistent knowledge timestamp, and merges everything into `reference_data.db`. Rerun it to observe idempotency (no duplicate rows are created).
Add the `--allow-late` flag to process records whose event times are older than the stored watermark:

    python scripts/run_pipeline.py --allow-late

This simulates late-arriving corrections and triggers the backfill branch of the merge algorithm.
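A rough sketch of the watermark guard behind that flag (hypothetical function and column names, using the `pipeline_watermarks` table described below): records at or behind the stored watermark are skipped unless late processing is explicitly allowed.

```python
import sqlite3

def should_process(conn, pipeline, event_time, allow_late=False):
    """Return True if a record's event_time passes the watermark check."""
    row = conn.execute(
        "SELECT last_event_time FROM pipeline_watermarks WHERE pipeline = ?",
        (pipeline,),
    ).fetchone()
    watermark = row[0] if row else None
    if watermark is not None and event_time <= watermark:
        # Late arrival: only processed when explicitly allowed (--allow-late).
        return allow_late
    return True

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pipeline_watermarks (pipeline TEXT, last_event_time TEXT)")
conn.execute("INSERT INTO pipeline_watermarks VALUES ('refdata', '2025-03-01')")

print(should_process(conn, "refdata", "2025-03-05"))                   # True: new data
print(should_process(conn, "refdata", "2025-02-20"))                   # False: behind watermark
print(should_process(conn, "refdata", "2025-02-20", allow_late=True))  # True: backfill override
```

ISO-8601 timestamps compare correctly as strings, which keeps the check a simple lexicographic comparison.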
- Install Airflow and configure the Local Executor. A minimal local setup can rely on the default SQLite metadata database.
- Place `dags/reference_data_pipeline.py` in your Airflow `dags/` folder (or mount this repository).
- Ensure `reference-data-bitemporal-demo` is available on the Airflow worker `PYTHONPATH` (e.g., by installing it with `pip install -e .`).
- Start the Airflow scheduler and trigger the `reference_data_ingestion` DAG via the UI or CLI.
The DAG exposes an optional `allow_late` run configuration flag so you can trigger backfills on demand (`airflow dags trigger reference_data_ingestion -c '{"allow_late": true}'`).
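Inside an Airflow task, that flag would typically be read from the run configuration. A small sketch with a hypothetical helper (in a real task the dict comes from `context["dag_run"].conf`):

```python
def resolve_allow_late(dag_run_conf):
    """Hypothetical helper: read the allow_late flag from a DAG run's conf dict."""
    conf = dag_run_conf or {}  # conf is None when no -c payload was passed
    return bool(conf.get("allow_late", False))

print(resolve_allow_late(None))                  # False: default scheduled run
print(resolve_allow_late({"allow_late": True}))  # True: manual backfill trigger
```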
Use the provided helper to inspect the database state as it was known on any knowledge date:
    python scripts/query_as_of.py 2025-03-01T00:00:00Z

Add `--effective-time` to answer questions like "as of 2025-04-10, what do we now believe the data was on 2025-02-20?":

    python scripts/query_as_of.py 2025-04-10T00:00:00Z --effective-time 2025-02-20T00:00:00Z

The underlying SQL filters on both the knowledge and valid timelines.
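That bitemporal filter can be sketched as follows (a guess at the helper's SQL, using the column names from the schema table below — not the actual `query_as_of.py` query). A row is visible if its knowledge interval contains the as-of knowledge time and its valid interval contains the requested effective time, with `NULL` upper bounds meaning "still open":

```python
import sqlite3

AS_OF_SQL = """
SELECT security_id, attributes
FROM reference_data
WHERE knowledge_from <= :knowledge_time
  AND (knowledge_to IS NULL OR knowledge_to > :knowledge_time)
  AND valid_from <= :effective_time
  AND (valid_to IS NULL OR valid_to > :effective_time)
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE reference_data (security_id TEXT, attributes TEXT, "
    "valid_from TEXT, valid_to TEXT, knowledge_from TEXT, knowledge_to TEXT)"
)
# One correction: on 2025-03-15 we learned the earlier status had been wrong.
conn.executemany(
    "INSERT INTO reference_data VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("SEC1", "active",    "2025-02-01", None, "2025-02-01", "2025-03-15"),
        ("SEC1", "suspended", "2025-02-01", None, "2025-03-15", None),
    ],
)
# Before the correction was known, we believed "active"...
early = {"knowledge_time": "2025-03-01", "effective_time": "2025-02-20"}
print(conn.execute(AS_OF_SQL, early).fetchall())  # [('SEC1', 'active')]
# ...after it, the same effective date reads "suspended".
late = {"knowledge_time": "2025-04-10", "effective_time": "2025-02-20"}
print(conn.execute(AS_OF_SQL, late).fetchall())   # [('SEC1', 'suspended')]
```

The same effective date yields different answers at different knowledge times, which is exactly what the corrections workflow relies on.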
All critical behaviours come with pytest coverage. Run from the project root:

    cd reference-data-bitemporal-demo
    python -m pytest

The tests cover new inserts vs. updates (including corrections at the same event boundary), backfills that split existing timelines, as-of knowledge/effective date queries, and watermark enforcement with deliberate overrides.
The SQLite table `reference_data` stores:

| Column | Description |
|---|---|
| `security_id` | Business key of the reference entity |
| `attributes` | JSON blob of attributes (name, status, etc.) |
| `event_time` | Original event timestamp for traceability |
| `valid_from` / `valid_to` | Valid (event) timeline range |
| `knowledge_from` / `knowledge_to` | Knowledge (transaction) timeline range |
| `is_current` | Convenience flag (1 = still current in both timelines) |
Table `pipeline_watermarks` keeps the latest processed event time to guard against unintended reprocessing.
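Put together, the DDL for both tables might look roughly like this (a sketch based on the column descriptions above; the real initialisation lives in the `bitemporal` package and may differ in types and constraints):

```python
import sqlite3

# Hypothetical schema matching the column table above.
SCHEMA = """
CREATE TABLE IF NOT EXISTS reference_data (
    security_id    TEXT NOT NULL,           -- business key
    attributes     TEXT NOT NULL,           -- JSON blob (name, status, etc.)
    event_time     TEXT NOT NULL,           -- original event timestamp
    valid_from     TEXT NOT NULL,           -- valid (event) timeline start
    valid_to       TEXT,                    -- NULL = open-ended
    knowledge_from TEXT NOT NULL,           -- knowledge (transaction) timeline start
    knowledge_to   TEXT,                    -- NULL = still believed
    is_current     INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS pipeline_watermarks (
    pipeline        TEXT PRIMARY KEY,
    last_event_time TEXT NOT NULL           -- latest processed event time
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
print([r[0] for r in conn.execute(
    "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")])
# ['pipeline_watermarks', 'reference_data']
```

Storing timestamps as ISO-8601 `TEXT` is a common SQLite choice, since string comparison then matches chronological order.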