Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions flink-python/pyflink/datastream/stream_execution_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,18 @@ def from_source(self,
j_type_info = type_info.get_java_type_info()
else:
j_type_info = None
if watermark_strategy._timestamp_assigner is not None:
# in case users have specified a custom Python TimestampAssigner, the timestamp
# extraction and watermark generation cannot be performed on the Java source. So we
# create the source without watermarks first and then apply the watermark strategy
# downstream, where the custom Python TimestampAssigner can be executed.
j_data_stream = self._j_stream_execution_environment.fromSource(
source.get_java_function(),
WatermarkStrategy.no_watermarks()._j_watermark_strategy,
source_name,
j_type_info)
return DataStream(j_data_stream=j_data_stream).assign_timestamps_and_watermarks(
watermark_strategy)
j_data_stream = self._j_stream_execution_environment.fromSource(
source.get_java_function(),
watermark_strategy._j_watermark_strategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@

from pyflink.common import Configuration, ExecutionConfig
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream import (StreamExecutionEnvironment, CheckpointConfig,
CheckpointingMode, SlotSharingGroup)
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.connectors.number_seq import NumberSequenceSource
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
from pyflink.datastream.functions import SourceFunction
from pyflink.datastream.functions import ProcessFunction, SourceFunction
from pyflink.datastream.slot_sharing_group import MemorySize
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.find_flink_home import _find_flink_source_root
Expand Down Expand Up @@ -674,5 +676,35 @@ def test_register_cached_file(self):
self.assertEqual(cached_files.size(), 1)
self.assertEqual(cached_files[0].getField(0), 'cache_test')

def test_from_source_with_timestamp_assigner(self):
self.env.set_parallelism(1)
self.env.get_config().set_auto_watermark_interval(2000)
seq_source = NumberSequenceSource(1, 4)

class MyTimestampAssigner(TimestampAssigner):

def extract_timestamp(self, value, record_timestamp) -> int:
return value * 1000

class MyProcessFunction(ProcessFunction):

def process_element(self, value, ctx):
yield "value: {}, timestamp: {}".format(str(value), str(ctx.timestamp()))

watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(MyTimestampAssigner())
ds = self.env.from_source(seq_source, watermark_strategy, "seq_source",
type_info=Types.LONG())
ds.process(MyProcessFunction(), output_type=Types.STRING()).add_sink(self.test_sink)
self.env.execute('test from source with timestamp assigner')
results = self.test_sink.get_results()
expected = ["value: 1, timestamp: 1000",
"value: 2, timestamp: 2000",
"value: 3, timestamp: 3000",
"value: 4, timestamp: 4000"]
results.sort()
expected.sort()
self.assertEqual(expected, results)

def tearDown(self) -> None:
self.test_sink.clear()