Closed
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import logging
 from collections.abc import Iterator, Mapping

 from datafusion.plan import LogicalPlan
@@ -8,8 +9,11 @@
 from pyiceberg.table import FileScanTask

 from openhouse.dataloader._table_scan_context import TableScanContext
+from openhouse.dataloader._timer import log_duration
 from openhouse.dataloader.udf_registry import NoOpRegistry, UDFRegistry

+logger = logging.getLogger(__name__)
+

 class DataLoaderSplit:
     """A single data split"""
@@ -44,4 +48,10 @@ def __iter__(self) -> Iterator[RecordBatch]:
             projected_schema=ctx.projected_schema,
             row_filter=ctx.row_filter,
         )
-        yield from arrow_scan.to_record_batches([self._file_scan_task])
+        it = iter(arrow_scan.to_record_batches([self._file_scan_task]))
Collaborator
Actually, I don't think this will work, since it materializes all batches. The actual expensive call is arrow_scan.to_record_batches, right?
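To illustrate the concern: if the scan materializes every batch eagerly, all the cost lands in the call that builds the batches, and a per-batch timer wrapped around the consuming loop measures essentially nothing. A minimal sketch with a stand-in generator (not the actual pyiceberg API):

```python
import time

def slow_batches(n):
    """Stand-in for a scan where each batch takes real work to produce."""
    for i in range(n):
        time.sleep(0.05)  # simulate per-batch decode/IO cost
        yield i

# Eager case: all the cost is paid up front, inside list(...).
start = time.perf_counter()
batches = list(slow_batches(3))  # materializes every batch here
materialize_s = time.perf_counter() - start

# Timing each "consume" step afterwards sees almost nothing.
per_item = []
for b in batches:
    t0 = time.perf_counter()
    _ = b
    per_item.append(time.perf_counter() - t0)

print(materialize_s > 0.1)   # True: ~0.15s spent materializing
print(max(per_item) < 0.01)  # True: per-batch timers measure nothing
```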

Collaborator
The current approach works if the scan is truly streamed. Maybe we just add another timer around creating the iterator, to cover both cases and know where all the time is spent. WDYT?

Collaborator Author

It's going to be switched over by this PR, though? Right now, without that change, this timing is meaningless IMO because of the materialization behavior.

Collaborator Author

I think I'm going to close this PR and add it back once we have true streaming in pyiceberg.

Collaborator
@robreeves Feb 28, 2026
If we time both, then we don't need to worry about knowing how PyIceberg does things. Timing both places is cheap. I think we should add it now. It will help in early performance testing and show us how much apache/iceberg-python#3046 improves performance.
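The "time both places" suggestion can be sketched like this: one timer around iterator creation (catches the eager/materializing case) and one accumulated over the next() calls (catches the streaming case). The stand-in generator below is an assumption for illustration, not the pyiceberg scan:

```python
import time

def batches():
    """Stand-in scan; a generator pays its cost lazily, per next()."""
    for i in range(3):
        time.sleep(0.02)
        yield i

# Timer 1: iterator creation (covers the eager/materializing case).
t0 = time.perf_counter()
it = iter(batches())
create_s = time.perf_counter() - t0

# Timer 2: per-batch next() calls (covers the streaming case).
next_s = 0.0
while True:
    t0 = time.perf_counter()
    batch = next(it, None)
    next_s += time.perf_counter() - t0
    if batch is None:
        break

# For a true generator, creation is ~free and next() carries the cost;
# a materializing scan would show the opposite split.
print(create_s < 0.01)  # True
print(next_s > 0.05)    # True
```

Whichever timer shows the large number tells you which behavior pyiceberg actually has, without reading its internals.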

+        while True:
+            with log_duration(logger, "record_batch %s", self._file_scan_task.file.file_path):
+                batch = next(it, None)
+            if batch is None:
+                break
+            yield batch
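The `log_duration` helper comes from the internal `openhouse.dataloader._timer` module and isn't shown in this diff. A minimal sketch of what such a context manager might look like; the exact signature, log level, and message format are assumptions:

```python
import logging
import time
from contextlib import contextmanager

@contextmanager
def log_duration(logger: logging.Logger, msg: str, *args):
    """Run the wrapped block and log how long it took.

    Extra *args are spliced into msg with lazy %-style formatting,
    matching how the diff passes the file path.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.info(msg + " took %.3fs", *args, elapsed)
```

With this sketch, `with log_duration(logger, "record_batch %s", path): ...` would emit a line like `record_batch part-0.parquet took 0.042s` even when the block raises, since the timing is in a `finally`.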