From 9602cf767c2cb71786a63a5d0a06aa9ba41a689e Mon Sep 17 00:00:00 2001 From: bowenli86 Date: Mon, 15 Jun 2026 17:08:16 -0700 Subject: [PATCH 1/5] [FLINK-38918][PyFlink] Support per cluster offset for DynamicKafkaSource Generated-by: Codex GPT-5 --- .../datastream/connectors/dynamic_kafka.py | 72 +++++++++++++++++-- 1 file changed, 68 insertions(+), 4 deletions(-) diff --git a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py index 7473077a08eaa4..2b38bf591bb856 100644 --- a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py +++ b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -from typing import Dict, Set, Union +from typing import Dict, Optional, Set, Union from py4j.java_gateway import JavaObject @@ -25,6 +25,7 @@ from pyflink.java_gateway import get_gateway __all__ = [ + 'ClusterMetadata', 'DynamicKafkaSource', 'DynamicKafkaSourceBuilder', 'KafkaMetadataService', @@ -36,6 +37,52 @@ ] +def _to_j_offsets_initializer( + offsets_initializer: Optional[KafkaOffsetsInitializer]) -> Optional[JavaObject]: + return offsets_initializer._j_initializer if offsets_initializer is not None else None + + +def _has_cluster_offsets( + starting_offsets_initializer: Optional[KafkaOffsetsInitializer], + stopping_offsets_initializer: Optional[KafkaOffsetsInitializer]) -> bool: + return starting_offsets_initializer is not None or stopping_offsets_initializer is not None + + +class ClusterMetadata(object): + """ + Wrapper for Java ClusterMetadata. + """ + + def __init__(self, topics: Set[str], properties: Dict[str, str], + starting_offsets_initializer: Optional[KafkaOffsetsInitializer] = None, + stopping_offsets_initializer: Optional[KafkaOffsetsInitializer] = None): + """ + :param topics: The topics belonging to a cluster. + :param properties: The properties to access a cluster. + :param starting_offsets_initializer: Optional starting offsets initializer for the cluster. + :param stopping_offsets_initializer: Optional stopping offsets initializer for the cluster. + """ + gateway = get_gateway() + j_topics = gateway.jvm.java.util.HashSet() + for topic in topics: + j_topics.add(topic) + + j_properties = gateway.jvm.java.util.Properties() + for key, value in properties.items(): + j_properties.setProperty(key, value) + + j_cluster_metadata = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \ + .ClusterMetadata + if _has_cluster_offsets(starting_offsets_initializer, stopping_offsets_initializer): + self._j_cluster_metadata = j_cluster_metadata( + j_topics, + j_properties, + _to_j_offsets_initializer(starting_offsets_initializer), + _to_j_offsets_initializer(stopping_offsets_initializer)) + else: + self._j_cluster_metadata = j_cluster_metadata(j_topics, j_properties) + + class KafkaMetadataService(object): """ Base class for Kafka metadata service wrappers. @@ -50,13 +97,30 @@ class SingleClusterTopicMetadataService(KafkaMetadataService): A KafkaMetadataService backed by a single Kafka cluster where stream ids map to topics. """ - def __init__(self, kafka_cluster_id: str, properties: Dict[str, str]): + def __init__(self, kafka_cluster_id: str, properties: Dict[str, str], + starting_offsets_initializer: Optional[KafkaOffsetsInitializer] = None, + stopping_offsets_initializer: Optional[KafkaOffsetsInitializer] = None): + """ + :param kafka_cluster_id: The ID of the Kafka cluster. + :param properties: The properties to access the cluster. + :param starting_offsets_initializer: Optional starting offsets initializer for this cluster. + :param stopping_offsets_initializer: Optional stopping offsets initializer for this cluster. + """ gateway = get_gateway() j_properties = gateway.jvm.java.util.Properties() for key, value in properties.items(): j_properties.setProperty(key, value) - j_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \ - .SingleClusterTopicMetadataService(kafka_cluster_id, j_properties) + + j_metadata_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \ + .SingleClusterTopicMetadataService + if _has_cluster_offsets(starting_offsets_initializer, stopping_offsets_initializer): + j_service = j_metadata_service( + kafka_cluster_id, + j_properties, + _to_j_offsets_initializer(starting_offsets_initializer), + _to_j_offsets_initializer(stopping_offsets_initializer)) + else: + j_service = j_metadata_service(kafka_cluster_id, j_properties) super().__init__(j_service) From 3ac1026414dbb0caf0946159058a797263d24e40 Mon Sep 17 00:00:00 2001 From: bowenli86 Date: Mon, 15 Jun 2026 17:08:25 -0700 Subject: [PATCH 2/5] [FLINK-38918][PyFlink] Add unit tests for per cluster offset feature Generated-by: Codex GPT-5 --- .../connectors/tests/test_dynamic_kafka.py | 97 ++++++++++++++++++- 1 file changed, 94 insertions(+), 3 deletions(-) diff --git a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py index 448289f2727f34..53159123b85755 100644 --- a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py +++ b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py @@ -19,9 +19,9 @@ from typing import Dict from pyflink.common.serialization import SimpleStringSchema from pyflink.common.watermark_strategy import WatermarkStrategy -from pyflink.datastream.connectors.dynamic_kafka import DynamicKafkaSource, \ - KafkaRecordDeserializationSchema, KafkaStreamSetSubscriber, StreamPatternSubscriber, \ - SingleClusterTopicMetadataService +from pyflink.datastream.connectors.dynamic_kafka import ClusterMetadata, DynamicKafkaSource, \ + KafkaRecordDeserializationSchema, KafkaStreamSetSubscriber, SingleClusterTopicMetadataService, \ + StreamPatternSubscriber from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaOffsetResetStrategy, \ KafkaTopicPartition from pyflink.java_gateway import get_gateway @@ -411,3 +411,94 @@ def _check_specified_offsets_initializer(self, self.assertTrue( offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy()) ) + + def test_single_cluster_metadata_service_with_offsets(self): + """ + Test the new constructor parameters for SingleClusterTopicMetadataService + to ensure offsets are correctly forwarded to the underlying Java object. + """ + starting_offset = KafkaOffsetsInitializer.earliest() + stopping_offset = KafkaOffsetsInitializer.latest() + + metadata_service = SingleClusterTopicMetadataService( + 'test-cluster', + {'bootstrap.servers': 'localhost:9092'}, + starting_offset, + stopping_offset + ) + + self.assertIsNotNone(metadata_service._j_metadata_service) + + self._check_cluster_offsets( + metadata_service._j_metadata_service, + {'EarliestOffsetsInitializer', 'ReaderHandledOffsetsInitializer'}, + {'LatestOffsetsInitializer', 'ReaderHandledOffsetsInitializer'}) + + def test_single_cluster_metadata_service_default_offsets(self): + """ + Test that SingleClusterTopicMetadataService maintains backward compatibility + and leaves offsets unset when not explicitly provided. + """ + metadata_service = self._build_metadata_service() + + self._check_default_cluster_offsets(metadata_service._j_metadata_service) + + def test_cluster_metadata_with_offsets(self): + """ + Test that ClusterMetadata correctly forwards optional offsets initializers + to the underlying Java object. + """ + starting_offset = KafkaOffsetsInitializer.earliest() + stopping_offset = KafkaOffsetsInitializer.latest() + + cluster_metadata = ClusterMetadata( + {'test-topic'}, + {'bootstrap.servers': 'localhost:9092'}, + starting_offset, + stopping_offset + ) + + self.assertIsNotNone(cluster_metadata._j_cluster_metadata) + + self._check_cluster_offsets( + cluster_metadata._j_cluster_metadata, + {'EarliestOffsetsInitializer', 'ReaderHandledOffsetsInitializer'}, + {'LatestOffsetsInitializer', 'ReaderHandledOffsetsInitializer'}) + + def test_cluster_metadata_default_offsets(self): + """ + Test that ClusterMetadata keeps using the legacy Java constructor when offsets + are not explicitly provided. + """ + cluster_metadata = ClusterMetadata( + {'test-topic'}, {'bootstrap.servers': 'localhost:9092'}) + + self._check_default_cluster_offsets(cluster_metadata._j_cluster_metadata) + + def _check_default_cluster_offsets(self, cluster_metadata): + j_starting_initializer = get_field(cluster_metadata.getClass(), 'startingOffsetsInitializer') + j_stopping_initializer = get_field(cluster_metadata.getClass(), 'stoppingOffsetsInitializer') + + if j_starting_initializer is not None: + self.assertIsNone(j_starting_initializer.get(cluster_metadata)) + if j_stopping_initializer is not None: + self.assertIsNone(j_stopping_initializer.get(cluster_metadata)) + + def _check_cluster_offsets(self, cluster_metadata, expected_starting_class_names, + expected_stopping_class_names): + j_starting_initializer = get_field_value( + cluster_metadata, 'startingOffsetsInitializer') + j_stopping_initializer = get_field_value( + cluster_metadata, 'stoppingOffsetsInitializer') + + self.assertIsNotNone(j_starting_initializer) + self.assertIsNotNone(j_stopping_initializer) + + initializer_class_name = \ + 'org.apache.flink.connector.kafka.source.enumerator.initializer.{}' + self.assertIn( + j_starting_initializer.getClass().getCanonicalName(), + {initializer_class_name.format(class_name) for class_name in expected_starting_class_names}) + self.assertIn( + j_stopping_initializer.getClass().getCanonicalName(), + {initializer_class_name.format(class_name) for class_name in expected_stopping_class_names}) From e380a3da4ef3cd1ed682df710da08c5a4c3e9777 Mon Sep 17 00:00:00 2001 From: bowenli86 Date: Thu, 18 Jun 2026 16:39:12 -0700 Subject: [PATCH 3/5] [FLINK-38918][PyFlink] Align dynamic Kafka wrapper setup Generated-by: Codex GPT-5 --- flink-python/pom.xml | 2 +- flink-python/pyflink/datastream/connectors/__init__.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-python/pom.xml b/flink-python/pom.xml index 0f5e5aa07a01cf..28390abe8f6a29 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -293,7 +293,7 @@ under the License. org.apache.flink flink-sql-connector-kafka - 4.0.1-2.0 + 5.0.0-2.2 test diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py index 4f2e1036b20a73..bd0f748cb959ba 100644 --- a/flink-python/pyflink/datastream/connectors/__init__.py +++ b/flink-python/pyflink/datastream/connectors/__init__.py @@ -46,6 +46,7 @@ def _install(): setattr(connectors, 'Semantic', kafka.Semantic) # dynamic kafka from pyflink.datastream.connectors import dynamic_kafka + setattr(connectors, 'ClusterMetadata', dynamic_kafka.ClusterMetadata) setattr(connectors, 'DynamicKafkaSource', dynamic_kafka.DynamicKafkaSource) setattr(connectors, 'DynamicKafkaSourceBuilder', dynamic_kafka.DynamicKafkaSourceBuilder) setattr(connectors, 'KafkaMetadataService', dynamic_kafka.KafkaMetadataService) From a7244becd7e7a5e6d7de5730bdc1043d3e21e327 Mon Sep 17 00:00:00 2001 From: bowenli86 Date: Thu, 18 Jun 2026 17:01:14 -0700 Subject: [PATCH 4/5] [FLINK-38918][PyFlink] Cover partial per cluster offsets Generated-by: Codex GPT-5 --- .../connectors/tests/test_dynamic_kafka.py | 146 +++++++++++++----- 1 file changed, 111 insertions(+), 35 deletions(-) diff --git a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py index 53159123b85755..6b4f3c63500946 100644 --- a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py +++ b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py @@ -429,10 +429,10 @@ def test_single_cluster_metadata_service_with_offsets(self): self.assertIsNotNone(metadata_service._j_metadata_service) - self._check_cluster_offsets( + self._check_metadata_service_offsets( metadata_service._j_metadata_service, - {'EarliestOffsetsInitializer', 'ReaderHandledOffsetsInitializer'}, - {'LatestOffsetsInitializer', 'ReaderHandledOffsetsInitializer'}) + starting_offset, + stopping_offset) def test_single_cluster_metadata_service_default_offsets(self): """ @@ -441,7 +441,43 @@ def test_single_cluster_metadata_service_default_offsets(self): """ metadata_service = self._build_metadata_service() - self._check_default_cluster_offsets(metadata_service._j_metadata_service) + self._check_metadata_service_offsets(metadata_service._j_metadata_service, None, None) + + def test_single_cluster_metadata_service_starting_offsets_only(self): + """ + Test that SingleClusterTopicMetadataService allows a per-cluster starting + offsets initializer without a stopping offsets initializer. + """ + starting_offset = KafkaOffsetsInitializer.earliest() + + metadata_service = SingleClusterTopicMetadataService( + 'test-cluster', + {'bootstrap.servers': 'localhost:9092'}, + starting_offset + ) + + self._check_metadata_service_offsets( + metadata_service._j_metadata_service, + starting_offset, + None) + + def test_single_cluster_metadata_service_stopping_offsets_only(self): + """ + Test that SingleClusterTopicMetadataService allows a per-cluster stopping + offsets initializer without a starting offsets initializer. + """ + stopping_offset = KafkaOffsetsInitializer.latest() + + metadata_service = SingleClusterTopicMetadataService( + 'test-cluster', + {'bootstrap.servers': 'localhost:9092'}, + stopping_offsets_initializer=stopping_offset + ) + + self._check_metadata_service_offsets( + metadata_service._j_metadata_service, + None, + stopping_offset) def test_cluster_metadata_with_offsets(self): """ @@ -462,8 +498,8 @@ def test_cluster_metadata_with_offsets(self): self._check_cluster_offsets( cluster_metadata._j_cluster_metadata, - {'EarliestOffsetsInitializer', 'ReaderHandledOffsetsInitializer'}, - {'LatestOffsetsInitializer', 'ReaderHandledOffsetsInitializer'}) + starting_offset, + stopping_offset) def test_cluster_metadata_default_offsets(self): """ @@ -473,32 +509,72 @@ def test_cluster_metadata_default_offsets(self): cluster_metadata = ClusterMetadata( {'test-topic'}, {'bootstrap.servers': 'localhost:9092'}) - self._check_default_cluster_offsets(cluster_metadata._j_cluster_metadata) - - def _check_default_cluster_offsets(self, cluster_metadata): - j_starting_initializer = get_field(cluster_metadata.getClass(), 'startingOffsetsInitializer') - j_stopping_initializer = get_field(cluster_metadata.getClass(), 'stoppingOffsetsInitializer') - - if j_starting_initializer is not None: - self.assertIsNone(j_starting_initializer.get(cluster_metadata)) - if j_stopping_initializer is not None: - self.assertIsNone(j_stopping_initializer.get(cluster_metadata)) - - def _check_cluster_offsets(self, cluster_metadata, expected_starting_class_names, - expected_stopping_class_names): - j_starting_initializer = get_field_value( - cluster_metadata, 'startingOffsetsInitializer') - j_stopping_initializer = get_field_value( - cluster_metadata, 'stoppingOffsetsInitializer') - - self.assertIsNotNone(j_starting_initializer) - self.assertIsNotNone(j_stopping_initializer) - - initializer_class_name = \ - 'org.apache.flink.connector.kafka.source.enumerator.initializer.{}' - self.assertIn( - j_starting_initializer.getClass().getCanonicalName(), - {initializer_class_name.format(class_name) for class_name in expected_starting_class_names}) - self.assertIn( - j_stopping_initializer.getClass().getCanonicalName(), - {initializer_class_name.format(class_name) for class_name in expected_stopping_class_names}) + self._check_cluster_offsets(cluster_metadata._j_cluster_metadata, None, None) + + def test_cluster_metadata_starting_offsets_only(self): + """ + Test that ClusterMetadata allows a per-cluster starting offsets initializer + without a stopping offsets initializer. + """ + starting_offset = KafkaOffsetsInitializer.earliest() + + cluster_metadata = ClusterMetadata( + {'test-topic'}, + {'bootstrap.servers': 'localhost:9092'}, + starting_offset + ) + + self._check_cluster_offsets( + cluster_metadata._j_cluster_metadata, + starting_offset, + None) + + def test_cluster_metadata_stopping_offsets_only(self): + """ + Test that ClusterMetadata allows a per-cluster stopping offsets initializer + without a starting offsets initializer. + """ + stopping_offset = KafkaOffsetsInitializer.latest() + + cluster_metadata = ClusterMetadata( + {'test-topic'}, + {'bootstrap.servers': 'localhost:9092'}, + stopping_offsets_initializer=stopping_offset + ) + + self._check_cluster_offsets( + cluster_metadata._j_cluster_metadata, + None, + stopping_offset) + + def _check_metadata_service_offsets(self, metadata_service, expected_starting_offset, + expected_stopping_offset): + j_starting_initializer = get_field_value(metadata_service, 'startingOffsetsInitializer') + j_stopping_initializer = get_field_value(metadata_service, 'stoppingOffsetsInitializer') + self._check_offsets( + j_starting_initializer, + j_stopping_initializer, + expected_starting_offset, + expected_stopping_offset) + + def _check_cluster_offsets(self, cluster_metadata, expected_starting_offset, + expected_stopping_offset): + j_starting_initializer = cluster_metadata.getStartingOffsetsInitializer() + j_stopping_initializer = cluster_metadata.getStoppingOffsetsInitializer() + self._check_offsets( + j_starting_initializer, + j_stopping_initializer, + expected_starting_offset, + expected_stopping_offset) + + def _check_offsets(self, j_starting_initializer, j_stopping_initializer, + expected_starting_offset, expected_stopping_offset): + if expected_starting_offset is None: + self.assertIsNone(j_starting_initializer) + else: + self.assertTrue(expected_starting_offset._j_initializer.equals(j_starting_initializer)) + + if expected_stopping_offset is None: + self.assertIsNone(j_stopping_initializer) + else: + self.assertTrue(expected_stopping_offset._j_initializer.equals(j_stopping_initializer)) From b7b3cc7b863a3057197a6652757ec70d48d5c83f Mon Sep 17 00:00:00 2001 From: bowenli86 Date: Tue, 23 Jun 2026 14:28:45 -0700 Subject: [PATCH 5/5] [FLINK-38918][PyFlink] Remove unused cluster metadata wrapper Generated-by: Codex GPT-5 --- .../pyflink/datastream/connectors/__init__.py | 1 - .../datastream/connectors/dynamic_kafka.py | 36 --------- .../connectors/tests/test_dynamic_kafka.py | 80 +------------------ 3 files changed, 1 insertion(+), 116 deletions(-) diff --git a/flink-python/pyflink/datastream/connectors/__init__.py b/flink-python/pyflink/datastream/connectors/__init__.py index bd0f748cb959ba..4f2e1036b20a73 100644 --- a/flink-python/pyflink/datastream/connectors/__init__.py +++ b/flink-python/pyflink/datastream/connectors/__init__.py @@ -46,7 +46,6 @@ def _install(): setattr(connectors, 'Semantic', kafka.Semantic) # dynamic kafka from pyflink.datastream.connectors import dynamic_kafka - setattr(connectors, 'ClusterMetadata', dynamic_kafka.ClusterMetadata) setattr(connectors, 'DynamicKafkaSource', dynamic_kafka.DynamicKafkaSource) setattr(connectors, 'DynamicKafkaSourceBuilder', dynamic_kafka.DynamicKafkaSourceBuilder) setattr(connectors, 'KafkaMetadataService', dynamic_kafka.KafkaMetadataService) diff --git a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py index 2b38bf591bb856..9845b66acd306c 100644 --- a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py +++ b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py @@ -25,7 +25,6 @@ from pyflink.java_gateway import get_gateway __all__ = [ - 'ClusterMetadata', 'DynamicKafkaSource', 'DynamicKafkaSourceBuilder', 'KafkaMetadataService', @@ -48,41 +47,6 @@ def _has_cluster_offsets( return starting_offsets_initializer is not None or stopping_offsets_initializer is not None -class ClusterMetadata(object): - """ - Wrapper for Java ClusterMetadata. - """ - - def __init__(self, topics: Set[str], properties: Dict[str, str], - starting_offsets_initializer: Optional[KafkaOffsetsInitializer] = None, - stopping_offsets_initializer: Optional[KafkaOffsetsInitializer] = None): - """ - :param topics: The topics belonging to a cluster. - :param properties: The properties to access a cluster. - :param starting_offsets_initializer: Optional starting offsets initializer for the cluster. - :param stopping_offsets_initializer: Optional stopping offsets initializer for the cluster. - """ - gateway = get_gateway() - j_topics = gateway.jvm.java.util.HashSet() - for topic in topics: - j_topics.add(topic) - - j_properties = gateway.jvm.java.util.Properties() - for key, value in properties.items(): - j_properties.setProperty(key, value) - - j_cluster_metadata = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \ - .ClusterMetadata - if _has_cluster_offsets(starting_offsets_initializer, stopping_offsets_initializer): - self._j_cluster_metadata = j_cluster_metadata( - j_topics, - j_properties, - _to_j_offsets_initializer(starting_offsets_initializer), - _to_j_offsets_initializer(stopping_offsets_initializer)) - else: - self._j_cluster_metadata = j_cluster_metadata(j_topics, j_properties) - - class KafkaMetadataService(object): """ Base class for Kafka metadata service wrappers. diff --git a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py index 6b4f3c63500946..8e545c1f9db816 100644 --- a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py +++ b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py @@ -19,7 +19,7 @@ from typing import Dict from pyflink.common.serialization import SimpleStringSchema from pyflink.common.watermark_strategy import WatermarkStrategy -from pyflink.datastream.connectors.dynamic_kafka import ClusterMetadata, DynamicKafkaSource, \ +from pyflink.datastream.connectors.dynamic_kafka import DynamicKafkaSource, \ KafkaRecordDeserializationSchema, KafkaStreamSetSubscriber, SingleClusterTopicMetadataService, \ StreamPatternSubscriber from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaOffsetResetStrategy, \ @@ -479,74 +479,6 @@ def test_single_cluster_metadata_service_stopping_offsets_only(self): None, stopping_offset) - def test_cluster_metadata_with_offsets(self): - """ - Test that ClusterMetadata correctly forwards optional offsets initializers - to the underlying Java object. - """ - starting_offset = KafkaOffsetsInitializer.earliest() - stopping_offset = KafkaOffsetsInitializer.latest() - - cluster_metadata = ClusterMetadata( - {'test-topic'}, - {'bootstrap.servers': 'localhost:9092'}, - starting_offset, - stopping_offset - ) - - self.assertIsNotNone(cluster_metadata._j_cluster_metadata) - - self._check_cluster_offsets( - cluster_metadata._j_cluster_metadata, - starting_offset, - stopping_offset) - - def test_cluster_metadata_default_offsets(self): - """ - Test that ClusterMetadata keeps using the legacy Java constructor when offsets - are not explicitly provided. - """ - cluster_metadata = ClusterMetadata( - {'test-topic'}, {'bootstrap.servers': 'localhost:9092'}) - - self._check_cluster_offsets(cluster_metadata._j_cluster_metadata, None, None) - - def test_cluster_metadata_starting_offsets_only(self): - """ - Test that ClusterMetadata allows a per-cluster starting offsets initializer - without a stopping offsets initializer. - """ - starting_offset = KafkaOffsetsInitializer.earliest() - - cluster_metadata = ClusterMetadata( - {'test-topic'}, - {'bootstrap.servers': 'localhost:9092'}, - starting_offset - ) - - self._check_cluster_offsets( - cluster_metadata._j_cluster_metadata, - starting_offset, - None) - - def test_cluster_metadata_stopping_offsets_only(self): - """ - Test that ClusterMetadata allows a per-cluster stopping offsets initializer - without a starting offsets initializer. - """ - stopping_offset = KafkaOffsetsInitializer.latest() - - cluster_metadata = ClusterMetadata( - {'test-topic'}, - {'bootstrap.servers': 'localhost:9092'}, - stopping_offsets_initializer=stopping_offset - ) - - self._check_cluster_offsets( - cluster_metadata._j_cluster_metadata, - None, - stopping_offset) - def _check_metadata_service_offsets(self, metadata_service, expected_starting_offset, expected_stopping_offset): j_starting_initializer = get_field_value(metadata_service, 'startingOffsetsInitializer') @@ -557,16 +489,6 @@ def _check_metadata_service_offsets(self, metadata_service, expected_starting_of expected_starting_offset, expected_stopping_offset) - def _check_cluster_offsets(self, cluster_metadata, expected_starting_offset, - expected_stopping_offset): - j_starting_initializer = cluster_metadata.getStartingOffsetsInitializer() - j_stopping_initializer = cluster_metadata.getStoppingOffsetsInitializer() - self._check_offsets( - j_starting_initializer, - j_stopping_initializer, - expected_starting_offset, - expected_stopping_offset) - def _check_offsets(self, j_starting_initializer, j_stopping_initializer, expected_starting_offset, expected_stopping_offset): if expected_starting_offset is None: