diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py
index c69e105347aa3..aabd203175eac 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -767,6 +767,18 @@ def from_source(self,
             j_type_info = type_info.get_java_type_info()
         else:
             j_type_info = None
+        if watermark_strategy._timestamp_assigner is not None:
+            # in case users have specified a custom Python TimestampAssigner, the timestamp
+            # extraction and watermark generation cannot be performed on the Java source. So we
+            # create the source without watermarks first and then apply the watermark strategy
+            # downstream, where the custom Python TimestampAssigner can be executed.
+            j_data_stream = self._j_stream_execution_environment.fromSource(
+                source.get_java_function(),
+                WatermarkStrategy.no_watermarks()._j_watermark_strategy,
+                source_name,
+                j_type_info)
+            return DataStream(j_data_stream=j_data_stream).assign_timestamps_and_watermarks(
+                watermark_strategy)
         j_data_stream = self._j_stream_execution_environment.fromSource(
             source.get_java_function(),
             watermark_strategy._j_watermark_strategy,
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 8a10244a8a3bd..61f3a4411c5f8 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -27,12 +27,14 @@
 
 from pyflink.common import Configuration, ExecutionConfig
 from pyflink.common.typeinfo import Types
+from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
 from pyflink.datastream import (StreamExecutionEnvironment, CheckpointConfig,
                                 CheckpointingMode, SlotSharingGroup)
 from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
+from pyflink.datastream.connectors.number_seq import NumberSequenceSource
 from pyflink.datastream.execution_mode import RuntimeExecutionMode
 from pyflink.datastream.formats.json import JsonRowDeserializationSchema
-from pyflink.datastream.functions import SourceFunction
+from pyflink.datastream.functions import ProcessFunction, SourceFunction
 from pyflink.datastream.slot_sharing_group import MemorySize
 from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
 from pyflink.find_flink_home import _find_flink_source_root
@@ -674,5 +676,35 @@ def test_register_cached_file(self):
         self.assertEqual(cached_files.size(), 1)
         self.assertEqual(cached_files[0].getField(0), 'cache_test')
 
+    def test_from_source_with_timestamp_assigner(self):
+        self.env.set_parallelism(1)
+        self.env.get_config().set_auto_watermark_interval(2000)
+        seq_source = NumberSequenceSource(1, 4)
+
+        class MyTimestampAssigner(TimestampAssigner):
+
+            def extract_timestamp(self, value, record_timestamp) -> int:
+                return value * 1000
+
+        class MyProcessFunction(ProcessFunction):
+
+            def process_element(self, value, ctx):
+                yield "value: {}, timestamp: {}".format(str(value), str(ctx.timestamp()))
+
+        watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+            .with_timestamp_assigner(MyTimestampAssigner())
+        ds = self.env.from_source(seq_source, watermark_strategy, "seq_source",
+                                  type_info=Types.LONG())
+        ds.process(MyProcessFunction(), output_type=Types.STRING()).add_sink(self.test_sink)
+        self.env.execute('test from source with timestamp assigner')
+        results = self.test_sink.get_results()
+        expected = ["value: 1, timestamp: 1000",
+                    "value: 2, timestamp: 2000",
+                    "value: 3, timestamp: 3000",
+                    "value: 4, timestamp: 4000"]
+        results.sort()
+        expected.sort()
+        self.assertEqual(expected, results)
+
     def tearDown(self) -> None:
         self.test_sink.clear()