Commit 392f1cb

fix ingestion for large sample data (#308)
1 parent 809bc32 commit 392f1cb

File tree

2 files changed (+16 −9)

backend/deepchecks_monitoring/ee/bgtasks/object_storage_ingestor.py

Lines changed: 4 additions & 4 deletions

```diff
@@ -147,22 +147,22 @@ async def run(self, task: 'Task', session: AsyncSession, resources_provider: Res
             for prefix in version_prefixes:
                 for df, time in self.ingest_prefix(s3, bucket, f'{version_path}/{prefix}', version.latest_file_time,
                                                    errors, version.model_id, version.id):
+                    # For each file, set lock expiry to 240 seconds from now
+                    await lock.extend(240, replace_ttl=True)
                     await self.ingestion_backend.log_samples(version, df, session, organization_id, new_scan_time)
                     version.latest_file_time = max(version.latest_file_time or
                                                    pdl.datetime(year=1970, month=1, day=1), time)
-                    # For each file, set lock expiry to 120 seconds from now
-                    await lock.extend(120, replace_ttl=True)
 
             # Ingest labels
             for prefix in model_prefixes:
                 labels_path = f'{model_path}/labels/{prefix}'
                 for df, time in self.ingest_prefix(s3, bucket, labels_path, model.latest_labels_file_time,
                                                    errors, model_id):
+                    # For each file, set lock expiry to 240 seconds from now
+                    await lock.extend(240, replace_ttl=True)
                     await self.ingestion_backend.log_labels(model, df, session, organization_id)
                     model.latest_labels_file_time = max(model.latest_labels_file_time
                                                         or pdl.datetime(year=1970, month=1, day=1), time)
-                    # For each file, set lock expiry to 120 seconds from now
-                    await lock.extend(120, replace_ttl=True)
 
             model.obj_store_last_scan_time = new_scan_time
         except Exception:  # pylint: disable=broad-except
```
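This change moves the lock refresh to the top of each per-file loop and raises the TTL from 120 to 240 seconds, so the expiry is pushed forward before the potentially slow `log_samples`/`log_labels` call rather than after it; a large file can no longer outlive the lock mid-ingestion. The `extend(240, replace_ttl=True)` call matches redis-py's asyncio `Lock.extend(additional_time, replace_ttl=...)` signature. A minimal sketch of the pattern under that assumption (`ingest_files` and `process_file` are illustrative names, not from this repo):

```python
import asyncio

import redis.asyncio as redis


async def process_file(file):
    """Hypothetical per-file ingestion step, standing in for the real work."""
    await asyncio.sleep(0)


async def ingest_files(files):
    client = redis.Redis()
    lock = client.lock("object-storage-ingestion", timeout=240)
    if not await lock.acquire(blocking=False):
        return  # another worker already holds the ingestion lock
    try:
        for file in files:
            # Reset the TTL to a full 240 seconds *before* processing each
            # file, so the lock survives however long one file takes.
            await lock.extend(240, replace_ttl=True)
            await process_file(file)
    finally:
        await lock.release()
```

With `replace_ttl=True` the TTL is set to 240 seconds outright rather than added to whatever remains, which keeps the expiry bounded no matter how many files the loop processes.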

backend/deepchecks_monitoring/logic/data_ingestion.py

Lines changed: 12 additions & 5 deletions

```diff
@@ -44,6 +44,9 @@
 __all__ = ["DataIngestionBackend", "log_data", "log_labels", "save_failures"]
 
 
+QUERY_PARAM_LIMIT = 32765
+
+
 async def log_data(
     model_version: ModelVersion,
     data: t.List[t.Dict[t.Any, t.Any]],
@@ -113,17 +116,21 @@ async def log_data(
     # Starting by adding to the version map
     versions_map = model.get_samples_versions_map_table(session)
     ids_to_log = [{SAMPLE_ID_COL: sample_id, "version_id": model_version.id} for sample_id in valid_data]
-    statement = (postgresql.insert(versions_map).values(ids_to_log)
-                 .on_conflict_do_nothing(index_elements=versions_map.primary_key.columns)
-                 .returning(versions_map.c[SAMPLE_ID_COL]))
-    ids_not_existing = set((await session.execute(statement)).scalars())
+    ids_not_existing = set()
+    max_messages_per_insert = QUERY_PARAM_LIMIT // 5
+    for start_index in range(0, len(ids_to_log), max_messages_per_insert):
+        statement = (postgresql.insert(versions_map)
+                     .values(ids_to_log[start_index:start_index + max_messages_per_insert])
+                     .on_conflict_do_nothing(index_elements=versions_map.primary_key.columns)
+                     .returning(versions_map.c[SAMPLE_ID_COL]))
+        ids_not_existing.update((await session.execute(statement)).scalars())
     # Filter from the data ids which weren't logged to the versions table
     data_list = [sample for id, sample in valid_data.items() if id in ids_not_existing]
     if data_list:
         # Postgres driver has a limit of 32767 query params, which for 1000 messages, limits us to 32 columns. In
         # order to solve that we can either pre-compile the statement with bind literals, or separate to batches
         num_columns = len(data_list[0])
-        max_messages_per_insert = 32767 // num_columns
+        max_messages_per_insert = QUERY_PARAM_LIMIT // num_columns
         monitor_table = model_version.get_monitor_table(session)
         for start_index in range(0, len(data_list), max_messages_per_insert):
             batch = data_list[start_index:start_index + max_messages_per_insert]
```
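Both inserts in `log_data` now share the same defense: the async Postgres driver caps a statement at 32767 bind parameters, and a multi-row `VALUES` clause binds one parameter per column per row, so large batches must be chunked at `limit // columns_per_row` rows per statement. The fix extracts a slightly conservative `QUERY_PARAM_LIMIT = 32765` constant and applies the chunking to the versions-map insert as well, which previously ran as one statement regardless of size and is what broke on large sample data; `QUERY_PARAM_LIMIT // 5` is a safe batch size there since each mapping row binds only two parameters. A minimal standalone sketch of the batching idea, assuming an async SQLAlchemy session and table (`insert_in_batches` is an illustrative helper, not from this repo):

```python
import typing as t

from sqlalchemy import Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.asyncio import AsyncSession

# Stay just under the ~32767 bind-parameter ceiling of the Postgres driver.
QUERY_PARAM_LIMIT = 32765


async def insert_in_batches(session: AsyncSession, table: Table,
                            rows: t.List[t.Dict[str, t.Any]]) -> t.Set[t.Any]:
    """Insert rows in chunks that fit the driver's parameter limit,
    returning the primary-key values that were actually inserted."""
    if not rows:
        return set()
    params_per_row = len(rows[0])                    # one bind param per column
    max_rows = QUERY_PARAM_LIMIT // params_per_row   # rows per statement
    inserted: t.Set[t.Any] = set()
    for start in range(0, len(rows), max_rows):
        statement = (postgresql.insert(table)
                     .values(rows[start:start + max_rows])
                     .on_conflict_do_nothing(index_elements=table.primary_key.columns)
                     .returning(*table.primary_key.columns))
        inserted.update((await session.execute(statement)).scalars())
    return inserted
```

Batching keeps every statement under the limit at the cost of one round trip per chunk; the alternative noted in the inline comment, pre-compiling the statement with bind literals, would sidestep the parameter limit but give up parameterized queries.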
