Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Struct;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -106,7 +107,7 @@ record = recordBuilder.build();
@Override
protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field,
int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException {
if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB) {
if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB || sqlType == Types.STRUCT) {
handleOracleSpecificType(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
} else {
setField(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale);
Expand Down Expand Up @@ -341,6 +342,13 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil
case OracleSourceSchemaReader.LONG_RAW:
recordBuilder.set(field.getName(), resultSet.getBytes(columnIndex));
break;
case Types.STRUCT:
java.sql.Struct structValue = (java.sql.Struct) resultSet.getObject(columnIndex);
if (structValue != null) {
recordBuilder.set(field.getName(), convertStructToRecord(structValue, nonNullSchema,
resultSet.getStatement().getConnection()));
}
break;
case Types.DECIMAL:
case Types.NUMERIC:
// This is the only way to differentiate FLOAT/REAL columns from other numeric columns, that based on NUMBER.
Expand Down Expand Up @@ -371,6 +379,46 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil
}
}

/**
 * Converts a JDBC {@link Struct} value read from Oracle into a CDAP {@link StructuredRecord}
 * for the given RECORD schema. Struct attributes are matched to schema fields positionally
 * (the schema is built in ATTR_NO order, so positions line up).
 *
 * @param struct     the struct value read from the result set
 * @param schema     the non-nullable RECORD schema describing the struct
 * @param connection the connection, carried along for nested struct conversion
 * @return the populated record
 * @throws SQLException if the struct's attributes cannot be read
 */
private StructuredRecord convertStructToRecord(Struct struct, Schema schema,
                                               Connection connection) throws SQLException {
  Object[] attributes = struct.getAttributes();
  List<Schema.Field> fields = schema.getFields();
  if (fields == null) {
    // Defensive: getFields() returns null for non-RECORD schemas; fail fast with a clear
    // message instead of a bare NPE on fields.size().
    throw new IllegalArgumentException(
      String.format("Schema '%s' is not a record schema.", schema.getRecordName()));
  }
  StructuredRecord.Builder builder = StructuredRecord.builder(schema);

  // Iterate over the smaller of the two collections to avoid an out-of-bounds access when
  // metadata and data disagree; surplus attributes/fields are ignored (pre-existing behavior).
  for (int i = 0; i < fields.size() && i < attributes.length; i++) {
    Schema.Field field = fields.get(i);
    Object attrValue = attributes[i];

    if (attrValue == null) {
      builder.set(field.getName(), null);
      continue;
    }

    Schema fieldSchema = field.getSchema().isNullable()
      ? field.getSchema().getNonNullable() : field.getSchema();

    if (attrValue instanceof Struct) {
      // Nested Oracle object type: recurse with the nested RECORD schema.
      builder.set(field.getName(), convertStructToRecord((Struct) attrValue, fieldSchema, connection));
    } else if (attrValue instanceof java.sql.Date) {
      builder.setDate(field.getName(), ((java.sql.Date) attrValue).toLocalDate());
    } else if (attrValue instanceof java.sql.Time) {
      builder.setTime(field.getName(), ((java.sql.Time) attrValue).toLocalTime());
    } else if (attrValue instanceof Timestamp) {
      // Pick the setter matching the field's logical type: DATETIME fields get a
      // LocalDateTime, everything else a UTC-zoned timestamp.
      if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) {
        builder.setDateTime(field.getName(), ((Timestamp) attrValue).toLocalDateTime());
      } else {
        builder.setTimestamp(field.getName(),
                             ((Timestamp) attrValue).toInstant().atZone(java.time.ZoneId.of("UTC")));
      }
    } else if (attrValue instanceof BigDecimal) {
      builder.setDecimal(field.getName(), (BigDecimal) attrValue);
    } else {
      // Strings, numbers, byte[] etc. are set as-is.
      builder.set(field.getName(), attrValue);
    }
  }
  return builder.build();
}

/**
* Get the scale set in Non-nullable schema associated with the schema
* */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -70,6 +75,7 @@ public class OracleSourceSchemaReader extends CommonSchemaReader {
private final Boolean isTimestampOldBehavior;
private final Boolean isPrecisionlessNumAsDecimal;
private final Boolean isTimestampLtzFieldTimestamp;
private Connection connection;

public OracleSourceSchemaReader() {
this(null, false, false, false);
Expand Down Expand Up @@ -136,11 +142,130 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
}
return Schema.decimalOf(precision, scale);
}
case Types.STRUCT:
if (connection == null) {
throw new SQLException("Cannot resolve STRUCT schema without a database connection. "
+ "Use getSchemaFields(ResultSet) to enable STRUCT type resolution.");
}
String typeName = metadata.getColumnTypeName(index);
String oracleSchemaName = metadata.getSchemaName(index);
return getStructSchema(connection, oracleSchemaName, typeName);
default:
return super.getSchema(metadata, index);
}
}

/**
 * Captures the connection backing the result set before delegating to the parent
 * implementation, so {@code getSchema} can query Oracle type metadata when it
 * encounters a STRUCT column.
 *
 * <p>NOTE(review): this stores mutable state on the reader instance; presumably the
 * reader is used single-threaded per split — confirm before sharing instances.
 *
 * @param resultSet the result set whose metadata will be converted to schema fields
 * @return the schema fields derived from the result set metadata
 * @throws SQLException if the statement or connection cannot be obtained
 */
@Override
public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException {
// Keep the live connection so STRUCT columns can be resolved via ALL_TYPE_ATTRS later.
this.connection = resultSet.getStatement().getConnection();
return super.getSchemaFields(resultSet);
}

/**
* Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the database metadata
* for the type's attributes.
*
* @param connection the database connection
* @param schemaName the Oracle schema owning the type
* @param typeName the Oracle type name (e.g., "ADDRESS_TYPE")
* @return a CDAP RECORD schema with fields corresponding to the STRUCT's attributes
*/
private Schema getStructSchema(Connection connection, String schemaName,
String typeName) throws SQLException {
List<Schema.Field> fields = new ArrayList<>();

String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? ORDER BY ATTR_NO";

try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, typeName.substring(typeName.lastIndexOf('.') + 1));
Comment on lines +177 to +180
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The query to ALL_TYPE_ATTRS should include a filter on the OWNER column. Without it, if multiple schemas define a type with the same name, the query will return attributes for all of them, leading to an incorrect or corrupted schema.

Additionally, the typeName provided by the JDBC driver is often fully qualified (e.g., OWNER.TYPE_NAME). You should parse the owner from the typeName string if present, and fallback to the provided schemaName otherwise.


try (ResultSet attrRs = stmt.executeQuery()) {
while (attrRs.next()) {
String attrName = attrRs.getString("ATTR_NAME");
String attrTypeName = attrRs.getString("ATTR_TYPE_NAME");
int attrSize = attrRs.getInt("PRECISION");
int attrScale = attrRs.getInt("SCALE");

Schema attrSchema = mapPrimitiveOracleType(attrTypeName, attrSize, attrScale);
if (attrSchema != null) {
fields.add(Schema.Field.of(attrName, attrSchema));
} else {
Schema nestedSchema = getStructSchema(connection, schemaName, attrTypeName);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When recursing for nested STRUCT types, the schemaName should be updated to the owner of the attribute's type to ensure the subsequent query filters correctly. This owner can be retrieved from the ATTR_TYPE_OWNER column in the ALL_TYPE_ATTRS result set.

fields.add(Schema.Field.of(attrName, nestedSchema));
}
}
}
}
if (fields.isEmpty()) {
throw new SQLException(String.format(
"No attributes found for Oracle STRUCT type '%s.%s'. "
+ "Ensure the type exists and is accessible.",
schemaName, typeName));
}

return Schema.recordOf(typeName, fields);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

CDAP Schema.recordOf requires the record name to be a valid identifier (matching [A-Za-z_][A-Za-z0-9_]*). If typeName is fully qualified (e.g., MY_SCHEMA.MY_TYPE), this call will throw an IllegalArgumentException due to the dot. Consider using only the simple type name part for the record name.

    String recordName = typeName.contains(".") ? typeName.substring(typeName.lastIndexOf('.') + 1) : typeName;
    return Schema.recordOf(recordName, fields);

}

private Schema mapPrimitiveOracleType(String typeName, int precision, int scale) {
switch (typeName) {
case "TIMESTAMP WITH TZ":
return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
case "TIMESTAMP WITH LTZ":
return getTimestampLtzSchema();
case "TIMESTAMP":
return Schema.of(Schema.LogicalType.DATETIME);
case "DATE" :
return Schema.of(Schema.LogicalType.DATE);
case "BINARY FLOAT":
case "FLOAT":
return Schema.of(Schema.Type.FLOAT);
case "BINARY DOUBLE":
case "DOUBLE":
return Schema.of(Schema.Type.DOUBLE);
case "BFILE":
case "RAW":
case "LONG RAW":
return Schema.of(Schema.Type.BYTES);
case "INTERVAL DAY TO SECOND":
case "INTERVAL YEAR TO MONTH":
case "VARCHAR2":
case "VARCHAR":
case "CHAR":
case "CLOB":
case "BLOB":
case "LONG":
return Schema.of(Schema.Type.STRING);
case "INTEGER":
return Schema.of(Schema.Type.INT);
case "NUMBER":
case "DECIMAL":
// FLOAT and REAL are returned as java.sql.Types.NUMERIC but with value that is a java.lang.Double
if (Double.class.getTypeName().equals(typeName)) {
return Schema.of(Schema.Type.DOUBLE);
Comment on lines +211 to +244
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are two issues in mapPrimitiveOracleType:

  1. Type Name Mismatches: The strings used in the switch (e.g., "TIMESTAMP WITH TZ", "BINARY FLOAT") likely do not match the values returned by Oracle in ALL_TYPE_ATTRS.ATTR_TYPE_NAME. For example, Oracle typically uses "TIMESTAMP WITH TIME ZONE", "BINARY_FLOAT", and "BINARY_DOUBLE". Please verify these against the Oracle metadata documentation.
  2. Logic Error: At line 243, Double.class.getTypeName().equals(typeName) compares the Java class name string ("java.lang.Double") with the SQL type name (e.g., "NUMBER"), which will always be false. Since metadata is not available here to check the column class, this check should probably be removed or replaced with a check on the SQL type name if applicable.

} else {
if (precision == 0) {
if (isPrecisionlessNumAsDecimal) {
precision = 38;
scale = 0;
LOG.warn(String.format("%s type with undefined precision and scale is detected, "
+ "there may be a precision loss while running the pipeline. "
+ "Please define an output precision and scale for field to avoid "
+ "precision loss.", typeName));
return Schema.decimalOf(precision, scale);
} else {
LOG.warn(String.format("%s type without precision and scale, "
+ "converting into STRING type to avoid any precision loss.",
typeName));
return Schema.of(Schema.Type.STRING);
}
}
return Schema.decimalOf(precision, scale);
}
default:
return null;
}
}

private @NotNull Schema getTimestampLtzSchema() {
return isTimestampOldBehavior || isTimestampLtzFieldTimestamp
? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.List;

public class OracleSchemaReaderTest {
Expand All @@ -37,6 +41,12 @@ public void getSchema_timestampLTZFieldTrue_returnTimestamp() throws SQLExceptio

ResultSet resultSet = Mockito.mock(ResultSet.class);
ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
Statement statement = Mockito.mock(Statement.class);
Connection connection = Mockito.mock(Connection.class);

Mockito.when(resultSet.getMetaData()).thenReturn(metadata);
Mockito.when(resultSet.getStatement()).thenReturn(statement);
Mockito.when(statement.getConnection()).thenReturn(connection);

Mockito.when(resultSet.getMetaData()).thenReturn(metadata);

Expand Down Expand Up @@ -68,9 +78,12 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio

ResultSet resultSet = Mockito.mock(ResultSet.class);
ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
Statement statement = Mockito.mock(Statement.class);
Connection connection = Mockito.mock(Connection.class);

Mockito.when(resultSet.getMetaData()).thenReturn(metadata);

Mockito.when(resultSet.getStatement()).thenReturn(statement);
Mockito.when(statement.getConnection()).thenReturn(connection);
Mockito.when(metadata.getColumnCount()).thenReturn(2);
// -101 is for TIMESTAMP_TZ
Mockito.when(metadata.getColumnType(1)).thenReturn(-101);
Expand All @@ -91,4 +104,52 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio
Assert.assertEquals(expectedSchemaFields.get(1).getName(), actualSchemaFields.get(1).getName());
Assert.assertEquals(expectedSchemaFields.get(1).getSchema(), actualSchemaFields.get(1).getSchema());
}

// Verifies that a STRUCT column is resolved into a CDAP RECORD schema: the reader must
// capture the connection from the ResultSet and query ALL_TYPE_ATTRS for the type's
// attributes, producing one record field per attribute.
@Test
public void getSchemaFields_structType_returnRecord() throws SQLException {
OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader();

ResultSet resultSet = Mockito.mock(ResultSet.class);
ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
Statement statement = Mockito.mock(Statement.class);
Connection connection = Mockito.mock(Connection.class);
PreparedStatement stmt = Mockito.mock(PreparedStatement.class);
ResultSet attrRs = Mockito.mock(ResultSet.class);

// Wire the mock chain: resultSet -> statement -> connection -> prepared statement -> attrRs.
Mockito.when(resultSet.getMetaData()).thenReturn(metadata);
Mockito.when(resultSet.getStatement()).thenReturn(statement);
Mockito.when(statement.getConnection()).thenReturn(connection);
Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(stmt);
Mockito.when(stmt.executeQuery()).thenReturn(attrRs);

// One STRUCT column
Mockito.when(metadata.getColumnCount()).thenReturn(1);
Mockito.when(metadata.getColumnType(1)).thenReturn(Types.STRUCT);
Mockito.when(metadata.getColumnName(1)).thenReturn("address");
Mockito.when(metadata.getColumnTypeName(1)).thenReturn("ADDRESS_TYPE");
Mockito.when(metadata.getSchemaName(1)).thenReturn("TEST_SCHEMA");

// Mock ALL_TYPE_ATTRS for ADDRESS_TYPE with two VARCHAR2 attributes
// NOTE: these consecutive-return stubs are sequence-sensitive — next() yields two rows
// then terminates, and each getter returns one value per row, in row order.
Mockito.when(attrRs.next()).thenReturn(true, true, false);
Mockito.when(attrRs.getString("ATTR_NAME")).thenReturn("STREET", "CITY");
Mockito.when(attrRs.getString("ATTR_TYPE_NAME")).thenReturn("VARCHAR2", "VARCHAR2");
Mockito.when(attrRs.getInt("PRECISION")).thenReturn(0, 0);
Mockito.when(attrRs.getInt("SCALE")).thenReturn(0, 0);

List<Schema.Field> actualFields = schemaReader.getSchemaFields(resultSet);

// The single STRUCT column becomes a single schema field named after the column.
Assert.assertEquals(1, actualFields.size());
Schema.Field addressField = actualFields.get(0);
Assert.assertEquals("address", addressField.getName());

// Unwrap nullability before inspecting the record schema.
Schema addressSchema = addressField.getSchema().isNullable()
? addressField.getSchema().getNonNullable() : addressField.getSchema();
Assert.assertEquals(Schema.Type.RECORD, addressSchema.getType());
Assert.assertEquals("ADDRESS_TYPE", addressSchema.getRecordName());

// The record carries one field per STRUCT attribute, in ATTR_NO order.
List<Schema.Field> structFields = addressSchema.getFields();
Assert.assertEquals(2, structFields.size());
Assert.assertEquals("STREET", structFields.get(0).getName());
Assert.assertEquals("CITY", structFields.get(1).getName());
}
}
Loading