Skip to content

Commit 4b085d2

Browse files
authored
Add missing methods to support incremental rebalance (#541)
1 parent 9cafbba commit 4b085d2

File tree

5 files changed

+104
-1
lines changed

5 files changed

+104
-1
lines changed

config.m4

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ if test "$PHP_RDKAFKA" != "no"; then
1919
fi
2020
done
2121
fi
22-
22+
2323
if test -z "$RDKAFKA_DIR"; then
2424
AC_MSG_RESULT([not found])
2525
AC_MSG_ERROR([Please reinstall the rdkafka distribution])
@@ -90,6 +90,12 @@ if test "$PHP_RDKAFKA" != "no"; then
9090
AC_MSG_WARN([oauthbearer token refresh cb is not available])
9191
])
9292

93+
AC_CHECK_LIB($LIBNAME,[rd_kafka_incremental_assign, rd_kafka_incremental_unassign],[
94+
AC_DEFINE(HAS_RD_KAFKA_INCREMENTAL_ASSIGN,1,[ ])
95+
],[
96+
AC_MSG_WARN([no rd_kafka_incremental_(un)assign, incremental rebalance support will not be available])
97+
])
98+
9399
LDFLAGS="$ORIG_LDFLAGS"
94100
CPPFLAGS="$ORIG_CPPFLAGS"
95101

kafka_consumer.c

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,59 @@ PHP_METHOD(RdKafka_KafkaConsumer, assign)
212212
}
213213
/* }}} */
214214

215+
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
216+
static void consumer_incremental_op(int assign, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
217+
{
218+
HashTable *htopars = NULL;
219+
220+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE || !htopars) {
221+
return;
222+
}
223+
224+
object_intern *intern = get_object(getThis());
225+
if (!intern) {
226+
return;
227+
}
228+
229+
rd_kafka_topic_partition_list_t *topics = array_arg_to_kafka_topic_partition_list(1, htopars);
230+
if (!topics) {
231+
return;
232+
}
233+
234+
rd_kafka_error_t *err;
235+
236+
if (assign) {
237+
err = rd_kafka_incremental_assign(intern->rk, topics);
238+
} else {
239+
err = rd_kafka_incremental_unassign(intern->rk, topics);
240+
}
241+
242+
rd_kafka_topic_partition_list_destroy(topics);
243+
244+
if (err) {
245+
zend_throw_exception(ce_kafka_exception, rd_kafka_error_string(err), 0);
246+
rd_kafka_error_destroy(err);
247+
}
248+
}
249+
/* }}} */
250+
251+
/* {{{ proto void RdKafka\KafkaConsumer::incrementalAssign(array $topics)
252+
Incremental assignment of partitions to consume */
253+
PHP_METHOD(RdKafka_KafkaConsumer, incrementalAssign)
254+
{
255+
consumer_incremental_op(1, INTERNAL_FUNCTION_PARAM_PASSTHRU);
256+
}
257+
/* }}} */
258+
259+
/* {{{ proto void RdKafka\KafkaConsumer::incrementalUnassign(array $topics)
260+
Incremental unassign of partitions to consume */
261+
PHP_METHOD(RdKafka_KafkaConsumer, incrementalUnassign)
262+
{
263+
consumer_incremental_op(0, INTERNAL_FUNCTION_PARAM_PASSTHRU);
264+
}
265+
/* }}} */
266+
#endif // !HAS_RD_KAFKA_INCREMENTAL_ASSIGN
267+
215268
/* {{{ proto array RdKafka\KafkaConsumer::getAssignment()
216269
Returns the current partition getAssignment */
217270
PHP_METHOD(RdKafka_KafkaConsumer, getAssignment)

kafka_consumer.stub.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ public function __construct(Conf $conf) {}
2121
/** @tentative-return-type */
2222
public function assign(?array $topic_partitions = null): void {}
2323

24+
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
25+
/** @tentative-return-type */
26+
public function incrementalAssign(array $topic_partitions): void {}
27+
28+
/** @tentative-return-type */
29+
public function incrementalUnassign(array $topic_partitions): void {}
30+
#endif
31+
2432
/** @tentative-return-type */
2533
public function getAssignment(): array {}
2634

kafka_consumer_arginfo.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@ ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaCon
99
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topic_partitions, IS_ARRAY, 1, "null")
1010
ZEND_END_ARG_INFO()
1111

12+
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
13+
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, IS_VOID, 0)
14+
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
15+
ZEND_END_ARG_INFO()
16+
17+
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, IS_VOID, 0)
18+
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
19+
ZEND_END_ARG_INFO()
20+
#endif
21+
1222
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, IS_ARRAY, 0)
1323
ZEND_END_ARG_INFO()
1424

@@ -70,6 +80,10 @@ ZEND_END_ARG_INFO()
7080

7181
ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
7282
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
83+
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
84+
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign);
85+
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign);
86+
#endif
7387
ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment);
7488
ZEND_METHOD(RdKafka_KafkaConsumer, commit);
7589
ZEND_METHOD(RdKafka_KafkaConsumer, close);
@@ -91,6 +105,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
91105
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
92106
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
93107
ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC)
108+
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
109+
ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC)
110+
ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC)
111+
#endif
94112
ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC)
95113
ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC)
96114
ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)

kafka_consumer_legacy_arginfo.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_assign, 0, 0, 0)
99
ZEND_ARG_INFO(0, topic_partitions)
1010
ZEND_END_ARG_INFO()
1111

12+
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
13+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, 0, 0, 0)
14+
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
15+
ZEND_END_ARG_INFO()
16+
17+
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, 0, 0, 0)
18+
ZEND_ARG_ARRAY_INFO(0, topic_partitions, 0)
19+
ZEND_END_ARG_INFO()
20+
#endif
21+
1222
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_getAssignment, 0, 0, 0)
1323
ZEND_END_ARG_INFO()
1424

@@ -69,6 +79,10 @@ ZEND_END_ARG_INFO()
6979

7080
ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
7181
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
82+
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
83+
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalAssign);
84+
ZEND_METHOD(RdKafka_KafkaConsumer, incrementalUnassign);
85+
#endif
7286
ZEND_METHOD(RdKafka_KafkaConsumer, getAssignment);
7387
ZEND_METHOD(RdKafka_KafkaConsumer, commit);
7488
ZEND_METHOD(RdKafka_KafkaConsumer, close);
@@ -90,6 +104,10 @@ ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
90104
static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
91105
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
92106
ZEND_ME(RdKafka_KafkaConsumer, assign, arginfo_class_RdKafka_KafkaConsumer_assign, ZEND_ACC_PUBLIC)
107+
#ifdef HAS_RD_KAFKA_INCREMENTAL_ASSIGN
108+
ZEND_ME(RdKafka_KafkaConsumer, incrementalAssign, arginfo_class_RdKafka_KafkaConsumer_incrementalAssign, ZEND_ACC_PUBLIC)
109+
ZEND_ME(RdKafka_KafkaConsumer, incrementalUnassign, arginfo_class_RdKafka_KafkaConsumer_incrementalUnassign, ZEND_ACC_PUBLIC)
110+
#endif
93111
ZEND_ME(RdKafka_KafkaConsumer, getAssignment, arginfo_class_RdKafka_KafkaConsumer_getAssignment, ZEND_ACC_PUBLIC)
94112
ZEND_ME(RdKafka_KafkaConsumer, commit, arginfo_class_RdKafka_KafkaConsumer_commit, ZEND_ACC_PUBLIC)
95113
ZEND_ME(RdKafka_KafkaConsumer, close, arginfo_class_RdKafka_KafkaConsumer_close, ZEND_ACC_PUBLIC)

0 commit comments

Comments
 (0)