์ด์ปค๋จธ์ค ๋ฐ์ดํฐ ์์ง, ๋ณํ, ์ ์ฌ ํตํฉ ํ๋ ์์ํฌ
๋ค์ํ ์ด์ปค๋จธ์ค API์ ์น๋ฌธ์ ์๋ต์ ์์งํ๊ณ DuckDB ๊ธฐ๋ฐ ๋ณํ์ ๊ฑฐ์ณ,
BigQuery, PostgreSQL, Google Sheets ๋ฑ ์ธ๋ถ ์์คํ ์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํ๋ ์ํฌํ๋ก์ฐ๋ฅผ ๊ด๋ฆฌํ๊ธฐ ์ํ ํ๋ก์ ํธ๋ฅผ ์๋ดํ๋ค.
- ํ๋ก์ ํธ ๊ฐ์
- ํต์ฌ ์ํคํ ์ฒ
- ํ์ค ETL ๋ชจ๋ ๊ตฌ์กฐ
- Airflow ์๋น์ค ๊ตฌ์กฐ
- Airflow ์์ ์ค์ผ์ค๋ง
- PostgreSQL ์ ์ฌ ํ๊ฒฝ
- Streamlit UI
- ๋น ๋ฅธ ์์
- ์์ ์ฝ๋
LinkMerce ํ๋ก์ ํธ๋ ์ด์ปค๋จธ์ค ํ๋ซํผ์ผ๋ก๋ถํฐ ์ผํ๋ชฐ ์ด์์ ํ์ํ ๋ฐ์ดํฐ๋ฅผ ์์งํ๊ธฐ ์ํ ๋ชฉ์ ์ ๊ฐ์ง๋ค.
Python ์คํฌ๋ํ ๋ก์ง์ ๊ตฌํํ PyPI ํจํค์ง๊ฐ ํ๋ก์ ํธ์ ์ค์ฌ์ด ๋๋ฉฐ,
์์
์ค์ผ์ค๋ง์ ์ฒ๋ฆฌํ๊ธฐ ์ํด Apache Airflow๋ฅผ ์ ๊ทน์ ์ผ๋ก ํ์ฉํ๋ค.
ํ์ฌ linkmerce ํจํค์ง ๋ฒ์ ์ 1.0.6์ด๋ค.
ํ๋ก์ ํธ์์ ์ฃผ๋ชฉํ ๋ถ๋ถ์ ๋ค์ 5๊ฐ์ง๋ค.
src/linkmerce: ์น ์คํฌ๋ํ ๋ฐ ์ธ๋ถ ์์คํ ๊ณผ์ ๋ฐ์ดํฐ ์ฐ๋์ ์ง์ํ๋ ํต์ฌ Python ํจํค์งsrc/tests: Python ํจํค์ง์ ๋์์ ๊ฒ์ฆํ๊ณ ์ค๊ฐ ์คํ ๊ฒฐ๊ณผ๋ฅผ ์ ์ฅํ๋ ํ ์คํธ ๋ชจ์airflow: ์์ ์ค์ผ์ค๋ง, ์ค์ผ์คํธ๋ ์ด์ , Playwright ๊ธฐ๋ฐ ๋ธ๋ผ์ฐ์ ์๋ํ๊ฐ ๋ค์ด ์๋ DAG ๋ชจ์postgres: ๋ก์ปฌ PostgreSQL 18 ์ ์ฌ ํ๊ฒฝ๊ณผ ์ด๊ธฐ ์คํค๋ง, ํํฐ์ , Parquet ํ์ฅ์ ๊ด๋ฆฌํ๋ ์คํ ํ๊ฒฝstreamlit: ์ผํ๋ชฐ ์ด์ ๋ด๋น์๊ฐ ๋น์ ๊ธฐ์ ์ธ ๋ฐ์ดํฐ ์์ง์ ์ํด DAG์ ์๋์ผ๋ก ํธ๋ฆฌ๊ฑฐํ ๋ ์ฌ์ฉํ๋ UI
linkmerce ํจํค์ง๊ฐ ์ง์ํ๋ ์ด์ปค๋จธ์ค ๊ด๋ จ ํ๋ซํผ์ ์์ง ๋ฒ์๋ ๋ค์๊ณผ ๊ฐ๋ค.
| ํ๋ซํผ ๊ตฌ๋ถ | ์์ง ๋ฒ์ |
|---|---|
| CJ๋ํํต์ด eFLEXs | ์ฌ๊ณ |
| ์ฟ ํก ๊ด๊ณ ์ผํฐ | ๊ด๊ณ |
| ์ฟ ํก ํ๋งค์์ผํฐ | ์ํ, ๋งค์ถ |
| ์ด์นด์ดํธ API | ์ํ, ์ฌ๊ณ |
| ๊ตฌ๊ธ API | ๊ด๊ณ |
| ๋ฉํ API | ๊ด๊ณ |
| ๋ค์ด๋ฒ ๋ฉ์ธ | ๊ฒ์ |
| ๋ค์ด๋ฒ ์คํ API | ๊ฒ์ |
| ์ฌ๋ฐฉ๋ท ์์คํ | ์ฃผ๋ฌธ, ์ํ |
| ๋ค์ด๋ฒ ๊ฒ์๊ด๊ณ API | ๊ด๊ณ ๋ณด๊ณ ์, ๊ด๊ณ ๊ณ์ฝ, ๊ฒ์๋ |
| ๋ค์ด๋ฒ ๊ด๊ณ ์ฃผ์ผํฐ (๊ฒ์๊ด๊ณ ) | ๊ด๊ณ ๋ณด๊ณ ์ |
| ๋ค์ด๋ฒ ์ฑ๊ณผํ ๋์คํ๋ ์ด ๊ด๊ณ | ๊ด๊ณ ๋ณด๊ณ ์, ๊ด๊ณ ์์ |
| ๋ค์ด๋ฒ ์ปค๋จธ์ค API | ์ฃผ๋ฌธ, ์ํ, ํต๊ณ |
| ๋ค์ด๋ฒ ์ผํํํธ๋์ผํฐ | ๋งค์ถ, ๋ฐฉ๋ฌธ ํต๊ณ, ์นดํ๋ก๊ทธ/์ํ |
linkmerce ํจํค์ง๋ฅผ ์ดํดํ๊ธฐ ์ํด ์ฃผ๋ชฉํด์ผ ํ ๊ฒ์ Extractor โ Transformer ์ฐ๊ฒฐ์ด๋ค.
linkmerce ํจํค์ง๋ ETL ํ๋ก์ธ์ค๋ฅผ [ ์ถ์ถ(Extract), ๋ณํ(Transform), ์ ์ฌ(Load) ] 3๊ฐ์ง ๋ถ๋ถ์ผ๋ก ๊ตฌ๋ถํ๋ค.
Extractor์ Transformer๋ ๊ฐ๊ฐ ์ถ์ถ๊ณผ ๋ณํ ์ญํ ์ ๋ด๋นํ๋ค.
Extractor๋ ๋๊ธฐ ๋๋ ๋น๋๊ธฐ HTTP ์ธ์ ์ ํ์ฉํ HTTP ์์ฒญ์ ๋ด๋นํ๋ค.
์ผ๋ถ ์์ ์ ๋งค๊ฐ๋ณ์ ๋ชฉ๋ก์ ๋ํด ๋ฐ๋ณต ์์ฒญํ๋๋ฐ,Task๋ฅผ ํ์ฉํด ์ด๋ฌํ ๋์์ ์ถ์ํํ๋ค.Transformer๋ HTTP ์๋ต ๊ฒฐ๊ณผ๋ฅผ JSON ํ์์ผ๋ก ํ์ฑํ๊ณ , DuckDB ํ ์ด๋ธ์ ์คํค๋ง์ ๋ง๊ฒ ๋ณํํด ์ ์ฌํ๋ค.
DuckDB ์ฐ๊ฒฐ์ ํตํด BigQuery, PostgreSQL, Google Sheets ๊ฐ์ ์ธ๋ถ ์์คํ ์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํ๋ ํ์ฅ ๊ธฐ๋ฅ์ ๋ณ๋๋ก ์ ๊ณตํ๋ค.
์ง๊ธ๊น์ง์ ์ค๋ช ์ ์๋ ํ๋ก ์ ๋ฆฌํ ์ ์๋ค.
| ๊ณ์ธต ๊ตฌ๋ถ | ์ฑ ์ | ๊ตฌํ ๊ฒฝ๋ก |
|---|---|---|
Extractor |
HTTP ์ธ์ ๊ด๋ฆฌ, ์์ฒญ ๋ฉ์์ง ๋น๋ | src/linkmerce/common/extract.py |
Task |
๋ฐ๋ณต ์์ฒญ, ์ฌ์๋, ํ์ด์ง๋ค์ด์ | src/linkmerce/common/tasks.py |
ResponseTransformer |
์๋ต ๊ฒฐ๊ณผ ํ์ฑ, JSON ํ์์ผ๋ก ๋ณํ | src/linkmerce/common/transform.py |
DuckDBTransformer |
DuckDB ํ ์ด๋ธ ์์ฑ ๋ฐ ์ ์ฌ | src/linkmerce/common/transform.py |
DuckDBConnection |
DuckDB ์ฐ๊ฒฐ ๊ด๋ฆฌ, CRUD ์์ ์ง์ | src/linkmerce/common/load.py |
API Endpoint |
Extractor์ Transformer ์ฐ๊ฒฐ |
src/linkmerce/api/common.py |
Extensions |
DuckDB ํ ์ด๋ธ์ ์ธ๋ถ ์์คํ ๊ณผ ์ฐ๋ | src/linkmerce/extensions/*.py |
๊ณ์ธต ๊ตฌ๋ถ์ ๋ฐ๋ฅธ ์ํฌํ๋ก์ฐ๋ ๋ํ ๋ค์์ ํ๋ฆ๋๋ก ์ ๋ฆฌํด๋ณผ ์๋ ์๋ค.
[API Endpoint] (api/...)
โ ์ฌ์ฉ์ ํ๋ผ๋ฏธํฐ ์ ๋ฌ
Extractor (core/.../extract.py)
โ HTTP ์์ฒญ ๋ฐ ์๋ต ์์
ResponseTransformer (core/.../transform.py)
โ JSON ํ์์ผ๋ก ๋ณํ
DuckDBTransformer (core/.../transform.py)
โ DuckDB ํ
์ด๋ธ์ ์ ์ฌ
DuckDBConnection (common/load.py)
โ ์ธ๋ถ ์์คํ
์ ์ ์ฌ
BigQueryClient / PostgresClient / WorksheetClient (extensions/...)ETL ํ๋ก์ธ์ค๋ฅผ ํ๋ซํผ, ํธ์คํธ๋ช , ์นดํ ๊ณ ๋ฆฌ๋ก ๊ตฌ์ฑ๋ 3๋จ๊ณ ํ์ ๊ฒฝ๋ก๋ก ๊ตฌ๋ถํ๋ค.
core/{platform}/{hostname}/{category}/
โโโ extract.py
โโโ transform.py
โโโ models.sql์ด ๊ตฌ์กฐ๋ ํ๋์ ์นดํ ๊ณ ๋ฆฌ ์์์ ์ฑ ์์ ๋ช ํํ ๋ถ๋ฆฌํ๋ค.
extract.py: HTTP ์์ฒญ ๋ฐฉ์ ๊ตฌํtransform.py: ์๋ต ํ์ฑ ๋ฐ DuckDB ์ ์ฌ ๋ฐฉ์ ๊ตฌํmodels.sql: CREATE, INSERT ๋ฑ SQL ์ฟผ๋ฆฌ๋ฌธ ์ค๊ณ
์ด ํจํด ๋๋ถ์ ํ๋ซํผ ๋๋ ํธ์คํธ๋ณ๋ก ๊ตฌํ์ด ๋ฌ๋ผ๋, ์คํ ๋ฐฉ์๊ณผ ํ ์คํธ ๋ฐฉ์์ ๋น๊ต์ ์ผ๊ด๋๊ฒ ์ ์ง๋๋ค.
ETL ํ๋ก์ธ์ค๋ฅผ ์คํํ ๋๋ ํธ์์ฑ์ ์ํด core/ ๋ชจ๋์ ์์๋๋ก ํธ์ถํ์ง ์๊ณ , ํ๋์ API ํจ์๋ฅผ ํธ์ถํ๋ค.
linkmerce ํจํค์ง์ ๊ด๋ จ๋ src/linkmerce/ ๊ฒฝ๋ก์ ๋ํ ์์ธ ์ค๋ช
์
๋ณ๋์ ๋ฌธ์๋ฅผ ์ฐธ๊ณ ํ๋ค.
src/tests ๊ฒฝ๋ก์์ ๋จ์ผ Extractor ๋๋ Transformer ๋จ์๋ก ์ ์ ๋์ํ๋์ง ํ
์คํธ๋ฅผ ๋ด๋นํ๋ค.
test_extract.py:Extractor.extract()์คํ ๊ฒฐ๊ณผ ์ ์ฅtest_transform.py:DuckDBTransformer.parse()๋ฐbulk_insert()์คํ ๊ฒฐ๊ณผ ์ ์ฅconftest.py: ๊ณต์ฉ ๋ฆฌ์์ค Fixture ์ ์,Transformerํ ์คํธ๋ฅผ ์ง์ํ๋TransformerHarness์ ๊ณตresults/: ๊ฐ ํ ์คํธ์ ์คํ ๊ฒฐ๊ณผ๊ฐ ์ ์ฅ๋๋ ๊ฒฝ๋ก. Git ๋ฒ์ ๊ด๋ฆฌ์์๋ ์ ์ธ๋๋ค.
ํ
์คํธ์ ๊ด๋ จ๋ src/linkmerce/tests/ ๊ฒฝ๋ก์ ๋ํ ์์ธ ์ค๋ช
์
๋ณ๋์ ๋ฌธ์๋ฅผ ์ฐธ๊ณ ํ๋ค.
Airflow ์์คํ
์ ๊ตฌ์ฑํ๋ ์๋น์ค ๋ชฉ๋ก์ docker-compose.yaml ์ค์ ์์ ์ ์ํ๋ค.
์ด์ ํ๊ฒฝ์์ Airflow Celery Executor ๊ตฌ์ฑ์ ๊ธฐ์ค์ผ๋ก ๋ค์ ์๋น์ค๋ค์ด ์คํ๋๊ณ ์๋ค.
postgresredisplaywrightairflow-apiserverairflow-schedulerairflow-dag-processorairflow-workerairflow-triggererairflow-init
๋๋จธ์ง๋ Apache Airflow์์ ๊ธฐ๋ณธ์ผ๋ก ์ ์ํ ์๋น์ค๋ค์ด์ง๋ง, playwright ์๋น์ค๋ฅผ ์์ธ์ ์ผ๋ก ์ถ๊ฐํ๋ค.
์ผ๋ถ ์์
์์๋ playwright ์๋น์ค๋ฅผ ํ์ฉํด ๋ธ๋ผ์ฐ์ ๋ ๋๋ง์ ํ์ฉํ ์น ์คํฌ๋ํ์ ์ํํ๋ค.
Docker Compose ์คํ์ ๋ค์ ๋ช
๋ น์ด ๋๋ init.sh ์คํฌ๋ฆฝํธ๋ฅผ ์คํํ๋ค.
cd airflow
docker compose up airflow-init
docker compose up -dlinkmerce ํจํค์ง๋ฅผ ํ์ฉํ๋ Airflow DAG์ ๊ณตํต์ ์ผ๋ก ๋ค์ ํ๋ฆ์ ๊ฐ์ง๋ค.
airflow_utilsํ๋ฌ๊ทธ์ธ์read_config()๋๋read_credentials()ํจ์๋ก ์ค์ ๊ณผ ์ธ์ฆ ์ ๋ณด๋ฅผ ๋ถ๋ฌ์จ๋ค.linkmerce.api.*ํจ์๋ฅผ ํธ์ถํด ์ถ์ถ(extract)๊ณผ ๋ณํ(transform) ๊ณผ์ ์ ์ํํ๋ค.DuckDBConnection์ ํตํด ํ ์ด๋ธ์ ์ ์ฌ๋ API ์คํ ๊ฒฐ๊ณผ๋ฅผ ๋ถ๋ฌ์ฌ ์ ์๋ค.dual_loadํ๋ฌ๊ทธ์ธ์ผ๋ก DuckDB ํ ์ด๋ธ์ PostgreSQL์ ๋จผ์ ์ ์ฌํ ๋ค BigQuery์๋ ์ ์ฌํ๋ค.- Task ๊ฒฐ๊ณผ๋ฅผ
{params: {...}, results: {...}}๋์ ๋๋ฆฌ๋ก ๋ฐํํ๋ค.
์ด๋ฌํ ํ๋ฆ์ DAG์ผ๋ก ๊ตฌํํ ๊ฒฝ์ฐ ๋ค์๊ณผ ๊ฐ์ ์ฝ๋๋ก ๋ํ๋ผ ์ ์๋ค.
with DAG(dag_id="...") as dag:
PATH = "platform.hostname.category"
@task(task_id="...")
def read_configs() -> dict:
from airflow_utils import read_config
return read_config(PATH, tables=True)
@task(task_id="...")
def read_credentials() -> list:
from airflow_utils import read_config
return read_config(PATH, credentials=True)["credentials"]
@task(task_id="...", map_index_template="{{ credentials['id'] }}")
def etl_task(credentials: dict, configs: dict, **kwargs) -> dict:
return main(**credentials, **configs)
def main(tables: dict[str, str], **kwargs) -> dict:
from linkmerce.common.load import DuckDBConnection
from linkmerce.api.platform.hostname import example_api
from dual_load import load_table_from_duckdb
with DuckDBConnection(tzinfo="Asia/Seoul") as conn:
example_api(**kwargs, connection=conn)
return {
"params": {},
"results": {
tables["table"]: load_table_from_duckdb(
connection = conn,
source_table = "table",
target_table = tables["table"],
),
},
}
(etl_task
.partial(configs=read_configs())
.expand(credentials=read_credentials()))Airflow์ ๊ด๋ จ๋ airflow/ ๊ฒฝ๋ก์ ๋ํ ์์ธ ์ค๋ช
์
๋ณ๋์ ๋ฌธ์๋ฅผ ์ฐธ๊ณ ํ๋ค.
postgres/ ๊ฒฝ๋ก๋ Airflow MetaDB์ ๋ณ๊ฐ๋ก ETL ๊ฒฐ๊ณผ๋ฅผ ์ ์ฌํ๊ธฐ ์ํ ๋ก์ปฌ PostgreSQL 18 ํ๊ฒฝ์ ์ ๊ณตํ๋ค.
๊ตฌ์ฑ ์์๋ ๋ค์๊ณผ ๊ฐ๋ค.
Dockerfile: PostgreSQL 18,pg_partman, Apache Arrow / Parquet ๋ฐํ์, ์์ฒดparquet_ioํ์ฅ์ ํฌํจํ ์ด๋ฏธ์ง ๋น๋init.sql: ํ๋ซํผ๋ณ ์คํค๋ง์ ํ ์ด๋ธ, ์ผ๋ณ ํํฐ์ ์ด๊ธฐํpartman_maintenance.sql: ์ด์ ์ค ๋ฏธ๋์ ํํฐ์ ์ ์์ฑํ๊ธฐ ์ํ ์ ์ง๋ณด์ SQLresources/bq_schemas.json: PostgreSQLinit.sql๊ธฐ์ค์ผ๋ก ์์ฑํ BigQuery ์คํค๋ง ์ฐธ๊ณ ํ์ผresources/parquet_io.md:parquet_ioํ์ฅ์ SQL ์ธํฐํ์ด์ค์ ํ์ ๋ณํ ์ ์ฑ ์ค๋ช
Airflow DAG์ ๋๋ถ๋ถ dual_load ํ๋ฌ๊ทธ์ธ์ ํตํด PostgreSQL ์ ์ฌ๋ฅผ ๋จผ์ ์ํํ๊ณ , ์ฑ๊ณตํ๋ฉด BigQuery ์ ์ฌ๋ฅผ ์ด์ด์ ์ํํ๋ค.
PostgreSQL์ ๋ ์๊ฒฉํ ๊ธฐ๋ณธํค์ ํ์
์ ์ฝ์ ๊ฒ์ฆํ๋ 1์ฐจ ์ ์ฌ ๋์์ผ๋ก ์ฌ์ฉํ๊ณ , BigQuery๋ ๊ธฐ์กด ๋ถ์ ํ
์ด๋ธ ์ ์ฌ ๋์์ผ๋ก ์ ์งํ๋ค.
PostgreSQL ์คํ ํ๊ฒฝ์ ๋ํ ์์ธ ์ค๋ช ์ ๋ณ๋์ ๋ฌธ์๋ฅผ ์ฐธ๊ณ ํ๋ค.
streamlit/app.py๋ Airflow REST API๋ฅผ ํธ์ถํ์ฌ ์๋์ผ๋ก DAG์ ํธ๋ฆฌ๊ฑฐํ๊ธฐ ์ํ ์ด์์ฉ UI๋ค.
Airflow UI์ ์ ๊ทผํ์ฌ ์ง์ DAG์ ํธ๋ฆฌ๊ฑฐํ ์ ์๋ ์ผํ๋ชฐ ์ด์ ๋ด๋น์๊ฐ
๋น์ ๊ธฐ์ ์ธ ๋ฐ์ดํฐ ์์ง ์์
(์ฌ๋ฐฉ๋ท ์ฃผ๋ฌธ ETL๋ง ์ง์)์์ DAG์ ์ ์ดํ ์ ์๊ฒ ๊ฐ๋จํ UI์ ์ ๊ณตํ๋ค.
Airflow์๋ ๋ณ๋์ Docker Compose๋ก ๋จ์ผ Streamlit ์๋น์ค๋ฅผ ๊ด๋ฆฌํ๋ฉฐ,
๋ก์ปฌ ๋คํธ์ํฌ์์ ์๋ฒ ์์ดํผ์ ํฌํธ๋ฅผ ์กฐํฉํ ์ฃผ์๋ก ์ ๊ทผํ๋ค.
Docker Compose ์คํ์ ๋ค์ ๋ช ๋ น์ด๋ฅผ ์ฌ์ฉํ๋ค.
cd streamlit
docker compose up -dpip install -e .ํจํค์ง ๋ฉํ๋ฐ์ดํฐ๋ pyproject.toml์ ์ ์๋์ด ์๋ค.
cd postgres
./build.sh
docker compose up -dcd airflow
docker compose up airflow-init
docker compose up -dAirflow์์ dual load๋ฅผ ์คํํ๋ ค๋ฉด postgres, bigquery Airflow Connection์ ๋จผ์ ๋ฑ๋กํด์ผ ํ๋ค.
pytest src/tests/test_extract.py -m extract -v -s
pytest src/tests/test_transform.py -m transform -v -s
pytest src/tests/test_load.py -m load -v -sfrom linkmerce.api.smartstore.api import marketing_channel
from linkmerce.common.load import DuckDBConnection
with DuckDBConnection(tzinfo="Asia/Seoul") as conn:
rows = marketing_channel(
client_id="...",
client_secret="...",
channel_seq="...",
start_date="2025-01-01",
end_date="2025-01-31",
connection=conn,
return_type="json",
)
print(len(rows))์ด ํธ์ถ์ API ํจ์๋ฅผ ํตํด Extractor์ DuckDBTransformer๋ฅผ ์ฐ์์ ์ผ๋ก ์คํํ๊ณ ,
๊ฒฐ๊ณผ๋ฅผ DuckDB ํ
์ด๋ธ์ ์ ์ฌํ ๋ค, ํ
์ด๋ธ ํ์ ์กฐํํ์ฌ list[dict] ํ์์ผ๋ก ๋ฐํํ๋ค.