From 1abd85442ddac4d04e0b9266e46cabe2853b219f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 26 Jan 2026 15:57:57 -0500 Subject: [PATCH 1/2] NIFI-15511: Added endpoints for purging all FlowFiles for a given Connector; added method to ConnectorClient to call these endpoints; updated system tests to use these endpoints when tearing down flows; some bug fixes --- .../nifi/groups/StandardProcessGroup.java | 12 ++ .../components/connector/ConnectorNode.java | 6 + .../connector/StandardConnectorNode.java | 15 +- .../apache/nifi/web/NiFiServiceFacade.java | 16 ++ .../nifi/web/StandardNiFiServiceFacade.java | 10 + .../nifi/web/api/ConnectorResource.java | 200 ++++++++++++++++++ .../org/apache/nifi/web/dao/ConnectorDAO.java | 4 + .../web/dao/impl/StandardConnectorDAO.java | 20 ++ .../nifi/tests/system/NiFiClientUtil.java | 28 +++ .../nifi/toolkit/client/ConnectorClient.java | 33 +++ .../client/impl/JerseyConnectorClient.java | 54 +++++ 11 files changed, 397 insertions(+), 1 deletion(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 528842281ec0..cb62602085bf 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -1591,6 +1591,7 @@ private DropFlowFileStatus handleDropAllFlowFiles(String dropRequestId, Function aggregateDropFlowFileStatus.setState(null); AtomicBoolean processedAtLeastOne = new AtomicBoolean(false); + final List> completionFutures = new ArrayList<>(); connections.stream() .map(Connection::getFlowFileQueue) @@ -1598,10 +1599,21 @@ private DropFlowFileStatus handleDropAllFlowFiles(String dropRequestId, Function .forEach(additionalDropFlowFileStatus -> { aggregate(aggregateDropFlowFileStatus, additionalDropFlowFileStatus); processedAtLeastOne.set(true); + completionFutures.add(additionalDropFlowFileStatus.getCompletionFuture()); }); if (processedAtLeastOne.get()) { resultDropFlowFileStatus = aggregateDropFlowFileStatus; + + // When all individual drop requests complete, mark the aggregate as complete + CompletableFuture.allOf(completionFutures.toArray(new CompletableFuture[0])) + .whenComplete((result, throwable) -> { + if (throwable != null) { + aggregateDropFlowFileStatus.setState(DropFlowFileState.FAILURE, throwable.getMessage()); + } else { + aggregateDropFlowFileStatus.setState(DropFlowFileState.COMPLETE); + } + }); } else { resultDropFlowFileStatus = null; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java index 13cadb3c4984..6cafcbbaf17d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorNode.java @@ -211,6 +211,12 @@ public interface ConnectorNode extends ComponentAuthorizable, VersionedComponent */ void verifyCancelDrainFlowFiles() throws IllegalStateException; + /** + * Verifies that the Connector can have its FlowFiles purged. + * @throws IllegalStateException if not in a state where FlowFiles can be purged + */ + void verifyCanPurgeFlowFiles() throws IllegalStateException; + /** * Purges all FlowFiles from the Connector, immediately dropping the data. * diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java index e614db7070df..75525d16c32e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java @@ -494,6 +494,19 @@ public void verifyCancelDrainFlowFiles() throws IllegalStateException { } } + @Override + public void verifyCanPurgeFlowFiles() throws IllegalStateException { + final ConnectorState desiredState = getDesiredState(); + if (desiredState != ConnectorState.STOPPED) { + throw new IllegalStateException("Cannot purge FlowFiles for " + this + " because its desired state is currently " + desiredState + "; it must be STOPPED."); + } + + final ConnectorState currentState = getCurrentState(); + if (currentState != ConnectorState.STOPPED) { + throw new IllegalStateException("Cannot purge FlowFiles for " + this + " because its current state is " + currentState + "; it must be STOPPED."); + } + } + @Override public Future purgeFlowFiles(final String requestor) { requireStopped("purge FlowFiles", ConnectorState.PURGING); @@ -520,7 +533,7 @@ private void requireStopped(final String action, final ConnectorState newState) while (!stateUpdated) { final ConnectorState currentState = getCurrentState(); if (currentState != ConnectorState.STOPPED) { - throw new IllegalStateException("Cannot " + action + " for " + this + " because its current state is currently " + currentState + "; it must be STOPPED."); + throw new IllegalStateException("Cannot " + action + " for " + this + " because its current state is " + currentState + "; it must be STOPPED."); } stateUpdated = stateTransition.trySetCurrentState(ConnectorState.STOPPED, newState); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index d270c0344fe7..c350b2b9837e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -251,6 +251,22 @@ Set getConnectorControllerServices(String connectorId, Optional getConnectorAsset(String assetId); + /** + * Verifies that the connector is in a state where FlowFiles can be purged. + * + * @param connectorId the connector ID + * @throws IllegalStateException if the connector is not in a state where FlowFiles can be purged + */ + void verifyPurgeConnectorFlowFiles(String connectorId); + + /** + * Purges all FlowFiles from the connector. + * + * @param connectorId the connector ID + * @param requestor the identity of the user requesting the purge (used for provenance events) + */ + void purgeConnectorFlowFiles(String connectorId, String requestor); + // ---------------------------------------- // Synchronization methods // ---------------------------------------- diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index f08f8297b5c6..654644d55288 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -3859,6 +3859,16 @@ public Optional getConnectorAsset(final String assetId) { return connectorDAO.getAsset(assetId); } + @Override + public void verifyPurgeConnectorFlowFiles(final String connectorId) { + connectorDAO.verifyPurgeFlowFiles(connectorId); + } + + @Override + public void purgeConnectorFlowFiles(final String connectorId, final String requestor) { + connectorDAO.purgeFlowFiles(connectorId, requestor); + } + @Override public ReportingTaskEntity updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) { // get the component, ensure we have access to it, and perform the update request diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java index 02f115b966bf..6a0e12c264a3 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java @@ -77,6 +77,7 @@ import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO; import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO; import org.apache.nifi.web.api.dto.ConnectorDTO; +import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.VerifyConnectorConfigStepRequestDTO; import org.apache.nifi.web.api.dto.search.SearchResultsDTO; import org.apache.nifi.web.api.entity.AssetEntity; @@ -88,6 +89,7 @@ import org.apache.nifi.web.api.entity.ConnectorRunStatusEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; +import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.SearchResultsEntity; @@ -123,6 +125,7 @@ public class ConnectorResource extends ApplicationResource { private static final Logger logger = LoggerFactory.getLogger(ConnectorResource.class); private static final String VERIFICATION_REQUEST_TYPE = "verification-request"; + private static final String PURGE_REQUEST_TYPE = "purge-request"; private static final String FILENAME_HEADER = "Filename"; private static final String CONTENT_TYPE_HEADER = "Content-Type"; private static final String UPLOAD_CONTENT_TYPE = "application/octet-stream"; @@ -137,6 +140,8 @@ public class ConnectorResource extends ApplicationResource { private final RequestManager> configVerificationRequestManager = new AsyncRequestManager<>(100, 1L, "Connector Configuration Step Verification"); + private final RequestManager purgeRequestManager = new AsyncRequestManager<>(100, 1L, "Connector FlowFile Purge"); + @Context private ServletContext servletContext; @@ -792,6 +797,201 @@ public Response cancelDrain( ); } + + @POST + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("/{id}/purge-requests") + @Operation( + summary = "Creates a request to purge the FlowFiles for this connector", + responses = { + @ApiResponse( + responseCode = "202", description = "The request has been accepted. A HTTP response header will contain the URI where the response can be polled.", + content = @Content(schema = @Schema(implementation = DropRequestEntity.class)) + ), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + description = "This will create a request to purge all FlowFiles from the connector. The connector must be in a STOPPED state before purging can begin. " + + "This is an asynchronous operation. The client should poll the returned URI to get the status of the purge request.", + security = { + @SecurityRequirement(name = "Write - /connectors/{uuid}") + } + ) + public Response createPurgeRequest( + @Parameter( + description = "The connector id.", + required = true + ) + @PathParam("id") final String id) { + + if (isReplicateRequest()) { + return replicate(HttpMethod.POST); + } + + final ConnectorEntity requestConnectorEntity = new ConnectorEntity(); + requestConnectorEntity.setId(id); + + return withWriteLock( + serviceFacade, + requestConnectorEntity, + lookup -> { + final Authorizable connector = lookup.getConnector(id); + connector.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser()); + }, + () -> serviceFacade.verifyPurgeConnectorFlowFiles(id), + (connectorEntity) -> performAsyncPurge(connectorEntity, id, NiFiUserUtils.getNiFiUser()) + ); + } + + + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("/{id}/purge-requests/{purge-request-id}") + @Operation( + summary = "Gets the current status of a purge request for the specified connector", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = DropRequestEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + security = { + @SecurityRequirement(name = "Only the user that submitted the request can get it") + } + ) + public Response getPurgeRequest( + @Parameter( + description = "The connector id.", + required = true + ) + @PathParam("id") final String connectorId, + @Parameter( + description = "The purge request id.", + required = true + ) + @PathParam("purge-request-id") final String purgeRequestId) { + + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + final AsynchronousWebRequest asyncRequest = purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user); + final DropRequestEntity purgeRequestEntity = createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId); + return generateOkResponse(purgeRequestEntity).build(); + } + + + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("/{id}/purge-requests/{purge-request-id}") + @Operation( + summary = "Cancels and/or removes a request to purge the FlowFiles for this connector", + responses = { + @ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = DropRequestEntity.class))), + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + }, + security = { + @SecurityRequirement(name = "Only the user that submitted the request can remove it") + } + ) + public Response removePurgeRequest( + @Parameter( + description = "The connector id.", + required = true + ) + @PathParam("id") final String connectorId, + @Parameter( + description = "The purge request id.", + required = true + ) + @PathParam("purge-request-id") final String purgeRequestId) { + + if (isReplicateRequest()) { + return replicate(HttpMethod.DELETE); + } + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest); + final boolean executionPhase = isExecutionPhase(httpServletRequest); + + if (!twoPhaseRequest || executionPhase) { + final AsynchronousWebRequest asyncRequest = purgeRequestManager.removeRequest(PURGE_REQUEST_TYPE, purgeRequestId, user); + + if (!asyncRequest.isComplete()) { + asyncRequest.cancel(); + } + + final DropRequestEntity purgeRequestEntity = createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId); + return generateOkResponse(purgeRequestEntity).build(); + } + + if (isValidationPhase(httpServletRequest)) { + purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user); + return generateContinueResponse().build(); + } else if (isCancellationPhase(httpServletRequest)) { + return generateOkResponse().build(); + } else { + throw new IllegalStateException("This request does not appear to be part of the two phase commit."); + } + } + + private Response performAsyncPurge(final ConnectorEntity connectorEntity, final String connectorId, final NiFiUser user) { + final String requestId = generateUuid(); + logger.debug("Generated Purge Request with ID {} for Connector {}", requestId, connectorId); + + final List updateSteps = Collections.singletonList(new StandardUpdateStep("Purge FlowFiles")); + + final AsynchronousWebRequest request = + new StandardAsynchronousWebRequest<>(requestId, connectorEntity, connectorId, user, updateSteps); + + final Consumer> updateTask = asyncRequest -> { + try { + serviceFacade.purgeConnectorFlowFiles(connectorId, user.getIdentity()); + asyncRequest.markStepComplete(null); + } catch (final Exception e) { + logger.error("Failed to purge FlowFiles for Connector {}", connectorId, e); + asyncRequest.fail("Failed to purge FlowFiles due to " + e); + } + }; + + purgeRequestManager.submitRequest(PURGE_REQUEST_TYPE, requestId, request, updateTask); + + final DropRequestEntity purgeRequestEntity = createPurgeRequestEntity(request, connectorId, requestId); + final URI location = URI.create(purgeRequestEntity.getDropRequest().getUri()); + return Response.status(Response.Status.ACCEPTED).location(location).entity(purgeRequestEntity).build(); + } + + private DropRequestEntity createPurgeRequestEntity(final AsynchronousWebRequest asyncRequest, + final String connectorId, final String requestId) { + final DropRequestDTO dto = new DropRequestDTO(); + dto.setId(requestId); + dto.setUri(generateResourceUri("connectors", connectorId, "purge-requests", requestId)); + dto.setSubmissionTime(asyncRequest.getLastUpdated()); + dto.setLastUpdated(asyncRequest.getLastUpdated()); + dto.setPercentCompleted(asyncRequest.getPercentComplete()); + dto.setFinished(asyncRequest.isComplete()); + dto.setFailureReason(asyncRequest.getFailureReason()); + dto.setState(asyncRequest.getState()); + + final DropRequestEntity entity = new DropRequestEntity(); + entity.setDropRequest(dto); + return entity; + } + /** * Gets the configuration step names for the specified connector. * diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java index c681c17e9a68..cb2e7046ce5a 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectorDAO.java @@ -54,6 +54,10 @@ public interface ConnectorDAO { void verifyCancelDrainFlowFile(String id); + void verifyPurgeFlowFiles(String id); + + void purgeFlowFiles(String id, String requestor); + void updateConnectorConfigurationStep(String id, String configurationStepName, ConfigurationStepConfigurationDTO configurationStepConfiguration); void applyConnectorUpdate(String id, ConnectorUpdateContext updateContext); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java index 409dac962397..c9fd333180bb 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectorDAO.java @@ -53,6 +53,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @Repository @@ -150,6 +151,25 @@ public void verifyCancelDrainFlowFile(final String id) { connector.verifyCancelDrainFlowFiles(); } + @Override + public void verifyPurgeFlowFiles(final String id) { + final ConnectorNode connector = getConnector(id); + connector.verifyCanPurgeFlowFiles(); + } + + @Override + public void purgeFlowFiles(final String id, final String requestor) { + final ConnectorNode connector = getConnector(id); + try { + connector.purgeFlowFiles(requestor).get(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException("Thread was interrupted while purging FlowFiles for Connector " + id, e); + } catch (final ExecutionException e) { + throw new IllegalStateException("Failed to purge FlowFiles for Connector " + id, e.getCause()); + } + } + @Override public void updateConnectorConfigurationStep(final String id, final String configurationStepName, final ConfigurationStepConfigurationDTO configurationStepDto) { final ConnectorNode connector = getConnector(id); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 311843db5d92..e0116ff9f7ef 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -1472,11 +1472,39 @@ public void deleteControllerLevelServices() throws NiFiClientException, IOExcept public void deleteConnectors() throws NiFiClientException, IOException { final ConnectorsEntity connectors = nifiClient.getFlowClient().getConnectors(); for (final ConnectorEntity connector : connectors.getConnectors()) { + purgeConnectorFlowFiles(connector.getId()); connector.setDisconnectedNodeAcknowledged(true); nifiClient.getConnectorClient().deleteConnector(connector); } } + public DropRequestEntity purgeConnectorFlowFiles(final String connectorId) throws NiFiClientException, IOException { + final ConnectorClient connectorClient = getConnectorClient(); + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(1L); + + DropRequestEntity requestEntity = connectorClient.createPurgeRequest(connectorId); + try { + while (requestEntity.getDropRequest().getPercentCompleted() < 100) { + if (System.currentTimeMillis() > maxTimestamp) { + throw new IOException("Timed out waiting for Connector " + connectorId + " to purge FlowFiles"); + } + + try { + Thread.sleep(50L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return null; + } + + requestEntity = connectorClient.getPurgeRequest(connectorId, requestEntity.getDropRequest().getId()); + } + } finally { + requestEntity = connectorClient.deletePurgeRequest(connectorId, requestEntity.getDropRequest().getId()); + } + + return requestEntity; + } + public void waitForControllerServiceRunStatus(final String id, final String requestedRunStatus) throws NiFiClientException, IOException { final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); logger.info("Waiting for Controller Service {} to have a Run Status of {}", id, requestedRunStatus); diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java index 76dfefaeaa65..337954895553 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ConnectorClient.java @@ -22,6 +22,7 @@ import org.apache.nifi.web.api.entity.ConfigurationStepNamesEntity; import org.apache.nifi.web.api.entity.ConnectorEntity; import org.apache.nifi.web.api.entity.ConnectorPropertyAllowableValuesEntity; +import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.VerifyConnectorConfigStepRequestEntity; @@ -333,6 +334,38 @@ VerifyConnectorConfigStepRequestEntity deleteConfigStepVerificationRequest(Strin */ Path getAssetContent(String connectorId, String assetId, File outputDirectory) throws NiFiClientException, IOException; + /** + * Creates a request to purge all FlowFiles for the given connector. + * + * @param connectorId the connector ID + * @return the drop request entity containing the purge request status + * @throws NiFiClientException if an error occurs during the request + * @throws IOException if an I/O error occurs + */ + DropRequestEntity createPurgeRequest(String connectorId) throws NiFiClientException, IOException; + + /** + * Gets the status of a purge request for the given connector. + * + * @param connectorId the connector ID + * @param purgeRequestId the purge request ID + * @return the drop request entity containing the purge request status + * @throws NiFiClientException if an error occurs during the request + * @throws IOException if an I/O error occurs + */ + DropRequestEntity getPurgeRequest(String connectorId, String purgeRequestId) throws NiFiClientException, IOException; + + /** + * Deletes (cancels) a purge request for the given connector. + * + * @param connectorId the connector ID + * @param purgeRequestId the purge request ID + * @return the drop request entity containing the final purge request status + * @throws NiFiClientException if an error occurs during the request + * @throws IOException if an I/O error occurs + */ + DropRequestEntity deletePurgeRequest(String connectorId, String purgeRequestId) throws NiFiClientException, IOException; + /** * Indicates that mutable requests should indicate that the client has * acknowledged that the node is disconnected. diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java index 4f9b8d8aaa5e..06476292da74 100644 --- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java +++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyConnectorClient.java @@ -32,6 +32,7 @@ import org.apache.nifi.web.api.entity.ConnectorEntity; import org.apache.nifi.web.api.entity.ConnectorPropertyAllowableValuesEntity; import org.apache.nifi.web.api.entity.ConnectorRunStatusEntity; +import org.apache.nifi.web.api.entity.DropRequestEntity; import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity; import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity; import org.apache.nifi.web.api.entity.VerifyConnectorConfigStepRequestEntity; @@ -539,4 +540,57 @@ public Path getAssetContent(final String connectorId, final String assetId, fina } }); } + + @Override + public DropRequestEntity createPurgeRequest(final String connectorId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(connectorId)) { + throw new IllegalArgumentException("Connector id cannot be null or blank"); + } + + return executeAction("Error creating purge request for Connector " + connectorId, () -> { + final WebTarget target = connectorsTarget + .path("{id}/purge-requests") + .resolveTemplate("id", connectorId); + + return getRequestBuilder(target).post(null, DropRequestEntity.class); + }); + } + + @Override + public DropRequestEntity getPurgeRequest(final String connectorId, final String purgeRequestId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(connectorId)) { + throw new IllegalArgumentException("Connector id cannot be null or blank"); + } + if (StringUtils.isBlank(purgeRequestId)) { + throw new IllegalArgumentException("Purge request id cannot be null or blank"); + } + + return executeAction("Error getting purge request for Connector " + connectorId, () -> { + final WebTarget target = connectorsTarget + .path("{id}/purge-requests/{purgeRequestId}") + .resolveTemplate("id", connectorId) + .resolveTemplate("purgeRequestId", purgeRequestId); + + return getRequestBuilder(target).get(DropRequestEntity.class); + }); + } + + @Override + public DropRequestEntity deletePurgeRequest(final String connectorId, final String purgeRequestId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(connectorId)) { + throw new IllegalArgumentException("Connector id cannot be null or blank"); + } + if (StringUtils.isBlank(purgeRequestId)) { + throw new IllegalArgumentException("Purge request id cannot be null or blank"); + } + + return executeAction("Error deleting purge request for Connector " + connectorId, () -> { + final WebTarget target = connectorsTarget + .path("{id}/purge-requests/{purgeRequestId}") + .resolveTemplate("id", connectorId) + .resolveTemplate("purgeRequestId", purgeRequestId); + + return getRequestBuilder(target).delete(DropRequestEntity.class); + }); + } } From 985a640ea9a615de6a8a72627ce765a463bb8939 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 3 Feb 2026 10:12:05 -0500 Subject: [PATCH 2/2] NIFI-15511: Added WRITE permission requirement for retrieving and deleting a Connector Purge request --- .../org/apache/nifi/web/api/ConnectorResource.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java index 6a0e12c264a3..00a44a05758d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectorResource.java @@ -883,6 +883,10 @@ public Response getPurgeRequest( } final NiFiUser user = NiFiUserUtils.getNiFiUser(); + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connector = lookup.getConnector(connectorId); + connector.authorize(authorizer, RequestAction.WRITE, user); + }); final AsynchronousWebRequest asyncRequest = purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user); final DropRequestEntity purgeRequestEntity = createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId); @@ -925,6 +929,13 @@ public Response removePurgeRequest( } final NiFiUser user = NiFiUserUtils.getNiFiUser(); + + // Make sure user has write access to the connector + serviceFacade.authorizeAccess(lookup -> { + final Authorizable connector = lookup.getConnector(connectorId); + connector.authorize(authorizer, RequestAction.WRITE, user); + }); + final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest); final boolean executionPhase = isExecutionPhase(httpServletRequest);