From bbf66f4c3809f99e20ce29b61c12ec25739a5096 Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Wed, 26 Nov 2025 16:53:07 +0100 Subject: [PATCH 1/3] NIFI-14782 Extended Kafka3ConnectionService OAuth authentication with SASL Extensions support --- .../service/Kafka3ConnectionService.java | 22 ++++++++++--- .../OAuthBearerLoginCallbackHandler.java | 29 +++++++++++++++- .../login/OAuthBearerLoginConfigProvider.java | 8 ++++- .../kafka/shared/util/SaslExtensionUtil.java | 33 +++++++++++++++++++ .../validation/DynamicPropertyValidator.java | 6 +++- 5 files changed, 91 insertions(+), 7 deletions(-) create mode 100644 nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java index 3291d6aa1fc7..41e062998620 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java @@ -84,12 +84,16 @@ import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_LOCATION; import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_PASSWORD; import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_TYPE; +import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.SASL_EXTENSION_PROPERTY_PREFIX; +import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty; +import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.removeSaslExtensionPropertyPrefix; @Tags({"Apache", "Kafka", "Message", "Publish", "Consume"}) -@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", - description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." +@DynamicProperty(name = "The name of a Kafka configuration property or a SASL extension property.", value = "The value of the given property.", + description = "Kafka configuration properties will be added on the Kafka configuration after loading any provided configuration properties." + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." - + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.", + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration." + + " SASL extension properties can be specified in " + SASL_EXTENSION_PROPERTY_PREFIX + "propertyName format (e.g. " + SASL_EXTENSION_PROPERTY_PREFIX + "logicalCluster).", expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT) @CapabilityDescription("Provides and manages connections to Kafka Brokers for producer or consumer operations.") public class Kafka3ConnectionService extends AbstractControllerService implements KafkaConnectionService, VerifiableControllerService, KafkaClientComponent { @@ -214,8 +218,18 @@ protected List getSupportedPropertyDescriptors() { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + final String propertyName; + final String propertyType; + if (isSaslExtensionProperty(propertyDescriptorName)) { + propertyName = removeSaslExtensionPropertyPrefix(propertyDescriptorName); + propertyType = "SASL Extension"; + } else { + propertyName = propertyDescriptorName; + propertyType = "Kafka Configuration"; + } + return new PropertyDescriptor.Builder() - .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") + .description("Specifies the value for '" + propertyName + "' " + propertyType + ".") .name(propertyDescriptorName) .addValidator(new DynamicPropertyValidator(ProducerConfig.class, ConsumerConfig.class)) .dynamic(true) diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java index 69019eab8505..4b4965d4aae8 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/security/OAuthBearerLoginCallbackHandler.java @@ -17,6 +17,8 @@ package org.apache.nifi.kafka.service.security; import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; import org.apache.kafka.common.security.oauthbearer.ClientJwtValidator; import org.apache.kafka.common.security.oauthbearer.JwtValidatorException; import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; @@ -29,10 +31,16 @@ import org.slf4j.LoggerFactory; import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.auth.login.AppConfigurationEntry; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty; +import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.removeSaslExtensionPropertyPrefix; /** * {@link org.apache.kafka.common.security.auth.AuthenticateCallbackHandler} implementation to support OAuth 2 in NiFi Kafka components. @@ -49,6 +57,8 @@ public class OAuthBearerLoginCallbackHandler implements AuthenticateCallbackHand private OAuth2AccessTokenProvider accessTokenProvider; private ClientJwtValidator accessTokenValidator; + private Map saslExtensions; + @Override public void configure(final Map configs, final String saslMechanism, final List jaasConfigEntries) { final Map options = JaasOptionsUtils.getOptions(saslMechanism, jaasConfigEntries); @@ -72,13 +82,23 @@ public void configure(final Map configs, final String saslMechanism, this.accessTokenProvider = accessTokenProvider; this.accessTokenValidator = new ClientJwtValidator(); this.accessTokenValidator.configure(configs, saslMechanism, List.of()); + + this.saslExtensions = options.entrySet().stream() + .filter(entry -> isSaslExtensionProperty(entry.getKey())) + .collect(Collectors.collectingAndThen( + Collectors.toMap(entry -> removeSaslExtensionPropertyPrefix(entry.getKey()), entry -> entry.getValue().toString()), + Collections::unmodifiableMap)); } @Override - public void handle(final Callback[] callbacks) { + public void handle(final Callback[] callbacks) throws UnsupportedCallbackException { for (final Callback callback : callbacks) { if (callback instanceof OAuthBearerTokenCallback) { handleTokenCallback((OAuthBearerTokenCallback) callback); + } else if (callback instanceof SaslExtensionsCallback) { + handleExtensionsCallback((SaslExtensionsCallback) callback); + } else { + throw new UnsupportedCallbackException(callback); } } } @@ -86,6 +106,8 @@ public void handle(final Callback[] callbacks) { private void handleTokenCallback(final OAuthBearerTokenCallback callback) { final String accessToken; try { + // Kafka's ExpiringCredentialRefreshingLogin calls this method when the current token is about to expire and expects a refreshed token, so forcefully update it + accessTokenProvider.refreshAccessDetails(); accessToken = accessTokenProvider.getAccessDetails().getAccessToken(); } catch (Exception e) { LOGGER.error("Could not retrieve access token", e); @@ -102,6 +124,11 @@ private void handleTokenCallback(final OAuthBearerTokenCallback callback) { } } + private void handleExtensionsCallback(final SaslExtensionsCallback callback) { + // a unique SaslExtensions object must be returned otherwise it will be lost upon relogin + callback.extensions(new SaslExtensions(saslExtensions)); + } + @Override public void close() { } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java index f7df5b241a91..15fd34d1af5c 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/OAuthBearerLoginConfigProvider.java @@ -20,6 +20,7 @@ import org.apache.nifi.kafka.shared.component.KafkaClientComponent; import static javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED; +import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.isSaslExtensionProperty; /** * SASL OAuthBearer Login Module implementation of configuration provider @@ -33,7 +34,8 @@ public class OAuthBearerLoginConfigProvider implements LoginConfigProvider { * Get JAAS configuration for activating OAuthBearer Login Module. * The login module uses callback handlers to acquire Access Tokens. NiFi's callback handler relies on {@link org.apache.nifi.oauth2.OAuth2AccessTokenProvider} controller service to get the token. * The controller service will be passed to the callback handler via Kafka config map (as an object, instead of the string-based JAAS config). - * The JAAS config contains the service id in order to make the callback handler unique to the given service (Kafka creates separate callback handlers based on JAAS config). + * The JAAS config contains the service id and the SASL extension properties in order to make the callback handler unique to the given configuration + * (the Kafka framework creates separate callback handlers based on JAAS config). * * @param context Property Context * @return JAAS configuration with OAuthBearer Login Module @@ -45,6 +47,10 @@ public String getConfiguration(final PropertyContext context) { final String serviceId = context.getProperty(KafkaClientComponent.OAUTH2_ACCESS_TOKEN_PROVIDER_SERVICE).getValue(); builder.append(SERVICE_ID_KEY, serviceId); + context.getAllProperties().entrySet().stream() + .filter(entry -> isSaslExtensionProperty(entry.getKey())) + .forEach(entry -> builder.append(entry.getKey(), entry.getValue())); + return builder.build(); } } diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java new file mode 100644 index 000000000000..65a4bc45a848 --- /dev/null +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.shared.util; + +/** + * Utility class to handle SASL Extension properties + */ +public class SaslExtensionUtil { + + public static final String SASL_EXTENSION_PROPERTY_PREFIX = "sasl_extension_"; + + public static boolean isSaslExtensionProperty(final String propertyName) { + return propertyName.startsWith(SASL_EXTENSION_PROPERTY_PREFIX); + } + + public static String removeSaslExtensionPropertyPrefix(final String propertyName) { + return propertyName.substring(SASL_EXTENSION_PROPERTY_PREFIX.length()); + } +} diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java index a9b0585dd758..63f1630c07b1 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/DynamicPropertyValidator.java @@ -25,6 +25,8 @@ import java.util.HashSet; import java.util.Set; +import static org.apache.nifi.kafka.shared.util.SaslExtensionUtil.SASL_EXTENSION_PROPERTY_PREFIX; + /** * Validator for dynamic Kafka properties */ @@ -48,10 +50,12 @@ public ValidationResult validate(final String subject, final String input, final if (subject.startsWith(PARTITIONS_PROPERTY_PREFIX)) { builder.valid(true); + } else if (subject.startsWith(SASL_EXTENSION_PROPERTY_PREFIX)) { + builder.valid(true); } else { final boolean valid = clientPropertyNames.contains(subject); builder.valid(valid); - builder.explanation("must be a known Kafka client configuration property"); + builder.explanation("must be a known Kafka client configuration property or a SASL extension property"); } return builder.build(); From bccd74ba4f85e4018c3c3e3256b5001a40ed891a Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Thu, 27 Nov 2025 12:56:54 +0100 Subject: [PATCH 2/3] NIFI-14782 Review changes --- .../org/apache/nifi/kafka/service/Kafka3ConnectionService.java | 3 ++- .../org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java index 41e062998620..8d217e01aec0 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java @@ -220,6 +220,7 @@ protected List getSupportedPropertyDescriptors() { protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { final String propertyName; final String propertyType; + if (isSaslExtensionProperty(propertyDescriptorName)) { propertyName = removeSaslExtensionPropertyPrefix(propertyDescriptorName); propertyType = "SASL Extension"; @@ -229,7 +230,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String } return new PropertyDescriptor.Builder() - .description("Specifies the value for '" + propertyName + "' " + propertyType + ".") + .description("Specifies the value for '%s' %s property.".formatted(propertyName, propertyType)) .name(propertyDescriptorName) .addValidator(new DynamicPropertyValidator(ProducerConfig.class, ConsumerConfig.class)) .dynamic(true) diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java index 65a4bc45a848..c87207f7a761 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/util/SaslExtensionUtil.java @@ -23,6 +23,9 @@ public class SaslExtensionUtil { public static final String SASL_EXTENSION_PROPERTY_PREFIX = "sasl_extension_"; + private SaslExtensionUtil() { + } + public static boolean isSaslExtensionProperty(final String propertyName) { return propertyName.startsWith(SASL_EXTENSION_PROPERTY_PREFIX); } From ad242e750bef05e6738e2095e3ee8d8e0c8a745f Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Fri, 28 Nov 2025 16:26:26 +0100 Subject: [PATCH 3/3] NIFI-14782 Review changes --- .../apache/nifi/kafka/service/Kafka3ConnectionService.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java index 8d217e01aec0..b29a44493287 100644 --- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java +++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java @@ -220,13 +220,16 @@ protected List getSupportedPropertyDescriptors() { protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { final String propertyName; final String propertyType; + final ExpressionLanguageScope expressionLanguageScope; if (isSaslExtensionProperty(propertyDescriptorName)) { propertyName = removeSaslExtensionPropertyPrefix(propertyDescriptorName); propertyType = "SASL Extension"; + expressionLanguageScope = ExpressionLanguageScope.NONE; } else { propertyName = propertyDescriptorName; propertyType = "Kafka Configuration"; + expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT; } return new PropertyDescriptor.Builder() @@ -234,7 +237,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String .name(propertyDescriptorName) .addValidator(new DynamicPropertyValidator(ProducerConfig.class, ConsumerConfig.class)) .dynamic(true) - .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT) + .expressionLanguageSupported(expressionLanguageScope) .build(); }