diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py index c1ab64c96aa01..3e20aab37ca7d 100644 --- a/flink-python/pyflink/table/tests/test_catalog_completeness.py +++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py @@ -45,7 +45,13 @@ def excluded_methods(cls): 'getFactory', 'getTableFactory', 'getFunctionDefinitionFactory', - 'listPartitionsByFilter'} + 'listPartitionsByFilter', + 'getConnection', + 'dropConnection', + 'connectionExists', + 'listConnections', + 'createConnection', + 'alterConnection'} class CatalogDatabaseAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase): diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 4743663e8a402..ea8fd2714a988 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -85,6 +85,7 @@ import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor; import org.apache.flink.table.factories.ApiFactoryUtil; import org.apache.flink.table.factories.CatalogStoreFactory; +import org.apache.flink.table.factories.DefaultConnectionFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.factories.TableFactoryUtil; @@ -122,6 +123,7 @@ import org.apache.flink.table.resource.ResourceUri; import org.apache.flink.table.secret.SecretStore; import org.apache.flink.table.secret.SecretStoreFactory; +import org.apache.flink.table.secret.WritableSecretStore; import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; import 
org.apache.flink.table.types.utils.DataTypeUtils; @@ -275,6 +277,11 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) { final ResourceManager resourceManager = new ResourceManager(settings.getConfiguration(), userClassLoader); final ModuleManager moduleManager = new ModuleManager(); + final WritableSecretStore writableSecretStore = + secretStore instanceof WritableSecretStore + ? (WritableSecretStore) secretStore + : null; + final CatalogManager catalogManager = CatalogManager.newBuilder() .classLoader(userClassLoader) @@ -297,6 +304,8 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) { .sqlFactory( settings.getSqlFactory() .orElseGet(() -> DefaultSqlFactory.INSTANCE)) + .connectionFactory(new DefaultConnectionFactory()) + .writableSecretStore(writableSecretStore) .build(); final FunctionCatalog functionCatalog = diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 047d2c89d5bf7..1039215ea357e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -33,6 +33,8 @@ import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; import org.apache.flink.table.catalog.StartMode.StartModeKind; import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -41,14 +43,17 @@ import 
org.apache.flink.table.catalog.exceptions.PartitionNotExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.listener.AlterConnectionEvent; import org.apache.flink.table.catalog.listener.AlterDatabaseEvent; import org.apache.flink.table.catalog.listener.AlterModelEvent; import org.apache.flink.table.catalog.listener.AlterTableEvent; import org.apache.flink.table.catalog.listener.CatalogContext; import org.apache.flink.table.catalog.listener.CatalogModificationListener; +import org.apache.flink.table.catalog.listener.CreateConnectionEvent; import org.apache.flink.table.catalog.listener.CreateDatabaseEvent; import org.apache.flink.table.catalog.listener.CreateModelEvent; import org.apache.flink.table.catalog.listener.CreateTableEvent; +import org.apache.flink.table.catalog.listener.DropConnectionEvent; import org.apache.flink.table.catalog.listener.DropDatabaseEvent; import org.apache.flink.table.catalog.listener.DropModelEvent; import org.apache.flink.table.catalog.listener.DropTableEvent; @@ -57,9 +62,13 @@ import org.apache.flink.table.expressions.DefaultSqlFactory; import org.apache.flink.table.expressions.SqlFactory; import org.apache.flink.table.expressions.resolver.ExpressionResolver.ExpressionResolverBuilder; +import org.apache.flink.table.factories.ConnectionFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.secret.GenericInMemorySecretStore; +import org.apache.flink.table.secret.WritableSecretStore; +import org.apache.flink.table.secret.exceptions.SecretException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -98,6 +107,7 @@ */ @Internal public final class CatalogManager implements CatalogRegistry, AutoCloseable { + 
private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class); // A map between names and catalogs. @@ -111,6 +121,15 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { // models coming from catalogs. private final Map temporaryModels; + // Those connections take precedence over corresponding permanent connections, thus they shadow + // connections coming from catalogs. + private final Map temporaryConnections; + + // Backing store for secrets of temporary connections. Lifetime is tied to this + // CatalogManager — temporary connections are session-scoped, so their secrets should not + // be persisted in the configured (potentially persistent) writableSecretStore. + private final WritableSecretStore temporarySecretStore; + // The name of the current catalog and database private @Nullable String currentCatalogName; @@ -132,6 +151,10 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { private final MaterializedTableEnricher materializedTableEnricher; + @Nullable private final ConnectionFactory connectionFactory; + + @Nullable private final WritableSecretStore writableSecretStore; + private CatalogManager( String defaultCatalogName, Catalog defaultCatalog, @@ -139,7 +162,9 @@ private CatalogManager( List catalogModificationListeners, CatalogStoreHolder catalogStoreHolder, SqlFactory sqlFactory, - MaterializedTableEnricher materializedTableEnricher) { + MaterializedTableEnricher materializedTableEnricher, + @Nullable ConnectionFactory connectionFactory, + @Nullable WritableSecretStore writableSecretStore) { checkArgument( !StringUtils.isNullOrWhitespaceOnly(defaultCatalogName), "Default catalog name cannot be null or empty"); @@ -152,6 +177,8 @@ private CatalogManager( temporaryTables = new HashMap<>(); temporaryModels = new HashMap<>(); + temporaryConnections = new HashMap<>(); + temporarySecretStore = new GenericInMemorySecretStore(); // right now the default catalog is always the built-in 
one builtInCatalogName = defaultCatalogName; @@ -164,6 +191,8 @@ private CatalogManager( this.sqlFactory = sqlFactory; this.materializedTableEnricher = checkNotNull(materializedTableEnricher, "MaterializedTableEnricher cannot be null"); + this.connectionFactory = connectionFactory; + this.writableSecretStore = writableSecretStore; } @VisibleForTesting @@ -203,6 +232,10 @@ public static final class Builder { private MaterializedTableEnricher materializedTableEnricher; + private @Nullable ConnectionFactory connectionFactory; + + private @Nullable WritableSecretStore writableSecretStore; + public Builder classLoader(ClassLoader classLoader) { this.classLoader = classLoader; return this; @@ -251,6 +284,16 @@ public Builder materializedTableEnricher( return this; } + public Builder connectionFactory(@Nullable ConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + return this; + } + + public Builder writableSecretStore(@Nullable WritableSecretStore writableSecretStore) { + this.writableSecretStore = writableSecretStore; + return this; + } + public CatalogManager build() { checkNotNull(classLoader, "Class loader cannot be null"); checkNotNull(config, "Config cannot be null"); @@ -271,7 +314,9 @@ public CatalogManager build() { sqlFactory, materializedTableEnricher != null ? materializedTableEnricher - : createDefaultMaterializedTableEnricher()); + : createDefaultMaterializedTableEnricher(), + connectionFactory, + writableSecretStore); } private MaterializedTableEnricher createDefaultMaterializedTableEnricher() { @@ -1791,6 +1836,328 @@ public ResolvedCatalogModel resolveCatalogModel(CatalogModel model) { return ResolvedCatalogModel.of(model, resolvedInputSchema, resolvedOutputSchema); } + // ------ connections ------ + + /** + * Get a connection from the catalog with the given object identifier. + * + * @param objectIdentifier The fully qualified path of the connection. + * @return The requested connection wrapped in Optional. 
+ */ + public Optional getConnection(ObjectIdentifier objectIdentifier) { + CatalogConnection temporaryConnection = temporaryConnections.get(objectIdentifier); + if (temporaryConnection != null) { + return Optional.of(temporaryConnection); + } + + Optional catalog = getCatalog(objectIdentifier.getCatalogName()); + if (catalog.isPresent()) { + try { + return Optional.of(catalog.get().getConnection(objectIdentifier.toObjectPath())); + } catch (ConnectionNotExistException | UnsupportedOperationException e) { + // ConnectionNotExistException: connection does not exist in this catalog. + // UnsupportedOperationException: catalog does not support connections. + return Optional.empty(); + } + } else { + return Optional.empty(); + } + } + + /** + * List all connections in the given catalog and database. + * + * @param catalogName The name of the catalog. + * @param databaseName The name of the database. + * @return A set of connection names. + */ + public Set listConnections(String catalogName, String databaseName) { + Catalog catalog = getCatalogOrError(catalogName); + try { + Set connections = new HashSet<>(catalog.listConnections(databaseName)); + + // Add temporary connections for this catalog and database + temporaryConnections.keySet().stream() + .filter( + identifier -> + identifier.getCatalogName().equals(catalogName) + && identifier.getDatabaseName().equals(databaseName)) + .map(ObjectIdentifier::getObjectName) + .forEach(connections::add); + + return connections; + } catch (DatabaseNotExistException e) { + throw new ValidationException( + String.format( + "Database %s does not exist in catalog %s.", databaseName, catalogName), + e); + } catch (CatalogException e) { + throw new TableException( + String.format( + "Failed to list connections in catalog %s and database %s.", + catalogName, databaseName), + e); + } + } + + /** + * Create a permanent connection in the given fully qualified path. + * + *

If a {@link ConnectionFactory} and {@link WritableSecretStore} are configured, sensitive + * fields are extracted from the connection and stored in the secret store before persisting the + * non-sensitive {@link CatalogConnection} to the catalog. + * + * @param connection The connection with all options including sensitive fields. + * @param objectIdentifier The fully qualified path where to create the connection. + * @param ignoreIfExists If false exception will be thrown if the connection already exists. + */ + public void createConnection( + SensitiveConnection connection, + ObjectIdentifier objectIdentifier, + boolean ignoreIfExists) { + if (connectionFactory == null || writableSecretStore == null) { + throw new ValidationException( + "ConnectionFactory and WritableSecretStore must be configured to create connections."); + } + if (getConnection(objectIdentifier).isPresent()) { + if (ignoreIfExists) { + return; + } + throw new ValidationException( + String.format( + "Connection with identifier '%s' already exists.", + objectIdentifier.asSummaryString())); + } + final CatalogConnection catalogConnection = + connectionFactory.createConnection(connection, writableSecretStore); + boolean persisted = false; + try { + execute( + (catalog, path) -> { + catalog.createConnection(path, catalogConnection, ignoreIfExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + CreateConnectionEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + catalogConnection, + ignoreIfExists, + false))); + }, + objectIdentifier, + ignoreIfExists, + "CreateConnection"); + persisted = true; + } finally { + if (!persisted) { + tryDeleteSecrets( + catalogConnection, + writableSecretStore, + "rollback createConnection " + objectIdentifier); + } + } + } + + /** + * Create a temporary connection in the given fully qualified path. 
+ * + * @param connection The connection with all options including sensitive fields. + * @param objectIdentifier The fully qualified path where to create the connection. + * @param ignoreIfExists If false exception will be thrown if the connection already exists. + */ + public void createTemporaryConnection( + SensitiveConnection connection, + ObjectIdentifier objectIdentifier, + boolean ignoreIfExists) { + if (connectionFactory == null) { + throw new ValidationException( + "ConnectionFactory must be configured to create connections."); + } + if (temporaryConnections.containsKey(objectIdentifier)) { + if (ignoreIfExists) { + return; + } + throw new ValidationException( + String.format("Temporary connection '%s' already exists", objectIdentifier)); + } + // Temporary connections are session-scoped; store secrets in an in-memory store rather + // than the configured (potentially persistent) writableSecretStore. + final CatalogConnection catalogConnection = + connectionFactory.createConnection(connection, temporarySecretStore); + temporaryConnections.put(objectIdentifier, catalogConnection); + Catalog catalog = getCatalog(objectIdentifier.getCatalogName()).orElse(null); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + CreateConnectionEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), catalog), + objectIdentifier, + catalogConnection, + ignoreIfExists, + true))); + } + + /** + * Alter a connection in the given fully qualified path. + * + * @param newConnection The new connection containing changes. + * @param objectIdentifier The fully qualified path where to alter the connection. + * @param ignoreIfNotExists If false exception will be thrown if the connection to be altered + * does not exist. 
+ */ + public void alterConnection( + SensitiveConnection newConnection, + ObjectIdentifier objectIdentifier, + boolean ignoreIfNotExists) { + if (connectionFactory == null || writableSecretStore == null) { + throw new ValidationException( + "ConnectionFactory and WritableSecretStore must be configured to alter connections."); + } + Optional existingOpt = getConnection(objectIdentifier); + if (!existingOpt.isPresent()) { + if (ignoreIfNotExists) { + return; + } + throw new ValidationException( + String.format( + "Connection with identifier '%s' does not exist.", + objectIdentifier.asSummaryString())); + } + final CatalogConnection existing = existingOpt.get(); + final CatalogConnection newCatalogConnection = + connectionFactory.createConnection(newConnection, writableSecretStore); + boolean persisted = false; + try { + execute( + (catalog, path) -> { + catalog.alterConnection(path, newCatalogConnection, ignoreIfNotExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + AlterConnectionEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + newCatalogConnection, + ignoreIfNotExists))); + }, + objectIdentifier, + ignoreIfNotExists, + "AlterConnection"); + persisted = true; + } finally { + // On success: drop the OLD secret. On failure: drop the freshly-stored NEW secret. + tryDeleteSecrets( + persisted ? existing : newCatalogConnection, + writableSecretStore, + persisted + ? "post-alter cleanup of old secret for " + objectIdentifier + : "rollback alterConnection " + objectIdentifier); + } + } + + /** + * Drop a permanent connection from the given fully qualified path. + * + * @param objectIdentifier The fully qualified path of the connection to be dropped. + * @param ignoreIfNotExists If false exception will be thrown if the connection to be dropped + * does not exist. 
+ */ + public void dropConnection(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { + Optional existingOpt = getConnection(objectIdentifier); + if (!existingOpt.isPresent()) { + if (ignoreIfNotExists) { + return; + } + throw new ValidationException( + String.format( + "Connection with identifier '%s' does not exist.", + objectIdentifier.asSummaryString())); + } + final CatalogConnection existing = existingOpt.get(); + execute( + (catalog, path) -> { + catalog.dropConnection(path, ignoreIfNotExists); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + DropConnectionEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), + catalog), + objectIdentifier, + existing, + ignoreIfNotExists, + false))); + }, + objectIdentifier, + ignoreIfNotExists, + "DropConnection"); + if (connectionFactory != null && writableSecretStore != null) { + tryDeleteSecrets( + existing, writableSecretStore, "post-drop cleanup for " + objectIdentifier); + } + } + + /** + * Drop a temporary connection from the given fully qualified path. + * + * @param objectIdentifier The fully qualified path of the connection to be dropped. + * @param ignoreIfNotExists If false exception will be thrown if the connection to be dropped + * does not exist. 
+ */ + public void dropTemporaryConnection( + ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { + CatalogConnection connection = temporaryConnections.get(objectIdentifier); + if (connection != null) { + temporaryConnections.remove(objectIdentifier); + Catalog catalog = getCatalog(objectIdentifier.getCatalogName()).orElse(null); + catalogModificationListeners.forEach( + listener -> + listener.onEvent( + DropConnectionEvent.createEvent( + CatalogContext.createContext( + objectIdentifier.getCatalogName(), catalog), + objectIdentifier, + connection, + ignoreIfNotExists, + true))); + if (connectionFactory != null) { + tryDeleteSecrets( + connection, + temporarySecretStore, + "post-drop cleanup of temporary " + objectIdentifier); + } + } else if (!ignoreIfNotExists) { + throw new ValidationException( + String.format( + "Temporary connection with identifier '%s' does not exist.", + objectIdentifier.asSummaryString())); + } + } + + /** + * Best-effort cleanup of a connection's secrets. The catalog state has already been mutated (or + * failed); a cleanup failure should not mask the user-visible result. Logs the failure (which + * may indicate an orphaned secret in the underlying store) and swallows the exception. + */ + private void tryDeleteSecrets( + CatalogConnection connection, WritableSecretStore store, String context) { + try { + connectionFactory.deleteSecrets(connection, store); + } catch (SecretException e) { + LOG.warn( + "Failed to delete connection secrets during {}; the catalog state is correct, but the secret may be orphaned in the secret store.", + context, + e); + } + } + /** * A command that modifies given {@link Catalog} in an {@link ObjectPath}. This unifies error * handling across different commands. 
@@ -1813,6 +2180,8 @@ private void execute( | TableNotExistException | ModelNotExistException | ModelAlreadyExistException + | ConnectionNotExistException + | ConnectionAlreadyExistException | DatabaseNotExistException e) { throw new ValidationException(getErrorMessage(objectIdentifier, commandName), e); } catch (Exception e) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java index 15c8f00dc9112..bc678672d91d6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java @@ -20,6 +20,8 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -59,6 +61,7 @@ public class GenericInMemoryCatalog extends AbstractCatalog { private final Map databases; private final Map tables; private final Map models; + private final Map connections; private final Map functions; private final Map> partitions; @@ -79,6 +82,7 @@ public GenericInMemoryCatalog(String name, String defaultDatabase) { this.databases.put(defaultDatabase, new CatalogDatabaseImpl(new HashMap<>(), null)); this.tables = new LinkedHashMap<>(); this.models = new LinkedHashMap<>(); + this.connections = new LinkedHashMap<>(); this.functions = new LinkedHashMap<>(); this.partitions = new LinkedHashMap<>(); this.tableStats = new 
LinkedHashMap<>(); @@ -453,6 +457,78 @@ public boolean modelExists(ObjectPath modelPath) { return databaseExists(modelPath.getDatabaseName()) && models.containsKey(modelPath); } + // ------ connections ------ + + @Override + public void createConnection( + ObjectPath connectionPath, CatalogConnection connection, boolean ignoreIfExists) + throws ConnectionAlreadyExistException, DatabaseNotExistException { + checkNotNull(connectionPath); + checkNotNull(connection); + if (!databaseExists(connectionPath.getDatabaseName())) { + throw new DatabaseNotExistException(getName(), connectionPath.getDatabaseName()); + } + if (connectionExists(connectionPath)) { + if (!ignoreIfExists) { + throw new ConnectionAlreadyExistException(getName(), connectionPath); + } + } else { + connections.put(connectionPath, connection.copy()); + } + } + + @Override + public void alterConnection( + ObjectPath connectionPath, CatalogConnection newConnection, boolean ignoreIfNotExists) + throws ConnectionNotExistException { + checkNotNull(connectionPath); + checkNotNull(newConnection); + + if (!connectionExists(connectionPath)) { + if (ignoreIfNotExists) { + return; + } + throw new ConnectionNotExistException(getName(), connectionPath); + } + + connections.put(connectionPath, newConnection.copy()); + } + + @Override + public void dropConnection(ObjectPath connectionPath, boolean ignoreIfNotExists) + throws ConnectionNotExistException { + checkNotNull(connectionPath); + if (connectionExists(connectionPath)) { + connections.remove(connectionPath); + } else if (!ignoreIfNotExists) { + throw new ConnectionNotExistException(getName(), connectionPath); + } + } + + @Override + public List listConnections(String databaseName) throws DatabaseNotExistException { + return listObjectsUnderDatabase(connections, databaseName, k -> true); + } + + @Override + public CatalogConnection getConnection(ObjectPath connectionPath) + throws ConnectionNotExistException { + checkNotNull(connectionPath); + + if 
(!connectionExists(connectionPath)) { + throw new ConnectionNotExistException(getName(), connectionPath); + } else { + return connections.get(connectionPath).copy(); + } + } + + @Override + public boolean connectionExists(ObjectPath connectionPath) { + checkNotNull(connectionPath); + return databaseExists(connectionPath.getDatabaseName()) + && connections.containsKey(connectionPath); + } + // ------ functions ------ @Override diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterConnectionEvent.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterConnectionEvent.java new file mode 100644 index 0000000000000..475a877386fe0 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/AlterConnectionEvent.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.listener; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.ObjectIdentifier; + +/** When a connection is altered, a {@link AlterConnectionEvent} event will be created and fired. */ +@PublicEvolving +public interface AlterConnectionEvent extends ConnectionModificationEvent { + ObjectIdentifier identifier(); + + CatalogConnection newConnection(); + + boolean ignoreIfNotExists(); + + static AlterConnectionEvent createEvent( + final CatalogContext context, + final ObjectIdentifier identifier, + final CatalogConnection newConnection, + final boolean ignoreIfNotExists) { + return new AlterConnectionEvent() { + @Override + public CatalogConnection newConnection() { + return newConnection; + } + + @Override + public boolean ignoreIfNotExists() { + return ignoreIfNotExists; + } + + @Override + public ObjectIdentifier identifier() { + return identifier; + } + + @Override + public CatalogConnection connection() { + throw new IllegalStateException( + "There is no connection in AlterConnectionEvent, use identifier() instead."); + } + + @Override + public boolean isTemporary() { + return false; + } + + @Override + public CatalogContext context() { + return context; + } + }; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/ConnectionModificationEvent.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/ConnectionModificationEvent.java new file mode 100644 index 0000000000000..98609622f88d2 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/ConnectionModificationEvent.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.listener; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.ObjectIdentifier; + +/** Basic event for connection modification such as create, alter and drop. */ +@PublicEvolving +public interface ConnectionModificationEvent extends CatalogModificationEvent { + + ObjectIdentifier identifier(); + + CatalogConnection connection(); + + boolean isTemporary(); +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateConnectionEvent.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateConnectionEvent.java new file mode 100644 index 0000000000000..21a5297132bdb --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CreateConnectionEvent.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.listener; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.ObjectIdentifier; + +/** + * When a connection is created, a {@link CreateConnectionEvent} event will be created and fired. + */ +@PublicEvolving +public interface CreateConnectionEvent extends ConnectionModificationEvent { + ObjectIdentifier identifier(); + + CatalogConnection connection(); + + boolean ignoreIfExists(); + + boolean isTemporary(); + + static CreateConnectionEvent createEvent( + final CatalogContext context, + final ObjectIdentifier identifier, + final CatalogConnection connection, + final boolean ignoreIfExists, + final boolean isTemporary) { + return new CreateConnectionEvent() { + @Override + public boolean ignoreIfExists() { + return ignoreIfExists; + } + + @Override + public ObjectIdentifier identifier() { + return identifier; + } + + @Override + public CatalogConnection connection() { + return connection; + } + + @Override + public CatalogContext context() { + return context; + } + + @Override + public boolean isTemporary() { + return isTemporary; + } + }; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropConnectionEvent.java 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropConnectionEvent.java new file mode 100644 index 0000000000000..3affd30326eb6 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/DropConnectionEvent.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.listener; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.ObjectIdentifier; + +import javax.annotation.Nullable; + +/** When a connection is dropped, a {@link DropConnectionEvent} event will be created and fired. 
*/ +@PublicEvolving +public interface DropConnectionEvent extends ConnectionModificationEvent { + ObjectIdentifier identifier(); + + boolean ignoreIfNotExists(); + + boolean isTemporary(); + + CatalogConnection connection(); + + static DropConnectionEvent createEvent( + final CatalogContext context, + final ObjectIdentifier identifier, + @Nullable final CatalogConnection connection, + final boolean ignoreIfNotExists, + final boolean isTemporary) { + return new DropConnectionEvent() { + @Override + public boolean ignoreIfNotExists() { + return ignoreIfNotExists; + } + + @Override + public ObjectIdentifier identifier() { + return identifier; + } + + @Override + @Nullable + public CatalogConnection connection() { + return connection; + } + + @Override + public CatalogContext context() { + return context; + } + + @Override + public boolean isTemporary() { + return isTemporary; + } + }; + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java index 4b79b18c9d155..89f73271aba54 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java @@ -22,17 +22,23 @@ import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.listener.AlterConnectionEvent; import org.apache.flink.table.catalog.listener.AlterDatabaseEvent; import org.apache.flink.table.catalog.listener.AlterModelEvent; import org.apache.flink.table.catalog.listener.AlterTableEvent; import org.apache.flink.table.catalog.listener.CatalogModificationEvent; import org.apache.flink.table.catalog.listener.CatalogModificationListener; +import 
org.apache.flink.table.catalog.listener.CreateConnectionEvent; import org.apache.flink.table.catalog.listener.CreateDatabaseEvent; import org.apache.flink.table.catalog.listener.CreateModelEvent; import org.apache.flink.table.catalog.listener.CreateTableEvent; +import org.apache.flink.table.catalog.listener.DropConnectionEvent; import org.apache.flink.table.catalog.listener.DropDatabaseEvent; import org.apache.flink.table.catalog.listener.DropModelEvent; import org.apache.flink.table.catalog.listener.DropTableEvent; +import org.apache.flink.table.factories.DefaultConnectionFactory; +import org.apache.flink.table.secret.GenericInMemorySecretStore; +import org.apache.flink.table.secret.WritableSecretStore; import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.table.utils.ExpressionResolverMocks; import org.apache.flink.table.utils.ParserMock; @@ -365,6 +371,127 @@ public void testModelModificationListener() throws Exception { assertThat(dropTemporaryEvent.identifier().getObjectName()).isEqualTo("model2"); } + @Test + public void testConnectionModificationListener() throws Exception { + CompletableFuture createFuture = new CompletableFuture<>(); + CompletableFuture createTemporaryFuture = new CompletableFuture<>(); + CompletableFuture alterFuture = new CompletableFuture<>(); + CompletableFuture dropFuture = new CompletableFuture<>(); + CompletableFuture dropTemporaryFuture = new CompletableFuture<>(); + WritableSecretStore secretStore = new GenericInMemorySecretStore(); + CatalogManager catalogManager = + CatalogManagerMocks.preparedCatalogManager() + .defaultCatalog("default", new GenericInMemoryCatalog("default")) + .classLoader(CatalogManagerTest.class.getClassLoader()) + .config(new Configuration()) + .catalogModificationListeners( + Collections.singletonList( + new TestingConnectionModificationListener( + createFuture, + createTemporaryFuture, + alterFuture, + dropFuture, + dropTemporaryFuture))) + .catalogStoreHolder( + 
CatalogStoreHolder.newBuilder() + .classloader(CatalogManagerTest.class.getClassLoader()) + .catalogStore(new GenericInMemoryCatalogStore()) + .config(new Configuration()) + .build()) + .connectionFactory(new DefaultConnectionFactory()) + .writableSecretStore(secretStore) + .build(); + + catalogManager.initSchemaResolver( + true, ExpressionResolverMocks.dummyResolver(), new ParserMock()); + + HashMap options = + new HashMap() { + { + put("type", "kafka"); + put("bootstrap.servers", "localhost:9092"); + put("password", "secret-pw"); + } + }; + + // Create a connection + catalogManager.createConnection( + SensitiveConnection.of(options, null), + ObjectIdentifier.of( + catalogManager.getCurrentCatalog(), + catalogManager.getCurrentDatabase(), + "conn1"), + true); + CreateConnectionEvent createConnectionEvent = createFuture.get(10, TimeUnit.SECONDS); + assertThat(createConnectionEvent.identifier().getObjectName()).isEqualTo("conn1"); + assertThat(createConnectionEvent.ignoreIfExists()).isTrue(); + assertThat(createConnectionEvent.isTemporary()).isFalse(); + // Sensitive field should be stripped from the persisted CatalogConnection + assertThat(createConnectionEvent.connection().getOptions()).doesNotContainKey("password"); + + // Create a temporary connection + catalogManager.createTemporaryConnection( + SensitiveConnection.of(options, null), + ObjectIdentifier.of( + catalogManager.getCurrentCatalog(), + catalogManager.getCurrentDatabase(), + "conn2"), + false); + CreateConnectionEvent createTemporaryEvent = + createTemporaryFuture.get(10, TimeUnit.SECONDS); + assertThat(createTemporaryEvent.isTemporary()).isTrue(); + assertThat(createTemporaryEvent.identifier().getObjectName()).isEqualTo("conn2"); + assertThat(createTemporaryEvent.ignoreIfExists()).isFalse(); + + // Alter a connection + HashMap alteredOptions = + new HashMap() { + { + put("type", "kafka"); + put("bootstrap.servers", "remote:9092"); + put("password", "rotated-pw"); + } + }; + 
catalogManager.alterConnection( + SensitiveConnection.of(alteredOptions, "conn1 comment"), + ObjectIdentifier.of( + catalogManager.getCurrentCatalog(), + catalogManager.getCurrentDatabase(), + "conn1"), + false); + AlterConnectionEvent alterEvent = alterFuture.get(10, TimeUnit.SECONDS); + assertThat(alterEvent.identifier().getObjectName()).isEqualTo("conn1"); + assertThat(alterEvent.newConnection().getComment()).isEqualTo("conn1 comment"); + assertThat(alterEvent.newConnection().getOptions().get("bootstrap.servers")) + .isEqualTo("remote:9092"); + assertThat(alterEvent.newConnection().getOptions()).doesNotContainKey("password"); + assertThat(alterEvent.ignoreIfNotExists()).isFalse(); + + // Drop a connection + ObjectIdentifier oi = + ObjectIdentifier.of( + catalogManager.getCurrentCatalog(), + catalogManager.getCurrentDatabase(), + "conn1"); + catalogManager.dropConnection(oi, true); + DropConnectionEvent dropEvent = dropFuture.get(10, TimeUnit.SECONDS); + assertThat(dropEvent.ignoreIfNotExists()).isTrue(); + assertThat(dropEvent.identifier().getObjectName()).isEqualTo("conn1"); + assertThat(dropEvent.isTemporary()).isFalse(); + + // Drop a temporary connection + catalogManager.dropTemporaryConnection( + ObjectIdentifier.of( + catalogManager.getCurrentCatalog(), + catalogManager.getCurrentDatabase(), + "conn2"), + false); + DropConnectionEvent dropTemporaryEvent = dropTemporaryFuture.get(10, TimeUnit.SECONDS); + assertThat(dropTemporaryEvent.isTemporary()).isTrue(); + assertThat(dropTemporaryEvent.ignoreIfNotExists()).isFalse(); + assertThat(dropTemporaryEvent.identifier().getObjectName()).isEqualTo("conn2"); + } + private CatalogManager createCatalogManager(@Nullable CatalogModificationListener listener) { CatalogManager.Builder builder = CatalogManager.newBuilder() @@ -457,6 +584,49 @@ public void onEvent(CatalogModificationEvent event) { } } + /** Testing connection modification listener. 
*/ + static class TestingConnectionModificationListener implements CatalogModificationListener { + private final CompletableFuture createFuture; + private final CompletableFuture createTemporaryFuture; + private final CompletableFuture alterFuture; + private final CompletableFuture dropFuture; + private final CompletableFuture dropTemporaryFuture; + + TestingConnectionModificationListener( + CompletableFuture createFuture, + CompletableFuture createTemporaryFuture, + CompletableFuture alterFuture, + CompletableFuture dropFuture, + CompletableFuture dropTemporaryFuture) { + this.createFuture = createFuture; + this.createTemporaryFuture = createTemporaryFuture; + this.alterFuture = alterFuture; + this.dropFuture = dropFuture; + this.dropTemporaryFuture = dropTemporaryFuture; + } + + @Override + public void onEvent(CatalogModificationEvent event) { + if (event instanceof CreateConnectionEvent) { + if (((CreateConnectionEvent) event).isTemporary()) { + createTemporaryFuture.complete((CreateConnectionEvent) event); + } else { + createFuture.complete((CreateConnectionEvent) event); + } + } else if (event instanceof AlterConnectionEvent) { + alterFuture.complete((AlterConnectionEvent) event); + } else if (event instanceof DropConnectionEvent) { + if (((DropConnectionEvent) event).isTemporary()) { + dropTemporaryFuture.complete((DropConnectionEvent) event); + } else { + dropFuture.complete((DropConnectionEvent) event); + } + } else { + throw new UnsupportedOperationException(); + } + } + } + /** Testing model modification listener. 
*/ static class TestingModelModificationListener implements CatalogModificationListener { private final CompletableFuture createFuture; diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java index 52a7cd7ccb530..403d94459fb02 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java @@ -255,6 +255,11 @@ protected boolean supportsModels() { return true; } + @Override + protected boolean supportsConnections() { + return true; + } + @Override protected CatalogFunction createPythonFunction() { return new CatalogFunctionImpl("test.func1", FunctionLanguage.PYTHON); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index e230b616cdbfd..3db7d7cf2ee8b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -20,6 +20,8 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -932,4 +934,102 @@ default void alterModel( throws ModelNotExistException, CatalogException { 
alterModel(modelPath, newModel, ignoreIfNotExists); } + + // ------ connections ------ + + /** + * Get names of all connections under this database. An empty list is returned if none exists. + * + * @return a list of the names of all connections in this database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + default List listConnections(String databaseName) + throws DatabaseNotExistException, CatalogException { + return Collections.emptyList(); + } + + /** + * Returns a {@link CatalogConnection} identified by the given {@link ObjectPath}. + * + * @param connectionPath Path of the connection + * @return The requested connection + * @throws ConnectionNotExistException if the target does not exist + * @throws CatalogException in case of any runtime exception + */ + default CatalogConnection getConnection(ObjectPath connectionPath) + throws ConnectionNotExistException, CatalogException { + throw new UnsupportedOperationException( + String.format( + "getConnection(ObjectPath) is not implemented for %s.", this.getClass())); + } + + /** + * Check if a connection exists in this catalog. + * + * @param connectionPath Path of the connection + * @return true if the given connection exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + default boolean connectionExists(ObjectPath connectionPath) throws CatalogException { + return false; + } + + /** + * Creates a new connection. + * + * @param connectionPath path of the connection to be created + * @param connection the CatalogConnection definition + * @param ignoreIfExists flag to specify behavior when a connection already exists at the given + * path: if set to false, it throws a ConnectionAlreadyExistException, if set to true, do + * nothing. 
+ * @throws ConnectionAlreadyExistException if connection already exists and ignoreIfExists is + * false + * @throws DatabaseNotExistException if the database in connectionPath doesn't exist + * @throws CatalogException in case of any runtime exception + */ + default void createConnection( + ObjectPath connectionPath, CatalogConnection connection, boolean ignoreIfExists) + throws ConnectionAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException( + String.format( + "createConnection(ObjectPath, CatalogConnection, boolean) is not implemented for %s.", + this.getClass())); + } + + /** + * Modifies an existing connection. + * + * @param connectionPath path of the connection to be modified + * @param newConnection the new connection definition + * @param ignoreIfNotExists flag to specify behavior when the connection does not exist: if set + * to false, throw an exception, if set to true, do nothing. + * @throws ConnectionNotExistException if the connection does not exist + * @throws CatalogException in case of any runtime exception + */ + default void alterConnection( + ObjectPath connectionPath, CatalogConnection newConnection, boolean ignoreIfNotExists) + throws ConnectionNotExistException, CatalogException { + throw new UnsupportedOperationException( + String.format( + "alterConnection(ObjectPath, CatalogConnection, boolean) is not implemented for %s.", + this.getClass())); + } + + /** + * Drop a connection. + * + * @param connectionPath Path of the connection to be dropped + * @param ignoreIfNotExists Flag to specify behavior when the connection does not exist: if set + * to false, throw an exception, if set to true, do nothing. 
+ * @throws ConnectionNotExistException if the connection does not exist + * @throws CatalogException in case of any runtime exception + */ + default void dropConnection(ObjectPath connectionPath, boolean ignoreIfNotExists) + throws ConnectionNotExistException, CatalogException { + throw new UnsupportedOperationException( + String.format( + "dropConnection(ObjectPath, boolean) is not implemented for %s.", + this.getClass())); + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionAlreadyExistException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionAlreadyExistException.java new file mode 100644 index 0000000000000..871a5146ea2da --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionAlreadyExistException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.exceptions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.ObjectPath; + +/** Exception for trying to create a connection that already exists. 
*/ +@PublicEvolving +public class ConnectionAlreadyExistException extends Exception { + + private static final String MSG = "Connection %s already exists in Catalog %s."; + + public ConnectionAlreadyExistException(String catalogName, ObjectPath connectionPath) { + this(catalogName, connectionPath, null); + } + + public ConnectionAlreadyExistException( + String catalogName, ObjectPath connectionPath, Throwable cause) { + super(String.format(MSG, connectionPath.getFullName(), catalogName), cause); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionNotExistException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionNotExistException.java new file mode 100644 index 0000000000000..020b66b93ea0f --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/exceptions/ConnectionNotExistException.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.exceptions; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.ObjectPath; + +/** Exception for trying to operate on a connection that doesn't exist. */ +@PublicEvolving +public class ConnectionNotExistException extends Exception { + + private static final String MSG = "Connection '`%s`.`%s`.`%s`' does not exist."; + private static final String MSG_WITHOUT_CATALOG = "Connection '`%s`.`%s`' does not exist."; + + public ConnectionNotExistException(String catalogName, ObjectPath connectionPath) { + this(catalogName, connectionPath, null); + } + + public ConnectionNotExistException( + String catalogName, ObjectPath connectionPath, Throwable cause) { + super(formatMsg(catalogName, connectionPath), cause); + } + + private static String formatMsg(String catalogName, ObjectPath connectionPath) { + if (catalogName != null) { + return String.format( + MSG, + catalogName, + connectionPath.getDatabaseName(), + connectionPath.getObjectName()); + } + return String.format( + MSG_WITHOUT_CATALOG, + connectionPath.getDatabaseName(), + connectionPath.getObjectName()); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ConnectionFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ConnectionFactory.java new file mode 100644 index 0000000000000..2e164de85854f --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ConnectionFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.SensitiveConnection; +import org.apache.flink.table.secret.ReadableSecretStore; +import org.apache.flink.table.secret.WritableSecretStore; +import org.apache.flink.table.secret.exceptions.SecretException; + +/** + * Factory for creating and resolving connections, handling the encryption and decryption of + * sensitive connection fields. + * + *

A {@code ConnectionFactory} is responsible for: + * + *

    + *
  • Extracting sensitive fields from a {@link SensitiveConnection} and storing them in a {@link + * WritableSecretStore}, returning a {@link CatalogConnection} that is safe to persist in a + * catalog. + *
  • Resolving a {@link CatalogConnection} from a catalog by retrieving its secrets from a + * {@link ReadableSecretStore} and returning a complete {@link SensitiveConnection}. + *
+ * + * @see org.apache.flink.table.factories.DefaultConnectionFactory + */ +@PublicEvolving +public interface ConnectionFactory extends Factory { + + /** + * Creates a {@link CatalogConnection} from a {@link SensitiveConnection} by extracting + * sensitive fields and storing them in the provided {@link WritableSecretStore}. + * + *

The returned {@link CatalogConnection} contains only non-sensitive options plus a secret + * reference that can be used to retrieve the sensitive fields later via {@link + * #resolveConnection(CatalogConnection, ReadableSecretStore)}. + * + * @param connection the connection with all options including sensitive fields + * @param secretStore the secret store where sensitive fields will be stored + * @return a catalog-safe connection with sensitive fields replaced by a secret reference + * @throws SecretException if storing the secret fails (e.g. underlying-store error) + */ + CatalogConnection createConnection( + SensitiveConnection connection, WritableSecretStore secretStore) throws SecretException; + + /** + * Resolves a {@link CatalogConnection} into a {@link SensitiveConnection} by retrieving secrets + * from the provided {@link ReadableSecretStore}. + * + * @param connection the catalog connection containing non-sensitive options and a secret + * reference + * @param secretStore the secret store from which sensitive fields are retrieved + * @return the complete connection with all options including sensitive fields + * @throws SecretException if retrieving the secret fails (e.g. underlying-store error) + */ + SensitiveConnection resolveConnection( + CatalogConnection connection, ReadableSecretStore secretStore) throws SecretException; + + /** + * Deletes any secrets associated with the given {@link CatalogConnection} from the provided + * {@link WritableSecretStore}. + * + *

Implementations should locate the secret reference embedded in the connection (created by + * {@link #createConnection(SensitiveConnection, WritableSecretStore)}) and remove the + * corresponding entry from the secret store. This is intended to be called when a connection is + * dropped or replaced (e.g. on alter), to avoid orphaned secrets. + * + *

The default implementation is a no-op for factories that do not externalize secrets. + * + * @param connection the catalog connection whose backing secrets should be removed + * @param secretStore the secret store from which secrets should be deleted + * @throws SecretException if removing the secret fails (e.g. underlying-store error) + */ + default void deleteSecrets(CatalogConnection connection, WritableSecretStore secretStore) + throws SecretException {} +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java new file mode 100644 index 0000000000000..e9d11f27f076a --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DefaultConnectionFactory.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogConnection; +import org.apache.flink.table.catalog.SensitiveConnection; +import org.apache.flink.table.secret.ReadableSecretStore; +import org.apache.flink.table.secret.WritableSecretStore; +import org.apache.flink.table.secret.exceptions.SecretException; +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Default implementation of {@link ConnectionFactory} that identifies sensitive fields by a + * predefined whitelist of field names. + * + *

During {@link #createConnection}, sensitive fields are extracted from the connection options + * and stored as a single secret in the {@link WritableSecretStore}. A reference key ({@value + * #SECRET_REFERENCE_KEY}) pointing to the stored secret is added to the returned {@link + * CatalogConnection}. + * + *

During {@link #resolveConnection}, the secret reference is used to retrieve the sensitive + * fields from the {@link ReadableSecretStore} and merge them back into the options. + * + *

The following field names are treated as sensitive by default: {@code password}, {@code + * secret}, {@code fs.azure.account.key}, {@code apikey}, {@code api-key}, {@code auth-params}, + * {@code service-key}, {@code token}, {@code basic-auth}, {@code jaas.config}, {@code + * http-headers}. + */ +@Internal +public class DefaultConnectionFactory implements ConnectionFactory { + + /** + * Reserved option key used to store the reference to secrets in the secret store. The + * surrounding double underscores make collision with user-supplied option names unlikely; user + * options containing this key will be rejected at create-time. + */ + public static final String SECRET_REFERENCE_KEY = "__flink.encrypted_secret_key__"; + + private static final Set SENSITIVE_FIELD_NAMES = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + "password", + "secret", + "fs.azure.account.key", + "apikey", + "api-key", + "auth-params", + "service-key", + "token", + "basic-auth", + "jaas.config", + "http-headers"))); + + @Override + public String factoryIdentifier() { + return "default"; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public CatalogConnection createConnection( + SensitiveConnection connection, WritableSecretStore secretStore) { + Map allOptions = connection.getOptions(); + + if (allOptions.containsKey(SECRET_REFERENCE_KEY)) { + throw new ValidationException( + String.format( + "Connection option '%s' is reserved and cannot be set by users.", + SECRET_REFERENCE_KEY)); + } + + Map sensitiveOptions = + allOptions.entrySet().stream() + .filter(e -> SENSITIVE_FIELD_NAMES.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Map nonSensitiveOptions = + allOptions.entrySet().stream() + .filter(e -> !SENSITIVE_FIELD_NAMES.contains(e.getKey())) + .collect( + Collectors.toMap( + 
Map.Entry::getKey, + Map.Entry::getValue, + (a, b) -> a, + HashMap::new)); + + if (!sensitiveOptions.isEmpty()) { + final String secretId; + try { + secretId = secretStore.storeSecret(sensitiveOptions); + } catch (SecretException e) { + throw e; + } catch (RuntimeException e) { + throw new SecretException("Failed to store connection secret.", e); + } + nonSensitiveOptions.put(SECRET_REFERENCE_KEY, secretId); + } + + return CatalogConnection.of(nonSensitiveOptions, connection.getComment()); + } + + @Override + public SensitiveConnection resolveConnection( + CatalogConnection connection, ReadableSecretStore secretStore) { + Map options = new HashMap<>(connection.getOptions()); + + String secretId = options.remove(SECRET_REFERENCE_KEY); + if (secretId != null) { + try { + Map secrets = secretStore.getSecret(secretId); + options.putAll(secrets); + } catch (SecretNotFoundException e) { + throw new ValidationException( + String.format( + "Failed to resolve connection secrets. Secret with ID '%s' not found.", + secretId), + e); + } catch (SecretException e) { + throw e; + } catch (RuntimeException e) { + throw new SecretException( + String.format( + "Failed to retrieve connection secret with ID '%s'.", secretId), + e); + } + } + + return SensitiveConnection.of(options, connection.getComment()); + } + + @Override + public void deleteSecrets(CatalogConnection connection, WritableSecretStore secretStore) { + String secretId = connection.getOptions().get(SECRET_REFERENCE_KEY); + if (secretId != null) { + try { + secretStore.removeSecret(secretId); + } catch (SecretException e) { + throw e; + } catch (RuntimeException e) { + throw new SecretException( + String.format("Failed to remove connection secret with ID '%s'.", secretId), + e); + } + } + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java index 
70dd48e21f979..1cf7120abedc7 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java @@ -19,6 +19,7 @@ package org.apache.flink.table.secret; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.secret.exceptions.SecretException; import org.apache.flink.table.secret.exceptions.SecretNotFoundException; import java.util.Map; @@ -40,6 +41,8 @@ public interface ReadableSecretStore extends SecretStore { * @param secretId the unique identifier of the secret to retrieve * @return a map containing the secret data as key-value pairs * @throws SecretNotFoundException if the secret with the given identifier does not exist + * @throws SecretException if the operation fails due to underlying-store errors (network, + * permission, etc.) */ - Map getSecret(String secretId) throws SecretNotFoundException; + Map getSecret(String secretId) throws SecretNotFoundException, SecretException; } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java index db5037b7b2f53..11167d861a607 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java @@ -19,6 +19,7 @@ package org.apache.flink.table.secret; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.secret.exceptions.SecretException; import org.apache.flink.table.secret.exceptions.SecretNotFoundException; import java.util.Map; @@ -40,15 +41,19 @@ public interface WritableSecretStore extends SecretStore { * * @param secretData a map containing the secret data as key-value pairs to be stored * @return a unique 
identifier for the stored secret + * @throws SecretException if the operation fails due to underlying-store errors (network, + * permission, quota, etc.) */ - String storeSecret(Map<String, String> secretData); + String storeSecret(Map<String, String> secretData) throws SecretException; /** * Removes a secret from the secret store. * * @param secretId the unique identifier of the secret to remove + * @throws SecretException if the operation fails due to underlying-store errors (network, + * permission, etc.) */ - void removeSecret(String secretId); + void removeSecret(String secretId) throws SecretException; /** * Atomically updates an existing secret with new data. @@ -58,7 +63,9 @@ public interface WritableSecretStore extends SecretStore { * @param secretId the unique identifier of the secret to update * @param newSecretData a map containing the new secret data as key-value pairs * @throws SecretNotFoundException if the secret with the given identifier does not exist + * @throws SecretException if the operation fails due to underlying-store errors (network, + * permission, etc.) 
*/ void updateSecret(String secretId, Map<String, String> newSecretData) - throws SecretNotFoundException; + throws SecretNotFoundException, SecretException; } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java index 228b5485f0168..ba4ed5bbbc4bb 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java @@ -21,6 +21,8 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.ConnectionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.ConnectionNotExistException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -68,12 +70,16 @@ public abstract class CatalogTest { protected final String t3 = "t3"; protected final String m1 = "m1"; protected final String m2 = "m2"; + protected final String c1 = "c1"; + protected final String c2 = "c2"; protected final ObjectPath path1 = new ObjectPath(db1, t1); protected final ObjectPath path2 = new ObjectPath(db2, t2); protected final ObjectPath path3 = new ObjectPath(db1, t2); protected final ObjectPath path4 = new ObjectPath(db1, t3); protected final ObjectPath modelPath1 = new ObjectPath(db1, m1); protected final ObjectPath modelPath2 = new ObjectPath(db1, m2); + protected final ObjectPath connectionPath1 = new ObjectPath(db1, c1); + protected final ObjectPath connectionPath2 = new ObjectPath(db1, c2); protected final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist"); protected final 
ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist"); @@ -108,6 +114,14 @@ void cleanup() throws Exception { catalog.dropModel(modelPath2, true); } } + if (supportsConnections()) { + if (catalog.connectionExists(connectionPath1)) { + catalog.dropConnection(connectionPath1, true); + } + if (catalog.connectionExists(connectionPath2)) { + catalog.dropConnection(connectionPath2, true); + } + } // Delete db last so that other resources can be found and dropped if (catalog.databaseExists(db1)) { @@ -463,6 +477,199 @@ public void testDropMissingModelIgnoreIfNotExist() throws Exception { catalog.dropModel(modelPath1, true); } + // ------ connections ------ + @Test + public void testCreateConnection() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + CatalogConnection connection = createConnection(); + catalog.createConnection(connectionPath1, connection, false); + + List<String> connections = catalog.listConnections(db1); + assertThat(connections).isEqualTo(Collections.singletonList(c1)); + } + + @Test + public void testCreateConnection_DatabaseNotExistException() { + if (!supportsConnections()) { + return; + } + assertThat(catalog.databaseExists(db1)).isFalse(); + + assertThatThrownBy( + () -> + catalog.createConnection( + nonExistObjectPath, createConnection(), false)) + .isInstanceOf(DatabaseNotExistException.class) + .hasMessage("Database db1 does not exist in Catalog " + TEST_CATALOG_NAME + "."); + } + + @Test + public void testCreateConnection_ConnectionAlreadyExistException() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + catalog.createConnection(connectionPath1, createConnection(), false); + + assertThatThrownBy( + () -> catalog.createConnection(connectionPath1, createConnection(), false)) + .isInstanceOf(ConnectionAlreadyExistException.class) + .hasMessage( + "Connection db1.c1 already exists in Catalog " + 
TEST_CATALOG_NAME + "."); + } + + @Test + public void testCreateConnection_ConnectionAlreadyExist_ignored() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + + CatalogConnection connection = createConnection(); + catalog.createConnection(connectionPath1, connection, false); + catalog.createConnection(connectionPath1, connection, true); + + List<String> connections = catalog.listConnections(db1); + assertThat(connections).isEqualTo(Collections.singletonList(c1)); + } + + @Test + public void testListConnections() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + + catalog.createConnection(connectionPath1, createConnection(), false); + catalog.createConnection(connectionPath2, createConnection(), false); + + assertThat(catalog.listConnections(db1)).isEqualTo(Arrays.asList(c1, c2)); + } + + @Test + public void testGetConnection() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + catalog.createConnection(connectionPath1, createConnection(), false); + assertThat(catalog.getConnection(connectionPath1)).isNotNull(); + } + + @Test + public void testGetConnection_ConnectionNotExistException() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + assertThatThrownBy(() -> catalog.getConnection(connectionPath1)) + .isInstanceOf(ConnectionNotExistException.class) + .hasMessage("Connection '`test-catalog`.`db1`.`c1`' does not exist."); + } + + @Test + public void testDropConnection() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + catalog.createConnection(connectionPath1, createConnection(), false); + assertThat(catalog.getConnection(connectionPath1)).isNotNull(); + catalog.dropConnection(connectionPath1, false); + assertThatThrownBy(() -> 
catalog.getConnection(connectionPath1)) + .isInstanceOf(ConnectionNotExistException.class) + .hasMessage("Connection '`test-catalog`.`db1`.`c1`' does not exist."); + } + + @Test + public void testAlterConnection() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + catalog.createConnection(connectionPath1, createConnection(), false); + assertThat(catalog.getConnection(connectionPath1)).isNotNull(); + CatalogConnection newConnection = + CatalogConnection.of( + new HashMap<String, String>() { + { + put("type", "kafka"); + put("bootstrap.servers", "remote:9092"); + put("group.id", "my-group"); + } + }, + "updated connection"); + catalog.alterConnection(connectionPath1, newConnection, false); + assertThat(catalog.getConnection(connectionPath1).getComment()) + .isEqualTo("updated connection"); + Map<String, String> expectedOptions = new HashMap<>(); + expectedOptions.put("type", "kafka"); + expectedOptions.put("bootstrap.servers", "remote:9092"); + expectedOptions.put("group.id", "my-group"); + assertThat(catalog.getConnection(connectionPath1).getOptions()).isEqualTo(expectedOptions); + } + + @Test + public void testAlterConnection_ConnectionNotExistException() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + CatalogConnection newConnection = + CatalogConnection.of( + new HashMap<String, String>() { + { + put("type", "kafka"); + put("bootstrap.servers", "remote:9092"); + } + }, + "new connection"); + assertThatThrownBy(() -> catalog.alterConnection(connectionPath1, newConnection, false)) + .isInstanceOf(ConnectionNotExistException.class) + .hasMessage("Connection '`test-catalog`.`db1`.`c1`' does not exist."); + } + + @Test + public void testAlterMissingConnectionIgnoreIfNotExist() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + CatalogConnection newConnection = + CatalogConnection.of( + new HashMap<String, String>() { + { + 
put("type", "kafka"); + put("bootstrap.servers", "remote:9092"); + } + }, + "new connection"); + catalog.alterConnection(connectionPath1, newConnection, true); + } + + @Test + public void testDropMissingConnectionNotExistException() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + assertThatThrownBy(() -> catalog.dropConnection(connectionPath1, false)) + .isInstanceOf(ConnectionNotExistException.class) + .hasMessage("Connection '`test-catalog`.`db1`.`c1`' does not exist."); + } + + @Test + public void testDropMissingConnectionIgnoreIfNotExist() throws Exception { + if (!supportsConnections()) { + return; + } + catalog.createDatabase(db1, createDb(), false); + catalog.dropConnection(connectionPath1, true); + } + // ------ tables ------ @Test @@ -1614,6 +1821,19 @@ public void testAlterTableStats_TableNotExistException_ignore() throws Exception protected abstract boolean supportsModels(); + protected abstract boolean supportsConnections(); + + protected CatalogConnection createConnection() { + return CatalogConnection.of( + new HashMap<String, String>() { + { + put("type", "kafka"); + put("bootstrap.servers", "localhost:9092"); + } + }, + null); + } + protected ResolvedSchema createSchema() { return new ResolvedSchema( Arrays.asList(