This is my personal Data Engineering portfolio project, developed to demonstrate practical skills in building ETL pipelines, dimensional modeling, and implementing modern Data Warehouses.
After several courses and studies on Spark and the Medallion architecture, I decided to build my own project from scratch, applying advanced concepts and adding my own implementation vision. A key differentiator of this project is the implementation of SCD Type 2 (Slowly Changing Dimensions) in Spark, a technique rarely seen in educational projects but essential in corporate environments for maintaining the history of dimensional changes.
- Implement complete Medallion architecture (Bronze → Silver → Gold)
- Build Data Warehouse with dimensional model (Star Schema)
- Apply SCD Type 2 for historical change tracking
- Process data with Apache Spark using PySpark
- Orchestrate complete environment with Docker
- Validate data quality across layers
- Query the lakehouse via SQL IDE using Apache Kyuubi
- Demonstrate Data Engineering best practices
- Apache Spark 3.5 - Distributed data processing
- PySpark - Python API for Spark
- MySQL 8.0 - Source database (OLTP)
- Docker & Docker Compose - Containerization and orchestration
- Jupyter Notebook - Exploratory analysis
- Apache Airflow 2.9.1 - Pipeline orchestration
- Apache Kyuubi 1.10.3 - SQL gateway for JDBC/ODBC access to Spark
- DBeaver - SQL IDE connected to the lakehouse via Kyuubi
- Parquet - Columnar storage format
```
┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│    MySQL     │ ──> │    Bronze    │ ──> │    Silver    │ ──> │     Gold     │
│    (OLTP)    │     │   Raw Data   │     │   Cleaned    │     │   Business   │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘
     Source             Ingestion           Validation          DW Modeling
                                                                      │
                                                                      ▼
                                                            ┌─────────────────┐
                                                            │  Apache Kyuubi  │
                                                            │  (SQL Gateway)  │
                                                            └────────┬────────┘
                                                                     │ JDBC
                                                                     ▼
                                                            ┌─────────────────┐
                                                            │     DBeaver     │
                                                            │    (SQL IDE)    │
                                                            └─────────────────┘
```
- Purpose: Ingest raw data from MySQL without transformations
- Format: Parquet
- Characteristics:
- Faithful copy of source data
- Preservation of original data types
- Foundation for future reprocessing
- Tables:
`tbcategories`, `tbproducts`, `tbcustomers`, `tborders`, `tborderdetail`
- Purpose: Cleansing, validation, and standardization of data
- Format: Parquet
- Characteristics:
- Duplicate removal
- Null value handling
- Format standardization
- Integrity validations
- Tables: Same as Bronze, but refined
- Purpose: Dimensional modeling for analytics and BI
- Format: Parquet
- Characteristics:
- Star Schema model
- Dimensions with SCD Type 2
- Aggregated fact table
- Optimized for analytical queries
Product categories dimension with change history.
Columns:
- `category_sk` - Surrogate Key
- `category_code` - Business Key (original code)
- `category_description` - Category description
- `valid_from` - Start date of validity
- `valid_to` - End date of validity (NULL = current)
- `is_current` - Flag indicating current version
Products dimension with tracking of changes in price, description, status, and category.
Columns:
- `product_sk` - Surrogate Key
- `product_code` - Business Key
- `product_description` - Product description
- `sale_value` - Sale price
- `is_active` - Active/inactive status
- `category_code` - FK to category
- `valid_from`, `valid_to`, `is_current` - SCD2 control
Tracked changes:
- Price changes
- Description updates
- Category modifications
- Product activation/deactivation
Customers dimension with history of registration changes.
Columns:
- `customer_sk` - Surrogate Key
- `customer_code` - Business Key
- `customer_name` - Customer name
- `customer_address` - Address
- `customer_phone` - Phone number
- `customer_email` - Email
- `birth_date` - Birth date
- `valid_from`, `valid_to`, `is_current` - SCD2 control
Tracked changes:
- Address updates
- Phone/email changes
- Registration updates
Consolidated sales fact table with aggregated metrics.
Columns:
- `order_code` - Order code (Degenerate Dimension)
- `customer_code` - FK to dim_customers
- `product_code` - FK to dim_products
- `order_date` - Order date/time
- `total_quantity` - Total quantity sold
- `total_sales` - Total value (quantity × unit_price)
- `line_count` - Number of detail lines
Granularity: One row per product per order
Calculated metrics:
- Total revenue per product/order
- Total quantity sold
- Transaction count
SCD Type 2 maintains complete change history in dimensions, enabling accurate temporal analysis.
Practical example:
A product had its price changed:
| product_sk | product_code | description | sale_value | valid_from | valid_to | is_current |
|---|---|---|---|---|---|---|
| 1 | 100 | Gaming Mouse | 150.00 | 2024-01-01 | 2024-06-15 | false |
| 42 | 100 | RGB Gaming Mouse | 180.00 | 2024-06-16 | NULL | true |
Benefits:
- ✅ Historical price analysis
- ✅ Registration change tracking
- ✅ Complete audit trail
- ✅ Point-in-time reporting
SCD2 Algorithm:
1. Detect new records and changes
2. Expire old versions (set `is_current = false`, `valid_to = current_date`)
3. Generate a new surrogate key for each updated version
4. Insert new versions with `is_current = true`
Apache Kyuubi is a distributed SQL gateway that exposes Spark via a HiveServer2-compatible JDBC/ODBC interface, enabling any SQL IDE (such as DBeaver) to query the lakehouse directly without replacing the Spark engine.
```
DBeaver (SQL IDE)
        │ JDBC (port 10009)
        ▼
Apache Kyuubi
        │ spark-submit
        ▼
Apache Spark ──► Parquet files (Bronze / Silver / Gold)
```
Kyuubi acts as a gateway: it receives SQL queries from DBeaver, submits them to Spark, and returns the results. Unlike Trino, it does not replace Spark as the execution engine.
```properties
kyuubi.engine.spark.master=spark://spark-master:7077
kyuubi.engine.share.level=SERVER
kyuubi.engine.type=SPARK_SQL
spark.home=/opt/kyuubi/externals/spark-3.5.2-bin-hadoop3
```

| Field | Value |
|---|---|
| Driver | Apache Hive 2 |
| Host | localhost |
| Port | 10009 |
| Database | default |
| URL | jdbc:hive2://localhost:10009/default;connectTimeout=60000;socketTimeout=120000 |
| Username / Password | (leave blank) |
Note: On first connection, Kyuubi needs ~15-20 seconds to launch the Spark engine. Increase DBeaver's connection timeout to at least 60 seconds to avoid premature disconnection.
Since the Gold layer uses plain Parquet files (no Hive Metastore), tables must be registered per session before querying:
```sql
CREATE DATABASE IF NOT EXISTS bronze;
CREATE DATABASE IF NOT EXISTS silver;
CREATE DATABASE IF NOT EXISTS gold;

CREATE TABLE IF NOT EXISTS gold.dim_categories USING parquet LOCATION '/data/gold/dim_categories';
CREATE TABLE IF NOT EXISTS gold.dim_products  USING parquet LOCATION '/data/gold/dim_products';
CREATE TABLE IF NOT EXISTS gold.dim_customers USING parquet LOCATION '/data/gold/dim_customers';
CREATE TABLE IF NOT EXISTS gold.fact_orders   USING parquet LOCATION '/data/gold/fact_orders';
```

Note: These registrations live in memory. If the Kyuubi container restarts, re-run the `CREATE TABLE` statements. A persistent Hive Metastore would eliminate this step and is listed as a future improvement.
```sql
SELECT
    c.customer_name,
    p.product_description,
    DATE_FORMAT(f.order_date, 'dd-MM-yyyy') AS date,
    SUM(f.total_quantity) AS quantity,
    ROUND(SUM(f.total_sales), 2) AS sales
FROM gold.fact_orders f
INNER JOIN gold.dim_customers c
    ON f.customer_code = c.customer_code
    AND c.is_current = true
INNER JOIN gold.dim_products p
    ON f.product_code = p.product_code
    AND p.is_current = true
GROUP BY c.customer_name, p.product_description, DATE_FORMAT(f.order_date, 'dd-MM-yyyy')
ORDER BY date DESC, sales DESC
```

```
myspark/
├── spark/
│   ├── docker-compose.yml
│   ├── kyuubi-defaults.conf        # Kyuubi configuration
│   ├── Dockerfile
│   ├── jobs/
│   │   ├── test_mysql.py
│   │   ├── bronze_*.py             # Bronze ingestion
│   │   ├── silver_*.py             # Silver transformation
│   │   ├── gold_dim_*_scd2.py      # Gold dimensions with SCD2
│   │   ├── gold_fact_orders.py     # Fact table
│   │   ├── read_gold_*.py          # Read scripts
│   │   └── generic_query.py        # Generic query
│   ├── lakehouse/
│   │   ├── bronze/                 # Raw data
│   │   ├── silver/                 # Clean data
│   │   └── gold/                   # Dimensional model
│   └── work/                       # Jupyter notebooks
├── airflow/
│   ├── dags/
│   │   ├── dag_lakehouse_pipeline.py   # Full pipeline DAG
│   │   └── dag_test_tbcategories.py    # Test DAG (categories only)
│   ├── logs/
│   └── plugins/
└── mysql/
    ├── docker-compose.yml
    └── init/
        └── init.sql                # Initialization script
```
- Docker installed
- Docker Compose installed
- 8GB RAM available (recommended)
- DBeaver installed (for SQL IDE access via Kyuubi)
```shell
docker network create lakehouse_net
```

Inside the spark/ folder, create:

```shell
mkdir -p lakehouse/bronze lakehouse/silver lakehouse/gold
```

Inside the airflow/ folder, create:

```shell
mkdir -p airflow/dags airflow/logs airflow/plugins
```

```shell
cd mysql/
docker compose up -d
cd ../spark/
docker compose up -d
```

```shell
docker logs spark_notebook
```

Look for:
[I ########] http://127.0.0.1:8888/lab?token=99999...
Access: http://localhost:8888 and use the token.
| Service | URL | Credentials |
|---|---|---|
| Spark Master UI | http://localhost:8080 | (none) |
| Jupyter Notebook | http://localhost:8888 | token from logs |
| Airflow UI | http://localhost:8081 | admin / admin |
| Kyuubi JDBC | localhost:10009 | (none) |
```shell
docker exec -it spark_master spark-submit \
  --master local[*] \
  --packages com.mysql:mysql-connector-j:8.0.33 \
  /opt/spark/jobs/test_mysql.py
```

```shell
docker exec -it spark_master spark-submit --master local[*] --packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/bronze_tbcategories.py
docker exec -it spark_master spark-submit --master local[*] --packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/bronze_tbproducts.py
docker exec -it spark_master spark-submit --master local[*] --packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/bronze_tbcustomers.py
docker exec -it spark_master spark-submit --master local[*] --packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/bronze_tborders.py
docker exec -it spark_master spark-submit --master local[*] --packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/bronze_tborderdetail.py
```

```shell
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/silver_tbcategories.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/silver_tbproducts.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/silver_tbcustomers.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/silver_tborders.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/silver_tborderdetail.py
```

Dimensions with SCD Type 2:

```shell
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/gold_dim_categories_scd2.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/gold_dim_products_scd2.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/gold_dim_customers_scd2.py
```

Fact Table:

```shell
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/gold_fact_orders.py
```

```shell
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/read_gold_dim_categories.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/read_gold_dim_products.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/read_gold_dim_customers.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/read_gold_fact_orders.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/generic_query.py
```

Execute equivalent SQL queries in MySQL to validate totals:
```sql
SELECT
    o.customer AS customer_code,
    COUNT(DISTINCT o.code) AS total_orders,
    SUM(od.quantity) AS total_items,
    ROUND(SUM(od.quantity * od.salesvalue), 2) AS total_revenue
FROM tborders o
INNER JOIN tborderdetail od ON o.code = od.orders
WHERE o.customer IS NOT NULL
GROUP BY o.customer
ORDER BY total_revenue DESC
LIMIT 10;
```

Validation checklist:
- ✅ Revenue totals must be identical
- ✅ Order counts must match
- ✅ Top customers/products must be the same
- ✅ Record counts must align
```sql
SELECT
    c.customer_name,
    p.product_description,
    DATE_FORMAT(f.order_date, 'dd-MM-yyyy') AS date,
    SUM(f.total_quantity) AS quantity,
    ROUND(SUM(f.total_sales), 2) AS sales
FROM gold.fact_orders f
INNER JOIN gold.dim_customers c
    ON f.customer_code = c.customer_code
    AND c.is_current = true
INNER JOIN gold.dim_products p
    ON f.product_code = p.product_code
    AND p.is_current = true
GROUP BY c.customer_name, p.product_description, DATE_FORMAT(f.order_date, 'dd-MM-yyyy')
ORDER BY date DESC, sales DESC
```

```sql
SELECT
    cat.category_description,
    COUNT(DISTINCT f.order_code) AS total_orders,
    ROUND(SUM(f.total_sales), 2) AS revenue
FROM gold.fact_orders f
INNER JOIN gold.dim_products p
    ON f.product_code = p.product_code
    AND p.is_current = true
INNER JOIN gold.dim_categories cat
    ON p.category_code = cat.category_code
    AND cat.is_current = true
GROUP BY cat.category_description
ORDER BY revenue DESC
```

Apache Airflow automates and orchestrates the full Medallion pipeline, replacing manual execution of each Spark job.
Airflow runs as part of the same docker-compose.yml alongside Spark, sharing the lakehouse_net network. It uses PostgreSQL as its metadata database and the LocalExecutor.
Full pipeline executing all tables across all three layers in the correct dependency order. Triggered manually (schedule_interval=None).
Execution flow:
```
bronze_categories  → silver_categories  → gold_dim_categories ──┐
bronze_customers   → silver_customers   → gold_dim_customers  ──┤
bronze_products    → silver_products    → gold_dim_products   ──┼──► gold_fact_orders
bronze_orders      → silver_orders      ────────────────────────┤
bronze_orderdetail → silver_orderdetail ────────────────────────┘
```
Simplified DAG for testing the pipeline with a single table (tbcategories), running bronze β silver β gold in sequence.
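For reference, the fan-out/fan-in above could be declared roughly like this. This is a condensed sketch of what dag_lakehouse_pipeline.py might contain; the operator choice, task ids, and spark-submit wrapper are assumptions, not the project's actual code:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Command template is a placeholder; adapt to how the Airflow container reaches Spark.
SPARK_SUBMIT = ("spark-submit --master local[*] "
                "--packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/{job}.py")

with DAG(
    dag_id="lakehouse_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # triggered manually
    catchup=False,
) as dag:
    fact = BashOperator(task_id="gold_fact_orders",
                        bash_command=SPARK_SUBMIT.format(job="gold_fact_orders"))

    # Dimension tables: bronze -> silver -> gold dim, then fan in to the fact table.
    for table in ["categories", "products", "customers"]:
        bronze = BashOperator(task_id=f"bronze_{table}",
                              bash_command=SPARK_SUBMIT.format(job=f"bronze_tb{table}"))
        silver = BashOperator(task_id=f"silver_{table}",
                              bash_command=SPARK_SUBMIT.format(job=f"silver_tb{table}"))
        gold = BashOperator(task_id=f"gold_dim_{table}",
                            bash_command=SPARK_SUBMIT.format(job=f"gold_dim_{table}_scd2"))
        bronze >> silver >> gold >> fact

    # Orders and order detail feed the fact table directly after Silver.
    for table in ["orders", "orderdetail"]:
        bronze = BashOperator(task_id=f"bronze_{table}",
                              bash_command=SPARK_SUBMIT.format(job=f"bronze_tb{table}"))
        silver = BashOperator(task_id=f"silver_{table}",
                              bash_command=SPARK_SUBMIT.format(job=f"silver_tb{table}"))
        bronze >> silver >> fact
```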
- ✅ Complete ETL pipeline (Extract, Transform, Load)
- ✅ Layered data architecture (Medallion)
- ✅ Dimensional modeling (Star Schema)
- ✅ SCD Type 2 for historical dimensions
- ✅ Surrogate Keys and Business Keys
- ✅ Fact and dimension tables
- ✅ Data quality validation
- ✅ Pipeline orchestration with Apache Airflow
- ✅ SQL IDE access to lakehouse via Apache Kyuubi
- ✅ DataFrames API
- ✅ Spark SQL
- ✅ Transformations (select, filter, join, groupBy)
- ✅ Complex aggregations
- ✅ Window functions
- ✅ Read/Write Parquet
- ✅ JDBC connections
- ✅ Performance optimization (cache, persist)
- ✅ Docker containerization
- ✅ Docker Compose orchestration
- ✅ Network configuration
- ✅ Volume management
- ✅ Environment variables
- Hive Metastore for persistent table registration (eliminate per-session CREATE TABLE)
- Data quality framework (Great Expectations)
- Optimized partitioning by date
- Automated testing (pytest)
- CI/CD pipeline
- BI Dashboard (Power BI / Metabase)
- Data lineage documentation
- Monitoring and alerts
- Parquet compression and optimization
- Delta Lake implementation
This project was developed for educational and personal portfolio purposes.
Rodrigo Ribeiro
Data Engineer
LinkedIn: https://www.linkedin.com/in/rodrigo-ribeiro-pro/
⭐ If this project was useful to you, consider leaving a star!
