diff --git a/linkis-dist/package/db/linkis_dml.sql b/linkis-dist/package/db/linkis_dml.sql index 831c24ff37c..e59498ccafa 100644 --- a/linkis-dist/package/db/linkis_dml.sql +++ b/linkis-dist/package/db/linkis_dml.sql @@ -571,6 +571,19 @@ INSERT INTO `linkis_ps_dm_datasource_type` (`name`, `description`, `option`, `cl INSERT INTO `linkis_ps_dm_datasource_type` (`name`, `description`, `option`, `classifier`, `icon`, `layers`) VALUES ('doris','doris数据库','doris','olap','',4); INSERT INTO `linkis_ps_dm_datasource_type` (`name`, `description`, `option`, `classifier`, `icon`, `layers`) VALUES ('clickhouse','clickhouse数据库','clickhouse','olap','',4); INSERT INTO `linkis_ps_dm_datasource_type` (`name`, `description`, `option`, `classifier`, `icon`, `layers`) VALUES ('starrocks','starrocks数据库','starrocks','olap','',4); +INSERT INTO `linkis_ps_dm_datasource_type` (`name`, `description`, `option`, `classifier`, `icon`, `layers`) VALUES ('oscar','神通数据库','oscar','关系型数据库','',3); + +select @data_source_type_id := id from `linkis_ps_dm_datasource_type` where `name` = 'oscar'; +INSERT INTO `linkis_ps_dm_datasource_type_key` + (`data_source_type_id`, `key`, `name`, `name_en`, `default_value`, `value_type`, `scope`, `require`, `description`, `description_en`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) +VALUES (@data_source_type_id, 'address', '地址', 'Address', NULL, 'TEXT', NULL, 0, '地址(host1:port1,host2:port2...)', 'Address(host1:port1,host2:port2...)', NULL, NULL, NULL, NULL, now(), now()), + (@data_source_type_id, 'host', '主机名(Host)', 'Host', NULL, 'TEXT', NULL, 1, '主机名(Host)', 'Host', NULL, NULL, NULL, NULL, now(), now()), + (@data_source_type_id, 'port', '端口号(Port)', 'Port', NULL, 'TEXT', NULL, 1, '端口号(Port)', 'Port', NULL, NULL, NULL, NULL, now(), now()), + (@data_source_type_id, 'driverClassName', '驱动类名(Driver class name)', 'Driver class name', 'com.oscar.Driver', 'TEXT', NULL, 1, '驱动类名(Driver class name)', 'Driver class name', NULL, NULL, NULL, NULL, now(), now()), + (@data_source_type_id, 'params', '连接参数(Connection params)', 'Connection params', NULL, 'TEXT', NULL, 0, '输入JSON格式(Input JSON format): {"param":"value"}', 'Input JSON format: {"param":"value"}', NULL, NULL, NULL, NULL, now(), now()), + (@data_source_type_id, 'username', '用户名(Username)', 'Username', NULL, 'TEXT', NULL, 1, '用户名(Username)', 'Username', '^[0-9A-Za-z_-]+$', NULL, NULL, NULL, now(), now()), + (@data_source_type_id, 'password', '密码(Password)', 'Password', NULL, 'PASSWORD', NULL, 1, '密码(Password)', 'Password', '', NULL, NULL, NULL, now(), now()), + (@data_source_type_id, 'instance', '实例名(instance)', 'Instance', NULL, 'TEXT', NULL, 1, '实例名(instance)', 'Instance', NULL, NULL, NULL, NULL, now(), now()); select @data_source_type_id := id from `linkis_ps_dm_datasource_type` where `name` = 'mongodb'; INSERT INTO `linkis_ps_dm_datasource_type_key` (`data_source_type_id`, `key`, `name`, `name_en`, `default_value`, `value_type`, `scope`, `require`, `description`, `description_en`, `value_regex`, `ref_id`, `ref_value`, `data_source`, `update_time`, `create_time`) VALUES (@data_source_type_id, 'username', '用户名', 'Username', NULL, 'TEXT', NULL, 1, '用户名', 'Username', '^[0-9A-Za-z_-]+$', NULL, '', NULL, now(), now()); diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataCoreRestful.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataCoreRestful.java index ef8517a69fa..ab3e5084768 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataCoreRestful.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataCoreRestful.java @@ -310,6 +310,53 @@ public Message getColumns( } } + @RequestMapping( + value = "/exists/{dataSourceId}/db/{database}/table/{table}", + method = RequestMethod.GET) + public Message existsTable( + @PathVariable("dataSourceId") String dataSourceId, + @PathVariable("database") String database, + @PathVariable("table") String table, + @RequestParam("system") String system, + HttpServletRequest request) { + try { + if (StringUtils.isBlank(system)) { + return Message.error("'system' is missing[缺少系统名]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(system).matches()) { + return Message.error("'system' is invalid[系统名错误]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(database).matches()) { + return Message.error("'database' is invalid[数据库名错误]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(table).matches()) { + return Message.error("'table' is invalid[表名错误]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(dataSourceId).matches()) { + return Message.error("'dataSourceId' is invalid[数据源错误]"); + } + + String userName = + ModuleUserUtils.getOperationUser(request, "existsTable, dataSourceId:" + dataSourceId); + boolean exists = + metadataAppService.existsTableByDsId(dataSourceId, database, table, system, userName); + return Message.ok().data("exists", exists); + } catch (Exception e) { + return errorToResponseMessage( + "Fail to check table existence[判断表是否存在失败], id:[" + + dataSourceId + + "]" + + ", system:[" + + system + + "], database:[" + + database + + "], table:[" + + table + + "]", + e); + } + } + private Message errorToResponseMessage(String uiMessage, Exception e) { if (e instanceof MetaMethodInvokeException) { MetaMethodInvokeException invokeException = (MetaMethodInvokeException) e; diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java index b8d2d58abe0..138308bb372 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java @@ -409,6 +409,63 @@ public Message getColumns( } } + @ApiOperation(value = "existsTable", notes = "check if table exists", response = Message.class) + @ApiImplicitParams({ + @ApiImplicitParam(name = "dataSourceName", required = true, dataType = "String"), + @ApiImplicitParam(name = "envId", required = false, dataType = "String"), + @ApiImplicitParam(name = "system", required = true, dataType = "String"), + @ApiImplicitParam(name = "database", required = true, dataType = "String"), + @ApiImplicitParam(name = "table", required = true, dataType = "String") + }) + @RequestMapping(value = "/existsTable", method = RequestMethod.GET) + public Message existsTable( + @RequestParam("dataSourceName") String dataSourceName, + @RequestParam(value = "envId", required = false) String envId, + @RequestParam("database") String database, + @RequestParam("table") String table, + @RequestParam("system") String system, + HttpServletRequest request) { + try { + if (StringUtils.isBlank(system)) { + return Message.error("'system' is missing[缺少系统名]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(system).matches()) { + return Message.error("'system' is invalid[系统名错误]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(database).matches()) { + return Message.error("'database' is invalid[数据库名错误]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(table).matches()) { + return Message.error("'table' is invalid[表名错误]"); + } + if (!MetadataUtils.nameRegexPattern.matcher(dataSourceName).matches()) { + return Message.error("'dataSourceName' is invalid[数据源错误]"); + } + + String userName = + ModuleUserUtils.getOperationUser( + request, "existsTable, dataSourceName:" + dataSourceName); + + boolean exists = + metadataQueryService.existsTableByDsNameAndEnvId( + dataSourceName, database, table, system, userName, envId); + return Message.ok().data("exists", exists); + } catch (Exception e) { + return errorToResponseMessage( + "Fail to check table existence[判断表是否存在失败], name:[" + + dataSourceName + + "]" + + ", system:[" + + system + + "], database:[" + + database + + "], table:[" + + table + + "]", + e); + } + } + private Message errorToResponseMessage(String uiMessage, Exception e) { if (e instanceof MetaMethodInvokeException) { MetaMethodInvokeException invokeException = (MetaMethodInvokeException) e; diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java index f156058ec19..e8866387db9 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java @@ -114,6 +114,22 @@ List getColumnsByDsId( String dataSourceId, String database, String table, String system, String userName) throws ErrorException; + /** + * Check if the table exists by data source id + * + * @param dataSourceId data source id + * @param database database + * @param table table + * @param system system + * @param userName userName + * @return true if the table exists + * @throws ErrorException + */ + @Deprecated + boolean existsTableByDsId( + String dataSourceId, String database, String table, String system, String userName) + throws ErrorException; + /** * Get connection information * @@ -224,6 +240,21 @@ List getColumnsByDsName( String dataSourceName, String database, String table, String system, String userName) throws ErrorException; + /** + * Check if the table exists by data source name + * + * @param dataSourceName data source name + * @param database database + * @param table table + * @param system system + * @param userName userName + * @return true if the table exists + * @throws ErrorException + */ + boolean existsTableByDsName( + String dataSourceName, String database, String table, String system, String userName) + throws ErrorException; + /** * @param dataSourceName * @param database @@ -242,4 +273,25 @@ List getColumnsByDsNameAndEnvId( String userName, String envId) throws ErrorException; + + /** + * Check if the table exists by data source name and env id + * + * @param dataSourceName data source name + * @param database database + * @param table table + * @param system system + * @param userName userName + * @param envId env id + * @return true if the table exists + * @throws ErrorException + */ + boolean existsTableByDsNameAndEnvId( + String dataSourceName, + String database, + String table, + String system, + String userName, + String envId) + throws ErrorException; } diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java index ab38c4228b0..1caf5776f3c 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java @@ -192,6 +192,26 @@ public List getColumnsByDsId( return new ArrayList<>(); } + @Override + @Deprecated + public boolean existsTableByDsId( + String dataSourceId, String database, String table, String system, String userName) + throws ErrorException { + DsInfoResponse dsInfoResponse = reqToGetDataSourceInfo(dataSourceId, system, userName); + if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { + Boolean exists = + invokeMetaMethod( + dsInfoResponse.getDsType(), + "existsTable", + new Object[] { + dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table + }, + Boolean.class); + return Objects.nonNull(exists) && exists; + } + return false; + } + @Override public List getDatabasesByDsName(String dataSourceName, String system, String userName) throws ErrorException { @@ -341,6 +361,25 @@ public List getColumnsByDsName( return new ArrayList<>(); } + @Override + public boolean existsTableByDsName( + String dataSourceName, String database, String table, String system, String userName) + throws ErrorException { + DsInfoResponse dsInfoResponse = queryDataSourceInfoByName(dataSourceName, system, userName); + if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { + Boolean exists = + invokeMetaMethod( + dsInfoResponse.getDsType(), + "existsTable", + new Object[] { + dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table + }, + Boolean.class); + return Objects.nonNull(exists) && exists; + } + return false; + } + @Override public List getColumnsByDsNameAndEnvId( String dataSourceName, @@ -362,6 +401,31 @@ public List getColumnsByDsNameAndEnvId( return new ArrayList<>(); } + @Override + public boolean existsTableByDsNameAndEnvId( + String dataSourceName, + String database, + String table, + String system, + String userName, + String envId) + throws ErrorException { + DsInfoResponse dsInfoResponse = + queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName, envId); + if (StringUtils.isNotBlank(dsInfoResponse.getDsType())) { + Boolean exists = + invokeMetaMethod( + dsInfoResponse.getDsType(), + "existsTable", + new Object[] { + dsInfoResponse.getCreator(), dsInfoResponse.getParams(), database, table + }, + Boolean.class); + return Objects.nonNull(exists) && exists; + } + return false; + } + /** * Request to get data source information (type and connection parameters) * diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/elasticsearch/src/main/java/org/apache/linkis/metadata/query/service/ElasticConnection.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/elasticsearch/src/main/java/org/apache/linkis/metadata/query/service/ElasticConnection.java index 5f47452cd69..b5683ec8ed0 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/elasticsearch/src/main/java/org/apache/linkis/metadata/query/service/ElasticConnection.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/elasticsearch/src/main/java/org/apache/linkis/metadata/query/service/ElasticConnection.java @@ -45,7 +45,11 @@ public class ElasticConnection implements Closeable { private static final String DEFAULT_MAPPING_NAME = "mappings"; private static final String DEFAULT_INDEX_NAME = "index"; - private static final String FIELD_PROPS = "properties"; + /** + * Nested properties key in ElasticSearch mappings, used to descend into nested objects + * (ElasticSearch mappings 中嵌套对象的 properties 键,用于递归展开嵌套字段) + */ + public static final String FIELD_PROPS = "properties"; private RestClient restClient; diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/elasticsearch/src/main/java/org/apache/linkis/metadata/query/service/EsMetaService.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/elasticsearch/src/main/java/org/apache/linkis/metadata/query/service/EsMetaService.java index e462d088ee1..2f977687e43 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/elasticsearch/src/main/java/org/apache/linkis/metadata/query/service/EsMetaService.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/elasticsearch/src/main/java/org/apache/linkis/metadata/query/service/EsMetaService.java @@ -23,9 +23,12 @@ import org.apache.linkis.metadata.query.common.service.AbstractDbMetaService; import org.apache.linkis.metadata.query.common.service.MetadataConnection; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import java.util.Objects; public class EsMetaService extends AbstractDbMetaService { @Override @@ -76,22 +79,69 @@ public List queryColumns( ElasticConnection connection, String database, String table) { try { Map props = connection.getProps(database, table); - return props.entrySet().stream() - .map( - entry -> { - MetaColumnInfo info = new MetaColumnInfo(); - info.setName(String.valueOf(entry.getKey())); - Object value = entry.getValue(); - if (value instanceof Map) { - info.setType( - String.valueOf( - ((Map) value).getOrDefault(ElasticConnection.DEFAULT_TYPE_NAME, ""))); - } - return info; - }) - .collect(Collectors.toList()); + List columns = new ArrayList<>(); + if (Objects.nonNull(props)) { + // Flatten nested object fields recursively(递归展开嵌套对象字段) + flattenFields(props, "", columns); + } + return columns; } catch (Exception e) { throw new RuntimeException("Fail to get ElasticSearch columns(获取索引字段失败)", e); } } + + /** Separator between levels of a flattened nested field name(展开嵌套字段时层级之间的分隔符) */ + private static final String FIELD_NAME_SEPARATOR = "|"; + + /** + * Recursively flatten ElasticSearch mapping properties into a flat column list. + * + *

For each entry of the props map: + * + *

    + *
  • If the value contains {@link ElasticConnection#DEFAULT_TYPE_NAME}, stop recursion and add + * a column whose name is the parent prefix joined with the key (or just the key at the top + * level) and whose type is the value of {@code type}. + *
  • Else if the value contains {@link ElasticConnection#FIELD_PROPS} as a map, descend into + * it with the key appended to the prefix. + *
  • Otherwise stop recursion and add nothing. + *
+ * + * (递归展开 ES mappings properties 为扁平字段列表:value 含 type 则停止递归并加字段;含 properties 且为 Map 则用 properties + * 进入下一层;其他情况停止递归不加字段) + * + * @param props properties map at current level + * @param prefix parent field name prefix, empty at the top level + * @param columns accumulator for flattened columns + */ + @SuppressWarnings("unchecked") + private void flattenFields( + Map props, String prefix, List columns) { + for (Map.Entry entry : props.entrySet()) { + Object value = entry.getValue(); + if (!(value instanceof Map)) { + // Not a structured mapping, stop recursion(非结构化 mapping,停止递归) + continue; + } + Map fieldMap = (Map) value; + String key = String.valueOf(entry.getKey()); + String fieldName = StringUtils.isBlank(prefix) ? key : prefix + FIELD_NAME_SEPARATOR + key; + if (fieldMap.containsKey(ElasticConnection.DEFAULT_TYPE_NAME)) { + // Leaf field, stop recursion(叶子字段,停止递归) + MetaColumnInfo info = new MetaColumnInfo(); + info.setIndex(columns.size()); + info.setName(fieldName); + info.setType( + String.valueOf(fieldMap.getOrDefault(ElasticConnection.DEFAULT_TYPE_NAME, ""))); + columns.add(info); + } else if (fieldMap.containsKey(ElasticConnection.FIELD_PROPS) + && fieldMap.get(ElasticConnection.FIELD_PROPS) instanceof Map) { + // Nested object, descend with the properties(嵌套对象,用 properties 进入下一层) + Map nestedProps = + (Map) fieldMap.get(ElasticConnection.FIELD_PROPS); + flattenFields(nestedProps, fieldName, columns); + } + // Otherwise stop recursion(其他情况停止递归,不加字段) + } + } } diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveMetaService.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveMetaService.java index 73c4e86f7bd..049c496b225 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveMetaService.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/hive/src/main/java/org/apache/linkis/metadata/query/service/HiveMetaService.java @@ -31,9 +31,11 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; @@ -58,6 +60,12 @@ public class HiveMetaService extends AbstractDbMetaService { private static final String PARTITION_KV_SEPARATOR = "="; private static final String PARTITION_CV_SEPARATOR = "/"; + /** + * Placeholder table name used to request database properties when the client does not allow an + * empty table name(部分终端不允许传空表名,用该占位表名代表查询库参数) + */ + private static final String DB_DEFAULT_PLACEHOLDER_TABLE = "__DB_DEFAULT__"; + public HiveMetaService() { client = BmlClientFactory.createBmlClient(); } @@ -219,10 +227,63 @@ public List queryColumns( return columns; } + @Override + public boolean queryExistsTable(HiveConnection connection, String database, String table) { + try { + // Throw InvalidTableException when the table does not exist(表不存在时抛 InvalidTableException) + connection.getClient().getTable(database, table); + return true; + } catch (InvalidTableException e) { + // Table not found(表不存在), log message only without stack trace + LOG.warn( + "Hive table not exists:[" + + database + + "." + + table + + "], message:[" + + e.getMessage() + + "](Hive表不存在)"); + return false; + } catch (Exception e) { + // Other exceptions are rethrown(其他异常直接抛出) + throw new RuntimeException( + "Fail to check Hive table existence(判断Hive表是否存在失败):[" + database + "." + table + "]", e); + } + } + @Override public Map queryTableProps( HiveConnection connection, String database, String table) { try { + // When the table is the placeholder, fall back to query the database properties + // only if the placeholder table does not really exist(当表名为占位符时,仅在该占位表真实不存在时才回退查询数据库参数及常用元数据) + if (isDbDefaultPlaceholder(table) && !placeholderTableExists(connection, database, table)) { + Database database0 = connection.getClient().getDatabase(database); + Map properties = new HashMap<>(); + if (Objects.nonNull(database0)) { + // Custom database properties set via DBPROPERTIES(库的自定义参数) + if (Objects.nonNull(database0.getParameters())) { + properties.putAll(database0.getParameters()); + } + // Append the database common metadata(补充数据库的常用元数据) + Optional.ofNullable(database0.getLocationUri()) + .filter(StringUtils::isNotBlank) + .ifPresent(location -> properties.put("location", location)); + Optional.ofNullable(database0.getOwnerName()) + .filter(StringUtils::isNotBlank) + .ifPresent(owner -> properties.put("owner", owner)); + Optional.ofNullable(database0.getOwnerType()) + .map(Enum::name) + .ifPresent(ownerType -> properties.put("ownerType", ownerType)); + Optional.ofNullable(database0.getName()) + .filter(StringUtils::isNotBlank) + .ifPresent(name -> properties.put("name", name)); + Optional.ofNullable(database0.getDescription()) + .filter(StringUtils::isNotBlank) + .ifPresent(desc -> properties.put("description", desc)); + } + return properties; + } Table rawTable = connection.getClient().getTable(database, table); return new HashMap<>((Map) rawTable.getMetadata()); } catch (Exception e) { @@ -230,6 +291,43 @@ public Map queryTableProps( } } + /** + * Whether the given table name is the database-level placeholder, which is used to request + * database properties instead of table ones when some clients do not allow an empty table name + * (判断给定表名是否为库级占位符;部分终端不允许传空表名,故用占位表名代表查询库参数) + * + * @param table table name + * @return true if it is the placeholder + */ + private boolean isDbDefaultPlaceholder(String table) { + return DB_DEFAULT_PLACEHOLDER_TABLE.equals(table); + } + + /** + * Check whether the placeholder table really exists, so that we never silently fall back to the + * database query when a real table with the same name exists(校验占位表是否真实存在,避免真实存在同名表时被误当作占位符而漏查表参数) + * + * @param connection hive connection + * @param database database name + * @param table placeholder table name + * @return true if a real table with the placeholder name exists + */ + private boolean placeholderTableExists(HiveConnection connection, String database, String table) { + try { + // throwException=false -> returns null when the table is missing, no exception thrown + Table rawTable = connection.getClient().getTable(database, table, false); + return Objects.nonNull(rawTable); + } catch (Exception e) { + // If the existence cannot be determined, treat it as existing to be safe(无法判定时按存在处理,走表查询更安全) + LOG.warn( + "Fail to check placeholder table existence:[" + + table + + "], treat as existing(判定占位表是否存在失败,按存在处理)", + e); + return true; + } + } + @Override public Map queryPartitionProps( HiveConnection connection, String database, String table, String partition) { diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/OscarMetaService.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/OscarMetaService.java new file mode 100644 index 00000000000..15dee75a85f --- /dev/null +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/OscarMetaService.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.metadata.query.service; + +import org.apache.linkis.datasourcemanager.common.util.json.Json; +import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo; +import org.apache.linkis.metadata.query.common.service.AbstractDbMetaService; +import org.apache.linkis.metadata.query.common.service.MetadataConnection; +import org.apache.linkis.metadata.query.service.conf.SqlParamsMapper; +import org.apache.linkis.metadata.query.service.oscar.SqlConnection; + +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class OscarMetaService extends AbstractDbMetaService { + + @Override + public MetadataConnection getConnection( + String operator, Map params) throws Exception { + String host = + String.valueOf(params.getOrDefault(SqlParamsMapper.PARAM_SQL_HOST.getValue(), "")); + Integer port = + (Double.valueOf( + String.valueOf(params.getOrDefault(SqlParamsMapper.PARAM_SQL_PORT.getValue(), 0)))) + .intValue(); + String username = + String.valueOf(params.getOrDefault(SqlParamsMapper.PARAM_SQL_USERNAME.getValue(), "")); + String password = + String.valueOf(params.getOrDefault(SqlParamsMapper.PARAM_SQL_PASSWORD.getValue(), "")); + + String database = + String.valueOf(params.getOrDefault(SqlParamsMapper.PARAM_SQL_DATABASE.getValue(), "")); + Map extraParams = new HashMap<>(); + Object sqlParamObj = params.get(SqlParamsMapper.PARAM_SQL_EXTRA_PARAMS.getValue()); + if (null != sqlParamObj) { + if (!(sqlParamObj instanceof Map)) { + extraParams = + Json.fromJson(String.valueOf(sqlParamObj), Map.class, String.class, Object.class); + } else { + extraParams = (Map) sqlParamObj; + } + } + assert extraParams != null; + return new MetadataConnection<>( + new SqlConnection(host, port, username, password, database, extraParams)); + } + + @Override + public List queryDatabases(SqlConnection connection) { + try { + return connection.getAllDatabases(); + } catch (SQLException e) { + throw new RuntimeException("Fail to get Sql databases(获取数据库列表失败)", e); + } + } + + @Override + public List queryTables(SqlConnection connection, String schemaname) { + try { + return connection.getAllTables(schemaname); + } catch (SQLException e) { + throw new RuntimeException("Fail to get Sql tables(获取表列表失败)", e); + } + } + + @Override + public List queryColumns( + SqlConnection connection, String schemaname, String table) { + try { + return connection.getColumns(schemaname, table); + } catch (SQLException | ClassNotFoundException e) { + throw new RuntimeException("Fail to get Sql columns(获取字段列表失败)", e); + } + } +} diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/db2/SqlConnection.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/db2/SqlConnection.java index 5e711511cb1..27019ba0de7 100644 --- a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/db2/SqlConnection.java +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/db2/SqlConnection.java @@ -113,16 +113,17 @@ public List getAllTables(String tabschema) throws SQLException { public List getColumns(String schemaname, String table) throws SQLException, ClassNotFoundException { List columns = new ArrayList<>(); - // String columnSql = "SELECT * FROM syscat.columns WHERE TABSCHEMA = '" + schemaname - // + "' AND TABNAME = '" + table + "'"; - String columnSql = "SELECT * FROM " + schemaname + "." + table + " WHERE 1 = 2"; + // Quote the table identifier with double quotes so that the original case is preserved. + // For tables created with a quoted lowercase name, e.g. DB2INST1."db2hive04", the unquoted + // identifier would be folded to uppercase by DB2 and fail to match. + // (对表标识符加双引号以保留原始大小写;带引号建的小写表名如 DB2INST1."db2hive04", + // 不加引号会被 DB2 折叠为大写导致找不到表) + String columnSql = "SELECT * FROM " + schemaname + ".\"" + table + "\" WHERE 1 = 2"; PreparedStatement ps = null; ResultSet rs = null; ResultSetMetaData meta = null; try { - // List primaryKeys = getPrimaryKeys(getDBConnection(connectMessage, - // schemaname), table); - List primaryKeys = getPrimaryKeys(conn, table); + List primaryKeys = getPrimaryKeys(schemaname, table); ps = conn.prepareStatement(columnSql); rs = ps.executeQuery(); meta = rs.getMetaData(); @@ -144,30 +145,36 @@ public List getColumns(String schemaname, String table) } /** - * Get primary keys + * Get primary key column names by querying SYSCAT.KEYCOLUSE directly. * - * @param connection connection + *

The JDBC {@link DatabaseMetaData#getPrimaryKeys} is backed by the {@code + * SYSIBM.SQLPRIMARYKEYS} procedure, which is unreliable for quoted lowercase table names because + * the table name may be folded to uppercase before matching. Querying {@code SYSCAT.KEYCOLUSE} + * with the exact case stored in the catalog avoids this issue. (JDBC getPrimaryKeys 底层走 + * SYSIBM.SQLPRIMARYKEYS,对带引号的小写表名不可靠; 直接查 SYSCAT.KEYCOLUSE,按库中存储的真实大小写匹配主键列) + * + * @param schemaname schema name * @param table table name - * @return + * @return primary key column names * @throws SQLException */ - private List getPrimaryKeys(Connection connection, String table) throws SQLException { - ResultSet rs = null; + private List getPrimaryKeys(String schemaname, String table) throws SQLException { List primaryKeys = new ArrayList<>(); + PreparedStatement ps = null; + ResultSet rs = null; try { - DatabaseMetaData dbMeta = connection.getMetaData(); - rs = dbMeta.getPrimaryKeys(null, null, table); + ps = + conn.prepareStatement( + "SELECT COLNAME FROM SYSCAT.KEYCOLUSE WHERE TABSCHEMA = ? AND TABNAME = ? ORDER BY COLSEQ"); + ps.setString(1, schemaname); + ps.setString(2, table); + rs = ps.executeQuery(); while (rs.next()) { - primaryKeys.add(rs.getString("column_name")); + primaryKeys.add(rs.getString("COLNAME")); } return primaryKeys; } finally { - if (null != rs) { - rs.close(); - } - // if(null != rs){ - // closeResource(connection, null, rs); - // } + closeResource(null, ps, rs); } } diff --git a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/oscar/SqlConnection.java b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/oscar/SqlConnection.java new file mode 100644 index 00000000000..8fb3aa2be46 --- /dev/null +++ b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/oscar/SqlConnection.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.metadata.query.service.oscar; + +import org.apache.linkis.common.conf.CommonVars; +import org.apache.linkis.common.utils.AESUtils; +import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo; + +import org.apache.commons.lang3.StringUtils; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.*; +import java.util.*; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SqlConnection implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(SqlConnection.class); + + private static final CommonVars SQL_DRIVER_CLASS = + CommonVars.apply("wds.linkis.server.mdm.service.oscar.driver", "com.oscar.Driver"); + + private static final CommonVars SQL_CONNECT_URL = + CommonVars.apply("wds.linkis.server.mdm.service.oscar.url", "jdbc:oscar://%s:%s/%s"); + + private Connection conn; + + private ConnectMessage connectMessage; + + public SqlConnection( + String host, + Integer port, + String username, + String password, + String database, + Map extraParams) + throws ClassNotFoundException, SQLException { + connectMessage = new ConnectMessage(host, port, username, password, extraParams); + conn = getDBConnection(connectMessage, database); + Statement statement = conn.createStatement(); + statement.close(); + } + + public List getAllDatabases() throws SQLException { + List schemaNames = new ArrayList<>(); + Statement stmt = null; + ResultSet rs = null; + try { + stmt = conn.createStatement(); + rs = stmt.executeQuery("SELECT USERNAME FROM ALL_USERS ORDER BY USERNAME"); + while (rs.next()) { + schemaNames.add(rs.getString(1)); + } + } finally { + closeResource(null, stmt, rs); + } + return schemaNames; + } + + public List getAllTables(String schema) throws SQLException { + List tableNames = new ArrayList<>(); + Statement stmt = null; + ResultSet rs = null; + try { + stmt = conn.createStatement(); + rs = + stmt.executeQuery( + "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = '" + + schema + + "' ORDER BY TABLE_NAME"); + while (rs.next()) { + tableNames.add(rs.getString(1)); + } + } finally { + closeResource(null, stmt, rs); + } + return tableNames; + } + + public List getColumns(String schema, String table) + throws SQLException, ClassNotFoundException { + List columns = new ArrayList<>(); + String columnSql = "SELECT * FROM \"" + schema + "\".\"" + table + "\" WHERE 1 = 2"; + PreparedStatement ps = null; + ResultSet rs = null; + ResultSetMetaData meta = null; + try { + List primaryKeys = getPrimaryKeys(schema, table); + Map columnCommentMap = getColumnComment(schema, table); + ps = conn.prepareStatement(columnSql); + rs = ps.executeQuery(); + meta = rs.getMetaData(); + int columnCount = meta.getColumnCount(); + for (int i = 1; i < columnCount + 1; i++) { + MetaColumnInfo info = new MetaColumnInfo(); + info.setIndex(i); + info.setLength(meta.getColumnDisplaySize(i)); + info.setName(meta.getColumnName(i)); + info.setType(meta.getColumnTypeName(i)); + if (primaryKeys.contains(meta.getColumnName(i))) { + info.setPrimaryKey(true); + } + String colComment = columnCommentMap.get(meta.getColumnName(i)); + if (StringUtils.isNotBlank(colComment)) { + info.setColComment(colComment); + } else { + info.setColComment(StringUtils.EMPTY); + } + columns.add(info); + } + } finally { + closeResource(null, ps, rs); + } + return columns; + } + + private List getPrimaryKeys(String schema, String table) throws SQLException { + ResultSet rs = null; + List primaryKeys = new ArrayList<>(); + DatabaseMetaData dbMeta = conn.getMetaData(); + rs = dbMeta.getPrimaryKeys(null, schema, table); + while (rs.next()) { + primaryKeys.add(rs.getString("COLUMN_NAME")); + } + return primaryKeys; + } + + private Map getColumnComment(String schema, String table) throws SQLException { + ResultSet rs = null; + Map columnComment = new HashMap<>(); + DatabaseMetaData dbMeta = conn.getMetaData(); + rs = dbMeta.getColumns(null, schema, table, "%"); + while (rs.next()) { + columnComment.put(rs.getString("COLUMN_NAME"), rs.getString("REMARKS")); + } + return columnComment; + } + + private void closeResource(Connection connection, Statement statement, ResultSet resultSet) { + try { + if (null != resultSet && !resultSet.isClosed()) { + resultSet.close(); + } + if (null != statement && !statement.isClosed()) { + statement.close(); + } + if (null != connection && !connection.isClosed()) { + connection.close(); + } + } catch (SQLException e) { + LOG.warn("Fail to release resource [" + e.getMessage() + "]", e); + } + } + + @Override + public void close() throws IOException { + closeResource(conn, null, null); + } + + private Connection getDBConnection(ConnectMessage connectMessage, String database) + throws ClassNotFoundException, SQLException { + String extraParamString = + connectMessage.extraParams.entrySet().stream() + .map(e -> String.join("=", e.getKey(), String.valueOf(e.getValue()))) + .collect(Collectors.joining("&")); + Class.forName(SQL_DRIVER_CLASS.getValue()); + String url = + String.format( + SQL_CONNECT_URL.getValue(), connectMessage.host, connectMessage.port, database); + if (!connectMessage.extraParams.isEmpty()) { + url += "?" + extraParamString; + } + try { + Properties prop = new Properties(); + prop.put("user", connectMessage.username); + prop.put("password", AESUtils.isDecryptByConf(connectMessage.password)); + return DriverManager.getConnection(url, prop); + } catch (Exception e) { + LOG.error("Fail to create Oscar connection", e); + throw e; + } + } + + private static class ConnectMessage { + private String host; + private Integer port; + private String username; + private String password; + private Map extraParams; + + public ConnectMessage( + String host, + Integer port, + String username, + String password, + Map extraParams) { + this.host = host; + this.port = port; + this.username = username; + this.password = password; + this.extraParams = extraParams; + } + } +} diff --git a/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/cache/CacheConfiguration.java b/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/cache/CacheConfiguration.java index bfff0fe92e1..3fce2d5eea1 100644 --- a/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/cache/CacheConfiguration.java +++ b/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/cache/CacheConfiguration.java @@ -34,7 +34,7 @@ public class CacheConfiguration { public static final CommonVars MYSQL_RELATIONSHIP_LIST = CommonVars.apply( "wds.linkis.server.mdq.mysql.relationship", - "mysql,oracle,kingbase,postgresql,sqlserver,db2,greenplum,dm,doris,clickhouse,tidb,starrocks,gaussdb,oceanbase"); + "mysql,oracle,kingbase,postgresql,sqlserver,db2,greenplum,dm,doris,clickhouse,tidb,starrocks,gaussdb,oceanbase,oscar"); public static final CommonVars QUERY_DATABASE_RELATIONSHIP = CommonVars.apply( diff --git a/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/service/AbstractDbMetaService.java b/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/service/AbstractDbMetaService.java index f2108eafd2f..2ed0aeaaf16 100644 --- a/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/service/AbstractDbMetaService.java +++ b/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/service/AbstractDbMetaService.java @@ -79,6 +79,13 @@ public List getColumns( return this.getConnAndRun(operator, params, conn -> this.queryColumns(conn, database, table)); } + @Override + public boolean existsTable( + String operator, Map params, String database, String table) { + return this.getConnAndRun( + operator, params, conn -> this.queryExistsTable(conn, database, table)); + } + @Override public Map getPartitionProps( String operator, @@ -158,6 +165,18 @@ public List queryColumns(C connection, String database, String t throw new WarnException(-1, "This method is no supported"); } + /** + * Check if the table exists by connection, database and table + * + * @param connection metadata connection + * @param database database + * @param table table + * @return true if the table exists + */ + public boolean queryExistsTable(C connection, String database, String table) { + throw new WarnException(-1, "This method is no supported"); + } + /** * Get the properties of partition * diff --git a/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/service/MetadataDbService.java b/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/service/MetadataDbService.java index 58ef7984c4c..e175c3931a0 100644 --- a/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/service/MetadataDbService.java +++ b/linkis-public-enhancements/linkis-pes-common/src/main/java/org/apache/linkis/metadata/query/common/service/MetadataDbService.java @@ -86,6 +86,17 @@ Map getPartitionProps( List getColumns( String operator, Map params, String database, String table); + /** + * Check if the table exists in the database specified + * + * @param operator operator + * @param params params + * @param database database name + * @param table table name + * @return true if the table exists, false otherwise + */ + boolean existsTable(String operator, Map params, String database, String table); + /** * Get sql connect url *