diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 0f5e5aa07a01c..28390abe8f6a2 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -293,7 +293,7 @@ under the License.
org.apache.flink
flink-sql-connector-kafka
- 4.0.1-2.0
+ 5.0.0-2.2
test
diff --git a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py
index 7473077a08eaa..9845b66acd306 100644
--- a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py
+++ b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from typing import Dict, Set, Union
+from typing import Dict, Optional, Set, Union
from py4j.java_gateway import JavaObject
@@ -36,6 +36,17 @@
]
+def _to_j_offsets_initializer(
+ offsets_initializer: Optional[KafkaOffsetsInitializer]) -> Optional[JavaObject]:
+ return offsets_initializer._j_initializer if offsets_initializer is not None else None
+
+
+def _has_cluster_offsets(
+ starting_offsets_initializer: Optional[KafkaOffsetsInitializer],
+ stopping_offsets_initializer: Optional[KafkaOffsetsInitializer]) -> bool:
+ return starting_offsets_initializer is not None or stopping_offsets_initializer is not None
+
+
class KafkaMetadataService(object):
"""
Base class for Kafka metadata service wrappers.
@@ -50,13 +61,30 @@ class SingleClusterTopicMetadataService(KafkaMetadataService):
A KafkaMetadataService backed by a single Kafka cluster where stream ids map to topics.
"""
- def __init__(self, kafka_cluster_id: str, properties: Dict[str, str]):
+ def __init__(self, kafka_cluster_id: str, properties: Dict[str, str],
+ starting_offsets_initializer: Optional[KafkaOffsetsInitializer] = None,
+ stopping_offsets_initializer: Optional[KafkaOffsetsInitializer] = None):
+ """
+ :param kafka_cluster_id: The ID of the Kafka cluster.
+ :param properties: The properties to access the cluster.
+ :param starting_offsets_initializer: Optional starting offsets initializer for this cluster.
+ :param stopping_offsets_initializer: Optional stopping offsets initializer for this cluster.
+ """
gateway = get_gateway()
j_properties = gateway.jvm.java.util.Properties()
for key, value in properties.items():
j_properties.setProperty(key, value)
- j_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \
- .SingleClusterTopicMetadataService(kafka_cluster_id, j_properties)
+
+ j_metadata_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \
+ .SingleClusterTopicMetadataService
+ if _has_cluster_offsets(starting_offsets_initializer, stopping_offsets_initializer):
+ j_service = j_metadata_service(
+ kafka_cluster_id,
+ j_properties,
+ _to_j_offsets_initializer(starting_offsets_initializer),
+ _to_j_offsets_initializer(stopping_offsets_initializer))
+ else:
+ j_service = j_metadata_service(kafka_cluster_id, j_properties)
super().__init__(j_service)
diff --git a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
index 448289f2727f3..8e545c1f9db81 100644
--- a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
+++ b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
@@ -20,8 +20,8 @@
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.connectors.dynamic_kafka import DynamicKafkaSource, \
- KafkaRecordDeserializationSchema, KafkaStreamSetSubscriber, StreamPatternSubscriber, \
- SingleClusterTopicMetadataService
+ KafkaRecordDeserializationSchema, KafkaStreamSetSubscriber, SingleClusterTopicMetadataService, \
+ StreamPatternSubscriber
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaOffsetResetStrategy, \
KafkaTopicPartition
from pyflink.java_gateway import get_gateway
@@ -411,3 +411,92 @@ def _check_specified_offsets_initializer(self,
self.assertTrue(
offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
)
+
+ def test_single_cluster_metadata_service_with_offsets(self):
+ """
+ Test the new constructor parameters for SingleClusterTopicMetadataService
+ to ensure offsets are correctly forwarded to the underlying Java object.
+ """
+ starting_offset = KafkaOffsetsInitializer.earliest()
+ stopping_offset = KafkaOffsetsInitializer.latest()
+
+ metadata_service = SingleClusterTopicMetadataService(
+ 'test-cluster',
+ {'bootstrap.servers': 'localhost:9092'},
+ starting_offset,
+ stopping_offset
+ )
+
+ self.assertIsNotNone(metadata_service._j_metadata_service)
+
+ self._check_metadata_service_offsets(
+ metadata_service._j_metadata_service,
+ starting_offset,
+ stopping_offset)
+
+ def test_single_cluster_metadata_service_default_offsets(self):
+ """
+ Test that SingleClusterTopicMetadataService maintains backward compatibility
+ and leaves offsets unset when not explicitly provided.
+ """
+ metadata_service = self._build_metadata_service()
+
+ self._check_metadata_service_offsets(metadata_service._j_metadata_service, None, None)
+
+ def test_single_cluster_metadata_service_starting_offsets_only(self):
+ """
+ Test that SingleClusterTopicMetadataService allows a per-cluster starting
+ offsets initializer without a stopping offsets initializer.
+ """
+ starting_offset = KafkaOffsetsInitializer.earliest()
+
+ metadata_service = SingleClusterTopicMetadataService(
+ 'test-cluster',
+ {'bootstrap.servers': 'localhost:9092'},
+ starting_offset
+ )
+
+ self._check_metadata_service_offsets(
+ metadata_service._j_metadata_service,
+ starting_offset,
+ None)
+
+ def test_single_cluster_metadata_service_stopping_offsets_only(self):
+ """
+ Test that SingleClusterTopicMetadataService allows a per-cluster stopping
+ offsets initializer without a starting offsets initializer.
+ """
+ stopping_offset = KafkaOffsetsInitializer.latest()
+
+ metadata_service = SingleClusterTopicMetadataService(
+ 'test-cluster',
+ {'bootstrap.servers': 'localhost:9092'},
+ stopping_offsets_initializer=stopping_offset
+ )
+
+ self._check_metadata_service_offsets(
+ metadata_service._j_metadata_service,
+ None,
+ stopping_offset)
+
+ def _check_metadata_service_offsets(self, metadata_service, expected_starting_offset,
+ expected_stopping_offset):
+ j_starting_initializer = get_field_value(metadata_service, 'startingOffsetsInitializer')
+ j_stopping_initializer = get_field_value(metadata_service, 'stoppingOffsetsInitializer')
+ self._check_offsets(
+ j_starting_initializer,
+ j_stopping_initializer,
+ expected_starting_offset,
+ expected_stopping_offset)
+
+ def _check_offsets(self, j_starting_initializer, j_stopping_initializer,
+ expected_starting_offset, expected_stopping_offset):
+ if expected_starting_offset is None:
+ self.assertIsNone(j_starting_initializer)
+ else:
+ self.assertTrue(expected_starting_offset._j_initializer.equals(j_starting_initializer))
+
+ if expected_stopping_offset is None:
+ self.assertIsNone(j_stopping_initializer)
+ else:
+ self.assertTrue(expected_stopping_offset._j_initializer.equals(j_stopping_initializer))