Skip to content

Commit d3a705a

Browse files
committed
populate snapshot summary
1 parent 07d3b8c commit d3a705a

File tree

1 file changed

+173
-16
lines changed

1 file changed

+173
-16
lines changed

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 173 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ use std::{collections::HashMap, sync::Arc};
1010
use bytes::Bytes;
1111
use futures::future;
1212
use iceberg_rust_spec::manifest_list::{
13-
manifest_list_schema_v1, manifest_list_schema_v2, Content, ManifestListEntry,
13+
manifest_list_schema_v1, manifest_list_schema_v2, Content as ManifestListContent, ManifestListEntry,
1414
};
15+
use iceberg_rust_spec::spec::manifest::Content;
1516
use iceberg_rust_spec::snapshot::{Operation as SnapshotOperation, Snapshot};
1617
use iceberg_rust_spec::spec::table_metadata::TableMetadata;
1718
use iceberg_rust_spec::spec::{
@@ -201,12 +202,12 @@ impl Operation {
201202
// Split manifest file if limit is exceeded
202203
for (content, files, n_files) in [
203204
(
204-
Content::Data,
205+
ManifestListContent::Data,
205206
Either::Left(new_datafile_iter),
206207
n_data_files_in_group,
207208
),
208209
(
209-
Content::Deletes,
210+
ManifestListContent::Deletes,
210211
Either::Right(new_deletefile_iter),
211212
n_delete_files_in_group,
212213
),
@@ -230,14 +231,36 @@ impl Operation {
230231
(_, _) => Ok(SnapshotOperation::Overwrite),
231232
}?;
232233

234+
// Compute updated snapshot summary
235+
// Separate data files from delete files
236+
let data_files_vec: Vec<DataFile> = all_files
237+
.iter()
238+
.filter(|f| *f.content() == Content::Data)
239+
.cloned()
240+
.collect();
241+
let delete_files_vec: Vec<DataFile> = all_files
242+
.iter()
243+
.filter(|f| *f.content() != Content::Data)
244+
.cloned()
245+
.collect();
246+
247+
let mut summary_fields = update_snapshot_summary(
248+
old_snapshot.map(|s| s.summary()),
249+
&data_files_vec,
250+
&delete_files_vec,
251+
);
252+
253+
// Merge with any additional summary fields
254+
summary_fields.extend(additional_summary.unwrap_or_default());
255+
233256
let mut snapshot_builder = SnapshotBuilder::default();
234257
snapshot_builder
235258
.with_snapshot_id(snapshot_id)
236259
.with_manifest_list(new_manifest_list_location)
237260
.with_sequence_number(table_metadata.last_sequence_number + dsn_offset)
238261
.with_summary(Summary {
239262
operation: snapshot_operation,
240-
other: additional_summary.unwrap_or_default(),
263+
other: summary_fields,
241264
})
242265
.with_schema_id(
243266
*table_metadata
@@ -290,6 +313,13 @@ impl Operation {
290313
return Ok((None, Vec::new()));
291314
}
292315

316+
// Compute summary before moving data_files and delete_files
317+
let summary_fields = update_snapshot_summary(
318+
old_snapshot.map(|s| s.summary()),
319+
&data_files,
320+
&delete_files,
321+
);
322+
293323
let data_files_iter = delete_files.iter().chain(data_files.iter());
294324

295325
let mut manifest_list_writer = if let Some(manifest_list_bytes) =
@@ -339,9 +369,9 @@ impl Operation {
339369
Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>,
340370
> = Vec::new();
341371
for (content, files, n_files) in [
342-
(Content::Data, Either::Left(new_datafile_iter), n_data_files),
372+
(ManifestListContent::Data, Either::Left(new_datafile_iter), n_data_files),
343373
(
344-
Content::Deletes,
374+
ManifestListContent::Deletes,
345375
Either::Right(new_deletefile_iter),
346376
n_delete_files,
347377
),
@@ -389,14 +419,20 @@ impl Operation {
389419
(_, _) => Ok(SnapshotOperation::Overwrite),
390420
}?;
391421

422+
// Merge with any additional summary fields provided by the caller
423+
let mut summary_fields = summary_fields;
424+
if let Some(additional) = additional_summary {
425+
summary_fields.extend(additional);
426+
}
427+
392428
let mut snapshot_builder = SnapshotBuilder::default();
393429
snapshot_builder
394430
.with_snapshot_id(snapshot_id)
395431
.with_manifest_list(new_manifest_list_location)
396432
.with_sequence_number(table_metadata.last_sequence_number + 1)
397433
.with_summary(Summary {
398434
operation: snapshot_operation,
399-
other: additional_summary.unwrap_or_default(),
435+
other: summary_fields,
400436
})
401437
.with_schema_id(
402438
*table_metadata
@@ -495,7 +531,7 @@ impl Operation {
495531
snapshot_id,
496532
&manifest_schema,
497533
table_metadata,
498-
Content::Data,
534+
ManifestListContent::Data,
499535
branch.as_deref(),
500536
)?;
501537

@@ -527,7 +563,7 @@ impl Operation {
527563
snapshot_id,
528564
&manifest_schema,
529565
table_metadata,
530-
Content::Data,
566+
ManifestListContent::Data,
531567
branch.as_deref(),
532568
)?;
533569

@@ -550,6 +586,37 @@ impl Operation {
550586
)
551587
.await?;
552588

589+
// For Replace operation, we're replacing all data with new files
590+
// Compute summary for the new state (not incremental)
591+
let data_files_vec: Vec<DataFile> = files
592+
.iter()
593+
.filter(|f| *f.content() == Content::Data)
594+
.cloned()
595+
.collect();
596+
let delete_files_vec: Vec<DataFile> = files
597+
.iter()
598+
.filter(|f| *f.content() != Content::Data)
599+
.cloned()
600+
.collect();
601+
602+
// For replace, set totals to match only the new files
603+
let total_records: i64 = data_files_vec.iter().map(|f| f.record_count()).sum();
604+
let total_file_size: i64 = files.iter().map(|f| f.file_size_in_bytes()).sum();
605+
606+
let mut summary_fields = HashMap::new();
607+
summary_fields.insert("total-records".to_string(), total_records.to_string());
608+
summary_fields.insert("total-data-files".to_string(), data_files_vec.len().to_string());
609+
summary_fields.insert("total-delete-files".to_string(), delete_files_vec.len().to_string());
610+
summary_fields.insert("total-file-size-bytes".to_string(), total_file_size.to_string());
611+
summary_fields.insert("added-records".to_string(), total_records.to_string());
612+
summary_fields.insert("added-data-files".to_string(), data_files_vec.len().to_string());
613+
summary_fields.insert("added-files-size-bytes".to_string(), total_file_size.to_string());
614+
615+
// Merge with any additional summary fields
616+
if let Some(additional) = additional_summary {
617+
summary_fields.extend(additional);
618+
}
619+
553620
let mut snapshot_builder = SnapshotBuilder::default();
554621
snapshot_builder
555622
.with_snapshot_id(snapshot_id)
@@ -558,7 +625,7 @@ impl Operation {
558625
.with_manifest_list(new_manifest_list_location)
559626
.with_summary(Summary {
560627
operation: iceberg_rust_spec::spec::snapshot::Operation::Overwrite,
561-
other: additional_summary.unwrap_or_default(),
628+
other: summary_fields,
562629
});
563630
let snapshot = snapshot_builder.build()?;
564631

@@ -604,6 +671,14 @@ impl Operation {
604671
return Ok((None, Vec::new()));
605672
}
606673

674+
// Compute summary before moving data_files
675+
let empty_vec = Vec::new();
676+
let summary_fields = update_snapshot_summary(
677+
Some(old_snapshot.summary()),
678+
&data_files,
679+
&empty_vec, // No separate delete files in this operation
680+
);
681+
607682
let data_files_iter = data_files.iter();
608683

609684
let manifests_to_overwrite: HashSet<String> =
@@ -631,7 +706,7 @@ impl Operation {
631706
)
632707
.await?;
633708

634-
let n_splits = manifest_list_writer.n_splits(n_data_files, Content::Data);
709+
let n_splits = manifest_list_writer.n_splits(n_data_files, ManifestListContent::Data);
635710

636711
let new_datafile_iter = data_files.into_iter().map(|data_file| {
637712
ManifestEntry::builder()
@@ -649,9 +724,9 @@ impl Operation {
649724
.map(|x| x.manifest_path.clone())
650725
.ok_or(Error::NotFound("Selected manifest".to_owned()))?;
651726

652-
let data_files = files_to_overwrite.get(&selected_manifest_location);
727+
let files_to_filter = files_to_overwrite.get(&selected_manifest_location);
653728

654-
let filter = if let Some(filter_files) = data_files {
729+
let filter = if let Some(filter_files) = files_to_filter {
655730
let filter_files: HashSet<String> =
656731
filter_files.iter().map(ToOwned::to_owned).collect();
657732
Some(move |file: &Result<ManifestEntry, Error>| {
@@ -672,7 +747,7 @@ impl Operation {
672747
snapshot_id,
673748
filter,
674749
object_store.clone(),
675-
Content::Data,
750+
ManifestListContent::Data,
676751
)
677752
.await?;
678753
manifest_list_writer
@@ -686,7 +761,7 @@ impl Operation {
686761
n_splits,
687762
filter,
688763
object_store.clone(),
689-
Content::Data,
764+
ManifestListContent::Data,
690765
)
691766
.await?;
692767
manifest_list_writer
@@ -696,14 +771,20 @@ impl Operation {
696771

697772
let snapshot_operation = SnapshotOperation::Overwrite;
698773

774+
// Merge with any additional summary fields
775+
let mut summary_fields = summary_fields;
776+
if let Some(additional) = additional_summary {
777+
summary_fields.extend(additional);
778+
}
779+
699780
let mut snapshot_builder = SnapshotBuilder::default();
700781
snapshot_builder
701782
.with_snapshot_id(snapshot_id)
702783
.with_manifest_list(new_manifest_list_location)
703784
.with_sequence_number(table_metadata.last_sequence_number + 1)
704785
.with_summary(Summary {
705786
operation: snapshot_operation,
706-
other: additional_summary.unwrap_or_default(),
787+
other: summary_fields,
707788
})
708789
.with_schema_id(
709790
*table_metadata
@@ -840,6 +921,82 @@ fn prefetch_manifest_list(
840921
})
841922
}
842923

924+
/// Updates snapshot summary fields incrementally based on operation changes.
925+
///
926+
/// This function computes snapshot summary metrics by:
927+
/// 1. Parsing existing totals from the old snapshot summary
928+
/// 2. Computing deltas from the new data/delete files being added
929+
/// 3. Adding deltas to existing totals
930+
///
931+
/// # Arguments
932+
/// * `old_summary` - The summary from the parent snapshot (if it exists)
933+
/// * `data_files` - New data files being added in this operation
934+
/// * `delete_files` - New delete files being added in this operation
935+
///
936+
/// # Returns
937+
/// A HashMap with updated summary fields including:
938+
/// - `total-records`: Total record count across all data files
939+
/// - `total-data-files`: Total number of data files
940+
/// - `total-delete-files`: Total number of delete files
941+
/// - `total-file-size-bytes`: Total size of all files in bytes
942+
/// - `added-records`: Records added in this operation
943+
/// - `added-data-files`: Data files added in this operation
944+
/// - `added-files-size-bytes`: Size of files added in this operation
945+
pub fn update_snapshot_summary(
946+
old_summary: Option<&Summary>,
947+
data_files: &[DataFile],
948+
delete_files: &[DataFile],
949+
) -> HashMap<String, String> {
950+
// Parse existing values from old summary
951+
let old_other = old_summary.map(|s| &s.other);
952+
953+
let parse_i64 = |key: &str| -> i64 {
954+
old_other
955+
.and_then(|m| m.get(key))
956+
.and_then(|v| v.parse::<i64>().ok())
957+
.unwrap_or(0)
958+
};
959+
960+
let old_total_records = parse_i64("total-records");
961+
let old_total_data_files = parse_i64("total-data-files");
962+
let old_total_delete_files = parse_i64("total-delete-files");
963+
let old_total_file_size = parse_i64("total-file-size-bytes");
964+
965+
// Compute deltas from new files
966+
let added_data_files = data_files.len() as i64;
967+
let added_delete_files = delete_files.len() as i64;
968+
969+
let added_records: i64 = data_files
970+
.iter()
971+
.filter(|f| *f.content() == Content::Data)
972+
.map(|f| f.record_count())
973+
.sum();
974+
975+
let added_files_size: i64 = data_files
976+
.iter()
977+
.chain(delete_files.iter())
978+
.map(|f| f.file_size_in_bytes())
979+
.sum();
980+
981+
// Compute new totals
982+
let total_records = old_total_records + added_records;
983+
let total_data_files = old_total_data_files + added_data_files;
984+
let total_delete_files = old_total_delete_files + added_delete_files;
985+
let total_file_size = old_total_file_size + added_files_size;
986+
987+
// Build result map
988+
let mut result = HashMap::new();
989+
result.insert("total-records".to_string(), total_records.to_string());
990+
result.insert("total-data-files".to_string(), total_data_files.to_string());
991+
result.insert("total-delete-files".to_string(), total_delete_files.to_string());
992+
result.insert("total-file-size-bytes".to_string(), total_file_size.to_string());
993+
result.insert("added-records".to_string(), added_records.to_string());
994+
result.insert("added-data-files".to_string(), added_data_files.to_string());
995+
result.insert("added-files-size-bytes".to_string(), added_files_size.to_string());
996+
997+
result
998+
}
999+
8431000
pub(crate) fn new_manifest_location(
8441001
table_metadata_location: &str,
8451002
commit_uuid: &str,

0 commit comments

Comments
 (0)