From 757a741d2839e236cc7c937dab3f998811938710 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Tue, 10 Feb 2026 14:04:51 +0100 Subject: [PATCH 01/11] databricks uc skeleton --- pom.xml | 1 + .../xtable/model/storage/CatalogType.java | 1 + xtable-databricks/pom.xml | 44 +++++++ .../DatabricksUnityCatalogConfig.java | 53 ++++++++ .../DatabricksUnityCatalogSyncClient.java | 123 ++++++++++++++++++ ...g.apache.xtable.spi.sync.CatalogSyncClient | 1 + 6 files changed, 223 insertions(+) create mode 100644 xtable-databricks/pom.xml create mode 100644 xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogConfig.java create mode 100644 xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java create mode 100644 xtable-databricks/src/main/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient diff --git a/pom.xml b/pom.xml index c6c4a833a..e7cb679f0 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ xtable-core xtable-utilities xtable-aws + xtable-databricks xtable-hive-metastore xtable-service diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java index e4b778d6b..ed41a4fbf 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java @@ -27,4 +27,5 @@ public class CatalogType { public static final String STORAGE = "STORAGE"; public static final String GLUE = "GLUE"; public static final String HMS = "HMS"; + public static final String DATABRICKS_UC = "DATABRICKS_UC"; } diff --git a/xtable-databricks/pom.xml b/xtable-databricks/pom.xml new file mode 100644 index 000000000..54b7b5c81 --- /dev/null +++ b/xtable-databricks/pom.xml @@ -0,0 +1,44 @@ + + + + 4.0.0 + + org.apache.xtable + xtable + 0.3.0-incubating + + + xtable-databricks + XTable Databricks + + + + org.apache.xtable + 
xtable-core_${scala.binary.version} + ${project.version} + + + + com.databricks + databricks-sdk-java + 0.85.0 + + + diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogConfig.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogConfig.java new file mode 100644 index 000000000..c3c9355a2 --- /dev/null +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogConfig.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.databricks; + +import java.util.Map; + +import lombok.Value; + +import org.apache.xtable.conversion.ExternalCatalogConfig; + +@Value +public class DatabricksUnityCatalogConfig { + public static final String HOST = "externalCatalog.uc.host"; + public static final String WAREHOUSE_ID = "externalCatalog.uc.warehouseId"; + public static final String AUTH_TYPE = "externalCatalog.uc.authType"; + public static final String CLIENT_ID = "externalCatalog.uc.clientId"; + public static final String CLIENT_SECRET = "externalCatalog.uc.clientSecret"; + public static final String TOKEN = "externalCatalog.uc.token"; + + String host; + String warehouseId; + String authType; + String clientId; + String clientSecret; + String token; + + public static DatabricksUnityCatalogConfig from(ExternalCatalogConfig catalogConfig) { + Map props = catalogConfig.getCatalogProperties(); + return new DatabricksUnityCatalogConfig( + props.get(HOST), + props.get(WAREHOUSE_ID), + props.get(AUTH_TYPE), + props.get(CLIENT_ID), + props.get(CLIENT_SECRET), + props.get(TOKEN)); + } +} diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java new file mode 100644 index 000000000..62f08e443 --- /dev/null +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.databricks; + +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.storage.CatalogType; +import org.apache.xtable.spi.sync.CatalogSyncClient; + +/** + * Databricks Unity Catalog implementation skeleton for CatalogSyncClient. + * + *

This is a placeholder to wire Databricks UC as a catalog target via the SQL Statement + * Execution API. Actual DDL execution and schema diffing should be implemented in a follow-up + * change. + */ +@Log4j2 +public class DatabricksUnityCatalogSyncClient implements CatalogSyncClient { + + private ExternalCatalogConfig catalogConfig; + private DatabricksUnityCatalogConfig databricksConfig; + private Configuration hadoopConf; + private String tableFormat; + + // For loading the instance using ServiceLoader + public DatabricksUnityCatalogSyncClient() {} + + public DatabricksUnityCatalogSyncClient( + ExternalCatalogConfig catalogConfig, String tableFormat, Configuration configuration) { + init(catalogConfig, tableFormat, configuration); + } + + @Override + public String getCatalogId() { + return catalogConfig.getCatalogId(); + } + + @Override + public String getCatalogType() { + return CatalogType.DATABRICKS_UC; + } + + @Override + public String getStorageLocation(Object table) { + throw new UnsupportedOperationException("Databricks UC sync not implemented"); + } + + @Override + public boolean hasDatabase(CatalogTableIdentifier tableIdentifier) { + throw new UnsupportedOperationException("Databricks UC sync not implemented"); + } + + @Override + public void createDatabase(CatalogTableIdentifier tableIdentifier) { + throw new UnsupportedOperationException("Databricks UC sync not implemented"); + } + + @Override + public Object getTable(CatalogTableIdentifier tableIdentifier) { + throw new UnsupportedOperationException("Databricks UC sync not implemented"); + } + + @Override + public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + throw new UnsupportedOperationException("Databricks UC sync not implemented"); + } + + @Override + public void refreshTable( + InternalTable table, Object catalogTable, CatalogTableIdentifier tableIdentifier) { + throw new UnsupportedOperationException("Databricks UC sync not implemented"); + } + + @Override + 
public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + throw new UnsupportedOperationException("Databricks UC sync not implemented"); + } + + @Override + public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + throw new UnsupportedOperationException("Databricks UC sync not implemented"); + } + + @Override + public void init( + ExternalCatalogConfig catalogConfig, String tableFormat, Configuration configuration) { + this.catalogConfig = catalogConfig; + this.tableFormat = tableFormat; + this.hadoopConf = configuration; + this.databricksConfig = DatabricksUnityCatalogConfig.from(catalogConfig); + + if (databricksConfig.getHost() == null || databricksConfig.getWarehouseId() == null) { + throw new CatalogSyncException( + "Databricks UC catalog requires host and warehouseId in catalogProperties"); + } + log.info( + "Initialized Databricks UC sync client for catalogId={} tableFormat={}", + catalogConfig.getCatalogId(), + tableFormat); + } +} diff --git a/xtable-databricks/src/main/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient b/xtable-databricks/src/main/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient new file mode 100644 index 000000000..29db29258 --- /dev/null +++ b/xtable-databricks/src/main/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient @@ -0,0 +1 @@ +org.apache.xtable.databricks.DatabricksUnityCatalogSyncClient From 2c323cbeb274270c3827fbc1de6b1a0032e81c1a Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Tue, 10 Feb 2026 14:31:50 +0100 Subject: [PATCH 02/11] implem/test create table --- xtable-databricks/pom.xml | 36 ++++++ .../DatabricksUnityCatalogSyncClient.java | 106 +++++++++++++++- ...g.apache.xtable.spi.sync.CatalogSyncClient | 15 +++ .../TestDatabricksUnityCatalogSyncClient.java | 116 ++++++++++++++++++ 4 files changed, 272 insertions(+), 1 deletion(-) create mode 100644 
xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java diff --git a/xtable-databricks/pom.xml b/xtable-databricks/pom.xml index 54b7b5c81..5943b9440 100644 --- a/xtable-databricks/pom.xml +++ b/xtable-databricks/pom.xml @@ -40,5 +40,41 @@ databricks-sdk-java 0.85.0 + + + + org.apache.hadoop + hadoop-common + provided + + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-params + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + + + org.mockito + mockito-core + test + + + org.mockito + mockito-junit-jupiter + test + diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java index 62f08e443..64418f12a 100644 --- a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java @@ -18,15 +18,31 @@ package org.apache.xtable.databricks; +import java.util.Objects; + import lombok.extern.log4j.Log4j2; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import com.databricks.sdk.WorkspaceClient; +import com.databricks.sdk.core.DatabricksConfig; +import com.databricks.sdk.service.sql.Disposition; +import com.databricks.sdk.service.sql.ExecuteStatementRequest; +import com.databricks.sdk.service.sql.ExecuteStatementRequestOnWaitTimeout; +import com.databricks.sdk.service.sql.Format; +import com.databricks.sdk.service.sql.StatementExecutionAPI; +import com.databricks.sdk.service.sql.StatementResponse; +import com.databricks.sdk.service.sql.StatementState; + +import org.apache.xtable.catalog.CatalogUtils; import org.apache.xtable.conversion.ExternalCatalogConfig; import org.apache.xtable.exception.CatalogSyncException; import 
org.apache.xtable.model.InternalTable; import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; import org.apache.xtable.model.storage.CatalogType; +import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.sync.CatalogSyncClient; /** @@ -43,6 +59,8 @@ public class DatabricksUnityCatalogSyncClient implements CatalogSyncClient props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + props.put(DatabricksUnityCatalogConfig.AUTH_TYPE, "oauth-m2m"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, TableFormat.DELTA, new Configuration(), mockStatementExecution); + + when(mockStatementExecution.executeStatement(any(ExecuteStatementRequest.class))) + .thenReturn( + new StatementResponse() + .setStatus(new StatementStatus().setState(StatementState.SUCCEEDED))); + + InternalTable table = InternalTable.builder().basePath("s3://bucket/path").build(); + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + client.createTable(table, tableIdentifier); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(ExecuteStatementRequest.class); + verify(mockStatementExecution).executeStatement(requestCaptor.capture()); + ExecuteStatementRequest request = requestCaptor.getValue(); + assertEquals("wh-1", request.getWarehouseId()); + assertEquals( + "CREATE TABLE IF NOT EXISTS main.default.people USING DELTA LOCATION 's3://bucket/path'", + request.getStatement()); + } + + @Test + void testCreateTableRejectsNonDelta() { + Map props = new HashMap<>(); + 
props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, TableFormat.ICEBERG, new Configuration(), mockStatementExecution); + + InternalTable table = InternalTable.builder().basePath("s3://bucket/path").build(); + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + assertThrows(CatalogSyncException.class, () -> client.createTable(table, tableIdentifier)); + } +} From e590d89007894da3f8e4e5199d024720f8740e2e Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Tue, 10 Feb 2026 14:35:19 +0100 Subject: [PATCH 03/11] implem/test hasDatabase/getTable --- .../DatabricksUnityCatalogSyncClient.java | 75 ++++++- .../TestDatabricksUnityCatalogSyncClient.java | 203 +++++++++++++++++- 2 files changed, 263 insertions(+), 15 deletions(-) diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java index 64418f12a..98b8b643a 100644 --- a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java @@ -27,6 +27,11 @@ import com.databricks.sdk.WorkspaceClient; import com.databricks.sdk.core.DatabricksConfig; +import com.databricks.sdk.core.error.platform.NotFound; +import com.databricks.sdk.service.catalog.SchemaInfo; +import com.databricks.sdk.service.catalog.SchemasAPI; +import com.databricks.sdk.service.catalog.TableInfo; +import 
com.databricks.sdk.service.catalog.TablesAPI; import com.databricks.sdk.service.sql.Disposition; import com.databricks.sdk.service.sql.ExecuteStatementRequest; import com.databricks.sdk.service.sql.ExecuteStatementRequestOnWaitTimeout; @@ -53,7 +58,7 @@ * change. */ @Log4j2 -public class DatabricksUnityCatalogSyncClient implements CatalogSyncClient { +public class DatabricksUnityCatalogSyncClient implements CatalogSyncClient { private ExternalCatalogConfig catalogConfig; private DatabricksUnityCatalogConfig databricksConfig; @@ -61,6 +66,8 @@ public class DatabricksUnityCatalogSyncClient implements CatalogSyncClient client.createTable(table, tableIdentifier)); } + + @Test + void testHasDatabaseTrue() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockSchemasApi.get("main.default")).thenReturn(new SchemaInfo()); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + boolean exists = client.hasDatabase(tableIdentifier); + assertEquals(true, exists); + + verify(mockSchemasApi).get("main.default"); + } + + @Test + void testHasDatabaseFalse() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + 
.build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockSchemasApi.get("main.default")).thenThrow(new NotFound("not found", null)); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + boolean exists = client.hasDatabase(tableIdentifier); + assertEquals(false, exists); + } + + @Test + void testHasDatabaseFailedStatement() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockSchemasApi.get("main.default")).thenThrow(new RuntimeException("boom")); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + assertThrows(CatalogSyncException.class, () -> client.hasDatabase(tableIdentifier)); + } + + @Test + void testGetTableFound() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + 
mockTablesApi, + mockSchemasApi); + + TableInfo tableInfo = new TableInfo().setStorageLocation("s3://bucket/path"); + when(mockTablesApi.get("main.default.people")).thenReturn(tableInfo); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + Object table = client.getTable(tableIdentifier); + assertNotNull(table); + + verify(mockTablesApi).get("main.default.people"); + } + + @Test + void testGetTableNotFound() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockTablesApi.get("main.default.people")).thenThrow(new NotFound("not found", null)); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + Object table = client.getTable(tableIdentifier); + assertNull(table); + } + + @Test + void testGetTableFailedStatement() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + 
when(mockTablesApi.get("main.default.people")).thenThrow(new RuntimeException("boom")); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + assertThrows(CatalogSyncException.class, () -> client.getTable(tableIdentifier)); + } } From df63007a9d02b7449a2ac8940fae5c340976a721 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Tue, 10 Feb 2026 14:54:02 +0100 Subject: [PATCH 04/11] implem/test createDatabase --- .../DatabricksUnityCatalogSyncClient.java | 19 +++++- .../TestDatabricksUnityCatalogSyncClient.java | 64 +++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java index 98b8b643a..2c52721ac 100644 --- a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java @@ -28,6 +28,7 @@ import com.databricks.sdk.WorkspaceClient; import com.databricks.sdk.core.DatabricksConfig; import com.databricks.sdk.core.error.platform.NotFound; +import com.databricks.sdk.service.catalog.CreateSchema; import com.databricks.sdk.service.catalog.SchemaInfo; import com.databricks.sdk.service.catalog.SchemasAPI; import com.databricks.sdk.service.catalog.TableInfo; @@ -135,7 +136,23 @@ public boolean hasDatabase(CatalogTableIdentifier tableIdentifier) { @Override public void createDatabase(CatalogTableIdentifier tableIdentifier) { - throw new UnsupportedOperationException("Databricks UC sync not implemented"); + HierarchicalTableIdentifier hierarchical = + CatalogUtils.toHierarchicalTableIdentifier(tableIdentifier); + String catalog = hierarchical.getCatalogName(); + if (StringUtils.isBlank(catalog)) { + throw new CatalogSyncException( + 
"Databricks UC requires a catalog name (expected catalog.schema.table)"); + } + String schema = hierarchical.getDatabaseName(); + if (StringUtils.isBlank(schema)) { + throw new CatalogSyncException("Databricks UC requires a schema name"); + } + + try { + schemasApi.create(new CreateSchema().setCatalogName(catalog).setName(schema)); + } catch (Exception e) { + throw new CatalogSyncException("Failed to create database: " + schema, e); + } } @Override diff --git a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java index 5dc88d825..1f77d6fee 100644 --- a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java @@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -37,6 +38,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import com.databricks.sdk.core.error.platform.NotFound; +import com.databricks.sdk.service.catalog.CreateSchema; import com.databricks.sdk.service.catalog.SchemaInfo; import com.databricks.sdk.service.catalog.SchemasAPI; import com.databricks.sdk.service.catalog.TableInfo; @@ -220,6 +222,68 @@ void testHasDatabaseFailedStatement() { assertThrows(CatalogSyncException.class, () -> client.hasDatabase(tableIdentifier)); } + @Test + void testCreateDatabase() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + 
ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + client.createDatabase(tableIdentifier); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(CreateSchema.class); + verify(mockSchemasApi).create(requestCaptor.capture()); + CreateSchema request = requestCaptor.getValue(); + assertEquals("main", request.getCatalogName()); + assertEquals("default", request.getName()); + } + + @Test + void testCreateDatabaseFailure() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + doThrow(new RuntimeException("boom")).when(mockSchemasApi).create(any(CreateSchema.class)); + + assertThrows(CatalogSyncException.class, () -> client.createDatabase(tableIdentifier)); + } + @Test void testGetTableFound() { Map props = new HashMap<>(); From 9767c5c0146e5c03ad38b08ae24160bf9eb0ce78 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Tue, 10 Feb 2026 14:58:06 +0100 Subject: [PATCH 05/11] implem/test create or replace table --- 
.../DatabricksUnityCatalogSyncClient.java | 13 +++- .../TestDatabricksUnityCatalogSyncClient.java | 70 +++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java index 2c52721ac..1a0711207 100644 --- a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java @@ -191,7 +191,18 @@ public void refreshTable( @Override public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { - throw new UnsupportedOperationException("Databricks UC sync not implemented"); + ensureDeltaOnly(); + String fullName = getFullName(tableIdentifier); + String location = table.getBasePath(); + if (StringUtils.isBlank(location)) { + throw new CatalogSyncException("Storage location is required for external Delta tables"); + } + + String statement = + String.format( + "CREATE OR REPLACE TABLE %s USING DELTA LOCATION '%s'", + fullName, escapeSqlString(location)); + executeStatement(statement); } @Override diff --git a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java index 1f77d6fee..57cae184e 100644 --- a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java @@ -134,6 +134,76 @@ void testCreateTableRejectsNonDelta() { assertThrows(CatalogSyncException.class, () -> client.createTable(table, tableIdentifier)); } + @Test + void testCreateOrReplaceTableDelta() { + Map props = new HashMap<>(); 
+ props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockStatementExecution.executeStatement(any(ExecuteStatementRequest.class))) + .thenReturn( + new StatementResponse() + .setStatus(new StatementStatus().setState(StatementState.SUCCEEDED))); + + InternalTable table = InternalTable.builder().basePath("s3://bucket/path").build(); + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + client.createOrReplaceTable(table, tableIdentifier); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(ExecuteStatementRequest.class); + verify(mockStatementExecution).executeStatement(requestCaptor.capture()); + ExecuteStatementRequest request = requestCaptor.getValue(); + assertEquals( + "CREATE OR REPLACE TABLE main.default.people USING DELTA LOCATION 's3://bucket/path'", + request.getStatement()); + } + + @Test + void testCreateOrReplaceTableRejectsNonDelta() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.ICEBERG, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + InternalTable 
table = InternalTable.builder().basePath("s3://bucket/path").build(); + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + assertThrows( + CatalogSyncException.class, () -> client.createOrReplaceTable(table, tableIdentifier)); + } + @Test void testHasDatabaseTrue() { Map props = new HashMap<>(); From c5ef4b57959c1476711aa0ab3299b8dfde5dd989 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Tue, 10 Feb 2026 15:15:46 +0100 Subject: [PATCH 06/11] implem/test drop table --- .../DatabricksUnityCatalogSyncClient.java | 7 ++- .../TestDatabricksUnityCatalogSyncClient.java | 62 +++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java index 1a0711207..7e15c35a6 100644 --- a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java @@ -207,7 +207,12 @@ public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tab @Override public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { - throw new UnsupportedOperationException("Databricks UC sync not implemented"); + String fullName = getFullName(tableIdentifier); + try { + tablesApi.delete(fullName); + } catch (Exception e) { + throw new CatalogSyncException("Failed to drop table: " + fullName, e); + } } @Override diff --git a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java index 57cae184e..69161458c 100644 --- 
a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java @@ -204,6 +204,68 @@ void testCreateOrReplaceTableRejectsNonDelta() { CatalogSyncException.class, () -> client.createOrReplaceTable(table, tableIdentifier)); } + @Test + void testDropTable() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + client.dropTable(InternalTable.builder().basePath("s3://bucket/path").build(), tableIdentifier); + + verify(mockTablesApi).delete("main.default.people"); + } + + @Test + void testDropTableFailure() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + doThrow(new 
RuntimeException("boom")).when(mockTablesApi).delete("main.default.people"); + + assertThrows( + CatalogSyncException.class, + () -> + client.dropTable( + InternalTable.builder().basePath("s3://bucket/path").build(), tableIdentifier)); + } + @Test void testHasDatabaseTrue() { Map props = new HashMap<>(); From 0d117eadcb7c139d54e12d2bb7edfc326289af2e Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Tue, 10 Feb 2026 15:17:12 +0100 Subject: [PATCH 07/11] implem: dummy refresh table not sure if unity catalog refresh by itself the table metadata. will have to investigate for now skip refresh table --- .../xtable/databricks/DatabricksUnityCatalogSyncClient.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java index 7e15c35a6..da2f8c3ef 100644 --- a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java @@ -186,7 +186,9 @@ public void createTable(InternalTable table, CatalogTableIdentifier tableIdentif @Override public void refreshTable( InternalTable table, TableInfo catalogTable, CatalogTableIdentifier tableIdentifier) { - throw new UnsupportedOperationException("Databricks UC sync not implemented"); + log.warn( + "Databricks UC refreshTable is not implemented. 
Skipping refresh for {}", + tableIdentifier.getId()); } @Override From af8cc83e27a8f091c4af2cb7d3f22a226e82c191 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Tue, 10 Feb 2026 16:41:52 +0100 Subject: [PATCH 08/11] implem/test refresh table --- .../DatabricksUnityCatalogSyncClient.java | 252 +++++++++++++++++- .../TestDatabricksUnityCatalogSyncClient.java | 87 ++++++ 2 files changed, 335 insertions(+), 4 deletions(-) diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java index da2f8c3ef..2381536d9 100644 --- a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java @@ -18,6 +18,11 @@ package org.apache.xtable.databricks; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.Objects; import lombok.extern.log4j.Log4j2; @@ -28,6 +33,7 @@ import com.databricks.sdk.WorkspaceClient; import com.databricks.sdk.core.DatabricksConfig; import com.databricks.sdk.core.error.platform.NotFound; +import com.databricks.sdk.service.catalog.ColumnInfo; import com.databricks.sdk.service.catalog.CreateSchema; import com.databricks.sdk.service.catalog.SchemaInfo; import com.databricks.sdk.service.catalog.SchemasAPI; @@ -47,6 +53,8 @@ import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.storage.CatalogType; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.sync.CatalogSyncClient; @@ -186,9 +194,97 
@@ public void createTable(InternalTable table, CatalogTableIdentifier tableIdentif @Override public void refreshTable( InternalTable table, TableInfo catalogTable, CatalogTableIdentifier tableIdentifier) { - log.warn( - "Databricks UC refreshTable is not implemented. Skipping refresh for {}", - tableIdentifier.getId()); + ensureDeltaOnly(); + if (catalogTable == null) { + log.warn( + "Databricks UC refreshTable called with null catalog table for {}", + tableIdentifier.getId()); + return; + } + + InternalSchema schema = table.getReadSchema(); + if (schema == null || schema.getFields() == null || schema.getFields().isEmpty()) { + log.warn( + "Databricks UC refreshTable skipped due to missing schema for {}", + tableIdentifier.getId()); + return; + } + + String fullName = getFullName(tableIdentifier); + Map desired = buildDesiredColumns(schema); + Map existing = buildExistingColumns(catalogTable); + + List statements = new ArrayList<>(); + + // Add columns + List addColumnDefs = new ArrayList<>(); + for (DesiredColumn column : desired.values()) { + if (!existing.containsKey(column.normalizedName)) { + addColumnDefs.add(formatColumnDefinition(column)); + } + } + if (!addColumnDefs.isEmpty()) { + statements.add( + String.format( + "ALTER TABLE %s ADD COLUMNS (%s)", fullName, String.join(", ", addColumnDefs))); + } + + // Alter columns (type/comment/nullability) + for (DesiredColumn column : desired.values()) { + ColumnInfo current = existing.get(column.normalizedName); + if (current == null) { + continue; + } + String currentType = normalizeType(current.getTypeText()); + String desiredType = normalizeType(column.typeText); + if (!Objects.equals(currentType, desiredType)) { + statements.add( + String.format( + "ALTER TABLE %s ALTER COLUMN %s TYPE %s", + fullName, quoteIdentifier(column.name), column.typeText)); + } + + Boolean currentNullable = current.getNullable(); + boolean desiredNullable = column.nullable; + if (currentNullable != null && 
currentNullable.booleanValue() != desiredNullable) { + statements.add( + String.format( + "ALTER TABLE %s ALTER COLUMN %s %s", + fullName, + quoteIdentifier(column.name), + desiredNullable ? "DROP NOT NULL" : "SET NOT NULL")); + } + + String currentComment = current.getComment(); + String desiredComment = column.comment; + if (!Objects.equals( + StringUtils.defaultString(currentComment), StringUtils.defaultString(desiredComment))) { + String commentValue = StringUtils.defaultString(desiredComment); + statements.add( + String.format( + "ALTER TABLE %s ALTER COLUMN %s COMMENT '%s'", + fullName, quoteIdentifier(column.name), escapeSqlString(commentValue))); + } + } + + // Drop columns + for (ColumnInfo current : existing.values()) { + String normalized = normalizeName(current.getName()); + if (!desired.containsKey(normalized)) { + statements.add( + String.format( + "ALTER TABLE %s DROP COLUMN %s", fullName, quoteIdentifier(current.getName()))); + } + } + + if (statements.isEmpty()) { + log.info("Databricks UC refreshTable: no schema changes for {}", tableIdentifier.getId()); + return; + } + + for (String statement : statements) { + executeStatement(statement); + } } @Override @@ -281,7 +377,15 @@ private StatementResponse executeStatement(String statement) { StatementResponse response = statementExecution.executeStatement(request); if (response.getStatus() != null && response.getStatus().getState() == StatementState.FAILED) { - throw new CatalogSyncException("Databricks UC statement failed: " + statement); + String errorMessage = null; + if (response.getStatus().getError() != null) { + errorMessage = response.getStatus().getError().getMessage(); + } + if (StringUtils.isBlank(errorMessage)) { + throw new CatalogSyncException("Databricks UC statement failed: " + statement); + } + throw new CatalogSyncException( + "Databricks UC statement failed: " + statement + " (" + errorMessage + ")"); } return response; } @@ -308,6 +412,146 @@ private static String 
escapeSqlString(String value) { return value.replace("'", "''"); } + private static String quoteIdentifier(String value) { + return "`" + value.replace("`", "``") + "`"; + } + + private static String normalizeName(String value) { + return value == null ? "" : value.toLowerCase(Locale.ROOT); + } + + private static String normalizeType(String value) { + if (value == null) { + return ""; + } + return value.replaceAll("\\s+", "").toLowerCase(Locale.ROOT); + } + + private static String toSparkSqlType(InternalField field) { + switch (field.getSchema().getDataType()) { + case ENUM: + case STRING: + return "string"; + case INT: + return "int"; + case LONG: + return "bigint"; + case BYTES: + case FIXED: + case UUID: + return "binary"; + case BOOLEAN: + return "boolean"; + case FLOAT: + return "float"; + case DATE: + return "date"; + case TIMESTAMP: + return "timestamp"; + case TIMESTAMP_NTZ: + return "timestamp_ntz"; + case DOUBLE: + return "double"; + case DECIMAL: + int precision = + (int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION); + int scale = + (int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE); + return String.format("decimal(%d,%d)", precision, scale); + case RECORD: + return toStructType(field.getSchema()); + case MAP: + InternalField key = + field.getSchema().getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName())) + .findFirst() + .orElseThrow(() -> new CatalogSyncException("Invalid map schema")); + InternalField value = + field.getSchema().getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName())) + .findFirst() + .orElseThrow(() -> new CatalogSyncException("Invalid map schema")); + return String.format("map<%s,%s>", toSparkSqlType(key), toSparkSqlType(value)); + case LIST: + InternalField element = + field.getSchema().getFields().stream() + .filter( + arrayField -> + 
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals( + arrayField.getName())) + .findFirst() + .orElseThrow(() -> new CatalogSyncException("Invalid array schema")); + return String.format("array<%s>", toSparkSqlType(element)); + default: + throw new CatalogSyncException("Unsupported type: " + field.getSchema().getDataType()); + } + } + + private static String toStructType(InternalSchema schema) { + List fields = new ArrayList<>(); + for (InternalField field : schema.getFields()) { + fields.add(field.getName() + ":" + toSparkSqlType(field)); + } + return "struct<" + String.join(",", fields) + ">"; + } + + private static Map buildDesiredColumns(InternalSchema schema) { + Map result = new LinkedHashMap<>(); + for (InternalField field : schema.getFields()) { + String name = field.getName(); + String normalized = normalizeName(name); + String typeText = toSparkSqlType(field); + boolean nullable = field.getSchema().isNullable(); + String comment = field.getSchema().getComment(); + result.put(normalized, new DesiredColumn(name, normalized, typeText, nullable, comment)); + } + return result; + } + + private static Map buildExistingColumns(TableInfo tableInfo) { + Map result = new LinkedHashMap<>(); + if (tableInfo == null || tableInfo.getColumns() == null) { + return result; + } + for (ColumnInfo column : tableInfo.getColumns()) { + result.put(normalizeName(column.getName()), column); + } + return result; + } + + private static String formatColumnDefinition(DesiredColumn column) { + StringBuilder builder = new StringBuilder(); + builder.append(quoteIdentifier(column.name)).append(" ").append(column.typeText); + if (!column.nullable) { + builder.append(" NOT NULL"); + } + if (!StringUtils.isBlank(column.comment)) { + builder.append(" COMMENT '").append(escapeSqlString(column.comment)).append("'"); + } + return builder.toString(); + } + + private static final class DesiredColumn { + private final String name; + private final String normalizedName; + private final String 
typeText; + private final boolean nullable; + private final String comment; + + private DesiredColumn( + String name, String normalizedName, String typeText, boolean nullable, String comment) { + this.name = name; + this.normalizedName = normalizedName; + this.typeText = typeText; + this.nullable = nullable; + this.comment = comment; + } + } + @Override public void close() { // WorkspaceClient has no explicit close hook; no-op for now. diff --git a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java index 69161458c..e52a923d1 100644 --- a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -38,6 +39,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import com.databricks.sdk.core.error.platform.NotFound; +import com.databricks.sdk.service.catalog.ColumnInfo; import com.databricks.sdk.service.catalog.CreateSchema; import com.databricks.sdk.service.catalog.SchemaInfo; import com.databricks.sdk.service.catalog.SchemasAPI; @@ -53,6 +55,9 @@ import org.apache.xtable.exception.CatalogSyncException; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.storage.CatalogType; import org.apache.xtable.model.storage.TableFormat; @@ -204,6 +209,88 @@ void testCreateOrReplaceTableRejectsNonDelta() { CatalogSyncException.class, () -> 
client.createOrReplaceTable(table, tableIdentifier)); } + @Test + void testRefreshTableSchemaEvolution() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockStatementExecution.executeStatement(any(ExecuteStatementRequest.class))) + .thenReturn( + new StatementResponse() + .setStatus(new StatementStatus().setState(StatementState.SUCCEEDED))); + + InternalSchema idSchema = + InternalSchema.builder() + .name("id") + .dataType(InternalType.INT) + .isNullable(false) + .comment("new") + .build(); + InternalSchema ageSchema = + InternalSchema.builder().name("age").dataType(InternalType.INT).isNullable(true).build(); + InternalSchema readSchema = + InternalSchema.builder() + .name("root") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + java.util.Arrays.asList( + InternalField.builder().name("id").schema(idSchema).build(), + InternalField.builder().name("age").schema(ageSchema).build())) + .build(); + + InternalTable table = + InternalTable.builder().readSchema(readSchema).basePath("s3://bucket/path").build(); + TableInfo catalogTable = + new TableInfo() + .setColumns( + java.util.Arrays.asList( + new ColumnInfo() + .setName("id") + .setTypeText("int") + .setNullable(true) + .setComment("old"), + new ColumnInfo().setName("name").setTypeText("string").setNullable(true))); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + client.refreshTable(table, catalogTable, 
tableIdentifier); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(ExecuteStatementRequest.class); + verify(mockStatementExecution, org.mockito.Mockito.times(4)) + .executeStatement(requestCaptor.capture()); + + List requests = requestCaptor.getAllValues(); + assertEquals( + "ALTER TABLE main.default.people ADD COLUMNS (`age` int)", requests.get(0).getStatement()); + assertEquals( + "ALTER TABLE main.default.people ALTER COLUMN `id` SET NOT NULL", + requests.get(1).getStatement()); + assertEquals( + "ALTER TABLE main.default.people ALTER COLUMN `id` COMMENT 'new'", + requests.get(2).getStatement()); + assertEquals( + "ALTER TABLE main.default.people DROP COLUMN `name`", requests.get(3).getStatement()); + } + @Test void testDropTable() { Map props = new HashMap<>(); From dc22a1e6c488fe33b0a336dfe1e45fc40bb85095 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Tue, 10 Feb 2026 17:30:53 +0100 Subject: [PATCH 09/11] fix: refresh table to recreate the table when diff - alter column stmt is meant for delta, not uc so it tries to upgrade delta logs - refresh table stmt does not refresh the columns - create or replace stmt is also meant for delta, not uc so tries to upgrade delta logs - drop table + create new table: does the work, also sounds like uc does keep track of the table history so we don't loose associated stuff (top users, related assets, creation date...) 
--- .../DatabricksUnityCatalogSyncClient.java | 239 +++++++----------- .../TestDatabricksUnityCatalogSyncClient.java | 41 +-- 2 files changed, 101 insertions(+), 179 deletions(-) diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java index 2381536d9..a69a88bb1 100644 --- a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java @@ -18,9 +18,7 @@ package org.apache.xtable.databricks; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; +import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -188,6 +186,7 @@ public void createTable(InternalTable table, CatalogTableIdentifier tableIdentif String.format( "CREATE TABLE IF NOT EXISTS %s USING DELTA LOCATION '%s'", fullName, escapeSqlString(location)); + log.info("Databricks UC create table: {}", fullName); executeStatement(statement); } @@ -201,7 +200,6 @@ public void refreshTable( tableIdentifier.getId()); return; } - InternalSchema schema = table.getReadSchema(); if (schema == null || schema.getFields() == null || schema.getFields().isEmpty()) { log.warn( @@ -209,104 +207,26 @@ public void refreshTable( tableIdentifier.getId()); return; } - - String fullName = getFullName(tableIdentifier); - Map desired = buildDesiredColumns(schema); - Map existing = buildExistingColumns(catalogTable); - - List statements = new ArrayList<>(); - - // Add columns - List addColumnDefs = new ArrayList<>(); - for (DesiredColumn column : desired.values()) { - if (!existing.containsKey(column.normalizedName)) { - addColumnDefs.add(formatColumnDefinition(column)); - } - } - if (!addColumnDefs.isEmpty()) { - statements.add( - String.format( - "ALTER TABLE %s ADD 
COLUMNS (%s)", fullName, String.join(", ", addColumnDefs))); - } - - // Alter columns (type/comment/nullability) - for (DesiredColumn column : desired.values()) { - ColumnInfo current = existing.get(column.normalizedName); - if (current == null) { - continue; - } - String currentType = normalizeType(current.getTypeText()); - String desiredType = normalizeType(column.typeText); - if (!Objects.equals(currentType, desiredType)) { - statements.add( - String.format( - "ALTER TABLE %s ALTER COLUMN %s TYPE %s", - fullName, quoteIdentifier(column.name), column.typeText)); - } - - Boolean currentNullable = current.getNullable(); - boolean desiredNullable = column.nullable; - if (currentNullable != null && currentNullable.booleanValue() != desiredNullable) { - statements.add( - String.format( - "ALTER TABLE %s ALTER COLUMN %s %s", - fullName, - quoteIdentifier(column.name), - desiredNullable ? "DROP NOT NULL" : "SET NOT NULL")); - } - - String currentComment = current.getComment(); - String desiredComment = column.comment; - if (!Objects.equals( - StringUtils.defaultString(currentComment), StringUtils.defaultString(desiredComment))) { - String commentValue = StringUtils.defaultString(desiredComment); - statements.add( - String.format( - "ALTER TABLE %s ALTER COLUMN %s COMMENT '%s'", - fullName, quoteIdentifier(column.name), escapeSqlString(commentValue))); - } - } - - // Drop columns - for (ColumnInfo current : existing.values()) { - String normalized = normalizeName(current.getName()); - if (!desired.containsKey(normalized)) { - statements.add( - String.format( - "ALTER TABLE %s DROP COLUMN %s", fullName, quoteIdentifier(current.getName()))); - } - } - - if (statements.isEmpty()) { - log.info("Databricks UC refreshTable: no schema changes for {}", tableIdentifier.getId()); - return; - } - - for (String statement : statements) { - executeStatement(statement); + if (!schemasMatch(schema, catalogTable)) { + createOrReplaceTable(table, tableIdentifier); + } else { + log.info( + 
"Databricks UC refreshTable: schema already up to date for {}", tableIdentifier.getId()); } } @Override public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { ensureDeltaOnly(); - String fullName = getFullName(tableIdentifier); - String location = table.getBasePath(); - if (StringUtils.isBlank(location)) { - throw new CatalogSyncException("Storage location is required for external Delta tables"); - } - - String statement = - String.format( - "CREATE OR REPLACE TABLE %s USING DELTA LOCATION '%s'", - fullName, escapeSqlString(location)); - executeStatement(statement); + dropTable(table, tableIdentifier); + createTable(table, tableIdentifier); } @Override public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { String fullName = getFullName(tableIdentifier); try { + log.info("Databricks UC drop table: {}", fullName); tablesApi.delete(fullName); } catch (Exception e) { throw new CatalogSyncException("Failed to drop table: " + fullName, e); @@ -412,8 +332,80 @@ private static String escapeSqlString(String value) { return value.replace("'", "''"); } - private static String quoteIdentifier(String value) { - return "`" + value.replace("`", "``") + "`"; + private static boolean schemasMatch(InternalSchema desired, TableInfo existing) { + Map desiredColumns = toColumnSignatureMap(desired); + Map existingColumns = toColumnSignatureMap(existing); + if (desiredColumns.size() != existingColumns.size()) { + return false; + } + for (Map.Entry entry : desiredColumns.entrySet()) { + ColumnSignature existingSignature = existingColumns.get(entry.getKey()); + if (existingSignature == null) { + return false; + } + if (!entry.getValue().equals(existingSignature)) { + return false; + } + } + return true; + } + + private static Map toColumnSignatureMap(InternalSchema schema) { + Map result = new HashMap<>(); + for (InternalField field : schema.getFields()) { + String name = normalizeName(field.getName()); + String type = 
normalizeType(toSparkSqlType(field)); + boolean nullable = field.getSchema().isNullable(); + String comment = StringUtils.defaultString(field.getSchema().getComment()); + result.put(name, new ColumnSignature(type, nullable, comment)); + } + return result; + } + + private static Map toColumnSignatureMap(TableInfo tableInfo) { + Map result = new HashMap<>(); + if (tableInfo == null || tableInfo.getColumns() == null) { + return result; + } + for (ColumnInfo column : tableInfo.getColumns()) { + String name = normalizeName(column.getName()); + String type = normalizeType(column.getTypeText()); + boolean nullable = column.getNullable() == null || column.getNullable(); + String comment = StringUtils.defaultString(column.getComment()); + result.put(name, new ColumnSignature(type, nullable, comment)); + } + return result; + } + + private static final class ColumnSignature { + private final String type; + private final boolean nullable; + private final String comment; + + private ColumnSignature(String type, boolean nullable, String comment) { + this.type = type; + this.nullable = nullable; + this.comment = comment; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ColumnSignature that = (ColumnSignature) o; + return nullable == that.nullable + && Objects.equals(type, that.type) + && Objects.equals(comment, that.comment); + } + + @Override + public int hashCode() { + return Objects.hash(type, nullable, comment); + } } private static String normalizeName(String value) { @@ -492,66 +484,19 @@ private static String toSparkSqlType(InternalField field) { } private static String toStructType(InternalSchema schema) { - List fields = new ArrayList<>(); - for (InternalField field : schema.getFields()) { - fields.add(field.getName() + ":" + toSparkSqlType(field)); - } - return "struct<" + String.join(",", fields) + ">"; - } - - private static Map 
buildDesiredColumns(InternalSchema schema) { - Map result = new LinkedHashMap<>(); + StringBuilder builder = new StringBuilder("struct<"); + boolean first = true; for (InternalField field : schema.getFields()) { - String name = field.getName(); - String normalized = normalizeName(name); - String typeText = toSparkSqlType(field); - boolean nullable = field.getSchema().isNullable(); - String comment = field.getSchema().getComment(); - result.put(normalized, new DesiredColumn(name, normalized, typeText, nullable, comment)); - } - return result; - } - - private static Map buildExistingColumns(TableInfo tableInfo) { - Map result = new LinkedHashMap<>(); - if (tableInfo == null || tableInfo.getColumns() == null) { - return result; - } - for (ColumnInfo column : tableInfo.getColumns()) { - result.put(normalizeName(column.getName()), column); - } - return result; - } - - private static String formatColumnDefinition(DesiredColumn column) { - StringBuilder builder = new StringBuilder(); - builder.append(quoteIdentifier(column.name)).append(" ").append(column.typeText); - if (!column.nullable) { - builder.append(" NOT NULL"); - } - if (!StringUtils.isBlank(column.comment)) { - builder.append(" COMMENT '").append(escapeSqlString(column.comment)).append("'"); + if (!first) { + builder.append(","); + } + builder.append(field.getName()).append(":").append(toSparkSqlType(field)); + first = false; } + builder.append(">"); return builder.toString(); } - private static final class DesiredColumn { - private final String name; - private final String normalizedName; - private final String typeText; - private final boolean nullable; - private final String comment; - - private DesiredColumn( - String name, String normalizedName, String typeText, boolean nullable, String comment) { - this.name = name; - this.normalizedName = normalizedName; - this.typeText = typeText; - this.nullable = nullable; - this.comment = comment; - } - } - @Override public void close() { // WorkspaceClient has no 
explicit close hook; no-op for now. diff --git a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java index e52a923d1..a68f30e8a 100644 --- a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java @@ -28,7 +28,6 @@ import static org.mockito.Mockito.when; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -174,9 +173,10 @@ void testCreateOrReplaceTableDelta() { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ExecuteStatementRequest.class); verify(mockStatementExecution).executeStatement(requestCaptor.capture()); + verify(mockTablesApi).delete("main.default.people"); ExecuteStatementRequest request = requestCaptor.getValue(); assertEquals( - "CREATE OR REPLACE TABLE main.default.people USING DELTA LOCATION 's3://bucket/path'", + "CREATE TABLE IF NOT EXISTS main.default.people USING DELTA LOCATION 's3://bucket/path'", request.getStatement()); } @@ -236,14 +236,7 @@ void testRefreshTableSchemaEvolution() { .setStatus(new StatementStatus().setState(StatementState.SUCCEEDED))); InternalSchema idSchema = - InternalSchema.builder() - .name("id") - .dataType(InternalType.INT) - .isNullable(false) - .comment("new") - .build(); - InternalSchema ageSchema = - InternalSchema.builder().name("age").dataType(InternalType.INT).isNullable(true).build(); + InternalSchema.builder().name("id").dataType(InternalType.INT).isNullable(true).build(); InternalSchema readSchema = InternalSchema.builder() .name("root") @@ -251,21 +244,14 @@ void testRefreshTableSchemaEvolution() { .isNullable(true) .fields( java.util.Arrays.asList( - InternalField.builder().name("id").schema(idSchema).build(), - 
InternalField.builder().name("age").schema(ageSchema).build())) + InternalField.builder().name("id").schema(idSchema).build())) .build(); - InternalTable table = - InternalTable.builder().readSchema(readSchema).basePath("s3://bucket/path").build(); + InternalTable.builder().basePath("s3://bucket/path").readSchema(readSchema).build(); TableInfo catalogTable = new TableInfo() .setColumns( java.util.Arrays.asList( - new ColumnInfo() - .setName("id") - .setTypeText("int") - .setNullable(true) - .setComment("old"), new ColumnInfo().setName("name").setTypeText("string").setNullable(true))); ThreePartHierarchicalTableIdentifier tableIdentifier = @@ -275,20 +261,11 @@ void testRefreshTableSchemaEvolution() { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ExecuteStatementRequest.class); - verify(mockStatementExecution, org.mockito.Mockito.times(4)) - .executeStatement(requestCaptor.capture()); - - List requests = requestCaptor.getAllValues(); - assertEquals( - "ALTER TABLE main.default.people ADD COLUMNS (`age` int)", requests.get(0).getStatement()); - assertEquals( - "ALTER TABLE main.default.people ALTER COLUMN `id` SET NOT NULL", - requests.get(1).getStatement()); - assertEquals( - "ALTER TABLE main.default.people ALTER COLUMN `id` COMMENT 'new'", - requests.get(2).getStatement()); + verify(mockStatementExecution).executeStatement(requestCaptor.capture()); + verify(mockTablesApi).delete("main.default.people"); assertEquals( - "ALTER TABLE main.default.people DROP COLUMN `name`", requests.get(3).getStatement()); + "CREATE TABLE IF NOT EXISTS main.default.people USING DELTA LOCATION 's3://bucket/path'", + requestCaptor.getValue().getStatement()); } @Test From 0cef043fc10e5098e58aa2a1059d0a9ed4fbe612 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Wed, 11 Feb 2026 11:38:18 +0100 Subject: [PATCH 10/11] doc --- website/docs/unity-catalog.md | 132 ++++++++++++++++++++++++++++++---- website/sidebars.js | 2 +- 2 files changed, 120 insertions(+), 14 deletions(-) diff 
--git a/website/docs/unity-catalog.md b/website/docs/unity-catalog.md index cc5ccb0d4..fc2372d8a 100644 --- a/website/docs/unity-catalog.md +++ b/website/docs/unity-catalog.md @@ -4,29 +4,36 @@ title: "Unity Catalog" --- # Syncing to Unity Catalog -This document walks through the steps to register an Apache XTable™ (Incubating) synced Delta table in Unity Catalog on Databricks and open-source Unity Catalog. + +This page covers **Databricks Unity Catalog** and **open-source Unity Catalog**. They are different systems: +Databricks Unity Catalog is a managed service in Databricks, while open-source Unity Catalog is a standalone server. +Both support **Delta external tables only** as Unity Catalog targets; Iceberg/Hudi are not supported as UC targets. ## Pre-requisites (for Databricks Unity Catalog) + 1. Source table(s) (Hudi/Iceberg) already written to external storage locations like S3/GCS/ADLS. If you don't have a source table written in S3/GCS/ADLS, you can follow the steps in [this](/docs/hms) tutorial to set it up. 2. Setup connection to external storage locations from Databricks. - * Follow the steps outlined [here](https://docs.databricks.com/en/storage/amazon-s3.html) for Amazon S3 - * Follow the steps outlined [here](https://docs.databricks.com/en/storage/gcs.html) for Google Cloud Storage - * Follow the steps outlined [here](https://docs.databricks.com/en/storage/azure-storage.html) for Azure Data Lake Storage Gen2 and Blob Storage. + - Follow the steps outlined [here](https://docs.databricks.com/en/storage/amazon-s3.html) for Amazon S3 + - Follow the steps outlined [here](https://docs.databricks.com/en/storage/gcs.html) for Google Cloud Storage + - Follow the steps outlined [here](https://docs.databricks.com/en/storage/azure-storage.html) for Azure Data Lake Storage Gen2 and Blob Storage. 3. 
Create a Unity Catalog metastore in Databricks as outlined [here](https://docs.gcp.databricks.com/data-governance/unity-catalog/create-metastore.html#create-a-unity-catalog-metastore). 4. Create an external location in Databricks as outlined [here](https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-location.html). 5. Clone the Apache XTable™ (Incubating) [repository](https://github.com/apache/incubator-xtable) and create the `xtable-utilities_2.12-0.2.0-SNAPSHOT-bundled.jar` by following the steps on the [Installation page](/docs/setup) ## Pre-requisites (for open-source Unity Catalog) + 1. Source table(s) (Hudi/Iceberg) already written to external storage locations like S3/GCS/ADLS or local. - In this guide, we will use the local file system. + In this guide, we will use the local file system. But for S3/GCS/ADLS, you must add additional properties related to the respective cloud object storage system you're working with as mentioned [here](https://github.com/unitycatalog/unitycatalog/blob/main/docs/server.md) 2. Clone the Unity Catalog repository from [here](https://github.com/unitycatalog/unitycatalog) and build the project by following the steps outlined [here](https://github.com/unitycatalog/unitycatalog?tab=readme-ov-file#prerequisites) ## Steps + ### Running sync + Create `my_config.yaml` in the cloned Apache XTable™ (Incubating) directory. ```yaml md title="yaml" @@ -34,16 +41,17 @@ sourceFormat: HUDI|ICEBERG # choose only one targetFormats: - DELTA datasets: - - - tableBasePath: s3://path/to/source/data + - tableBasePath: s3://path/to/source/data tableName: table_name partitionSpec: partitionpath:VALUE # you only need to specify partitionSpec for HUDI sourceFormat ``` + :::note Note: + 1. Replace `s3://path/to/source/data` to `gs://path/to/source/data` if you have your source table in GCS and `abfss://@.dfs.core.windows.net/` if you have your source table in ADLS. -2. 
And replace with appropriate values for `sourceFormat`, and `tableName` fields. -::: +2. And replace with appropriate values for `sourceFormat`, and `tableName` fields. + ::: From your terminal under the cloned Apache XTable™ (Incubating) directory, run the sync process using the below command. @@ -51,12 +59,13 @@ From your terminal under the cloned Apache XTable™ (Incubating) directory, run java -jar xtable-utilities/target/xtable-utilities_2.12-0.2.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml ``` -:::tip Note: -At this point, if you check your bucket path, you will be able to see `_delta_log` directory with +:::tip Note: +At this point, if you check your bucket path, you will be able to see `_delta_log` directory with 00000000000000000000.json which contains the logs that helps query engines to interpret the source table as a Delta table. ::: -### Register the target table in Databricks Unity Catalog +### Databricks Unity Catalog: manual registration (SQL) + (After making sure you complete the pre-requisites mentioned for Databricks Unity Catalog above) In your Databricks workspace, under SQL editor, run the following queries. ```sql md title="SQL" @@ -68,12 +77,14 @@ CREATE TABLE xtable.synced_delta_schema. USING DELTA LOCATION 's3://path/to/source/data'; ``` + :::note Note: Replace `s3://path/to/source/data` to `gs://path/to/source/data` if you have your source table in GCS and `abfss://@.dfs.core.windows.net/` if you have your source table in ADLS. 
::: -### Validating the results +### Validating the results (Databricks) + You can now see the created delta table in **Unity Catalog** under **Catalog** as `` under `synced_delta_schema` and also query the table in the SQL editor: @@ -82,6 +93,7 @@ SELECT * FROM xtable.synced_delta_schema.; ``` ### Register the target table in open-source Unity Catalog using the CLI + (After making sure you complete the pre-requisites mentioned for open-source Unity Catalog above) In your terminal start the UC server by following the steps outlined [here](https://github.com/unitycatalog/unitycatalog/tree/main?tab=readme-ov-file#quickstart---hello-uc) In a different terminal, run the following commands to register the target table in Unity Catalog. @@ -91,14 +103,108 @@ bin/uc table create --full_name unity.default.people --columns "id INT, name STR ``` ### Validating the results + You can now read the table registered in Unity Catalog using the below command. ```shell md title="shell" bin/uc table read --full_name unity.default.people ``` +### Databricks Unity Catalog: built-in catalog sync (XTable) + +XTable can also register the Delta table directly in Databricks Unity Catalog using the catalog +sync configuration. This uses the Databricks Java SDK and issues DDL against a SQL Warehouse. + +```yaml md title="yaml" +sourceCatalog: + catalogId: source + catalogType: STORAGE + catalogProperties: {} +targetCatalogs: + - catalogId: uc + catalogType: DATABRICKS_UC + catalogProperties: + externalCatalog.uc.host: https:// + externalCatalog.uc.warehouseId: + # OAuth M2M (recommended) + externalCatalog.uc.authType: oauth-m2m + externalCatalog.uc.clientId: + externalCatalog.uc.clientSecret: +datasets: + - sourceCatalogTableIdentifier: + tableIdentifier: + hierarchicalId: . + partitionSpec: partitionpath:VALUE + storageIdentifier: + tableFormat: HUDI + tableBasePath: s3://path/to/source/data + tableDataPath: s3://path/to/source/data + tableName:
+        partitionSpec: partitionpath:VALUE
+        namespace: <schema>
+      targetCatalogTableIdentifiers:
+        - catalogId: uc
+          tableFormat: DELTA
+          tableIdentifier:
+            hierarchicalId: <catalog>.<schema>.<table>
+``` + +### Authentication (Databricks UC) + +**Supported now** + +- OAuth M2M via: + - `externalCatalog.uc.authType: oauth-m2m` + - `externalCatalog.uc.clientId` + - `externalCatalog.uc.clientSecret` + +**Not supported yet** + +- PAT/token-based auth is intentionally not wired in the current XTable UC integration. + +**Possible later** + +- PAT or other auth flows could be added by extending the UC config and SDK wiring, + but they are out of scope for now. + +### Implementation details (Databricks UC) + +- XTable uses the Databricks SQL Statement Execution API (`StatementExecutionAPI`) to run DDL + against a SQL Warehouse. +- The built-in sync registers **external Delta tables** only: + - `CREATE TABLE IF NOT EXISTS ..
USING DELTA LOCATION ''` +- Schema evolution currently uses **drop + recreate** of the UC table metadata (not data). + See the limitations section below. + +### Schema evolution limitations (Databricks UC) + +Unity Catalog does not provide a catalog-only schema evolution API for external tables. +While `ALTER TABLE ...` can update the catalog, it also assumes control over the Delta +transaction log. For external tables managed outside Databricks, this can be unsafe. + +To avoid mutating the Delta log, XTable currently: + +1. Detects any schema differences (new columns, dropped columns, type/comment changes). +2. Drops the UC table metadata. +3. Recreates the table at the same location. + +This approach **does not delete data** and typically preserves metadata such as statistics, +usage, and lineage because Unity Catalog keeps that information even after a drop/recreate. +However, it causes a short period (seconds) where the table is not accessible. +Schema evolution is usually rare in production pipelines, so this trade-off is considered acceptable. + +### Databricks UC limitations and requirements + +- Unity Catalog enforces **unique external locations**. A location used by another + table/volume cannot be reused. This means you cannot register multiple external tables + (e.g., Iceberg/Delta via Glue federation and Databricks UC external) pointing to the same location. +- Ensure your Unity Catalog storage credential has **write** access to the Delta + `_delta_log` directory at the table location. + ## Conclusion + In this guide we saw how to, + 1. sync a source table to create metadata for the desired target table formats using Apache XTable™ (Incubating) 2. catalog the data in Delta format in Unity Catalog on Databricks, and also open-source Unity Catalog 3. query the Delta table using Databricks SQL editor, and open-source Unity Catalog CLI. 
diff --git a/website/sidebars.js b/website/sidebars.js index 9f0b55943..d39effb9e 100644 --- a/website/sidebars.js +++ b/website/sidebars.js @@ -38,7 +38,7 @@ module.exports = { items: [ 'hms', 'glue-catalog', - 'unity-catalog', + 'databricks-unity-catalog', 'biglake-metastore', ], }, From 8e971fb284fe30bca78cbc75c4529e3c03d0cc4f Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Wed, 11 Feb 2026 15:32:02 +0100 Subject: [PATCH 11/11] fix: use msck repair in place of replacetable for refresh --- website/docs/unity-catalog.md | 12 +++++------- .../databricks/DatabricksUnityCatalogSyncClient.java | 4 +++- .../TestDatabricksUnityCatalogSyncClient.java | 3 +-- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/website/docs/unity-catalog.md b/website/docs/unity-catalog.md index fc2372d8a..ff6f7060d 100644 --- a/website/docs/unity-catalog.md +++ b/website/docs/unity-catalog.md @@ -173,8 +173,8 @@ datasets: against a SQL Warehouse. - The built-in sync registers **external Delta tables** only: - `CREATE TABLE IF NOT EXISTS ..
USING DELTA LOCATION ''` -- Schema evolution currently uses **drop + recreate** of the UC table metadata (not data). - See the limitations section below. +- Schema evolution currently runs `MSCK REPAIR TABLE
SYNC METADATA` when a schema diff is + detected, to refresh catalog metadata without touching the Delta log. ### Schema evolution limitations (Databricks UC) @@ -185,12 +185,10 @@ transaction log. For external tables managed outside Databricks, this can be uns To avoid mutating the Delta log, XTable currently: 1. Detects any schema differences (new columns, dropped columns, type/comment changes). -2. Drops the UC table metadata. -3. Recreates the table at the same location. +2. Runs `MSCK REPAIR TABLE
SYNC METADATA` to refresh UC metadata. -This approach **does not delete data** and typically preserves metadata such as statistics, -usage, and lineage because Unity Catalog keeps that information even after a drop/recreate. -However, it causes a short period (seconds) where the table is not accessible. +This approach **does not delete data** and avoids modifying the Delta log. It refreshes +catalog metadata in-place via `MSCK REPAIR TABLE ... SYNC METADATA`. Schema evolution is usually rare in production pipelines, so this trade-off is considered acceptable. ### Databricks UC limitations and requirements diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java index a69a88bb1..469ac3a4a 100644 --- a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java @@ -208,7 +208,9 @@ public void refreshTable( return; } if (!schemasMatch(schema, catalogTable)) { - createOrReplaceTable(table, tableIdentifier); + String fullName = getFullName(tableIdentifier); + log.info("Databricks UC refresh table metadata (MSCK REPAIR TABLE): {}", fullName); + executeStatement(String.format("MSCK REPAIR TABLE %s SYNC METADATA", fullName)); } else { log.info( "Databricks UC refreshTable: schema already up to date for {}", tableIdentifier.getId()); diff --git a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java index a68f30e8a..f760a48aa 100644 --- a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java +++ 
b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java @@ -262,9 +262,8 @@ void testRefreshTableSchemaEvolution() { ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(ExecuteStatementRequest.class); verify(mockStatementExecution).executeStatement(requestCaptor.capture()); - verify(mockTablesApi).delete("main.default.people"); assertEquals( - "CREATE TABLE IF NOT EXISTS main.default.people USING DELTA LOCATION 's3://bucket/path'", + "MSCK REPAIR TABLE main.default.people SYNC METADATA", requestCaptor.getValue().getStatement()); }