[FLINK-39036][formats] Honor microsecond Avro logical types in RowDataToAvroConverters#28071
daguimu wants to merge 1 commit into apache:master
…aToAvroConverters

RowDataToAvroConverters always converted TIMESTAMP / TIMESTAMP_WITH_LOCAL_TIME_ZONE columns to milliseconds-since-epoch and ignored the Avro logical type carried by the target schema. When the destination schema declared logicalType=timestamp-micros (or local-timestamp-micros), readers that respect the logical type interpreted the millisecond value as microseconds and shifted the timestamp by a factor of 1000.

Detect timestamp-micros and local-timestamp-micros at conversion time and emit the value in microseconds; otherwise keep the existing milliseconds path so existing schemas using timestamp-millis are unaffected.

Add three RowDataToAvroConvertersTest cases covering:
* TIMESTAMP_WITH_LOCAL_TIME_ZONE under both micros and millis schemas
* TIMESTAMP_WITHOUT_TIME_ZONE under both local-timestamp-micros and -millis schemas
* the legacyTimestampMapping path under timestamp-micros

Closes #FLINK-39036
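The millisecond and microsecond encodings described in the commit message can be sketched with plain java.time. This is a minimal illustration under stated assumptions, not the PR's code: the class name is made up, the body of toEpochMicros is a guess at what such a helper could look like, and the sample instant is the PR's test timestamp read as UTC.

```java
import java.time.Instant;

final class EpochMicrosSketch {

    // Hypothetical stand-in for a toEpochMicros(Instant) helper; the real body may differ.
    static long toEpochMicros(Instant instant) {
        // getEpochSecond() is floored and getNano() is always in 0..999_999_999,
        // so the sum is also correct for instants before 1970.
        long secondsAsMicros = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
        return Math.addExact(secondsAsMicros, instant.getNano() / 1_000L);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-01-02T03:04:05.123456Z");
        System.out.println(ts.toEpochMilli());  // 1704164645123
        System.out.println(toEpochMicros(ts));  // 1704164645123456
    }
}
```

The exact-arithmetic calls make an out-of-range instant fail loudly with an ArithmeticException instead of silently wrapping; whether the real converter does the same is not stated in this PR.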
    }

    private static boolean isMicrosLogicalType(Schema schema) {
        final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
nit: this should be an import
  public Object convert(Schema schema, Object object) {
-     return ((TimestampData) object).toInstant().toEpochMilli();
+     final TimestampData timestampData = (TimestampData) object;
+     if (isMicrosLogicalType(schema)) {
I agree with the idea here, but I am concerned that introducing this behaviour might cause a regression for existing applications. I wonder if it would be safer to introduce the new behaviour behind a config flag until there is a version change.
Thanks for the careful read.
My read is that this is a correctness fix rather than a behavior change worth preserving. The read path in AvroToRowDataConverters already honors timestamp-micros / local-timestamp-micros and returns microseconds, while the write path here ignores the same logical type and always emits milliseconds. A pipeline that round-trips through Flink with a timestamp-micros schema silently scales timestamps by 1000× — that's a data-corruption bug, not a stable contract.
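To make the 1000× claim concrete, here is a small sketch of what a reader honoring timestamp-micros would see when handed a millisecond value. It uses only java.time; the class and method names are hypothetical and no Flink or Avro code is involved.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

final class MicrosMisreadSketch {

    // Interprets a raw long as microseconds since the epoch, the way a reader
    // honoring the timestamp-micros logical type would.
    static Instant readAsMicros(long value) {
        return Instant.EPOCH.plus(value, ChronoUnit.MICROS);
    }

    public static void main(String[] args) {
        Instant original = Instant.parse("2024-01-02T03:04:05.123456Z");

        // Old write path: always milliseconds, regardless of the schema.
        long writtenAsMillis = original.toEpochMilli();

        // The reader takes the schema at its word and treats the value as micros.
        Instant misread = readAsMicros(writtenAsMillis);

        System.out.println(original); // the intended 2024 timestamp
        System.out.println(misread);  // lands in January 1970
    }
}
```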
The fix is also gated by what the schema declares: users whose schemas are timestamp-millis (the previous behavior for both branches) see no change at all. Only users who already declared timestamp-micros and were silently getting wrong values are affected, and for them the new output is what their schema asked for.
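The schema gating can be sketched without the Avro dependency by keying on the logical type's name. This is an assumption-laden illustration: the real converter inspects an Avro Schema, whereas the hypothetical encode method below takes the logical type name as a plain string (null when the schema declares none).

```java
import java.time.Instant;
import java.util.Set;

final class LogicalTypeGateSketch {

    // Names of the two microsecond timestamp logical types in the Avro specification.
    private static final Set<String> MICROS_TYPES =
            Set.of("timestamp-micros", "local-timestamp-micros");

    // Hypothetical check on the logical type name; null means no logical type.
    static boolean isMicros(String logicalTypeName) {
        return logicalTypeName != null && MICROS_TYPES.contains(logicalTypeName);
    }

    // Emits microseconds only when the schema asks for them; otherwise milliseconds.
    static long encode(Instant instant, String logicalTypeName) {
        if (isMicros(logicalTypeName)) {
            long secondsAsMicros = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
            return Math.addExact(secondsAsMicros, instant.getNano() / 1_000L);
        }
        return instant.toEpochMilli();
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-01-02T03:04:05.123456Z");
        System.out.println(encode(ts, "timestamp-millis")); // millisecond path, unchanged
        System.out.println(encode(ts, "timestamp-micros")); // microsecond path
    }
}
```

Because the millisecond branch is the fall-through, a schema with timestamp-millis or with no logical type at all produces the same output as before.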
That said, the scope question deserves a committer's call. Could one of the flink-avro maintainers weigh in on whether a config flag is warranted on master? If so, happy to add one as a follow-up.
What is the purpose of the change
Fix FLINK-39036:
RowDataToAvroConverters always converts TIMESTAMP and TIMESTAMP_WITH_LOCAL_TIME_ZONE columns to milliseconds-since-epoch, ignoring the Avro logical type carried by the target schema. When the schema declares logicalType=timestamp-micros or local-timestamp-micros, readers that respect the logical type interpret that millisecond value as microseconds and shift the timestamp by a factor of 1000.
Brief change log
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
* In each TIMESTAMP_WITH_LOCAL_TIME_ZONE and TIMESTAMP_WITHOUT_TIME_ZONE converter (legacy and non-legacy mapping), check whether the supplied schema declares timestamp-micros / local-timestamp-micros. If yes, emit the value in microseconds; otherwise keep the existing milliseconds output.
* isMicrosLogicalType(Schema) and toEpochMicros(Instant).
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RowDataToAvroConvertersTest.java (new)
* 2024-01-02T03:04:05.123456 timestamp.
Verifying this change
This change adds tests and is already covered by them:
* RowDataToAvroConvertersTest#testTimestampWithLocalTimeZoneRespectsMicrosLogicalType: LocalZonedTimestampType(6) writer emits 1_704_164_645_123_456L for the micros schema and 1_704_164_645_123L for the millis schema.
* RowDataToAvroConvertersTest#testTimestampWithoutTimeZoneRespectsLocalMicrosLogicalType: TimestampType(6) writer emits the same numeric values for the local-timestamp-micros / -millis schemas.
* RowDataToAvroConvertersTest#testLegacyTimestampMappingRespectsMicrosLogicalType: legacyTimestampMapping=true path still honours timestamp-micros.
Does this pull request potentially affect one of the following parts:
* @Public(Evolving): no
Documentation