Skip to content

Commit 7b21aa6

Browse files
authored
Merge pull request #161 from ColinMaudry/hotfix/2.6.2
Hotfix/2.6.2
2 parents 2cc1768 + f72a297 commit 7b21aa6

8 files changed

Lines changed: 107 additions & 87 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
### 2.6.2 2025-12-15
2+
3+
- Réduction du nombre de tâches prefect pour réduire la charge sur la BDD et la latence
4+
- Utilisation du multithreading standard de Python plutôt que celui de Prefect
5+
- Le nom d'établissement n'est ajouté entre parenthèses que s'il est différent de celui de l'unité légale
6+
17
### 2.6.1 2025-12-14
28

39
- Séparation des fichiers de référence et des fichiers de données

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# DECP processing
22

3-
> version 2.6.0 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md))
3+
> version 2.6.2 ([notes de version](https://github.com/ColinMaudry/decp-processing/blob/main/CHANGELOG.md))
44
55
Projet de traitement et de publication de meilleures données sur les marchés publics attribués en France. Vous pouvez consulter, filtrer et télécharger
66
ces données sur le site [decp.info](https://decp.info). Enfin la section [À propos](https://decp.info/a-propos) décrit les objectifs du projet et regroupe toutes les informations clés.

pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[project]
22
name = "decp-processing"
33
description = "Traitement des données des marchés publics français."
4-
version = "2.5.0"
4+
version = "2.6.2"
55
requires-python = ">= 3.9"
66
authors = [
77
{ name = "Colin Maudry", email = "colin+decp@maudry.com" }
@@ -22,7 +22,8 @@ dependencies = [
2222
"bs4",
2323
"selenium",
2424
"polars_ds",
25-
"scikit-learn"
25+
"scikit-learn",
26+
"tenacity"
2627
]
2728

2829
[project.optional-dependencies]

src/config.py

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,35 +26,30 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:
2626
return Path(os.getenv(env) or alternative_path)
2727

2828

29-
print("""
30-
##########
31-
# Config #
32-
##########
33-
""")
34-
29+
ALL_CONFIG = {}
3530

3631
# Nombre maximal de workers utilisables par Prefect. Défaut : 16
3732
MAX_PREFECT_WORKERS = int(os.getenv("MAX_PREFECT_WORKERS", 4))
38-
print(f"{'MAX_PREFECT_WORKERS':<40}", MAX_PREFECT_WORKERS)
33+
ALL_CONFIG["MAX_PREFECT_WORKERS"] = MAX_PREFECT_WORKERS
3934

4035
# Durée avant l'expiration du cache des ressources (en heure). Défaut : 168 (7 jours)
4136
CACHE_EXPIRATION_TIME_HOURS = int(os.getenv("CACHE_EXPIRATION_TIME_HOURS", 168))
42-
print(f"{'CACHE_EXPIRATION_TIME_HOURS':<40}", CACHE_EXPIRATION_TIME_HOURS)
37+
ALL_CONFIG["CACHE_EXPIRATION_TIME_HOURS"] = CACHE_EXPIRATION_TIME_HOURS
4338

4439

4540
DATE_NOW = datetime.now().isoformat()[0:10] # YYYY-MM-DD
4641
MONTH_NOW = DATE_NOW[:7] # YYYY-MM
4742

4843
# Publication ou non des fichiers produits sur data.gouv.fr
4944
DECP_PROCESSING_PUBLISH = os.getenv("DECP_PROCESSING_PUBLISH", "").lower() == "true"
50-
print(f"{'DECP_PROCESSING_PUBLISH':<40}", DECP_PROCESSING_PUBLISH)
45+
ALL_CONFIG["DECP_PROCESSING_PUBLISH"] = DECP_PROCESSING_PUBLISH
5146

5247

5348
# Client HTTP
5449
HTTP_CLIENT = httpx.Client()
5550
HTTP_HEADERS = {
5651
"Connection": "keep-alive",
57-
"User-agent": "Projet : https://decp.info/a-propos | Client HTTP : https://pypi.org/project/httpx/",
52+
"User-agent": "decp.info",
5853
}
5954

6055
# Timeout pour la publication de chaque ressource sur data.gouv.fr
@@ -77,21 +72,22 @@ def make_path_from_env(env: str, alternative_path: Path) -> Path:
7772

7873
# Dossier racine
7974
BASE_DIR = make_path_from_env("DECP_BASE_DIR", Path(__file__).absolute().parent.parent)
80-
print(f"{'BASE_DIR':<40}", BASE_DIR)
75+
ALL_CONFIG["BASE_DIR"] = BASE_DIR
8176

8277
# Les variables configurées sur le serveur doivent avoir la priorité
8378
DATA_DIR = make_path_from_env("DECP_DATA_DIR", BASE_DIR / "data")
8479
DATA_DIR.mkdir(exist_ok=True, parents=True)
85-
print(f"{'DATA_DIR':<40}", DATA_DIR)
80+
ALL_CONFIG["DATA_DIR"] = DATA_DIR
8681

8782
RESOURCE_CACHE_DIR = make_path_from_env(
8883
"RESOURCE_CACHE_DIR", DATA_DIR / "resource_cache"
8984
)
9085
RESOURCE_CACHE_DIR.mkdir(exist_ok=True, parents=True)
86+
ALL_CONFIG["RESOURCE_CACHE_DIR"] = RESOURCE_CACHE_DIR
9187

9288
DIST_DIR = make_path_from_env("DECP_DIST_DIR", BASE_DIR / "dist")
9389
DIST_DIR.mkdir(exist_ok=True, parents=True, mode=777)
94-
print(f"{'DIST_DIR':<40}", DIST_DIR)
90+
ALL_CONFIG["DIST_DIR"] = DIST_DIR
9591

9692

9793
def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
@@ -116,19 +112,19 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
116112
SIRENE_DATA_DIR = Path(os.path.join(BASE_DIR, SIRENE_DATA_DIR))
117113

118114
# SIRENE_DATA_DIR on ne le crée que si nécessaire, dans flows.py
119-
print(f"{'SIRENE_DATA_PARENT_DIR':<40}", SIRENE_DATA_PARENT_DIR)
120-
print(f"{'SIRENE_DATA_DIR':<40}", SIRENE_DATA_DIR)
115+
ALL_CONFIG["SIRENE_DATA_PARENT_DIR"] = SIRENE_DATA_PARENT_DIR
116+
ALL_CONFIG["SIRENE_DATA_DIR"] = SIRENE_DATA_DIR
121117

122118

123119
SIRENE_UNITES_LEGALES_URL = os.getenv("SIRENE_UNITES_LEGALES_URL", "")
124120

125121
# Mode de scraping
126122
SCRAPING_MODE = os.getenv("SCRAPING_MODE", "month")
127-
print(f"{'SCRAPING_MODE':<40}", SCRAPING_MODE)
123+
ALL_CONFIG["SCRAPING_MODE"] = SCRAPING_MODE
128124

129125
# Target (plateforme cible pour le scraping)
130126
SCRAPING_TARGET = os.getenv("SCRAPING_TARGET")
131-
print(f"{'SCRAPING_TARGET':<40}", SCRAPING_TARGET)
127+
ALL_CONFIG["SCRAPING_TARGET"] = SCRAPING_TARGET
132128

133129
# Lecture ou non des ressource en cache
134130
DECP_USE_CACHE = os.getenv("DECP_USE_CACHE", "false").lower() == "true"
@@ -138,6 +134,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
138134

139135
# Données de référence
140136
REFERENCE_DIR = BASE_DIR / "reference"
137+
ALL_CONFIG["REFERENCE_DIR"] = REFERENCE_DIR
141138

142139
# Liste et ordre des colonnes pour le mono dataframe de base (avant normalisation et spécialisation)
143140
# Sert aussi à vérifier qu'au moins ces colonnes sont présentes (d'autres peuvent être présentes en plus, les colonnes "innatendues")
@@ -169,7 +166,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
169166

170167
# Liste des ID de ressources présentes dans un dataset à traiter, au format JSON ou XML, mais exclues du traitement
171168
EXCLUDED_RESOURCES = os.getenv("EXCLUDED_RESOURCES", "").replace(" ", "")
172-
print(f"{'EXCLUDED_RESOURCES':<40}", EXCLUDED_RESOURCES)
169+
ALL_CONFIG["EXCLUDED_RESOURCES"] = EXCLUDED_RESOURCES
173170

174171
EXCLUDED_RESOURCES = EXCLUDED_RESOURCES.split(",")
175172
EXCLUDED_RESOURCES = (
@@ -191,7 +188,7 @@ def make_sirene_data_dir(sirene_data_parent_dir) -> Path:
191188

192189
# Ne traiter qu'un seul dataset identifier par son ID
193190
SOLO_DATASET = os.getenv("SOLO_DATASET", "")
194-
print(f"{'SOLO_DATASET':<40}", SOLO_DATASET)
191+
ALL_CONFIG["SOLO_DATASET"] = SOLO_DATASET
195192

196193
with open(
197194
make_path_from_env(
@@ -212,6 +209,3 @@ class DecpFormat:
212209
prefixe_json_marches: str
213210
liste_marches_ijson: sendable_list | None = None
214211
coroutine_ijson: Coroutine | None = None
215-
216-
217-
print("")

src/flows/decp_processing.py

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
import os
22
import shutil
3+
from concurrent.futures import ThreadPoolExecutor
34

45
import polars as pl
56
import polars.selectors as cs
6-
from prefect import flow
7+
from prefect import flow, task
78
from prefect.artifacts import create_table_artifact
89
from prefect.context import get_run_context
9-
from prefect.task_runners import ConcurrentTaskRunner
1010
from prefect_email import EmailServerCredentials, email_send_message
1111

1212
from src.config import (
13+
ALL_CONFIG,
1314
BASE_DF_COLUMNS,
14-
BASE_DIR,
1515
DATE_NOW,
1616
DECP_PROCESSING_PUBLISH,
1717
DIST_DIR,
@@ -33,15 +33,19 @@
3333
concat_parquet_files,
3434
sort_columns,
3535
)
36-
from src.tasks.utils import full_resource_name, generate_stats, remove_unused_cache
36+
from src.tasks.utils import (
37+
full_resource_name,
38+
generate_stats,
39+
print_all_config,
40+
remove_unused_cache,
41+
)
3742

3843

39-
@flow(
40-
log_prints=True,
41-
task_runner=ConcurrentTaskRunner(max_workers=MAX_PREFECT_WORKERS),
42-
)
44+
@flow(log_prints=True)
4345
def decp_processing(enable_cache_removal: bool = True):
44-
print(f"🚀 Début du flow decp-processing dans base dir {BASE_DIR} ")
46+
print("🚀 Début du flow decp-processing")
47+
48+
print_all_config(ALL_CONFIG)
4549

4650
print("Liste de toutes les ressources des datasets...")
4751
resources: list[dict] = list_resources(TRACKED_DATASETS)
@@ -53,42 +57,24 @@ def decp_processing(enable_cache_removal: bool = True):
5357
available_parquet_files = set(os.listdir(RESOURCE_CACHE_DIR))
5458

5559
# Traitement parallèle des ressources par lots pour éviter la surcharge mémoire
56-
batch_size = 500
60+
batch_size = 100
5761
parquet_files = []
5862

59-
# Filtrer les ressources à traiter
63+
# Filtrer les ressources à traiter, en ne gardant que les fichiers > 100 octets
6064
resources_to_process = [r for r in resources if r["filesize"] > 100]
6165

62-
# Afin d'être sûr que je ne publie pas par erreur un jeu de données de test
63-
decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000
64-
6566
for i in range(0, len(resources_to_process), batch_size):
66-
batch = resources_to_process[i : i + batch_size]
67-
print(
68-
f"🗃️ Traitement du lot {i // batch_size + 1} / {len(resources_to_process) // batch_size + 1}"
67+
process_batch(
68+
available_parquet_files,
69+
batch_size,
70+
i,
71+
parquet_files,
72+
resources_artifact,
73+
resources_to_process,
6974
)
7075

71-
futures = {}
72-
for resource in batch:
73-
future = get_clean.submit(
74-
resource, resources_artifact, available_parquet_files
75-
)
76-
futures[future] = full_resource_name(resource)
77-
78-
for f in futures:
79-
try:
80-
result = f.result()
81-
if result is not None:
82-
parquet_files.append(result)
83-
except Exception as e:
84-
resource_name = futures[f]
85-
print(
86-
f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):"
87-
)
88-
print(e)
89-
90-
# Nettoyage explicite
91-
futures.clear()
76+
# Afin d'être sûr que je ne publie pas par erreur un jeu de données de test
77+
decp_publish = DECP_PROCESSING_PUBLISH and len(resources_to_process) > 5000
9278

9379
if decp_publish:
9480
create_table_artifact(
@@ -150,6 +136,40 @@ def decp_processing(enable_cache_removal: bool = True):
150136
print("☑️ Fin du flow principal decp_processing.")
151137

152138

139+
@task(retries=2)
140+
def process_batch(
141+
available_parquet_files,
142+
batch_size,
143+
i,
144+
parquet_files,
145+
resources_artifact,
146+
resources_to_process,
147+
):
148+
batch = resources_to_process[i : i + batch_size]
149+
print(
150+
f"🗃️ Traitement du lot {i // batch_size + 1} / {len(resources_to_process) // batch_size + 1}"
151+
)
152+
futures = {}
153+
with ThreadPoolExecutor(max_workers=MAX_PREFECT_WORKERS) as executor:
154+
for resource in batch:
155+
future = executor.submit(
156+
get_clean, resource, resources_artifact, available_parquet_files
157+
)
158+
futures[future] = full_resource_name(resource)
159+
160+
for future in futures:
161+
try:
162+
result = future.result()
163+
if result is not None:
164+
parquet_files.append(result)
165+
except Exception as e:
166+
resource_name = futures[future]
167+
print(f"❌ Erreur de traitement de {resource_name} ({type(e).__name__}):")
168+
print(e)
169+
# Nettoyage explicite
170+
futures.clear()
171+
172+
153173
@sirene_preprocess.on_failure
154174
@decp_processing.on_failure
155175
def notify_exception_by_email(flow, flow_run, state):

src/tasks/enrich.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import polars as pl
22
import polars.selectors as cs
33
from polars_ds import haversine
4-
from prefect import task
54

65
from src.config import SIRENE_DATA_DIR
76
from src.tasks.transform import (
@@ -29,7 +28,10 @@ def add_etablissement_data(
2928
# Si il y a un etablissement_nom (Enseigne1Etablissement ou denominationUsuelleEtablissement),
3029
# on l'ajoute au nom de l'organisme, entre parenthèses
3130
lf_sirets = lf_sirets.with_columns(
32-
pl.when(pl.col("etablissement_nom").is_not_null())
31+
pl.when(
32+
pl.col("etablissement_nom").is_not_null()
33+
& (pl.col("etablissement_nom") != pl.col(f"{type_siret}_nom"))
34+
)
3335
.then(
3436
pl.concat_str(
3537
pl.col(f"{type_siret}_nom"),
@@ -76,7 +78,6 @@ def add_unite_legale_data(
7678
return lf_sirets
7779

7880

79-
@task(log_prints=True)
8081
def enrich_from_sirene(lf: pl.LazyFrame):
8182
# Récupération des données SIRET/SIREN préparées dans sirene-preprocess()
8283
lf_etablissements = pl.scan_parquet(SIRENE_DATA_DIR / "etablissements.parquet")

0 commit comments

Comments
 (0)