
Commit 704e497

add statistics test

1 parent e43f47b

1 file changed: 181 additions & 0 deletions
@@ -0,0 +1,181 @@
use std::sync::Arc;

use datafusion::{
    common::{stats::Precision, tree_node::{TransformedResult, TreeNode}, ScalarValue},
    execution::{context::SessionContext, SessionStateBuilder},
};
use datafusion_expr::ScalarUDF;
use iceberg_rust::object_store::ObjectStoreBuilder;
use iceberg_sql_catalog::SqlCatalogList;

use datafusion_iceberg::{
    catalog::catalog_list::IcebergCatalogList,
    planner::{iceberg_transform, IcebergQueryPlanner, RefreshMaterializedView},
};

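// Loads the TPC-H lineitem CSV into a partitioned Iceberg table and checks the
// table and column statistics that DataFusion reports for a query against it.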
#[tokio::test]
async fn test_table_statistics() {
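    // Back the Iceberg catalog list with an in-memory object store and a SQLite metadata store.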
    let object_store = ObjectStoreBuilder::memory();
    let iceberg_catalog_list = Arc::new(
        SqlCatalogList::new("sqlite://", object_store)
            .await
            .unwrap(),
    );

    let catalog_list = {
        Arc::new(
            IcebergCatalogList::new(iceberg_catalog_list.clone())
                .await
                .unwrap(),
        )
    };

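    // Build a session whose catalog list and query planner are Iceberg-aware.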
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_catalog_list(catalog_list)
        .with_query_planner(Arc::new(IcebergQueryPlanner::new()))
        .build();

    let ctx = SessionContext::new_with_state(state);

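    // Expose the materialized-view refresh function as a scalar UDF on the context.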
    ctx.register_udf(ScalarUDF::from(RefreshMaterializedView::new(
        iceberg_catalog_list,
    )));

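    // Register the TPC-H lineitem CSV file as a plain DataFusion external table to act as the data source.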
let sql = "CREATE EXTERNAL TABLE lineitem (
46+
L_ORDERKEY BIGINT NOT NULL,
47+
L_PARTKEY BIGINT NOT NULL,
48+
L_SUPPKEY BIGINT NOT NULL,
49+
L_LINENUMBER INT NOT NULL,
50+
L_QUANTITY DOUBLE NOT NULL,
51+
L_EXTENDED_PRICE DOUBLE NOT NULL,
52+
L_DISCOUNT DOUBLE NOT NULL,
53+
L_TAX DOUBLE NOT NULL,
54+
L_RETURNFLAG CHAR NOT NULL,
55+
L_LINESTATUS CHAR NOT NULL,
56+
L_SHIPDATE DATE NOT NULL,
57+
L_COMMITDATE DATE NOT NULL,
58+
L_RECEIPTDATE DATE NOT NULL,
59+
L_SHIPINSTRUCT VARCHAR NOT NULL,
60+
L_SHIPMODE VARCHAR NOT NULL,
61+
L_COMMENT VARCHAR NOT NULL ) STORED AS CSV LOCATION 'testdata/tpch/lineitem.csv' OPTIONS ('has_header' 'false');";
62+
63+
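    // Each statement is planned, rewritten with iceberg_transform, and then executed.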
    let plan = ctx.state().create_logical_plan(sql).await.unwrap();

    let transformed = plan.transform(iceberg_transform).data().unwrap();

    ctx.execute_logical_plan(transformed)
        .await
        .unwrap()
        .collect()
        .await
        .expect("Failed to execute query plan.");

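    // Create the target schema in the Iceberg catalog.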
let sql = "CREATE SCHEMA warehouse.tpch;";
75+
76+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
77+
78+
let transformed = plan.transform(iceberg_transform).data().unwrap();
79+
80+
ctx.execute_logical_plan(transformed)
81+
.await
82+
.unwrap()
83+
.collect()
84+
.await
85+
.expect("Failed to execute query plan.");
86+
87+
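    // Create the Iceberg table, partitioned by month(L_SHIPDATE).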
let sql = "CREATE EXTERNAL TABLE warehouse.tpch.lineitem (
88+
L_ORDERKEY BIGINT NOT NULL,
89+
L_PARTKEY BIGINT NOT NULL,
90+
L_SUPPKEY BIGINT NOT NULL,
91+
L_LINENUMBER INT NOT NULL,
92+
L_QUANTITY DOUBLE NOT NULL,
93+
L_EXTENDED_PRICE DOUBLE NOT NULL,
94+
L_DISCOUNT DOUBLE NOT NULL,
95+
L_TAX DOUBLE NOT NULL,
96+
L_RETURNFLAG CHAR NOT NULL,
97+
L_LINESTATUS CHAR NOT NULL,
98+
L_SHIPDATE DATE NOT NULL,
99+
L_COMMITDATE DATE NOT NULL,
100+
L_RECEIPTDATE DATE NOT NULL,
101+
L_SHIPINSTRUCT VARCHAR NOT NULL,
102+
L_SHIPMODE VARCHAR NOT NULL,
103+
L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION '/warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );";
104+
105+
    let plan = ctx.state().create_logical_plan(sql).await.unwrap();

    let transformed = plan.transform(iceberg_transform).data().unwrap();

    ctx.execute_logical_plan(transformed)
        .await
        .unwrap()
        .collect()
        .await
        .expect("Failed to execute query plan.");

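    // Copy the CSV rows into the Iceberg table so it has data files to report statistics from.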
let sql = "insert into warehouse.tpch.lineitem select * from lineitem;";
117+
118+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
119+
120+
let transformed = plan.transform(iceberg_transform).data().unwrap();
121+
122+
ctx.execute_logical_plan(transformed)
123+
.await
124+
.unwrap()
125+
.collect()
126+
.await
127+
.expect("Failed to execute query plan.");
128+
129+
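    // Plan an aggregation over the Iceberg table and inspect the statistics of its physical plan.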
    let sql = ctx
        .sql("select sum(L_QUANTITY), L_PARTKEY from warehouse.tpch.lineitem group by L_PARTKEY;")
        .await
        .expect("Failed to create plan for select");

    let physical_plan = sql.create_physical_plan().await.unwrap();

    let stats = physical_plan.partition_statistics(None).unwrap();

    // Validate table-level statistics
    assert_eq!(
        stats.num_rows,
        Precision::Inexact(47048),
        "num_rows should match the total rows in the CSV file"
    );
    assert!(
        matches!(stats.total_byte_size, Precision::Inexact(size) if size > 0),
        "total_byte_size should be Inexact and greater than 0"
    );

    // Validate column count (sum(L_QUANTITY) and L_PARTKEY)
    assert_eq!(
        stats.column_statistics.len(),
        2,
        "Should have statistics for 2 columns"
    );

    // Validate first column (sum(L_QUANTITY)) - aggregate column should have Absent statistics
    assert!(
        matches!(stats.column_statistics[0].min_value, Precision::Absent),
        "Aggregate column min_value should be Absent"
    );
    assert!(
        matches!(stats.column_statistics[0].max_value, Precision::Absent),
        "Aggregate column max_value should be Absent"
    );
    assert!(
        matches!(stats.column_statistics[0].null_count, Precision::Absent),
        "Aggregate column null_count should be Absent"
    );

    // Validate second column (L_PARTKEY) - should have min/max bounds from Iceberg metadata
    assert_eq!(
        stats.column_statistics[1].min_value,
        Precision::Exact(ScalarValue::Int64(Some(2))),
        "L_PARTKEY min_value should be 2"
    );
    assert_eq!(
        stats.column_statistics[1].max_value,
        Precision::Exact(ScalarValue::Int64(Some(200000))),
        "L_PARTKEY max_value should be 200000"
    );
}
