Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;

import org.apache.iceberg.Metrics;
import org.apache.iceberg.Schema;
Expand All @@ -40,6 +41,7 @@
import org.apache.xtable.model.stat.Range;

/** Column stats converter for the Iceberg table format. */
@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class IcebergColumnStatsConverter {
private static final IcebergSchemaExtractor SCHEMA_EXTRACTOR =
Expand Down Expand Up @@ -67,13 +69,15 @@ public Metrics toIceberg(Schema schema, long totalRowCount, List<ColumnStat> fie
valueCounts.put(fieldId, columnStats.getNumValues());
nullValueCounts.put(fieldId, columnStats.getNumNulls());
Type fieldType = icebergField.type();
if (columnStats.getRange().getMinValue() != null) {
lowerBounds.put(
fieldId, Conversions.toByteBuffer(fieldType, columnStats.getRange().getMinValue()));
Object minValue = coerceStatValue(fieldType, columnStats.getRange().getMinValue());
ByteBuffer minBuffer = safeToByteBuffer(fieldType, minValue, field.getPath(), "min");
if (minBuffer != null) {
lowerBounds.put(fieldId, minBuffer);
}
if (columnStats.getRange().getMaxValue() != null) {
upperBounds.put(
fieldId, Conversions.toByteBuffer(fieldType, columnStats.getRange().getMaxValue()));
Object maxValue = coerceStatValue(fieldType, columnStats.getRange().getMaxValue());
ByteBuffer maxBuffer = safeToByteBuffer(fieldType, maxValue, field.getPath(), "max");
if (maxBuffer != null) {
upperBounds.put(fieldId, maxBuffer);
}
});
return new Metrics(
Expand Down Expand Up @@ -130,4 +134,55 @@ private Object convertFromIcebergValue(Type fieldType, ByteBuffer value) {
}
return convertedValue;
}

/**
 * Serializes a single column-stat bound with {@code Conversions.toByteBuffer}, tolerating values
 * that cannot be serialized for the given type.
 *
 * @param fieldType Iceberg type handed to {@code Conversions.toByteBuffer}
 * @param value bound value to serialize; {@code null} yields {@code null} without any conversion
 * @param fieldPath field path, used only in the warning message
 * @param side label for the bound being written, used only in the warning message
 * @return the serialized bound, or {@code null} when {@code value} is {@code null} or the
 *     conversion throws a {@link RuntimeException}
 */
private ByteBuffer safeToByteBuffer(Type fieldType, Object value, String fieldPath, String side) {
  ByteBuffer serialized = null;
  if (value != null) {
    try {
      serialized = Conversions.toByteBuffer(fieldType, value);
    } catch (RuntimeException conversionFailure) {
      // Best effort: a bound that cannot be serialized is reported and left out instead of
      // propagating the failure. The exception is passed as the final logger argument.
      log.warn(
          "Skipping {} bound for field {} due to uncoercible value {} for type {}",
          side,
          fieldPath,
          value,
          fieldType,
          conversionFailure);
    }
  }
  return serialized;
}

/**
 * Coerces a column-stat bound into a Java value matching the given Iceberg field type, so that a
 * stat recorded with a different Java type than the current column type can still be serialized.
 *
 * <ul>
 *   <li>STRING: {@code value.toString()}.
 *   <li>FLOAT, DOUBLE, LONG, TIME, TIMESTAMP: the matching {@link Number} accessor; a non-numeric
 *       value gives {@code null}.
 *   <li>INTEGER, DATE: the value as an {@code int}, but only when it fits in the {@code int}
 *       range; otherwise {@code null}.
 *   <li>BOOLEAN: a {@link Boolean} is returned unchanged, a {@link Number} maps to {@code
 *       intValue() != 0}, and anything else gives {@code null}.
 *   <li>Any other type: the value is returned unchanged.
 * </ul>
 *
 * @param fieldType Iceberg type the value should be coerced to
 * @param value raw bound value; may be {@code null}
 * @return the coerced value, or {@code null} when {@code value} is {@code null} or cannot be
 *     represented for {@code fieldType}
 */
private Object coerceStatValue(Type fieldType, Object value) {
  if (value == null) {
    return null;
  }
  switch (fieldType.typeId()) {
    case STRING:
      return value.toString();
    case FLOAT:
      return value instanceof Number ? ((Number) value).floatValue() : null;
    case DOUBLE:
      return value instanceof Number ? ((Number) value).doubleValue() : null;
    case INTEGER:
    case DATE:
      // Both are int-backed here; narrow with a range check instead of a blind intValue().
      return toIntWithinRangeOrNull(value);
    case LONG:
    case TIME:
    case TIMESTAMP:
      return value instanceof Number ? ((Number) value).longValue() : null;
    case BOOLEAN:
      if (value instanceof Boolean) {
        return value;
      }
      if (value instanceof Number) {
        return ((Number) value).intValue() != 0;
      }
      return null;
    default:
      return value;
  }
}

/**
 * Narrows a numeric value to an {@code Integer} only when no information is lost to overflow.
 *
 * <p>{@link Number#intValue()} silently wraps a {@code long} outside the {@code int} range (and
 * saturates a {@code double}), which would yield a well-formed but incorrect bound. Returning
 * {@code null} instead uses the same "not coercible" signal this class already returns for
 * non-numeric input.
 *
 * <p>NOTE(review): arbitrary-precision numbers whose {@code longValue()} itself wraps are not
 * detected here — confirm whether such stats can reach this path.
 *
 * @param value candidate value
 * @return the value as an {@code Integer}, or {@code null} when it is not a {@link Number} or
 *     lies outside {@code [Integer.MIN_VALUE, Integer.MAX_VALUE]}
 */
private static Integer toIntWithinRangeOrNull(Object value) {
  if (!(value instanceof Number)) {
    return null;
  }
  long widened = ((Number) value).longValue();
  if (widened < Integer.MIN_VALUE || widened > Integer.MAX_VALUE) {
    return null;
  }
  return (int) widened;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.xtable.iceberg;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -382,6 +383,53 @@ public void fromIceberg() {
assertEquals(expected, actual);
}

/**
 * Verifies two behaviours of {@code toIceberg}: long-typed bounds supplied for an INT column are
 * written as int bounds, and string bounds supplied for a DECIMAL column produce no bounds for
 * that column.
 */
@Test
public void testSchemaEvolutionStatsCoercionAndSkip() {
  final int intFieldId = 1;
  final int decimalFieldId = 2;
  Types.IntegerType intType = Types.IntegerType.get();

  Schema icebergSchema =
      new Schema(
          Types.NestedField.required(intFieldId, "int_field", intType),
          Types.NestedField.optional(decimalFieldId, "decimal_field", Types.DecimalType.of(9, 2)));

  // Bounds are Long values even though the column is declared as an int.
  ColumnStat longBoundsOnIntColumn =
      ColumnStat.builder()
          .field(
              InternalField.builder()
                  .name("int_field")
                  .schema(InternalSchema.builder().name("int").dataType(InternalType.INT).build())
                  .build())
          .numValues(10)
          .numNulls(0)
          .totalSize(40)
          .range(Range.vector(10L, 20L))
          .build();

  // Bounds are strings that are not decimal values for the decimal column.
  ColumnStat stringBoundsOnDecimalColumn =
      ColumnStat.builder()
          .field(
              InternalField.builder()
                  .name("decimal_field")
                  .schema(
                      InternalSchema.builder()
                          .name("decimal")
                          .dataType(InternalType.DECIMAL)
                          .build())
                  .build())
          .numValues(5)
          .numNulls(0)
          .totalSize(80)
          .range(Range.vector("not-a-decimal", "also-bad"))
          .build();

  Metrics actual =
      IcebergColumnStatsConverter.getInstance()
          .toIceberg(
              icebergSchema,
              10L,
              Arrays.asList(longBoundsOnIntColumn, stringBoundsOnDecimalColumn));

  // The int column keeps both bounds, serialized as ints.
  ByteBuffer expectedLower = Conversions.toByteBuffer(intType, 10);
  ByteBuffer expectedUpper = Conversions.toByteBuffer(intType, 20);
  assertEquals(expectedLower, actual.lowerBounds().get(intFieldId));
  assertEquals(expectedUpper, actual.upperBounds().get(intFieldId));

  // The decimal column ends up with no bounds at all.
  assertFalse(actual.lowerBounds().containsKey(decimalFieldId));
  assertFalse(actual.upperBounds().containsKey(decimalFieldId));
}

private Map<Integer, ByteBuffer> getBoundsMap(
Object timestampColumnStatsValue,
Object dateColumnStatsValue,
Expand Down