
[FLINK-39036][formats] Honor microsecond Avro logical types in RowDataToAvroConverters #28071

Open

daguimu wants to merge 1 commit into apache:master from daguimu:fix/avro-timestamp-micros-FLINK-39036

Conversation


@daguimu daguimu commented Apr 29, 2026

What is the purpose of the change

Fix FLINK-39036: RowDataToAvroConverters always converts TIMESTAMP and TIMESTAMP_WITH_LOCAL_TIME_ZONE columns to milliseconds-since-epoch, ignoring the Avro logical type carried by the target schema. When the schema declares logicalType=timestamp-micros or local-timestamp-micros, readers that respect the logical type interpret that millisecond value as microseconds and shift the timestamp by a factor of 1000.

Brief change log

  • flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
    • In each TIMESTAMP_WITH_LOCAL_TIME_ZONE and TIMESTAMP_WITHOUT_TIME_ZONE converter (legacy and non-legacy mapping), check whether the supplied schema declares timestamp-micros / local-timestamp-micros. If yes, emit the value in microseconds; otherwise keep the existing milliseconds output.
    • Add private helpers isMicrosLogicalType(Schema) and toEpochMicros(Instant).
  • flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RowDataToAvroConvertersTest.java (new)
    • Three test methods covering the three converter branches that were updated; each asserts the exact micro/milli numeric output for a fixed 2024-01-02T03:04:05.123456 timestamp.
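The schema-gated branch described in the change log can be sketched as follows. This is a stdlib-only illustration, not the actual Flink code: the real `isMicrosLogicalType` inspects the Avro `Schema` object, while here a plain logical-type name string stands in for it, and the body of `toEpochMicros` is an assumption based on the description above.

```java
import java.time.Instant;

public class TimestampConvertSketch {
    // Stand-in for Schema#getLogicalType(): the real helper inspects the Avro
    // Schema; a plain logical-type name keeps the sketch self-contained.
    static boolean isMicrosLogicalType(String logicalTypeName) {
        return "timestamp-micros".equals(logicalTypeName)
                || "local-timestamp-micros".equals(logicalTypeName);
    }

    // Assumed shape of the toEpochMicros(Instant) helper: whole seconds scaled
    // to microseconds plus the sub-second nanos truncated to micro precision.
    static long toEpochMicros(Instant instant) {
        return Math.multiplyExact(instant.getEpochSecond(), 1_000_000L)
                + instant.getNano() / 1_000;
    }

    static long convert(String logicalTypeName, Instant ts) {
        return isMicrosLogicalType(logicalTypeName)
                ? toEpochMicros(ts)   // micros schema: emit microseconds
                : ts.toEpochMilli();  // unchanged milliseconds path
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-01-02T03:04:05.123456Z");
        System.out.println(convert("timestamp-micros", ts)); // 1704164645123456
        System.out.println(convert("timestamp-millis", ts)); // 1704164645123
    }
}
```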

Verifying this change

This change adds tests that cover it:

  • RowDataToAvroConvertersTest#testTimestampWithLocalTimeZoneRespectsMicrosLogicalType: the LocalZonedTimestampType(6) writer emits 1_704_164_645_123_456L for the micros schema and 1_704_164_645_123L for the millis schema.
  • RowDataToAvroConvertersTest#testTimestampWithoutTimeZoneRespectsLocalMicrosLogicalType: the TimestampType(6) writer emits the same numeric values for the local-timestamp-micros / -millis schemas.
  • RowDataToAvroConvertersTest#testLegacyTimestampMappingRespectsMicrosLogicalType: the legacyTimestampMapping=true path still honours timestamp-micros.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no (only a single schema-logical-type check before the existing branch)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature: no (bug fix)
  • If yes, how is the feature documented: not applicable

…aToAvroConverters

RowDataToAvroConverters always converted TIMESTAMP / TIMESTAMP_WITH_LOCAL_TIME_ZONE
columns to milliseconds-since-epoch and ignored the Avro logical type carried by
the target schema. When the destination schema declared
logicalType=timestamp-micros (or local-timestamp-micros), readers that respect
the logical type interpreted the millisecond value as microseconds and shifted
the timestamp by a factor of 1000.

Detect timestamp-micros and local-timestamp-micros at conversion time and emit
the value in microseconds; otherwise keep the existing milliseconds path so
existing schemas using timestamp-millis are unaffected.

Add three RowDataToAvroConvertersTest cases covering:
* TIMESTAMP_WITH_LOCAL_TIME_ZONE under both micros and millis schemas
* TIMESTAMP_WITHOUT_TIME_ZONE under both local-timestamp-micros and -millis schemas
* the legacyTimestampMapping path under timestamp-micros

Closes #FLINK-39036
Collaborator

flinkbot commented Apr 29, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

```java
}

private static boolean isMicrosLogicalType(Schema schema) {
    final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
```
Contributor

nit: this should be an import

```diff
 public Object convert(Schema schema, Object object) {
-    return ((TimestampData) object).toInstant().toEpochMilli();
+    final TimestampData timestampData = (TimestampData) object;
+    if (isMicrosLogicalType(schema)) {
```
Contributor

I agree with the idea here, but am concerned that introducing this behaviour might result in a regression for existing applications. I wonder if it would be safer to introduce the new behaviour under a config flag until there is a version change.

Author

Thanks for the careful read.

My read is that this is a correctness fix rather than a behavior change worth preserving. The read path in AvroToRowDataConverters already honors timestamp-micros / local-timestamp-micros and returns microseconds, while the write path here ignores the same logical type and always emits milliseconds. A pipeline that round-trips through Flink with a timestamp-micros schema silently scales timestamps by 1000× — that's a data-corruption bug, not a stable contract.

The fix is also gated by what the schema declares: users whose schemas are timestamp-millis (the previous behavior for both branches) see no change at all. Only users who already declared timestamp-micros and were silently getting wrong values are affected, and for them the new output is what their schema asked for.

That said, the scope question deserves a committer's call. Could one of the flink-avro maintainers weigh in on whether a config flag is warranted on master? If so, happy to add one as a follow-up.
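The 1000x round-trip corruption described in this reply can be made concrete with a stdlib-only sketch (the micros-aware reader is simulated by reinterpreting the written long as epoch microseconds; the names here are illustrative, not Flink's):

```java
import java.time.Instant;

public class RoundTripSketch {
    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-01-02T03:04:05.123Z");

        // Buggy write path: always emits epoch milliseconds,
        // even when the schema declares timestamp-micros.
        long written = ts.toEpochMilli();

        // A reader that honours the timestamp-micros logical type
        // interprets the same long as epoch microseconds.
        Instant readBack = Instant.ofEpochSecond(
                written / 1_000_000L, (written % 1_000_000L) * 1_000L);

        System.out.println(ts);       // 2024-01-02T03:04:05.123Z
        System.out.println(readBack); // 1970-01-20T17:22:44.645123Z
    }
}
```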

3 participants