# Patch summary (commentary only; it sits before the first "diff --git" header).
#
# NOTE(review): in this copy the diff lines of each hunk are run together on a
# few very long lines, and the pom.xml hunk shows no XML tags. It presumably
# needs its original line breaks/markup restored before it can be applied --
# confirm against the upstream commit.
#
# 1. flink-python/pom.xml
#    - In the entry naming org.apache.flink / flink-sql-connector-kafka, the
#      value 4.0.1-2.0 is replaced by 5.0.0-2.2. Presumably this is the
#      <version> of a test-scoped dependency; the tags are not visible here.
#
# 2. flink-python/pyflink/datastream/connectors/dynamic_kafka.py
#    - Adds Optional to the typing import.
#    - Adds _to_j_offsets_initializer(offsets_initializer): returns
#      offsets_initializer._j_initializer, or None when the argument is None.
#    - Adds _has_cluster_offsets(starting, stopping): True when at least one of
#      the two initializers is not None.
#    - SingleClusterTopicMetadataService.__init__ gains two optional
#      parameters, starting_offsets_initializer and
#      stopping_offsets_initializer, both defaulting to None. When either is
#      given, the Java SingleClusterTopicMetadataService is called with four
#      arguments (cluster id, properties, starting, stopping; an omitted one is
#      forwarded as None). Otherwise the existing two-argument call is kept.
#
# 3. flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py
#    - Reorders the names imported from dynamic_kafka.
#    - Adds test_single_cluster_metadata_service_with_offsets,
#      ..._default_offsets, ..._starting_offsets_only and
#      ..._stopping_offsets_only.
#    - Adds helper _check_metadata_service_offsets, which reads the
#      startingOffsetsInitializer / stoppingOffsetsInitializer fields through
#      get_field_value, and helper _check_offsets, which asserts each value is
#      either None or equals() the expected initializer's _j_initializer.
#
diff --git a/flink-python/pom.xml b/flink-python/pom.xml index 0f5e5aa07a01c..28390abe8f6a2 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -293,7 +293,7 @@ under the License. org.apache.flink flink-sql-connector-kafka - 4.0.1-2.0 + 5.0.0-2.2 test diff --git a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py index 7473077a08eaa..9845b66acd306 100644 --- a/flink-python/pyflink/datastream/connectors/dynamic_kafka.py +++ b/flink-python/pyflink/datastream/connectors/dynamic_kafka.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -from typing import Dict, Set, Union +from typing import Dict, Optional, Set, Union from py4j.java_gateway import JavaObject @@ -36,6 +36,17 @@ ] +def _to_j_offsets_initializer( + offsets_initializer: Optional[KafkaOffsetsInitializer]) -> Optional[JavaObject]: + return offsets_initializer._j_initializer if offsets_initializer is not None else None + + +def _has_cluster_offsets( + starting_offsets_initializer: Optional[KafkaOffsetsInitializer], + stopping_offsets_initializer: Optional[KafkaOffsetsInitializer]) -> bool: + return starting_offsets_initializer is not None or stopping_offsets_initializer is not None + + class KafkaMetadataService(object): """ Base class for Kafka metadata service wrappers. @@ -50,13 +61,30 @@ class SingleClusterTopicMetadataService(KafkaMetadataService): A KafkaMetadataService backed by a single Kafka cluster where stream ids map to topics. 
""" - def __init__(self, kafka_cluster_id: str, properties: Dict[str, str]): + def __init__(self, kafka_cluster_id: str, properties: Dict[str, str], + starting_offsets_initializer: Optional[KafkaOffsetsInitializer] = None, + stopping_offsets_initializer: Optional[KafkaOffsetsInitializer] = None): + """ + :param kafka_cluster_id: The ID of the Kafka cluster. + :param properties: The properties to access the cluster. + :param starting_offsets_initializer: Optional starting offsets initializer for this cluster. + :param stopping_offsets_initializer: Optional stopping offsets initializer for this cluster. + """ gateway = get_gateway() j_properties = gateway.jvm.java.util.Properties() for key, value in properties.items(): j_properties.setProperty(key, value) - j_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \ - .SingleClusterTopicMetadataService(kafka_cluster_id, j_properties) + + j_metadata_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \ + .SingleClusterTopicMetadataService + if _has_cluster_offsets(starting_offsets_initializer, stopping_offsets_initializer): + j_service = j_metadata_service( + kafka_cluster_id, + j_properties, + _to_j_offsets_initializer(starting_offsets_initializer), + _to_j_offsets_initializer(stopping_offsets_initializer)) + else: + j_service = j_metadata_service(kafka_cluster_id, j_properties) super().__init__(j_service) diff --git a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py index 448289f2727f3..8e545c1f9db81 100644 --- a/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py +++ b/flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py @@ -20,8 +20,8 @@ from pyflink.common.serialization import SimpleStringSchema from pyflink.common.watermark_strategy import WatermarkStrategy from pyflink.datastream.connectors.dynamic_kafka import DynamicKafkaSource, \ - 
KafkaRecordDeserializationSchema, KafkaStreamSetSubscriber, StreamPatternSubscriber, \ - SingleClusterTopicMetadataService + KafkaRecordDeserializationSchema, KafkaStreamSetSubscriber, SingleClusterTopicMetadataService, \ + StreamPatternSubscriber from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaOffsetResetStrategy, \ KafkaTopicPartition from pyflink.java_gateway import get_gateway @@ -411,3 +411,92 @@ def _check_specified_offsets_initializer(self, self.assertTrue( offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy()) ) + + def test_single_cluster_metadata_service_with_offsets(self): + """ + Test the new constructor parameters for SingleClusterTopicMetadataService + to ensure offsets are correctly forwarded to the underlying Java object. + """ + starting_offset = KafkaOffsetsInitializer.earliest() + stopping_offset = KafkaOffsetsInitializer.latest() + + metadata_service = SingleClusterTopicMetadataService( + 'test-cluster', + {'bootstrap.servers': 'localhost:9092'}, + starting_offset, + stopping_offset + ) + + self.assertIsNotNone(metadata_service._j_metadata_service) + + self._check_metadata_service_offsets( + metadata_service._j_metadata_service, + starting_offset, + stopping_offset) + + def test_single_cluster_metadata_service_default_offsets(self): + """ + Test that SingleClusterTopicMetadataService maintains backward compatibility + and leaves offsets unset when not explicitly provided. + """ + metadata_service = self._build_metadata_service() + + self._check_metadata_service_offsets(metadata_service._j_metadata_service, None, None) + + def test_single_cluster_metadata_service_starting_offsets_only(self): + """ + Test that SingleClusterTopicMetadataService allows a per-cluster starting + offsets initializer without a stopping offsets initializer. 
+ """ + starting_offset = KafkaOffsetsInitializer.earliest() + + metadata_service = SingleClusterTopicMetadataService( + 'test-cluster', + {'bootstrap.servers': 'localhost:9092'}, + starting_offset + ) + + self._check_metadata_service_offsets( + metadata_service._j_metadata_service, + starting_offset, + None) + + def test_single_cluster_metadata_service_stopping_offsets_only(self): + """ + Test that SingleClusterTopicMetadataService allows a per-cluster stopping + offsets initializer without a starting offsets initializer. + """ + stopping_offset = KafkaOffsetsInitializer.latest() + + metadata_service = SingleClusterTopicMetadataService( + 'test-cluster', + {'bootstrap.servers': 'localhost:9092'}, + stopping_offsets_initializer=stopping_offset + ) + + self._check_metadata_service_offsets( + metadata_service._j_metadata_service, + None, + stopping_offset) + + def _check_metadata_service_offsets(self, metadata_service, expected_starting_offset, + expected_stopping_offset): + j_starting_initializer = get_field_value(metadata_service, 'startingOffsetsInitializer') + j_stopping_initializer = get_field_value(metadata_service, 'stoppingOffsetsInitializer') + self._check_offsets( + j_starting_initializer, + j_stopping_initializer, + expected_starting_offset, + expected_stopping_offset) + + def _check_offsets(self, j_starting_initializer, j_stopping_initializer, + expected_starting_offset, expected_stopping_offset): + if expected_starting_offset is None: + self.assertIsNone(j_starting_initializer) + else: + self.assertTrue(expected_starting_offset._j_initializer.equals(j_starting_initializer)) + + if expected_stopping_offset is None: + self.assertIsNone(j_stopping_initializer) + else: + self.assertTrue(expected_stopping_offset._j_initializer.equals(j_stopping_initializer))