-
Notifications
You must be signed in to change notification settings - Fork 34
Provide Struct data type support for oracle plugin #659
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,9 +23,14 @@ | |
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.sql.Connection; | ||
| import java.sql.PreparedStatement; | ||
| import java.sql.ResultSet; | ||
| import java.sql.ResultSetMetaData; | ||
| import java.sql.SQLException; | ||
| import java.sql.Types; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import javax.annotation.Nullable; | ||
|
|
||
|
|
@@ -70,6 +75,7 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { | |
| private final Boolean isTimestampOldBehavior; | ||
| private final Boolean isPrecisionlessNumAsDecimal; | ||
| private final Boolean isTimestampLtzFieldTimestamp; | ||
| private Connection connection; | ||
|
|
||
| public OracleSourceSchemaReader() { | ||
| this(null, false, false, false); | ||
|
|
@@ -136,11 +142,130 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti | |
| } | ||
| return Schema.decimalOf(precision, scale); | ||
| } | ||
| case Types.STRUCT: | ||
| if (connection == null) { | ||
| throw new SQLException("Cannot resolve STRUCT schema without a database connection. " | ||
| + "Use getSchemaFields(ResultSet) to enable STRUCT type resolution."); | ||
| } | ||
| String typeName = metadata.getColumnTypeName(index); | ||
| String oracleSchemaName = metadata.getSchemaName(index); | ||
| return getStructSchema(connection, oracleSchemaName, typeName); | ||
| default: | ||
| return super.getSchema(metadata, index); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException { | ||
| this.connection = resultSet.getStatement().getConnection(); | ||
| return super.getSchemaFields(resultSet); | ||
| } | ||
|
|
||
| /** | ||
| * Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the database metadata | ||
| * for the type's attributes. | ||
| * | ||
| * @param connection the database connection | ||
| * @param schemaName the Oracle schema owning the type | ||
| * @param typeName the Oracle type name (e.g., "ADDRESS_TYPE") | ||
| * @return a CDAP RECORD schema with fields corresponding to the STRUCT's attributes | ||
| */ | ||
| private Schema getStructSchema(Connection connection, String schemaName, | ||
| String typeName) throws SQLException { | ||
| List<Schema.Field> fields = new ArrayList<>(); | ||
|
|
||
| String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? ORDER BY ATTR_NO"; | ||
|
|
||
| try (PreparedStatement stmt = connection.prepareStatement(sql)) { | ||
| stmt.setString(1, typeName.substring(typeName.lastIndexOf('.') + 1)); | ||
|
|
||
| try (ResultSet attrRs = stmt.executeQuery()) { | ||
| while (attrRs.next()) { | ||
| String attrName = attrRs.getString("ATTR_NAME"); | ||
| String attrTypeName = attrRs.getString("ATTR_TYPE_NAME"); | ||
| int attrSize = attrRs.getInt("PRECISION"); | ||
| int attrScale = attrRs.getInt("SCALE"); | ||
|
|
||
| Schema attrSchema = mapPrimitiveOracleType(attrTypeName, attrSize, attrScale); | ||
| if (attrSchema != null) { | ||
| fields.add(Schema.Field.of(attrName, attrSchema)); | ||
| } else { | ||
| Schema nestedSchema = getStructSchema(connection, schemaName, attrTypeName); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
| fields.add(Schema.Field.of(attrName, nestedSchema)); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| if (fields.isEmpty()) { | ||
| throw new SQLException(String.format( | ||
| "No attributes found for Oracle STRUCT type '%s.%s'. " | ||
| + "Ensure the type exists and is accessible.", | ||
| schemaName, typeName)); | ||
| } | ||
|
|
||
| return Schema.recordOf(typeName, fields); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. CDAP `String recordName = typeName.contains(".") ? typeName.substring(typeName.lastIndexOf('.') + 1) : typeName;`
`return Schema.recordOf(recordName, fields);` |
||
| } | ||
|
|
||
| private Schema mapPrimitiveOracleType(String typeName, int precision, int scale) { | ||
| switch (typeName) { | ||
| case "TIMESTAMP WITH TZ": | ||
| return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); | ||
| case "TIMESTAMP WITH LTZ": | ||
| return getTimestampLtzSchema(); | ||
| case "TIMESTAMP": | ||
| return Schema.of(Schema.LogicalType.DATETIME); | ||
| case "DATE" : | ||
| return Schema.of(Schema.LogicalType.DATE); | ||
| case "BINARY FLOAT": | ||
| case "FLOAT": | ||
| return Schema.of(Schema.Type.FLOAT); | ||
| case "BINARY DOUBLE": | ||
| case "DOUBLE": | ||
| return Schema.of(Schema.Type.DOUBLE); | ||
| case "BFILE": | ||
| case "RAW": | ||
| case "LONG RAW": | ||
| return Schema.of(Schema.Type.BYTES); | ||
| case "INTERVAL DAY TO SECOND": | ||
| case "INTERVAL YEAR TO MONTH": | ||
| case "VARCHAR2": | ||
| case "VARCHAR": | ||
| case "CHAR": | ||
| case "CLOB": | ||
| case "BLOB": | ||
| case "LONG": | ||
| return Schema.of(Schema.Type.STRING); | ||
| case "INTEGER": | ||
| return Schema.of(Schema.Type.INT); | ||
| case "NUMBER": | ||
| case "DECIMAL": | ||
| // FLOAT and REAL are returned as java.sql.Types.NUMERIC but with value that is a java.lang.Double | ||
| if (Double.class.getTypeName().equals(typeName)) { | ||
| return Schema.of(Schema.Type.DOUBLE); | ||
|
Comment on lines +211 to +244
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There are two issues in
|
||
| } else { | ||
| if (precision == 0) { | ||
| if (isPrecisionlessNumAsDecimal) { | ||
| precision = 38; | ||
| scale = 0; | ||
| LOG.warn(String.format("%s type with undefined precision and scale is detected, " | ||
| + "there may be a precision loss while running the pipeline. " | ||
| + "Please define an output precision and scale for field to avoid " | ||
| + "precision loss.", typeName)); | ||
| return Schema.decimalOf(precision, scale); | ||
| } else { | ||
| LOG.warn(String.format("%s type without precision and scale, " | ||
| + "converting into STRING type to avoid any precision loss.", | ||
| typeName)); | ||
| return Schema.of(Schema.Type.STRING); | ||
| } | ||
| } | ||
| return Schema.decimalOf(precision, scale); | ||
| } | ||
| default: | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| private @NotNull Schema getTimestampLtzSchema() { | ||
| return isTimestampOldBehavior || isTimestampLtzFieldTimestamp | ||
| ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The query to `ALL_TYPE_ATTRS` should include a filter on the `OWNER` column. Without it, if multiple schemas define a type with the same name, the query will return attributes for all of them, leading to an incorrect or corrupted schema.
Additionally, the `typeName` provided by the JDBC driver is often fully qualified (e.g., `OWNER.TYPE_NAME`). You should parse the owner from the `typeName` string if present, and fall back to the provided `schemaName` otherwise.