Skip to content

Commit 767b99c

Browse files
authored
Merge pull request JanKaul#249 from JanKaul/add-logging-information
Add logging information
2 parents a0c9824 + a37b680 commit 767b99c

File tree

7 files changed

+104
-26
lines changed

7 files changed

+104
-26
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion_iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ regex = "1.11.1"
2626
serde_json = { workspace = true }
2727
thiserror = { workspace = true }
2828
tokio = { version = "1.43", features = ["rt-multi-thread"] }
29+
tracing = { workspace = true }
2930
url = { workspace = true }
3031
uuid = { workspace = true }
3132

datafusion_iceberg/src/table.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use tokio::sync::{
3333
mpsc::{self},
3434
RwLock, RwLockWriteGuard,
3535
};
36+
use tracing::instrument;
3637

3738
use datafusion::{
3839
arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaBuilder, SchemaRef},
@@ -355,6 +356,13 @@ fn fake_object_store_url(table_location_url: &str) -> Option<ObjectStoreUrl> {
355356
}
356357

357358
#[allow(clippy::too_many_arguments)]
359+
#[instrument(level = "debug", skip(arrow_schema, statistics, session, filters), fields(
360+
table_location = %table.metadata().location,
361+
snapshot_range = ?snapshot_range,
362+
projection = ?projection,
363+
filter_count = filters.len(),
364+
limit = ?limit
365+
))]
358366
async fn table_scan(
359367
table: &Table,
360368
snapshot_range: &(Option<i64>, Option<i64>),

iceberg-rust/src/table/transaction/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
//! * Managing snapshots and branches
1717
1818
use std::collections::HashMap;
19+
use tracing::debug;
1920

2021
use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
2122

@@ -398,6 +399,14 @@ impl<'table> TableTransaction<'table> {
398399
return Ok(());
399400
}
400401

402+
debug!(
403+
"Committing {} updates to table {}: requirements={:?}, updates={:?}",
404+
updates.len(),
405+
identifier,
406+
requirements,
407+
updates
408+
);
409+
401410
let new_table = catalog
402411
.clone()
403412
.update_table(CommitTable {

iceberg-rust/src/table/transaction/operation.rs

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use iceberg_rust_spec::util::strip_prefix;
2323
use object_store::ObjectStore;
2424
use smallvec::SmallVec;
2525
use tokio::task::JoinHandle;
26+
use tracing::{debug, instrument};
2627

2728
use crate::table::manifest::ManifestWriter;
2829
use crate::table::manifest_list::ManifestListWriter;
@@ -90,6 +91,7 @@ pub enum Operation {
9091
}
9192

9293
impl Operation {
94+
#[instrument(level = "debug", skip(object_store))]
9395
pub async fn execute(
9496
self,
9597
table_metadata: &TableMetadata,
@@ -102,6 +104,10 @@ impl Operation {
102104
delete_files,
103105
additional_summary,
104106
} => {
107+
debug!(
108+
"Executing Append operation: branch={:?}, data_files={}, delete_files={}, additional_summary={:?}",
109+
branch, data_files.len(), delete_files.len(), additional_summary
110+
);
105111
let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?;
106112

107113
let manifest_list_schema = match table_metadata.format_version {
@@ -220,6 +226,12 @@ impl Operation {
220226
files,
221227
additional_summary,
222228
} => {
229+
debug!(
230+
"Executing Replace operation: branch={:?}, files={}, additional_summary={:?}",
231+
branch,
232+
files.len(),
233+
additional_summary
234+
);
223235
let partition_fields =
224236
table_metadata.current_partition_fields(branch.as_deref())?;
225237
let old_snapshot = table_metadata.current_snapshot(branch.as_deref())?;
@@ -367,6 +379,10 @@ impl Operation {
367379
files_to_overwrite,
368380
additional_summary,
369381
} => {
382+
debug!(
383+
"Executing Overwrite operation: branch={:?}, data_files={}, files_to_overwrite={:?}, additional_summary={:?}",
384+
branch, data_files.len(), files_to_overwrite, additional_summary
385+
);
370386
let old_snapshot = table_metadata
371387
.current_snapshot(branch.as_deref())?
372388
.ok_or(Error::InvalidFormat("Snapshot to overwrite".to_owned()))?;
@@ -500,26 +516,42 @@ impl Operation {
500516
],
501517
))
502518
}
503-
Operation::UpdateProperties(entries) => Ok((
504-
None,
505-
vec![TableUpdate::SetProperties {
506-
updates: HashMap::from_iter(entries),
507-
}],
508-
)),
509-
Operation::SetSnapshotRef((key, value)) => Ok((
510-
table_metadata
511-
.refs
512-
.get(&key)
513-
.map(|x| TableRequirement::AssertRefSnapshotId {
514-
r#ref: key.clone(),
515-
snapshot_id: x.snapshot_id,
516-
}),
517-
vec![TableUpdate::SetSnapshotRef {
518-
ref_name: key,
519-
snapshot_reference: value,
520-
}],
521-
)),
519+
Operation::UpdateProperties(entries) => {
520+
debug!(
521+
"Executing UpdateProperties operation: entries={:?}",
522+
entries
523+
);
524+
Ok((
525+
None,
526+
vec![TableUpdate::SetProperties {
527+
updates: HashMap::from_iter(entries),
528+
}],
529+
))
530+
}
531+
Operation::SetSnapshotRef((key, value)) => {
532+
debug!(
533+
"Executing SetSnapshotRef operation: key={}, value={:?}",
534+
key, value
535+
);
536+
Ok((
537+
table_metadata
538+
.refs
539+
.get(&key)
540+
.map(|x| TableRequirement::AssertRefSnapshotId {
541+
r#ref: key.clone(),
542+
snapshot_id: x.snapshot_id,
543+
}),
544+
vec![TableUpdate::SetSnapshotRef {
545+
ref_name: key,
546+
snapshot_reference: value,
547+
}],
548+
))
549+
}
522550
Operation::AddSchema(schema) => {
551+
debug!(
552+
"Executing AddSchema operation: schema_id={:?}",
553+
schema.schema_id()
554+
);
523555
let last_column_id = schema.fields().iter().map(|x| x.id).max();
524556
Ok((
525557
None,
@@ -530,6 +562,7 @@ impl Operation {
530562
))
531563
}
532564
Operation::SetDefaultSpec(spec_id) => {
565+
debug!("Executing SetDefaultSpec operation: spec_id={}", spec_id);
533566
Ok((None, vec![TableUpdate::SetDefaultSpec { spec_id }]))
534567
}
535568
}

iceberg-rust/src/view/transaction/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
pub mod operation;
66
use iceberg_rust_spec::spec::{types::StructType, view_metadata::ViewRepresentation};
7+
use tracing::debug;
78

89
use crate::{catalog::commit::CommitView, error::Error};
910

@@ -61,6 +62,15 @@ impl<'view> Transaction<'view> {
6162
}
6263
updates.extend(update);
6364
}
65+
66+
debug!(
67+
"Committing {} updates to view {}: requirements={:?}, updates={:?}",
68+
updates.len(),
69+
identifier,
70+
requirements,
71+
updates
72+
);
73+
6474
let new_view = catalog
6575
.clone()
6676
.update_view(CommitView {

iceberg-rust/src/view/transaction/operation.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@ use iceberg_rust_spec::{
1212
};
1313
use std::{
1414
collections::HashMap,
15+
fmt::Debug,
1516
time::{SystemTime, UNIX_EPOCH},
1617
};
18+
use tracing::{debug, instrument};
1719

1820
use crate::{
1921
catalog::commit::{ViewRequirement, ViewUpdate},
2022
error::Error,
2123
};
2224

25+
#[derive(Debug)]
2326
/// View operation
2427
pub enum Operation {
2528
/// Update version
@@ -77,7 +80,8 @@ fn upsert_representations(
7780

7881
impl Operation {
7982
/// Execute operation
80-
pub async fn execute<T: Materialization>(
83+
#[instrument(level = "debug")]
84+
pub async fn execute<T: Materialization + Debug>(
8185
self,
8286
metadata: &GeneralViewMetadata<T>,
8387
) -> Result<(Option<ViewRequirement>, Vec<ViewUpdate<T>>), Error> {
@@ -87,6 +91,12 @@ impl Operation {
8791
schema,
8892
branch,
8993
} => {
94+
debug!(
95+
"Executing UpdateRepresentations operation: representations={}, schema_fields={}, branch={:?}",
96+
representations.len(),
97+
schema.len(),
98+
branch
99+
);
90100
let schema_changed = metadata
91101
.current_schema(branch.as_deref())
92102
.map(|s| schema != *s.fields())
@@ -158,12 +168,18 @@ impl Operation {
158168
view_updates,
159169
))
160170
}
161-
Operation::UpdateProperties(entries) => Ok((
162-
None,
163-
vec![ViewUpdate::SetProperties {
164-
updates: HashMap::from_iter(entries),
165-
}],
166-
)),
171+
Operation::UpdateProperties(entries) => {
172+
debug!(
173+
"Executing UpdateProperties operation: entries={:?}",
174+
entries
175+
);
176+
Ok((
177+
None,
178+
vec![ViewUpdate::SetProperties {
179+
updates: HashMap::from_iter(entries),
180+
}],
181+
))
182+
}
167183
}
168184
}
169185
}

0 commit comments

Comments
 (0)