Skip to content

Commit f614586

Browse files
author
Jan Kaul
committed
fix missing schemas for tests
1 parent cec8f67 commit f614586

File tree

10 files changed

+155
-16
lines changed

10 files changed

+155
-16
lines changed

catalogs/iceberg-sql-catalog/src/lib.rs

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,34 @@ impl Catalog for SqlCatalog {
148148
/// Create a namespace in the catalog
149149
async fn create_namespace(
150150
&self,
151-
_namespace: &Namespace,
152-
_properties: Option<HashMap<String, String>>,
151+
namespace: &Namespace,
152+
properties: Option<HashMap<String, String>>,
153153
) -> Result<HashMap<String, String>, IcebergError> {
154-
todo!()
154+
let catalog_name = self.name.clone();
155+
let namespace_str = namespace.to_string();
156+
let properties = properties.unwrap_or_default();
157+
158+
// Insert namespace properties into the database
159+
for (key, value) in &properties {
160+
sqlx::query(&format!(
161+
"insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', '{key}', '{value}');"
162+
))
163+
.execute(&self.pool)
164+
.await
165+
.map_err(Error::from)?;
166+
}
167+
168+
// If no properties were provided, still create an entry to mark the namespace as existing
169+
if properties.is_empty() {
170+
sqlx::query(&format!(
171+
"insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', null, null);"
172+
))
173+
.execute(&self.pool)
174+
.await
175+
.map_err(Error::from)?;
176+
}
177+
178+
Ok(properties)
155179
}
156180
/// Drop a namespace in the catalog
157181
async fn drop_namespace(&self, _namespace: &Namespace) -> Result<(), IcebergError> {
@@ -201,8 +225,7 @@ impl Catalog for SqlCatalog {
201225

202226
let rows = {
203227
sqlx::query(&format!(
204-
"select distinct table_namespace from iceberg_tables where catalog_name = '{}';",
205-
&name
228+
"select distinct namespace from iceberg_namespace_properties where catalog_name = '{name}';",
206229
))
207230
.fetch_all(&self.pool)
208231
.await

datafusion_iceberg/src/catalog/catalog.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ impl IcebergCatalog {
2929
pub fn catalog(&self) -> Arc<dyn Catalog> {
3030
self.catalog.catalog()
3131
}
32+
33+
pub fn mirror(&self) -> Arc<Mirror> {
34+
self.catalog.clone()
35+
}
3236
}
3337

3438
impl CatalogProvider for IcebergCatalog {

datafusion_iceberg/src/catalog/catalog_list.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ impl CatalogProviderList for IcebergCatalogList {
5050
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
5151
self.catalogs.get(name).as_deref().cloned().or_else(|| {
5252
self.catalog_list.catalog(name).map(|catalog| {
53-
Arc::new(IcebergCatalog::new_sync(catalog, None)) as Arc<dyn CatalogProvider>
53+
let iceberg_catalog = Arc::new(IcebergCatalog::new_sync(catalog, None));
54+
self.catalogs
55+
.insert(name.to_owned(), iceberg_catalog.clone());
56+
iceberg_catalog as Arc<dyn CatalogProvider>
5457
})
5558
})
5659
}

datafusion_iceberg/src/catalog/mirror.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -204,16 +204,14 @@ impl Mirror {
204204
)
205205
.map_err(|err| DataFusionError::External(Box::new(err)))?;
206206

207-
let old_value =
208-
self.storage
209-
.get(&namespace.to_string())
210-
.and_then(|entry| match entry.value() {
211-
Node::Namespace(namespace_node) => Some(namespace_node.clone()),
212-
_ => None,
213-
});
207+
let old_value = self
208+
.storage
209+
.insert(namespace.to_string(), Node::Namespace(HashSet::new()))
210+
.and_then(|entry| match entry {
211+
Node::Namespace(namespace_node) => Some(namespace_node.clone()),
212+
_ => None,
213+
});
214214

215-
self.storage
216-
.insert(namespace.to_string(), Node::Namespace(HashSet::new()));
217215
Ok(old_value)
218216
}
219217

datafusion_iceberg/src/materialized_view/delta_queries/mod.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,19 @@ mod tests {
114114

115115
let transformed = plan.transform(iceberg_transform).data().unwrap();
116116

117+
ctx.execute_logical_plan(transformed)
118+
.await
119+
.unwrap()
120+
.collect()
121+
.await
122+
.expect("Failed to execute query plan.");
123+
124+
let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();
125+
126+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
127+
128+
let transformed = plan.transform(iceberg_transform).data().unwrap();
129+
117130
ctx.execute_logical_plan(transformed)
118131
.await
119132
.unwrap()
@@ -552,6 +565,19 @@ WHERE L_SHIPDATE >= '1996-01-01';
552565

553566
let transformed = plan.transform(iceberg_transform).data().unwrap();
554567

568+
ctx.execute_logical_plan(transformed)
569+
.await
570+
.unwrap()
571+
.collect()
572+
.await
573+
.expect("Failed to execute query plan.");
574+
575+
let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();
576+
577+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
578+
579+
let transformed = plan.transform(iceberg_transform).data().unwrap();
580+
555581
ctx.execute_logical_plan(transformed)
556582
.await
557583
.unwrap()
@@ -975,6 +1001,19 @@ WHERE L_SHIPDATE >= '1996-01-01';
9751001

9761002
let transformed = plan.transform(iceberg_transform).data().unwrap();
9771003

1004+
ctx.execute_logical_plan(transformed)
1005+
.await
1006+
.unwrap()
1007+
.collect()
1008+
.await
1009+
.expect("Failed to execute query plan.");
1010+
1011+
let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();
1012+
1013+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
1014+
1015+
let transformed = plan.transform(iceberg_transform).data().unwrap();
1016+
9781017
ctx.execute_logical_plan(transformed)
9791018
.await
9801019
.unwrap()
@@ -1281,6 +1320,19 @@ WHERE L_SHIPDATE >= '1996-01-01';
12811320

12821321
let transformed = plan.transform(iceberg_transform).data().unwrap();
12831322

1323+
ctx.execute_logical_plan(transformed)
1324+
.await
1325+
.unwrap()
1326+
.collect()
1327+
.await
1328+
.expect("Failed to execute query plan.");
1329+
1330+
let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();
1331+
1332+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
1333+
1334+
let transformed = plan.transform(iceberg_transform).data().unwrap();
1335+
12841336
ctx.execute_logical_plan(transformed)
12851337
.await
12861338
.unwrap()

datafusion_iceberg/src/materialized_view/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,19 @@ mod tests {
470470
iceberg_catalog_list,
471471
)));
472472

473+
let sql = &"CREATE SCHEMA warehouse.public;".to_string();
474+
475+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
476+
477+
let transformed = plan.transform(iceberg_transform).data().unwrap();
478+
479+
ctx.execute_logical_plan(transformed)
480+
.await
481+
.unwrap()
482+
.collect()
483+
.await
484+
.expect("Failed to execute query plan.");
485+
473486
let sql = "CREATE EXTERNAL TABLE warehouse.public.orders (
474487
id BIGINT NOT NULL,
475488
order_date DATE NOT NULL,

datafusion_iceberg/src/planner.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ use itertools::Itertools;
99
use regex::Regex;
1010

1111
use crate::{
12-
catalog::catalog::IcebergCatalog,
12+
catalog::{catalog::IcebergCatalog, schema::IcebergSchema},
1313
materialized_view::{delta_queries::fork_node::ForkNodePlanner, refresh_materialized_view},
1414
};
1515
use datafusion::{
1616
arrow::datatypes::{DataType, Schema as ArrowSchema},
17+
catalog::CatalogProvider,
1718
common::{tree_node::Transformed, SchemaReference},
1819
error::DataFusionError,
1920
execution::context::{QueryPlanner, SessionState},
@@ -335,6 +336,11 @@ async fn plan_create_namespace(
335336
.await
336337
.map_err(|err| DataFusionError::External(Box::new(err)))?;
337338

339+
iceberg_catalog.register_schema(
340+
namespace_name,
341+
Arc::new(IcebergSchema::new(namespace, iceberg_catalog.mirror())),
342+
)?;
343+
338344
Ok(Some(Arc::new(EmptyExec::new(Arc::new(
339345
ArrowSchema::empty(),
340346
)))))
@@ -409,6 +415,8 @@ async fn plan_drop_namespace(
409415
.await
410416
.map_err(|err| DataFusionError::External(Box::new(err)))?;
411417

418+
iceberg_catalog.deregister_schema(&namespace_name, false)?;
419+
412420
Ok(Some(Arc::new(EmptyExec::new(Arc::new(
413421
ArrowSchema::empty(),
414422
)))))
@@ -939,6 +947,19 @@ mod tests {
939947
iceberg_catalog_list,
940948
)));
941949

950+
let sql = &"CREATE SCHEMA iceberg.public;".to_string();
951+
952+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
953+
954+
let transformed = plan.transform(iceberg_transform).data().unwrap();
955+
956+
ctx.execute_logical_plan(transformed)
957+
.await
958+
.unwrap()
959+
.collect()
960+
.await
961+
.expect("Failed to execute query plan.");
962+
942963
let sql = "CREATE EXTERNAL TABLE iceberg.public.orders (
943964
id BIGINT NOT NULL,
944965
order_date DATE NOT NULL,

datafusion_iceberg/src/table.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,6 +1046,7 @@ mod tests {
10461046
catalog::tabular::Tabular,
10471047
object_store::ObjectStoreBuilder,
10481048
spec::{
1049+
namespace::Namespace,
10491050
partition::{PartitionField, Transform},
10501051
schema::Schema,
10511052
types::{PrimitiveType, StructField, Type},
@@ -1488,6 +1489,11 @@ mod tests {
14881489
.unwrap(),
14891490
);
14901491

1492+
catalog
1493+
.create_namespace(&Namespace::try_new(&["test".to_owned()]).unwrap(), None)
1494+
.await
1495+
.unwrap();
1496+
14911497
let schema = Schema::builder()
14921498
.with_struct_field(StructField {
14931499
id: 1,

datafusion_iceberg/tests/empty_insert.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,19 @@ pub async fn test_empty_insert() {
101101

102102
let transformed = plan.transform(iceberg_transform).data().unwrap();
103103

104+
ctx.execute_logical_plan(transformed)
105+
.await
106+
.unwrap()
107+
.collect()
108+
.await
109+
.expect("Failed to execute query plan.");
110+
111+
let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();
112+
113+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
114+
115+
let transformed = plan.transform(iceberg_transform).data().unwrap();
116+
104117
ctx.execute_logical_plan(transformed)
105118
.await
106119
.unwrap()

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use datafusion_iceberg::catalog::catalog::IcebergCatalog;
33
use futures::stream;
44
use iceberg_rust::catalog::identifier::Identifier;
55
use iceberg_rust::catalog::tabular::Tabular;
6+
use iceberg_rust::spec::namespace::Namespace;
67
use iceberg_rust::{
78
arrow::write::write_equality_deletes_parquet_partitioned,
89
catalog::Catalog,
@@ -27,6 +28,11 @@ pub async fn test_equality_delete() {
2728
.unwrap(),
2829
);
2930

31+
catalog
32+
.create_namespace(&Namespace::try_new(&["test".to_string()]).unwrap(), None)
33+
.await
34+
.unwrap();
35+
3036
let schema = Schema::builder()
3137
.with_struct_field(StructField {
3238
id: 1,

0 commit comments

Comments
 (0)