[FLINK-38918][python][kafka connector] support per cluster offset for DynamicKafkaSource in pyflink #28512

Open
bowenli86 wants to merge 4 commits into apache:master from bowenli86:dev/bowenli/codex/flink-38918-pyflink-per-cluster-offset

bowenli86 commented Jun 22, 2026

Member

What is the purpose of the change

This PR addresses FLINK-38918 by adding PyFlink support for configuring per-cluster starting and stopping offsets for DynamicKafkaSource.

The Java dynamic Kafka connector already supports offset initializers on ClusterMetadata and SingleClusterTopicMetadataService; this change exposes that capability through the PyFlink wrappers.

Brief change log

  • Added a PyFlink ClusterMetadata wrapper that forwards topics, properties, and optional starting/stopping KafkaOffsetsInitializers to the Java ClusterMetadata.
  • Extended SingleClusterTopicMetadataService with optional per-cluster starting and stopping offsets while preserving the existing constructor behavior when offsets are omitted.
  • Exported ClusterMetadata through pyflink.datastream.connectors.
  • Updated the flink-python test Kafka SQL connector dependency to 5.0.0-2.2.
  • Added Python unit tests for full, default, starting-only, and stopping-only per-cluster offset configurations.
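The backward-compatibility point in the change log can be sketched without a running JVM. This is a minimal sketch, not the PR's actual code: `build_metadata_service`, `has_per_cluster_offsets`, and `fake_java_constructor` are hypothetical names, and passing `None` for the missing side of a partial configuration is an assumption about how the wrapper behaves.

```python
from typing import Any, Callable, Optional


def has_per_cluster_offsets(starting: Optional[Any], stopping: Optional[Any]) -> bool:
    # True when at least one per-cluster offsets initializer was supplied.
    return starting is not None or stopping is not None


def build_metadata_service(
    j_constructor: Callable[..., Any],
    kafka_cluster_id: str,
    properties: dict,
    starting: Optional[Any] = None,
    stopping: Optional[Any] = None,
) -> Any:
    # Hypothetical dispatch: keep the original two-argument call when no
    # offsets are given, so existing callers see unchanged behavior.
    if has_per_cluster_offsets(starting, stopping):
        # Assumption: the missing side of a partial configuration is passed as None.
        return j_constructor(kafka_cluster_id, properties, starting, stopping)
    return j_constructor(kafka_cluster_id, properties)


def fake_java_constructor(*args: Any) -> tuple:
    # Stand-in for the Java constructor; it only records the arguments it received.
    return args


props = {"bootstrap.servers": "localhost:9092"}
default_call = build_metadata_service(fake_java_constructor, "cluster-0", props)
starting_only = build_metadata_service(
    fake_java_constructor, "cluster-0", props, starting="earliest"
)
print(len(default_call), len(starting_only))
```

Recording the arguments with a stand-in constructor is one way to check forwarding for the full, default, starting-only, and stopping-only cases without starting a Py4J gateway.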

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests in flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py for SingleClusterTopicMetadataService offset forwarding.
  • Added unit tests in flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py for ClusterMetadata offset forwarding.
  • Added coverage for backward-compatible default behavior when no per-cluster offsets are provided.
  • Added coverage for partial configurations where only starting offsets or only stopping offsets are provided.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes, upgrades a flink-python test-scoped Kafka SQL connector dependency
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes, exposes PyFlink connector API additions
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? Python API docstrings

Was generative AI tooling used to co-author this PR?
  • Yes (Codex GPT-5)

Generated-by: Codex GPT-5

flinkbot commented Jun 22, 2026

Collaborator

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

j_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \
.SingleClusterTopicMetadataService(kafka_cluster_id, j_properties)

j_metadata_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \

Out of curiosity: is this a common pattern in Python? It just looks a bit weird.

Member Author

This pattern comes from Py4J; it is not a general Python pattern.
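For readers unfamiliar with Py4J: attribute access on `gateway.jvm` walks Java package names, and calling the final attribute invokes the Java constructor. The sketch below imitates that navigation with a hypothetical `FakeJvmView` class so it runs without a JVM; it is not Py4J itself, and the cluster id and properties are made-up values.

```python
class FakeJvmView:
    """Hypothetical stand-in that mimics Py4J-style attribute navigation."""

    def __init__(self, path=""):
        self._path = path

    def __getattr__(self, name):
        # Only reached for attributes that are not found normally.
        if name.startswith("_"):
            raise AttributeError(name)
        # Each attribute access extends the dotted Java name by one segment.
        return FakeJvmView(f"{self._path}.{name}" if self._path else name)

    def __call__(self, *args):
        # Calling the final segment stands in for invoking a Java constructor.
        return (self._path, args)


jvm = FakeJvmView()
cluster_id = "cluster-0"                         # made-up value
props = {"bootstrap.servers": "localhost:9092"}  # made-up value

# Backslash continuation, as in the snippet under review.
j_service = jvm.org.apache.flink.connector.kafka.dynamic.metadata \
    .SingleClusterTopicMetadataService(cluster_id, props)

# Equivalent form using parentheses instead of a backslash.
j_service_alt = (
    jvm.org.apache.flink.connector.kafka.dynamic.metadata
    .SingleClusterTopicMetadataService(cluster_id, props)
)

print(j_service[0])
```

Both spellings resolve to the same dotted class name; the long line is a consequence of spelling out the Java package, and wrapping it in parentheses is a stylistic alternative to the backslash.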

return starting_offsets_initializer is not None or stopping_offsets_initializer is not None


class ClusterMetadata(object):

The SingleClusterTopicMetadataService offset path is usable, but I do not see a public consumer for the new ClusterMetadata wrapper. After a user constructs ClusterMetadata(...), the only accessible value is the private _j_cluster_metadata field; DynamicKafkaSourceBuilder accepts a metadata service, not cluster metadata. Should we either remove this public wrapper for now or add/document the supported API that consumes it?

github-actions bot added the community-reviewed label (PR has been reviewed by the community) on Jun 23, 2026

4 participants