diff --git a/src/main/java/org/apache/nifi/components/connector/DropFlowFileSummary.java b/src/main/java/org/apache/nifi/components/connector/DropFlowFileSummary.java new file mode 100644 index 0000000..f8abf0b --- /dev/null +++ b/src/main/java/org/apache/nifi/components/connector/DropFlowFileSummary.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +/** + * A summary of FlowFiles that were dropped from a FlowFile Queue. + * This class provides information about the number of FlowFiles dropped + * and the total aggregate size in bytes of those FlowFiles. + */ +public class DropFlowFileSummary { + + private final int droppedCount; + private final long droppedBytes; + + /** + * Creates a new DropFlowFileSummary with the given count and byte size. + * + * @param droppedCount the number of FlowFiles that were dropped + * @param droppedBytes the total size in bytes of all dropped FlowFiles + */ + public DropFlowFileSummary(final int droppedCount, final long droppedBytes) { + this.droppedCount = droppedCount; + this.droppedBytes = droppedBytes; + } + + /** + * @return the number of FlowFiles that were dropped + */ + public int getDroppedCount() { + return droppedCount; + } + + /** + * @return the total size in bytes of all dropped FlowFiles + */ + public long getDroppedBytes() { + return droppedBytes; + } + + /** + * Creates a new DropFlowFileSummary that represents the combination of this summary and the given summary. + * + * @param other the other summary to add to this one + * @return a new DropFlowFileSummary representing the combined totals + */ + public DropFlowFileSummary add(final DropFlowFileSummary other) { + return new DropFlowFileSummary(this.droppedCount + other.droppedCount, this.droppedBytes + other.droppedBytes); + } + + @Override + public String toString() { + return "DropFlowFileSummary[droppedCount=" + droppedCount + ", droppedBytes=" + droppedBytes + "]"; + } +} diff --git a/src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java b/src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java index 58cbac2..ff17d2b 100644 --- a/src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java +++ b/src/main/java/org/apache/nifi/components/connector/components/ConnectionFacade.java @@ -17,8 +17,13 @@ package org.apache.nifi.components.connector.components; +import org.apache.nifi.components.connector.DropFlowFileSummary; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.flowfile.FlowFile; + +import java.io.IOException; +import java.util.function.Predicate; public interface ConnectionFacade { @@ -35,4 +40,13 @@ public interface ConnectionFacade { */ void purge(); + /** + * Drops all FlowFiles from the connection that match the given predicate. + * + * @param predicate the predicate to use to determine which FlowFiles to drop + * @return a summary of the FlowFiles that were dropped + * @throws IOException if an I/O error occurs while dropping FlowFiles + */ + DropFlowFileSummary dropFlowFiles(Predicate predicate) throws IOException; + } diff --git a/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupFacade.java b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupFacade.java index 79655fd..f0b566c 100644 --- a/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupFacade.java +++ b/src/main/java/org/apache/nifi/components/connector/components/ProcessGroupFacade.java @@ -17,10 +17,14 @@ package org.apache.nifi.components.connector.components; +import org.apache.nifi.components.connector.DropFlowFileSummary; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flowfile.FlowFile; +import java.io.IOException; import java.util.Set; +import java.util.function.Predicate; public interface ProcessGroupFacade { @@ -52,4 +56,14 @@ public interface ProcessGroupFacade { ProcessGroupLifecycle getLifecycle(); + /** + * Drops all FlowFiles from all connections in this ProcessGroup and its child ProcessGroups + * that match the given predicate. + * + * @param predicate the predicate to test each FlowFile against + * @return a summary of the dropped FlowFiles + * @throws IOException if an I/O error occurs while dropping FlowFiles + */ + DropFlowFileSummary dropFlowFiles(Predicate predicate) throws IOException; + }