diff --git a/pom.xml b/pom.xml index c6c4a833a..e7cb679f0 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ xtable-core xtable-utilities xtable-aws + xtable-databricks xtable-hive-metastore xtable-service diff --git a/website/docs/unity-catalog.md b/website/docs/unity-catalog.md index cc5ccb0d4..ff6f7060d 100644 --- a/website/docs/unity-catalog.md +++ b/website/docs/unity-catalog.md @@ -4,29 +4,36 @@ title: "Unity Catalog" --- # Syncing to Unity Catalog -This document walks through the steps to register an Apache XTable™ (Incubating) synced Delta table in Unity Catalog on Databricks and open-source Unity Catalog. + +This page covers **Databricks Unity Catalog** and **open-source Unity Catalog**. They are different systems: +Databricks Unity Catalog is a managed service in Databricks, while open-source Unity Catalog is a standalone server. +Both support **Delta external tables only** as Unity Catalog targets; Iceberg/Hudi are not supported as UC targets. ## Pre-requisites (for Databricks Unity Catalog) + 1. Source table(s) (Hudi/Iceberg) already written to external storage locations like S3/GCS/ADLS. If you don't have a source table written in S3/GCS/ADLS, you can follow the steps in [this](/docs/hms) tutorial to set it up. 2. Setup connection to external storage locations from Databricks. - * Follow the steps outlined [here](https://docs.databricks.com/en/storage/amazon-s3.html) for Amazon S3 - * Follow the steps outlined [here](https://docs.databricks.com/en/storage/gcs.html) for Google Cloud Storage - * Follow the steps outlined [here](https://docs.databricks.com/en/storage/azure-storage.html) for Azure Data Lake Storage Gen2 and Blob Storage. 
+ - Follow the steps outlined [here](https://docs.databricks.com/en/storage/amazon-s3.html) for Amazon S3 + - Follow the steps outlined [here](https://docs.databricks.com/en/storage/gcs.html) for Google Cloud Storage + - Follow the steps outlined [here](https://docs.databricks.com/en/storage/azure-storage.html) for Azure Data Lake Storage Gen2 and Blob Storage. 3. Create a Unity Catalog metastore in Databricks as outlined [here](https://docs.gcp.databricks.com/data-governance/unity-catalog/create-metastore.html#create-a-unity-catalog-metastore). 4. Create an external location in Databricks as outlined [here](https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-location.html). 5. Clone the Apache XTable™ (Incubating) [repository](https://github.com/apache/incubator-xtable) and create the `xtable-utilities_2.12-0.2.0-SNAPSHOT-bundled.jar` by following the steps on the [Installation page](/docs/setup) ## Pre-requisites (for open-source Unity Catalog) + 1. Source table(s) (Hudi/Iceberg) already written to external storage locations like S3/GCS/ADLS or local. - In this guide, we will use the local file system. + In this guide, we will use the local file system. But for S3/GCS/ADLS, you must add additional properties related to the respective cloud object storage system you're working with as mentioned [here](https://github.com/unitycatalog/unitycatalog/blob/main/docs/server.md) 2. Clone the Unity Catalog repository from [here](https://github.com/unitycatalog/unitycatalog) and build the project by following the steps outlined [here](https://github.com/unitycatalog/unitycatalog?tab=readme-ov-file#prerequisites) ## Steps + ### Running sync + Create `my_config.yaml` in the cloned Apache XTable™ (Incubating) directory. 
```yaml md title="yaml" @@ -34,16 +41,17 @@ sourceFormat: HUDI|ICEBERG # choose only one targetFormats: - DELTA datasets: - - - tableBasePath: s3://path/to/source/data + - tableBasePath: s3://path/to/source/data tableName: table_name partitionSpec: partitionpath:VALUE # you only need to specify partitionSpec for HUDI sourceFormat ``` + :::note Note: + 1. Replace `s3://path/to/source/data` to `gs://path/to/source/data` if you have your source table in GCS and `abfss://@.dfs.core.windows.net/` if you have your source table in ADLS. -2. And replace with appropriate values for `sourceFormat`, and `tableName` fields. -::: +2. And replace with appropriate values for `sourceFormat`, and `tableName` fields. + ::: From your terminal under the cloned Apache XTable™ (Incubating) directory, run the sync process using the below command. @@ -51,12 +59,13 @@ From your terminal under the cloned Apache XTable™ (Incubating) directory, run java -jar xtable-utilities/target/xtable-utilities_2.12-0.2.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml ``` -:::tip Note: -At this point, if you check your bucket path, you will be able to see `_delta_log` directory with +:::tip Note: +At this point, if you check your bucket path, you will be able to see `_delta_log` directory with 00000000000000000000.json which contains the logs that helps query engines to interpret the source table as a Delta table. ::: -### Register the target table in Databricks Unity Catalog +### Databricks Unity Catalog: manual registration (SQL) + (After making sure you complete the pre-requisites mentioned for Databricks Unity Catalog above) In your Databricks workspace, under SQL editor, run the following queries. ```sql md title="SQL" @@ -68,12 +77,14 @@ CREATE TABLE xtable.synced_delta_schema. 
USING DELTA LOCATION 's3://path/to/source/data'; ``` + :::note Note: Replace `s3://path/to/source/data` to `gs://path/to/source/data` if you have your source table in GCS and `abfss://@.dfs.core.windows.net/` if you have your source table in ADLS. ::: -### Validating the results +### Validating the results (Databricks) + You can now see the created delta table in **Unity Catalog** under **Catalog** as `` under `synced_delta_schema` and also query the table in the SQL editor: @@ -82,6 +93,7 @@ SELECT * FROM xtable.synced_delta_schema.; ``` ### Register the target table in open-source Unity Catalog using the CLI + (After making sure you complete the pre-requisites mentioned for open-source Unity Catalog above) In your terminal start the UC server by following the steps outlined [here](https://github.com/unitycatalog/unitycatalog/tree/main?tab=readme-ov-file#quickstart---hello-uc) In a different terminal, run the following commands to register the target table in Unity Catalog. @@ -91,14 +103,106 @@ bin/uc table create --full_name unity.default.people --columns "id INT, name STR ``` ### Validating the results + You can now read the table registered in Unity Catalog using the below command. ```shell md title="shell" bin/uc table read --full_name unity.default.people ``` +### Databricks Unity Catalog: built-in catalog sync (XTable) + +XTable can also register the Delta table directly in Databricks Unity Catalog using the catalog +sync configuration. This uses the Databricks Java SDK and issues DDL against a SQL Warehouse. 
+ +```yaml md title="yaml" +sourceCatalog: + catalogId: source + catalogType: STORAGE + catalogProperties: {} +targetCatalogs: + - catalogId: uc + catalogType: DATABRICKS_UC + catalogProperties: + externalCatalog.uc.host: https:// + externalCatalog.uc.warehouseId: + # OAuth M2M (recommended) + externalCatalog.uc.authType: oauth-m2m + externalCatalog.uc.clientId: + externalCatalog.uc.clientSecret: +datasets: + - sourceCatalogTableIdentifier: + tableIdentifier: + hierarchicalId: . + partitionSpec: partitionpath:VALUE + storageIdentifier: + tableFormat: HUDI + tableBasePath: s3://path/to/source/data + tableDataPath: s3://path/to/source/data + tableName:
+ partitionSpec: partitionpath:VALUE + namespace: + targetCatalogTableIdentifiers: + - catalogId: uc + tableFormat: DELTA + tableIdentifier: + hierarchicalId: ..
+``` + +### Authentication (Databricks UC) + +**Supported now** + +- OAuth M2M via: + - `externalCatalog.uc.authType: oauth-m2m` + - `externalCatalog.uc.clientId` + - `externalCatalog.uc.clientSecret` + +**Also accepted** + +- PAT/token-based auth via `externalCatalog.uc.token`; when set, the token takes + precedence over the OAuth client credentials. OAuth M2M remains the recommended flow. + +**Possible later** + +- Other auth flows could be added by extending the UC config and SDK wiring, + but they are out of scope for now. + +### Implementation details (Databricks UC) + +- XTable uses the Databricks SQL Statement Execution API (`StatementExecutionAPI`) to run DDL + against a SQL Warehouse. +- The built-in sync registers **external Delta tables** only: + - `CREATE TABLE IF NOT EXISTS ..
USING DELTA LOCATION ''` +- Schema evolution currently runs `MSCK REPAIR TABLE
SYNC METADATA` when a schema diff is + detected, to refresh catalog metadata without touching the Delta log. + +### Schema evolution limitations (Databricks UC) + +Unity Catalog does not provide a catalog-only schema evolution API for external tables. +While `ALTER TABLE ...` can update the catalog, it also assumes control over the Delta +transaction log. For external tables managed outside Databricks, this can be unsafe. + +To avoid mutating the Delta log, XTable currently: + +1. Detects any schema differences (new columns, dropped columns, type/comment changes). +2. Runs `MSCK REPAIR TABLE
SYNC METADATA` to refresh UC metadata. + +This approach **does not delete data** and avoids modifying the Delta log. It refreshes +catalog metadata in-place via `MSCK REPAIR TABLE ... SYNC METADATA`. +Schema evolution is usually rare in production pipelines, so this trade-off is considered acceptable. + +### Databricks UC limitations and requirements + +- Unity Catalog enforces **unique external locations**. A location used by another + table/volume cannot be reused. This means you cannot register multiple external tables + (e.g., Iceberg/Delta via Glue federation and Databricks UC external) pointing to the same location. +- Ensure your Unity Catalog storage credential has **write** access to the Delta + `_delta_log` directory at the table location. + ## Conclusion + In this guide we saw how to, + 1. sync a source table to create metadata for the desired target table formats using Apache XTable™ (Incubating) 2. catalog the data in Delta format in Unity Catalog on Databricks, and also open-source Unity Catalog 3. query the Delta table using Databricks SQL editor, and open-source Unity Catalog CLI. 
diff --git a/website/sidebars.js b/website/sidebars.js index 9f0b55943..d39effb9e 100644 --- a/website/sidebars.js +++ b/website/sidebars.js @@ -38,7 +38,7 @@ module.exports = { items: [ 'hms', 'glue-catalog', - 'unity-catalog', + 'databricks-unity-catalog', 'biglake-metastore', ], }, diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java index e4b778d6b..ed41a4fbf 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java @@ -27,4 +27,5 @@ public class CatalogType { public static final String STORAGE = "STORAGE"; public static final String GLUE = "GLUE"; public static final String HMS = "HMS"; + public static final String DATABRICKS_UC = "DATABRICKS_UC"; } diff --git a/xtable-databricks/pom.xml b/xtable-databricks/pom.xml new file mode 100644 index 000000000..5943b9440 --- /dev/null +++ b/xtable-databricks/pom.xml @@ -0,0 +1,80 @@ + + + + 4.0.0 + + org.apache.xtable + xtable + 0.3.0-incubating + + + xtable-databricks + XTable Databricks + + + + org.apache.xtable + xtable-core_${scala.binary.version} + ${project.version} + + + + com.databricks + databricks-sdk-java + 0.85.0 + + + + + org.apache.hadoop + hadoop-common + provided + + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-params + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + + + org.mockito + mockito-core + test + + + org.mockito + mockito-junit-jupiter + test + + + diff --git a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogConfig.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogConfig.java new file mode 100644 index 000000000..c3c9355a2 --- /dev/null +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogConfig.java @@ -0,0 
+1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.databricks; + +import java.util.Map; + +import lombok.Value; + +import org.apache.xtable.conversion.ExternalCatalogConfig; + +@Value +public class DatabricksUnityCatalogConfig { + public static final String HOST = "externalCatalog.uc.host"; + public static final String WAREHOUSE_ID = "externalCatalog.uc.warehouseId"; + public static final String AUTH_TYPE = "externalCatalog.uc.authType"; + public static final String CLIENT_ID = "externalCatalog.uc.clientId"; + public static final String CLIENT_SECRET = "externalCatalog.uc.clientSecret"; + public static final String TOKEN = "externalCatalog.uc.token"; + + String host; + String warehouseId; + String authType; + String clientId; + String clientSecret; + String token; + + public static DatabricksUnityCatalogConfig from(ExternalCatalogConfig catalogConfig) { + Map props = catalogConfig.getCatalogProperties(); + return new DatabricksUnityCatalogConfig( + props.get(HOST), + props.get(WAREHOUSE_ID), + props.get(AUTH_TYPE), + props.get(CLIENT_ID), + props.get(CLIENT_SECRET), + props.get(TOKEN)); + } +} diff --git 
a/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java new file mode 100644 index 000000000..469ac3a4a --- /dev/null +++ b/xtable-databricks/src/main/java/org/apache/xtable/databricks/DatabricksUnityCatalogSyncClient.java @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.databricks; + +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; + +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import com.databricks.sdk.WorkspaceClient; +import com.databricks.sdk.core.DatabricksConfig; +import com.databricks.sdk.core.error.platform.NotFound; +import com.databricks.sdk.service.catalog.ColumnInfo; +import com.databricks.sdk.service.catalog.CreateSchema; +import com.databricks.sdk.service.catalog.SchemaInfo; +import com.databricks.sdk.service.catalog.SchemasAPI; +import com.databricks.sdk.service.catalog.TableInfo; +import com.databricks.sdk.service.catalog.TablesAPI; +import com.databricks.sdk.service.sql.Disposition; +import com.databricks.sdk.service.sql.ExecuteStatementRequest; +import com.databricks.sdk.service.sql.ExecuteStatementRequestOnWaitTimeout; +import com.databricks.sdk.service.sql.Format; +import com.databricks.sdk.service.sql.StatementExecutionAPI; +import com.databricks.sdk.service.sql.StatementResponse; +import com.databricks.sdk.service.sql.StatementState; + +import org.apache.xtable.catalog.CatalogUtils; +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.CatalogType; +import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.spi.sync.CatalogSyncClient; + +/** + * Databricks Unity Catalog implementation skeleton for CatalogSyncClient. + * + *

This is a placeholder to wire Databricks UC as a catalog target via the SQL Statement + * Execution API. Actual DDL execution and schema diffing should be implemented in a follow-up + * change. + */ +@Log4j2 +public class DatabricksUnityCatalogSyncClient implements CatalogSyncClient { + + private ExternalCatalogConfig catalogConfig; + private DatabricksUnityCatalogConfig databricksConfig; + private Configuration hadoopConf; + private String tableFormat; + private WorkspaceClient workspaceClient; + private StatementExecutionAPI statementExecution; + private TablesAPI tablesApi; + private SchemasAPI schemasApi; + + // For loading the instance using ServiceLoader + public DatabricksUnityCatalogSyncClient() {} + + public DatabricksUnityCatalogSyncClient( + ExternalCatalogConfig catalogConfig, String tableFormat, Configuration configuration) { + init(catalogConfig, tableFormat, configuration); + } + + DatabricksUnityCatalogSyncClient( + ExternalCatalogConfig catalogConfig, + String tableFormat, + Configuration configuration, + StatementExecutionAPI statementExecution, + TablesAPI tablesApi, + SchemasAPI schemasApi) { + this.statementExecution = statementExecution; + this.tablesApi = tablesApi; + this.schemasApi = schemasApi; + init(catalogConfig, tableFormat, configuration); + } + + @Override + public String getCatalogId() { + return catalogConfig.getCatalogId(); + } + + @Override + public String getCatalogType() { + return CatalogType.DATABRICKS_UC; + } + + @Override + public String getStorageLocation(TableInfo table) { + if (table == null) { + return null; + } + return table.getStorageLocation(); + } + + @Override + public boolean hasDatabase(CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier hierarchical = + CatalogUtils.toHierarchicalTableIdentifier(tableIdentifier); + String catalog = hierarchical.getCatalogName(); + if (StringUtils.isBlank(catalog)) { + throw new CatalogSyncException( + "Databricks UC requires a catalog name (expected 
catalog.schema.table)"); + } + String schema = hierarchical.getDatabaseName(); + if (StringUtils.isBlank(schema)) { + throw new CatalogSyncException("Databricks UC requires a schema name"); + } + + String fullName = catalog + "." + schema; + try { + SchemaInfo schemaInfo = schemasApi.get(fullName); + return schemaInfo != null; + } catch (NotFound e) { + return false; + } catch (Exception e) { + throw new CatalogSyncException("Failed to get schema: " + fullName, e); + } + } + + @Override + public void createDatabase(CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier hierarchical = + CatalogUtils.toHierarchicalTableIdentifier(tableIdentifier); + String catalog = hierarchical.getCatalogName(); + if (StringUtils.isBlank(catalog)) { + throw new CatalogSyncException( + "Databricks UC requires a catalog name (expected catalog.schema.table)"); + } + String schema = hierarchical.getDatabaseName(); + if (StringUtils.isBlank(schema)) { + throw new CatalogSyncException("Databricks UC requires a schema name"); + } + + try { + schemasApi.create(new CreateSchema().setCatalogName(catalog).setName(schema)); + } catch (Exception e) { + throw new CatalogSyncException("Failed to create database: " + schema, e); + } + } + + @Override + public TableInfo getTable(CatalogTableIdentifier tableIdentifier) { + String fullName = getFullName(tableIdentifier); + try { + return tablesApi.get(fullName); + } catch (NotFound e) { + return null; + } catch (Exception e) { + throw new CatalogSyncException("Failed to get table: " + fullName, e); + } + } + + @Override + public void createTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + ensureDeltaOnly(); + String fullName = getFullName(tableIdentifier); + String location = table.getBasePath(); + if (StringUtils.isBlank(location)) { + throw new CatalogSyncException("Storage location is required for external Delta tables"); + } + + String statement = + String.format( + "CREATE TABLE IF NOT EXISTS %s USING DELTA 
LOCATION '%s'", + fullName, escapeSqlString(location)); + log.info("Databricks UC create table: {}", fullName); + executeStatement(statement); + } + + @Override + public void refreshTable( + InternalTable table, TableInfo catalogTable, CatalogTableIdentifier tableIdentifier) { + ensureDeltaOnly(); + if (catalogTable == null) { + log.warn( + "Databricks UC refreshTable called with null catalog table for {}", + tableIdentifier.getId()); + return; + } + InternalSchema schema = table.getReadSchema(); + if (schema == null || schema.getFields() == null || schema.getFields().isEmpty()) { + log.warn( + "Databricks UC refreshTable skipped due to missing schema for {}", + tableIdentifier.getId()); + return; + } + if (!schemasMatch(schema, catalogTable)) { + String fullName = getFullName(tableIdentifier); + log.info("Databricks UC refresh table metadata (MSCK REPAIR TABLE): {}", fullName); + executeStatement(String.format("MSCK REPAIR TABLE %s SYNC METADATA", fullName)); + } else { + log.info( + "Databricks UC refreshTable: schema already up to date for {}", tableIdentifier.getId()); + } + } + + @Override + public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + ensureDeltaOnly(); + dropTable(table, tableIdentifier); + createTable(table, tableIdentifier); + } + + @Override + public void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier) { + String fullName = getFullName(tableIdentifier); + try { + log.info("Databricks UC drop table: {}", fullName); + tablesApi.delete(fullName); + } catch (Exception e) { + throw new CatalogSyncException("Failed to drop table: " + fullName, e); + } + } + + @Override + public void init( + ExternalCatalogConfig catalogConfig, String tableFormat, Configuration configuration) { + this.catalogConfig = catalogConfig; + this.tableFormat = tableFormat; + this.hadoopConf = configuration; + this.databricksConfig = DatabricksUnityCatalogConfig.from(catalogConfig); + + if 
(databricksConfig.getHost() == null || databricksConfig.getWarehouseId() == null) { + throw new CatalogSyncException( + "Databricks UC catalog requires host and warehouseId in catalogProperties"); + } + if (this.statementExecution == null) { + this.workspaceClient = new WorkspaceClient(buildConfig(databricksConfig)); + this.statementExecution = workspaceClient.statementExecution(); + } + if (this.tablesApi == null) { + if (this.workspaceClient == null) { + this.workspaceClient = new WorkspaceClient(buildConfig(databricksConfig)); + } + this.tablesApi = this.workspaceClient.tables(); + } + if (this.schemasApi == null) { + if (this.workspaceClient == null) { + this.workspaceClient = new WorkspaceClient(buildConfig(databricksConfig)); + } + this.schemasApi = this.workspaceClient.schemas(); + } + log.info( + "Initialized Databricks UC sync client for catalogId={} tableFormat={}", + catalogConfig.getCatalogId(), + tableFormat); + } + + private void ensureDeltaOnly() { + if (!Objects.equals(tableFormat, TableFormat.DELTA)) { + throw new CatalogSyncException( + "Databricks UC sync client currently supports external DELTA only"); + } + } + + private String getFullName(CatalogTableIdentifier tableIdentifier) { + HierarchicalTableIdentifier hierarchical = + CatalogUtils.toHierarchicalTableIdentifier(tableIdentifier); + String catalog = hierarchical.getCatalogName(); + if (StringUtils.isBlank(catalog)) { + throw new CatalogSyncException( + "Databricks UC requires a catalog name (expected catalog.schema.table)"); + } + return hierarchical.getId(); + } + + private StatementResponse executeStatement(String statement) { + ExecuteStatementRequest request = + new ExecuteStatementRequest() + .setStatement(statement) + .setWarehouseId(databricksConfig.getWarehouseId()) + .setFormat(Format.JSON_ARRAY) + .setDisposition(Disposition.INLINE) + .setWaitTimeout("30s") + .setOnWaitTimeout(ExecuteStatementRequestOnWaitTimeout.CANCEL); + + StatementResponse response = 
statementExecution.executeStatement(request); + if (response.getStatus() != null && response.getStatus().getState() == StatementState.FAILED) { + String errorMessage = null; + if (response.getStatus().getError() != null) { + errorMessage = response.getStatus().getError().getMessage(); + } + if (StringUtils.isBlank(errorMessage)) { + throw new CatalogSyncException("Databricks UC statement failed: " + statement); + } + throw new CatalogSyncException( + "Databricks UC statement failed: " + statement + " (" + errorMessage + ")"); + } + return response; + } + + private DatabricksConfig buildConfig(DatabricksUnityCatalogConfig config) { + DatabricksConfig dbConfig = new DatabricksConfig().setHost(config.getHost()); + if (!StringUtils.isBlank(config.getAuthType())) { + dbConfig.setAuthType(config.getAuthType()); + } + if (!StringUtils.isBlank(config.getToken())) { + dbConfig.setToken(config.getToken()); + } else if (!StringUtils.isBlank(config.getClientId()) + && !StringUtils.isBlank(config.getClientSecret())) { + dbConfig.setClientId(config.getClientId()); + dbConfig.setClientSecret(config.getClientSecret()); + if (StringUtils.isBlank(config.getAuthType())) { + dbConfig.setAuthType("oauth-m2m"); + } + } + return dbConfig; + } + + private static String escapeSqlString(String value) { + return value.replace("'", "''"); + } + + private static boolean schemasMatch(InternalSchema desired, TableInfo existing) { + Map desiredColumns = toColumnSignatureMap(desired); + Map existingColumns = toColumnSignatureMap(existing); + if (desiredColumns.size() != existingColumns.size()) { + return false; + } + for (Map.Entry entry : desiredColumns.entrySet()) { + ColumnSignature existingSignature = existingColumns.get(entry.getKey()); + if (existingSignature == null) { + return false; + } + if (!entry.getValue().equals(existingSignature)) { + return false; + } + } + return true; + } + + private static Map toColumnSignatureMap(InternalSchema schema) { + Map result = new HashMap<>(); + for 
(InternalField field : schema.getFields()) { + String name = normalizeName(field.getName()); + String type = normalizeType(toSparkSqlType(field)); + boolean nullable = field.getSchema().isNullable(); + String comment = StringUtils.defaultString(field.getSchema().getComment()); + result.put(name, new ColumnSignature(type, nullable, comment)); + } + return result; + } + + private static Map toColumnSignatureMap(TableInfo tableInfo) { + Map result = new HashMap<>(); + if (tableInfo == null || tableInfo.getColumns() == null) { + return result; + } + for (ColumnInfo column : tableInfo.getColumns()) { + String name = normalizeName(column.getName()); + String type = normalizeType(column.getTypeText()); + boolean nullable = column.getNullable() == null || column.getNullable(); + String comment = StringUtils.defaultString(column.getComment()); + result.put(name, new ColumnSignature(type, nullable, comment)); + } + return result; + } + + private static final class ColumnSignature { + private final String type; + private final boolean nullable; + private final String comment; + + private ColumnSignature(String type, boolean nullable, String comment) { + this.type = type; + this.nullable = nullable; + this.comment = comment; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ColumnSignature that = (ColumnSignature) o; + return nullable == that.nullable + && Objects.equals(type, that.type) + && Objects.equals(comment, that.comment); + } + + @Override + public int hashCode() { + return Objects.hash(type, nullable, comment); + } + } + + private static String normalizeName(String value) { + return value == null ? 
"" : value.toLowerCase(Locale.ROOT); + } + + private static String normalizeType(String value) { + if (value == null) { + return ""; + } + return value.replaceAll("\\s+", "").toLowerCase(Locale.ROOT); + } + + private static String toSparkSqlType(InternalField field) { + switch (field.getSchema().getDataType()) { + case ENUM: + case STRING: + return "string"; + case INT: + return "int"; + case LONG: + return "bigint"; + case BYTES: + case FIXED: + case UUID: + return "binary"; + case BOOLEAN: + return "boolean"; + case FLOAT: + return "float"; + case DATE: + return "date"; + case TIMESTAMP: + return "timestamp"; + case TIMESTAMP_NTZ: + return "timestamp_ntz"; + case DOUBLE: + return "double"; + case DECIMAL: + int precision = + (int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION); + int scale = + (int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE); + return String.format("decimal(%d,%d)", precision, scale); + case RECORD: + return toStructType(field.getSchema()); + case MAP: + InternalField key = + field.getSchema().getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName())) + .findFirst() + .orElseThrow(() -> new CatalogSyncException("Invalid map schema")); + InternalField value = + field.getSchema().getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName())) + .findFirst() + .orElseThrow(() -> new CatalogSyncException("Invalid map schema")); + return String.format("map<%s,%s>", toSparkSqlType(key), toSparkSqlType(value)); + case LIST: + InternalField element = + field.getSchema().getFields().stream() + .filter( + arrayField -> + InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals( + arrayField.getName())) + .findFirst() + .orElseThrow(() -> new CatalogSyncException("Invalid array schema")); + return String.format("array<%s>", toSparkSqlType(element)); + default: + throw new 
CatalogSyncException("Unsupported type: " + field.getSchema().getDataType()); + } + } + + private static String toStructType(InternalSchema schema) { + StringBuilder builder = new StringBuilder("struct<"); + boolean first = true; + for (InternalField field : schema.getFields()) { + if (!first) { + builder.append(","); + } + builder.append(field.getName()).append(":").append(toSparkSqlType(field)); + first = false; + } + builder.append(">"); + return builder.toString(); + } + + @Override + public void close() { + // WorkspaceClient has no explicit close hook; no-op for now. + } +} diff --git a/xtable-databricks/src/main/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient b/xtable-databricks/src/main/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient new file mode 100644 index 000000000..9ac93574c --- /dev/null +++ b/xtable-databricks/src/main/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+org.apache.xtable.databricks.DatabricksUnityCatalogSyncClient diff --git a/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java new file mode 100644 index 000000000..f760a48aa --- /dev/null +++ b/xtable-databricks/src/test/java/org/apache/xtable/databricks/TestDatabricksUnityCatalogSyncClient.java @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.databricks; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.databricks.sdk.core.error.platform.NotFound; +import com.databricks.sdk.service.catalog.ColumnInfo; +import com.databricks.sdk.service.catalog.CreateSchema; +import com.databricks.sdk.service.catalog.SchemaInfo; +import com.databricks.sdk.service.catalog.SchemasAPI; +import com.databricks.sdk.service.catalog.TableInfo; +import com.databricks.sdk.service.catalog.TablesAPI; +import com.databricks.sdk.service.sql.ExecuteStatementRequest; +import com.databricks.sdk.service.sql.StatementExecutionAPI; +import com.databricks.sdk.service.sql.StatementResponse; +import com.databricks.sdk.service.sql.StatementState; +import com.databricks.sdk.service.sql.StatementStatus; + +import org.apache.xtable.conversion.ExternalCatalogConfig; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.storage.CatalogType; +import org.apache.xtable.model.storage.TableFormat; + 
+@ExtendWith(MockitoExtension.class) +public class TestDatabricksUnityCatalogSyncClient { + + @Mock private StatementExecutionAPI mockStatementExecution; + @Mock private TablesAPI mockTablesApi; + @Mock private SchemasAPI mockSchemasApi; + + @Test + void testCreateTableDelta_NoColumns() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + props.put(DatabricksUnityCatalogConfig.AUTH_TYPE, "oauth-m2m"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockStatementExecution.executeStatement(any(ExecuteStatementRequest.class))) + .thenReturn( + new StatementResponse() + .setStatus(new StatementStatus().setState(StatementState.SUCCEEDED))); + + InternalTable table = InternalTable.builder().basePath("s3://bucket/path").build(); + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + client.createTable(table, tableIdentifier); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(ExecuteStatementRequest.class); + verify(mockStatementExecution).executeStatement(requestCaptor.capture()); + ExecuteStatementRequest request = requestCaptor.getValue(); + assertEquals("wh-1", request.getWarehouseId()); + assertEquals( + "CREATE TABLE IF NOT EXISTS main.default.people USING DELTA LOCATION 's3://bucket/path'", + request.getStatement()); + } + + @Test + void testCreateTableRejectsNonDelta() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + 
props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.ICEBERG, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + InternalTable table = InternalTable.builder().basePath("s3://bucket/path").build(); + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + assertThrows(CatalogSyncException.class, () -> client.createTable(table, tableIdentifier)); + } + + @Test + void testCreateOrReplaceTableDelta() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockStatementExecution.executeStatement(any(ExecuteStatementRequest.class))) + .thenReturn( + new StatementResponse() + .setStatus(new StatementStatus().setState(StatementState.SUCCEEDED))); + + InternalTable table = InternalTable.builder().basePath("s3://bucket/path").build(); + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + client.createOrReplaceTable(table, tableIdentifier); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(ExecuteStatementRequest.class); + verify(mockStatementExecution).executeStatement(requestCaptor.capture()); 
+ verify(mockTablesApi).delete("main.default.people"); + ExecuteStatementRequest request = requestCaptor.getValue(); + assertEquals( + "CREATE TABLE IF NOT EXISTS main.default.people USING DELTA LOCATION 's3://bucket/path'", + request.getStatement()); + } + + @Test + void testCreateOrReplaceTableRejectsNonDelta() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.ICEBERG, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + InternalTable table = InternalTable.builder().basePath("s3://bucket/path").build(); + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + assertThrows( + CatalogSyncException.class, () -> client.createOrReplaceTable(table, tableIdentifier)); + } + + @Test + void testRefreshTableSchemaEvolution() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockStatementExecution.executeStatement(any(ExecuteStatementRequest.class))) + .thenReturn( + new StatementResponse() + .setStatus(new 
StatementStatus().setState(StatementState.SUCCEEDED))); + + InternalSchema idSchema = + InternalSchema.builder().name("id").dataType(InternalType.INT).isNullable(true).build(); + InternalSchema readSchema = + InternalSchema.builder() + .name("root") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + java.util.Arrays.asList( + InternalField.builder().name("id").schema(idSchema).build())) + .build(); + InternalTable table = + InternalTable.builder().basePath("s3://bucket/path").readSchema(readSchema).build(); + TableInfo catalogTable = + new TableInfo() + .setColumns( + java.util.Arrays.asList( + new ColumnInfo().setName("name").setTypeText("string").setNullable(true))); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + client.refreshTable(table, catalogTable, tableIdentifier); + + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(ExecuteStatementRequest.class); + verify(mockStatementExecution).executeStatement(requestCaptor.capture()); + assertEquals( + "MSCK REPAIR TABLE main.default.people SYNC METADATA", + requestCaptor.getValue().getStatement()); + } + + @Test + void testDropTable() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + client.dropTable(InternalTable.builder().basePath("s3://bucket/path").build(), tableIdentifier); 
+ + verify(mockTablesApi).delete("main.default.people"); + } + + @Test + void testDropTableFailure() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + doThrow(new RuntimeException("boom")).when(mockTablesApi).delete("main.default.people"); + + assertThrows( + CatalogSyncException.class, + () -> + client.dropTable( + InternalTable.builder().basePath("s3://bucket/path").build(), tableIdentifier)); + } + + @Test + void testHasDatabaseTrue() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockSchemasApi.get("main.default")).thenReturn(new SchemaInfo()); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + boolean exists = client.hasDatabase(tableIdentifier); + assertEquals(true, exists); + + verify(mockSchemasApi).get("main.default"); + } + + @Test + void 
testHasDatabaseFalse() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockSchemasApi.get("main.default")).thenThrow(new NotFound("not found", null)); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + boolean exists = client.hasDatabase(tableIdentifier); + assertEquals(false, exists); + } + + @Test + void testHasDatabaseFailedStatement() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockSchemasApi.get("main.default")).thenThrow(new RuntimeException("boom")); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + assertThrows(CatalogSyncException.class, () -> client.hasDatabase(tableIdentifier)); + } + + @Test + void testCreateDatabase() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + 
props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + client.createDatabase(tableIdentifier); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(CreateSchema.class); + verify(mockSchemasApi).create(requestCaptor.capture()); + CreateSchema request = requestCaptor.getValue(); + assertEquals("main", request.getCatalogName()); + assertEquals("default", request.getName()); + } + + @Test + void testCreateDatabaseFailure() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + + doThrow(new RuntimeException("boom")).when(mockSchemasApi).create(any(CreateSchema.class)); + + assertThrows(CatalogSyncException.class, () -> client.createDatabase(tableIdentifier)); + } + + @Test + void testGetTableFound() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + 
props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + TableInfo tableInfo = new TableInfo().setStorageLocation("s3://bucket/path"); + when(mockTablesApi.get("main.default.people")).thenReturn(tableInfo); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + Object table = client.getTable(tableIdentifier); + assertNotNull(table); + + verify(mockTablesApi).get("main.default.people"); + } + + @Test + void testGetTableNotFound() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockTablesApi.get("main.default.people")).thenThrow(new NotFound("not found", null)); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + Object table = client.getTable(tableIdentifier); + assertNull(table); + } + + @Test + void testGetTableFailedStatement() { + Map props = new HashMap<>(); + props.put(DatabricksUnityCatalogConfig.HOST, "https://example.cloud.databricks.com"); + props.put(DatabricksUnityCatalogConfig.WAREHOUSE_ID, "wh-1"); + 
ExternalCatalogConfig config = + ExternalCatalogConfig.builder() + .catalogId("uc") + .catalogType(CatalogType.DATABRICKS_UC) + .catalogProperties(props) + .build(); + + DatabricksUnityCatalogSyncClient client = + new DatabricksUnityCatalogSyncClient( + config, + TableFormat.DELTA, + new Configuration(), + mockStatementExecution, + mockTablesApi, + mockSchemasApi); + + when(mockTablesApi.get("main.default.people")).thenThrow(new RuntimeException("boom")); + + ThreePartHierarchicalTableIdentifier tableIdentifier = + new ThreePartHierarchicalTableIdentifier("main", "default", "people"); + assertThrows(CatalogSyncException.class, () -> client.getTable(tableIdentifier)); + } +}