Skip to content

Commit 554305b

Browse files
authored
Add getControllerId (#554)
1 parent 4b085d2 commit 554305b

16 files changed

+375
-69
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ jobs:
1212
tests:
1313
name: 'Tests'
1414
strategy:
15+
fail-fast: false
1516
matrix:
1617
include:
1718
# Latest librdkafka 2.x with memcheck

.github/workflows/test/start-kafka.sh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#!/bin/sh
22

33
docker network create kafka_network
4-
docker pull wurstmeister/zookeeper:3.4.6
5-
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
6-
docker pull wurstmeister/kafka:2.13-2.6.0
7-
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.13-2.6.0
4+
docker pull wurstmeister/zookeeper:latest
5+
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest
6+
docker pull wurstmeister/kafka:latest
7+
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:latest
88
printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null
99

1010
echo "Waiting for Kafka to be ready"

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ modules
3333
package.xml
3434
rdkafka-*.tgz
3535
run-tests.php
36+
gen_stub.php
3637
tests/*/*.diff
3738
tests/*/*.exp
3839
tests/*/*.log

conf_arginfo.h

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,38 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 86e8e9fcd235f3affc4ef30ca0d96395abcad13f */
2+
* Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */
33

44
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
55
ZEND_END_ARG_INFO()
66

7+
#if (PHP_VERSION_ID >= 80100)
78
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, IS_ARRAY, 0)
9+
#else
10+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_dump, 0, 0, 0)
11+
#endif
812
ZEND_END_ARG_INFO()
913

14+
#if (PHP_VERSION_ID >= 80100)
1015
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 2, IS_VOID, 0)
16+
#else
17+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_set, 0, 0, 2)
18+
#endif
1119
ZEND_ARG_TYPE_INFO(0, name, IS_STRING, 0)
1220
ZEND_ARG_TYPE_INFO(0, value, IS_STRING, 0)
1321
ZEND_END_ARG_INFO()
1422

23+
#if (PHP_VERSION_ID >= 80100)
1524
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 1, IS_VOID, 0)
25+
#else
26+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setDefaultTopicConf, 0, 0, 1)
27+
#endif
1628
ZEND_ARG_OBJ_INFO(0, topic_conf, RdKafka\\TopicConf, 0)
1729
ZEND_END_ARG_INFO()
1830

31+
#if (PHP_VERSION_ID >= 80100)
1932
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 1, IS_VOID, 0)
33+
#else
34+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setErrorCb, 0, 0, 1)
35+
#endif
2036
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
2137
ZEND_END_ARG_INFO()
2238

@@ -32,8 +48,14 @@ ZEND_END_ARG_INFO()
3248

3349
#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb
3450

35-
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
36-
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
51+
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
52+
#if (PHP_VERSION_ID >= 80100)
53+
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 1, IS_VOID, 0)
54+
#else
55+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1)
56+
#endif
57+
ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0)
58+
ZEND_END_ARG_INFO()
3759
#endif
3860

3961
#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct
@@ -42,11 +64,14 @@ ZEND_END_ARG_INFO()
4264

4365
#define arginfo_class_RdKafka_TopicConf_set arginfo_class_RdKafka_Conf_set
4466

67+
#if (PHP_VERSION_ID >= 80100)
4568
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 1, IS_VOID, 0)
69+
#else
70+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 0, 1)
71+
#endif
4672
ZEND_ARG_TYPE_INFO(0, partitioner, IS_LONG, 0)
4773
ZEND_END_ARG_INFO()
4874

49-
5075
ZEND_METHOD(RdKafka_Conf, __construct);
5176
ZEND_METHOD(RdKafka_Conf, dump);
5277
ZEND_METHOD(RdKafka_Conf, set);
@@ -58,15 +83,12 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
5883
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
5984
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
6085
ZEND_METHOD(RdKafka_Conf, setLogCb);
61-
62-
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
86+
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
6387
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
6488
#endif
65-
6689
ZEND_METHOD(RdKafka_TopicConf, __construct);
6790
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);
6891

69-
7092
static const zend_function_entry class_RdKafka_Conf_methods[] = {
7193
ZEND_ME(RdKafka_Conf, __construct, arginfo_class_RdKafka_Conf___construct, ZEND_ACC_PUBLIC)
7294
ZEND_ME(RdKafka_Conf, dump, arginfo_class_RdKafka_Conf_dump, ZEND_ACC_PUBLIC)
@@ -79,17 +101,24 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
79101
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
80102
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
81103
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
82-
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
104+
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
83105
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
84-
#endif
106+
#endif
85107
ZEND_FE_END
86108
};
87109

88-
89110
static const zend_function_entry class_RdKafka_TopicConf_methods[] = {
90111
ZEND_ME(RdKafka_TopicConf, __construct, arginfo_class_RdKafka_TopicConf___construct, ZEND_ACC_PUBLIC)
91-
ZEND_MALIAS(RdKafka_Conf, dump, dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
92-
ZEND_MALIAS(RdKafka_Conf, set, set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
112+
#if (PHP_VERSION_ID >= 80400)
113+
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC, NULL, NULL)
114+
#else
115+
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
116+
#endif
117+
#if (PHP_VERSION_ID >= 80400)
118+
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC, NULL, NULL)
119+
#else
120+
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
121+
#endif
93122
ZEND_ME(RdKafka_TopicConf, setPartitioner, arginfo_class_RdKafka_TopicConf_setPartitioner, ZEND_ACC_PUBLIC)
94123
ZEND_FE_END
95124
};

conf_legacy_arginfo.h

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 86e8e9fcd235f3affc4ef30ca0d96395abcad13f */
2+
* Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */
33

44
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
55
ZEND_END_ARG_INFO()
@@ -32,8 +32,10 @@ ZEND_END_ARG_INFO()
3232

3333
#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb
3434

35-
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
36-
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
35+
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
36+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1)
37+
ZEND_ARG_INFO(0, callback)
38+
ZEND_END_ARG_INFO()
3739
#endif
3840

3941
#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct
@@ -46,7 +48,6 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_TopicConf_setPartitioner, 0, 0, 1)
4648
ZEND_ARG_INFO(0, partitioner)
4749
ZEND_END_ARG_INFO()
4850

49-
5051
ZEND_METHOD(RdKafka_Conf, __construct);
5152
ZEND_METHOD(RdKafka_Conf, dump);
5253
ZEND_METHOD(RdKafka_Conf, set);
@@ -58,13 +59,12 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
5859
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
5960
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
6061
ZEND_METHOD(RdKafka_Conf, setLogCb);
61-
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
62+
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
6263
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
6364
#endif
6465
ZEND_METHOD(RdKafka_TopicConf, __construct);
6566
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);
6667

67-
6868
static const zend_function_entry class_RdKafka_Conf_methods[] = {
6969
ZEND_ME(RdKafka_Conf, __construct, arginfo_class_RdKafka_Conf___construct, ZEND_ACC_PUBLIC)
7070
ZEND_ME(RdKafka_Conf, dump, arginfo_class_RdKafka_Conf_dump, ZEND_ACC_PUBLIC)
@@ -77,17 +77,24 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
7777
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
7878
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
7979
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
80-
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
80+
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
8181
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
82-
#endif
82+
#endif
8383
ZEND_FE_END
8484
};
8585

86-
8786
static const zend_function_entry class_RdKafka_TopicConf_methods[] = {
8887
ZEND_ME(RdKafka_TopicConf, __construct, arginfo_class_RdKafka_TopicConf___construct, ZEND_ACC_PUBLIC)
89-
ZEND_MALIAS(RdKafka_Conf, dump, dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
90-
ZEND_MALIAS(RdKafka_Conf, set, set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
88+
#if (PHP_VERSION_ID >= 80400)
89+
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC, NULL, NULL)
90+
#else
91+
ZEND_RAW_FENTRY("dump", zim_RdKafka_Conf_dump, arginfo_class_RdKafka_TopicConf_dump, ZEND_ACC_PUBLIC)
92+
#endif
93+
#if (PHP_VERSION_ID >= 80400)
94+
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC, NULL, NULL)
95+
#else
96+
ZEND_RAW_FENTRY("set", zim_RdKafka_Conf_set, arginfo_class_RdKafka_TopicConf_set, ZEND_ACC_PUBLIC)
97+
#endif
9198
ZEND_ME(RdKafka_TopicConf, setPartitioner, arginfo_class_RdKafka_TopicConf_setPartitioner, ZEND_ACC_PUBLIC)
9299
ZEND_FE_END
93100
};

config.m4

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,16 @@ if test "$PHP_RDKAFKA" != "no"; then
7171
AC_MSG_WARN([purge is not available])
7272
])
7373

74+
AC_CHECK_LIB($LIBNAME,[rd_kafka_controllerid],[
75+
#if RD_KAFKA_VERSION >= 0x010000ff
76+
AC_DEFINE(HAS_RD_KAFKA_CONTROLLERID,1,[ ])
77+
#else
78+
AC_MSG_WARN([controllerid is broken on 0.11.x])
79+
#endif
80+
],[
81+
AC_MSG_WARN([controllerid is not available])
82+
])
83+
7484
AC_CHECK_LIB($LIBNAME,[rd_kafka_init_transactions],[
7585
AC_DEFINE(HAS_RD_KAFKA_TRANSACTIONS,1,[ ])
7686
SOURCES="$SOURCES kafka_error_exception.c"

kafka_consumer.c

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ PHP_METHOD(RdKafka_KafkaConsumer, close)
542542
}
543543
/* }}} */
544544

545-
/* {{{ proto Metadata RdKafka\KafkaConsumer::getMetadata(bool all_topics, RdKafka\Topic only_topic, int timeout_ms)
545+
/* {{{ proto RdKafka\Metadata RdKafka\KafkaConsumer::getMetadata(bool $all_topics, RdKafka\Topic $only_topic, int $timeout_ms)
546546
Request Metadata from broker */
547547
PHP_METHOD(RdKafka_KafkaConsumer, getMetadata)
548548
{
@@ -581,6 +581,28 @@ PHP_METHOD(RdKafka_KafkaConsumer, getMetadata)
581581
}
582582
/* }}} */
583583

584+
#ifdef HAS_RD_KAFKA_CONTROLLERID
585+
/* {{{ proto int RdKafka\KafkaConsumer::getControllerId(int $timeout_ms)
586+
Returns the current ControllerId (controller broker id) as reported in broker metadata */
587+
PHP_METHOD(RdKafka_KafkaConsumer, getControllerId)
588+
{
589+
object_intern *intern;
590+
zend_long timeout;
591+
592+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
593+
return;
594+
}
595+
596+
intern = get_object(getThis());
597+
if (!intern) {
598+
return;
599+
}
600+
601+
RETURN_LONG(rd_kafka_controllerid(intern->rk, timeout));
602+
}
603+
/* }}} */
604+
#endif
605+
584606
/* {{{ proto RdKafka\KafkaConsumerTopic RdKafka\KafkaConsumer::newTopic(string $topic)
585607
Returns a RdKafka\KafkaConsumerTopic object */
586608
PHP_METHOD(RdKafka_KafkaConsumer, newTopic)

kafka_consumer.stub.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ public function unsubscribe(): void {}
5656
/** @tentative-return-type */
5757
public function getMetadata(bool $all_topics, ?Topic $only_topic, int $timeout_ms): Metadata {}
5858

59+
#ifdef HAS_RD_KAFKA_CONTROLLERID
60+
/** @tentative-return-type */
61+
public function getControllerId(int $timeout_ms): int {}
62+
#endif
63+
5964
/** @tentative-return-type */
6065
public function newTopic(string $topic_name, ?TopicConf $topic_conf = null): KafkaConsumerTopic {}
6166

0 commit comments

Comments
 (0)