Skip to content

Commit c76c431

Browse files
committed
Include SINAN_CHIK DAG
1 parent afc99a1 commit c76c431

File tree

3 files changed

+183
-12
lines changed

3 files changed

+183
-12
lines changed

containers/airflow/Dockerfile

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,8 @@ COPY --chown=airflow containers/airflow/envs/* /opt/envs/
7373

7474
USER airflow
7575

76-
ARG POSTGRES_EPIGRAPH_HOST
77-
ARG POSTGRES_EPIGRAPH_PORT
78-
ARG POSTGRES_EPIGRAPH_USER
79-
ARG POSTGRES_EPIGRAPH_PASSWORD
80-
ARG POSTGRES_EPIGRAPH_DB
81-
ENV DB_USER "${POSTGRES_EPIGRAPH_USER}:${POSTGRES_EPIGRAPH_PASSWORD}"
82-
ENV DB_URI "${DB_USER}@${POSTGRES_EPIGRAPH_HOST}:${POSTGRES_EPIGRAPH_PORT}/${POSTGRES_EPIGRAPH_DB}"
76+
ARG DB_URI
77+
ENV DB_URI "${DB_URI}"
8378

8479
RUN /usr/local/bin/python -m virtualenv /opt/envs/py310 --python="/opt/py310/bin/python3.10" \
8580
&& sed -i "s/include-system-site-packages = false/include-system-site-packages = true/" /opt/envs/py310/pyvenv.cfg \
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
import pendulum

from datetime import timedelta

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable


default_args = {
    "owner": "epigraphhub",
    "depends_on_past": False,
    "start_date": pendulum.datetime(2023, 1, 3),
    "email": ["epigraphhub@thegraphnetwork.org"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=1),
}

with DAG(
    dag_id='SINAN_CHIK',
    tags=['SINAN', 'Brasil', 'Chikungunya'],
    schedule='0 0 3 * *',
    default_args=default_args,
    catchup=False,
) as dag:

    # Connection info (expects a dict with a 'URI' key) stored as an Airflow Variable.
    CONN = Variable.get('egh_conn', deserialize_json=True)

    @task.external_python(
        task_id='update_chik',
        python='/opt/py311/bin/python3.11'
    )
    def update_chik(egh_conn: dict):
        """
        This task will run in an isolated python environment, containing PySUS
        package. The task will fetch for all Chikungunya years from DATASUS and
        insert them into EGH database.

        Parameters
        ----------
        egh_conn : dict
            Database connection settings; ``egh_conn['URI']`` must be a
            SQLAlchemy-compatible database URI.
        """
        import os
        import logging
        import pandas as pd

        from sqlalchemy import create_engine, text
        from pysus.ftp.databases.sinan import SINAN

        sinan = SINAN().load()
        dis_code = "CHIK"
        tablename = "sinan_chikungunya_m"
        files = sinan.get_files(dis_code=dis_code)

        def to_sql_include_cols(df: pd.DataFrame, year: int, prelim: bool, engine):
            """
            Insert dataframe into db, include missing columns if needed.

            `year` is passed explicitly (instead of being read from the
            enclosing loop variable) so the helper does not depend on
            outer-scope state.
            """
            df.columns = df.columns.str.lower()

            with create_engine(egh_conn['URI']).connect() as conn:
                # Discover the table's current columns from a single-row probe.
                res = conn.execute(
                    text(f'SELECT * FROM brasil.{tablename} LIMIT 1'))
                sql_columns = set(i[0] for i in res.cursor.description)

            df_columns = set(df.columns)
            columns_to_add = df_columns.difference(sql_columns)

            if columns_to_add:
                # Build a single ALTER TABLE adding every missing column.
                # NOTE(review): column names come from DATASUS parquet headers
                # and are interpolated into SQL — assumed trusted; confirm.
                sql_statements = [f"ALTER TABLE brasil.{tablename}"]
                for column in columns_to_add:
                    sql_statements.append(
                        f"ADD COLUMN {column} TEXT,")  # object

                # Terminate the statement: last ',' becomes ';'.
                sql_statements[-1] = sql_statements[-1].replace(',', ';')

                with create_engine(egh_conn['URI']).connect() as conn:
                    sql = ' '.join(sql_statements)
                    logging.warning(f"EXECUTING: {sql}")
                    conn.execute(text(sql))
                    conn.commit()

            for col, dtype in df.dtypes.items():
                if col in ['dt_notific', 'dt_sin_pri']:
                    # Dates are stored as DDMMYYYY text to match the table schema.
                    try:
                        df[col] = pd.to_datetime(df[col]).dt.strftime(
                            '%d%m%Y').astype('object')
                        dtype = 'object'
                        logging.warning(
                            f"Column '{col}' of type 'DATE' has been parsed to 'TEXT'"
                        )
                    except ValueError as error:
                        logging.error(
                            f'Could not format date column correctly: {error}')
                        df[col] = df[col].astype('object')
                        dtype = 'object'

                if str(dtype) != 'object':
                    # Every column is persisted as TEXT; coerce to object first.
                    df[col] = df[col].astype('object')
                    logging.warning(
                        f"Column '{col}' of type '{dtype}' has been parsed to 'object'"
                    )

            df['year'] = year
            df['prelim'] = prelim

            df.to_sql(
                name=tablename,
                con=engine,
                schema="brasil",
                if_exists='append',
                index=False
            )

        def insert_parquets(parquet_dir: str, year: int, prelim: bool):
            """
            Insert parquet dir into database using its chunks. Delete the chunk
            and the directory after insertion.
            """
            for parquet in os.listdir(parquet_dir):
                file = os.path.join(parquet_dir, parquet)
                df = pd.read_parquet(str(file), engine='fastparquet')

                to_sql_include_cols(
                    df, year, prelim, create_engine(egh_conn['URI']))
                logging.warning(f"{len(df)} rows inserted into {tablename}")

                del df
                os.remove(file)
            # FIX: remove the directory this helper was given; the original
            # removed `parquets.path`, an unrelated outer-scope name.
            os.rmdir(parquet_dir)

        # Group the available DATASUS years by release stage.
        f_stage = {}
        for file in files:
            code, year = sinan.format(file)
            stage = 'prelim' if 'PRELIM' in file.path else 'final'
            f_stage.setdefault(stage, []).append(year)

        # Use .get() so an absent stage yields no iterations instead of KeyError.
        for year in f_stage.get('final', []):
            # Check if final is already in DB
            with create_engine(egh_conn['URI']).connect() as conn:
                cur = conn.execute(text(
                    f'SELECT COUNT(*) FROM brasil.{tablename}'
                    f" WHERE year = '{year}' AND prelim = False"
                ))
                count = cur.fetchone()[0]

            logging.info(f"Final year {year}: {count}")

            if not count:
                # Check on prelims
                with create_engine(egh_conn['URI']).connect() as conn:
                    cur = conn.execute(text(
                        f'SELECT COUNT(*) FROM brasil.{tablename}'
                        f" WHERE year = '{year}' AND prelim = True"
                    ))
                    count = cur.fetchone()[0]

                    if count:
                        # Update prelim to final: drop the preliminary rows.
                        cur = conn.execute(text(
                            f'DELETE FROM brasil.{tablename}'
                            f" WHERE year = '{year}' AND prelim = True"
                        ))
                        # FIX: SQLAlchemy 2.0-style connections roll back on
                        # close unless explicitly committed (the ALTER TABLE
                        # path above already commits; the deletes did not).
                        conn.commit()

                parquets = sinan.download(sinan.get_files(dis_code, year))
                insert_parquets(parquets.path, year, False)

        for year in f_stage.get('prelim', []):
            with create_engine(egh_conn['URI']).connect() as conn:
                # Update prelim: replace last month's preliminary data wholesale.
                cur = conn.execute(text(
                    f'DELETE FROM brasil.{tablename}'
                    f" WHERE year = '{year}' AND prelim = True"
                ))
                conn.commit()  # FIX: persist the delete (see note above)

            parquets = sinan.download(sinan.get_files(dis_code, year))
            insert_parquets(parquets.path, year, True)

    update_chik(CONN)

containers/compose-airflow.yaml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,7 @@ x-airflow-common:
55
context: ..
66
dockerfile: containers/airflow/Dockerfile
77
args:
8-
POSTGRES_EPIGRAPH_HOST: ${POSTGRES_EPIGRAPH_HOST}
9-
POSTGRES_EPIGRAPH_PORT: ${POSTGRES_EPIGRAPH_PORT}
10-
POSTGRES_EPIGRAPH_USER: ${POSTGRES_EPIGRAPH_USER}
11-
POSTGRES_EPIGRAPH_PASSWORD: ${POSTGRES_EPIGRAPH_PASSWORD}
12-
POSTGRES_EPIGRAPH_DB: ${POSTGRES_EPIGRAPH_DB}
8+
DB_URI: ${DB_URI}
139
environment:
1410
&airflow-common-env
1511
AIRFLOW_HOME: /opt/airflow

0 commit comments

Comments
 (0)