Skip to content

Commit bcd5004

Browse files
feat: implement oauthbearer token refresh cb setter (#546)
1 parent b21a905 commit bcd5004

File tree

10 files changed

+150
-2
lines changed

10 files changed

+150
-2
lines changed

.github/workflows/test/build-librdkafka.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ set -ex
55
if ! [ -f ~/build-cache/librdkafka/usr/local/include/librdkafka/rdkafka.h ] || ! [ -f ~/build-cache/librdkafka/usr/local/bin/kafkacat ]; then
66
echo "librdkafka build is not cached"
77

8-
git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-1.5.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}"
8+
git clone --depth 1 --branch "${LIBRDKAFKA_VERSION:-v2.3.0}" "${LIBRDKAFKA_REPOSITORY_URL:-https://github.com/edenhill/librdkafka.git}"
99

1010
cd librdkafka
1111
./configure

conf.c

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs) /* {{{ */
6666
cbs->offset_commit = NULL;
6767
kafka_conf_callback_dtor(cbs->log);
6868
cbs->log = NULL;
69+
kafka_conf_callback_dtor(cbs->oauthbearer_token_refresh);
70+
cbs->oauthbearer_token_refresh = NULL;
6971
} /* }}} */
7072

7173
static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from) /* {{{ */
@@ -337,6 +339,40 @@ static void kafka_conf_log_cb(const rd_kafka_t *rk, int level, const char *facil
337339
zval_ptr_dtor(&args[3]);
338340
}
339341

342+
/*
343+
void rd_kafka_conf_set_oauthbearer_token_refresh_cb(
344+
rd_kafka_conf_t *conf,
345+
void (*oauthbearer_token_refresh_cb)(rd_kafka_t *rk,
346+
const char *oauthbearer_config,
347+
void *opaque)) {
348+
}*/
349+
static void kafka_conf_set_oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque)
350+
{
351+
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
352+
zval args[2];
353+
354+
if (!opaque) {
355+
return;
356+
}
357+
358+
if (!cbs->oauthbearer_token_refresh) {
359+
return;
360+
}
361+
362+
ZVAL_NULL(&args[0]);
363+
ZVAL_NULL(&args[1]);
364+
365+
ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0);
366+
ZVAL_STRING(&args[1], oauthbearer_config);
367+
368+
rdkafka_call_function(&cbs->oauthbearer_token_refresh->fci, &cbs->oauthbearer_token_refresh->fcc, NULL, 2, args);
369+
370+
zval_ptr_dtor(&args[0]);
371+
zval_ptr_dtor(&args[1]);
372+
}
373+
374+
375+
340376
/* {{{ proto RdKafka\Conf::__construct() */
341377
PHP_METHOD(RdKafka_Conf, __construct)
342378
{
@@ -698,6 +734,40 @@ PHP_METHOD(RdKafka_Conf, setLogCb)
698734
}
699735
/* }}} */
700736

737+
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
738+
/* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback)
739+
Set token refresh callback for OAUTHBEARER sasl */
740+
PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb)
741+
{
742+
zend_fcall_info fci;
743+
zend_fcall_info_cache fcc;
744+
kafka_conf_object *conf;
745+
746+
if (zend_parse_parameters(ZEND_NUM_ARGS(), "f", &fci, &fcc) == FAILURE) {
747+
return;
748+
}
749+
750+
conf = get_kafka_conf_object(getThis());
751+
if (!conf) {
752+
return;
753+
}
754+
755+
Z_ADDREF_P(&fci.function_name);
756+
757+
if (conf->cbs.oauthbearer_token_refresh) {
758+
zval_ptr_dtor(&conf->cbs.oauthbearer_token_refresh->fci.function_name);
759+
} else {
760+
conf->cbs.oauthbearer_token_refresh = ecalloc(1, sizeof(*conf->cbs.oauthbearer_token_refresh));
761+
}
762+
763+
conf->cbs.oauthbearer_token_refresh->fci = fci;
764+
conf->cbs.oauthbearer_token_refresh->fcc = fcc;
765+
766+
rd_kafka_conf_set_oauthbearer_token_refresh_cb(conf->u.conf, kafka_conf_set_oauthbearer_token_refresh_cb);
767+
}
768+
/* }}} */
769+
#endif
770+
701771
/* {{{ proto RdKafka\TopicConf::__construct() */
702772
PHP_METHOD(RdKafka_TopicConf, __construct)
703773
{

conf.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ typedef struct _kafka_conf_callbacks {
4646
kafka_conf_callback *consume;
4747
kafka_conf_callback *offset_commit;
4848
kafka_conf_callback *log;
49+
kafka_conf_callback *oauthbearer_token_refresh;
4950
} kafka_conf_callbacks;
5051

5152
typedef struct _kafka_conf_object {

conf.stub.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ public function setOffsetCommitCb(callable $callback): void {}
4444

4545
/** @tentative-return-type */
4646
public function setLogCb(callable $callback): void {}
47+
48+
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
49+
/** @tentative-return-type */
50+
public function setOauthbearerTokenRefreshCb(callable $callback): void {}
51+
#endif
4752
}
4853

4954
class TopicConf

conf_arginfo.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ ZEND_END_ARG_INFO()
3232

3333
#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb
3434

35+
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
36+
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
37+
#endif
38+
3539
#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct
3640

3741
#define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump
@@ -54,6 +58,11 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
5458
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
5559
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
5660
ZEND_METHOD(RdKafka_Conf, setLogCb);
61+
62+
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
63+
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
64+
#endif
65+
5766
ZEND_METHOD(RdKafka_TopicConf, __construct);
5867
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);
5968

@@ -70,6 +79,9 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
7079
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
7180
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
7281
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
82+
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
83+
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
84+
#endif
7385
ZEND_FE_END
7486
};
7587

conf_legacy_arginfo.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ ZEND_END_ARG_INFO()
3232

3333
#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb
3434

35+
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
36+
#define arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb arginfo_class_RdKafka_Conf_setErrorCb
37+
#endif
38+
3539
#define arginfo_class_RdKafka_TopicConf___construct arginfo_class_RdKafka_Conf___construct
3640

3741
#define arginfo_class_RdKafka_TopicConf_dump arginfo_class_RdKafka_Conf_dump
@@ -54,6 +58,9 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
5458
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
5559
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
5660
ZEND_METHOD(RdKafka_Conf, setLogCb);
61+
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
62+
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
63+
#endif
5764
ZEND_METHOD(RdKafka_TopicConf, __construct);
5865
ZEND_METHOD(RdKafka_TopicConf, setPartitioner);
5966

@@ -70,6 +77,9 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
7077
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
7178
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
7279
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
80+
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
81+
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
82+
#endif
7383
ZEND_FE_END
7484
};
7585

config.m4

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ if test "$PHP_RDKAFKA" != "no"; then
8484
AC_MSG_WARN([murmur2 partitioner is not available])
8585
])
8686

87+
AC_CHECK_LIB($LIBNAME,[rd_kafka_conf_set_oauthbearer_token_refresh_cb],[
88+
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB,1,[ ])
89+
],[
90+
AC_MSG_WARN([oauthbearer token refresh cb is not available])
91+
])
92+
8793
LDFLAGS="$ORIG_LDFLAGS"
8894
CPPFLAGS="$ORIG_CPPFLAGS"
8995

package.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
<file role="test" name="bug88.phpt"/>
115115
<file role="test" name="bugConfSetArgument.phpt"/>
116116
<file role="test" name="conf_callbacks_integration.phpt"/>
117+
<file role="test" name="conf_callbacks_rdkafka11.phpt"/>
117118
<file role="test" name="conf_callbacks.phpt"/>
118119
<file role="test" name="conf.phpt"/>
119120
<file role="test" name="conf_setDefaultTopicConf8.phpt"/>

tests/conf_callbacks.phpt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
RdKafka\Conf
33
--SKIPIF--
44
<?php
5-
RD_KAFKA_VERSION >= 0x090000 || die("skip librdkafka too old");
5+
(RD_KAFKA_VERSION >= 0x090000 && RD_KAFKA_VERSION < 0x010100ff) || die("skip librdkafka too old");
66
--FILE--
77
<?php
88

@@ -23,6 +23,8 @@ $conf->setRebalanceCb(function () { });
2323
$dump = $conf->dump();
2424
var_dump(isset($dump["rebalance_cb"]));
2525

26+
echo "Checking if oauthbearer cb exists\n";
27+
var_dump(method_exists($conf, 'setOauthbearerTokenRefreshCb'));
2628

2729
--EXPECT--
2830
Setting consume callback
@@ -31,3 +33,5 @@ Setting offset_commit callback
3133
bool(true)
3234
Setting rebalance callback
3335
bool(true)
36+
Checking if oauthbearer cb exists
37+
bool(false)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
--TEST--
2+
RdKafka\Conf
3+
--SKIPIF--
4+
<?php
5+
RD_KAFKA_VERSION >= 0x010100ff || die("skip librdkafka too old");
6+
--FILE--
7+
<?php
8+
9+
$conf = new RdKafka\Conf();
10+
11+
echo "Setting consume callback\n";
12+
$conf->setConsumeCb(function () { });
13+
$dump = $conf->dump();
14+
var_dump(isset($dump["consume_cb"]));
15+
16+
echo "Setting offset_commit callback\n";
17+
$conf->setOffsetCommitCb(function () { });
18+
$dump = $conf->dump();
19+
var_dump(isset($dump["offset_commit_cb"]));
20+
21+
echo "Setting rebalance callback\n";
22+
$conf->setRebalanceCb(function () { });
23+
$dump = $conf->dump();
24+
var_dump(isset($dump["rebalance_cb"]));
25+
26+
echo "Setting oauth token bearer callback\n";
27+
$conf->setOauthbearerTokenRefreshCb(function () {});
28+
$dump = $conf->dump();
29+
var_dump(isset($dump["oauthbearer_token_refresh_cb"]));
30+
31+
--EXPECT--
32+
Setting consume callback
33+
bool(true)
34+
Setting offset_commit callback
35+
bool(true)
36+
Setting rebalance callback
37+
bool(true)
38+
Setting oauth token bearer callback
39+
bool(true)

0 commit comments

Comments
 (0)