diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergColumnStatsConverter.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergColumnStatsConverter.java index 60d4a453b..7fd59dee9 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergColumnStatsConverter.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergColumnStatsConverter.java @@ -27,6 +27,7 @@ import lombok.AccessLevel; import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; import org.apache.iceberg.Metrics; import org.apache.iceberg.Schema; @@ -40,6 +41,7 @@ import org.apache.xtable.model.stat.Range; /** Column stats extractor for iceberg table format. */ +@Log4j2 @NoArgsConstructor(access = AccessLevel.PRIVATE) public class IcebergColumnStatsConverter { private static final IcebergSchemaExtractor SCHEMA_EXTRACTOR = @@ -67,13 +69,15 @@ public Metrics toIceberg(Schema schema, long totalRowCount, List fie valueCounts.put(fieldId, columnStats.getNumValues()); nullValueCounts.put(fieldId, columnStats.getNumNulls()); Type fieldType = icebergField.type(); - if (columnStats.getRange().getMinValue() != null) { - lowerBounds.put( - fieldId, Conversions.toByteBuffer(fieldType, columnStats.getRange().getMinValue())); + Object minValue = coerceStatValue(fieldType, columnStats.getRange().getMinValue()); + ByteBuffer minBuffer = safeToByteBuffer(fieldType, minValue, field.getPath(), "min"); + if (minBuffer != null) { + lowerBounds.put(fieldId, minBuffer); } - if (columnStats.getRange().getMaxValue() != null) { - upperBounds.put( - fieldId, Conversions.toByteBuffer(fieldType, columnStats.getRange().getMaxValue())); + Object maxValue = coerceStatValue(fieldType, columnStats.getRange().getMaxValue()); + ByteBuffer maxBuffer = safeToByteBuffer(fieldType, maxValue, field.getPath(), "max"); + if (maxBuffer != null) { + upperBounds.put(fieldId, maxBuffer); } }); return new Metrics( @@ -130,4 +134,55 @@ private Object convertFromIcebergValue(Type fieldType, ByteBuffer value) { } return convertedValue; } + + private ByteBuffer safeToByteBuffer(Type fieldType, Object value, String fieldPath, String side) { + if (value == null) { + return null; + } + try { + return Conversions.toByteBuffer(fieldType, value); + } catch (RuntimeException e) { + log.warn( + "Skipping {} bound for field {} due to uncoercible value {} for type {}", + side, + fieldPath, + value, + fieldType, + e); + return null; + } + } + + private Object coerceStatValue(Type fieldType, Object value) { + if (value == null) { + return null; + } + switch (fieldType.typeId()) { + case STRING: + return value.toString(); + case FLOAT: + return value instanceof Number ? ((Number) value).floatValue() : null; + case DOUBLE: + return value instanceof Number ? ((Number) value).doubleValue() : null; + case INTEGER: + return value instanceof Number ? ((Number) value).intValue() : null; + case LONG: + return value instanceof Number ? ((Number) value).longValue() : null; + case DATE: + return value instanceof Number ? ((Number) value).intValue() : null; + case TIME: + case TIMESTAMP: + return value instanceof Number ? ((Number) value).longValue() : null; + case BOOLEAN: + if (value instanceof Boolean) { + return value; + } + if (value instanceof Number) { + return ((Number) value).intValue() != 0; + } + return null; + default: + return value; + } + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergColumnStatsConverter.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergColumnStatsConverter.java index 9154a3d35..73edbe4b9 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergColumnStatsConverter.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergColumnStatsConverter.java @@ -19,6 +19,7 @@ package org.apache.xtable.iceberg; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -382,6 +383,53 @@ public void fromIceberg() { assertEquals(expected, actual); } + @Test + public void testSchemaEvolutionStatsCoercionAndSkip() { + Schema icebergSchema = + new Schema( + Types.NestedField.required(1, "int_field", Types.IntegerType.get()), + Types.NestedField.optional(2, "decimal_field", Types.DecimalType.of(9, 2))); + + InternalField intField = + InternalField.builder() + .name("int_field") + .schema(InternalSchema.builder().name("int").dataType(InternalType.INT).build()) + .build(); + InternalField decimalField = + InternalField.builder() + .name("decimal_field") + .schema(InternalSchema.builder().name("decimal").dataType(InternalType.DECIMAL).build()) + .build(); + + ColumnStat intStats = + ColumnStat.builder() + .field(intField) + .numValues(10) + .numNulls(0) + .totalSize(40) + .range(Range.vector(10L, 20L)) + .build(); + ColumnStat decimalStats = + ColumnStat.builder() + .field(decimalField) + .numValues(5) + .numNulls(0) + .totalSize(80) + .range(Range.vector("not-a-decimal", "also-bad")) + .build(); + + Metrics actual = + IcebergColumnStatsConverter.getInstance() + .toIceberg(icebergSchema, 10L, Arrays.asList(intStats, decimalStats)); + + assertEquals( + Conversions.toByteBuffer(Types.IntegerType.get(), 10), actual.lowerBounds().get(1)); + assertEquals( + Conversions.toByteBuffer(Types.IntegerType.get(), 20), actual.upperBounds().get(1)); + assertFalse(actual.lowerBounds().containsKey(2)); + assertFalse(actual.upperBounds().containsKey(2)); + } + private Map getBoundsMap( Object timestampColumnStatsValue, Object dateColumnStatsValue,