Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.components.connector;

/**
* A summary of FlowFiles that were dropped from a FlowFile Queue.
* This class provides information about the number of FlowFiles dropped
* and the total aggregate size in bytes of those FlowFiles.
*/
public class DropFlowFileSummary {

private final int droppedCount;
private final long droppedBytes;

/**
* Creates a new DropFlowFileSummary with the given count and byte size.
*
* @param droppedCount the number of FlowFiles that were dropped
* @param droppedBytes the total size in bytes of all dropped FlowFiles
*/
public DropFlowFileSummary(final int droppedCount, final long droppedBytes) {
this.droppedCount = droppedCount;
this.droppedBytes = droppedBytes;
}

/**
* @return the number of FlowFiles that were dropped
*/
public int getDroppedCount() {
return droppedCount;
}

/**
* @return the total size in bytes of all dropped FlowFiles
*/
public long getDroppedBytes() {
return droppedBytes;
}

/**
* Creates a new DropFlowFileSummary that represents the combination of this summary and the given summary.
*
* @param other the other summary to add to this one
* @return a new DropFlowFileSummary representing the combined totals
*/
public DropFlowFileSummary add(final DropFlowFileSummary other) {
return new DropFlowFileSummary(this.droppedCount + other.droppedCount, this.droppedBytes + other.droppedBytes);
}

@Override
public String toString() {
return "DropFlowFileSummary[droppedCount=" + droppedCount + ", droppedBytes=" + droppedBytes + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@

package org.apache.nifi.components.connector.components;

import org.apache.nifi.components.connector.DropFlowFileSummary;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flowfile.FlowFile;

import java.io.IOException;
import java.util.function.Predicate;

public interface ConnectionFacade {

Expand All @@ -35,4 +40,13 @@ public interface ConnectionFacade {
*/
void purge();

/**
* Drops all FlowFiles from the connection that match the given predicate.
*
* @param predicate the predicate to use to determine which FlowFiles to drop
* @return a summary of the FlowFiles that were dropped
* @throws IOException if an I/O error occurs while dropping FlowFiles
*/
DropFlowFileSummary dropFlowFiles(Predicate<FlowFile> predicate) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.nifi.components.connector.components;

import org.apache.nifi.components.connector.DropFlowFileSummary;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flowfile.FlowFile;

import java.io.IOException;
import java.util.Set;
import java.util.function.Predicate;

public interface ProcessGroupFacade {

Expand Down Expand Up @@ -52,4 +56,14 @@ public interface ProcessGroupFacade {

ProcessGroupLifecycle getLifecycle();

/**
* Drops all FlowFiles from all connections in this ProcessGroup and its child ProcessGroups
* that match the given predicate.
*
* @param predicate the predicate to test each FlowFile against
* @return a summary of the dropped FlowFiles
* @throws IOException if an I/O error occurs while dropping FlowFiles
*/
DropFlowFileSummary dropFlowFiles(Predicate<FlowFile> predicate) throws IOException;

}