Skip to content

Commit 56975c6

Browse files
committed
use iterators instead of slice
1 parent d3a705a commit 56975c6

File tree

1 file changed

+64
-65
lines changed

1 file changed

+64
-65
lines changed

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 64 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ use std::{collections::HashMap, sync::Arc};
1010
use bytes::Bytes;
1111
use futures::future;
1212
use iceberg_rust_spec::manifest_list::{
13-
manifest_list_schema_v1, manifest_list_schema_v2, Content as ManifestListContent, ManifestListEntry,
13+
manifest_list_schema_v1, manifest_list_schema_v2, Content as ManifestListContent,
14+
ManifestListEntry,
1415
};
15-
use iceberg_rust_spec::spec::manifest::Content;
1616
use iceberg_rust_spec::snapshot::{Operation as SnapshotOperation, Snapshot};
17+
use iceberg_rust_spec::spec::manifest::Content;
1718
use iceberg_rust_spec::spec::table_metadata::TableMetadata;
1819
use iceberg_rust_spec::spec::{
1920
manifest::{partition_value_schema, DataFile, ManifestEntry, Status},
@@ -232,22 +233,14 @@ impl Operation {
232233
}?;
233234

234235
// Compute updated snapshot summary
235-
// Separate data files from delete files
236-
let data_files_vec: Vec<DataFile> = all_files
237-
.iter()
238-
.filter(|f| *f.content() == Content::Data)
239-
.cloned()
240-
.collect();
241-
let delete_files_vec: Vec<DataFile> = all_files
242-
.iter()
243-
.filter(|f| *f.content() != Content::Data)
244-
.cloned()
245-
.collect();
236+
// Separate data files from delete files using iterators
237+
let data_files_iter = all_files.iter().filter(|f| *f.content() == Content::Data);
238+
let delete_files_iter = all_files.iter().filter(|f| *f.content() != Content::Data);
246239

247240
let mut summary_fields = update_snapshot_summary(
248241
old_snapshot.map(|s| s.summary()),
249-
&data_files_vec,
250-
&delete_files_vec,
242+
data_files_iter,
243+
delete_files_iter,
251244
);
252245

253246
// Merge with any additional summary fields
@@ -316,8 +309,8 @@ impl Operation {
316309
// Compute summary before moving data_files and delete_files
317310
let summary_fields = update_snapshot_summary(
318311
old_snapshot.map(|s| s.summary()),
319-
&data_files,
320-
&delete_files,
312+
data_files.iter(),
313+
delete_files.iter(),
321314
);
322315

323316
let data_files_iter = delete_files.iter().chain(data_files.iter());
@@ -369,7 +362,11 @@ impl Operation {
369362
Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
370363
> = Vec::new();
371364
for (content, files, n_files) in [
372-
(ManifestListContent::Data, Either::Left(new_datafile_iter), n_data_files),
365+
(
366+
ManifestListContent::Data,
367+
Either::Left(new_datafile_iter),
368+
n_data_files,
369+
),
373370
(
374371
ManifestListContent::Deletes,
375372
Either::Right(new_deletefile_iter),
@@ -588,29 +585,15 @@ impl Operation {
588585

589586
// For Replace operation, we're replacing all data with new files
590587
// Compute summary for the new state (not incremental)
591-
let data_files_vec: Vec<DataFile> = files
592-
.iter()
593-
.filter(|f| *f.content() == Content::Data)
594-
.cloned()
595-
.collect();
596-
let delete_files_vec: Vec<DataFile> = files
597-
.iter()
598-
.filter(|f| *f.content() != Content::Data)
599-
.cloned()
600-
.collect();
601-
602-
// For replace, set totals to match only the new files
603-
let total_records: i64 = data_files_vec.iter().map(|f| f.record_count()).sum();
604-
let total_file_size: i64 = files.iter().map(|f| f.file_size_in_bytes()).sum();
588+
// We set old_summary to None so totals equal the new files only
589+
let data_files_iter = files.iter().filter(|f| *f.content() == Content::Data);
590+
let delete_files_iter = files.iter().filter(|f| *f.content() != Content::Data);
605591

606-
let mut summary_fields = HashMap::new();
607-
summary_fields.insert("total-records".to_string(), total_records.to_string());
608-
summary_fields.insert("total-data-files".to_string(), data_files_vec.len().to_string());
609-
summary_fields.insert("total-delete-files".to_string(), delete_files_vec.len().to_string());
610-
summary_fields.insert("total-file-size-bytes".to_string(), total_file_size.to_string());
611-
summary_fields.insert("added-records".to_string(), total_records.to_string());
612-
summary_fields.insert("added-data-files".to_string(), data_files_vec.len().to_string());
613-
summary_fields.insert("added-files-size-bytes".to_string(), total_file_size.to_string());
592+
let mut summary_fields = update_snapshot_summary(
593+
None, // No old summary - this is a full replacement
594+
data_files_iter,
595+
delete_files_iter,
596+
);
614597

615598
// Merge with any additional summary fields
616599
if let Some(additional) = additional_summary {
@@ -672,11 +655,10 @@ impl Operation {
672655
}
673656

674657
// Compute summary before moving data_files
675-
let empty_vec = Vec::new();
676658
let summary_fields = update_snapshot_summary(
677659
Some(old_snapshot.summary()),
678-
&data_files,
679-
&empty_vec, // No separate delete files in this operation
660+
data_files.iter(),
661+
std::iter::empty::<&DataFile>(), // No separate delete files in this operation
680662
);
681663

682664
let data_files_iter = data_files.iter();
@@ -706,7 +688,8 @@ impl Operation {
706688
)
707689
.await?;
708690

709-
let n_splits = manifest_list_writer.n_splits(n_data_files, ManifestListContent::Data);
691+
let n_splits =
692+
manifest_list_writer.n_splits(n_data_files, ManifestListContent::Data);
710693

711694
let new_datafile_iter = data_files.into_iter().map(|data_file| {
712695
ManifestEntry::builder()
@@ -930,8 +913,8 @@ fn prefetch_manifest_list(
930913
///
931914
/// # Arguments
932915
/// * `old_summary` - The summary from the parent snapshot (if it exists)
933-
/// * `data_files` - New data files being added in this operation
934-
/// * `delete_files` - New delete files being added in this operation
916+
/// * `data_files` - Iterator over new data files being added in this operation
917+
/// * `delete_files` - Iterator over new delete files being added in this operation
935918
///
936919
/// # Returns
937920
/// A HashMap with updated summary fields including:
@@ -942,10 +925,10 @@ fn prefetch_manifest_list(
942925
/// - `added-records`: Records added in this operation
943926
/// - `added-data-files`: Data files added in this operation
944927
/// - `added-files-size-bytes`: Size of files added in this operation
945-
pub fn update_snapshot_summary(
928+
pub fn update_snapshot_summary<'files>(
946929
old_summary: Option<&Summary>,
947-
data_files: &[DataFile],
948-
delete_files: &[DataFile],
930+
data_files: impl Iterator<Item = &'files DataFile>,
931+
delete_files: impl Iterator<Item = &'files DataFile>,
949932
) -> HashMap<String, String> {
950933
// Parse existing values from old summary
951934
let old_other = old_summary.map(|s| &s.other);
@@ -962,21 +945,28 @@ pub fn update_snapshot_summary(
962945
let old_total_delete_files = parse_i64("total-delete-files");
963946
let old_total_file_size = parse_i64("total-file-size-bytes");
964947

965-
// Compute deltas from new files
966-
let added_data_files = data_files.len() as i64;
967-
let added_delete_files = delete_files.len() as i64;
948+
// Compute deltas from new files - we need to iterate to count and sum
949+
let mut added_data_files = 0i64;
950+
let mut added_records = 0i64;
951+
let mut added_data_files_size = 0i64;
952+
953+
for file in data_files {
954+
added_data_files += 1;
955+
if *file.content() == Content::Data {
956+
added_records += file.record_count();
957+
}
958+
added_data_files_size += file.file_size_in_bytes();
959+
}
960+
961+
let mut added_delete_files = 0i64;
962+
let mut added_delete_files_size = 0i64;
968963

969-
let added_records: i64 = data_files
970-
.iter()
971-
.filter(|f| *f.content() == Content::Data)
972-
.map(|f| f.record_count())
973-
.sum();
964+
for file in delete_files {
965+
added_delete_files += 1;
966+
added_delete_files_size += file.file_size_in_bytes();
967+
}
974968

975-
let added_files_size: i64 = data_files
976-
.iter()
977-
.chain(delete_files.iter())
978-
.map(|f| f.file_size_in_bytes())
979-
.sum();
969+
let added_files_size = added_data_files_size + added_delete_files_size;
980970

981971
// Compute new totals
982972
let total_records = old_total_records + added_records;
@@ -988,11 +978,20 @@ pub fn update_snapshot_summary(
988978
let mut result = HashMap::new();
989979
result.insert("total-records".to_string(), total_records.to_string());
990980
result.insert("total-data-files".to_string(), total_data_files.to_string());
991-
result.insert("total-delete-files".to_string(), total_delete_files.to_string());
992-
result.insert("total-file-size-bytes".to_string(), total_file_size.to_string());
981+
result.insert(
982+
"total-delete-files".to_string(),
983+
total_delete_files.to_string(),
984+
);
985+
result.insert(
986+
"total-file-size-bytes".to_string(),
987+
total_file_size.to_string(),
988+
);
993989
result.insert("added-records".to_string(), added_records.to_string());
994990
result.insert("added-data-files".to_string(), added_data_files.to_string());
995-
result.insert("added-files-size-bytes".to_string(), added_files_size.to_string());
991+
result.insert(
992+
"added-files-size-bytes".to_string(),
993+
added_files_size.to_string(),
994+
);
996995

997996
result
998997
}

0 commit comments

Comments
 (0)