Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,7 @@ protected final ConnectorInitializationContext getInitializationContext() {
@Override
public void start(final FlowContext context) throws FlowUpdateException {
final ProcessGroupLifecycle lifecycle = context.getRootGroup().getLifecycle();

try {
lifecycle.enableControllerServices(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY, ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
} catch (final Exception e) {
throw new FlowUpdateException("Failed to enable Controller Services", e);
}

lifecycle.startProcessors();
lifecycle.start(ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY);
}

@Override
Expand Down Expand Up @@ -129,7 +122,7 @@ private CompletableFuture<Void> stopAsync(final FlowContext context) {
}
});

return stopProcessorsFuture.thenRun(() -> {
return stopProcessorsFuture.thenRunAsync(() -> {
try {
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
} catch (final Exception e) {
Expand Down Expand Up @@ -171,32 +164,8 @@ public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) {

final CompletableFuture<Void> stopProcessorsFuture = stopSourceProcessors(flowContext);

final CompletableFuture<Void> startNonSourceFuture = stopProcessorsFuture.thenRun(() -> {
if (result.isDone()) {
return;
}

final CompletableFuture<Void> enableServices = flowContext.getRootGroup().getLifecycle().enableControllerServices(
ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);

try {
// Wait for all referenced services to be enabled.
enableServices.get();

if (!result.isDone()) {
getLogger().info("Starting all non-source processors to facilitate drainage of FlowFiles");
startNonSourceProcessors(flowContext).get();
}
} catch (final Exception e) {
try {
flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
} catch (final Exception e1) {
e.addSuppressed(e1);
}

result.completeExceptionally(new RuntimeException("Failed to start non-source processors while draining FlowFiles", e.getCause()));
}
final CompletableFuture<Void> startNonSourceFuture = stopProcessorsFuture.thenRunAsync(() -> {
startNonSourceProcessors(result, flowContext);
}).exceptionally(throwable -> {
if (!result.isDone()) {
result.completeExceptionally(new RuntimeException("Failed to stop source processors while draining FlowFiles", throwable));
Expand All @@ -205,68 +174,100 @@ public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) {
});

startNonSourceFuture.thenRunAsync(() -> {
try {
ensureDrainageUnblocked();
} catch (final Exception e) {
getLogger().warn("Failed to ensure drainage is unblocked when draining FlowFiles", e);
completeDrain(result, flowContext, initialQueueSize);
}).exceptionally(throwable -> {
if (!result.isDone()) {
result.completeExceptionally(new RuntimeException("Failed while draining FlowFiles", throwable));
}
return null;
});

Exception failureReason = null;
int iterations = 0;
while (!isGroupDrained(flowContext.getRootGroup())) {
if (result.isDone()) {
getLogger().info("Drainage has been cancelled; will no longer wait for FlowFiles to drain");
break;
}
return result;
}

// Log the current queue size every 10 seconds (20 iterations of 500ms) so that it's clear
// whether or not progress is being made.
if (iterations++ % 20 == 0) {
final QueueSize queueSize = flowContext.getRootGroup().getQueueSize();
getLogger().info("Waiting for {} FlowFiles ({} bytes) to drain",
queueSize.getObjectCount(), NumberFormat.getNumberInstance().format(queueSize.getByteCount()));
}
private void completeDrain(final CompletableFuture<Void> result, final FlowContext flowContext, final QueueSize initialQueueSize) {
try {
ensureDrainageUnblocked();
} catch (final Exception e) {
getLogger().warn("Failed to ensure drainage is unblocked when draining FlowFiles", e);
}

try {
Thread.sleep(500);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
failureReason = e;
break;
}
Exception failureReason = null;
int iterations = 0;
while (!isGroupDrained(flowContext.getRootGroup())) {
if (result.isDone()) {
getLogger().info("Drainage has been cancelled; will no longer wait for FlowFiles to drain");
break;
}

// Log completion unless the result was completed exceptionally or cancelled.
if (!result.isDone()) {
getLogger().info("All {} FlowFiles have drained from Connector", initialQueueSize.getObjectCount());
// Log the current queue size every 10 seconds (20 iterations of 500ms) so that it's clear
// whether or not progress is being made.
if (iterations++ % 20 == 0) {
final QueueSize queueSize = flowContext.getRootGroup().getQueueSize();
getLogger().info("Waiting for {} FlowFiles ({} bytes) to drain",
queueSize.getObjectCount(), NumberFormat.getNumberInstance().format(queueSize.getByteCount()));
}

try {
stop(flowContext);
} catch (final Exception e) {
getLogger().warn("Failed to stop source Processors after draining FlowFiles", e);
if (failureReason == null) {
failureReason = e;
} else {
failureReason.addSuppressed(e);
}
Thread.sleep(500);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
failureReason = e;
break;
}
}

// Log completion unless the result was completed exceptionally or cancelled.
if (!result.isDone()) {
getLogger().info("All {} FlowFiles have drained from Connector", initialQueueSize.getObjectCount());
}

if (failureReason != null && !result.isDone()) {
result.completeExceptionally(new RuntimeException("Interrupted while waiting for " + AbstractConnector.this + " to drain", failureReason));
try {
stop(flowContext);
} catch (final Exception e) {
getLogger().warn("Failed to stop source Processors after draining FlowFiles", e);
if (failureReason == null) {
failureReason = e;
} else {
failureReason.addSuppressed(e);
}
}

if (failureReason != null && !result.isDone()) {
result.completeExceptionally(new RuntimeException("Interrupted while waiting for " + AbstractConnector.this + " to drain", failureReason));
}

if (!result.isDone()) {
result.complete(null);
}
}

private void startNonSourceProcessors(final CompletableFuture<Void> result, final FlowContext flowContext) {
if (result.isDone()) {
return;
}

final CompletableFuture<Void> enableServices = flowContext.getRootGroup().getLifecycle().enableControllerServices(
ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);

try {
// Wait for all referenced services to be enabled.
enableServices.get();

if (!result.isDone()) {
result.complete(null);
getLogger().info("Starting all non-source processors to facilitate drainage of FlowFiles");
startNonSourceProcessors(flowContext).get();
}
}).exceptionally(throwable -> {
if (!result.isDone()) {
result.completeExceptionally(new RuntimeException("Failed while draining FlowFiles", throwable));
} catch (final Exception e) {
try {
flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
} catch (final Exception e1) {
e.addSuppressed(e1);
}
return null;
});

return result;
result.completeExceptionally(new RuntimeException("Failed to start non-source processors while draining FlowFiles", e.getCause()));
}
}

/**
Expand Down