Skip to content

Commit a0c9824

Browse files
authored
Merge pull request JanKaul#243 from JanKaul/refactor-parquet-writer
Refactor parquet writer
2 parents 2782d71 + f0bbde7 commit a0c9824

File tree

9 files changed

+469
-56
lines changed

9 files changed

+469
-56
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

catalogs/iceberg-file-catalog/src/lib.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -663,12 +663,13 @@ pub mod tests {
663663
arrow::array::{Float64Array, Int64Array},
664664
common::tree_node::{TransformedResult, TreeNode},
665665
execution::SessionStateBuilder,
666-
prelude::SessionContext,
666+
prelude::{SessionConfig, SessionContext},
667667
};
668668
use datafusion_iceberg::{
669669
catalog::catalog::IcebergCatalog,
670670
planner::{iceberg_transform, IcebergQueryPlanner},
671671
};
672+
use futures::StreamExt;
672673
use iceberg_rust::{
673674
catalog::{namespace::Namespace, Catalog},
674675
object_store::{Bucket, ObjectStoreBuilder},
@@ -734,7 +735,17 @@ pub mod tests {
734735
.unwrap(),
735736
);
736737

738+
let mut config = SessionConfig::default();
739+
740+
config.options_mut().execution.minimum_parallel_output_files = 1;
741+
config
742+
.options_mut()
743+
.execution
744+
.parquet
745+
.maximum_parallel_row_group_writers = 4;
746+
737747
let state = SessionStateBuilder::new()
748+
.with_config(config)
738749
.with_default_features()
739750
.with_query_planner(Arc::new(IcebergQueryPlanner::new()))
740751
.build();
@@ -801,7 +812,7 @@ pub mod tests {
801812
L_RECEIPTDATE DATE NOT NULL,
802813
L_SHIPINSTRUCT VARCHAR NOT NULL,
803814
L_SHIPMODE VARCHAR NOT NULL,
804-
L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );";
815+
L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem';";
805816

806817
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
807818

@@ -889,6 +900,21 @@ pub mod tests {
889900
std::str::from_utf8(&version_hint).unwrap(),
890901
"s3://warehouse/tpch/lineitem/metadata/v1.metadata.json"
891902
);
903+
904+
let files = object_store.list(None).collect::<Vec<_>>().await;
905+
906+
assert_eq!(
907+
files
908+
.iter()
909+
.filter(|x| x
910+
.as_ref()
911+
.unwrap()
912+
.location
913+
.extension()
914+
.is_some_and(|x| x == "parquet"))
915+
.count(),
916+
1
917+
);
892918
}
893919

894920
#[tokio::test]

datafusion_iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ derive_builder = { workspace = true }
1919
futures = { workspace = true }
2020
iceberg-rust = { path = "../iceberg-rust", version = "0.8.0" }
2121
itertools = { workspace = true }
22+
lru = { workspace = true }
2223
object_store = { workspace = true }
2324
pin-project-lite = "0.2.16"
2425
regex = "1.11.1"

datafusion_iceberg/src/error.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
Error type for iceberg
33
*/
44

5-
use datafusion::error::DataFusionError;
5+
use datafusion::{arrow::array::RecordBatch, error::DataFusionError};
66
use iceberg_rust::error::Error as IcebergError;
77
use thiserror::Error;
88

@@ -54,6 +54,15 @@ pub enum Error {
5454
/// parse int error
5555
#[error(transparent)]
5656
ParseInt(#[from] std::num::ParseIntError),
57+
/// Tokio error
58+
#[error(transparent)]
59+
TokioSend(
60+
#[from]
61+
tokio::sync::mpsc::error::SendError<(
62+
object_store::path::Path,
63+
tokio::sync::mpsc::Receiver<RecordBatch>,
64+
)>,
65+
),
5766
/// parse int error
5867
#[error(transparent)]
5968
DeriveBuilder(#[from] derive_builder::UninitializedFieldError),

0 commit comments

Comments
 (0)