diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 376947b52754..26bc8f699203 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -26,6 +26,12 @@ nifi-utils + + + org.apache.nifi + nifi-oauth2-provider-api + + org.apache.nifi nifi-service-utils @@ -52,10 +58,6 @@ org.apache.nifi nifi-record - - org.apache.nifi - nifi-oauth2-provider-api - org.apache.nifi nifi-proxy-configuration-api @@ -69,7 +71,6 @@ org.apache.nifi nifi-azure-services-api 2.8.0-SNAPSHOT - provided com.azure diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index 25108147610d..3883b2ab3d9c 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -81,6 +81,7 @@ import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; @@ -151,6 +152,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT; static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY; static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER; + static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER; static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder() .name("Shared Access Policy Name") .description("The name of the shared access policy. This policy must have Listen claims.") @@ -268,6 +270,13 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem .required(true) .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.OAUTH2) .build(); + static final PropertyDescriptor BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("Storage Identity Federation Token Provider") + .description("Controller Service exchanging workload identity tokens for Azure AD access tokens when using Identity Federation with Azure Blob Storage.") + .identifiesControllerService(AzureIdentityFederationTokenProvider.class) + .required(true) + .dependsOn(BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION) + .build(); static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder() .name("Storage Account Key") .description("The Azure Storage account key to store event hub consumer group state.") @@ -327,6 +336,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem POLICY_PRIMARY_KEY, AUTHENTICATION_STRATEGY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, + EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, CONSUMER_GROUP, RECORD_READER, RECORD_WRITER, @@ -341,6 +351,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor implem STORAGE_ACCOUNT_KEY, STORAGE_SAS_TOKEN, BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER, + BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER, PROXY_CONFIGURATION_SERVICE ); @@ -436,7 +447,6 @@ protected Collection customValidate(ValidationContext validati final String storageAccountKey = validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); final String storageSasToken = validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue(); final CheckpointStrategy checkpointStrategy = CheckpointStrategy.valueOf(validationContext.getProperty(CHECKPOINT_STRATEGY).getValue()); - final boolean blobOauthProviderSet = validationContext.getProperty(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER).isSet(); if ((recordReader != null && recordWriter == null) || (recordReader == null && recordWriter != null)) { results.add(new ValidationResult.Builder() @@ -449,10 +459,10 @@ protected Collection customValidate(ValidationContext validati if (checkpointStrategy == CheckpointStrategy.AZURE_BLOB_STORAGE) { final BlobStorageAuthenticationStrategy blobStorageAuthenticationStrategy = - validationContext.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY) - .asAllowableValue(BlobStorageAuthenticationStrategy.class); + validationContext.getProperty(BLOB_STORAGE_AUTHENTICATION_STRATEGY).asAllowableValue(BlobStorageAuthenticationStrategy.class); if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY) { + // needed because of expression language support if (StringUtils.isBlank(storageAccountKey)) { results.add(new ValidationResult.Builder() .subject(STORAGE_ACCOUNT_KEY.getDisplayName()) @@ -463,18 +473,8 @@ protected Collection customValidate(ValidationContext validati .valid(false) .build()); } - - if (StringUtils.isNotBlank(storageSasToken)) { - results.add(new ValidationResult.Builder() - .subject(STORAGE_SAS_TOKEN.getDisplayName()) - .explanation("%s must not be set when %s is %s." - .formatted(STORAGE_SAS_TOKEN.getDisplayName(), - BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(), - BlobStorageAuthenticationStrategy.STORAGE_ACCOUNT_KEY.getDisplayName())) - .valid(false) - .build()); - } } else if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE) { + // needed because of expression language support if (StringUtils.isBlank(storageSasToken)) { results.add(new ValidationResult.Builder() .subject(STORAGE_SAS_TOKEN.getDisplayName()) @@ -485,53 +485,16 @@ protected Collection customValidate(ValidationContext validati .valid(false) .build()); } - - if (StringUtils.isNotBlank(storageAccountKey)) { - results.add(new ValidationResult.Builder() - .subject(STORAGE_ACCOUNT_KEY.getDisplayName()) - .explanation("%s must not be set when %s is %s." - .formatted(STORAGE_ACCOUNT_KEY.getDisplayName(), - BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(), - BlobStorageAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName())) - .valid(false) - .build()); - } } else if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.OAUTH2) { - if (!blobOauthProviderSet) { - results.add(new ValidationResult.Builder() - .subject(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName()) - .explanation("%s must be set when %s is %s." - .formatted(BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName(), - BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(), - BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName())) - .valid(false) - .build()); - } - - if (StringUtils.isNotBlank(storageAccountKey)) { - results.add(new ValidationResult.Builder() - .subject(STORAGE_ACCOUNT_KEY.getDisplayName()) - .explanation("%s must not be set when %s is %s." - .formatted(STORAGE_ACCOUNT_KEY.getDisplayName(), - BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(), - BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName())) - .valid(false) - .build()); - } - - if (StringUtils.isNotBlank(storageSasToken)) { - results.add(new ValidationResult.Builder() - .subject(STORAGE_SAS_TOKEN.getDisplayName()) - .explanation("%s must not be set when %s is %s." - .formatted(STORAGE_SAS_TOKEN.getDisplayName(), - BLOB_STORAGE_AUTHENTICATION_STRATEGY.getDisplayName(), - BlobStorageAuthenticationStrategy.OAUTH2.getDisplayName())) - .valid(false) - .build()); - } + // Rely on required property + dependsOn validation to ensure provider is configured + } else if (blobStorageAuthenticationStrategy == BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION) { + // Rely on required property + dependsOn validation to ensure provider is configured } } - results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, validationContext)); + + results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, + EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, validationContext)); + return results; } @@ -629,6 +592,14 @@ protected EventProcessorClient createClient(final ProcessContext context) { blobContainerClientBuilder.endpoint(endpoint); blobContainerClientBuilder.credential(tokenCredential); } + case IDENTITY_FEDERATION -> { + final AzureIdentityFederationTokenProvider tokenProvider = + context.getProperty(BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER).asControllerService(AzureIdentityFederationTokenProvider.class); + final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); + final String endpoint = createBlobEndpoint(storageAccountName, domainName); + blobContainerClientBuilder.endpoint(endpoint); + blobContainerClientBuilder.credential(tokenCredential); + } } blobContainerClientBuilder.containerName(containerName); @@ -684,6 +655,13 @@ protected EventProcessorClient createClient(final ProcessContext context) { final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); } + case IDENTITY_FEDERATION -> { + final AzureIdentityFederationTokenProvider tokenProvider = + context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER) + .asControllerService(AzureIdentityFederationTokenProvider.class); + final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); + eventProcessorClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); + } } final Integer prefetchCount = context.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger(); @@ -923,7 +901,7 @@ private String createStorageConnectionString(final ProcessContext context, String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, storageAccountName, storageAccountKey, domainName); case SHARED_ACCESS_SIGNATURE -> String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, storageAccountName, domainName, storageSasToken); - case OAUTH2 -> throw new IllegalArgumentException(String.format( + case OAUTH2, IDENTITY_FEDERATION -> throw new IllegalArgumentException(String.format( "Blob Storage Authentication Strategy %s does not support connection string authentication", blobStorageAuthenticationStrategy)); }; } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index 55b50805f1a7..47a37307ddb1 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -58,6 +58,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; import org.apache.nifi.scheduling.ExecutionNode; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; @@ -117,6 +118,7 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT; static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY; static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER; + static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER; static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() .name("Shared Access Policy Name") .description("The name of the shared access policy. This policy must have Listen claims.") @@ -173,6 +175,7 @@ public class GetAzureEventHub extends AbstractProcessor implements AzureEventHub POLICY_PRIMARY_KEY, AUTHENTICATION_STRATEGY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, + EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, CONSUMER_GROUP, ENQUEUE_TIME, RECEIVER_FETCH_SIZE, @@ -210,7 +213,8 @@ public final List getSupportedPropertyDescriptors() { @Override protected Collection customValidate(ValidationContext context) { - return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context); + return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, + EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, context); } @OnPrimaryNodeStateChange @@ -437,6 +441,13 @@ private EventHubClientBuilder createEventHubClientBuilder(final ProcessContext c final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); } + case IDENTITY_FEDERATION -> { + final AzureIdentityFederationTokenProvider tokenProvider = + context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER) + .asControllerService(AzureIdentityFederationTokenProvider.class); + final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); + eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); + } } // Set Azure Event Hub Client Identifier using Processor Identifier instead of default random UUID diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java index 2ec7d310d181..0ace4dbb9546 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java @@ -51,6 +51,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils; import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; @@ -90,6 +91,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT; static final PropertyDescriptor AUTHENTICATION_STRATEGY = AzureEventHubComponent.AUTHENTICATION_STRATEGY; static final PropertyDescriptor EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER = AzureEventHubComponent.OAUTH2_ACCESS_TOKEN_PROVIDER; + static final PropertyDescriptor EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER = AzureEventHubComponent.IDENTITY_FEDERATION_TOKEN_PROVIDER; static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder() .name("Shared Access Policy Name") .description("The name of the shared access policy. This policy must have Send claims.") @@ -134,6 +136,7 @@ public class PutAzureEventHub extends AbstractProcessor implements AzureEventHub POLICY_PRIMARY_KEY, AUTHENTICATION_STRATEGY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, + EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, PARTITIONING_KEY_ATTRIBUTE_NAME, MAX_BATCH_SIZE, PROXY_CONFIGURATION_SERVICE @@ -172,7 +175,8 @@ public void closeClient() { @Override protected Collection customValidate(ValidationContext context) { - return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, context); + return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, + EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, context); } @Override @@ -261,6 +265,13 @@ protected EventHubProducerClient createEventHubProducerClient(final ProcessConte final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); } + case IDENTITY_FEDERATION -> { + final AzureIdentityFederationTokenProvider tokenProvider = + context.getProperty(EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER) + .asControllerService(AzureIdentityFederationTokenProvider.class); + final TokenCredential tokenCredential = AzureEventHubUtils.createTokenCredential(tokenProvider); + eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, tokenCredential); + } } AzureEventHubUtils.getProxyOptions(context).ifPresent(eventHubClientBuilder::proxyOptions); return eventHubClientBuilder.buildProducerClient(); diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java index 566d08b0569f..4c661e3ae245 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java @@ -28,6 +28,7 @@ import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.proxy.ProxyConfiguration; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubComponent; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; @@ -35,16 +36,12 @@ import java.net.InetSocketAddress; import java.net.Proxy; -import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.TimeUnit; public final class AzureEventHubUtils { @@ -76,44 +73,21 @@ public final class AzureEventHubUtils { .required(true) .build(); - private static final long DEFAULT_TOKEN_LIFETIME_SECONDS = TimeUnit.MINUTES.toSeconds(5); - public static List customValidate(PropertyDescriptor accessPolicyDescriptor, PropertyDescriptor policyKeyDescriptor, - PropertyDescriptor tokenProviderDescriptor, + PropertyDescriptor oauth2TokenProviderDescriptor, + PropertyDescriptor identityFederationTokenProviderDescriptor, ValidationContext context) { List validationResults = new ArrayList<>(); - boolean accessPolicyIsSet = context.getProperty(accessPolicyDescriptor).isSet(); boolean policyKeyIsSet = context.getProperty(policyKeyDescriptor).isSet(); final AzureEventHubAuthenticationStrategy authenticationStrategy = Optional.ofNullable( - context.getProperty(AzureEventHubComponent.AUTHENTICATION_STRATEGY) - .asAllowableValue(AzureEventHubAuthenticationStrategy.class)) + context.getProperty(AzureEventHubComponent.AUTHENTICATION_STRATEGY).asAllowableValue(AzureEventHubAuthenticationStrategy.class)) .orElse(AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY); - final boolean tokenProviderIsSet = tokenProviderDescriptor != null && context.getProperty(tokenProviderDescriptor).isSet(); switch (authenticationStrategy) { - case MANAGED_IDENTITY -> { - if (accessPolicyIsSet || policyKeyIsSet) { - final String msg = String.format( - "When '%s' is set to '%s', '%s' and '%s' must not be set.", - AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(), - AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName(), - accessPolicyDescriptor.getDisplayName(), - policyKeyDescriptor.getDisplayName() - ); - validationResults.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); - } - if (tokenProviderIsSet) { - validationResults.add(new ValidationResult.Builder() - .subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName()) - .valid(false) - .explanation(String.format("'%s' must not be set when '%s' is '%s'.", - tokenProviderDescriptor.getDisplayName(), - AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(), - AzureEventHubAuthenticationStrategy.MANAGED_IDENTITY.getDisplayName())) - .build()); - } + case MANAGED_IDENTITY, OAUTH2, IDENTITY_FEDERATION -> { + // Rely on required property + dependsOn validation to ensure proper configuration } case SHARED_ACCESS_SIGNATURE -> { if (!accessPolicyIsSet || !policyKeyIsSet) { @@ -126,40 +100,9 @@ public static List customValidate(PropertyDescriptor accessPol ); validationResults.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); } - if (tokenProviderIsSet) { - validationResults.add(new ValidationResult.Builder() - .subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName()) - .valid(false) - .explanation(String.format("'%s' must not be set when '%s' is '%s'.", - tokenProviderDescriptor.getDisplayName(), - AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(), - AzureEventHubAuthenticationStrategy.SHARED_ACCESS_SIGNATURE.getDisplayName())) - .build()); - } - } - case OAUTH2 -> { - if (accessPolicyIsSet || policyKeyIsSet) { - final String msg = String.format( - "When '%s' is set to '%s', '%s' and '%s' must not be set.", - AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(), - AzureEventHubAuthenticationStrategy.OAUTH2.getDisplayName(), - accessPolicyDescriptor.getDisplayName(), - policyKeyDescriptor.getDisplayName() - ); - validationResults.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build()); - } - if (!tokenProviderIsSet) { - validationResults.add(new ValidationResult.Builder() - .subject(Objects.requireNonNull(tokenProviderDescriptor).getDisplayName()) - .valid(false) - .explanation(String.format("'%s' must be set when '%s' is '%s'.", - tokenProviderDescriptor.getDisplayName(), - AzureEventHubComponent.AUTHENTICATION_STRATEGY.getDisplayName(), - AzureEventHubAuthenticationStrategy.OAUTH2.getDisplayName())) - .build()); - } } } + ProxyConfiguration.validateProxySpec(context, validationResults, AzureEventHubComponent.PROXY_SPECS); return validationResults; } @@ -215,24 +158,27 @@ public static TokenCredential createTokenCredential(final OAuth2AccessTokenProvi Objects.requireNonNull(tokenProvider, "OAuth2 Access Token Provider is required"); return tokenRequestContext -> Mono.fromSupplier(() -> { - final org.apache.nifi.oauth2.AccessToken accessDetails = tokenProvider.getAccessDetails(); - final String accessToken = accessDetails.getAccessToken(); - - if (accessToken == null || accessToken.isBlank()) { - throw new IllegalStateException("OAuth2 Access Token Provider returned an empty access token"); + final org.apache.nifi.oauth2.AccessToken accessToken = tokenProvider.getAccessDetails(); + Objects.requireNonNull(accessToken, "Access Token is required"); + final String tokenValue = accessToken.getAccessToken(); + if (tokenValue == null || tokenValue.isEmpty()) { + throw new IllegalStateException("Access Token value is required"); } - - final Instant fetchTime = accessDetails.getFetchTime(); - final long expiresInSeconds = accessDetails.getExpiresIn(); - final Instant expirationInstant = expiresInSeconds > 0 - ? fetchTime.plusSeconds(expiresInSeconds) - : fetchTime.plusSeconds(DEFAULT_TOKEN_LIFETIME_SECONDS); - final OffsetDateTime expiresAt = OffsetDateTime.ofInstant(expirationInstant, ZoneOffset.UTC); - - return new com.azure.core.credential.AccessToken(accessToken, expiresAt); + final java.time.Instant fetchTime = Objects.requireNonNull(accessToken.getFetchTime(), "Access Token fetch time required"); + final long expiresIn = accessToken.getExpiresIn(); + final java.time.Instant expirationInstant = expiresIn > 0 + ? fetchTime.plusSeconds(expiresIn) + : fetchTime.plusSeconds(300); + final java.time.OffsetDateTime expirationTime = java.time.OffsetDateTime.ofInstant(expirationInstant, java.time.ZoneOffset.UTC); + return new com.azure.core.credential.AccessToken(tokenValue, expirationTime); }); } + public static TokenCredential createTokenCredential(final AzureIdentityFederationTokenProvider tokenProvider) { + Objects.requireNonNull(tokenProvider, "Identity Federation Token Provider is required"); + return tokenProvider.getCredentials(); + } + private static Proxy getProxy(ProxyConfiguration proxyConfiguration) { final Proxy.Type type; if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) { diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java index 24029168f6b3..c93680b27fac 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java @@ -58,11 +58,11 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsType; -import reactor.core.publisher.Mono; import java.text.DecimalFormat; import java.time.OffsetDateTime; @@ -364,9 +364,9 @@ private static AzureStorageCredentialsService_v12 getCopyFromCredentialsService( private static HttpAuthorization getHttpAuthorization(final AzureStorageCredentialsDetails_v12 credentialsDetails) { switch (credentialsDetails.getCredentialsType()) { - case ACCESS_TOKEN -> { - TokenCredential credential = tokenRequestContext -> Mono.just(credentialsDetails.getAccessToken()); - return getHttpAuthorizationFromTokenCredential(credential); + case IDENTITY_FEDERATION -> { + final AzureIdentityFederationTokenProvider identityTokenProvider = credentialsDetails.getIdentityTokenProvider(); + return getHttpAuthorizationFromTokenCredential(identityTokenProvider.getCredentials()); } case MANAGED_IDENTITY -> { final ManagedIdentityCredential credential = new ManagedIdentityCredentialBuilder() diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java index 3318c8b817cf..9a9267a47dfa 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage_v12.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.azure.storage.queue; import com.azure.core.credential.AzureSasCredential; -import com.azure.core.credential.TokenCredential; import com.azure.core.http.HttpClient; import com.azure.core.http.ProxyOptions; import com.azure.core.http.netty.NettyAsyncHttpClientBuilder; @@ -42,9 +41,9 @@ import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxySpec; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12; -import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.Collection; @@ -176,9 +175,9 @@ private void processCredentials(final QueueClientBuilder clientBuilder, final Az .clientSecret(storageCredentialsDetails.getServicePrincipalClientSecret()) .build()); break; - case ACCESS_TOKEN: - TokenCredential credential = tokenRequestContext -> Mono.just(storageCredentialsDetails.getAccessToken()); - clientBuilder.credential(credential); + case IDENTITY_FEDERATION: + final AzureIdentityFederationTokenProvider identityTokenProvider = storageCredentialsDetails.getIdentityTokenProvider(); + clientBuilder.credential(identityTokenProvider.getCredentials()); break; } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java index 26030d307537..b1ff3013701d 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java @@ -30,6 +30,7 @@ import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxySpec; import org.apache.nifi.proxy.SocksVersion; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.services.azure.storage.ADLSCredentialsService; import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12; @@ -85,7 +86,8 @@ public final class AzureStorageUtils { AzureStorageCredentialsType.ACCOUNT_KEY, AzureStorageCredentialsType.SAS_TOKEN, AzureStorageCredentialsType.MANAGED_IDENTITY, - AzureStorageCredentialsType.SERVICE_PRINCIPAL)) + AzureStorageCredentialsType.SERVICE_PRINCIPAL, + AzureStorageCredentialsType.IDENTITY_FEDERATION)) .defaultValue(AzureStorageCredentialsType.SAS_TOKEN) .build(); @@ -252,6 +254,14 @@ public final class AzureStorageUtils { .dependsOn(CREDENTIALS_TYPE, AzureStorageCredentialsType.SERVICE_PRINCIPAL) .build(); + public static final PropertyDescriptor IDENTITY_FEDERATION_TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("Identity Federation Token Provider") + .description("Controller Service that provides Azure credentials via workload identity federation.") + .identifiesControllerService(AzureIdentityFederationTokenProvider.class) + .required(true) + .dependsOn(CREDENTIALS_TYPE, AzureStorageCredentialsType.IDENTITY_FEDERATION) + .build(); + private AzureStorageUtils() { // do not instantiate } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java index fb811f6de29d..cc7aa8d47bab 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobServiceClientFactory.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.azure.storage.utils; import com.azure.core.credential.AzureSasCredential; -import com.azure.core.credential.TokenCredential; import com.azure.core.http.ProxyOptions; import com.azure.core.http.netty.NettyAsyncHttpClientBuilder; import com.azure.core.util.ClientOptions; @@ -28,8 +27,8 @@ import com.azure.storage.blob.BlobServiceClientBuilder; import com.azure.storage.common.StorageSharedKeyCredential; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails_v12; -import reactor.core.publisher.Mono; public class BlobServiceClientFactory extends AbstractStorageClientFactory { @@ -73,12 +72,12 @@ private void configureCredential(final BlobServiceClientBuilder clientBuilder, f .clientSecret(credentialsDetails.getServicePrincipalClientSecret()) .httpClient(new NettyAsyncHttpClientBuilder() .proxy(credentialsDetails.getProxyOptions()) - .build()) + .build()) .build()); break; - case ACCESS_TOKEN: - TokenCredential credential = tokenRequestContext -> Mono.just(credentialsDetails.getAccessToken()); - clientBuilder.credential(credential); + case IDENTITY_FEDERATION: + final AzureIdentityFederationTokenProvider identityTokenProvider = credentialsDetails.getIdentityTokenProvider(); + clientBuilder.credential(identityTokenProvider.getCredentials()); break; } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java index bebf1cebd954..b1c7cd3152ed 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/DataLakeServiceClientFactory.java @@ -31,6 +31,7 @@ import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails; import reactor.core.publisher.Mono; @@ -46,6 +47,7 @@ protected DataLakeServiceClient createStorageClient(ADLSCredentialsDetails crede final String accountKey = credentialsDetails.getAccountKey(); final String sasToken = credentialsDetails.getSasToken(); final AccessToken accessToken = credentialsDetails.getAccessToken(); + final AzureIdentityFederationTokenProvider identityTokenProvider = credentialsDetails.getIdentityTokenProvider(); final String endpointSuffix = credentialsDetails.getEndpointSuffix(); final boolean useManagedIdentity = credentialsDetails.getUseManagedIdentity(); final String managedIdentityClientId = credentialsDetails.getManagedIdentityClientId(); @@ -64,6 +66,8 @@ protected DataLakeServiceClient createStorageClient(ADLSCredentialsDetails crede dataLakeServiceClientBuilder.credential(credential); } else if (StringUtils.isNotBlank(sasToken)) { dataLakeServiceClientBuilder.sasToken(sasToken); + } else if (identityTokenProvider != null) { + dataLakeServiceClientBuilder.credential(identityTokenProvider.getCredentials()); } else if (accessToken != null) { final TokenCredential credential = tokenRequestContext -> Mono.just(accessToken); dataLakeServiceClientBuilder.credential(credential); diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java index fb648155c947..a4663df41056 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureCredentialsControllerService.java @@ -53,12 +53,15 @@ public class StandardAzureCredentialsControllerService extends AbstractControlle public static AllowableValue SERVICE_PRINCIPAL = new AllowableValue("service-principal", "Service Principal", "Azure Active Directory Service Principal with Client ID / Client Secret of a registered application"); + public static AllowableValue IDENTITY_FEDERATION = new AllowableValue("identity-federation", + "Identity Federation", + "Uses workload identity federation to obtain access tokens for Azure clients via an external identity token."); public static final PropertyDescriptor CREDENTIAL_CONFIGURATION_STRATEGY = new PropertyDescriptor.Builder() .name("Credential Configuration Strategy") .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(true) .sensitive(false) - .allowableValues(DEFAULT_CREDENTIAL, MANAGED_IDENTITY, SERVICE_PRINCIPAL) + .allowableValues(DEFAULT_CREDENTIAL, MANAGED_IDENTITY, SERVICE_PRINCIPAL, IDENTITY_FEDERATION) .defaultValue(DEFAULT_CREDENTIAL) .build(); @@ -103,12 +106,21 @@ public class StandardAzureCredentialsControllerService extends AbstractControlle .dependsOn(CREDENTIAL_CONFIGURATION_STRATEGY, SERVICE_PRINCIPAL) .build(); + public static final PropertyDescriptor IDENTITY_FEDERATION_TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("Identity Federation Token Provider") + .description("Controller Service that provides Azure credentials via workload identity federation.") + .identifiesControllerService(AzureIdentityFederationTokenProvider.class) + .required(true) + .dependsOn(CREDENTIAL_CONFIGURATION_STRATEGY, IDENTITY_FEDERATION) + .build(); + private static final List PROPERTY_DESCRIPTORS = List.of( CREDENTIAL_CONFIGURATION_STRATEGY, MANAGED_IDENTITY_CLIENT_ID, SERVICE_PRINCIPAL_TENANT_ID, SERVICE_PRINCIPAL_CLIENT_ID, - SERVICE_PRINCIPAL_CLIENT_SECRET + SERVICE_PRINCIPAL_CLIENT_SECRET, + IDENTITY_FEDERATION_TOKEN_PROVIDER ); private TokenCredential credentials; @@ -133,6 +145,8 @@ public void onConfigured(final ConfigurationContext context) { credentials = getManagedIdentityCredential(context); } else if (SERVICE_PRINCIPAL.getValue().equals(configurationStrategy)) { credentials = getServicePrincipalCredential(context); + } else if (IDENTITY_FEDERATION.getValue().equals(configurationStrategy)) { + credentials = getIdentityFederationCredential(context); } else { final String errorMsg = String.format("Configuration Strategy [%s] not recognized", configurationStrategy); getLogger().error(errorMsg); @@ -178,6 +192,12 @@ private TokenCredential getServicePrincipalCredential(final ConfigurationContext .build(); } + private TokenCredential getIdentityFederationCredential(final ConfigurationContext context) { + final AzureIdentityFederationTokenProvider identityFederationTokenProvider = context.getProperty(IDENTITY_FEDERATION_TOKEN_PROVIDER) + .asControllerService(AzureIdentityFederationTokenProvider.class); + return identityFederationTokenProvider.getCredentials(); + } + @Override public String toString() { return "StandardAzureCredentialsControllerService[id=" + getIdentifier() + "]"; diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureIdentityFederationTokenProvider.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureIdentityFederationTokenProvider.java new file mode 100644 index 000000000000..29e6fa4ce644 --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/StandardAzureIdentityFederationTokenProvider.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure; + +import com.azure.core.credential.TokenCredential; +import com.azure.core.credential.TokenRequestContext; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigVerificationResult.Outcome; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.VerifiableControllerService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.services.azure.util.AzureWorkloadIdentityCredentialUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Controller Service that provides Azure {@link TokenCredential} for workload identity federation. + * Uses {@link AzureWorkloadIdentityCredentialUtils} to build the credential from an external identity token. + */ +@Tags({"azure", "identity", "federation", "credentials", "workload"}) +@CapabilityDescription("Provides Azure TokenCredential for workload identity federation. " + + "Exchanges external identity tokens (from an OAuth2 provider) for Azure AD access tokens " + + "using Azure Identity SDK's ClientAssertionCredential with built-in caching and retry logic.") +public class StandardAzureIdentityFederationTokenProvider extends AbstractControllerService + implements AzureIdentityFederationTokenProvider, VerifiableControllerService { + + private static final String DEFAULT_VERIFICATION_SCOPE = "https://storage.azure.com/.default"; + private static final String ERROR_EXCHANGE_FAILED = "Failed to exchange workload identity token: %s"; + private static final String STEP_EXCHANGE_TOKEN = "Exchange workload identity token"; + + public static final PropertyDescriptor TENANT_ID = new PropertyDescriptor.Builder() + .name("Tenant ID") + .description("Microsoft Entra tenant ID.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder() + .name("Client ID") + .description("Application (client) ID of the Microsoft Entra application registration configured for workload identity federation.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor CLIENT_ASSERTION_PROVIDER = new PropertyDescriptor.Builder() + .name("Client Assertion Provider") + .description("Controller Service that retrieves the external workload identity token (client assertion) exchanged with Azure AD.") + .identifiesControllerService(OAuth2AccessTokenProvider.class) + .required(true) + .build(); + + private static final List DESCRIPTORS = List.of( + TENANT_ID, + CLIENT_ID, + CLIENT_ASSERTION_PROVIDER + ); + + private volatile TokenCredential credential; + + @Override + protected List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final String tenantId = context.getProperty(TENANT_ID).getValue(); + final String clientId = context.getProperty(CLIENT_ID).getValue(); + final OAuth2AccessTokenProvider clientAssertionProvider = context.getProperty(CLIENT_ASSERTION_PROVIDER) + .asControllerService(OAuth2AccessTokenProvider.class); + + this.credential = AzureWorkloadIdentityCredentialUtils.createCredential(tenantId, clientId, clientAssertionProvider); + } + + @Override + public TokenCredential getCredentials() { + return credential; + } + + @Override + public List verify(final ConfigurationContext context, + final ComponentLog verificationLogger, + final Map variables) { + final ConfigVerificationResult.Builder resultBuilder = new ConfigVerificationResult.Builder() + .verificationStepName(STEP_EXCHANGE_TOKEN); + + try { + final String tenantId = context.getProperty(TENANT_ID).getValue(); + final String clientId = context.getProperty(CLIENT_ID).getValue(); + final OAuth2AccessTokenProvider verificationAssertionProvider = context.getProperty(CLIENT_ASSERTION_PROVIDER) + .asControllerService(OAuth2AccessTokenProvider.class); + + final TokenCredential verificationCredential = AzureWorkloadIdentityCredentialUtils.createCredential( + tenantId, clientId, verificationAssertionProvider); + + final TokenRequestContext tokenRequestContext = new TokenRequestContext().addScopes(DEFAULT_VERIFICATION_SCOPE); + verificationCredential.getToken(tokenRequestContext).block(); + + return Collections.singletonList(resultBuilder + .outcome(Outcome.SUCCESSFUL) + .explanation("Successfully exchanged workload identity token for an Azure AD access token") + .build()); + } catch (final Exception e) { + final String explanation = String.format(ERROR_EXCHANGE_FAILED, e.getMessage()); + verificationLogger.error(explanation, e); + return Collections.singletonList(resultBuilder + .outcome(Outcome.FAILED) + .explanation(explanation) + .build()); + } + } +} diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java index 6a223fcaaedf..81206eab8677 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsControllerService.java @@ -28,6 +28,7 @@ import org.apache.nifi.migration.ProxyServiceMigration; import org.apache.nifi.processors.azure.AzureServiceEndpoints; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import java.util.List; import java.util.Map; @@ -88,6 +89,7 @@ public class ADLSCredentialsControllerService extends AbstractControllerService SERVICE_PRINCIPAL_TENANT_ID, SERVICE_PRINCIPAL_CLIENT_ID, SERVICE_PRINCIPAL_CLIENT_SECRET, + AzureStorageUtils.IDENTITY_FEDERATION_TOKEN_PROVIDER, PROXY_CONFIGURATION_SERVICE ); @@ -129,6 +131,12 @@ public void migrateProperties(PropertyConfiguration config) { config.removeProperty(propNameUseManagedIdentity); } + + // Migrate ACCESS_TOKEN credential type to IDENTITY_FEDERATION + config.getPropertyValue(CREDENTIALS_TYPE.getName()) + .filter("ACCESS_TOKEN"::equals) + .ifPresent(value -> config.setProperty(CREDENTIALS_TYPE.getName(), AzureStorageCredentialsType.IDENTITY_FEDERATION.getValue())); + ProxyServiceMigration.renameProxyConfigurationServiceProperty(config); } @@ -152,6 +160,12 @@ public ADLSCredentialsDetails getCredentialsDetails(Map attribut setValue(credentialsBuilder, SERVICE_PRINCIPAL_CLIENT_ID, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setServicePrincipalClientId, attributes); setValue(credentialsBuilder, SERVICE_PRINCIPAL_CLIENT_SECRET, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setServicePrincipalClientSecret, attributes); + if (context.getProperty(CREDENTIALS_TYPE).asAllowableValue(AzureStorageCredentialsType.class) == AzureStorageCredentialsType.IDENTITY_FEDERATION) { + final AzureIdentityFederationTokenProvider identityTokenProvider = context.getProperty(AzureStorageUtils.IDENTITY_FEDERATION_TOKEN_PROVIDER) + .asControllerService(AzureIdentityFederationTokenProvider.class); + credentialsBuilder.setIdentityTokenProvider(identityTokenProvider); + } + credentialsBuilder.setProxyOptions(AzureStorageUtils.getProxyOptions(context)); return credentialsBuilder.build(); diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java index 9c761b0e97c0..415748db9521 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerService_v12.java @@ -27,6 +27,7 @@ import org.apache.nifi.migration.ProxyServiceMigration; import org.apache.nifi.processors.azure.AzureServiceEndpoints; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import java.util.List; import java.util.Map; @@ -34,6 +35,7 @@ import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ACCOUNT_KEY; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ACCOUNT_NAME; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.CREDENTIALS_TYPE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.IDENTITY_FEDERATION_TOKEN_PROVIDER; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.MANAGED_IDENTITY_CLIENT_ID; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SAS_TOKEN; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SERVICE_PRINCIPAL_CLIENT_ID; @@ -69,6 +71,7 @@ public class AzureStorageCredentialsControllerService_v12 extends AbstractContro SERVICE_PRINCIPAL_TENANT_ID, SERVICE_PRINCIPAL_CLIENT_ID, SERVICE_PRINCIPAL_CLIENT_SECRET, + IDENTITY_FEDERATION_TOKEN_PROVIDER, PROXY_CONFIGURATION_SERVICE ); @@ -107,9 +110,16 @@ public AzureStorageCredentialsDetails_v12 getCredentialsDetails(Map config.setProperty(CREDENTIALS_TYPE.getName(), AzureStorageCredentialsType.IDENTITY_FEDERATION.getValue())); } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/util/AzureWorkloadIdentityCredentialUtils.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/util/AzureWorkloadIdentityCredentialUtils.java new file mode 100644 index 000000000000..bbb51aee3434 --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/util/AzureWorkloadIdentityCredentialUtils.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.util; + +import com.azure.core.credential.TokenCredential; +import com.azure.core.http.HttpClient; +import com.azure.core.http.netty.NettyAsyncHttpClientBuilder; +import com.azure.identity.ClientAssertionCredentialBuilder; +import org.apache.nifi.oauth2.AccessToken; +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; + +import java.util.Objects; +import java.util.function.Supplier; + +/** + * Utility class for building Azure {@link TokenCredential} objects for workload identity federation. + *

+ * This utility simplifies the creation of {@link com.azure.identity.ClientAssertionCredential} by providing + * factory methods that accept common NiFi components like {@link OAuth2AccessTokenProvider}. + */ +public final class AzureWorkloadIdentityCredentialUtils { + + private AzureWorkloadIdentityCredentialUtils() { + // Utility class - prevent instantiation + } + + /** + * Creates a {@link TokenCredential} for Azure workload identity federation using the provided + * OAuth2 access token provider as the source of client assertions. + * + * @param tenantId the Microsoft Entra tenant ID + * @param clientId the application (client) ID of the Microsoft Entra application + * @param clientAssertionProvider the OAuth2 access token provider that supplies the client assertion token + * @return a TokenCredential configured for workload identity federation + * @throws NullPointerException if any parameter is null + */ + public static TokenCredential createCredential( + final String tenantId, + final String clientId, + final OAuth2AccessTokenProvider clientAssertionProvider) { + Objects.requireNonNull(tenantId, "Tenant ID is required"); + Objects.requireNonNull(clientId, "Client ID is required"); + Objects.requireNonNull(clientAssertionProvider, "Client Assertion Provider is required"); + + return createCredential(tenantId, clientId, () -> getClientAssertion(clientAssertionProvider)); + } + + /** + * Creates a {@link TokenCredential} for Azure workload identity federation using a custom + * client assertion supplier. + * + * @param tenantId the Microsoft Entra tenant ID + * @param clientId the application (client) ID of the Microsoft Entra application + * @param clientAssertionSupplier a supplier that provides the client assertion token string + * @return a TokenCredential configured for workload identity federation + * @throws NullPointerException if any parameter is null + */ + public static TokenCredential createCredential( + final String tenantId, + final String clientId, + final Supplier clientAssertionSupplier) { + Objects.requireNonNull(tenantId, "Tenant ID is required"); + Objects.requireNonNull(clientId, "Client ID is required"); + Objects.requireNonNull(clientAssertionSupplier, "Client Assertion Supplier is required"); + + return createCredential(tenantId, clientId, clientAssertionSupplier, new NettyAsyncHttpClientBuilder().build()); + } + + /** + * Creates a {@link TokenCredential} for Azure workload identity federation using a custom + * client assertion supplier and HTTP client. + * + * @param tenantId the Microsoft Entra tenant ID + * @param clientId the application (client) ID of the Microsoft Entra application + * @param clientAssertionSupplier a supplier that provides the client assertion token string + * @param httpClient the HTTP client to use for token requests + * @return a TokenCredential configured for workload identity federation + * @throws NullPointerException if any parameter is null + */ + public static TokenCredential createCredential( + final String tenantId, + final String clientId, + final Supplier clientAssertionSupplier, + final HttpClient httpClient) { + Objects.requireNonNull(tenantId, "Tenant ID is required"); + Objects.requireNonNull(clientId, "Client ID is required"); + Objects.requireNonNull(clientAssertionSupplier, "Client Assertion Supplier is required"); + Objects.requireNonNull(httpClient, "HTTP Client is required"); + + return new ClientAssertionCredentialBuilder() + .tenantId(tenantId) + .clientId(clientId) + .clientAssertion(clientAssertionSupplier) + .httpClient(httpClient) + .build(); + } + + /** + * Extracts the client assertion token from an OAuth2 access token provider. + * + * @param provider the OAuth2 access token provider + * @return the client assertion token string + * @throws IllegalStateException if the provider returns null or an empty token + */ + private static String getClientAssertion(final OAuth2AccessTokenProvider provider) { + final AccessToken accessToken = provider.getAccessDetails(); + if (accessToken == null) { + throw new IllegalStateException("Client assertion provider returned null"); + } + final String assertion = accessToken.getAccessToken(); + if (assertion == null || assertion.isBlank()) { + throw new IllegalStateException("Client assertion provider returned empty token"); + } + return assertion; + } +} diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java index 88398e82ec65..efcd1cf1efa8 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubAuthenticationStrategy.java @@ -24,7 +24,8 @@ public enum AzureEventHubAuthenticationStrategy implements DescribedValue { MANAGED_IDENTITY("Managed Identity", "Authenticate using the Managed Identity of the hosting Azure resource."), SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate using the Shared Access Policy name and key."), - OAUTH2("OAuth2", "Authenticate using an OAuth2 Access Token Provider backed by an Entra registered application."); + OAUTH2("OAuth2", "Authenticate using an OAuth2 Access Token Provider backed by an Entra registered application."), + IDENTITY_FEDERATION("Identity Federation", "Authenticate using a workload identity token exchanged for an Azure AD access token."); private final String displayName; private final String description; diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java index caf503f02411..28f738725da5 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/AzureEventHubComponent.java @@ -22,6 +22,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxySpec; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; /** * Azure Event Hub Component interface with shared properties @@ -52,6 +53,14 @@ public interface AzureEventHubComponent { .expressionLanguageSupported(ExpressionLanguageScope.NONE) .dependsOn(AUTHENTICATION_STRATEGY, AzureEventHubAuthenticationStrategy.OAUTH2) .build(); + PropertyDescriptor IDENTITY_FEDERATION_TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("Event Hubs Identity Federation Token Provider") + .description("Controller Service exchanging workload identity tokens for Azure AD access tokens when authenticating to Azure Event Hubs.") + .identifiesControllerService(AzureIdentityFederationTokenProvider.class) + .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .dependsOn(AUTHENTICATION_STRATEGY, AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION) + .build(); ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP, ProxySpec.HTTP_AUTH}; PropertyDescriptor PROXY_CONFIGURATION_SERVICE = new PropertyDescriptor.Builder() .fromPropertyDescriptor(ProxyConfiguration.createProxyConfigPropertyDescriptor(PROXY_SPECS)) diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java index 3ab03cc3f614..121f66339a4f 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/shared/azure/eventhubs/BlobStorageAuthenticationStrategy.java @@ -21,7 +21,8 @@ public enum BlobStorageAuthenticationStrategy implements DescribedValue { STORAGE_ACCOUNT_KEY("Storage Account Key", "Authenticate to Azure Blob Storage using the account key."), SHARED_ACCESS_SIGNATURE("Shared Access Signature", "Authenticate to Azure Blob Storage using a SAS token."), - OAUTH2("OAuth2", "Authenticate to Azure Blob Storage using an OAuth2 Access Token Provider backed by an Entra registered application."); + OAUTH2("OAuth2", "Authenticate to Azure Blob Storage using an OAuth2 Access Token Provider backed by an Entra registered application."), + IDENTITY_FEDERATION("Identity Federation", "Authenticate to Azure Blob Storage using a workload identity token exchanged for an Azure AD access token."); private final String displayName; private final String description; diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 2823c4f3fcdc..e76b973a7817 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -21,5 +21,6 @@ org.apache.nifi.services.azure.data.explorer.StandardKustoIngestService org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12 org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12 org.apache.nifi.services.azure.StandardAzureCredentialsControllerService +org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider org.apache.nifi.services.azure.storage.AzureBlobStorageFileResourceService org.apache.nifi.services.azure.storage.AzureDataLakeStorageFileResourceService diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider/additionalDetails.md b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider/additionalDetails.md new file mode 100644 index 000000000000..7747c20046de --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.StandardAzureIdentityFederationTokenProvider/additionalDetails.md @@ -0,0 +1,55 @@ + + +# StandardAzureIdentityFederationTokenProvider + +The *StandardAzureIdentityFederationTokenProvider* provides Azure `TokenCredential` for workload identity federation. It exchanges tokens from external identity providers for Azure AD credentials using Azure Identity SDK's `ClientAssertionCredential`. This approach provides built-in token caching, automatic refresh, and robust error handling. + +Components such as the ADLS and Azure Storage credentials controller services reference this provider when the **Credentials Type** is set to **Access Token**. + + +## Configuration workflow + +1. **Client Assertion Provider** – Select a controller service that retrieves the external workload identity token. The token is passed to Azure AD as the `client_assertion` parameter. +2. **Tenant ID** and **Client ID** – Provide the Microsoft Entra tenant and application (client) ID for the federated app registration. + +At runtime the service provides a `TokenCredential` backed by `ClientAssertionCredential`. When the consuming component requests a token (specifying the appropriate scope), the credential exchanges the client assertion for an Azure AD access token via `https://login.microsoftonline.com//oauth2/v2.0/token`. The Azure Identity SDK handles token caching and automatic refresh when tokens expire. + +Ensure the federated app registration has the necessary Azure RBAC roles (for example *Storage Blob Data Contributor* and *Azure Event Hubs Data Receiver/Sender* as appropriate) and that the client assertion provider refreshes assertions before they expire so new Azure access tokens can be obtained. + + +## Azure Resource Scopes + +Different Azure services require different scopes when requesting tokens. The scope is determined automatically by the consuming component based on the Azure service being accessed: + +- `https://storage.azure.com/.default` – Azure Storage operations (Blob, ADLS, Queue). +- `https://eventhubs.azure.net/.default` – Event Hubs operations. +- `https://management.azure.com/.default` – Azure Resource Manager APIs. + +> **Note**: Microsoft Entra requires a single resource (`*.default`) per client credentials request. + + +## Event Hub components + +- `GetAzureEventHub`, `PutAzureEventHub`, and `ConsumeAzureEventHub` support the **Identity Federation** authentication strategy for Event Hubs connections. +- `ConsumeAzureEventHub` also supports Identity Federation for the Blob Storage checkpoint store. + + +## Entra ID setup summary + +1. **Create or reuse an app registration** for NiFi in Microsoft Entra ID. +2. **Add a federated credential** (Certificates & secrets → Federated credentials) matching your issuer/subject. Set the audience to `api://AzureADTokenExchange`. +3. **Assign RBAC roles** to that app registration, such as `Storage Blob Data Reader`/`Storage Blob Data Contributor` on the storage account. +4. Record the **Tenant ID** and **Client ID** for configuring the controller service in NiFi. diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md index d67c55ed98c2..6900c8a7754b 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService/additionalDetails.md @@ -15,6 +15,12 @@ # ADLSCredentialsControllerService +### Azure Identity Federation Token Provider + +When the **Credentials Type** property is set to `Access Token`, configure the **Azure Identity Federation Token Provider** with a controller service capable of exchanging workload identity tokens for Azure AD access tokens. The provider must return an `access_token` issued by Microsoft Entra ID (for example using the `StandardAzureIdentityFederationTokenProvider`). The access token is converted to the Azure SDK representation and cached in memory until it expires. + +The Azure client instances created by this service do not perform additional token refresh on their own. Ensure the configured Azure Identity Federation Token Provider automatically refreshes tokens before they expire, and that the configured scopes or audiences grant access to the target storage resources. + ### Security considerations of using Expression Language for sensitive properties Allowing Expression Language for a property has the advantage of configuring the property dynamically via FlowFile @@ -30,4 +36,4 @@ Best practices for using Expression Language for sensitive properties: * control access to the flow and to provenance repository * encrypt disks storing FlowFiles and provenance data * if the sensitive data is a temporary token (like the SAS token), use a shorter lifetime and refresh the token - periodically \ No newline at end of file + periodically diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java index d3a30188a294..7ab266b43ae2 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.azure.eventhub; +import com.azure.core.credential.TokenCredential; import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties; import com.azure.messaging.eventhubs.models.PartitionContext; @@ -31,6 +32,7 @@ import org.apache.nifi.proxy.ProxyConfigurationService; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.scheduling.ExecutionNode; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; import org.apache.nifi.util.MockFlowFile; @@ -40,6 +42,7 @@ import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; import java.net.Proxy; import java.time.Instant; @@ -66,6 +69,7 @@ public class GetAzureEventHubTest { private static final String POLICY_KEY = "POLICY-KEY"; private static final String CONSUMER_GROUP = "$Default"; private static final String EVENT_HUB_OAUTH_SERVICE_ID = "get-event-hub-oauth"; + private static final String EVENT_HUB_IDENTITY_SERVICE_ID = "get-event-hub-identity"; private static final Instant ENQUEUED_TIME = Instant.now(); private static final long SEQUENCE_NUMBER = 32; private static final String OFFSET = "64"; @@ -170,6 +174,13 @@ private void configureEventHubOAuthTokenProvider() throws InitializationExceptio testRunner.setProperty(GetAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_OAUTH_SERVICE_ID); } + private void configureEventHubIdentityTokenProvider() throws InitializationException { + final MockIdentityFederationTokenProvider provider = new MockIdentityFederationTokenProvider(); + testRunner.addControllerService(EVENT_HUB_IDENTITY_SERVICE_ID, provider); + testRunner.enableControllerService(provider); + testRunner.setProperty(GetAzureEventHub.EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_SERVICE_ID); + } + @Test public void testPropertiesManagedIdentity() { testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); @@ -193,6 +204,19 @@ public void testEventHubOAuthRequiresTokenProvider() throws InitializationExcept testRunner.assertValid(); } + @Test + public void testEventHubIdentityFederationRequiresTokenProvider() throws InitializationException { + testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); + testRunner.setProperty(GetAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); + testRunner.setProperty(GetAzureEventHub.AUTHENTICATION_STRATEGY, AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION.getValue()); + + testRunner.assertNotValid(); + + configureEventHubIdentityTokenProvider(); + + testRunner.assertValid(); + } + @Test public void testRunNoEventsReceived() { setProperties(); @@ -321,4 +345,12 @@ public AccessToken getAccessDetails() { return accessToken; } } + + private static class MockIdentityFederationTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + @Override + public TokenCredential getCredentials() { + return tokenRequestContext -> Mono.just( + new com.azure.core.credential.AccessToken("access-token", java.time.OffsetDateTime.now().plusMinutes(5))); + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java index 5dffa9350fb7..9101324d0d9f 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.azure.eventhub; +import com.azure.core.credential.TokenCredential; import com.azure.messaging.eventhubs.EventHubProducerClient; import com.azure.messaging.eventhubs.models.SendOptions; import org.apache.nifi.controller.AbstractControllerService; @@ -27,6 +28,7 @@ import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxyConfigurationService; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; import org.apache.nifi.util.MockPropertyConfiguration; @@ -40,6 +42,7 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import reactor.core.publisher.Mono; import java.net.Proxy; import java.util.Collections; @@ -66,6 +69,7 @@ public class PutAzureEventHubTest { private static final String PARTITION_KEY = "partition"; private static final String CONTENT = String.class.getSimpleName(); private static final String EVENT_HUB_OAUTH_SERVICE_ID = "put-event-hub-oauth"; + private static final String EVENT_HUB_IDENTITY_SERVICE_ID = "put-event-hub-identity"; @Mock EventHubProducerClient eventHubProducerClient; @@ -161,6 +165,13 @@ private void configureEventHubOAuthTokenProvider() throws InitializationExceptio testRunner.setProperty(PutAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_OAUTH_SERVICE_ID); } + private void configureEventHubIdentityTokenProvider() throws InitializationException { + final MockIdentityFederationTokenProvider provider = new MockIdentityFederationTokenProvider(); + testRunner.addControllerService(EVENT_HUB_IDENTITY_SERVICE_ID, provider); + testRunner.enableControllerService(provider); + testRunner.setProperty(PutAzureEventHub.EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_SERVICE_ID); + } + @Test public void testPropertiesManagedIdentityEnabled() { testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); @@ -184,6 +195,19 @@ public void testEventHubOAuthRequiresTokenProvider() throws InitializationExcept testRunner.assertValid(); } + @Test + public void testEventHubIdentityFederationRequiresTokenProvider() throws InitializationException { + testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); + testRunner.setProperty(PutAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); + testRunner.setProperty(PutAzureEventHub.AUTHENTICATION_STRATEGY, AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION.getValue()); + + testRunner.assertNotValid(); + + configureEventHubIdentityTokenProvider(); + + testRunner.assertValid(); + } + @Test public void testRunNoFlowFiles() { setProperties(); @@ -279,4 +303,12 @@ public AccessToken getAccessDetails() { return accessToken; } } + + private static class MockIdentityFederationTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + @Override + public TokenCredential getCredentials() { + return tokenRequestContext -> Mono.just( + new com.azure.core.credential.AccessToken("access-token", java.time.OffsetDateTime.now().plusMinutes(5))); + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java index 50a9c8cdf11b..39e519ad39e3 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.azure.eventhub; +import com.azure.core.credential.TokenCredential; import com.azure.messaging.eventhubs.CheckpointStore; import com.azure.messaging.eventhubs.EventData; import com.azure.messaging.eventhubs.EventProcessorClient; @@ -48,6 +49,7 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubAuthenticationStrategy; import org.apache.nifi.shared.azure.eventhubs.AzureEventHubTransportType; import org.apache.nifi.shared.azure.eventhubs.BlobStorageAuthenticationStrategy; @@ -106,7 +108,9 @@ public class TestConsumeAzureEventHub { private static final String APPLICATION_PROPERTY = "application"; private static final String APPLICATION_ATTRIBUTE_NAME = String.format("eventhub.property.%s", APPLICATION_PROPERTY); private static final String EVENT_HUB_OAUTH_SERVICE_ID = "eventHubOauth"; + private static final String EVENT_HUB_IDENTITY_SERVICE_ID = "eventHubIdentity"; private static final String BLOB_OAUTH_SERVICE_ID = "blobOauth"; + private static final String BLOB_IDENTITY_SERVICE_ID = "blobIdentity"; private static final String EXPECTED_TRANSIT_URI = String.format("amqps://%s%s/%s/ConsumerGroups/%s/Partitions/%s", EVENT_HUB_NAMESPACE, @@ -202,7 +206,7 @@ public void testProcessorConfigValidityWithBothStorageKeyAndTokenSet() { testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_NAME); testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, STORAGE_ACCOUNT_KEY); testRunner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN, STORAGE_TOKEN); - testRunner.assertNotValid(); + testRunner.assertValid(); } @Test @@ -218,6 +222,20 @@ public void testProcessorConfigValidityWithEventHubOAuthRequiresTokenProvider() testRunner.assertValid(); } + @Test + public void testProcessorConfigValidityWithEventHubIdentityFederationRequiresTokenProvider() throws InitializationException { + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); + testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); + testRunner.setProperty(ConsumeAzureEventHub.CHECKPOINT_STRATEGY, CheckpointStrategy.COMPONENT_STATE.getValue()); + testRunner.setProperty(ConsumeAzureEventHub.AUTHENTICATION_STRATEGY, AzureEventHubAuthenticationStrategy.IDENTITY_FEDERATION.getValue()); + + testRunner.assertNotValid(); + + configureEventHubIdentityTokenProvider(); + + testRunner.assertValid(); + } + @Test public void testProcessorConfigValidityWithBlobOAuthRequiresTokenProvider() throws InitializationException { testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); @@ -231,6 +249,20 @@ public void testProcessorConfigValidityWithBlobOAuthRequiresTokenProvider() thro testRunner.assertValid(); } + @Test + public void testProcessorConfigValidityWithBlobIdentityFederationRequiresTokenProvider() throws InitializationException { + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); + testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE, EVENT_HUB_NAMESPACE); + testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_NAME); + testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_AUTHENTICATION_STRATEGY, BlobStorageAuthenticationStrategy.IDENTITY_FEDERATION.getValue()); + + testRunner.assertNotValid(); + + configureBlobIdentityTokenProvider(); + + testRunner.assertValid(); + } + @Test public void testProcessorConfigValidityWithTokenSet() throws InitializationException { testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME, EVENT_HUB_NAME); @@ -668,6 +700,13 @@ private void configureEventHubOAuthTokenProvider() throws InitializationExceptio testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_OAUTH2_ACCESS_TOKEN_PROVIDER, EVENT_HUB_OAUTH_SERVICE_ID); } + private void configureEventHubIdentityTokenProvider() throws InitializationException { + final MockIdentityFederationTokenProvider provider = new MockIdentityFederationTokenProvider(); + testRunner.addControllerService(EVENT_HUB_IDENTITY_SERVICE_ID, provider); + testRunner.enableControllerService(provider); + testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_IDENTITY_FEDERATION_TOKEN_PROVIDER, EVENT_HUB_IDENTITY_SERVICE_ID); + } + private void configureBlobOAuthTokenProvider() throws InitializationException { final MockOAuth2AccessTokenProvider provider = new MockOAuth2AccessTokenProvider(); testRunner.addControllerService(BLOB_OAUTH_SERVICE_ID, provider); @@ -675,6 +714,13 @@ private void configureBlobOAuthTokenProvider() throws InitializationException { testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_OAUTH2_ACCESS_TOKEN_PROVIDER, BLOB_OAUTH_SERVICE_ID); } + private void configureBlobIdentityTokenProvider() throws InitializationException { + final MockIdentityFederationTokenProvider provider = new MockIdentityFederationTokenProvider(); + testRunner.addControllerService(BLOB_IDENTITY_SERVICE_ID, provider); + testRunner.enableControllerService(provider); + testRunner.setProperty(ConsumeAzureEventHub.BLOB_STORAGE_IDENTITY_FEDERATION_TOKEN_PROVIDER, BLOB_IDENTITY_SERVICE_ID); + } + private class MockConsumeAzureEventHub extends ConsumeAzureEventHub { @Override protected EventProcessorClient createClient(final ProcessContext context) { @@ -696,4 +742,12 @@ public AccessToken getAccessDetails() { return accessToken; } } + + private static class MockIdentityFederationTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + @Override + public TokenCredential getCredentials() { + return tokenRequestContext -> Mono.just( + new com.azure.core.credential.AccessToken("access-token", java.time.OffsetDateTime.now().plusMinutes(5))); + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java index a89968446585..c8a7725a6cd0 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureCredentialsControllerService.java @@ -16,24 +16,41 @@ */ package org.apache.nifi.services.azure; +import com.azure.core.credential.AccessToken; +import com.azure.core.credential.TokenCredential; +import com.azure.core.credential.TokenRequestContext; +import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.NoOpProcessor; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + +import java.time.OffsetDateTime; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class TestStandardAzureCredentialsControllerService { private static final String CREDENTIALS_SERVICE_IDENTIFIER = "credentials-service"; private static final String SAMPLE_MANAGED_CLIENT_ID = "sample-managed-client-id"; + private static final String TOKEN_PROVIDER_IDENTIFIER = "identity-provider"; + private TestRunner runner; private StandardAzureCredentialsControllerService credentialsService; + private MockIdentityFederationTokenProvider tokenProvider; @BeforeEach public void setUp() throws InitializationException { runner = TestRunners.newTestRunner(NoOpProcessor.class); credentialsService = new StandardAzureCredentialsControllerService(); runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER, credentialsService); + + tokenProvider = new MockIdentityFederationTokenProvider(); + runner.addControllerService(TOKEN_PROVIDER_IDENTIFIER, tokenProvider); + runner.enableControllerService(tokenProvider); } @Test @@ -43,7 +60,6 @@ public void testValidControllerServiceConfiguration() { StandardAzureCredentialsControllerService.DEFAULT_CREDENTIAL); runner.assertValid(credentialsService); - // should still be valid be ignored until CREDENTIAL_CONFIGURATION_STRATEGY is set to MANAGED_IDENTITY runner.setProperty(credentialsService, StandardAzureCredentialsControllerService.MANAGED_IDENTITY_CLIENT_ID, SAMPLE_MANAGED_CLIENT_ID); @@ -76,5 +92,39 @@ public void testNotValidControllerServiceBlankManagedIdentityClientId() { runner.assertValid(credentialsService); } + @Test + public void testIdentityFederationStrategyRequiresProvider() { + runner.setProperty(credentialsService, + StandardAzureCredentialsControllerService.CREDENTIAL_CONFIGURATION_STRATEGY, + StandardAzureCredentialsControllerService.IDENTITY_FEDERATION); + runner.assertNotValid(credentialsService); + } + + @Test + public void testIdentityFederationStrategyProvidesTokenCredential() throws Exception { + runner.setProperty(credentialsService, + StandardAzureCredentialsControllerService.CREDENTIAL_CONFIGURATION_STRATEGY, + StandardAzureCredentialsControllerService.IDENTITY_FEDERATION); + runner.setProperty(credentialsService, + StandardAzureCredentialsControllerService.IDENTITY_FEDERATION_TOKEN_PROVIDER, + TOKEN_PROVIDER_IDENTIFIER); + + runner.assertValid(credentialsService); + runner.enableControllerService(credentialsService); + + final TokenCredential tokenCredential = credentialsService.getCredentials(); + final AccessToken accessToken = tokenCredential.getToken(new TokenRequestContext().addScopes("https://storage.azure.com/.default")).block(); + assertNotNull(accessToken); + assertEquals(MockIdentityFederationTokenProvider.ACCESS_TOKEN_VALUE, accessToken.getToken()); + } + + private static final class MockIdentityFederationTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + private static final String ACCESS_TOKEN_VALUE = "access-token"; + + @Override + public TokenCredential getCredentials() { + return tokenRequestContext -> Mono.just(new AccessToken(ACCESS_TOKEN_VALUE, OffsetDateTime.now().plusHours(1))); + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureIdentityFederationTokenProvider.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureIdentityFederationTokenProvider.java new file mode 100644 index 000000000000..c6bff0337581 --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/TestStandardAzureIdentityFederationTokenProvider.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.oauth2.AccessToken; +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link StandardAzureIdentityFederationTokenProvider}. + * + * Note: These tests validate configuration and property handling only. + * Actual token exchange with Azure AD requires integration testing with real credentials. + */ +public class TestStandardAzureIdentityFederationTokenProvider { + + private TestRunner runner; + private StandardAzureIdentityFederationTokenProvider tokenProvider; + + @BeforeEach + public void setUp() throws InitializationException { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + + tokenProvider = new StandardAzureIdentityFederationTokenProvider(); + runner.addControllerService("identity-provider", tokenProvider); + + final MockOAuth2AccessTokenProvider assertionProvider = new MockOAuth2AccessTokenProvider(); + runner.addControllerService("assertion-provider", assertionProvider); + runner.enableControllerService(assertionProvider); + } + + @Test + public void testValidConfiguration() { + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.TENANT_ID, "tenant-id"); + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.CLIENT_ID, "client-id"); + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.CLIENT_ASSERTION_PROVIDER, "assertion-provider"); + + runner.assertValid(tokenProvider); + } + + @Test + public void testInvalidWithoutTenantId() { + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.CLIENT_ID, "client-id"); + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.CLIENT_ASSERTION_PROVIDER, "assertion-provider"); + + runner.assertNotValid(tokenProvider); + } + + @Test + public void testInvalidWithoutClientId() { + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.TENANT_ID, "tenant-id"); + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.CLIENT_ASSERTION_PROVIDER, "assertion-provider"); + + runner.assertNotValid(tokenProvider); + } + + @Test + public void testInvalidWithoutClientAssertionProvider() { + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.TENANT_ID, "tenant-id"); + runner.setProperty(tokenProvider, StandardAzureIdentityFederationTokenProvider.CLIENT_ID, "client-id"); + + runner.assertNotValid(tokenProvider); + } + + private static class MockOAuth2AccessTokenProvider extends AbstractControllerService implements OAuth2AccessTokenProvider { + @Override + public AccessToken getAccessDetails() { + final AccessToken accessToken = new AccessToken(); + accessToken.setAccessToken("client-assertion-token"); + accessToken.setExpiresIn(600L); + return accessToken; + } + } +} diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java index ad0c1ba83dfe..828dd47866af 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestADLSCredentialsControllerService.java @@ -16,15 +16,21 @@ */ package org.apache.nifi.services.azure.storage; +import com.azure.core.credential.AccessToken; +import com.azure.core.credential.TokenCredential; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.util.NoOpProcessor; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import java.time.OffsetDateTime; import java.util.HashMap; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -36,6 +42,7 @@ public class TestADLSCredentialsControllerService { public static final String CREDENTIALS_SERVICE_IDENTIFIER = "credentials-service"; + private static final String TOKEN_PROVIDER_IDENTIFIER = "oauth2-provider"; private static final String ACCOUNT_NAME_VALUE = "AccountName"; private static final String ACCOUNT_KEY_VALUE = "AccountKey"; @@ -48,12 +55,16 @@ public class TestADLSCredentialsControllerService { private TestRunner runner; private ADLSCredentialsControllerService credentialsService; + private MockOAuth2AccessTokenProvider tokenProvider; @BeforeEach public void setUp() throws InitializationException { runner = TestRunners.newTestRunner(NoOpProcessor.class); credentialsService = new ADLSCredentialsControllerService(); runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER, credentialsService); + tokenProvider = new MockOAuth2AccessTokenProvider(); + runner.addControllerService(TOKEN_PROVIDER_IDENTIFIER, tokenProvider); + runner.enableControllerService(tokenProvider); } @Test @@ -153,6 +164,23 @@ public void testNotValidBecauseNoClientSecretSpecifiedForServicePrincipal() { runner.assertNotValid(credentialsService); } + @Test + public void testValidWithAccountNameAndIdentityFederation() { + configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION); + configureAccountName(); + configureIdentityFederationProvider(); + + runner.assertValid(credentialsService); + } + + @Test + public void testNotValidWithIdentityFederationMissingProvider() { + configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION); + configureAccountName(); + + runner.assertNotValid(credentialsService); + } + @Test public void testGetCredentialsDetailsWithAccountKey() throws Exception { // GIVEN @@ -245,6 +273,21 @@ public void testGetCredentialsDetailsWithSasTokenUsingEL() throws Exception { assertNull(actual.getServicePrincipalClientSecret()); } + @Test + public void testGetCredentialsDetailsWithIdentityFederation() throws Exception { + configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION); + configureAccountName(); + configureIdentityFederationProvider(); + + runner.enableControllerService(credentialsService); + + final ADLSCredentialsDetails actual = credentialsService.getCredentialsDetails(new HashMap<>()); + + assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName()); + final AzureIdentityFederationTokenProvider identityTokenProvider = actual.getIdentityTokenProvider(); + assertNotNull(identityTokenProvider); + } + @Test public void testGetCredentialsDetailsWithSystemAssignedManagedIdentity() throws Exception { // GIVEN @@ -405,8 +448,21 @@ private void configureServicePrincipalClientSecret() { runner.setProperty(credentialsService, AzureStorageUtils.SERVICE_PRINCIPAL_CLIENT_SECRET, SERVICE_PRINCIPAL_CLIENT_SECRET_VALUE); } + private void configureIdentityFederationProvider() { + runner.setProperty(credentialsService, AzureStorageUtils.IDENTITY_FEDERATION_TOKEN_PROVIDER, TOKEN_PROVIDER_IDENTIFIER); + } + private void configurePropertyUsingEL(PropertyDescriptor propertyDescriptor, String variableName, String variableValue) { runner.setProperty(credentialsService, propertyDescriptor, String.format("${%s}", variableName)); runner.setEnvironmentVariableValue(variableName, variableValue); } + + private static final class MockOAuth2AccessTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + private static final String ACCESS_TOKEN_VALUE = "access-token"; + + @Override + public TokenCredential getCredentials() { + return tokenRequestContext -> Mono.just(new AccessToken(ACCESS_TOKEN_VALUE, OffsetDateTime.now().plusHours(1))); + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java index 94b61bb2e22e..cbb46005fc87 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/TestAzureStorageCredentialsControllerService_v12.java @@ -16,13 +16,20 @@ */ package org.apache.nifi.services.azure.storage; +import com.azure.core.credential.AccessToken; +import com.azure.core.credential.TokenCredential; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import org.apache.nifi.util.NoOpProcessor; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import java.time.OffsetDateTime; import java.util.Collections; import static org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLOB_ENDPOINT_SUFFIX; @@ -35,11 +42,13 @@ import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SERVICE_PRINCIPAL_CLIENT_SECRET; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.SERVICE_PRINCIPAL_TENANT_ID; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; public class TestAzureStorageCredentialsControllerService_v12 { public static final String CREDENTIALS_SERVICE_IDENTIFIER = "credentials-service"; + private static final String TOKEN_PROVIDER_IDENTIFIER = "oauth2-provider"; private static final String ACCOUNT_NAME_VALUE = "AccountName"; private static final String ACCOUNT_KEY_VALUE = "AccountKey"; @@ -51,12 +60,17 @@ public class TestAzureStorageCredentialsControllerService_v12 { private TestRunner runner; private AzureStorageCredentialsControllerService_v12 credentialsService; + private MockOAuth2AccessTokenProvider tokenProvider; @BeforeEach public void setUp() throws InitializationException { runner = TestRunners.newTestRunner(NoOpProcessor.class); credentialsService = new AzureStorageCredentialsControllerService_v12(); runner.addControllerService(CREDENTIALS_SERVICE_IDENTIFIER, credentialsService); + + tokenProvider = new MockOAuth2AccessTokenProvider(); + runner.addControllerService(TOKEN_PROVIDER_IDENTIFIER, tokenProvider); + runner.enableControllerService(tokenProvider); } @Test @@ -150,6 +164,39 @@ public void testServicePrincipalCredentialsTypeNotValidBecauseClientSecretMissin runner.assertNotValid(credentialsService); } + @Test + public void testIdentityFederationCredentialsTypeValid() { + configureAccountName(); + configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION); + configureIdentityFederationProvider(); + + runner.assertValid(credentialsService); + } + + @Test + public void testIdentityFederationCredentialsTypeNotValidWhenProviderMissing() { + configureAccountName(); + configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION); + + runner.assertNotValid(credentialsService); + } + + @Test + public void testGetCredentialsDetailsWithIdentityFederation() throws Exception { + configureAccountName(); + configureCredentialsType(AzureStorageCredentialsType.IDENTITY_FEDERATION); + configureIdentityFederationProvider(); + + runner.enableControllerService(credentialsService); + + final AzureStorageCredentialsDetails_v12 actual = credentialsService.getCredentialsDetails(Collections.emptyMap()); + + assertEquals(ACCOUNT_NAME_VALUE, actual.getAccountName()); + assertEquals(AzureStorageCredentialsType.IDENTITY_FEDERATION, actual.getCredentialsType()); + final AzureIdentityFederationTokenProvider identityTokenProvider = actual.getIdentityTokenProvider(); + assertNotNull(identityTokenProvider); + } + @Test public void testGetCredentialsDetailsWithAccountKey() { configureAccountName(); @@ -276,4 +323,17 @@ private void configureServicePrincipalClientId() { private void configureServicePrincipalClientSecret() { runner.setProperty(credentialsService, SERVICE_PRINCIPAL_CLIENT_SECRET, SERVICE_PRINCIPAL_CLIENT_SECRET_VALUE); } + + private void configureIdentityFederationProvider() { + runner.setProperty(credentialsService, AzureStorageUtils.IDENTITY_FEDERATION_TOKEN_PROVIDER, TOKEN_PROVIDER_IDENTIFIER); + } + + private static final class MockOAuth2AccessTokenProvider extends AbstractControllerService implements AzureIdentityFederationTokenProvider { + private static final String ACCESS_TOKEN_VALUE = "access-token"; + + @Override + public TokenCredential getCredentials() { + return tokenRequestContext -> Mono.just(new AccessToken(ACCESS_TOKEN_VALUE, OffsetDateTime.now().plusHours(1))); + } + } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/util/TestAzureWorkloadIdentityCredentialUtils.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/util/TestAzureWorkloadIdentityCredentialUtils.java new file mode 100644 index 000000000000..f9e3968381a1 --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/util/TestAzureWorkloadIdentityCredentialUtils.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.util; + +import com.azure.core.credential.TokenCredential; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.oauth2.AccessToken; +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Unit tests for {@link AzureWorkloadIdentityCredentialUtils}. + */ +public class TestAzureWorkloadIdentityCredentialUtils { + + private static final String TENANT_ID = "test-tenant-id"; + private static final String CLIENT_ID = "test-client-id"; + private static final String ASSERTION_TOKEN = "test-assertion-token"; + + @Test + public void testCreateCredentialWithOAuth2Provider() { + final OAuth2AccessTokenProvider provider = new MockOAuth2Provider(ASSERTION_TOKEN); + + final TokenCredential credential = AzureWorkloadIdentityCredentialUtils.createCredential( + TENANT_ID, CLIENT_ID, provider); + + assertNotNull(credential); + } + + @Test + public void testCreateCredentialWithSupplier() { + final TokenCredential credential = AzureWorkloadIdentityCredentialUtils.createCredential( + TENANT_ID, CLIENT_ID, () -> ASSERTION_TOKEN); + + assertNotNull(credential); + } + + @Test + public void testCreateCredentialWithNullTenantId() { + assertThrows(NullPointerException.class, () -> + AzureWorkloadIdentityCredentialUtils.createCredential(null, CLIENT_ID, () -> ASSERTION_TOKEN)); + } + + @Test + public void testCreateCredentialWithNullClientId() { + assertThrows(NullPointerException.class, () -> + AzureWorkloadIdentityCredentialUtils.createCredential(TENANT_ID, null, () -> ASSERTION_TOKEN)); + } + + @Test + public void testCreateCredentialWithNullSupplier() { + assertThrows(NullPointerException.class, () -> + AzureWorkloadIdentityCredentialUtils.createCredential(TENANT_ID, CLIENT_ID, (java.util.function.Supplier) null)); + } + + @Test + public void testCreateCredentialWithNullProvider() { + assertThrows(NullPointerException.class, () -> + AzureWorkloadIdentityCredentialUtils.createCredential(TENANT_ID, CLIENT_ID, (OAuth2AccessTokenProvider) null)); + } + + @Test + public void testProviderReturnsNullAccessToken() { + final OAuth2AccessTokenProvider provider = new MockOAuth2Provider(null, true); + + final TokenCredential credential = AzureWorkloadIdentityCredentialUtils.createCredential( + TENANT_ID, CLIENT_ID, provider); + + // Credential is created, but calling getToken() would fail when the supplier is invoked + assertNotNull(credential); + } + + @Test + public void testProviderReturnsEmptyToken() { + final OAuth2AccessTokenProvider provider = new MockOAuth2Provider(""); + + final TokenCredential credential = AzureWorkloadIdentityCredentialUtils.createCredential( + TENANT_ID, CLIENT_ID, provider); + + // Credential is created, but calling getToken() would fail when the supplier is invoked + assertNotNull(credential); + } + + private static class MockOAuth2Provider extends AbstractControllerService implements OAuth2AccessTokenProvider { + private final String tokenValue; + private final boolean returnNull; + + MockOAuth2Provider(String tokenValue) { + this(tokenValue, false); + } + + MockOAuth2Provider(String tokenValue, boolean returnNull) { + this.tokenValue = tokenValue; + this.returnNull = returnNull; + } + + @Override + public AccessToken getAccessDetails() { + if (returnNull) { + return null; + } + final AccessToken accessToken = new AccessToken(); + accessToken.setAccessToken(tokenValue); + accessToken.setExpiresIn(600L); + return accessToken; + } + } +} diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/AzureIdentityFederationTokenProvider.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/AzureIdentityFederationTokenProvider.java new file mode 100644 index 000000000000..6b278171f387 --- /dev/null +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/AzureIdentityFederationTokenProvider.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure; + +import com.azure.core.credential.TokenCredential; +import org.apache.nifi.controller.ControllerService; + +/** + * Controller Service that provides Azure {@link TokenCredential} for workload identity federation. + * Implementations exchange an external identity token for an Azure AD access token suitable for + * Azure service clients (for example, Storage, Data Lake, or Event Hubs). + */ +public interface AzureIdentityFederationTokenProvider extends ControllerService { + + /** + * Returns a {@link TokenCredential} that can be used to authenticate with Azure services. + * The credential handles token acquisition and refresh automatically. + * + * @return a TokenCredential for Azure service authentication + */ + TokenCredential getCredentials(); +} diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java index b1780f9e164f..e6ec397dd3d8 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/ADLSCredentialsDetails.java @@ -18,6 +18,7 @@ import com.azure.core.credential.AccessToken; import com.azure.core.http.ProxyOptions; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import java.util.Objects; @@ -40,6 +41,7 @@ public class ADLSCredentialsDetails { private final String servicePrincipalClientId; private final String servicePrincipalClientSecret; + private final AzureIdentityFederationTokenProvider identityTokenProvider; private final ProxyOptions proxyOptions; public ADLSCredentialsDetails( @@ -53,6 +55,7 @@ public ADLSCredentialsDetails( String servicePrincipalTenantId, String servicePrincipalClientId, String servicePrincipalClientSecret, + AzureIdentityFederationTokenProvider identityTokenProvider, ProxyOptions proxyOptions ) { this.accountName = accountName; @@ -65,6 +68,7 @@ public ADLSCredentialsDetails( this.servicePrincipalTenantId = servicePrincipalTenantId; this.servicePrincipalClientId = servicePrincipalClientId; this.servicePrincipalClientSecret = servicePrincipalClientSecret; + this.identityTokenProvider = identityTokenProvider; this.proxyOptions = proxyOptions; } @@ -108,6 +112,10 @@ public String getServicePrincipalClientSecret() { return servicePrincipalClientSecret; } + public AzureIdentityFederationTokenProvider getIdentityTokenProvider() { + return identityTokenProvider; + } + public ProxyOptions getProxyOptions() { return proxyOptions; } @@ -133,6 +141,7 @@ public boolean equals(Object o) { && Objects.equals(servicePrincipalTenantId, that.servicePrincipalTenantId) && Objects.equals(servicePrincipalClientId, that.servicePrincipalClientId) && Objects.equals(servicePrincipalClientSecret, that.servicePrincipalClientSecret) + && Objects.equals(identityTokenProvider, that.identityTokenProvider) && equalsProxyOptions(proxyOptions, that.proxyOptions); } @@ -149,6 +158,7 @@ public int hashCode() { servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret, + identityTokenProvider, hashCodeProxyOptions(proxyOptions) ); } @@ -164,6 +174,7 @@ public static class Builder { private String servicePrincipalTenantId; private String servicePrincipalClientId; private String servicePrincipalClientSecret; + private AzureIdentityFederationTokenProvider identityTokenProvider; private ProxyOptions proxyOptions; private Builder() { } @@ -222,6 +233,11 @@ public Builder setServicePrincipalClientSecret(String servicePrincipalClientSecr return this; } + public Builder setIdentityTokenProvider(final AzureIdentityFederationTokenProvider identityTokenProvider) { + this.identityTokenProvider = identityTokenProvider; + return this; + } + public Builder setProxyOptions(ProxyOptions proxyOptions) { this.proxyOptions = proxyOptions; return this; @@ -229,7 +245,7 @@ public Builder setProxyOptions(ProxyOptions proxyOptions) { public ADLSCredentialsDetails build() { return new ADLSCredentialsDetails(accountName, accountKey, sasToken, endpointSuffix, accessToken, useManagedIdentity, managedIdentityClientId, - servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret, proxyOptions); + servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret, identityTokenProvider, proxyOptions); } } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java index ee28670f9cee..337d8c2e5594 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsDetails_v12.java @@ -18,6 +18,7 @@ import com.azure.core.credential.AccessToken; import com.azure.core.http.ProxyOptions; +import org.apache.nifi.services.azure.AzureIdentityFederationTokenProvider; import java.util.Objects; @@ -36,11 +37,13 @@ public class AzureStorageCredentialsDetails_v12 { private final String servicePrincipalClientId; private final String servicePrincipalClientSecret; private final AccessToken accessToken; + private final AzureIdentityFederationTokenProvider identityTokenProvider; private final ProxyOptions proxyOptions; private AzureStorageCredentialsDetails_v12( String accountName, String endpointSuffix, AzureStorageCredentialsType credentialsType, String accountKey, String sasToken, String managedIdentityClientId, - String servicePrincipalTenantId, String servicePrincipalClientId, String servicePrincipalClientSecret, AccessToken accessToken, ProxyOptions proxyOptions) { + String servicePrincipalTenantId, String servicePrincipalClientId, String servicePrincipalClientSecret, AccessToken accessToken, + AzureIdentityFederationTokenProvider identityTokenProvider, ProxyOptions proxyOptions) { this.accountName = accountName; this.endpointSuffix = endpointSuffix; this.credentialsType = credentialsType; @@ -51,6 +54,7 @@ private AzureStorageCredentialsDetails_v12( this.servicePrincipalClientId = servicePrincipalClientId; this.servicePrincipalClientSecret = servicePrincipalClientSecret; this.accessToken = accessToken; + this.identityTokenProvider = identityTokenProvider; this.proxyOptions = proxyOptions; } @@ -94,6 +98,10 @@ public AccessToken getAccessToken() { return accessToken; } + public AzureIdentityFederationTokenProvider getIdentityTokenProvider() { + return identityTokenProvider; + } + public ProxyOptions getProxyOptions() { return proxyOptions; } @@ -119,6 +127,7 @@ public boolean equals(Object o) { && Objects.equals(servicePrincipalClientId, that.servicePrincipalClientId) && Objects.equals(servicePrincipalClientSecret, that.servicePrincipalClientSecret) && Objects.equals(accessToken, that.accessToken) + && Objects.equals(identityTokenProvider, that.identityTokenProvider) && equalsProxyOptions(proxyOptions, that.proxyOptions); } @@ -135,6 +144,7 @@ public int hashCode() { servicePrincipalClientId, servicePrincipalClientSecret, accessToken, + identityTokenProvider, hashCodeProxyOptions(proxyOptions) ); } @@ -143,14 +153,16 @@ public static AzureStorageCredentialsDetails_v12 createWithAccountKey( String accountName, String endpointSuffix, String accountKey) { - return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.ACCOUNT_KEY, accountKey, null, null, null, null, null, null, null); + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.ACCOUNT_KEY, + accountKey, null, null, null, null, null, null, null, null); } public static AzureStorageCredentialsDetails_v12 createWithSasToken( String accountName, String endpointSuffix, String sasToken) { - return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.SAS_TOKEN, null, sasToken, null, null, null, null, null, null); + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.SAS_TOKEN, + null, sasToken, null, null, null, null, null, null, null); } public static AzureStorageCredentialsDetails_v12 createWithManagedIdentity( @@ -158,8 +170,8 @@ public static AzureStorageCredentialsDetails_v12 createWithManagedIdentity( String endpointSuffix, String managedIdentityClientId, ProxyOptions proxyOptions) { - return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.MANAGED_IDENTITY, null, null, managedIdentityClientId, - null, null, null, null, proxyOptions); + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.MANAGED_IDENTITY, + null, null, managedIdentityClientId, null, null, null, null, null, proxyOptions); } public static AzureStorageCredentialsDetails_v12 createWithServicePrincipal( @@ -169,14 +181,32 @@ public static AzureStorageCredentialsDetails_v12 createWithServicePrincipal( String servicePrincipalClientId, String servicePrincipalClientSecret, ProxyOptions proxyOptions) { - return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.SERVICE_PRINCIPAL, null, null, null, - servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret, null, proxyOptions); + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.SERVICE_PRINCIPAL, + null, null, null, servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret, null, null, proxyOptions); } public static AzureStorageCredentialsDetails_v12 createWithAccessToken( String accountName, String endpointSuffix, AccessToken accessToken) { - return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.ACCESS_TOKEN, null, null, null, null, null, null, accessToken, null); + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.IDENTITY_FEDERATION, + null, null, null, null, null, null, accessToken, null, null); + } + + public static AzureStorageCredentialsDetails_v12 createWithAccessToken( + String accountName, + String endpointSuffix, + AccessToken accessToken, + AzureIdentityFederationTokenProvider identityTokenProvider) { + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.IDENTITY_FEDERATION, null, null, null, + null, null, null, accessToken, identityTokenProvider, null); + } + + public static AzureStorageCredentialsDetails_v12 createWithIdentityTokenProvider( + String accountName, + String endpointSuffix, + AzureIdentityFederationTokenProvider identityTokenProvider) { + return new AzureStorageCredentialsDetails_v12(accountName, endpointSuffix, AzureStorageCredentialsType.IDENTITY_FEDERATION, null, null, null, + null, null, null, null, identityTokenProvider, null); } } diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java index 04921c9ec080..27d7f74cf5d7 100644 --- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java +++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsType.java @@ -24,7 +24,7 @@ public enum AzureStorageCredentialsType implements DescribedValue { SAS_TOKEN("SAS Token", "SAS (Shared Access Signature) Token generated for accessing resources in the storage account"), MANAGED_IDENTITY("Managed Identity", "Azure Virtual Machine Managed Identity (it can only be used when NiFi is running on Azure)"), SERVICE_PRINCIPAL("Service Principal", "Azure Active Directory Service Principal with Client Id / Client Secret of a registered application"), - ACCESS_TOKEN("Access Token", "Access Token provided by custom controller service implementations"); + IDENTITY_FEDERATION("Identity Federation", "Azure credential obtained via workload identity federation using an external identity token"); private final String displayName; private final String description;