Skip to content

Commit 3d92880

Browse files
committed
use iterator instead of stream
1 parent 6c5c5ee commit 3d92880

File tree

4 files changed

+36
-44
lines changed

4 files changed

+36
-44
lines changed

datafusion_iceberg/src/statistics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use datafusion::{
55
physical_plan::{ColumnStatistics, Statistics},
66
scalar::ScalarValue,
77
};
8-
use futures::{future, TryFutureExt, TryStreamExt};
8+
use futures::{future, stream, TryFutureExt, TryStreamExt};
99
use iceberg_rust::spec::{
1010
manifest::{ManifestEntry, Status},
1111
schema::Schema,
@@ -48,7 +48,7 @@ pub(crate) async fn table_statistics(
4848
let datafiles = table
4949
.datafiles(&manifests, None, sequence_number_range)
5050
.await?;
51-
datafiles
51+
stream::iter(datafiles)
5252
.try_filter(|manifest| future::ready(!matches!(manifest.1.status(), Status::Deleted)))
5353
.try_fold(
5454
Statistics {

datafusion_iceberg/src/table.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use tokio::sync::{
3333
mpsc::{self},
3434
RwLock, RwLockWriteGuard,
3535
};
36-
use tracing::{instrument, Instrument};
36+
use tracing::instrument;
3737

3838
use crate::{
3939
error::Error as DataFusionIcebergError,
@@ -492,21 +492,13 @@ async fn table_scan(
492492
.await
493493
.map_err(DataFusionIcebergError::from)?
494494
.try_collect()
495-
.instrument(tracing::debug_span!(
496-
"datafusion_iceberg::collect_datafiles"
497-
))
498-
.await
499495
.map_err(DataFusionIcebergError::from)?
500496
} else {
501497
table
502498
.datafiles(&manifests, None, sequence_number_range)
503499
.await
504500
.map_err(DataFusionIcebergError::from)?
505501
.try_collect()
506-
.instrument(tracing::debug_span!(
507-
"datafusion_iceberg::collect_datafiles"
508-
))
509-
.await
510502
.map_err(DataFusionIcebergError::from)?
511503
};
512504

@@ -553,10 +545,6 @@ async fn table_scan(
553545
.await
554546
.map_err(DataFusionIcebergError::from)?
555547
.try_collect()
556-
.instrument(tracing::debug_span!(
557-
"datafusion_iceberg::collect_datafiles"
558-
))
559-
.await
560548
.map_err(DataFusionIcebergError::from)?;
561549
data_files.into_iter().for_each(|manifest| {
562550
if *manifest.1.status() != Status::Deleted {

iceberg-rust/src/table/manifest_list.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::{
1313
use apache_avro::{
1414
types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema, Writer as AvroWriter,
1515
};
16-
use futures::{future::join_all, TryFutureExt, TryStreamExt};
16+
use futures::{future::join_all, stream, TryFutureExt, TryStreamExt};
1717
use iceberg_rust_spec::{
1818
manifest::{partition_value_schema, DataFile, ManifestEntry, Status},
1919
manifest_list::{
@@ -255,7 +255,7 @@ pub async fn snapshot_column_bounds(
255255

256256
let primitive_field_ids = schema.primitive_field_ids().collect::<Vec<_>>();
257257
let n = primitive_field_ids.len();
258-
datafiles
258+
stream::iter(datafiles)
259259
.try_fold(None::<Rectangle>, |acc, (_, manifest)| {
260260
let primitive_field_ids = &primitive_field_ids;
261261
async move {

iceberg-rust/src/table/mod.rs

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212
1313
use std::{io::Cursor, sync::Arc};
1414

15-
use futures::future::{self, try_join_all};
15+
use futures::future::try_join_all;
1616
use itertools::Itertools;
1717
use manifest::ManifestReader;
1818
use manifest_list::read_snapshot;
1919
use object_store::{path::Path, ObjectStore};
2020

21-
use futures::{stream, Stream, StreamExt, TryFutureExt, TryStreamExt};
21+
use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
2222
use iceberg_rust_spec::util::{self};
2323
use iceberg_rust_spec::{
2424
spec::{
@@ -262,7 +262,8 @@ impl Table {
262262
manifests: &'a [ManifestListEntry],
263263
filter: Option<Vec<bool>>,
264264
sequence_number_range: (Option<i64>, Option<i64>),
265-
) -> Result<impl Stream<Item = Result<(ManifestPath, ManifestEntry), Error>> + 'a, Error> {
265+
) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + 'a, Error>
266+
{
266267
datafiles(
267268
self.object_store(),
268269
manifests,
@@ -279,7 +280,7 @@ impl Table {
279280
) -> Result<bool, Error> {
280281
let manifests = self.manifests(start, end).await?;
281282
let datafiles = self.datafiles(&manifests, None, (None, None)).await?;
282-
datafiles
283+
stream::iter(datafiles)
283284
.try_any(|entry| async move { !matches!(entry.1.data_file().content(), Content::Data) })
284285
.await
285286
}
@@ -311,7 +312,7 @@ async fn datafiles(
311312
manifests: &'_ [ManifestListEntry],
312313
filter: Option<Vec<bool>>,
313314
sequence_number_range: (Option<i64>, Option<i64>),
314-
) -> Result<impl Stream<Item = Result<(ManifestPath, ManifestEntry), Error>> + '_, Error> {
315+
) -> Result<impl Iterator<Item = Result<(ManifestPath, ManifestEntry), Error>> + '_, Error> {
315316
// filter manifest files according to filter vector
316317
let iter: Box<dyn Iterator<Item = &ManifestListEntry> + Send + Sync> = match filter {
317318
Some(predicate) => {
@@ -345,31 +346,34 @@ async fn datafiles(
345346

346347
let results = try_join_all(futures).await?;
347348

348-
Ok(stream::iter(results).flat_map(move |result| {
349+
Ok(results.into_iter().flat_map(move |result| {
349350
let (bytes, path, sequence_number) = result;
350351

351352
let reader = ManifestReader::new(bytes).unwrap();
352-
stream::iter(reader).try_filter_map(move |mut x| {
353-
future::ready({
354-
let sequence_number = if let Some(sequence_number) = x.sequence_number() {
355-
*sequence_number
356-
} else {
357-
*x.sequence_number_mut() = Some(sequence_number);
358-
sequence_number
359-
};
353+
reader.filter_map(move |x| {
354+
let mut x = match x {
355+
Ok(entry) => entry,
356+
Err(_) => return None,
357+
};
360358

361-
let filter = match sequence_number_range {
362-
(Some(start), Some(end)) => start < sequence_number && sequence_number <= end,
363-
(Some(start), None) => start < sequence_number,
364-
(None, Some(end)) => sequence_number <= end,
365-
_ => true,
366-
};
367-
if filter {
368-
Ok(Some((path.to_owned(), x)))
369-
} else {
370-
Ok(None)
371-
}
372-
})
359+
let sequence_number = if let Some(sequence_number) = x.sequence_number() {
360+
*sequence_number
361+
} else {
362+
*x.sequence_number_mut() = Some(sequence_number);
363+
sequence_number
364+
};
365+
366+
let filter = match sequence_number_range {
367+
(Some(start), Some(end)) => start < sequence_number && sequence_number <= end,
368+
(Some(start), None) => start < sequence_number,
369+
(None, Some(end)) => sequence_number <= end,
370+
_ => true,
371+
};
372+
if filter {
373+
Some(Ok((path.to_owned(), x)))
374+
} else {
375+
None
376+
}
373377
})
374378
}))
375379
}
@@ -390,7 +394,7 @@ pub(crate) async fn delete_all_table_files(
390394
let snapshots = &metadata.snapshots;
391395

392396
// stream::iter(datafiles.into_iter())
393-
datafiles
397+
stream::iter(datafiles)
394398
.try_for_each_concurrent(None, |datafile| {
395399
let object_store = object_store.clone();
396400
async move {

0 commit comments

Comments
 (0)