From ace38df5a7cce5ae911ff8c4d4c6cd07a044bead Mon Sep 17 00:00:00 2001 From: Rakesh Kumar Singh Date: Wed, 6 May 2026 16:01:59 +0530 Subject: [PATCH 1/3] NIFI-15483: Fixed PublishAMQP routing FlowFiles to success when broker cannot deliver message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PublishAMQP uses mandatory=true on basicPublish() so the broker returns messages it cannot route to any queue. However, the return arrives asynchronously via ReturnListener.handleReturn() on the AMQP I/O thread while the publishing thread had already moved on to session.transfer(REL_SUCCESS). The UndeliverableMessageLogger only logged a warning — it never signaled failure back to publish() or onTrigger(), so every unroutable message was silently counted as a success despite never reaching any consumer. Fix: - Enabled Publisher Confirms (channel.confirmSelect()) in the constructor. The broker's basic.return frame for an unroutable message is guaranteed to arrive before the corresponding confirm frame, so waitForConfirms() acts as a synchronization barrier that makes return detection reliable. - Added an AtomicReference field (undeliverableReturnReason) that UndeliverableMessageLogger.handleReturn() populates with exchange/routingKey/ replyCode/replyText when a message is returned. - publish() now: resets the field before each call, calls waitForConfirms(5s) to synchronize with the broker, then checks the field and throws AMQPException if the message was returned — causing onTrigger() to route to REL_FAILURE. - Broker NACKs (e.g., resource alarm) are also now surfaced as AMQPException because waitForConfirms() returns false on NACK. Co-authored-by: Rakesh Kumar Singh --- .../nifi/amqp/processors/AMQPPublisher.java | 73 +++++++++++++++---- 1 file changed, 60 insertions(+), 13 deletions(-) diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java index 27a64bbb1ffb..0427648ff0da 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.net.SocketException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; /** * Generic publisher of messages to AMQP-based messaging system. It is based on @@ -33,6 +35,14 @@ final class AMQPPublisher extends AMQPWorker { private final String connectionString; + /** + * Stores the broker's return reason when a message is published with mandatory=true + * but the broker cannot route it to any queue. Written by the AMQP I/O thread via + * {@link UndeliverableMessageLogger} and read by the publishing thread after + * {@link com.rabbitmq.client.Channel#waitForConfirms} synchronizes the two. + */ + private final AtomicReference undeliverableReturnReason = new AtomicReference<>(null); + /** * Creates an instance of this publisher * @@ -43,6 +53,17 @@ final class AMQPPublisher extends AMQPWorker { getChannel().addReturnListener(new UndeliverableMessageLogger()); this.connectionString = connection.toString(); + // Enable Publisher Confirms on this channel so that waitForConfirms() can be used + // after basicPublish() to create a synchronization point. This ensures that any + // basic.return frame sent by the broker (for mandatory messages it cannot route) + // will have been processed by the ReturnListener before waitForConfirms() returns, + // allowing undeliverable messages to be reliably detected and routed to REL_FAILURE. + try { + getChannel().confirmSelect(); + } catch (final IOException e) { + throw new AMQPException("Failed to enable Publisher Confirms on AMQP channel", e); + } + processorLog.info("Successfully connected AMQPPublisher to {}", this.connectionString); } @@ -68,6 +89,9 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String processorLog.debug("Successfully connected AMQPPublisher to {} and '{}' exchange with '{}' as a routing key.", this.connectionString, exchange, routingKey); } + // Reset any stale return reason from a previous publish before sending. + undeliverableReturnReason.set(null); + try { getChannel().basicPublish(exchange, routingKey, true, properties, bytes); } catch (AlreadyClosedException | SocketException e) { @@ -75,6 +99,30 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String } catch (Exception e) { throw new AMQPException("Failed to publish message to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e); } + + // Wait for the broker's publish confirm (ack/nack). Because the broker sends a basic.return + // frame BEFORE the corresponding confirm frame for mandatory messages it cannot route, + // UndeliverableMessageLogger.handleReturn() is guaranteed to have run by the time + // waitForConfirms() returns. This makes undeliverable-message detection reliable. + try { + if (!getChannel().waitForConfirms(5_000L)) { + throw new AMQPException("Broker negatively acknowledged (NACK) message published to Exchange '" + + exchange + "' with Routing Key '" + routingKey + "'"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AMQPException("Interrupted while waiting for publish confirmation from broker", e); + } catch (TimeoutException e) { + throw new AMQPException("Timed out waiting for publish confirmation from broker for Exchange '" + + exchange + "' with Routing Key '" + routingKey + "'", e); + } + + // If the broker returned the message (e.g., no queue bound to the exchange/routing-key), + // surface it as a hard failure so the caller can route to REL_FAILURE instead of REL_SUCCESS. + final String returnReason = undeliverableReturnReason.get(); + if (returnReason != null) { + throw new AMQPException("Message returned as undeliverable by broker — " + returnReason); + } } @Override @@ -83,23 +131,22 @@ public String toString() { } /** - * Listener to listen and WARN-log undeliverable messages which are returned - * back to the sender. Since in the current implementation messages are sent - * with 'mandatory' bit set, such messages must have final destination - * otherwise they are silently dropped which could cause a confusion - * especially during early stages of flow development. This implies that - * bindings between exchange -> routingKey -> queue must exist and are - * typically done by AMQP administrator. This logger simply helps to monitor - * for such conditions by logging such messages as warning. In the future - * this can be extended to provide other type of functionality (e.g., fail - * processor etc.) + * Listens for messages returned by the broker when they cannot be routed to any queue + * (mandatory=true publish with no matching binding). Previously this listener only logged + * a warning, causing PublishAMQP to silently route the FlowFile to REL_SUCCESS even though + * the message was never delivered. (NIFI-15483) + * + * Now it stores the return reason in {@link #undeliverableReturnReason} so that + * {@link #publish} can detect it after {@code waitForConfirms()} synchronizes the two + * threads and throw an {@link AMQPException} to trigger REL_FAILURE routing. */ private final class UndeliverableMessageLogger implements ReturnListener { @Override public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) throws IOException { - String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey - + "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + "."; - processorLog.warn(logMessage); + final String reason = "exchange='" + exchangeName + "' routingKey='" + routingKey + + "' replyCode=" + replyCode + " replyText='" + replyText + "'"; + undeliverableReturnReason.set(reason); + processorLog.warn("Message returned as undeliverable by broker: {}", reason); } } } From 89ce24b4ee27e308f42ce80459ccc3a11b01695b Mon Sep 17 00:00:00 2001 From: Rakesh Kumar Singh Date: Wed, 6 May 2026 21:28:30 +0530 Subject: [PATCH 2/3] NIFI-15483: Handle ShutdownSignalException from waitForConfirms on missing exchange When the broker closes the channel with a 404 NOT_FOUND error (e.g., exchange does not exist), waitForConfirms() throws ShutdownSignalException instead of returning normally. This was propagating as an unhandled processor failure rather than routing the FlowFile to REL_FAILURE. - Added ShutdownSignalException to the catch block in AMQPPublisher.publish() - Converts the channel-close signal into AMQPException so PublishAMQP routes the FlowFile to REL_FAILURE with a descriptive error message - Added ShutdownSignalException import Co-authored-by: Rakesh Kumar Singh --- .../apache/nifi/amqp/processors/AMQPPublisher.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java index 0427648ff0da..47ee1c49e358 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java @@ -20,6 +20,7 @@ import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ReturnListener; +import com.rabbitmq.client.ShutdownSignalException; import org.apache.nifi.logging.ComponentLog; import java.io.IOException; @@ -104,6 +105,11 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String // frame BEFORE the corresponding confirm frame for mandatory messages it cannot route, // UndeliverableMessageLogger.handleReturn() is guaranteed to have run by the time // waitForConfirms() returns. This makes undeliverable-message detection reliable. + // + // If the exchange does not exist, the broker closes the channel with a 404 NOT_FOUND + // channel.close frame, which causes waitForConfirms() to throw ShutdownSignalException. + // We catch it here and convert it to AMQPException so the FlowFile routes to REL_FAILURE + // instead of surfacing as an unhandled processor error. try { if (!getChannel().waitForConfirms(5_000L)) { throw new AMQPException("Broker negatively acknowledged (NACK) message published to Exchange '" @@ -115,6 +121,11 @@ void publish(byte[] bytes, BasicProperties properties, String routingKey, String } catch (TimeoutException e) { throw new AMQPException("Timed out waiting for publish confirmation from broker for Exchange '" + exchange + "' with Routing Key '" + routingKey + "'", e); + } catch (ShutdownSignalException e) { + // Broker closed the channel — most commonly because the exchange does not exist (404) + // or the vhost was deleted. Convert to AMQPException so PublishAMQP routes to REL_FAILURE. + throw new AMQPException("Broker closed channel while waiting for publish confirmation — " + + "Exchange '" + exchange + "' may not exist: " + e.getMessage(), e); } // If the broker returned the message (e.g., no queue bound to the exchange/routing-key), From 232ca83244f59eae23fe5cad4aa5ffd8629ca53b Mon Sep 17 00:00:00 2001 From: Rakesh Kumar Singh Date: Thu, 7 May 2026 12:21:06 +0530 Subject: [PATCH 3/3] NIFI-15483: Add tests for ShutdownSignalException and broker NACK handling Added regression tests to verify that AMQPPublisher and PublishAMQP correctly route FlowFiles to REL_FAILURE for all broker-side failure modes: - TestChannel: implemented confirmSelect() (no-op) and waitForConfirms(timeout) with simulation flags for ShutdownSignalException and NACK - TestChannel: added simulateSynchronousReturn flag for deterministic undeliverable-message tests (fires ReturnListeners synchronously) - TestConnection: exposed getTestChannel() for test configuration - AMQPPublisherTest: added 3 new unit tests * failPublishWhenBrokerClosesChannelDuringConfirm * failPublishWhenBrokerNacksMessage * failPublishWhenMessageReturnedAsUndeliverable - PublishAMQPTest: added 2 new integration tests * validateFlowFileRoutedToFailureWhenBrokerClosesChannel * validateFlowFileRoutedToFailureOnBrokerNack All 45 tests pass. Tests were verified to FAIL without the corresponding fix. Co-authored-by: Rakesh Kumar Singh --- .../amqp/processors/AMQPPublisherTest.java | 57 ++++++++++++++++++- .../nifi/amqp/processors/PublishAMQPTest.java | 40 +++++++++++++ .../nifi/amqp/processors/TestChannel.java | 42 ++++++++++++-- .../nifi/amqp/processors/TestConnection.java | 4 ++ 4 files changed, 137 insertions(+), 6 deletions(-) diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java index 84d7c5528f58..b8c92afe8dca 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java @@ -37,7 +37,6 @@ public class AMQPPublisherTest { - @SuppressWarnings("resource") @Test public void failOnNullConnection() { assertThrows(IllegalArgumentException.class, () -> new AMQPPublisher(null, null)); @@ -105,4 +104,60 @@ public void validateSuccessfulPublishingAndUndeliverableRoutingKey() throws Exce connection.close(); } + /** + * Verifies that a {@link com.rabbitmq.client.ShutdownSignalException} thrown by + * {@code waitForConfirms()} (e.g., broker closes channel with 404 NOT_FOUND because the + * exchange does not exist) is converted to {@link AMQPException} so the FlowFile routes + * to REL_FAILURE instead of surfacing as an unhandled processor error. + */ + @Test + public void failPublishWhenBrokerClosesChannelDuringConfirm() { + assertThrows(AMQPException.class, () -> { + TestConnection conn = new TestConnection(null, null); + conn.getTestChannel().setSimulateShutdownOnConfirm(true); + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), null, "foo", ""); + } + }); + } + + /** + * Verifies that a broker NACK (waitForConfirms returns false) throws {@link AMQPException} + * so the FlowFile routes to REL_FAILURE. + */ + @Test + public void failPublishWhenBrokerNacksMessage() { + assertThrows(AMQPException.class, () -> { + TestConnection conn = new TestConnection(null, null); + conn.getTestChannel().setSimulateNackOnConfirm(true); + try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) { + sender.publish("hello".getBytes(), null, "foo", ""); + } + }); + } + + /** + * Verifies that when the broker returns a message as undeliverable (basic.return, e.g., no + * queue bound to the exchange/routing-key), an {@link AMQPException} is thrown so the FlowFile + * routes to REL_FAILURE rather than silently to REL_SUCCESS. + */ + @Test + public void failPublishWhenMessageReturnedAsUndeliverable() { + assertThrows(AMQPException.class, () -> { + Map> routingMap = new HashMap<>(); + routingMap.put("key1", Arrays.asList("queue1")); + Map exchangeToRoutingKeymap = new HashMap<>(); + exchangeToRoutingKeymap.put("myExchange", "key1"); + + TestConnection conn = new TestConnection(exchangeToRoutingKeymap, routingMap); + // Fire return listener synchronously so it is guaranteed to run before waitForConfirms() + conn.getTestChannel().setSimulateSynchronousReturn(true); + + try (AMQPPublisher sender = new AMQPPublisher(conn, new MockComponentLog("id", ""))) { + // Wrong routing key → broker returns the message as undeliverable + sender.publish("hello".getBytes(), null, "wrongKey", "myExchange"); + } + }); + } + } diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java index e2a2f0697aed..ce933ae25234 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -278,5 +278,45 @@ protected Connection createConnection(ProcessContext context, ExecutorService ex public Connection getConnection() { return connection; } + + public TestChannel getTestChannel() { + return connection.getTestChannel(); + } + } + + /** + * When the broker closes the channel with a 404 (exchange not found), the FlowFile + * must route to REL_FAILURE — not cause an unhandled processor exception. + */ + @Test + public void validateFlowFileRoutedToFailureWhenBrokerClosesChannel() { + final LocalPublishAMQP proc = new LocalPublishAMQP(); + final TestRunner testRunner = TestRunners.newTestRunner(proc); + setConnectionProperties(testRunner); + proc.getTestChannel().setSimulateShutdownOnConfirm(true); + + testRunner.enqueue("Hello Joe".getBytes()); + testRunner.run(); + + assertTrue(testRunner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty()); + assertNotNull(testRunner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst()); + } + + /** + * When the broker sends a NACK for the published message, the FlowFile must route + * to REL_FAILURE. + */ + @Test + public void validateFlowFileRoutedToFailureOnBrokerNack() { + final LocalPublishAMQP proc = new LocalPublishAMQP(); + final TestRunner testRunner = TestRunners.newTestRunner(proc); + setConnectionProperties(testRunner); + proc.getTestChannel().setSimulateNackOnConfirm(true); + + testRunner.enqueue("Hello Joe".getBytes()); + testRunner.run(); + + assertTrue(testRunner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).isEmpty()); + assertNotNull(testRunner.getFlowFilesForRelationship(PublishAMQP.REL_FAILURE).getFirst()); } } diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java index ada7f18958fa..20c1ccb77745 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java @@ -78,6 +78,9 @@ class TestChannel implements Channel { private final BitSet acknowledgments = new BitSet(); private final BitSet nacks = new BitSet(); private int prefetchCount = 0; + private boolean simulateShutdownOnConfirm = false; + private boolean simulateNackOnConfirm = false; + private boolean simulateSynchronousReturn = false; public TestChannel(Map exchangeToRoutingKeyMappings, Map> routingKeyToQueueMappings) { @@ -100,6 +103,24 @@ void corruptChannel() { this.corrupted = true; } + /** Causes the next {@link #waitForConfirms(long)} call to throw {@link ShutdownSignalException}, + * simulating the broker closing the channel (e.g., exchange not found, 404 NOT_FOUND). */ + void setSimulateShutdownOnConfirm(boolean simulate) { + this.simulateShutdownOnConfirm = simulate; + } + + /** Causes the next {@link #waitForConfirms(long)} call to return {@code false}, + * simulating the broker sending a NACK for the published message. */ + void setSimulateNackOnConfirm(boolean simulate) { + this.simulateNackOnConfirm = simulate; + } + + /** When {@code true}, return listeners are invoked synchronously inside + * {@link #basicPublish} rather than asynchronously, making tests deterministic. */ + void setSimulateSynchronousReturn(boolean simulate) { + this.simulateSynchronousReturn = simulate; + } + void setConnection(Connection connection) { this.connection = connection; } @@ -283,15 +304,23 @@ public void basicPublish(final String exchange, final String routingKey, boolean private void discard(final String exchange, final String routingKey, boolean mandatory, final BasicProperties props, final byte[] body) { - // NO ROUTE. Invoke return listener async + // NO ROUTE. Invoke return listener — synchronously when simulating for tests, async otherwise. for (final ReturnListener listener : returnListeners) { - this.executorService.execute(() -> { + if (simulateSynchronousReturn) { try { listener.handleReturn(-9, "Rejecting", exchange, routingKey, props, body); } catch (Exception e) { throw new IllegalStateException("Failed to send return message", e); } - }); + } else { + this.executorService.execute(() -> { + try { + listener.handleReturn(-9, "Rejecting", exchange, routingKey, props, body); + } catch (Exception e) { + throw new IllegalStateException("Failed to send return message", e); + } + }); + } } } @@ -582,7 +611,7 @@ public RollbackOk txRollback() throws IOException { @Override public com.rabbitmq.client.AMQP.Confirm.SelectOk confirmSelect() throws IOException { - throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + return null; // no-op: publisher confirms enabled for testing } @Override @@ -597,7 +626,10 @@ public boolean waitForConfirms() throws InterruptedException { @Override public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException { - throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing"); + if (simulateShutdownOnConfirm) { + throw new ShutdownSignalException(false, false, null, this); + } + return !simulateNackOnConfirm; } @Override diff --git a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java index 996c00dd8ceb..10adb50438a3 100644 --- a/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java +++ b/nifi-extension-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestConnection.java @@ -122,6 +122,10 @@ public Channel createChannel() throws IOException { return this.channel; } + public TestChannel getTestChannel() { + return this.channel; + } + @Override public Channel createChannel(int channelNumber) throws IOException { throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");