Skip to content

Commit 743f1d3

Browse files
feat: finalize OAUTHBEARER support (#547)
Co-authored-by: Sarina Corrigan <26685104+scorgn@users.noreply.github.com>
1 parent 0ee8bba commit 743f1d3

16 files changed

+488
-72
lines changed

.github/workflows/package.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ jobs:
2020
run: './php-rdkafka/.github/workflows/package/package.sh'
2121

2222
- name: 'Archive package'
23-
uses: 'actions/upload-artifact@v2'
23+
uses: 'actions/upload-artifact@v4'
2424
with:
2525
path: 'php-rdkafka/rdkafka.tgz'

.github/workflows/test.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,28 +109,39 @@ jobs:
109109
# librdkafka 1.0.1
110110
- php: '8.1.0'
111111
librdkafka: 'v1.0.1'
112+
skipoauth: '1'
112113
- php: '8.0.0'
113114
librdkafka: 'v1.0.1'
115+
skipoauth: '1'
114116
- php: '7.4.0'
115117
librdkafka: 'v1.0.1'
118+
skipoauth: '1'
116119
- php: '7.3.0'
117120
librdkafka: 'v1.0.1'
121+
skipoauth: '1'
118122

119123
# librdkafka 0.11.6
120124
- php: '8.1.0'
121125
librdkafka: 'v0.11.6'
126+
skipoauth: '1'
122127
- php: '8.0.0'
123128
librdkafka: 'v0.11.6'
129+
skipoauth: '1'
124130
- php: '7.4.0'
125131
librdkafka: 'v0.11.6'
132+
skipoauth: '1'
126133
- php: '7.3.0'
127134
librdkafka: 'v0.11.6'
135+
skipoauth: '1'
128136
- php: '7.2.0'
129137
librdkafka: 'v0.11.6'
138+
skipoauth: '1'
130139
- php: '7.1.0'
131140
librdkafka: 'v0.11.6'
141+
skipoauth: '1'
132142
- php: '7.0.0'
133143
librdkafka: 'v0.11.6'
144+
skipoauth: '1'
134145

135146
# librdkafka master (experimental, does not block PRs)
136147
- php: '8.3.0'
@@ -158,6 +169,7 @@ jobs:
158169
PHP_VERSION: ${{ matrix.php }}
159170
LIBRDKAFKA_VERSION: ${{ matrix.librdkafka }}
160171
MEMORY_CHECK: ${{ matrix.memcheck }}
172+
SKIP_OAUTH: ${{ matrix.skipoauth }}
161173
TEST_KAFKA_BROKERS: kafka:9092
162174
TEST_KAFKA_BROKER_VERSION: 2.6
163175
steps:

.github/workflows/test/start-kafka.sh

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,63 @@ docker network create kafka_network
44
docker pull wurstmeister/zookeeper:latest
55
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:latest
66
docker pull wurstmeister/kafka:latest
7-
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:latest
8-
printf "\n127.0.0.1 kafka\n"|sudo tee /etc/hosts >/dev/null
7+
docker run -d -p 9092:9092 --network kafka_network \
8+
-e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
9+
-e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
10+
-e "KAFKA_BROKER_ID=1" \
11+
-e "KAFKA_ADVERTISED_HOST_NAME=kafka" \
12+
-e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka" \
13+
-e "KAFKA_ADVERTISED_PORT=9092" \
14+
--name kafka wurstmeister/kafka:latest
915

10-
echo "Waiting for Kafka to be ready"
16+
if [ ${SKIP_OAUTH:-0} -ne 1 ]; then
17+
docker run -d -p 29092:29092 --network kafka_network \
18+
-e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" \
19+
-e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" \
20+
-e "KAFKA_ADVERTISED_HOST_NAME=kafka_oauth2" \
21+
-e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181/kafka_oauth2" \
22+
-e "KAFKA_ADVERTISED_PORT=29092" \
23+
-e "KAFKA_BROKER_ID=2" \
24+
-e "KAFKA_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
25+
-e "KAFKA_ADVERTISED_LISTENERS=SASLPLAINTEXT://kafka_oauth2:29092" \
26+
-e "KAFKA_LISTENER_NAME_SASLPLAINTEXT_OAUTHBEARER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredValidatorRequiredScope=\"required-scope\" unsecuredLoginStringClaim_scope=\"required-scope\" unsecuredLoginStringClaim_sub=\"admin\";" \
27+
-e "KAFKA_INTER_BROKER_LISTENER_NAME=SASLPLAINTEXT" \
28+
-e "KAFKA_SASL_ENABLED_MECHANISMS=OAUTHBEARER" \
29+
-e "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SASLPLAINTEXT:SASL_PLAINTEXT" \
30+
-e "KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=OAUTHBEARER" \
31+
--name kafka_oauth2 wurstmeister/kafka:latest
32+
fi
33+
34+
printf "\n127.0.0.1 kafka\n127.0.0.1 kafka_oauth2\n"|sudo tee /etc/hosts >/dev/null
35+
36+
echo "Waiting for Kafka services to be ready"
37+
38+
kafka_ready=0
39+
kafka_oauth2_ready=0
1140

1241
for i in $(seq 1 20); do
13-
if kafkacat -b 127.0.0.1 -L; then
14-
echo "Kafka is ready"
15-
exit 0
42+
if [ $kafka_ready -eq 0 ]; then
43+
if kafkacat -b kafka:9092 -L -m 30 -d broker; then
44+
kafka_ready=1
45+
echo "Kafka is ready"
46+
fi
47+
fi
48+
if [ $kafka_oauth2_ready -eq 0 ] && [ ${SKIP_OAUTH:-0} -ne 1 ]; then
49+
if kafkacat -b kafka_oauth2:29092 \
50+
-X security.protocol=SASL_PLAINTEXT \
51+
-X sasl.mechanisms=OAUTHBEARER \
52+
-X enable.sasl.oauthbearer.unsecure.jwt="true" \
53+
-X sasl.oauthbearer.config="principal=admin scope=required-scope" -L
54+
then
55+
kafka_oauth2_ready=1
56+
echo "Kafka OAuth2 is ready"
57+
fi
58+
fi
59+
60+
if [ $kafka_ready -eq 1 ] && ( [ $kafka_oauth2_ready -eq 1 ] || [ ${SKIP_OAUTH:-0} -eq 1 ] ); then
61+
exit 0
1662
fi
1763
done
1864

19-
echo "Timedout waiting for Kafka to be ready"
65+
echo "Timedout waiting for Kafka services to be ready"
2066
exit 1

conf.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callba
8181

8282
void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from) /* {{{ */
8383
{
84+
kafka_conf_callback_copy(&to->oauthbearer_token_refresh, from->oauthbearer_token_refresh);
8485
kafka_conf_callback_copy(&to->error, from->error);
8586
kafka_conf_callback_copy(&to->rebalance, from->rebalance);
8687
kafka_conf_callback_copy(&to->dr_msg, from->dr_msg);
@@ -734,7 +735,7 @@ PHP_METHOD(RdKafka_Conf, setLogCb)
734735
}
735736
/* }}} */
736737

737-
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
738+
#ifdef HAS_RD_KAFKA_OAUTHBEARER
738739
/* {{{ proto void RdKafka\Conf::setOauthbearerTokenRefreshCb(mixed $callback)
739740
Set token refresh callback for OAUTHBEARER sasl */
740741
PHP_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb)

conf.stub.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public function setOffsetCommitCb(callable $callback): void {}
4545
/** @tentative-return-type */
4646
public function setLogCb(callable $callback): void {}
4747

48-
#ifdef HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB
48+
#ifdef HAS_RD_KAFKA_OAUTHBEARER
4949
/** @tentative-return-type */
5050
public function setOauthbearerTokenRefreshCb(callable $callback): void {}
5151
#endif

conf_arginfo.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */
2+
* Stub hash: a72d2e1796ed7f89185f543973c659a6a704f347 */
33

44
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
55
ZEND_END_ARG_INFO()
@@ -48,7 +48,7 @@ ZEND_END_ARG_INFO()
4848

4949
#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb
5050

51-
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
51+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
5252
#if (PHP_VERSION_ID >= 80100)
5353
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 1, IS_VOID, 0)
5454
#else
@@ -83,7 +83,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
8383
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
8484
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
8585
ZEND_METHOD(RdKafka_Conf, setLogCb);
86-
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
86+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
8787
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
8888
#endif
8989
ZEND_METHOD(RdKafka_TopicConf, __construct);
@@ -101,7 +101,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
101101
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
102102
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
103103
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
104-
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
104+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
105105
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
106106
#endif
107107
ZEND_FE_END
@@ -128,7 +128,11 @@ static zend_class_entry *register_class_RdKafka_Conf(void)
128128
zend_class_entry ce, *class_entry;
129129

130130
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Conf", class_RdKafka_Conf_methods);
131+
#if (PHP_VERSION_ID >= 80400)
132+
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
133+
#else
131134
class_entry = zend_register_internal_class_ex(&ce, NULL);
135+
#endif
132136

133137
return class_entry;
134138
}
@@ -138,7 +142,11 @@ static zend_class_entry *register_class_RdKafka_TopicConf(void)
138142
zend_class_entry ce, *class_entry;
139143

140144
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "TopicConf", class_RdKafka_TopicConf_methods);
145+
#if (PHP_VERSION_ID >= 80400)
146+
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
147+
#else
141148
class_entry = zend_register_internal_class_ex(&ce, NULL);
149+
#endif
142150

143151
return class_entry;
144152
}

conf_legacy_arginfo.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 86934f54199c0af37cadfcedafeaffb569e33beb */
2+
* Stub hash: a72d2e1796ed7f89185f543973c659a6a704f347 */
33

44
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf___construct, 0, 0, 0)
55
ZEND_END_ARG_INFO()
@@ -32,7 +32,7 @@ ZEND_END_ARG_INFO()
3232

3333
#define arginfo_class_RdKafka_Conf_setLogCb arginfo_class_RdKafka_Conf_setErrorCb
3434

35-
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
35+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
3636
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, 0, 0, 1)
3737
ZEND_ARG_INFO(0, callback)
3838
ZEND_END_ARG_INFO()
@@ -59,7 +59,7 @@ ZEND_METHOD(RdKafka_Conf, setRebalanceCb);
5959
ZEND_METHOD(RdKafka_Conf, setConsumeCb);
6060
ZEND_METHOD(RdKafka_Conf, setOffsetCommitCb);
6161
ZEND_METHOD(RdKafka_Conf, setLogCb);
62-
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
62+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
6363
ZEND_METHOD(RdKafka_Conf, setOauthbearerTokenRefreshCb);
6464
#endif
6565
ZEND_METHOD(RdKafka_TopicConf, __construct);
@@ -77,7 +77,7 @@ static const zend_function_entry class_RdKafka_Conf_methods[] = {
7777
ZEND_ME(RdKafka_Conf, setConsumeCb, arginfo_class_RdKafka_Conf_setConsumeCb, ZEND_ACC_PUBLIC)
7878
ZEND_ME(RdKafka_Conf, setOffsetCommitCb, arginfo_class_RdKafka_Conf_setOffsetCommitCb, ZEND_ACC_PUBLIC)
7979
ZEND_ME(RdKafka_Conf, setLogCb, arginfo_class_RdKafka_Conf_setLogCb, ZEND_ACC_PUBLIC)
80-
#if defined(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB)
80+
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
8181
ZEND_ME(RdKafka_Conf, setOauthbearerTokenRefreshCb, arginfo_class_RdKafka_Conf_setOauthbearerTokenRefreshCb, ZEND_ACC_PUBLIC)
8282
#endif
8383
ZEND_FE_END
@@ -104,7 +104,11 @@ static zend_class_entry *register_class_RdKafka_Conf(void)
104104
zend_class_entry ce, *class_entry;
105105

106106
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "Conf", class_RdKafka_Conf_methods);
107+
#if (PHP_VERSION_ID >= 80400)
108+
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
109+
#else
107110
class_entry = zend_register_internal_class_ex(&ce, NULL);
111+
#endif
108112

109113
return class_entry;
110114
}
@@ -114,7 +118,11 @@ static zend_class_entry *register_class_RdKafka_TopicConf(void)
114118
zend_class_entry ce, *class_entry;
115119

116120
INIT_NS_CLASS_ENTRY(ce, "RdKafka", "TopicConf", class_RdKafka_TopicConf_methods);
121+
#if (PHP_VERSION_ID >= 80400)
122+
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
123+
#else
117124
class_entry = zend_register_internal_class_ex(&ce, NULL);
125+
#endif
118126

119127
return class_entry;
120128
}

config.m4

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,10 @@ if test "$PHP_RDKAFKA" != "no"; then
9494
AC_MSG_WARN([murmur2 partitioner is not available])
9595
])
9696

97-
AC_CHECK_LIB($LIBNAME,[rd_kafka_conf_set_oauthbearer_token_refresh_cb],[
98-
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER_TOKEN_REFRESH_CB,1,[ ])
97+
AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[
98+
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ])
9999
],[
100-
AC_MSG_WARN([oauthbearer token refresh cb is not available])
100+
AC_MSG_WARN([oauthbearer support is not available])
101101
])
102102

103103
AC_CHECK_LIB($LIBNAME,[rd_kafka_incremental_assign, rd_kafka_incremental_unassign],[

package.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
<file role="test" name="metadata_partition_001.phpt"/>
133133
<file role="test" name="metadata_topic_001.phpt"/>
134134
<file role="test" name="new_topic_with_conf.phpt"/>
135+
<file role="test" name="oauthbearer_integration.phpt"/>
135136
<file role="test" name="pause_resume.phpt"/>
136137
<file role="test" name="produce_consume.phpt"/>
137138
<file role="test" name="produce_consume_queue.phpt"/>

0 commit comments

Comments
 (0)