From 231c6ee101607aac17154c98a76e3381243e46a7 Mon Sep 17 00:00:00 2001 From: vanshikaagupta22 Date: Wed, 6 May 2026 09:25:07 +0000 Subject: [PATCH] Provide Struct datat type support for oracle plugin --- .../plugin/oracle/OracleSourceDBRecord.java | 50 ++++++- .../oracle/OracleSourceSchemaReader.java | 125 ++++++++++++++++++ .../plugin/oracle/OracleSchemaReaderTest.java | 63 ++++++++- .../oracle/OracleSourceDBRecordUnitTest.java | 116 ++++++++++++++++ 4 files changed, 352 insertions(+), 2 deletions(-) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java index 44131a01b..d79d67936 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java @@ -35,6 +35,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Struct; import java.sql.Timestamp; import java.sql.Types; import java.time.LocalDateTime; @@ -106,7 +107,7 @@ record = recordBuilder.build(); @Override protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field, int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException { - if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB) { + if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB || sqlType == Types.STRUCT) { handleOracleSpecificType(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); } else { setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); @@ -341,6 +342,13 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil case OracleSourceSchemaReader.LONG_RAW: recordBuilder.set(field.getName(), resultSet.getBytes(columnIndex)); break; + case Types.STRUCT: + java.sql.Struct structValue = (java.sql.Struct) resultSet.getObject(columnIndex); + if (structValue != null) { + recordBuilder.set(field.getName(), convertStructToRecord(structValue, nonNullSchema, + resultSet.getStatement().getConnection())); + } + break; case Types.DECIMAL: case Types.NUMERIC: // This is the only way to differentiate FLOAT/REAL columns from other numeric columns, that based on NUMBER. @@ -371,6 +379,46 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil } } + private StructuredRecord convertStructToRecord(java.sql.Struct struct, Schema schema, + Connection connection) throws SQLException { + Object[] attributes = struct.getAttributes(); + List fields = schema.getFields(); + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + + for (int i = 0; i < fields.size() && i < attributes.length; i++) { + Schema.Field field = fields.get(i); + Object attrValue = attributes[i]; + + if (attrValue == null) { + builder.set(field.getName(), null); + continue; + } + + Schema fieldSchema = field.getSchema().isNullable() + ? field.getSchema().getNonNullable() : field.getSchema(); + + if (attrValue instanceof Struct) { + builder.set(field.getName(), convertStructToRecord((Struct) attrValue, fieldSchema, connection)); + } else if (attrValue instanceof java.sql.Date) { + builder.setDate(field.getName(), ((java.sql.Date) attrValue).toLocalDate()); + } else if (attrValue instanceof java.sql.Time) { + builder.setTime(field.getName(), ((java.sql.Time) attrValue).toLocalTime()); + } else if (attrValue instanceof Timestamp) { + if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) { + builder.setDateTime(field.getName(), ((Timestamp) attrValue).toLocalDateTime()); + } else { + builder.setTimestamp(field.getName(), + ((Timestamp) attrValue).toInstant().atZone(java.time.ZoneId.of("UTC"))); + } + } else if (attrValue instanceof BigDecimal) { + builder.setDecimal(field.getName(), (BigDecimal) attrValue); + } else { + builder.set(field.getName(), attrValue); + } + } + return builder.build(); + } + /** * Get the scale set in Non-nullable schema associated with the schema * */ diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index 208b70410..595ac26d9 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -23,9 +23,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -70,6 +75,7 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { private final Boolean isTimestampOldBehavior; private final Boolean isPrecisionlessNumAsDecimal; private final Boolean isTimestampLtzFieldTimestamp; + private Connection connection; public OracleSourceSchemaReader() { this(null, false, false, false); @@ -136,11 +142,130 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti } return Schema.decimalOf(precision, scale); } + case Types.STRUCT: + if (connection == null) { + throw new SQLException("Cannot resolve STRUCT schema without a database connection. " + + "Use getSchemaFields(ResultSet) to enable STRUCT type resolution."); + } + String typeName = metadata.getColumnTypeName(index); + String oracleSchemaName = metadata.getSchemaName(index); + return getStructSchema(connection, oracleSchemaName, typeName); default: return super.getSchema(metadata, index); } } + @Override + public List getSchemaFields(ResultSet resultSet) throws SQLException { + this.connection = resultSet.getStatement().getConnection(); + return super.getSchemaFields(resultSet); + } + + /** + * Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the database metadata + * for the type's attributes. + * + * @param connection the database connection + * @param schemaName the Oracle schema owning the type + * @param typeName the Oracle type name (e.g., "ADDRESS_TYPE") + * @return a CDAP RECORD schema with fields corresponding to the STRUCT's attributes + */ + private Schema getStructSchema(Connection connection, String schemaName, + String typeName) throws SQLException { + List fields = new ArrayList<>(); + + String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? ORDER BY ATTR_NO"; + + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setString(1, typeName.substring(typeName.lastIndexOf('.') + 1)); + + try (ResultSet attrRs = stmt.executeQuery()) { + while (attrRs.next()) { + String attrName = attrRs.getString("ATTR_NAME"); + String attrTypeName = attrRs.getString("ATTR_TYPE_NAME"); + int attrSize = attrRs.getInt("PRECISION"); + int attrScale = attrRs.getInt("SCALE"); + + Schema attrSchema = mapPrimitiveOracleType(attrTypeName, attrSize, attrScale); + if (attrSchema != null) { + fields.add(Schema.Field.of(attrName, attrSchema)); + } else { + Schema nestedSchema = getStructSchema(connection, schemaName, attrTypeName); + fields.add(Schema.Field.of(attrName, nestedSchema)); + } + } + } + } + if (fields.isEmpty()) { + throw new SQLException(String.format( + "No attributes found for Oracle STRUCT type '%s.%s'. " + + "Ensure the type exists and is accessible.", + schemaName, typeName)); + } + + return Schema.recordOf(typeName, fields); + } + + private Schema mapPrimitiveOracleType(String typeName, int precision, int scale) { + switch (typeName) { + case "TIMESTAMP WITH TZ": + return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + case "TIMESTAMP WITH LTZ": + return getTimestampLtzSchema(); + case "TIMESTAMP": + return Schema.of(Schema.LogicalType.DATETIME); + case "DATE" : + return Schema.of(Schema.LogicalType.DATE); + case "BINARY FLOAT": + case "FLOAT": + return Schema.of(Schema.Type.FLOAT); + case "BINARY DOUBLE": + case "DOUBLE": + return Schema.of(Schema.Type.DOUBLE); + case "BFILE": + case "RAW": + case "LONG RAW": + return Schema.of(Schema.Type.BYTES); + case "INTERVAL DAY TO SECOND": + case "INTERVAL YEAR TO MONTH": + case "VARCHAR2": + case "VARCHAR": + case "CHAR": + case "CLOB": + case "BLOB": + case "LONG": + return Schema.of(Schema.Type.STRING); + case "INTEGER": + return Schema.of(Schema.Type.INT); + case "NUMBER": + case "DECIMAL": + // FLOAT and REAL are returned as java.sql.Types.NUMERIC but with value that is a java.lang.Double + if (Double.class.getTypeName().equals(typeName)) { + return Schema.of(Schema.Type.DOUBLE); + } else { + if (precision == 0) { + if (isPrecisionlessNumAsDecimal) { + precision = 38; + scale = 0; + LOG.warn(String.format("%s type with undefined precision and scale is detected, " + + "there may be a precision loss while running the pipeline. " + + "Please define an output precision and scale for field to avoid " + + "precision loss.", typeName)); + return Schema.decimalOf(precision, scale); + } else { + LOG.warn(String.format("%s type without precision and scale, " + + "converting into STRING type to avoid any precision loss.", + typeName)); + return Schema.of(Schema.Type.STRING); + } + } + return Schema.decimalOf(precision, scale); + } + default: + return null; + } + } + private @NotNull Schema getTimestampLtzSchema() { return isTimestampOldBehavior || isTimestampLtzFieldTimestamp ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java index 1ff77c533..c38e03acc 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java @@ -24,9 +24,13 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; import java.util.List; public class OracleSchemaReaderTest { @@ -37,6 +41,12 @@ public void getSchema_timestampLTZFieldTrue_returnTimestamp() throws SQLExceptio ResultSet resultSet = Mockito.mock(ResultSet.class); ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); + + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); Mockito.when(resultSet.getMetaData()).thenReturn(metadata); @@ -68,9 +78,12 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio ResultSet resultSet = Mockito.mock(ResultSet.class); ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); Mockito.when(resultSet.getMetaData()).thenReturn(metadata); - + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); Mockito.when(metadata.getColumnCount()).thenReturn(2); // -101 is for TIMESTAMP_TZ Mockito.when(metadata.getColumnType(1)).thenReturn(-101); @@ -91,4 +104,52 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio Assert.assertEquals(expectedSchemaFields.get(1).getName(), actualSchemaFields.get(1).getName()); Assert.assertEquals(expectedSchemaFields.get(1).getSchema(), actualSchemaFields.get(1).getSchema()); } + + @Test + public void getSchemaFields_structType_returnRecord() throws SQLException { + OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(); + + ResultSet resultSet = Mockito.mock(ResultSet.class); + ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); + PreparedStatement stmt = Mockito.mock(PreparedStatement.class); + ResultSet attrRs = Mockito.mock(ResultSet.class); + + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); + Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(stmt); + Mockito.when(stmt.executeQuery()).thenReturn(attrRs); + + // One STRUCT column + Mockito.when(metadata.getColumnCount()).thenReturn(1); + Mockito.when(metadata.getColumnType(1)).thenReturn(Types.STRUCT); + Mockito.when(metadata.getColumnName(1)).thenReturn("address"); + Mockito.when(metadata.getColumnTypeName(1)).thenReturn("ADDRESS_TYPE"); + Mockito.when(metadata.getSchemaName(1)).thenReturn("TEST_SCHEMA"); + + // Mock ALL_TYPE_ATTRS for ADDRESS_TYPE with two VARCHAR2 attributes + Mockito.when(attrRs.next()).thenReturn(true, true, false); + Mockito.when(attrRs.getString("ATTR_NAME")).thenReturn("STREET", "CITY"); + Mockito.when(attrRs.getString("ATTR_TYPE_NAME")).thenReturn("VARCHAR2", "VARCHAR2"); + Mockito.when(attrRs.getInt("PRECISION")).thenReturn(0, 0); + Mockito.when(attrRs.getInt("SCALE")).thenReturn(0, 0); + + List actualFields = schemaReader.getSchemaFields(resultSet); + + Assert.assertEquals(1, actualFields.size()); + Schema.Field addressField = actualFields.get(0); + Assert.assertEquals("address", addressField.getName()); + + Schema addressSchema = addressField.getSchema().isNullable() + ? addressField.getSchema().getNonNullable() : addressField.getSchema(); + Assert.assertEquals(Schema.Type.RECORD, addressSchema.getType()); + Assert.assertEquals("ADDRESS_TYPE", addressSchema.getRecordName()); + + List structFields = addressSchema.getFields(); + Assert.assertEquals(2, structFields.size()); + Assert.assertEquals("STREET", structFields.get(0).getName()); + Assert.assertEquals("CITY", structFields.get(1).getName()); + } } diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java index 77136e841..61d644b54 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java @@ -25,6 +25,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.math.BigDecimal; +import java.sql.Date; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.Timestamp; @@ -234,4 +235,119 @@ public void validateTimestampTZTypeNullHandling() throws Exception { StructuredRecord record = builder.build(); Assert.assertNull(record.get("field1")); } + + @Test + public void validateStructHandling() throws Exception { + Schema streetFieldSchema = Schema.of(Schema.Type.STRING); + Schema cityFieldSchema = Schema.of(Schema.Type.STRING); + Schema addressStructSchema = Schema.recordOf("ADDRESS_TYPE", + Schema.Field.of("STREET", streetFieldSchema), + Schema.Field.of("CITY", cityFieldSchema) + ); + Schema.Field addressField = Schema.Field.of("address", addressStructSchema); + Schema schema = Schema.recordOf("dbRecord", addressField); + java.sql.Struct structMock = org.mockito.Mockito.mock(java.sql.Struct.class); + Object[] attributes = new Object[] { "123 Main St", "San Jose" }; + + when(structMock.getAttributes()).thenReturn(attributes); + java.sql.Statement statementMock = org.mockito.Mockito.mock(java.sql.Statement.class); + java.sql.Connection connectionMock = org.mockito.Mockito.mock(java.sql.Connection.class); + when(resultSet.getStatement()).thenReturn(statementMock); + when(statementMock.getConnection()).thenReturn(connectionMock); + when(resultSet.getObject(eq(1))).thenReturn(structMock); + + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); + dbRecord.handleField(resultSet, builder, addressField, 1, Types.STRUCT, 0, 0); + StructuredRecord record = builder.build(); + StructuredRecord addressRecord = record.get("address"); + + Assert.assertNotNull(addressRecord); + Assert.assertEquals("123 Main St", addressRecord.get("STREET")); + Assert.assertEquals("San Jose", addressRecord.get("CITY")); + } + + @Test + public void validateNestedStructHandling() throws Exception { + Schema streetFieldSchema = Schema.of(Schema.Type.STRING); + Schema cityFieldSchema = Schema.of(Schema.Type.STRING); + Schema addressStructSchema = Schema.recordOf("ADDRESS_TYPE", + Schema.Field.of("STREET", streetFieldSchema), + Schema.Field.of("CITY", cityFieldSchema) + ); + Schema personStructSchema = Schema.recordOf("PERSON_TYPE", + Schema.Field.of("NAME", Schema.of(Schema.Type.STRING)), + Schema.Field.of("ADDRESS", addressStructSchema) + ); + Schema.Field personField = Schema.Field.of("person", personStructSchema); + Schema schema = Schema.recordOf("dbRecord", personField); + + java.sql.Struct addressStructMock = org.mockito.Mockito.mock(java.sql.Struct.class); + Object[] addressAttrs = new Object[] { "123 Main St", "San Jose" }; + when(addressStructMock.getAttributes()).thenReturn(addressAttrs); + java.sql.Struct personStructMock = org.mockito.Mockito.mock(java.sql.Struct.class); + Object[] personAttrs = new Object[] { "John Doe", addressStructMock }; + when(personStructMock.getAttributes()).thenReturn(personAttrs); + + java.sql.Statement statementMock = org.mockito.Mockito.mock(java.sql.Statement.class); + java.sql.Connection connectionMock = org.mockito.Mockito.mock(java.sql.Connection.class); + when(resultSet.getStatement()).thenReturn(statementMock); + when(statementMock.getConnection()).thenReturn(connectionMock); + when(resultSet.getObject(eq(1))).thenReturn(personStructMock); + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); + dbRecord.handleField(resultSet, builder, personField, 1, Types.STRUCT, 0, 0); + StructuredRecord record = builder.build(); + StructuredRecord personRecord = record.get("person"); + + Assert.assertNotNull(personRecord); + Assert.assertEquals("John Doe", personRecord.get("NAME")); + StructuredRecord addressRecord = personRecord.get("ADDRESS"); + Assert.assertNotNull(addressRecord); + Assert.assertEquals("123 Main St", addressRecord.get("STREET")); + Assert.assertEquals("San Jose", addressRecord.get("CITY")); + } + + @Test + public void validatePrimitiveTypesInStruct() throws Exception { + Schema mixStructSchema = Schema.recordOf("MIX_TYPE", + Schema.Field.of("INT_VAL", Schema.of(Schema.Type.INT)), + Schema.Field.of("DECIMAL_VAL", Schema.decimalOf(10, 2)), + Schema.Field.of("DATE_VAL", Schema.of(Schema.LogicalType.DATE)), + Schema.Field.of("DATETIME_VAL", Schema.of(Schema.LogicalType.DATETIME)), + Schema.Field.of("BYTES_VAL", Schema.of(Schema.Type.BYTES)) + ); + + Schema.Field mixField = Schema.Field.of("mix", mixStructSchema); + Schema schema = Schema.recordOf("dbRecord", mixField); + java.sql.Timestamp timestamp = java.sql.Timestamp.valueOf("2026-05-06 10:30:00"); + java.sql.Date sqlDate = java.sql.Date.valueOf("2026-05-06"); + byte[] bytes = new byte[] { 1, 2, 3 }; + java.sql.Struct mixStructMock = org.mockito.Mockito.mock(java.sql.Struct.class); + Object[] mixAttrs = new Object[] { + 123, + new BigDecimal("45.67"), + sqlDate, + timestamp, + bytes + }; + when(mixStructMock.getAttributes()).thenReturn(mixAttrs); + java.sql.Statement statementMock = org.mockito.Mockito.mock(java.sql.Statement.class); + java.sql.Connection connectionMock = org.mockito.Mockito.mock(java.sql.Connection.class); + when(resultSet.getStatement()).thenReturn(statementMock); + when(statementMock.getConnection()).thenReturn(connectionMock); + when(resultSet.getObject(eq(1))).thenReturn(mixStructMock); + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); + dbRecord.handleField(resultSet, builder, mixField, 1, Types.STRUCT, 0, 0); + StructuredRecord record = builder.build(); + StructuredRecord mixRecord = record.get("mix"); + + Assert.assertNotNull(mixRecord); + Assert.assertEquals(Integer.valueOf(123), mixRecord.get("INT_VAL")); + Assert.assertEquals(new BigDecimal("45.67"), mixRecord.getDecimal("DECIMAL_VAL")); + Assert.assertEquals(sqlDate.toLocalDate(), mixRecord.getDate("DATE_VAL")); + Assert.assertEquals(timestamp.toLocalDateTime(), mixRecord.getDateTime("DATETIME_VAL")); + Assert.assertArrayEquals(bytes, mixRecord.get("BYTES_VAL")); + } }