Skip to content

Commit aa087b5

Browse files
authored
Merge pull request JanKaul#266 from JanKaul/populate-snapshot-summary
Populate snapshot summary
2 parents 07d3b8c + 56975c6 commit aa087b5

File tree

1 file changed

+172
-16
lines changed

1 file changed

+172
-16
lines changed

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 172 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ use std::{collections::HashMap, sync::Arc};
1010
use bytes::Bytes;
1111
use futures::future;
1212
use iceberg_rust_spec::manifest_list::{
13-
manifest_list_schema_v1, manifest_list_schema_v2, Content, ManifestListEntry,
13+
manifest_list_schema_v1, manifest_list_schema_v2, Content as ManifestListContent,
14+
ManifestListEntry,
1415
};
1516
use iceberg_rust_spec::snapshot::{Operation as SnapshotOperation, Snapshot};
17+
use iceberg_rust_spec::spec::manifest::Content;
1618
use iceberg_rust_spec::spec::table_metadata::TableMetadata;
1719
use iceberg_rust_spec::spec::{
1820
manifest::{partition_value_schema, DataFile, ManifestEntry, Status},
@@ -201,12 +203,12 @@ impl Operation {
201203
// Split manifest file if limit is exceeded
202204
for (content, files, n_files) in [
203205
(
204-
Content::Data,
206+
ManifestListContent::Data,
205207
Either::Left(new_datafile_iter),
206208
n_data_files_in_group,
207209
),
208210
(
209-
Content::Deletes,
211+
ManifestListContent::Deletes,
210212
Either::Right(new_deletefile_iter),
211213
n_delete_files_in_group,
212214
),
@@ -230,14 +232,28 @@ impl Operation {
230232
(_, _) => Ok(SnapshotOperation::Overwrite),
231233
}?;
232234

235+
// Compute updated snapshot summary
236+
// Separate data files from delete files using iterators
237+
let data_files_iter = all_files.iter().filter(|f| *f.content() == Content::Data);
238+
let delete_files_iter = all_files.iter().filter(|f| *f.content() != Content::Data);
239+
240+
let mut summary_fields = update_snapshot_summary(
241+
old_snapshot.map(|s| s.summary()),
242+
data_files_iter,
243+
delete_files_iter,
244+
);
245+
246+
// Merge with any additional summary fields
247+
summary_fields.extend(additional_summary.unwrap_or_default());
248+
233249
let mut snapshot_builder = SnapshotBuilder::default();
234250
snapshot_builder
235251
.with_snapshot_id(snapshot_id)
236252
.with_manifest_list(new_manifest_list_location)
237253
.with_sequence_number(table_metadata.last_sequence_number + dsn_offset)
238254
.with_summary(Summary {
239255
operation: snapshot_operation,
240-
other: additional_summary.unwrap_or_default(),
256+
other: summary_fields,
241257
})
242258
.with_schema_id(
243259
*table_metadata
@@ -290,6 +306,13 @@ impl Operation {
290306
return Ok((None, Vec::new()));
291307
}
292308

309+
// Compute summary before moving data_files and delete_files
310+
let summary_fields = update_snapshot_summary(
311+
old_snapshot.map(|s| s.summary()),
312+
data_files.iter(),
313+
delete_files.iter(),
314+
);
315+
293316
let data_files_iter = delete_files.iter().chain(data_files.iter());
294317

295318
let mut manifest_list_writer = if let Some(manifest_list_bytes) =
@@ -339,9 +362,13 @@ impl Operation {
339362
Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
340363
> = Vec::new();
341364
for (content, files, n_files) in [
342-
(Content::Data, Either::Left(new_datafile_iter), n_data_files),
343365
(
344-
Content::Deletes,
366+
ManifestListContent::Data,
367+
Either::Left(new_datafile_iter),
368+
n_data_files,
369+
),
370+
(
371+
ManifestListContent::Deletes,
345372
Either::Right(new_deletefile_iter),
346373
n_delete_files,
347374
),
@@ -389,14 +416,20 @@ impl Operation {
389416
(_, _) => Ok(SnapshotOperation::Overwrite),
390417
}?;
391418

419+
// Merge with any additional summary fields provided by the caller
420+
let mut summary_fields = summary_fields;
421+
if let Some(additional) = additional_summary {
422+
summary_fields.extend(additional);
423+
}
424+
392425
let mut snapshot_builder = SnapshotBuilder::default();
393426
snapshot_builder
394427
.with_snapshot_id(snapshot_id)
395428
.with_manifest_list(new_manifest_list_location)
396429
.with_sequence_number(table_metadata.last_sequence_number + 1)
397430
.with_summary(Summary {
398431
operation: snapshot_operation,
399-
other: additional_summary.unwrap_or_default(),
432+
other: summary_fields,
400433
})
401434
.with_schema_id(
402435
*table_metadata
@@ -495,7 +528,7 @@ impl Operation {
495528
snapshot_id,
496529
&manifest_schema,
497530
table_metadata,
498-
Content::Data,
531+
ManifestListContent::Data,
499532
branch.as_deref(),
500533
)?;
501534

@@ -527,7 +560,7 @@ impl Operation {
527560
snapshot_id,
528561
&manifest_schema,
529562
table_metadata,
530-
Content::Data,
563+
ManifestListContent::Data,
531564
branch.as_deref(),
532565
)?;
533566

@@ -550,6 +583,23 @@ impl Operation {
550583
)
551584
.await?;
552585

586+
// For Replace operation, we're replacing all data with new files
587+
// Compute summary for the new state (not incremental)
588+
// We set old_summary to None so totals equal the new files only
589+
let data_files_iter = files.iter().filter(|f| *f.content() == Content::Data);
590+
let delete_files_iter = files.iter().filter(|f| *f.content() != Content::Data);
591+
592+
let mut summary_fields = update_snapshot_summary(
593+
None, // No old summary - this is a full replacement
594+
data_files_iter,
595+
delete_files_iter,
596+
);
597+
598+
// Merge with any additional summary fields
599+
if let Some(additional) = additional_summary {
600+
summary_fields.extend(additional);
601+
}
602+
553603
let mut snapshot_builder = SnapshotBuilder::default();
554604
snapshot_builder
555605
.with_snapshot_id(snapshot_id)
@@ -558,7 +608,7 @@ impl Operation {
558608
.with_manifest_list(new_manifest_list_location)
559609
.with_summary(Summary {
560610
operation: iceberg_rust_spec::spec::snapshot::Operation::Overwrite,
561-
other: additional_summary.unwrap_or_default(),
611+
other: summary_fields,
562612
});
563613
let snapshot = snapshot_builder.build()?;
564614

@@ -604,6 +654,13 @@ impl Operation {
604654
return Ok((None, Vec::new()));
605655
}
606656

657+
// Compute summary before moving data_files
658+
let summary_fields = update_snapshot_summary(
659+
Some(old_snapshot.summary()),
660+
data_files.iter(),
661+
std::iter::empty::<&DataFile>(), // No separate delete files in this operation
662+
);
663+
607664
let data_files_iter = data_files.iter();
608665

609666
let manifests_to_overwrite: HashSet<String> =
@@ -631,7 +688,8 @@ impl Operation {
631688
)
632689
.await?;
633690

634-
let n_splits = manifest_list_writer.n_splits(n_data_files, Content::Data);
691+
let n_splits =
692+
manifest_list_writer.n_splits(n_data_files, ManifestListContent::Data);
635693

636694
let new_datafile_iter = data_files.into_iter().map(|data_file| {
637695
ManifestEntry::builder()
@@ -649,9 +707,9 @@ impl Operation {
649707
.map(|x| x.manifest_path.clone())
650708
.ok_or(Error::NotFound("Selected manifest".to_owned()))?;
651709

652-
let data_files = files_to_overwrite.get(&selected_manifest_location);
710+
let files_to_filter = files_to_overwrite.get(&selected_manifest_location);
653711

654-
let filter = if let Some(filter_files) = data_files {
712+
let filter = if let Some(filter_files) = files_to_filter {
655713
let filter_files: HashSet<String> =
656714
filter_files.iter().map(ToOwned::to_owned).collect();
657715
Some(move |file: &Result<ManifestEntry, Error>| {
@@ -672,7 +730,7 @@ impl Operation {
672730
snapshot_id,
673731
filter,
674732
object_store.clone(),
675-
Content::Data,
733+
ManifestListContent::Data,
676734
)
677735
.await?;
678736
manifest_list_writer
@@ -686,7 +744,7 @@ impl Operation {
686744
n_splits,
687745
filter,
688746
object_store.clone(),
689-
Content::Data,
747+
ManifestListContent::Data,
690748
)
691749
.await?;
692750
manifest_list_writer
@@ -696,14 +754,20 @@ impl Operation {
696754

697755
let snapshot_operation = SnapshotOperation::Overwrite;
698756

757+
// Merge with any additional summary fields
758+
let mut summary_fields = summary_fields;
759+
if let Some(additional) = additional_summary {
760+
summary_fields.extend(additional);
761+
}
762+
699763
let mut snapshot_builder = SnapshotBuilder::default();
700764
snapshot_builder
701765
.with_snapshot_id(snapshot_id)
702766
.with_manifest_list(new_manifest_list_location)
703767
.with_sequence_number(table_metadata.last_sequence_number + 1)
704768
.with_summary(Summary {
705769
operation: snapshot_operation,
706-
other: additional_summary.unwrap_or_default(),
770+
other: summary_fields,
707771
})
708772
.with_schema_id(
709773
*table_metadata
@@ -840,6 +904,98 @@ fn prefetch_manifest_list(
840904
})
841905
}
842906

907+
/// Updates snapshot summary fields incrementally based on operation changes.
908+
///
909+
/// This function computes snapshot summary metrics by:
910+
/// 1. Parsing existing totals from the old snapshot summary
911+
/// 2. Computing deltas from the new data/delete files being added
912+
/// 3. Adding deltas to existing totals
913+
///
914+
/// # Arguments
915+
/// * `old_summary` - The summary from the parent snapshot (if it exists)
916+
/// * `data_files` - Iterator over new data files being added in this operation
917+
/// * `delete_files` - Iterator over new delete files being added in this operation
918+
///
919+
/// # Returns
920+
/// A HashMap with updated summary fields including:
921+
/// - `total-records`: Total record count across all data files
922+
/// - `total-data-files`: Total number of data files
923+
/// - `total-delete-files`: Total number of delete files
924+
/// - `total-file-size-bytes`: Total size of all files in bytes
925+
/// - `added-records`: Records added in this operation
926+
/// - `added-data-files`: Data files added in this operation
927+
/// - `added-files-size-bytes`: Size of files added in this operation
928+
pub fn update_snapshot_summary<'files>(
929+
old_summary: Option<&Summary>,
930+
data_files: impl Iterator<Item = &'files DataFile>,
931+
delete_files: impl Iterator<Item = &'files DataFile>,
932+
) -> HashMap<String, String> {
933+
// Parse existing values from old summary
934+
let old_other = old_summary.map(|s| &s.other);
935+
936+
let parse_i64 = |key: &str| -> i64 {
937+
old_other
938+
.and_then(|m| m.get(key))
939+
.and_then(|v| v.parse::<i64>().ok())
940+
.unwrap_or(0)
941+
};
942+
943+
let old_total_records = parse_i64("total-records");
944+
let old_total_data_files = parse_i64("total-data-files");
945+
let old_total_delete_files = parse_i64("total-delete-files");
946+
let old_total_file_size = parse_i64("total-file-size-bytes");
947+
948+
// Compute deltas from new files - we need to iterate to count and sum
949+
let mut added_data_files = 0i64;
950+
let mut added_records = 0i64;
951+
let mut added_data_files_size = 0i64;
952+
953+
for file in data_files {
954+
added_data_files += 1;
955+
if *file.content() == Content::Data {
956+
added_records += file.record_count();
957+
}
958+
added_data_files_size += file.file_size_in_bytes();
959+
}
960+
961+
let mut added_delete_files = 0i64;
962+
let mut added_delete_files_size = 0i64;
963+
964+
for file in delete_files {
965+
added_delete_files += 1;
966+
added_delete_files_size += file.file_size_in_bytes();
967+
}
968+
969+
let added_files_size = added_data_files_size + added_delete_files_size;
970+
971+
// Compute new totals
972+
let total_records = old_total_records + added_records;
973+
let total_data_files = old_total_data_files + added_data_files;
974+
let total_delete_files = old_total_delete_files + added_delete_files;
975+
let total_file_size = old_total_file_size + added_files_size;
976+
977+
// Build result map
978+
let mut result = HashMap::new();
979+
result.insert("total-records".to_string(), total_records.to_string());
980+
result.insert("total-data-files".to_string(), total_data_files.to_string());
981+
result.insert(
982+
"total-delete-files".to_string(),
983+
total_delete_files.to_string(),
984+
);
985+
result.insert(
986+
"total-file-size-bytes".to_string(),
987+
total_file_size.to_string(),
988+
);
989+
result.insert("added-records".to_string(), added_records.to_string());
990+
result.insert("added-data-files".to_string(), added_data_files.to_string());
991+
result.insert(
992+
"added-files-size-bytes".to_string(),
993+
added_files_size.to_string(),
994+
);
995+
996+
result
997+
}
998+
843999
pub(crate) fn new_manifest_location(
8441000
table_metadata_location: &str,
8451001
commit_uuid: &str,

0 commit comments

Comments
 (0)