minyeamer/linkmerce


LinkMerce

Integrated framework for collecting, transforming, and loading e-commerce data

This document introduces a project for managing workflows that collect responses from various e-commerce APIs and web documents, transform them with DuckDB,
and load the data into external systems such as BigQuery, PostgreSQL, and Google Sheets.


Project Overview

The LinkMerce project collects the data needed to run an online store from e-commerce platforms.

A PyPI package that implements the Python scraping logic is the core of the project,
and Apache Airflow is used extensively for job scheduling.

The current linkmerce package version is 1.0.6.

The project has five main parts.

  1. src/linkmerce: the core Python package, which supports web scraping and data integration with external systems
  2. src/tests: a test suite that verifies the package's behavior and saves intermediate execution results
  3. airflow: a collection of DAGs covering job scheduling, orchestration, and Playwright-based browser automation
  4. postgres: a runtime environment that manages the local PostgreSQL 18 load target along with its initial schema, partitions, and Parquet extension
  5. streamlit: a UI that store operators use to trigger DAGs manually for ad hoc data collection

The linkmerce package collects the following data from each e-commerce platform.

| Platform | Collection scope |
| --- | --- |
| CJ Logistics eFLEXs | Inventory |
| Coupang Ad Center | Ads |
| Coupang Seller Center | Products, sales |
| Ecount API | Products, inventory |
| Google API | Ads |
| Meta API | Ads |
| Naver Main | Search |
| Naver Open API | Search |
| Sabangnet System | Orders, products |
| Naver Search Ads API | Ad reports, ad contracts, search volume |
| Naver Advertiser Center (Search Ads) | Ad reports |
| Naver Performance Display Ads | Ad reports, ad rankings |
| Naver Commerce API | Orders, products, statistics |
| Naver Shopping Partner Center | Sales, visit statistics, catalogs/products |

Core Architecture

The key to understanding the linkmerce package is the Extractor → Transformer connection.

The linkmerce package divides the ETL process into three parts: Extract, Transform, and Load.
The Extractor and the Transformer handle extraction and transformation, respectively.

  1. The Extractor handles HTTP requests over synchronous or asynchronous HTTP sessions.
    Some jobs repeat a request over a list of parameters; Task abstracts this behavior.
  2. The Transformer parses HTTP responses into JSON, converts them to match the schema of a DuckDB table, and loads them into that table.
    Extensions that load data through the DuckDB connection into external systems such as BigQuery, PostgreSQL, and Google Sheets are provided separately.
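
The Extractor → Transformer hand-off described above can be sketched roughly as follows. This is a minimal illustration, not the package's code: `SketchExtractor`, `SketchTransformer`, `fake_fetch`, and the `products` table are hypothetical names, the HTTP call is replaced by a canned response, and the standard-library `sqlite3` module stands in for DuckDB so the example runs without extra dependencies.

```python
import json
import sqlite3
from typing import Callable, Iterable


class SketchExtractor:
    """Hypothetical extractor: issues one request per parameter set."""

    def __init__(self, fetch: Callable[[dict], str]):
        # `fetch` stands in for a call made through a sync or async HTTP session.
        self.fetch = fetch

    def extract(self, params_list: Iterable[dict]) -> list[str]:
        # Repeat the request for every parameter set.
        return [self.fetch(params) for params in params_list]


class SketchTransformer:
    """Hypothetical transformer: parses JSON text and inserts rows into a table."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        self.conn.execute("CREATE TABLE IF NOT EXISTS products (id INTEGER, name TEXT)")

    def transform(self, responses: Iterable[str]) -> int:
        rows = []
        for body in responses:
            for item in json.loads(body)["items"]:
                # Coerce each field to the type declared in the table schema.
                rows.append((int(item["id"]), str(item["name"])))
        self.conn.executemany("INSERT INTO products VALUES (?, ?)", rows)
        return len(rows)


def fake_fetch(params: dict) -> str:
    # Canned response so the sketch runs without network access.
    page = params["page"]
    return json.dumps({"items": [{"id": page, "name": f"item-{page}"}]})


conn = sqlite3.connect(":memory:")  # sqlite3 is only a stand-in for DuckDB here
responses = SketchExtractor(fake_fetch).extract([{"page": 1}, {"page": 2}])
inserted = SketchTransformer(conn).transform(responses)
rows = conn.execute("SELECT id, name FROM products ORDER BY id").fetchall()
```

Keeping the request logic and the table logic in separate objects means either side can be tested on its own, which matches the split the text describes.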

The description so far is summarized in the table below.

| Layer | Responsibility | Implementation path |
| --- | --- | --- |
| Extractor | HTTP session management, request message building | src/linkmerce/common/extract.py |
| Task | Repeated requests, retries, pagination | src/linkmerce/common/tasks.py |
| ResponseTransformer | Response parsing, conversion to JSON | src/linkmerce/common/transform.py |
| DuckDBTransformer | DuckDB table creation and loading | src/linkmerce/common/transform.py |
| DuckDBConnection | DuckDB connection management, CRUD support | src/linkmerce/common/load.py |
| API Endpoint | Connects the Extractor and the Transformer | src/linkmerce/api/common.py |
| Extensions | Syncs DuckDB tables with external systems | src/linkmerce/extensions/*.py |
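
To make the Task row more concrete, the sketch below shows one way repeated requests, retries, and pagination can be combined in a single loop. It is an assumption-laden illustration rather than the contents of `tasks.py`: `run_paginated`, `flaky_fetch`, and the `has_next` response field are all hypothetical.

```python
import time
from typing import Callable, Iterator


def run_paginated(
    fetch_page: Callable[[int], dict],
    max_retries: int = 3,
    delay: float = 0.0,
) -> Iterator[dict]:
    """Yield pages until the source reports no next page, retrying failed requests."""
    page = 1
    while True:
        for attempt in range(1, max_retries + 1):
            try:
                body = fetch_page(page)
                break
            except ConnectionError:
                if attempt == max_retries:
                    raise  # give up after the last allowed attempt
                time.sleep(delay)
        yield body
        if not body.get("has_next"):
            return
        page += 1


call_count = {"n": 0}


def flaky_fetch(page: int) -> dict:
    # Hypothetical fetcher: the second call fails once, then succeeds on retry.
    call_count["n"] += 1
    if call_count["n"] == 2:
        raise ConnectionError("temporary failure")
    return {"page": page, "has_next": page < 3}


pages = [body["page"] for body in run_paginated(flaky_fetch)]
```

Here three pages are fetched with four calls in total, because the request for page 2 is retried once.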

The workflow across these layers can also be summarized as the following flow.

[API Endpoint] (api/...)
    ↓   pass user parameters
Extractor (core/.../extract.py)
    ↓   send HTTP request and receive response
ResponseTransformer (core/.../transform.py)
    ↓   convert to JSON
DuckDBTransformer (core/.../transform.py)
    ↓   load into a DuckDB table
DuckDBConnection (common/load.py)
    ↓   load into external systems
BigQueryClient / PostgresClient / WorksheetClient (extensions/...)

Standard ETL Module Structure

Each ETL process lives under a three-level subpath made up of platform, hostname, and category.

core/{platform}/{hostname}/{category}/
├── extract.py
├── transform.py
└── models.sql

This structure clearly separates responsibilities within a single category.

  • extract.py: implements how the HTTP requests are made
  • transform.py: implements how responses are parsed and loaded into DuckDB
  • models.sql: defines the SQL statements such as CREATE and INSERT

Thanks to this pattern, execution and testing stay fairly consistent even when the implementation differs by platform or host.
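
As a small illustration of the layout, the helper below maps a dotted `platform.hostname.category` name to the three files of that category. The function `module_files` is hypothetical and is not part of linkmerce; it only restates the directory convention shown above.

```python
from pathlib import PurePosixPath


def module_files(dotted: str, root: str = "core") -> dict[str, PurePosixPath]:
    """Map 'platform.hostname.category' to the three files of that category."""
    parts = dotted.split(".")
    if len(parts) != 3:
        raise ValueError("expected 'platform.hostname.category'")
    base = PurePosixPath(root, *parts)
    return {name: base / name for name in ("extract.py", "transform.py", "models.sql")}


files = module_files("platform.hostname.category")
```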

When running an ETL process, you call a single API function for convenience instead of calling the core/ modules in sequence.

For details on the src/linkmerce/ path of the linkmerce package, see the separate document.

Testing Guide

The src/tests path holds tests that check whether a single Extractor or Transformer works correctly on its own.

  • test_extract.py: saves the results of running Extractor.extract()
  • test_transform.py: saves the results of running DuckDBTransformer.parse() and bulk_insert()
  • conftest.py: defines fixtures for shared resources and provides TransformerHarness to support Transformer tests
  • results/: the path where each test's results are saved. It is excluded from Git version control.

For details on the test-related src/linkmerce/tests/ path, see the separate document.

Airflow Service Structure

The services that make up the Airflow system are defined in docker-compose.yaml.
In production, the following services run, based on the Airflow Celery Executor configuration.

  • postgres
  • redis
  • playwright
  • airflow-apiserver
  • airflow-scheduler
  • airflow-dag-processor
  • airflow-worker
  • airflow-triggerer
  • airflow-init

All of these are services that Apache Airflow defines by default, except playwright, which was added for this project.
Some jobs use the playwright service for web scraping that requires browser rendering.

To start Docker Compose, run the following commands or the init.sh script.

cd airflow
docker compose up airflow-init
docker compose up -d

Airflow Job Scheduling

Airflow DAGs that use the linkmerce package share the following flow.

  1. Load settings and credentials with the read_config() or read_credentials() function of the airflow_utils plugin.
  2. Call a linkmerce.api.* function to run extraction and transformation.
  3. Read the API results loaded into tables through DuckDBConnection.
  4. Use the dual_load plugin to load the DuckDB table into PostgreSQL first and then into BigQuery.
  5. Return the task result as a {params: {...}, results: {...}} dictionary.

Implemented as a DAG, this flow looks like the following code.

with DAG(dag_id="...") as dag:

    PATH = "platform.hostname.category"

    @task(task_id="...")
    def read_configs() -> dict:
        from airflow_utils import read_config
        return read_config(PATH, tables=True)

    @task(task_id="...")
    def read_credentials() -> list:
        from airflow_utils import read_config
        return read_config(PATH, credentials=True)["credentials"]

    @task(task_id="...", map_index_template="{{ credentials['id'] }}")
    def etl_task(credentials: dict, configs: dict, **kwargs) -> dict:
        return main(**credentials, **configs)

    def main(tables: dict[str, str], **kwargs) -> dict:
        from linkmerce.common.load import DuckDBConnection
        from linkmerce.api.platform.hostname import example_api
        from dual_load import load_table_from_duckdb

        with DuckDBConnection(tzinfo="Asia/Seoul") as conn:
            example_api(**kwargs, connection=conn)

            return {
                "params": {},
                "results": {
                    tables["table"]: load_table_from_duckdb(
                        connection = conn,
                        source_table = "table",
                        target_table = tables["table"],
                    ),
                },
            }

    (etl_task
    .partial(configs=read_configs())
    .expand(credentials=read_credentials()))
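
The `.partial(...).expand(...)` call at the end uses Airflow's dynamic task mapping: arguments passed to `partial` are shared by every mapped task instance, while `expand` creates one instance per element of its list. Outside Airflow, the effect is roughly that of the plain-Python loop below. `expand_tasks` and the sample values are hypothetical and only emulate the fan-out; they are not Airflow APIs.

```python
def expand_tasks(task_fn, shared: dict, mapped: dict[str, list]) -> list:
    """Call task_fn once per mapped value, always passing the shared kwargs."""
    (name, values), = mapped.items()  # this sketch supports one mapped argument
    return [task_fn(**shared, **{name: value}) for value in values]


def etl_task(credentials: dict, configs: dict) -> dict:
    # Stand-in for the real task body; it only echoes which account was processed.
    return {"params": {"id": credentials["id"]}, "results": dict(configs["tables"])}


results = expand_tasks(
    etl_task,
    shared={"configs": {"tables": {"table": "dataset.table"}}},
    mapped={"credentials": [{"id": "shop-a"}, {"id": "shop-b"}]},
)
```

Each element of `results` corresponds to one mapped task instance, so one failing account does not have to block the others when Airflow runs them as separate instances.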

For details on the Airflow-related airflow/ path, see the separate document.

PostgreSQL Load Environment

The postgres/ path provides a local PostgreSQL 18 environment for loading ETL results, separate from the Airflow MetaDB.

It consists of the following components.

  • Dockerfile: builds an image that includes PostgreSQL 18, pg_partman, the Apache Arrow / Parquet runtime, and the in-house parquet_io extension
  • init.sql: initializes the per-platform schemas and tables and the daily partitions
  • partman_maintenance.sql: maintenance SQL for creating future partitions during operation
  • resources/bq_schemas.json: a BigQuery schema reference file generated from the PostgreSQL init.sql
  • resources/parquet_io.md: describes the SQL interface and type conversion policy of the parquet_io extension

Most Airflow DAGs use the dual_load plugin to load into PostgreSQL first and, if that succeeds, continue with the BigQuery load. PostgreSQL serves as the primary load target, where stricter primary key and type constraints are validated, and BigQuery remains the load target for the existing analytics tables.
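
The ordering described above can be sketched as follows. This is a simplified illustration under assumptions, not the dual_load plugin itself: `dual_load` here takes two hypothetical loader callables, and a PostgreSQL failure is modeled as an exception that prevents the BigQuery load from running.

```python
from typing import Callable


def dual_load(
    load_postgres: Callable[[], int],
    load_bigquery: Callable[[], int],
) -> dict:
    """Load into PostgreSQL first and continue to BigQuery only if that succeeds."""
    result = {"postgres": None, "bigquery": None}
    result["postgres"] = load_postgres()  # an exception here stops the BigQuery load
    result["bigquery"] = load_bigquery()
    return result


call_log = []


def ok_postgres() -> int:
    call_log.append("postgres")
    return 10  # hypothetical number of rows loaded


def failing_postgres() -> int:
    call_log.append("postgres")
    raise ValueError("primary key violation")


def ok_bigquery() -> int:
    call_log.append("bigquery")
    return 10


first = dual_load(ok_postgres, ok_bigquery)
try:
    dual_load(failing_postgres, ok_bigquery)
except ValueError:
    pass  # BigQuery was never called for the failing run
```

Running the stricter target first means rows that violate a key or type constraint are rejected before they reach the analytics tables.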

For details on the PostgreSQL runtime environment, see the separate document.

Streamlit UI

streamlit/app.py is an operations UI that calls the Airflow REST API to trigger DAGs manually.

It gives store operators, who cannot open the Airflow UI and trigger DAGs themselves,
a simple way to control DAGs for ad hoc data collection jobs (only the Sabangnet order ETL is supported).

The single Streamlit service is managed with its own Docker Compose file, separate from Airflow,
and is reached on the local network at an address made of the server IP and port.
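
A manual trigger through the REST API might be built along these lines. This sketch assumes the Airflow 3 style `/api/v2/dags/{dag_id}/dagRuns` endpoint with bearer-token authentication; the base URL, DAG id, token, and `conf` keys are placeholders, and `build_trigger_request` is a hypothetical helper rather than code from streamlit/app.py. The request is only constructed, not sent.

```python
import json
import urllib.request


def build_trigger_request(
    base_url: str, dag_id: str, token: str, conf: dict
) -> urllib.request.Request:
    """Build (but do not send) a POST request that would trigger one DAG run."""
    body = {"logical_date": None, "conf": conf}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v2/dags/{dag_id}/dagRuns",
        data=json.dumps(body).encode("utf-8"),
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="POST",
    )


request = build_trigger_request(
    "http://localhost:8080", "example_dag", "TOKEN", {"start_date": "2025-01-01"}
)
# urllib.request.urlopen(request) would send it; omitted so the sketch runs offline.
```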

To start Docker Compose, use the following commands.

cd streamlit
docker compose up -d

Quick Start

1. Package development environment

pip install -e .

The package metadata is defined in pyproject.toml.

2. Run PostgreSQL locally

cd postgres
./build.sh
docker compose up -d

3. Run Airflow locally

cd airflow
docker compose up airflow-init
docker compose up -d

To run a dual load in Airflow, register the postgres and bigquery Airflow Connections first.

4. Run the tests

pytest src/tests/test_extract.py -m extract -v -s
pytest src/tests/test_transform.py -m transform -v -s
pytest src/tests/test_load.py -m load -v -s

Example Code

from linkmerce.api.smartstore.api import marketing_channel
from linkmerce.common.load import DuckDBConnection

with DuckDBConnection(tzinfo="Asia/Seoul") as conn:
    rows = marketing_channel(
        client_id="...",
        client_secret="...",
        channel_seq="...",
        start_date="2025-01-01",
        end_date="2025-01-31",
        connection=conn,
        return_type="json",
    )

print(len(rows))

This call runs the Extractor and the DuckDBTransformer in sequence through the API function,
loads the results into a DuckDB table, then queries the table rows and returns them as list[dict].
