MySpark - Data Warehouse with Medallion Architecture 🏗️

📌 About the Project

This is my personal Data Engineering portfolio project, developed to demonstrate practical skills in building ETL pipelines, dimensional modeling, and implementing modern Data Warehouses.

After several courses and studies on Spark and Medallion architecture, I decided to build my own project from scratch, applying advanced concepts and adding my own implementation vision. A key differentiator of this project is the implementation of SCD Type 2 (Slowly Changing Dimensions) in Spark, a technique rarely seen in educational projects but essential in corporate environments for maintaining dimensional change history.

👨‍💻 Author

Rodrigo Ribeiro
Data Engineer
LinkedIn


🎯 Project Goals

  • Implement a complete Medallion architecture (Bronze → Silver → Gold)
  • Build a Data Warehouse with a dimensional model (Star Schema)
  • Apply SCD Type 2 for historical change tracking
  • Process data with Apache Spark using PySpark
  • Orchestrate the complete environment with Docker
  • Validate data quality across layers
  • Query the lakehouse from a SQL IDE using Apache Kyuubi
  • Demonstrate Data Engineering best practices

πŸ›οΈ Project Architecture

Technologies Used

  • Apache Spark 3.5 - Distributed data processing
  • PySpark - Python API for Spark
  • MySQL 8.0 - Source database (OLTP)
  • Docker & Docker Compose - Containerization and orchestration
  • Jupyter Notebook - Exploratory analysis
  • Apache Airflow 2.9.1 - Pipeline orchestration
  • Apache Kyuubi 1.10.3 - SQL gateway for JDBC/ODBC access to Spark
  • DBeaver - SQL IDE connected to the lakehouse via Kyuubi
  • Parquet - Columnar storage format


Medallion Architecture

┌─────────────┐      ┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   MySQL     │ ───> │   Bronze    │ ───> │   Silver    │ ───> │    Gold     │
│   (OLTP)    │      │  Raw Data   │      │  Cleaned    │      │  Business   │
└─────────────┘      └─────────────┘      └─────────────┘      └─────────────┘
   Source            Ingestion           Validation         DW Modeling
                                                                    │
                                                                    ▼
                                                           ┌─────────────────┐
                                                           │  Apache Kyuubi  │
                                                           │  (SQL Gateway)  │
                                                           └────────┬────────┘
                                                                    │ JDBC
                                                                    ▼
                                                           ┌─────────────────┐
                                                           │     DBeaver     │
                                                           │   (SQL IDE)     │
                                                           └─────────────────┘

🥉 Bronze Layer (Raw Data)

  • Purpose: Ingest raw data from MySQL without transformations
  • Format: Parquet
  • Characteristics:
    • Faithful copy of source data
    • Preservation of original data types
    • Foundation for future reprocessing
  • Tables: tbcategories, tbproducts, tbcustomers, tborders, tborderdetail
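As a hedged sketch (not the actual `bronze_*.py` scripts, whose details may differ), a Bronze ingestion job boils down to a JDBC read followed by an as-is Parquet write. The JDBC URL, credentials, and output path below are placeholders, and an active `SparkSession` is assumed to be passed in:

```python
# Hypothetical Bronze ingestion sketch: copy one MySQL table into the Bronze
# layer unchanged. `spark` is an active SparkSession; URL/credentials/paths
# are placeholders, not the project's real values.
def ingest_bronze(spark, table, jdbc_url="jdbc:mysql://mysql:3306/sales"):
    df = (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table)                      # read the table as-is
        .option("user", "root")
        .option("password", "root")
        .option("driver", "com.mysql.cj.jdbc.Driver")  # matches mysql-connector-j
        .load()
    )
    # Overwrite keeps Bronze a faithful snapshot of the source, types preserved
    df.write.mode("overwrite").parquet(f"/data/bronze/{table}")
    return df
```

Keeping the write untransformed is what makes Bronze a reliable foundation for reprocessing: any Silver bug can be fixed by re-running downstream jobs without touching the source database again.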

🥈 Silver Layer (Cleaned & Validated)

  • Purpose: Cleansing, validation, and standardization of data
  • Format: Parquet
  • Characteristics:
    • Duplicate removal
    • Null value handling
    • Format standardization
    • Integrity validations
  • Tables: Same as Bronze, but refined
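A minimal sketch of what such a cleansing step can look like in PySpark (the real `silver_*.py` jobs may apply different or additional rules; the key column name and paths are placeholders):

```python
# Hypothetical Silver cleansing sketch: deduplicate, drop rows missing the
# business key, and trim string columns. `spark` is an active SparkSession;
# `key_col` and the /data paths are placeholders.
def refine_silver(spark, table, key_col="code"):
    from pyspark.sql import functions as F  # imported here; needs pyspark at call time

    df = spark.read.parquet(f"/data/bronze/{table}")
    cleaned = (
        df.dropDuplicates()          # duplicate removal
          .dropna(subset=[key_col])  # rows without a business key are invalid
    )
    # format standardization: trim whitespace on every string column
    for name, dtype in cleaned.dtypes:
        if dtype == "string":
            cleaned = cleaned.withColumn(name, F.trim(F.col(name)))
    cleaned.write.mode("overwrite").parquet(f"/data/silver/{table}")
    return cleaned
```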

🥇 Gold Layer (Business Ready)

  • Purpose: Dimensional modeling for analytics and BI
  • Format: Parquet
  • Characteristics:
    • Star Schema model
    • Dimensions with SCD Type 2
    • Aggregated fact table
    • Optimized for analytical queries

📊 Dimensional Modeling (Star Schema)

Dimensions (SCD Type 2)

dim_categories

Product categories dimension with change history.

Columns:

  • category_sk - Surrogate Key
  • category_code - Business Key (original code)
  • category_description - Category description
  • valid_from - Start date of validity
  • valid_to - End date of validity (NULL = current)
  • is_current - Flag indicating current version

dim_products

Products dimension with tracking of changes in price, description, status, and category.

Columns:

  • product_sk - Surrogate Key
  • product_code - Business Key
  • product_description - Product description
  • sale_value - Sale price
  • is_active - Active/inactive status
  • category_code - FK to category
  • valid_from, valid_to, is_current - SCD2 control

Tracked changes:

  • Price changes
  • Description updates
  • Category modifications
  • Product activation/deactivation

dim_customers

Customers dimension with history of registration changes.

Columns:

  • customer_sk - Surrogate Key
  • customer_code - Business Key
  • customer_name - Customer name
  • customer_address - Address
  • customer_phone - Phone number
  • customer_email - Email
  • birth_date - Birth date
  • valid_from, valid_to, is_current - SCD2 control

Tracked changes:

  • Address updates
  • Phone/email changes
  • Registration updates

Fact Table

fact_orders

Consolidated sales fact table with aggregated metrics.

Columns:

  • order_code - Order code (Degenerate Dimension)
  • customer_code - FK to dim_customers
  • product_code - FK to dim_products
  • order_date - Order date/time
  • total_quantity - Total quantity sold
  • total_sales - Total value (quantity × unit_price)
  • line_count - Number of detail lines

Granularity: One row per product per order

Calculated metrics:

  • Total revenue per product/order
  • Total quantity sold
  • Transaction count
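To make the grain concrete, here is a pure-Python illustration of the aggregation (the sample rows are invented; in the pipeline this grouping is done in Spark by `gold_fact_orders.py`):

```python
# Illustration of the fact_orders grain: order-detail lines are grouped per
# (order_code, product_code) and the three metrics are aggregated.
# Sample data is made up for demonstration only.
from collections import defaultdict

detail_lines = [  # (order_code, product_code, quantity, unit_price)
    (1, 100, 2, 10.0),
    (1, 100, 1, 10.0),  # same product twice in one order -> still one fact row
    (1, 200, 5, 3.0),
]

facts = defaultdict(lambda: {"total_quantity": 0, "total_sales": 0.0, "line_count": 0})
for order, product, qty, price in detail_lines:
    row = facts[(order, product)]
    row["total_quantity"] += qty
    row["total_sales"] += qty * price  # quantity × unit_price
    row["line_count"] += 1

print(facts[(1, 100)])
# → {'total_quantity': 3, 'total_sales': 30.0, 'line_count': 2}
```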

🔄 SCD Type 2 (Slowly Changing Dimensions)

Concept

SCD Type 2 maintains complete change history in dimensions, enabling accurate temporal analysis.

Practical example:

A product had its price changed:

product_sk  product_code  description       sale_value  valid_from  valid_to    is_current
1           100           Gaming Mouse      150.00      2024-01-01  2024-06-15  false
42          100           RGB Gaming Mouse  180.00      2024-06-16  NULL        true

Benefits:

  • ✅ Historical price analysis
  • ✅ Registration change tracking
  • ✅ Complete audit trail
  • ✅ Point-in-time reporting
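Because every version carries a validity interval, a point-in-time lookup reduces to an interval check. A minimal pure-Python illustration, mirroring the price-change example above (the lookup logic is a simplification of what the pipeline expresses as a SQL/Spark filter):

```python
# Point-in-time lookup over SCD2 versions: pick the row whose validity
# interval contains the requested date (valid_to=None means "current").
from datetime import date

versions = [  # the two versions of product 100 from the example table
    {"product_sk": 1,  "sale_value": 150.00,
     "valid_from": date(2024, 1, 1),  "valid_to": date(2024, 6, 15)},
    {"product_sk": 42, "sale_value": 180.00,
     "valid_from": date(2024, 6, 16), "valid_to": None},
]

def as_of(versions, day):
    """Return the dimension version that was valid on `day`, or None."""
    for v in versions:
        if v["valid_from"] <= day and (v["valid_to"] is None or day <= v["valid_to"]):
            return v
    return None

print(as_of(versions, date(2024, 3, 1))["sale_value"])  # → 150.0
print(as_of(versions, date(2024, 7, 1))["sale_value"])  # → 180.0
```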

Implementation

SCD2 Algorithm:

  1. Detect new records and changes
  2. Expire old versions (set is_current = false, valid_to = current_date)
  3. Generate new surrogate key for updated versions
  4. Insert new versions with is_current = true
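The four steps above can be sketched in plain Python (the project's jobs implement this with Spark joins; column names follow dim_products, and the surrogate-key counter and single tracked column are simplifications for illustration):

```python
# Simplified SCD2 merge: detect changes, expire old versions, assign a new
# surrogate key, and insert new current versions. Plain-Python sketch of the
# algorithm only - the real gold_dim_*_scd2.py jobs use Spark.
from datetime import date

def apply_scd2(dim, incoming, key="product_code", tracked=("sale_value",),
               today=date(2024, 6, 16), next_sk=100):
    current = {r[key]: r for r in dim if r["is_current"]}   # step 1: index current rows
    for new in incoming:
        old = current.get(new[key])
        changed = old is not None and any(old[c] != new[c] for c in tracked)
        if changed:
            old["is_current"] = False    # step 2: expire the old version
            old["valid_to"] = today
        if changed or old is None:
            # steps 3-4: new surrogate key, insert the new current version
            dim.append({**new, "product_sk": next_sk, "valid_from": today,
                        "valid_to": None, "is_current": True})
            next_sk += 1
    return dim

dim = [{"product_sk": 1, "product_code": 100, "sale_value": 150.0,
        "valid_from": date(2024, 1, 1), "valid_to": None, "is_current": True}]
dim = apply_scd2(dim, [{"product_code": 100, "sale_value": 180.0}])
print(len(dim), dim[0]["is_current"], dim[1]["sale_value"])  # → 2 False 180.0
```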

🔌 Apache Kyuubi — SQL Gateway

Apache Kyuubi is a distributed SQL gateway that exposes Spark via a HiveServer2-compatible JDBC/ODBC interface, enabling any SQL IDE (such as DBeaver) to query the lakehouse directly without replacing the Spark engine.

How it fits in the stack

DBeaver (SQL IDE)
      │  JDBC (port 10009)
      ▼
Apache Kyuubi
      │  spark-submit
      ▼
Apache Spark ──► Parquet files (Bronze / Silver / Gold)

Kyuubi acts as a gateway: it receives SQL queries from DBeaver, submits them to Spark, and returns the results. Unlike Trino, which would replace the execution engine, Kyuubi keeps Spark as the engine.

Configuration (kyuubi-defaults.conf)

kyuubi.engine.spark.master=spark://spark-master:7077
kyuubi.engine.share.level=SERVER
kyuubi.engine.type=SPARK_SQL
spark.home=/opt/kyuubi/externals/spark-3.5.2-bin-hadoop3

DBeaver Connection

Field                 Value
Driver                Apache Hive 2
Host                  localhost
Port                  10009
Database              default
URL                   jdbc:hive2://localhost:10009/default;connectTimeout=60000;socketTimeout=120000
Username / Password   (leave blank)

Note: On first connection, Kyuubi needs ~15–20 seconds to launch the Spark engine. Increase DBeaver's connection timeout to at least 60 seconds to avoid premature disconnection.

Registering tables in DBeaver

Since the Gold layer uses plain Parquet files (no Hive Metastore), tables must be registered per session before querying:

CREATE DATABASE IF NOT EXISTS bronze;
CREATE DATABASE IF NOT EXISTS silver;
CREATE DATABASE IF NOT EXISTS gold;

CREATE TABLE IF NOT EXISTS gold.dim_categories  USING parquet LOCATION '/data/gold/dim_categories';
CREATE TABLE IF NOT EXISTS gold.dim_products     USING parquet LOCATION '/data/gold/dim_products';
CREATE TABLE IF NOT EXISTS gold.dim_customers    USING parquet LOCATION '/data/gold/dim_customers';
CREATE TABLE IF NOT EXISTS gold.fact_orders      USING parquet LOCATION '/data/gold/fact_orders';

Note: These registrations live in memory. If the Kyuubi container restarts, re-run the CREATE TABLE statements. A persistent Hive Metastore would eliminate this step and is listed as a future improvement.

Example analytical query

SELECT
    c.customer_name,
    p.product_description,
    DATE_FORMAT(f.order_date, 'dd-MM-yyyy') AS date,
    SUM(f.total_quantity) AS quantity,
    ROUND(SUM(f.total_sales), 2) AS sales
FROM gold.fact_orders f
INNER JOIN gold.dim_customers c
    ON f.customer_code = c.customer_code
    AND c.is_current = true
INNER JOIN gold.dim_products p
    ON f.product_code = p.product_code
    AND p.is_current = true
GROUP BY c.customer_name, p.product_description, DATE_FORMAT(f.order_date, 'dd-MM-yyyy')
ORDER BY date DESC, sales DESC

πŸ“ Project Structure

myspark/
├── spark/
│   ├── docker-compose.yml
│   ├── kyuubi-defaults.conf         # Kyuubi configuration
│   ├── Dockerfile
│   ├── jobs/
│   │   ├── test_mysql.py
│   │   ├── bronze_*.py              # Bronze ingestion
│   │   ├── silver_*.py              # Silver transformation
│   │   ├── gold_dim_*_scd2.py       # Gold dimensions with SCD2
│   │   ├── gold_fact_orders.py      # Fact table
│   │   ├── read_gold_*.py           # Read scripts
│   │   └── generic_query.py         # Generic query
│   ├── lakehouse/
│   │   ├── bronze/                  # Raw data
│   │   ├── silver/                  # Clean data
│   │   └── gold/                    # Dimensional model
│   └── work/                        # Jupyter notebooks
├── airflow/
│   ├── dags/
│   │   ├── dag_lakehouse_pipeline.py  # Full pipeline DAG
│   │   └── dag_test_tbcategories.py   # Test DAG (categories only)
│   ├── logs/
│   └── plugins/
└── mysql/
    ├── docker-compose.yml
    └── init/
        └── init.sql                 # Initialization script

🚀 Execution Guide

Prerequisites

  • Docker installed
  • Docker Compose installed
  • 8GB RAM available (recommended)
  • DBeaver installed (for SQL IDE access via Kyuubi)

Initial Setup

1. Create Docker network (mandatory)

docker network create lakehouse_net

2. Create directory structure

Inside the spark/ folder, create:

mkdir -p lakehouse/bronze lakehouse/silver lakehouse/gold

Inside the airflow/ folder, create:

mkdir -p dags logs plugins

3. Start MySQL and Spark containers

cd mysql/
docker compose up -d

cd ../spark/
docker compose up -d

4. Get Jupyter Notebook token

docker logs spark_notebook

Look for:

[I ########] http://127.0.0.1:8888/lab?token=99999...

Access: http://localhost:8888 and use the token.


Services Overview

Service            URL                     Credentials
Spark Master UI    http://localhost:8080   —
Jupyter Notebook   http://localhost:8888   token from logs
Airflow UI         http://localhost:8081   admin / admin
Kyuubi JDBC        localhost:10009         —

Execution Pipeline

MySQL Connection Test

docker exec -it spark_master spark-submit \
  --master local[*] \
  --packages com.mysql:mysql-connector-j:8.0.33 \
  /opt/spark/jobs/test_mysql.py

Bronze Layer - Ingestion

docker exec -it spark_master spark-submit --master local[*] --packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/bronze_tbcategories.py
docker exec -it spark_master spark-submit --master local[*] --packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/bronze_tbproducts.py
docker exec -it spark_master spark-submit --master local[*] --packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/bronze_tbcustomers.py
docker exec -it spark_master spark-submit --master local[*] --packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/bronze_tborders.py
docker exec -it spark_master spark-submit --master local[*] --packages com.mysql:mysql-connector-j:8.0.33 /opt/spark/jobs/bronze_tborderdetail.py

Silver Layer - Cleansing and Validation

docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/silver_tbcategories.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/silver_tbproducts.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/silver_tbcustomers.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/silver_tborders.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/silver_tborderdetail.py

Gold Layer - Dimensional Modeling

Dimensions with SCD Type 2:

docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/gold_dim_categories_scd2.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/gold_dim_products_scd2.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/gold_dim_customers_scd2.py

Fact Table:

docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/gold_fact_orders.py

📖 Read and Analysis Scripts

docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/read_gold_dim_categories.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/read_gold_dim_products.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/read_gold_dim_customers.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/read_gold_fact_orders.py
docker exec -it spark_master spark-submit --master local[*] /opt/spark/jobs/generic_query.py

🔍 Data Validation

Spark vs MySQL Comparison

Execute equivalent SQL queries in MySQL to validate totals:

SELECT 
    o.customer AS customer_code,
    COUNT(DISTINCT o.code) AS total_orders,
    SUM(od.quantity) AS total_items,
    ROUND(SUM(od.quantity * od.salesvalue), 2) AS total_revenue
FROM tborders o
INNER JOIN tborderdetail od ON o.code = od.orders
WHERE o.customer IS NOT NULL
GROUP BY o.customer
ORDER BY total_revenue DESC
LIMIT 10;

Validation checklist:

  • ✅ Revenue totals must be identical
  • ✅ Order counts must match
  • ✅ Top customers/products must be the same
  • ✅ Record counts must align

📊 Analysis Examples

Query 1: Sales by Customer, Product and Date

SELECT
    c.customer_name,
    p.product_description,
    DATE_FORMAT(f.order_date, 'dd-MM-yyyy') AS date,
    SUM(f.total_quantity) AS quantity,
    ROUND(SUM(f.total_sales), 2) AS sales
FROM gold.fact_orders f
INNER JOIN gold.dim_customers c
    ON f.customer_code = c.customer_code
    AND c.is_current = true
INNER JOIN gold.dim_products p
    ON f.product_code = p.product_code
    AND p.is_current = true
GROUP BY c.customer_name, p.product_description, DATE_FORMAT(f.order_date, 'dd-MM-yyyy')
ORDER BY date DESC, sales DESC

Query 2: Analysis by Category

SELECT
    cat.category_description,
    COUNT(DISTINCT f.order_code) AS total_orders,
    ROUND(SUM(f.total_sales), 2) AS revenue
FROM gold.fact_orders f
INNER JOIN gold.dim_products p
    ON f.product_code = p.product_code
    AND p.is_current = true
INNER JOIN gold.dim_categories cat
    ON p.category_code = cat.category_code
    AND cat.is_current = true
GROUP BY cat.category_description
ORDER BY revenue DESC

🌀 Airflow Orchestration

Apache Airflow automates and orchestrates the full Medallion pipeline, replacing manual execution of each Spark job.

Setup

Airflow runs as part of the same docker-compose.yml alongside Spark, sharing the lakehouse_net network. It uses PostgreSQL as its metadata database and the LocalExecutor.

DAGs

lakehouse_pipeline

Full pipeline executing all tables across all three layers in the correct dependency order. Triggered manually (schedule_interval=None).

Execution flow:

bronze_categories  → silver_categories  → gold_dim_categories ─┐
bronze_customers   → silver_customers   → gold_dim_customers ──┤
bronze_products    → silver_products    → gold_dim_products ───┼──► gold_fact_orders
bronze_orders      → silver_orders      ───────────────────────┤
bronze_orderdetail → silver_orderdetail ───────────────────────┘
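The dependency structure enforced by this DAG can be illustrated with a small pure-Python topological sort (this is not the Airflow DAG code itself, just a model of its ordering; task names follow the flow above):

```python
# Illustration of the lakehouse_pipeline dependency graph: each task maps to
# the upstream tasks it waits for, and a topological sort yields a valid run
# order. Plain Python only - the actual DAG is defined with Airflow operators.
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

deps = {
    "silver_categories": {"bronze_categories"},
    "silver_customers": {"bronze_customers"},
    "silver_products": {"bronze_products"},
    "silver_orders": {"bronze_orders"},
    "silver_orderdetail": {"bronze_orderdetail"},
    "gold_dim_categories": {"silver_categories"},
    "gold_dim_customers": {"silver_customers"},
    "gold_dim_products": {"silver_products"},
    # the fact table waits for all three dimensions plus both order tables
    "gold_fact_orders": {
        "gold_dim_categories", "gold_dim_customers", "gold_dim_products",
        "silver_orders", "silver_orderdetail",
    },
}

order = list(TopologicalSorter(deps).static_order())
assert order[-1] == "gold_fact_orders"  # the fact table always runs last
```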

test_tbcategories

Simplified DAG for testing the pipeline with a single table (tbcategories), running bronze → silver → gold in sequence.


🎓 Learning Outcomes and Applied Techniques

Data Engineering

  • ✅ Complete ETL pipeline (Extract, Transform, Load)
  • ✅ Layered data architecture (Medallion)
  • ✅ Dimensional modeling (Star Schema)
  • ✅ SCD Type 2 for historical dimensions
  • ✅ Surrogate Keys and Business Keys
  • ✅ Fact and dimension tables
  • ✅ Data quality validation
  • ✅ Pipeline orchestration with Apache Airflow
  • ✅ SQL IDE access to the lakehouse via Apache Kyuubi

Apache Spark / PySpark

  • ✅ DataFrames API
  • ✅ Spark SQL
  • ✅ Transformations (select, filter, join, groupBy)
  • ✅ Complex aggregations
  • ✅ Window functions
  • ✅ Read/Write Parquet
  • ✅ JDBC connections
  • ✅ Performance optimization (cache, persist)

DevOps / Infrastructure

  • ✅ Docker containerization
  • ✅ Docker Compose orchestration
  • ✅ Network configuration
  • ✅ Volume management
  • ✅ Environment variables

🔧 Future Improvements

  • Hive Metastore for persistent table registration (eliminate per-session CREATE TABLE)
  • Data quality framework (Great Expectations)
  • Optimized partitioning by date
  • Automated testing (pytest)
  • CI/CD pipeline
  • BI Dashboard (Power BI / Metabase)
  • Data lineage documentation
  • Monitoring and alerts
  • Parquet compression and optimization
  • Delta Lake implementation

📝 License

This project was developed for educational and personal portfolio purposes.


💬 Contact

Rodrigo Ribeiro
Data Engineer
LinkedIn: https://www.linkedin.com/in/rodrigo-ribeiro-pro/


⭐ If this project was useful to you, consider leaving a star!
