Skip to content

Commit f65bf64

Browse files
committed
Merge branch 'hotfix/2.6.4'
2 parents f2fcab9 + 7a8d9ae commit f65bf64

20 files changed

Lines changed: 404 additions & 236 deletions

CHANGELOG.md

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
1-
### 2.6.3 2025-12-16
1+
#### 2.6.4 2025-12-19
2+
3+
- Tri et numérotation des modifications après la concaténation plutôt que par ressource, pour réduire le nombre de doublons ([#156](https://github.com/ColinMaudry/decp-processing/issues/156))
4+
- Utilisation du logger de prefect plûtot que `log_prints=True`
5+
6+
#### 2.6.3 2025-12-16
27

38
- Téléchargement des ressources plus résilient aux erreurs ([tenacity](https://tenacity.readthedocs.io/en/latest/))
49
- Téléchargement des données établissements plus résilient aux erreurs ([tenacity](https://tenacity.readthedocs.io/en/latest/))
510

6-
### 2.6.2 2025-12-15
11+
#### 2.6.2 2025-12-15
712

813
- Réduction du nombre de tâches prefect pour réduire la charge sur la BDD et la latence
914
- Utilisation du multithreading standard de Python plutôt que celui de Prefect
1015
- Le nom d'établissement n'est ajouté entre parenthèses que s'il est différent de celui de l'unité légale
1116

12-
### 2.6.1 2025-12-14
17+
#### 2.6.1 2025-12-14
1318

1419
- Séparation des fichiers de référence et des fichiers de données
1520
- Réorganisation des variables d'environnement

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@ testpaths = [
4242
"tests",
4343
]
4444
env = [
45+
"LOG_LEVEL=DEBUG",
4546
"DATASETS_REFERENCE_FILEPATH=tests/data/source_datasets_test.json",
4647
"SIRENE_DATA_DIR=tests/data/sirene",
4748
"PREFECT_API_URL=",
4849
"DECP_PROCESSING_PUBLISH=",
49-
"DECP_USE_CACHE=false"
50+
"DECP_USE_CACHE=false",
51+
"MAX_PREFECT_WORKERS=1"
5052
]
5153
addopts = "-p no:warnings"
5254

src/config.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,14 @@
99
import httpx
1010
from dotenv import find_dotenv, load_dotenv
1111
from ijson import sendable_list
12+
from prefect.logging import get_logger
1213
from prefect.variables import Variable
1314

15+
logger = get_logger(__name__)
16+
1417
dotenv_path = find_dotenv()
1518
if dotenv_path == "":
16-
print("Création du fichier .env à partir de template.env")
19+
logger.info("Création du fichier .env à partir de template.env")
1720
template_dotenv_path = Path(find_dotenv("template.env"))
1821
dotenv_path = template_dotenv_path.with_name(".env")
1922
shutil.copyfile(template_dotenv_path, dotenv_path)
@@ -28,6 +31,9 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:
2831

2932
ALL_CONFIG = {}
3033

34+
# Niveau des logs
35+
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
36+
3137
# Nombre maximal de workers utilisables par Prefect. Défaut : 16
3238
MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 4))
3339
ALL_CONFIG["MAX_PREFECT_WORKERS"] = MAX_PREFECT_WORKERS

src/flows/decp_processing.py

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
DATE_NOW,
1616
DECP_PROCESSING_PUBLISH,
1717
DIST_DIR,
18+
LOG_LEVEL,
1819
MAX_PREFECT_WORKERS,
1920
PREFECT_API_URL,
2021
RESOURCE_CACHE_DIR,
@@ -32,22 +33,26 @@
3233
calculate_naf_cpv_matching,
3334
concat_parquet_files,
3435
sort_columns,
36+
sort_modifications,
3537
)
3638
from src.tasks.utils import (
3739
full_resource_name,
3840
generate_stats,
41+
get_logger,
3942
print_all_config,
4043
remove_unused_cache,
4144
)
4245

4346

4447
@flow(log_prints=True)
4548
def decp_processing(enable_cache_removal: bool = True):
46-
print("🚀 Début du flow decp-processing")
49+
logger = get_logger(level=LOG_LEVEL)
50+
51+
logger.info("🚀 Début du flow decp-processing")
4752

4853
print_all_config(ALL_CONFIG)
4954

50-
print("Liste de toutes les ressources des datasets...")
55+
logger.info("Liste de toutes les ressources des datasets...")
5156
resources: list[dict] = list_resources(TRACKED_DATASETS)
5257

5358
# Initialisation du tableau des artifacts de ressources
@@ -84,10 +89,13 @@ def decp_processing(enable_cache_removal: bool = True):
8489
)
8590
del resources_artifact
8691

87-
print("Fusion des dataframes...")
92+
logger.info("Concaténation des dataframes...")
8893
lf: pl.LazyFrame = concat_parquet_files(parquet_files)
8994

90-
print("Ajout des données SIRENE...")
95+
logger.info("Tri des modifications...")
96+
lf = sort_modifications(lf)
97+
98+
logger.info("Ajout des données SIRENE...")
9199
# Preprocessing des données SIRENE si :
92100
# - le dossier n'existe pas encore (= les données n'ont pas déjà été preprocessed ce mois-ci)
93101
# - on est au moins le 5 du mois (pour être sûr que les données SIRENE ont été mises à jour sur data.gouv.fr)
@@ -104,17 +112,19 @@ def decp_processing(enable_cache_removal: bool = True):
104112
sink_to_files(lf, DIST_DIR / "decp", file_format="parquet")
105113
lf: pl.LazyFrame = pl.scan_parquet(DIST_DIR / "decp.parquet")
106114

107-
print("Ajout de la colonne 'dureeRestanteMois'...")
115+
logger.info("Ajout de la colonne 'dureeRestanteMois'...")
108116
lf = add_duree_restante(lf)
109117

110-
print("Génération des probabilités NAF/CPV...")
118+
logger.info("Génération des probabilités NAF/CPV...")
111119
calculate_naf_cpv_matching(lf)
112120
lf = lf.drop(cs.starts_with("activite"))
113121

114-
print("Génération de l'artefact (statistiques) sur le base df...")
122+
logger.info("Génération de l'artefact (statistiques) sur le base df...")
115123
generate_stats(lf)
116124

117-
print("Génération du schéma et enregistrement des DECP aux formats CSV, Parquet...")
125+
logger.info(
126+
"Génération du schéma et enregistrement des DECP aux formats CSV, Parquet..."
127+
)
118128
lf: pl.LazyFrame = sort_columns(lf, BASE_DF_COLUMNS)
119129
generate_final_schema(lf)
120130
sink_to_files(lf, DIST_DIR / "decp")
@@ -124,16 +134,16 @@ def decp_processing(enable_cache_removal: bool = True):
124134
# make_data_tables()
125135

126136
if decp_publish:
127-
print("Publication sur data.gouv.fr...")
137+
logger.info("Publication sur data.gouv.fr...")
128138
publish_to_datagouv()
129139
else:
130-
print("Publication sur data.gouv.fr désactivée.")
140+
logger.info("Publication sur data.gouv.fr désactivée.")
131141

132142
if enable_cache_removal:
133-
print("Suppression des fichiers de cache inutilisés...")
143+
logger.info("Suppression des fichiers de cache inutilisés...")
134144
remove_unused_cache()
135145

136-
print("☑️ Fin du flow principal decp_processing.")
146+
logger.info("☑️ Fin du flow principal decp_processing.")
137147

138148

139149
@task(retries=2)
@@ -145,8 +155,9 @@ def process_batch(
145155
resources_artifact,
146156
resources_to_process,
147157
):
158+
logger = get_logger(level=LOG_LEVEL)
148159
batch = resources_to_process[i : i + batch_size]
149-
print(
160+
logger.info(
150161
f"🗃️ Traitement du lot {i // batch_size + 1} / {len(resources_to_process) // batch_size + 1}"
151162
)
152163
futures = {}
@@ -164,8 +175,10 @@ def process_batch(
164175
parquet_files.append(result)
165176
except Exception as e:
166177
resource_name = futures[future]
167-
print(f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):")
168-
print(e)
178+
logger.error(
179+
f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):"
180+
)
181+
logger.info(e)
169182
# Nettoyage explicite
170183
futures.clear()
171184

src/flows/get_cog.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
import polars as pl
22
from prefect import flow
33

4-
from src.config import DATA_DIR
4+
from src.config import DATA_DIR, LOG_LEVEL
55
from src.tasks.get import get_insee_cog_data
6+
from src.tasks.utils import get_logger
67

78

89
@flow(log_prints=True)
910
def get_cog():
1011
"""Téléchargement et préparation des données du Code Officiel Géographique"""
1112

12-
print("Téléchargement et préparation des données du Code Officiel Géographique...")
13+
logger = get_logger(level=LOG_LEVEL)
14+
logger.info(
15+
"Téléchargement et préparation des données du Code Officiel Géographique..."
16+
)
1317

1418
# # # # # # # # #
1519
# Communes #

src/flows/scrap.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,25 @@
33

44
from prefect import flow
55

6-
from src.config import DATE_NOW, DIST_DIR, MONTH_NOW, SCRAPING_MODE, SCRAPING_TARGET
6+
from src.config import (
7+
DATE_NOW,
8+
DIST_DIR,
9+
LOG_LEVEL,
10+
MONTH_NOW,
11+
SCRAPING_MODE,
12+
SCRAPING_TARGET,
13+
)
714
from src.tasks.scrap import scrap_aws_month, scrap_marches_securises_month
15+
from src.tasks.utils import get_logger
816

917

1018
@flow(log_prints=True)
1119
def scrap(target: str = None, mode: str = None, month=None, year=None):
20+
logger = get_logger(level=LOG_LEVEL)
1221
# Remise à zéro du dossier dist
1322
dist_dir: Path = DIST_DIR / target
1423
if dist_dir.exists():
15-
print(f"Suppression de {dist_dir}...")
24+
logger.debug(f"Suppression de {dist_dir}...")
1625
rmtree(dist_dir)
1726
else:
1827
dist_dir.mkdir(parents=True)
@@ -26,7 +35,7 @@ def scrap(target: str = None, mode: str = None, month=None, year=None):
2635
elif target == "marches-securises.fr":
2736
scrap_target_month = scrap_marches_securises_month
2837
else:
29-
print("Quel target ?")
38+
logger.error("Quel target ?")
3039
raise ValueError
3140

3241
current_year = DATE_NOW[:4]
@@ -52,4 +61,4 @@ def scrap(target: str = None, mode: str = None, month=None, year=None):
5261
scrap(target=target, mode="year", year=str(year))
5362

5463
else:
55-
print("Mauvaise configuration")
64+
logger.error("Mauvaise configuration")

src/flows/sirene_preprocess.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
from prefect import flow
22
from prefect.transactions import transaction
33

4-
from src.config import SIRENE_DATA_DIR
4+
from src.config import LOG_LEVEL, SIRENE_DATA_DIR
55
from src.flows.get_cog import get_cog
66
from src.tasks.get import get_etablissements, get_unite_legales
77
from src.tasks.transform import prepare_etablissements
8-
from src.tasks.utils import create_sirene_data_dir
8+
from src.tasks.utils import create_sirene_data_dir, get_logger
99

1010

1111
@flow(log_prints=True)
@@ -14,7 +14,9 @@ def sirene_preprocess():
1414
Pour chaque ressource (unités légales, établissements), un fichier parquet est produit.
1515
"""
1616

17-
print("🚀 Pré-traitement des données SIRENE")
17+
logger = get_logger(level=LOG_LEVEL)
18+
19+
logger.info("🚀 Pré-traitement des données SIRENE")
1820
# Soit les tâches de ce flow vont au bout (success), soit le dossier SIRENE_DATA_DIR est supprimé (voir remove_sirene_data_dir())
1921
with transaction():
2022
create_sirene_data_dir()
@@ -25,18 +27,18 @@ def sirene_preprocess():
2527
# préparer les données unités légales
2628
processed_ul_parquet_path = SIRENE_DATA_DIR / "unites_legales.parquet"
2729
if not processed_ul_parquet_path.exists():
28-
print("Téléchargement et préparation des unités légales...")
30+
logger.info("Téléchargement et préparation des unités légales...")
2931
get_unite_legales(processed_ul_parquet_path)
3032
else:
31-
print(processed_ul_parquet_path, " existe, skipping.")
33+
logger.info(processed_ul_parquet_path, " existe, skipping.")
3234

3335
# préparer les données établissements
3436
processed_etab_parquet_path = SIRENE_DATA_DIR / "etablissements.parquet"
3537
if not processed_etab_parquet_path.exists():
36-
print("Téléchargement et préparation des établissements...")
38+
logger.info("Téléchargement et préparation des établissements...")
3739
lf = get_etablissements()
3840
prepare_etablissements(lf).sink_parquet(processed_etab_parquet_path)
3941
else:
40-
print(processed_etab_parquet_path, " existe, skipping.")
42+
logger.info(processed_etab_parquet_path, " existe, skipping.")
4143

42-
print("☑️ Fin du flow sirene_preprocess.")
44+
logger.info("☑️ Fin du flow sirene_preprocess.")

0 commit comments

Comments
 (0)