Skip to content

[FLINK-39725][python] Apply the Python TimestampAssigner in from_source#28491

Open
wilmerdooley wants to merge 1 commit into
apache:masterfrom
wilmerdooley:oss/flink-39725
Open

[FLINK-39725][python] Apply the Python TimestampAssigner in from_source#28491
wilmerdooley wants to merge 1 commit into
apache:masterfrom
wilmerdooley:oss/flink-39725

Conversation

@wilmerdooley

@wilmerdooley wilmerdooley commented Jun 19, 2026

Copy link
Copy Markdown

What is the purpose of the change

When StreamExecutionEnvironment.from_source is called in PyFlink with a WatermarkStrategy that carries a custom Python TimestampAssigner, the assigner was never invoked. The strategy was passed straight to the Java source operator, which runs timestamp extraction and watermark generation on the JVM side and so ignores the Python assigner entirely.

Brief change log

  • flink-python/pyflink/datastream/stream_execution_environment.py: in from_source, when the WatermarkStrategy carries a Python TimestampAssigner (watermark_strategy._timestamp_assigner is not None), create the source with WatermarkStrategy.no_watermarks() and then apply the original strategy downstream via DataStream.assign_timestamps_and_watermarks, so the Python TimestampAssigner is executed. The existing path (no Python assigner) is unchanged.

Verifying this change

This change added a unit test: test_from_source_with_timestamp_assigner in flink-python/pyflink/datastream/tests/test_stream_execution_environment.py runs a NumberSequenceSource with a WatermarkStrategy.for_monotonous_timestamps() and a custom TimestampAssigner, then reads ctx.timestamp() in a ProcessFunction and asserts the timestamps the Python assigner produced (value * 1000). The test fails on the pre-fix code, where the Python assigner is ignored and the timestamp is not set.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no (the change is in PyFlink Python stream_execution_environment.py; no Java @Public(Evolving) class is touched. It does change the observable behavior of the public PyFlink from_source method, as described above, without changing its signature.)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes (only when a Python TimestampAssigner is supplied: the fix then routes the stream through a downstream assign_timestamps_and_watermarks operator so the assigner runs per record, which is the intended behavior and the existing PyFlink mechanism. The default path, with no Python assigner, is unchanged.)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude Code

JIRA: https://issues.apache.org/jira/browse/FLINK-39725

When StreamExecutionEnvironment.from_source is called in PyFlink with a
WatermarkStrategy that carries a custom Python TimestampAssigner, the assigner
was never invoked: the strategy was passed straight to the Java source operator,
which runs timestamp extraction on the JVM side. This detects a Python
TimestampAssigner and instead creates the source with no watermarks, then applies
the strategy downstream via DataStream.assign_timestamps_and_watermarks so the
Python assigner is executed, with a regression test in
test_stream_execution_environment.py.

Signed-off-by: wilmerdooley <wilmerdooley1@gmail.com>
Generated-by: Claude Code
@flinkbot

flinkbot commented Jun 19, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@wilmerdooley wilmerdooley changed the title FLINK-39725: fix(python): apply timestamp assigner in from_source for pyflink [FLINK-39725][python] Apply the Python TimestampAssigner in from_source Jun 19, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants