diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index c1a6fc9bc..56ed42000 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from collections.abc import Iterator, Mapping from datafusion.plan import LogicalPlan @@ -8,8 +9,11 @@ from pyiceberg.table import FileScanTask from openhouse.dataloader._table_scan_context import TableScanContext +from openhouse.dataloader._timer import log_duration from openhouse.dataloader.udf_registry import NoOpRegistry, UDFRegistry +logger = logging.getLogger(__name__) + class DataLoaderSplit: """A single data split""" @@ -44,4 +48,10 @@ def __iter__(self) -> Iterator[RecordBatch]: projected_schema=ctx.projected_schema, row_filter=ctx.row_filter, ) - yield from arrow_scan.to_record_batches([self._file_scan_task]) + it = iter(arrow_scan.to_record_batches([self._file_scan_task])) + while True: + with log_duration(logger, "record_batch %s", self._file_scan_task.file.file_path): + batch = next(it, None) + if batch is None: + break + yield batch