[FLINK-39725][python] Apply the Python TimestampAssigner in from_source #28491
Open
wilmerdooley wants to merge 1 commit into
When StreamExecutionEnvironment.from_source is called in PyFlink with a WatermarkStrategy that carries a custom Python TimestampAssigner, the assigner was never invoked: the strategy was passed straight to the Java source operator, which runs timestamp extraction on the JVM side. This change detects a Python TimestampAssigner and instead creates the source with no watermarks, then applies the strategy downstream via DataStream.assign_timestamps_and_watermarks so the Python assigner is executed. A regression test is added in test_stream_execution_environment.py.
Signed-off-by: wilmerdooley <wilmerdooley1@gmail.com>
Generated-by: Claude Code
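The routing described above can be modeled in plain Python. This is only a sketch of the control flow, not the PyFlink implementation: ModelStrategy, model_from_source and the patched flag are hypothetical stand-ins, and a timestamp of None stands for "no timestamp produced by the Python assigner".

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple

# Hypothetical type for a Python-side assigner: (value, record_timestamp) -> timestamp.
Assigner = Callable[[int, Optional[int]], int]


@dataclass
class ModelStrategy:
    """Hypothetical stand-in for a watermark strategy; not a PyFlink class."""
    # None means "no Python assigner attached", mirroring the check
    # watermark_strategy._timestamp_assigner is not None from the change log.
    timestamp_assigner: Optional[Assigner] = None


def model_from_source(
    values: List[int], strategy: ModelStrategy, patched: bool = True
) -> List[Tuple[int, Optional[int]]]:
    """Return (value, timestamp) pairs as a downstream operator would see them."""
    # The modeled JVM-side source never calls the Python assigner,
    # so every record starts without an assigner-produced timestamp.
    records: List[Tuple[int, Optional[int]]] = [(v, None) for v in values]
    if patched and strategy.timestamp_assigner is not None:
        # Patched path: apply the original strategy in a downstream step,
        # where the Python assigner runs once per record.
        records = [(v, strategy.timestamp_assigner(v, ts)) for v, ts in records]
    return records


strategy = ModelStrategy(timestamp_assigner=lambda value, record_ts: value * 1000)
fixed = model_from_source([1, 2, 3], strategy)                  # assigner applied
legacy = model_from_source([1, 2, 3], strategy, patched=False)  # assigner ignored
```

In this model, the unpatched path leaves every timestamp unset even though a strategy with an assigner was supplied, which is the symptom the PR addresses. A strategy without an assigner takes the same path in both cases.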
What is the purpose of the change
When StreamExecutionEnvironment.from_source is called in PyFlink with a WatermarkStrategy that carries a custom Python TimestampAssigner, the assigner was never invoked. The strategy was passed straight to the Java source operator, which runs timestamp extraction and watermark generation on the JVM side and so ignores the Python assigner entirely.
Brief change log
flink-python/pyflink/datastream/stream_execution_environment.py: in from_source, when the WatermarkStrategy carries a Python TimestampAssigner (watermark_strategy._timestamp_assigner is not None), create the source with WatermarkStrategy.no_watermarks() and then apply the original strategy downstream via DataStream.assign_timestamps_and_watermarks, so the Python TimestampAssigner is executed. The existing path (no Python assigner) is unchanged.
Verifying this change
This change added a unit test:
test_from_source_with_timestamp_assigner in flink-python/pyflink/datastream/tests/test_stream_execution_environment.py runs a NumberSequenceSource with a WatermarkStrategy.for_monotonous_timestamps() and a custom TimestampAssigner, then reads ctx.timestamp() in a ProcessFunction and asserts the timestamps the Python assigner produced (value * 1000). The test fails on the pre-fix code, where the Python assigner is ignored and the timestamp is not set.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no (the change is in PyFlink Python stream_execution_environment.py; no Java @Public(Evolving) class is touched. It does change the observable behavior of the public PyFlink from_source method, as described above, without changing its signature.)
[...] TimestampAssigner is supplied: the fix then routes the stream through a downstream assign_timestamps_and_watermarks operator so the assigner runs per record, which is the intended behavior and the existing PyFlink mechanism. The default path, with no Python assigner, is unchanged.)
Documentation
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code
JIRA: https://issues.apache.org/jira/browse/FLINK-39725
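For reference, a strategy of the kind the regression test exercises might be built as sketched below. The names extract_timestamp_ms, build_strategy and ValueAsSecondsAssigner are hypothetical and do not come from the PR. The PyFlink imports are deferred into build_strategy so that the timestamp rule (value * 1000) can be checked on its own without a Flink installation.

```python
def extract_timestamp_ms(value: int, record_timestamp: int) -> int:
    """Timestamp rule from the test description: treat the value as seconds."""
    return value * 1000


def build_strategy():
    """Build a monotonous-timestamps strategy with a Python assigner attached.

    Requires PyFlink; the imports are local so this module loads without it.
    """
    from pyflink.common import WatermarkStrategy
    from pyflink.common.watermark_strategy import TimestampAssigner

    class ValueAsSecondsAssigner(TimestampAssigner):
        # Hypothetical assigner name; delegates to the pure function above.
        def extract_timestamp(self, value, record_timestamp):
            return extract_timestamp_ms(value, record_timestamp)

    return (
        WatermarkStrategy.for_monotonous_timestamps()
        .with_timestamp_assigner(ValueAsSecondsAssigner())
    )
```

Passing the result of build_strategy() as the watermark strategy argument of from_source is the case in which, per the description above, the Python assigner was previously skipped.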