Skip to content

Commit b582b34

Browse files
authored
Merge pull request JanKaul#208 from JanKaul/extend-catalog-mirror
Extend catalog mirror with schema management
2 parents 44593db + e37b99a commit b582b34

File tree

11 files changed

+237
-12
lines changed

11 files changed

+237
-12
lines changed

catalogs/iceberg-file-catalog/src/lib.rs

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,7 @@ impl Catalog for FileCatalog {
7474
_namespace: &Namespace,
7575
_properties: Option<HashMap<String, String>>,
7676
) -> Result<HashMap<String, String>, IcebergError> {
77-
todo!()
77+
Ok(HashMap::new())
7878
}
7979
/// Drop a namespace in the catalog
8080
async fn drop_namespace(&self, _namespace: &Namespace) -> Result<(), IcebergError> {
@@ -743,6 +743,19 @@ pub mod tests {
743743

744744
ctx.register_catalog("warehouse", catalog);
745745

746+
let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();
747+
748+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
749+
750+
let transformed = plan.transform(iceberg_transform).data().unwrap();
751+
752+
ctx.execute_logical_plan(transformed)
753+
.await
754+
.unwrap()
755+
.collect()
756+
.await
757+
.expect("Failed to execute query plan.");
758+
746759
let sql = "CREATE EXTERNAL TABLE lineitem (
747760
L_ORDERKEY BIGINT NOT NULL,
748761
L_PARTKEY BIGINT NOT NULL,

catalogs/iceberg-sql-catalog/src/lib.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,34 @@ impl Catalog for SqlCatalog {
148148
/// Create a namespace in the catalog
149149
async fn create_namespace(
150150
&self,
151-
_namespace: &Namespace,
152-
_properties: Option<HashMap<String, String>>,
151+
namespace: &Namespace,
152+
properties: Option<HashMap<String, String>>,
153153
) -> Result<HashMap<String, String>, IcebergError> {
154-
todo!()
154+
let catalog_name = self.name.clone();
155+
let namespace_str = namespace.to_string();
156+
let properties = properties.unwrap_or_default();
157+
158+
// Insert namespace properties into the database
159+
for (key, value) in &properties {
160+
sqlx::query(&format!(
161+
"insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', '{key}', '{value}');"
162+
))
163+
.execute(&self.pool)
164+
.await
165+
.map_err(Error::from)?;
166+
}
167+
168+
// If no properties were provided, still create an entry to mark the namespace as existing
169+
if properties.is_empty() {
170+
sqlx::query(&format!(
171+
"insert into iceberg_namespace_properties (catalog_name, namespace, property_key, property_value) values ('{catalog_name}', '{namespace_str}', 'exists', 'true');"
172+
))
173+
.execute(&self.pool)
174+
.await
175+
.map_err(Error::from)?;
176+
}
177+
178+
Ok(properties)
155179
}
156180
/// Drop a namespace in the catalog
157181
async fn drop_namespace(&self, _namespace: &Namespace) -> Result<(), IcebergError> {
@@ -201,8 +225,7 @@ impl Catalog for SqlCatalog {
201225

202226
let rows = {
203227
sqlx::query(&format!(
204-
"select distinct table_namespace from iceberg_tables where catalog_name = '{}';",
205-
&name
228+
"select distinct namespace from iceberg_namespace_properties where catalog_name = '{name}';",
206229
))
207230
.fetch_all(&self.pool)
208231
.await
@@ -840,6 +863,19 @@ pub mod tests {
840863

841864
ctx.register_catalog("warehouse", catalog);
842865

866+
let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();
867+
868+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
869+
870+
let transformed = plan.transform(iceberg_transform).data().unwrap();
871+
872+
ctx.execute_logical_plan(transformed)
873+
.await
874+
.unwrap()
875+
.collect()
876+
.await
877+
.expect("Failed to execute query plan.");
878+
843879
let sql = "CREATE EXTERNAL TABLE lineitem (
844880
L_ORDERKEY BIGINT NOT NULL,
845881
L_PARTKEY BIGINT NOT NULL,

datafusion_iceberg/src/catalog/catalog.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ impl IcebergCatalog {
2929
pub fn catalog(&self) -> Arc<dyn Catalog> {
3030
self.catalog.catalog()
3131
}
32+
33+
pub fn mirror(&self) -> Arc<Mirror> {
34+
self.catalog.clone()
35+
}
3236
}
3337

3438
impl CatalogProvider for IcebergCatalog {
@@ -43,6 +47,9 @@ impl CatalogProvider for IcebergCatalog {
4347
}
4448
}
4549
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
50+
if !self.catalog.schema_exists(name) {
51+
return None;
52+
}
4653
Some(Arc::new(IcebergSchema::new(
4754
Namespace::try_new(
4855
&name
@@ -57,9 +64,26 @@ impl CatalogProvider for IcebergCatalog {
5764

5865
fn register_schema(
5966
&self,
60-
_name: &str,
67+
name: &str,
6168
_schema: Arc<dyn SchemaProvider>,
6269
) -> Result<Option<Arc<dyn SchemaProvider>>> {
63-
unimplemented!()
70+
let old_namespace = self.catalog.register_schema(name)?;
71+
if old_namespace.is_some() {
72+
Ok(self.schema(name))
73+
} else {
74+
Ok(None)
75+
}
76+
}
77+
78+
fn deregister_schema(
79+
&self,
80+
name: &str,
81+
_cascade: bool,
82+
) -> Result<Option<Arc<dyn SchemaProvider>>> {
83+
if self.catalog.deregister_schema(name)?.is_some() {
84+
Ok(self.schema(name))
85+
} else {
86+
Ok(None)
87+
}
6488
}
6589
}

datafusion_iceberg/src/catalog/catalog_list.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ impl CatalogProviderList for IcebergCatalogList {
5050
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
5151
self.catalogs.get(name).as_deref().cloned().or_else(|| {
5252
self.catalog_list.catalog(name).map(|catalog| {
53-
Arc::new(IcebergCatalog::new_sync(catalog, None)) as Arc<dyn CatalogProvider>
53+
let iceberg_catalog = Arc::new(IcebergCatalog::new_sync(catalog, None));
54+
self.catalogs
55+
.insert(name.to_owned(), iceberg_catalog.clone());
56+
iceberg_catalog as Arc<dyn CatalogProvider>
5457
})
5558
})
5659
}

datafusion_iceberg/src/catalog/mirror.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{error::Error, DataFusionTable};
1212

1313
type NamespaceNode = HashSet<String>;
1414

15-
#[derive(Debug)]
15+
#[derive(Debug, Clone)]
1616
enum Node {
1717
Namespace(NamespaceNode),
1818
Relation(Identifier),
@@ -180,10 +180,48 @@ impl Mirror {
180180
.map_err(DataFusionError::from)
181181
}
182182
pub fn table_exists(&self, identifier: Identifier) -> bool {
183-
self.storage.contains_key(&identifier.to_string())
183+
self.storage
184+
.get(&identifier.to_string())
185+
.is_some_and(|node| matches!(node.value(), Node::Relation(_)))
186+
}
187+
188+
pub fn schema_exists(&self, name: &str) -> bool {
189+
self.storage
190+
.get(name)
191+
.is_some_and(|node| matches!(node.value(), Node::Namespace(_)))
184192
}
185193

186194
pub fn catalog(&self) -> Arc<dyn Catalog> {
187195
self.catalog.clone()
188196
}
197+
198+
pub fn register_schema(&self, name: &str) -> Result<Option<NamespaceNode>, DataFusionError> {
199+
let namespace = Namespace::try_new(
200+
&name
201+
.split('.')
202+
.map(|z| z.to_owned())
203+
.collect::<Vec<String>>(),
204+
)
205+
.map_err(|err| DataFusionError::External(Box::new(err)))?;
206+
207+
let old_value = self
208+
.storage
209+
.insert(namespace.to_string(), Node::Namespace(HashSet::new()))
210+
.and_then(|entry| match entry {
211+
Node::Namespace(namespace_node) => Some(namespace_node.clone()),
212+
_ => None,
213+
});
214+
215+
Ok(old_value)
216+
}
217+
218+
pub fn deregister_schema(&self, name: &str) -> Result<Option<NamespaceNode>, DataFusionError> {
219+
match self.storage.remove(name) {
220+
Some((_, Node::Namespace(namespace))) => Ok(Some(namespace)),
221+
None => Ok(None),
222+
_ => Err(DataFusionError::from(Error::from(
223+
IcebergError::InvalidFormat("Schema to drop".to_owned()),
224+
))),
225+
}
226+
}
189227
}

datafusion_iceberg/src/materialized_view/delta_queries/mod.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,19 @@ mod tests {
114114

115115
let transformed = plan.transform(iceberg_transform).data().unwrap();
116116

117+
ctx.execute_logical_plan(transformed)
118+
.await
119+
.unwrap()
120+
.collect()
121+
.await
122+
.expect("Failed to execute query plan.");
123+
124+
let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();
125+
126+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
127+
128+
let transformed = plan.transform(iceberg_transform).data().unwrap();
129+
117130
ctx.execute_logical_plan(transformed)
118131
.await
119132
.unwrap()
@@ -552,6 +565,19 @@ WHERE L_SHIPDATE >= '1996-01-01';
552565

553566
let transformed = plan.transform(iceberg_transform).data().unwrap();
554567

568+
ctx.execute_logical_plan(transformed)
569+
.await
570+
.unwrap()
571+
.collect()
572+
.await
573+
.expect("Failed to execute query plan.");
574+
575+
let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();
576+
577+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
578+
579+
let transformed = plan.transform(iceberg_transform).data().unwrap();
580+
555581
ctx.execute_logical_plan(transformed)
556582
.await
557583
.unwrap()
@@ -975,6 +1001,19 @@ WHERE L_SHIPDATE >= '1996-01-01';
9751001

9761002
let transformed = plan.transform(iceberg_transform).data().unwrap();
9771003

1004+
ctx.execute_logical_plan(transformed)
1005+
.await
1006+
.unwrap()
1007+
.collect()
1008+
.await
1009+
.expect("Failed to execute query plan.");
1010+
1011+
let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();
1012+
1013+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
1014+
1015+
let transformed = plan.transform(iceberg_transform).data().unwrap();
1016+
9781017
ctx.execute_logical_plan(transformed)
9791018
.await
9801019
.unwrap()
@@ -1281,6 +1320,19 @@ WHERE L_SHIPDATE >= '1996-01-01';
12811320

12821321
let transformed = plan.transform(iceberg_transform).data().unwrap();
12831322

1323+
ctx.execute_logical_plan(transformed)
1324+
.await
1325+
.unwrap()
1326+
.collect()
1327+
.await
1328+
.expect("Failed to execute query plan.");
1329+
1330+
let sql = &"CREATE SCHEMA warehouse.tpch;".to_string();
1331+
1332+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
1333+
1334+
let transformed = plan.transform(iceberg_transform).data().unwrap();
1335+
12841336
ctx.execute_logical_plan(transformed)
12851337
.await
12861338
.unwrap()

datafusion_iceberg/src/materialized_view/mod.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,19 @@ mod tests {
470470
iceberg_catalog_list,
471471
)));
472472

473+
let sql = &"CREATE SCHEMA warehouse.public;".to_string();
474+
475+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
476+
477+
let transformed = plan.transform(iceberg_transform).data().unwrap();
478+
479+
ctx.execute_logical_plan(transformed)
480+
.await
481+
.unwrap()
482+
.collect()
483+
.await
484+
.expect("Failed to execute query plan.");
485+
473486
let sql = "CREATE EXTERNAL TABLE warehouse.public.orders (
474487
id BIGINT NOT NULL,
475488
order_date DATE NOT NULL,

datafusion_iceberg/src/planner.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ use itertools::Itertools;
99
use regex::Regex;
1010

1111
use crate::{
12-
catalog::catalog::IcebergCatalog,
12+
catalog::{catalog::IcebergCatalog, schema::IcebergSchema},
1313
materialized_view::{delta_queries::fork_node::ForkNodePlanner, refresh_materialized_view},
1414
};
1515
use datafusion::{
1616
arrow::datatypes::{DataType, Schema as ArrowSchema},
17+
catalog::CatalogProvider,
1718
common::{tree_node::Transformed, SchemaReference},
1819
error::DataFusionError,
1920
execution::context::{QueryPlanner, SessionState},
@@ -335,6 +336,11 @@ async fn plan_create_namespace(
335336
.await
336337
.map_err(|err| DataFusionError::External(Box::new(err)))?;
337338

339+
iceberg_catalog.register_schema(
340+
namespace_name,
341+
Arc::new(IcebergSchema::new(namespace, iceberg_catalog.mirror())),
342+
)?;
343+
338344
Ok(Some(Arc::new(EmptyExec::new(Arc::new(
339345
ArrowSchema::empty(),
340346
)))))
@@ -409,6 +415,8 @@ async fn plan_drop_namespace(
409415
.await
410416
.map_err(|err| DataFusionError::External(Box::new(err)))?;
411417

418+
iceberg_catalog.deregister_schema(&namespace_name, false)?;
419+
412420
Ok(Some(Arc::new(EmptyExec::new(Arc::new(
413421
ArrowSchema::empty(),
414422
)))))
@@ -939,6 +947,19 @@ mod tests {
939947
iceberg_catalog_list,
940948
)));
941949

950+
let sql = &"CREATE SCHEMA iceberg.public;".to_string();
951+
952+
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
953+
954+
let transformed = plan.transform(iceberg_transform).data().unwrap();
955+
956+
ctx.execute_logical_plan(transformed)
957+
.await
958+
.unwrap()
959+
.collect()
960+
.await
961+
.expect("Failed to execute query plan.");
962+
942963
let sql = "CREATE EXTERNAL TABLE iceberg.public.orders (
943964
id BIGINT NOT NULL,
944965
order_date DATE NOT NULL,

datafusion_iceberg/src/table.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,6 +1046,7 @@ mod tests {
10461046
catalog::tabular::Tabular,
10471047
object_store::ObjectStoreBuilder,
10481048
spec::{
1049+
namespace::Namespace,
10491050
partition::{PartitionField, Transform},
10501051
schema::Schema,
10511052
types::{PrimitiveType, StructField, Type},
@@ -1488,6 +1489,11 @@ mod tests {
14881489
.unwrap(),
14891490
);
14901491

1492+
catalog
1493+
.create_namespace(&Namespace::try_new(&["test".to_owned()]).unwrap(), None)
1494+
.await
1495+
.unwrap();
1496+
14911497
let schema = Schema::builder()
14921498
.with_struct_field(StructField {
14931499
id: 1,

0 commit comments

Comments
 (0)