diff --git a/conda/base.yaml b/conda/base.yaml index 93ed9411..e321822b 100644 --- a/conda/base.yaml +++ b/conda/base.yaml @@ -10,6 +10,7 @@ dependencies: - make - sqlite - webdriver-manager + - python-chromedriver-binary - pip - pip: - epigraphhub diff --git a/containers/airflow/dags/brasil/sinan.py b/containers/airflow/dags/brasil/sinan.py index f1b45962..bfda461c 100644 --- a/containers/airflow/dags/brasil/sinan.py +++ b/containers/airflow/dags/brasil/sinan.py @@ -58,13 +58,15 @@ from epigraphhub.settings import env DEFAULT_ARGS = { - 'owner': 'epigraphhub', - 'depends_on_past': False, - 'email': ['epigraphhub@thegraphnetwork.org'], - 'email_on_failure': True, - 'email_on_retry': False, - 'retries': 2, - 'retry_delay': timedelta(minutes=2), + "owner": "epigraphhub", + "depends_on_past": False, + "email": ["epigraphhub@thegraphnetwork.org"], + "email_on_failure": True, + "email_on_retry": False, + "retries": 2, + "retry_delay": timedelta(minutes=2), + "dagrun_timeout": timedelta(minutes=600), + "max_active_runs": 2, } @@ -79,33 +81,33 @@ def task_flow_for(disease: str): from airflow.exceptions import AirflowSkipException from epigraphhub.data.brasil.sinan import normalize_str - schema = 'brasil' - tablename = 'sinan_' + normalize_str(disease) + '_m' + schema = "brasil" + tablename = "sinan_" + normalize_str(disease) + "_m" engine = get_engine(credential_name=env.db.default_credential) # Extracts year from parquet file - get_year = lambda file: int(str(file).split('.parquet')[0][-2:]) - + get_year = lambda file: int(str(file).split(".parquet")[0][-2:]) + # Uploads DataFrame into EGH db upload_df = lambda df: df.to_sql( - name=tablename, - con=engine.connect(), - schema=schema, - if_exists='append', - index=False + name=tablename, + con=engine.connect(), + schema=schema, + if_exists="append", + index=False, ) # Does nothing start = EmptyOperator( - task_id='start', + task_id="start", ) - @task(task_id='get_updates') + @task(task_id="get_updates") def dbcs_to_fetch() -> dict: # Get years that were not inserted yet with engine.connect() as conn: cur = conn.execute( - f'SELECT year FROM {schema}.sinan_update_ctl' + f"SELECT year FROM {schema}.sinan_update_ctl" f" WHERE disease = '{disease}' AND last_insert IS NULL" ) years_to_insert = cur.all() @@ -114,10 +116,10 @@ def dbcs_to_fetch() -> dict: # Get prelims with engine.connect() as conn: cur = conn.execute( - f'SELECT year FROM {schema}.sinan_update_ctl WHERE' + f"SELECT year FROM {schema}.sinan_update_ctl WHERE" f" disease = '{disease}' AND" - f' prelim IS True AND' - f' last_insert IS NOT NULL' + f" prelim IS True AND" + f" last_insert IS NOT NULL" ) prelim_years = cur.all() prelim_to_update = list(chain(*prelim_years)) @@ -125,10 +127,10 @@ def dbcs_to_fetch() -> dict: # Get years that are not prelim anymore with engine.connect() as conn: cur = conn.execute( - f'SELECT year FROM {schema}.sinan_update_ctl WHERE' + f"SELECT year FROM {schema}.sinan_update_ctl WHERE" f" disease = '{disease}' AND" - f' prelim IS False AND' - f' to_final IS True' + f" prelim IS False AND" + f" to_final IS True" ) prelim_to_final_years = cur.all() prelim_to_final = list(chain(*prelim_to_final_years)) @@ -139,40 +141,44 @@ def dbcs_to_fetch() -> dict: to_update=prelim_to_update, ) - @task(task_id='extract') + @task(task_id="extract") def extract_parquets(**kwargs) -> dict: from epigraphhub.data.brasil.sinan import extract - ti = kwargs['ti'] - years = ti.xcom_pull(task_ids='get_updates') + ti = kwargs["ti"] + years = ti.xcom_pull(task_ids="get_updates") - # Downloads dbc files into parquets to + # Downloads dbc files into parquets to extract_pqs = ( - lambda stage: extract.download( - disease=disease, years=years[stage] - ) if any(years[stage]) else () + lambda stage: extract.download(disease=disease, years=years[stage]) + if any(years[stage]) + else () ) + def to_list(ite) -> list: + if ite: + return list(ite) if not isinstance(ite, str) else [ite] + return list() + return dict( - pqs_to_insert=extract_pqs('to_insert'), - pqs_to_finals=extract_pqs('to_finals'), - pqs_to_update=extract_pqs('to_update'), + pqs_to_insert=to_list(extract_pqs("to_insert")), + pqs_to_finals=to_list(extract_pqs("to_finals")), + pqs_to_update=to_list(extract_pqs("to_update")), ) - @task(task_id='first_insertion', trigger_rule='all_done') + @task(task_id="first_insertion", trigger_rule="all_done") def upload_not_inserted(**kwargs) -> dict: from pysus.online_data import parquets_to_dataframe - ti = kwargs['ti'] - parquets = ti.xcom_pull(task_ids='extract')['pqs_to_insert'] - prelim_years = ti.xcom_pull(task_ids='get_updates')['to_update'] + ti = kwargs["ti"] + parquets = ti.xcom_pull(task_ids="extract")["pqs_to_insert"] + prelim_years = ti.xcom_pull(task_ids="get_updates")["to_update"] inserted_rows = dict() if not parquets: - logger.info('There is no new DBCs to insert on DB') + logger.info("There is no new DBCs to insert on DB") raise AirflowSkipException() - finals, prelims = ([], []) for parquet in parquets: ( @@ -182,10 +188,13 @@ def upload_not_inserted(**kwargs) -> dict: ) def insert_parquets(stage): - parquets = finals or [] if stage == 'finals' else prelims or [] - prelim = False if stage == 'finals' else True + parquets = finals or [] if stage == "finals" else prelims or [] + prelim = False if stage == "finals" else True for parquet in parquets: + if not parquet: + continue + if not any(os.listdir(parquet)): continue @@ -193,77 +202,75 @@ def insert_parquets(stage): df = parquets_to_dataframe(str(parquet)) if df.empty: - logger.error('DataFrame is empty') + logger.error("DataFrame is empty") continue - df['year'] = year - df['prelim'] = prelim + df["year"] = year + df["prelim"] = prelim df.columns = map(str.lower, df.columns) try: upload_df(df) - logger.info(f'{parquet} inserted into db') + logger.info(f"{parquet} inserted into db") except Exception as e: if "UndefinedColumn" in str(e): sql_dtypes = { - 'int64': 'INT', - 'float64': 'FLOAT', - 'string': 'TEXT', - 'object': 'TEXT', - 'datetime64[ns]': 'TEXT', + "int64": "INT", + "float64": "FLOAT", + "string": "TEXT", + "object": "TEXT", + "datetime64[ns]": "TEXT", } - + with engine.connect() as conn: cur = conn.execute( - f'SELECT * FROM {schema}.{tablename} LIMIT 0' + f"SELECT * FROM {schema}.{tablename} LIMIT 0" ) tcolumns = cur.keys() newcols = [c for c in df.columns if c not in tcolumns] - insert_cols_query = f'ALTER TABLE {schema}.{tablename}' + insert_cols_query = f"ALTER TABLE {schema}.{tablename}" for column in newcols: t = df[column].dtype sqlt = sql_dtypes[str(t)] - add_col = f' ADD COLUMN {column} {str(sqlt)}' + add_col = f" ADD COLUMN {column} {str(sqlt)}" if column == newcols[-1]: - add_col += ';' + add_col += ";" else: - add_col += ',' + add_col += "," insert_cols_query += add_col with engine.connect() as conn: conn.execute(insert_cols_query) - + with engine.connect() as conn: conn.execute( - f'UPDATE {schema}.sinan_update_ctl SET' + f"UPDATE {schema}.sinan_update_ctl SET" f" last_insert = '{ti.execution_date}' WHERE" f" disease = '{disease}' AND year = {year}" ) cur = conn.execute( - f'SELECT COUNT(*) FROM {schema}.{tablename}' - f' WHERE year = {year}' + f"SELECT COUNT(*) FROM {schema}.{tablename}" + f" WHERE year = {year}" ) inserted_rows[str(year)] = cur.fetchone()[0] if finals: - insert_parquets('finals') + insert_parquets("finals") if prelims: - insert_parquets('prelims') + insert_parquets("prelims") return inserted_rows - @task(task_id='prelims_to_finals', trigger_rule='all_done') + @task(task_id="prelims_to_finals", trigger_rule="all_done") def update_prelim_to_final(**kwargs): from pysus.online_data import parquets_to_dataframe - ti = kwargs['ti'] - parquets = ti.xcom_pull(task_ids='extract')['pqs_to_finals'] + ti = kwargs["ti"] + parquets = ti.xcom_pull(task_ids="extract")["pqs_to_finals"] if not parquets: - logger.info( - 'Not found any prelim DBC that have been passed to finals' - ) + logger.info("Not found any prelim DBC that have been passed to finals") raise AirflowSkipException() for parquet in parquets: @@ -274,39 +281,39 @@ def update_prelim_to_final(**kwargs): df = parquets_to_dataframe(parquet) if df.empty: - logger.info('DataFrame is empty') + logger.info("DataFrame is empty") continue - df['year'] = year - df['prelim'] = False + df["year"] = year + df["prelim"] = False df.columns = map(str.lower, df.columns) with engine.connect() as conn: conn.execute( - f'DELETE FROM {schema}.{tablename}' - f' WHERE year = {year}' - f' AND prelim = True' + f"DELETE FROM {schema}.{tablename}" + f" WHERE year = {year}" + f" AND prelim = True" ) upload_df(df) - logger.info(f'{parquet} data updated from prelim to final.') + logger.info(f"{parquet} data updated from prelim to final.") with engine.connect() as conn: conn.execute( - f'UPDATE {schema}.sinan_update_ctl' + f"UPDATE {schema}.sinan_update_ctl" f" SET 'to_final' = False, last_insert = '{ti.execution_date}'" f" WHERE disease = '{disease}' AND year = {year}" ) - @task(task_id='update_prelims', trigger_rule='all_done') + @task(task_id="update_prelims", trigger_rule="all_done") def update_prelim_parquets(**kwargs): from pysus.online_data import parquets_to_dataframe - ti = kwargs['ti'] - parquets = ti.xcom_pull(task_ids='extract')['pqs_to_update'] + ti = kwargs["ti"] + parquets = ti.xcom_pull(task_ids="extract")["pqs_to_update"] if not parquets: - logger.info('No preliminary parquet found to update') + logger.info("No preliminary parquet found to update") raise AirflowSkipException() for parquet in parquets: @@ -316,54 +323,56 @@ def update_prelim_parquets(**kwargs): df = parquets_to_dataframe(parquet) if df.empty: - logger.info('DataFrame is empty') + logger.info("DataFrame is empty") continue - df['year'] = year - df['prelim'] = True + df["year"] = year + df["prelim"] = True df.columns = map(str.lower, df.columns) with engine.connect() as conn: cur = conn.execute( - f'SELECT COUNT(*) FROM {schema}.{tablename}' - f' WHERE year = {year}' + f"SELECT COUNT(*) FROM {schema}.{tablename}" f" WHERE year = {year}" ) conn.execute( - f'DELETE FROM {schema}.{tablename}' - f' WHERE year = {year}' - f' AND prelim = True' + f"DELETE FROM {schema}.{tablename}" + f" WHERE year = {year}" + f" AND prelim = True" ) old_rows = cur.fetchone()[0] upload_df(df) logger.info( - f'{parquet} data updated' - '\n~~~~~ ' - f'\nRows inserted: {len(df)}' - f'\nNew rows: {len(df) - int(old_rows)}' - '\n~~~~~ ' + f"{parquet} data updated" + "\n~~~~~ " + f"\nRows inserted: {len(df)}" + f"\nNew rows: {len(df) - int(old_rows)}" + "\n~~~~~ " ) with engine.connect() as conn: conn.execute( - f'UPDATE {schema}.sinan_update_ctl' + f"UPDATE {schema}.sinan_update_ctl" f" SET last_insert = '{ti.execution_date}'" f" WHERE disease = '{disease}' AND year = {year}" ) - @task(trigger_rule='all_done') + @task(trigger_rule="all_done") def remove_parquets(**kwargs) -> None: import shutil + """ This task will be responsible for deleting all parquet files downloaded. It will receive the same parquet dirs the `upload` task receives and delete all them. """ - ti = kwargs['ti'] - pqts = ti.xcom_pull(task_ids='extract') + ti = kwargs["ti"] + pqts = ti.xcom_pull(task_ids="extract") parquet_dirs = list( - chain(*(pqts['pqs_to_insert'], pqts['pqs_to_finals'], pqts['pqs_to_update'])) + chain( + *(pqts["pqs_to_insert"], pqts["pqs_to_finals"], pqts["pqs_to_update"]) + ) ) if not parquet_dirs: @@ -371,15 +380,15 @@ def remove_parquets(**kwargs) -> None: for dir in parquet_dirs: for file in os.listdir(dir): - if str(file).endswith('.parquet'): - os.remove(file) - if str(dir).endswith('.parquet'): + if str(file).endswith(".parquet"): + os.remove(f"{dir}/{file}") + if str(dir).endswith(".parquet"): os.rmdir(dir) - logger.warning(f'{dir} removed') + logger.warning(f"{dir} removed") end = EmptyOperator( - task_id='done', - trigger_rule='all_success', + task_id="done", + trigger_rule="all_success", ) # Defining the tasks @@ -396,23 +405,24 @@ def remove_parquets(**kwargs) -> None: # DAGs # Here its where the DAGs are created, an specific case can be specified +from itertools import cycle from airflow.models.dag import DAG from epigraphhub.data.brasil.sinan import DISEASES -from random import randint + +day = cycle(range(1, 28)) for disease in DISEASES: - dag_id = 'SINAN_' + DISEASES[disease] + dag_id = "SINAN_" + DISEASES[disease] with DAG( dag_id=dag_id, default_args=DEFAULT_ARGS, - tags=['SINAN', 'Brasil', disease], - start_date=pendulum.datetime( - 2022, 2, randint(2,28) - ), + tags=["SINAN", "Brasil", disease], + start_date=pendulum.datetime(2022, 2, 1), catchup=False, - schedule='@monthly', + schedule=f"0 11 {next(day)} * *", dagrun_timeout=None, + max_active_runs=2, ): task_flow_for(disease) diff --git a/containers/compose-base.yaml b/containers/compose-base.yaml index 728908c0..43f45bc0 100644 --- a/containers/compose-base.yaml +++ b/containers/compose-base.yaml @@ -31,6 +31,7 @@ services: depends_on: - redis - flower + - postgres airflow: platform: linux/amd64 @@ -69,6 +70,7 @@ services: - redis - flower - minio + - postgres redis: platform: linux/amd64 diff --git a/tests/test_webapp.py b/tests/test_webapp.py index 3d5763ce..8313ef20 100644 --- a/tests/test_webapp.py +++ b/tests/test_webapp.py @@ -1,20 +1,18 @@ from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options -from webdriver_manager.chrome import ChromeDriverManager import unittest class TestEpiGraphHub(unittest.TestCase): @classmethod def setUpClass(cls): - chrome_options = Options() - chrome_options.add_argument("--disable-gpu") + chrome_options = webdriver.ChromeOptions() chrome_options.add_argument("--headless") - cls.driver = webdriver.Chrome( - ChromeDriverManager().install(), - options=chrome_options - ) + chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--disable-gpu") + chrome_options.add_argument("--disable-dev-shm-usage") + cls.driver = webdriver.Chrome(options=chrome_options) cls.driver.get("http://localhost:8088") @classmethod @@ -26,7 +24,7 @@ def find_css_element(self, value): def test_title(self): title = self.driver.title - self.assertEqual(title, 'EpiGraphHub') + self.assertEqual(title, "EpiGraphHub") def test_login_as_guest(self): self.driver.implicitly_wait(0.5) @@ -35,12 +33,11 @@ def test_login_as_guest(self): login_button.click() self.driver.implicitly_wait(0.5) # guest:guest - self.find_css_element("input#username.form-control").send_keys('guest') - self.find_css_element("input#password.form-control").send_keys('guest') + self.find_css_element("input#username.form-control").send_keys("guest") + self.find_css_element("input#password.form-control").send_keys("guest") # Sign In self.find_css_element("input.btn.btn-primary.btn-block").click() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(verbosity=2) -