Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flink-python/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ under the License.
<!-- Indirectly accessed in pyflink_gateway_server -->
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka</artifactId>
<version>4.0.1-2.0</version>
<version>5.0.0-2.2</version>
<scope>test</scope>
</dependency>

Expand Down
36 changes: 32 additions & 4 deletions flink-python/pyflink/datastream/connectors/dynamic_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Dict, Set, Union
from typing import Dict, Optional, Set, Union

from py4j.java_gateway import JavaObject

Expand All @@ -36,6 +36,17 @@
]


def _to_j_offsets_initializer(
        offsets_initializer: Optional[KafkaOffsetsInitializer]) -> Optional[JavaObject]:
    """
    Unwrap a Python offsets initializer into the Java object it holds.

    :param offsets_initializer: The Python-side initializer wrapper, or None.
    :return: The wrapper's ``_j_initializer``, or None when no wrapper was given.
    """
    if offsets_initializer is None:
        # Nothing to unwrap; hand None straight back to the caller.
        return None
    return offsets_initializer._j_initializer


def _has_cluster_offsets(
        starting_offsets_initializer: Optional[KafkaOffsetsInitializer],
        stopping_offsets_initializer: Optional[KafkaOffsetsInitializer]) -> bool:
    """
    Tell whether at least one of the two offsets initializers was supplied.

    :param starting_offsets_initializer: The starting offsets initializer, or None.
    :param stopping_offsets_initializer: The stopping offsets initializer, or None.
    :return: False only when both arguments are None, True otherwise.
    """
    neither_given = (starting_offsets_initializer is None
                     and stopping_offsets_initializer is None)
    return not neither_given


class KafkaMetadataService(object):
"""
Base class for Kafka metadata service wrappers.
Expand All @@ -50,13 +61,30 @@ class SingleClusterTopicMetadataService(KafkaMetadataService):
A KafkaMetadataService backed by a single Kafka cluster where stream ids map to topics.
"""

def __init__(self, kafka_cluster_id: str, properties: Dict[str, str]):
def __init__(self, kafka_cluster_id: str, properties: Dict[str, str],
starting_offsets_initializer: Optional[KafkaOffsetsInitializer] = None,
stopping_offsets_initializer: Optional[KafkaOffsetsInitializer] = None):
"""
:param kafka_cluster_id: The ID of the Kafka cluster.
:param properties: The properties to access the cluster.
:param starting_offsets_initializer: Optional starting offsets initializer for this cluster.
:param stopping_offsets_initializer: Optional stopping offsets initializer for this cluster.
"""
gateway = get_gateway()
j_properties = gateway.jvm.java.util.Properties()
for key, value in properties.items():
j_properties.setProperty(key, value)
j_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \
.SingleClusterTopicMetadataService(kafka_cluster_id, j_properties)

j_metadata_service = gateway.jvm.org.apache.flink.connector.kafka.dynamic.metadata \
Comment thread
bowenli86 marked this conversation as resolved.
.SingleClusterTopicMetadataService
if _has_cluster_offsets(starting_offsets_initializer, stopping_offsets_initializer):
j_service = j_metadata_service(
kafka_cluster_id,
j_properties,
_to_j_offsets_initializer(starting_offsets_initializer),
_to_j_offsets_initializer(stopping_offsets_initializer))
else:
j_service = j_metadata_service(kafka_cluster_id, j_properties)
super().__init__(j_service)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.connectors.dynamic_kafka import DynamicKafkaSource, \
KafkaRecordDeserializationSchema, KafkaStreamSetSubscriber, StreamPatternSubscriber, \
SingleClusterTopicMetadataService
KafkaRecordDeserializationSchema, KafkaStreamSetSubscriber, SingleClusterTopicMetadataService, \
StreamPatternSubscriber
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaOffsetResetStrategy, \
KafkaTopicPartition
from pyflink.java_gateway import get_gateway
Expand Down Expand Up @@ -411,3 +411,92 @@ def _check_specified_offsets_initializer(self,
self.assertTrue(
offset_reset_strategy.equals(reset_strategy._to_j_offset_reset_strategy())
)

def test_single_cluster_metadata_service_with_offsets(self):
    """
    Build a SingleClusterTopicMetadataService with both a starting and a stopping
    offsets initializer and verify that each one is found on the wrapped Java object.
    """
    earliest = KafkaOffsetsInitializer.earliest()
    latest = KafkaOffsetsInitializer.latest()
    cluster_properties = {'bootstrap.servers': 'localhost:9092'}

    # Both initializers are passed positionally, after the cluster id and properties.
    service = SingleClusterTopicMetadataService(
        'test-cluster', cluster_properties, earliest, latest)
    j_service = service._j_metadata_service

    self.assertIsNotNone(j_service)
    self._check_metadata_service_offsets(j_service, earliest, latest)

def test_single_cluster_metadata_service_default_offsets(self):
    """
    Verify that the service returned by ``_build_metadata_service`` carries neither a
    starting nor a stopping offsets initializer on its wrapped Java object.
    """
    j_service = self._build_metadata_service()._j_metadata_service
    self._check_metadata_service_offsets(j_service, None, None)

def test_single_cluster_metadata_service_starting_offsets_only(self):
    """
    Verify that a starting offsets initializer can be supplied on its own: the Java
    object must hold it while the stopping offsets initializer stays unset.
    """
    earliest = KafkaOffsetsInitializer.earliest()
    cluster_properties = {'bootstrap.servers': 'localhost:9092'}

    # The starting initializer is passed positionally; the stopping one is omitted.
    service = SingleClusterTopicMetadataService('test-cluster', cluster_properties, earliest)

    self._check_metadata_service_offsets(service._j_metadata_service, earliest, None)

def test_single_cluster_metadata_service_stopping_offsets_only(self):
    """
    Verify that a stopping offsets initializer can be supplied on its own: the Java
    object must hold it while the starting offsets initializer stays unset.
    """
    latest = KafkaOffsetsInitializer.latest()
    cluster_properties = {'bootstrap.servers': 'localhost:9092'}

    # Passed by keyword so that the starting initializer keeps its default.
    service = SingleClusterTopicMetadataService(
        'test-cluster', cluster_properties, stopping_offsets_initializer=latest)

    self._check_metadata_service_offsets(service._j_metadata_service, None, latest)

def _check_metadata_service_offsets(self, metadata_service, expected_starting_offset,
                                    expected_stopping_offset):
    """
    Read the ``startingOffsetsInitializer`` and ``stoppingOffsetsInitializer`` fields of
    the given Java metadata service and compare them with the expected initializers.

    :param metadata_service: The Java metadata service object to inspect.
    :param expected_starting_offset: The expected starting initializer, or None if the
                                     field is expected to be unset.
    :param expected_stopping_offset: The expected stopping initializer, or None if the
                                     field is expected to be unset.
    """
    self._check_offsets(
        get_field_value(metadata_service, 'startingOffsetsInitializer'),
        get_field_value(metadata_service, 'stoppingOffsetsInitializer'),
        expected_starting_offset,
        expected_stopping_offset)

def _check_offsets(self, j_starting_initializer, j_stopping_initializer,
                   expected_starting_offset, expected_stopping_offset):
    """
    Assert that each Java initializer matches its expected Python counterpart.

    An expected value of None requires the Java value to be None as well; otherwise the
    expected wrapper's ``_j_initializer`` must report ``equals`` with the Java value.
    The starting pair is checked before the stopping pair.

    :param j_starting_initializer: The starting initializer read from the Java object.
    :param j_stopping_initializer: The stopping initializer read from the Java object.
    :param expected_starting_offset: The expected starting initializer, or None.
    :param expected_stopping_offset: The expected stopping initializer, or None.
    """
    checks = (
        (expected_starting_offset, j_starting_initializer),
        (expected_stopping_offset, j_stopping_initializer),
    )
    for expected, j_actual in checks:
        if expected is not None:
            self.assertTrue(expected._j_initializer.equals(j_actual))
            continue
        self.assertIsNone(j_actual)