Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.CollectionUtil;

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -166,7 +168,11 @@ public Object convert(Schema schema, Object object) {

@Override
public Object convert(Schema schema, Object object) {
return ((TimestampData) object).toInstant().toEpochMilli();
final TimestampData timestampData = (TimestampData) object;
if (isMicrosLogicalType(schema)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the idea here, but am concerned that introducing this behaviour might result in a regression for existing applications. I wonder if it would be safer to introduce the new behaviour under a config flag until there is a version change.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the careful read.

My read is that this is a correctness fix rather than a behavior change worth preserving. The read path in AvroToRowDataConverters already honors timestamp-micros / local-timestamp-micros and returns microseconds, while the write path here ignores the same logical type and always emits milliseconds. A pipeline that round-trips through Flink with a timestamp-micros schema silently scales timestamps by 1000× — that's a data-corruption bug, not a stable contract.

The fix is also gated by what the schema declares: users whose schemas are timestamp-millis (the previous behavior for both branches) see no change at all. Only users who already declared timestamp-micros and were silently getting wrong values are affected, and for them the new output is what their schema asked for.

That said, the scope question deserves a committer's call. Could one of the flink-avro maintainers weigh in on whether a config flag is warranted on master? If so, happy to add one as a follow-up.

return toEpochMicros(timestampData.toInstant());
}
return timestampData.toInstant().toEpochMilli();
}
};
} else {
Expand All @@ -176,7 +182,14 @@ public Object convert(Schema schema, Object object) {

@Override
public Object convert(Schema schema, Object object) {
return ((TimestampData) object)
final TimestampData timestampData = (TimestampData) object;
if (isMicrosLogicalType(schema)) {
return toEpochMicros(
timestampData
.toLocalDateTime()
.toInstant(ZoneOffset.UTC));
}
return timestampData
.toLocalDateTime()
.toInstant(ZoneOffset.UTC)
.toEpochMilli();
Expand All @@ -194,7 +207,11 @@ public Object convert(Schema schema, Object object) {

@Override
public Object convert(Schema schema, Object object) {
    // Pick the unit from the schema's declared logical type: micros for
    // timestamp-micros / local-timestamp-micros, millis otherwise.
    final Instant instant = ((TimestampData) object).toInstant();
    return isMicrosLogicalType(schema)
            ? toEpochMicros(instant)
            : instant.toEpochMilli();
}
};
}
Expand Down Expand Up @@ -353,4 +370,16 @@ public Object convert(Schema schema, Object object) {
}
};
}

private static boolean isMicrosLogicalType(Schema schema) {
final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this should be an import

return logicalType == LogicalTypes.timestampMicros()
|| logicalType == LogicalTypes.localTimestampMicros();
}

/**
 * Converts an {@link Instant} to microseconds since the epoch, truncating
 * sub-microsecond nanos. Uses exact arithmetic so an out-of-range instant
 * fails fast with {@link ArithmeticException} instead of silently wrapping.
 */
private static long toEpochMicros(Instant instant) {
    final long secondsAsMicros = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
    final long nanosAsMicros = instant.getNano() / 1_000L;
    return Math.addExact(secondsAsMicros, nanosAsMicros);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro;

import org.apache.flink.formats.avro.RowDataToAvroConverters.RowDataToAvroConverter;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.TimestampType;

import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link RowDataToAvroConverters}. */
class RowDataToAvroConvertersTest {

    private static final String TIMESTAMP_MILLIS_SCHEMA =
            "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}";
    private static final String TIMESTAMP_MICROS_SCHEMA =
            "{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}";
    private static final String LOCAL_TIMESTAMP_MILLIS_SCHEMA =
            "{\"type\":\"long\",\"logicalType\":\"local-timestamp-millis\"}";
    private static final String LOCAL_TIMESTAMP_MICROS_SCHEMA =
            "{\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}";

    // 2024-01-02T03:04:05.123456 (UTC) expressed in the two target units.
    private static final long EXPECTED_EPOCH_MICROS = 1_704_164_645_123_456L;
    private static final long EXPECTED_EPOCH_MILLIS = 1_704_164_645_123L;

    /** Shared fixture: a timestamp with non-zero sub-millisecond digits. */
    private static TimestampData sampleTimestamp() {
        return TimestampData.fromLocalDateTime(
                LocalDateTime.parse("2024-01-02T03:04:05.123456"));
    }

    /** Runs the converter against a schema parsed from JSON and narrows to long. */
    private static long convertToLong(
            RowDataToAvroConverter converter, String schemaJson, TimestampData value) {
        return (long) converter.convert(new Schema.Parser().parse(schemaJson), value);
    }

    @Test
    void testTimestampWithLocalTimeZoneRespectsMicrosLogicalType() {
        // FLINK-39036: when the Avro schema declares timestamp-micros the
        // writer must emit microseconds; emitting millis would be read back
        // shifted by a factor of 1000 by logical-type-aware consumers.
        final RowDataToAvroConverter converter =
                RowDataToAvroConverters.createConverter(new LocalZonedTimestampType(6), false);
        final TimestampData value = sampleTimestamp();

        assertThat(convertToLong(converter, TIMESTAMP_MICROS_SCHEMA, value))
                .isEqualTo(EXPECTED_EPOCH_MICROS);
        assertThat(convertToLong(converter, TIMESTAMP_MILLIS_SCHEMA, value))
                .isEqualTo(EXPECTED_EPOCH_MILLIS);
    }

    @Test
    void testTimestampWithoutTimeZoneRespectsLocalMicrosLogicalType() {
        // Under the new mapping TIMESTAMP_WITHOUT_TIME_ZONE corresponds to
        // Avro's local-timestamp-* logical types; local-timestamp-micros must
        // therefore yield microseconds rather than milliseconds.
        final RowDataToAvroConverter converter =
                RowDataToAvroConverters.createConverter(new TimestampType(6), false);
        final TimestampData value = sampleTimestamp();

        assertThat(convertToLong(converter, LOCAL_TIMESTAMP_MICROS_SCHEMA, value))
                .isEqualTo(EXPECTED_EPOCH_MICROS);
        assertThat(convertToLong(converter, LOCAL_TIMESTAMP_MILLIS_SCHEMA, value))
                .isEqualTo(EXPECTED_EPOCH_MILLIS);
    }

    @Test
    void testLegacyTimestampMappingRespectsMicrosLogicalType() {
        // The legacy mapping path must also honor timestamp-micros, since
        // users may serialize against an externally supplied schema that
        // declares the micros logical type.
        final RowDataToAvroConverter converter =
                RowDataToAvroConverters.createConverter(new TimestampType(6), true);

        assertThat(convertToLong(converter, TIMESTAMP_MICROS_SCHEMA, sampleTimestamp()))
                .isEqualTo(EXPECTED_EPOCH_MICROS);
    }
}