Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1591,17 +1591,29 @@ private DropFlowFileStatus handleDropAllFlowFiles(String dropRequestId, Function
aggregateDropFlowFileStatus.setState(null);

AtomicBoolean processedAtLeastOne = new AtomicBoolean(false);
final List<CompletableFuture<Void>> completionFutures = new ArrayList<>();

connections.stream()
.map(Connection::getFlowFileQueue)
.map(function::apply)
.forEach(additionalDropFlowFileStatus -> {
aggregate(aggregateDropFlowFileStatus, additionalDropFlowFileStatus);
processedAtLeastOne.set(true);
completionFutures.add(additionalDropFlowFileStatus.getCompletionFuture());
});

if (processedAtLeastOne.get()) {
resultDropFlowFileStatus = aggregateDropFlowFileStatus;

// When all individual drop requests complete, mark the aggregate as complete
CompletableFuture.allOf(completionFutures.toArray(new CompletableFuture[0]))
.whenComplete((result, throwable) -> {
if (throwable != null) {
aggregateDropFlowFileStatus.setState(DropFlowFileState.FAILURE, throwable.getMessage());
} else {
aggregateDropFlowFileStatus.setState(DropFlowFileState.COMPLETE);
}
});
} else {
resultDropFlowFileStatus = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ public interface ConnectorNode extends ComponentAuthorizable, VersionedComponent
*/
void verifyCancelDrainFlowFiles() throws IllegalStateException;

/**
* Verifies that the Connector can have its FlowFiles purged.
* @throws IllegalStateException if not in a state where FlowFiles can be purged
*/
void verifyCanPurgeFlowFiles() throws IllegalStateException;

/**
* Purges all FlowFiles from the Connector, immediately dropping the data.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,19 @@ public void verifyCancelDrainFlowFiles() throws IllegalStateException {
}
}

@Override
public void verifyCanPurgeFlowFiles() throws IllegalStateException {
final ConnectorState desiredState = getDesiredState();
if (desiredState != ConnectorState.STOPPED) {
throw new IllegalStateException("Cannot purge FlowFiles for " + this + " because its desired state is currently " + desiredState + "; it must be STOPPED.");
}

final ConnectorState currentState = getCurrentState();
if (currentState != ConnectorState.STOPPED) {
throw new IllegalStateException("Cannot purge FlowFiles for " + this + " because its current state is " + currentState + "; it must be STOPPED.");
}
}

@Override
public Future<Void> purgeFlowFiles(final String requestor) {
requireStopped("purge FlowFiles", ConnectorState.PURGING);
Expand All @@ -520,7 +533,7 @@ private void requireStopped(final String action, final ConnectorState newState)
while (!stateUpdated) {
final ConnectorState currentState = getCurrentState();
if (currentState != ConnectorState.STOPPED) {
throw new IllegalStateException("Cannot " + action + " for " + this + " because its current state is currently " + currentState + "; it must be STOPPED.");
throw new IllegalStateException("Cannot " + action + " for " + this + " because its current state is " + currentState + "; it must be STOPPED.");
}

stateUpdated = stateTransition.trySetCurrentState(ConnectorState.STOPPED, newState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,22 @@ Set<ControllerServiceEntity> getConnectorControllerServices(String connectorId,

Optional<Asset> getConnectorAsset(String assetId);

/**
* Verifies that the connector is in a state where FlowFiles can be purged.
*
* @param connectorId the connector ID
* @throws IllegalStateException if the connector is not in a state where FlowFiles can be purged
*/
void verifyPurgeConnectorFlowFiles(String connectorId);

/**
* Purges all FlowFiles from the connector.
*
* @param connectorId the connector ID
* @param requestor the identity of the user requesting the purge (used for provenance events)
*/
void purgeConnectorFlowFiles(String connectorId, String requestor);

// ----------------------------------------
// Synchronization methods
// ----------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3859,6 +3859,16 @@ public Optional<Asset> getConnectorAsset(final String assetId) {
return connectorDAO.getAsset(assetId);
}

@Override
public void verifyPurgeConnectorFlowFiles(final String connectorId) {
connectorDAO.verifyPurgeFlowFiles(connectorId);
}

@Override
public void purgeConnectorFlowFiles(final String connectorId, final String requestor) {
connectorDAO.purgeFlowFiles(connectorId, requestor);
}

@Override
public ReportingTaskEntity updateReportingTask(final Revision revision, final ReportingTaskDTO reportingTaskDTO) {
// get the component, ensure we have access to it, and perform the update request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.nifi.web.api.dto.ConfigVerificationResultDTO;
import org.apache.nifi.web.api.dto.ConfigurationStepConfigurationDTO;
import org.apache.nifi.web.api.dto.ConnectorDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.VerifyConnectorConfigStepRequestDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.entity.AssetEntity;
Expand All @@ -88,6 +89,7 @@
import org.apache.nifi.web.api.entity.ConnectorRunStatusEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.SearchResultsEntity;
Expand Down Expand Up @@ -123,6 +125,7 @@ public class ConnectorResource extends ApplicationResource {

private static final Logger logger = LoggerFactory.getLogger(ConnectorResource.class);
private static final String VERIFICATION_REQUEST_TYPE = "verification-request";
private static final String PURGE_REQUEST_TYPE = "purge-request";
private static final String FILENAME_HEADER = "Filename";
private static final String CONTENT_TYPE_HEADER = "Content-Type";
private static final String UPLOAD_CONTENT_TYPE = "application/octet-stream";
Expand All @@ -137,6 +140,8 @@ public class ConnectorResource extends ApplicationResource {
private final RequestManager<VerifyConnectorConfigStepRequestEntity, List<ConfigVerificationResultDTO>> configVerificationRequestManager =
new AsyncRequestManager<>(100, 1L, "Connector Configuration Step Verification");

private final RequestManager<ConnectorEntity, Void> purgeRequestManager = new AsyncRequestManager<>(100, 1L, "Connector FlowFile Purge");

@Context
private ServletContext servletContext;

Expand Down Expand Up @@ -792,6 +797,212 @@ public Response cancelDrain(
);
}


@POST
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("/{id}/purge-requests")
@Operation(
summary = "Creates a request to purge the FlowFiles for this connector",
responses = {
@ApiResponse(
responseCode = "202", description = "The request has been accepted. A HTTP response header will contain the URI where the response can be polled.",
content = @Content(schema = @Schema(implementation = DropRequestEntity.class))
),
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
},
description = "This will create a request to purge all FlowFiles from the connector. The connector must be in a STOPPED state before purging can begin. "
+ "This is an asynchronous operation. The client should poll the returned URI to get the status of the purge request.",
security = {
@SecurityRequirement(name = "Write - /connectors/{uuid}")
}
)
public Response createPurgeRequest(
@Parameter(
description = "The connector id.",
required = true
)
@PathParam("id") final String id) {

if (isReplicateRequest()) {
return replicate(HttpMethod.POST);
}

final ConnectorEntity requestConnectorEntity = new ConnectorEntity();
requestConnectorEntity.setId(id);

return withWriteLock(
serviceFacade,
requestConnectorEntity,
lookup -> {
final Authorizable connector = lookup.getConnector(id);
connector.authorize(authorizer, RequestAction.WRITE, NiFiUserUtils.getNiFiUser());
},
() -> serviceFacade.verifyPurgeConnectorFlowFiles(id),
(connectorEntity) -> performAsyncPurge(connectorEntity, id, NiFiUserUtils.getNiFiUser())
);
}


@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("/{id}/purge-requests/{purge-request-id}")
@Operation(
summary = "Gets the current status of a purge request for the specified connector",
responses = {
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = DropRequestEntity.class))),
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
},
security = {
@SecurityRequirement(name = "Only the user that submitted the request can get it")
}
)
public Response getPurgeRequest(
@Parameter(
description = "The connector id.",
required = true
)
@PathParam("id") final String connectorId,
@Parameter(
description = "The purge request id.",
required = true
)
@PathParam("purge-request-id") final String purgeRequestId) {

if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}

final NiFiUser user = NiFiUserUtils.getNiFiUser();
serviceFacade.authorizeAccess(lookup -> {
final Authorizable connector = lookup.getConnector(connectorId);
connector.authorize(authorizer, RequestAction.WRITE, user);
});

final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest = purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
final DropRequestEntity purgeRequestEntity = createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
return generateOkResponse(purgeRequestEntity).build();
}


@DELETE
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("/{id}/purge-requests/{purge-request-id}")
@Operation(
summary = "Cancels and/or removes a request to purge the FlowFiles for this connector",
responses = {
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = DropRequestEntity.class))),
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
},
security = {
@SecurityRequirement(name = "Only the user that submitted the request can remove it")
}
)
public Response removePurgeRequest(
@Parameter(
description = "The connector id.",
required = true
)
@PathParam("id") final String connectorId,
@Parameter(
description = "The purge request id.",
required = true
)
@PathParam("purge-request-id") final String purgeRequestId) {

if (isReplicateRequest()) {
return replicate(HttpMethod.DELETE);
}

final NiFiUser user = NiFiUserUtils.getNiFiUser();

// Make sure user has write access to the connector
serviceFacade.authorizeAccess(lookup -> {
final Authorizable connector = lookup.getConnector(connectorId);
connector.authorize(authorizer, RequestAction.WRITE, user);
});

final boolean twoPhaseRequest = isTwoPhaseRequest(httpServletRequest);
final boolean executionPhase = isExecutionPhase(httpServletRequest);

if (!twoPhaseRequest || executionPhase) {
final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest = purgeRequestManager.removeRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);

if (!asyncRequest.isComplete()) {
asyncRequest.cancel();
}

final DropRequestEntity purgeRequestEntity = createPurgeRequestEntity(asyncRequest, connectorId, purgeRequestId);
return generateOkResponse(purgeRequestEntity).build();
}

if (isValidationPhase(httpServletRequest)) {
purgeRequestManager.getRequest(PURGE_REQUEST_TYPE, purgeRequestId, user);
return generateContinueResponse().build();
} else if (isCancellationPhase(httpServletRequest)) {
return generateOkResponse().build();
} else {
throw new IllegalStateException("This request does not appear to be part of the two phase commit.");
}
}

private Response performAsyncPurge(final ConnectorEntity connectorEntity, final String connectorId, final NiFiUser user) {
final String requestId = generateUuid();
logger.debug("Generated Purge Request with ID {} for Connector {}", requestId, connectorId);

final List<UpdateStep> updateSteps = Collections.singletonList(new StandardUpdateStep("Purge FlowFiles"));

final AsynchronousWebRequest<ConnectorEntity, Void> request =
new StandardAsynchronousWebRequest<>(requestId, connectorEntity, connectorId, user, updateSteps);

final Consumer<AsynchronousWebRequest<ConnectorEntity, Void>> updateTask = asyncRequest -> {
try {
serviceFacade.purgeConnectorFlowFiles(connectorId, user.getIdentity());
asyncRequest.markStepComplete(null);
} catch (final Exception e) {
logger.error("Failed to purge FlowFiles for Connector {}", connectorId, e);
asyncRequest.fail("Failed to purge FlowFiles due to " + e);
}
};

purgeRequestManager.submitRequest(PURGE_REQUEST_TYPE, requestId, request, updateTask);

final DropRequestEntity purgeRequestEntity = createPurgeRequestEntity(request, connectorId, requestId);
final URI location = URI.create(purgeRequestEntity.getDropRequest().getUri());
return Response.status(Response.Status.ACCEPTED).location(location).entity(purgeRequestEntity).build();
}

private DropRequestEntity createPurgeRequestEntity(final AsynchronousWebRequest<ConnectorEntity, Void> asyncRequest,
final String connectorId, final String requestId) {
final DropRequestDTO dto = new DropRequestDTO();
dto.setId(requestId);
dto.setUri(generateResourceUri("connectors", connectorId, "purge-requests", requestId));
dto.setSubmissionTime(asyncRequest.getLastUpdated());
dto.setLastUpdated(asyncRequest.getLastUpdated());
dto.setPercentCompleted(asyncRequest.getPercentComplete());
dto.setFinished(asyncRequest.isComplete());
dto.setFailureReason(asyncRequest.getFailureReason());
dto.setState(asyncRequest.getState());

final DropRequestEntity entity = new DropRequestEntity();
entity.setDropRequest(dto);
return entity;
}

/**
* Gets the configuration step names for the specified connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public interface ConnectorDAO {

void verifyCancelDrainFlowFile(String id);

void verifyPurgeFlowFiles(String id);

void purgeFlowFiles(String id, String requestor);

void updateConnectorConfigurationStep(String id, String configurationStepName, ConfigurationStepConfigurationDTO configurationStepConfiguration);

void applyConnectorUpdate(String id, ConnectorUpdateContext updateContext);
Expand Down
Loading
Loading