[FLINK-38918][python][kafka connector] support per cluster offset for DynamicKafkaSource in pyflink#28512
Open
bowenli86 wants to merge 4 commits into
Conversation
Generated-by: Codex GPT-5
Generated-by: Codex GPT-5
Generated-by: Codex GPT-5
Generated-by: Codex GPT-5
Collaborator
psharma-openai
approved these changes
Jun 22, 2026
psharma-openai
approved these changes
Jun 22, 2026
| j_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \ | ||
| .SingleClusterTopicMetadataService(kafka_cluster_id, j_properties) | ||
|
|
||
| j_metadata_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \ |
There was a problem hiding this comment.
ooc: is this a common pattern in python just looks a bit weird.
Member
Author
There was a problem hiding this comment.
the pattern is for Py4J, not a general Python pattern
lnbest0707
reviewed
Jun 23, 2026
| return starting_offsets_initializer is not None or stopping_offsets_initializer is not None | ||
|
|
||
|
|
||
| class ClusterMetadata(object): |
There was a problem hiding this comment.
The SingleClusterTopicMetadataService offset path is usable, but I do not see a public consumer for the new ClusterMetadata wrapper. After a user constructs ClusterMetadata(...), the only accessible value is the private _j_cluster_metadata field; DynamicKafkaSourceBuilder accepts a metadata service, not cluster metadata. Should we either remove this public wrapper for now or add/document the supported API that consumes it?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
This PR addresses FLINK-38918 by adding PyFlink support for configuring per-cluster starting and stopping offsets for
DynamicKafkaSource.The Java dynamic Kafka connector already supports offset initializers on
ClusterMetadataandSingleClusterTopicMetadataService; this change exposes that capability through the PyFlink wrappers.Brief change log
ClusterMetadatawrapper that forwards topics, properties, and optional starting/stoppingKafkaOffsetsInitializers to the JavaClusterMetadata.SingleClusterTopicMetadataServicewith optional per-cluster starting and stopping offsets while preserving the existing constructor behavior when offsets are omitted.ClusterMetadatathroughpyflink.datastream.connectors.flink-pythontest Kafka SQL connector dependency to5.0.0-2.2.Verifying this change
This change added tests and can be verified as follows:
flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.pyforSingleClusterTopicMetadataServiceoffset forwarding.flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.pyforClusterMetadataoffset forwarding.Does this pull request potentially affect one of the following parts:
flink-pythontest-scoped Kafka SQL connector dependency@Public(Evolving): yes, exposes PyFlink connector API additionsDocumentation
Was generative AI tooling used to co-author this PR?
Generated-by: Codex GPT-5