diff --git a/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java b/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java
index 5e90ed00d..517b0cda4 100644
--- a/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java
+++ b/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java
@@ -17,6 +17,7 @@
import io.a2a.server.events.EventQueueFactory;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.InMemoryQueueManager;
+import io.a2a.server.events.MainEventBus;
import io.a2a.server.events.QueueManager;
@ApplicationScoped
@@ -32,10 +33,12 @@ public class ReplicatedQueueManager implements QueueManager {
private TaskStateProvider taskStateProvider;
@Inject
- public ReplicatedQueueManager(ReplicationStrategy replicationStrategy, TaskStateProvider taskStateProvider) {
+ public ReplicatedQueueManager(ReplicationStrategy replicationStrategy,
+ TaskStateProvider taskStateProvider,
+ MainEventBus mainEventBus) {
this.replicationStrategy = replicationStrategy;
this.taskStateProvider = taskStateProvider;
- this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider);
+ this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider, mainEventBus);
}
@@ -139,12 +142,11 @@ public EventQueue.EventQueueBuilder builder(String taskId) {
// which sends the QueueClosedEvent after the database transaction commits.
// This ensures proper ordering and transactional guarantees.
- // Return the builder with callbacks
- return delegate.getEventQueueBuilder(taskId)
- .taskId(taskId)
- .hook(new ReplicationHook(taskId))
- .addOnCloseCallback(delegate.getCleanupCallback(taskId))
- .taskStateProvider(taskStateProvider);
+ // Call createBaseEventQueueBuilder() directly to avoid infinite recursion
+ // (getEventQueueBuilder() would delegate back to this factory, creating a loop)
+ // The base builder already includes: taskId, cleanup callback, taskStateProvider, mainEventBus
+ return delegate.createBaseEventQueueBuilder(taskId)
+ .hook(new ReplicationHook(taskId));
}
}
diff --git a/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java b/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java
index 42454ea3b..912564577 100644
--- a/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java
+++ b/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java
@@ -17,17 +17,23 @@
import java.util.concurrent.atomic.AtomicInteger;
import io.a2a.extras.common.events.TaskFinalizedEvent;
+import io.a2a.json.JsonUtil;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.EventQueueClosedException;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.EventQueueTestHelper;
+import io.a2a.server.events.EventQueueUtil;
+import io.a2a.server.events.MainEventBus;
+import io.a2a.server.events.MainEventBusProcessor;
import io.a2a.server.events.QueueClosedEvent;
+import io.a2a.server.tasks.InMemoryTaskStore;
+import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.spec.Event;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TaskStatusUpdateEvent;
-import io.a2a.json.JsonUtil;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -35,10 +41,24 @@ class ReplicatedQueueManagerTest {
private ReplicatedQueueManager queueManager;
private StreamingEventKind testEvent;
+ private MainEventBus mainEventBus;
+ private MainEventBusProcessor mainEventBusProcessor;
+ private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
@BeforeEach
void setUp() {
- queueManager = new ReplicatedQueueManager(new NoOpReplicationStrategy(), new MockTaskStateProvider(true));
+ // Create MainEventBus and MainEventBusProcessor for tests
+ InMemoryTaskStore taskStore = new InMemoryTaskStore();
+ mainEventBus = new MainEventBus();
+ mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
+ EventQueueUtil.start(mainEventBusProcessor);
+
+ queueManager = new ReplicatedQueueManager(
+ new NoOpReplicationStrategy(),
+ new MockTaskStateProvider(true),
+ mainEventBus
+ );
+
testEvent = new TaskStatusUpdateEvent.Builder()
.taskId("test-task")
.contextId("test-context")
@@ -47,10 +67,65 @@ void setUp() {
.build();
}
+ /**
+ * Helper to create a test event with the specified taskId.
+ * This ensures taskId consistency between queue creation and event creation.
+ */
+ private TaskStatusUpdateEvent createEventForTask(String taskId) {
+ return new TaskStatusUpdateEvent.Builder()
+ .taskId(taskId)
+ .contextId("test-context")
+ .status(new TaskStatus(TaskState.SUBMITTED))
+ .isFinal(false)
+ .build();
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (mainEventBusProcessor != null) {
+ mainEventBusProcessor.setCallback(null); // Clear any test callbacks
+ EventQueueUtil.stop(mainEventBusProcessor);
+ }
+ mainEventBusProcessor = null;
+ mainEventBus = null;
+ queueManager = null;
+ }
+
+ /**
+ * Helper to wait for MainEventBusProcessor to process an event.
+ * Replaces polling patterns with deterministic callback-based waiting.
+ *
+ * @param action the action that triggers event processing
+ * @throws InterruptedException if waiting is interrupted
+ * @throws AssertionError if processing doesn't complete within timeout
+ */
+ private void waitForEventProcessing(Runnable action) throws InterruptedException {
+ CountDownLatch processingLatch = new CountDownLatch(1);
+ mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
+ processingLatch.countDown();
+ }
+
+ @Override
+ public void onTaskFinalized(String taskId) {
+ // Not needed for basic event processing wait
+ }
+ });
+
+ try {
+ action.run();
+ assertTrue(processingLatch.await(5, TimeUnit.SECONDS),
+ "MainEventBusProcessor should have processed the event within timeout");
+ } finally {
+ mainEventBusProcessor.setCallback(null);
+ }
+ }
+
@Test
void testReplicationStrategyTriggeredOnNormalEnqueue() throws InterruptedException {
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
String taskId = "test-task-1";
EventQueue queue = queueManager.createOrTap(taskId);
@@ -65,7 +140,7 @@ void testReplicationStrategyTriggeredOnNormalEnqueue() throws InterruptedExcepti
@Test
void testReplicationStrategyNotTriggeredOnReplicatedEvent() throws InterruptedException {
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
String taskId = "test-task-2";
EventQueue queue = queueManager.createOrTap(taskId);
@@ -79,7 +154,7 @@ void testReplicationStrategyNotTriggeredOnReplicatedEvent() throws InterruptedEx
@Test
void testReplicationStrategyWithCountingImplementation() throws InterruptedException {
CountingReplicationStrategy countingStrategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true), mainEventBus);
String taskId = "test-task-3";
EventQueue queue = queueManager.createOrTap(taskId);
@@ -100,46 +175,45 @@ void testReplicationStrategyWithCountingImplementation() throws InterruptedExcep
@Test
void testReplicatedEventDeliveredToCorrectQueue() throws InterruptedException {
String taskId = "test-task-4";
+ TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
EventQueue queue = queueManager.createOrTap(taskId);
- ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
- queueManager.onReplicatedEvent(replicatedEvent);
+ ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
- Event dequeuedEvent;
- try {
- dequeuedEvent = queue.dequeueEventItem(100).getEvent();
- } catch (EventQueueClosedException e) {
- fail("Queue should not be closed");
- return;
- }
- assertEquals(testEvent, dequeuedEvent);
+ // Use callback to wait for event processing
+ EventQueueItem item = dequeueEventWithRetry(queue, () -> queueManager.onReplicatedEvent(replicatedEvent));
+ assertNotNull(item, "Event should be available in queue");
+ Event dequeuedEvent = item.getEvent();
+ assertEquals(eventForTask, dequeuedEvent);
}
@Test
void testReplicatedEventCreatesQueueIfNeeded() throws InterruptedException {
String taskId = "non-existent-task";
+ TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
// Verify no queue exists initially
assertNull(queueManager.get(taskId));
- ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
-
- // Process the replicated event
- assertDoesNotThrow(() -> queueManager.onReplicatedEvent(replicatedEvent));
-
- // Verify that a queue was created and the event was enqueued
- EventQueue queue = queueManager.get(taskId);
- assertNotNull(queue, "Queue should be created when processing replicated event for non-existent task");
-
- // Verify the event was enqueued by dequeuing it
- Event dequeuedEvent;
- try {
- dequeuedEvent = queue.dequeueEventItem(100).getEvent();
- } catch (EventQueueClosedException e) {
- fail("Queue should not be closed");
- return;
- }
- assertEquals(testEvent, dequeuedEvent, "The replicated event should be enqueued in the newly created queue");
+ // Create a ChildQueue BEFORE processing the replicated event
+ // This ensures the ChildQueue exists when MainEventBusProcessor distributes the event
+ EventQueue childQueue = queueManager.createOrTap(taskId);
+ assertNotNull(childQueue, "ChildQueue should be created");
+
+ // Verify MainQueue was created
+ EventQueue mainQueue = queueManager.get(taskId);
+ assertNotNull(mainQueue, "MainQueue should exist after createOrTap");
+
+ ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
+
+ // Process the replicated event and wait for distribution
+ // Use callback to wait for event processing
+ EventQueueItem item = dequeueEventWithRetry(childQueue, () -> {
+ assertDoesNotThrow(() -> queueManager.onReplicatedEvent(replicatedEvent));
+ });
+ assertNotNull(item, "Event should be available in queue");
+ Event dequeuedEvent = item.getEvent();
+ assertEquals(eventForTask, dequeuedEvent, "The replicated event should be enqueued in the newly created queue");
}
@Test
@@ -170,7 +244,7 @@ void testBasicQueueManagerFunctionality() throws InterruptedException {
void testQueueToTaskIdMappingMaintained() throws InterruptedException {
String taskId = "test-task-6";
CountingReplicationStrategy countingStrategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true), mainEventBus);
EventQueue queue = queueManager.createOrTap(taskId);
queue.enqueueEvent(testEvent);
@@ -217,7 +291,7 @@ void testReplicatedEventJsonSerialization() throws Exception {
@Test
void testParallelReplicationBehavior() throws InterruptedException {
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
String taskId = "parallel-test-task";
EventQueue queue = queueManager.createOrTap(taskId);
@@ -297,7 +371,7 @@ void testParallelReplicationBehavior() throws InterruptedException {
void testReplicatedEventSkippedWhenTaskInactive() throws InterruptedException {
// Create a task state provider that returns false (task is inactive)
MockTaskStateProvider stateProvider = new MockTaskStateProvider(false);
- queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider);
+ queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
String taskId = "inactive-task";
@@ -316,30 +390,32 @@ void testReplicatedEventSkippedWhenTaskInactive() throws InterruptedException {
void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
// Create a task state provider that returns true (task is active)
MockTaskStateProvider stateProvider = new MockTaskStateProvider(true);
- queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider);
+ queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
String taskId = "active-task";
+ TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
// Verify no queue exists initially
assertNull(queueManager.get(taskId));
- // Process a replicated event for an active task
- ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
- queueManager.onReplicatedEvent(replicatedEvent);
+ // Create a ChildQueue BEFORE processing the replicated event
+ // This ensures the ChildQueue exists when MainEventBusProcessor distributes the event
+ EventQueue childQueue = queueManager.createOrTap(taskId);
+ assertNotNull(childQueue, "ChildQueue should be created");
- // Queue should be created and event should be enqueued
- EventQueue queue = queueManager.get(taskId);
- assertNotNull(queue, "Queue should be created for active task");
+ // Verify MainQueue was created
+ EventQueue mainQueue = queueManager.get(taskId);
+ assertNotNull(mainQueue, "MainQueue should exist after createOrTap");
- // Verify the event was enqueued
- Event dequeuedEvent;
- try {
- dequeuedEvent = queue.dequeueEventItem(100).getEvent();
- } catch (EventQueueClosedException e) {
- fail("Queue should not be closed");
- return;
- }
- assertEquals(testEvent, dequeuedEvent, "Event should be enqueued for active task");
+ // Process a replicated event for an active task
+ ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
+
+ // Verify the event was enqueued and distributed to our ChildQueue
+ // Use callback to wait for event processing
+ EventQueueItem item = dequeueEventWithRetry(childQueue, () -> queueManager.onReplicatedEvent(replicatedEvent));
+ assertNotNull(item, "Event should be available in queue");
+ Event dequeuedEvent = item.getEvent();
+ assertEquals(eventForTask, dequeuedEvent, "Event should be enqueued for active task");
}
@@ -347,7 +423,7 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
void testReplicatedEventToExistingQueueWhenTaskBecomesInactive() throws InterruptedException {
// Create a task state provider that returns true initially
MockTaskStateProvider stateProvider = new MockTaskStateProvider(true);
- queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider);
+ queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
String taskId = "task-becomes-inactive";
@@ -387,7 +463,7 @@ void testReplicatedEventToExistingQueueWhenTaskBecomesInactive() throws Interrup
@Test
void testPoisonPillSentViaTransactionAwareEvent() throws InterruptedException {
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
String taskId = "poison-pill-test";
EventQueue queue = queueManager.createOrTap(taskId);
@@ -451,36 +527,21 @@ void testQueueClosedEventJsonSerialization() throws Exception {
@Test
void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedException {
String taskId = "remote-close-test";
+ TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
EventQueue queue = queueManager.createOrTap(taskId);
- // Enqueue a normal event
- queue.enqueueEvent(testEvent);
-
// Simulate receiving QueueClosedEvent from remote node
QueueClosedEvent closedEvent = new QueueClosedEvent(taskId);
ReplicatedEventQueueItem replicatedClosedEvent = new ReplicatedEventQueueItem(taskId, closedEvent);
- queueManager.onReplicatedEvent(replicatedClosedEvent);
- // Dequeue the normal event first
- EventQueueItem item1;
- try {
- item1 = queue.dequeueEventItem(100);
- } catch (EventQueueClosedException e) {
- fail("Should not throw on first dequeue");
- return;
- }
- assertNotNull(item1);
- assertEquals(testEvent, item1.getEvent());
+ // Dequeue the normal event first (use callback to wait for async processing)
+ EventQueueItem item1 = dequeueEventWithRetry(queue, () -> queue.enqueueEvent(eventForTask));
+ assertNotNull(item1, "First event should be available");
+ assertEquals(eventForTask, item1.getEvent());
- // Next dequeue should get the QueueClosedEvent
- EventQueueItem item2;
- try {
- item2 = queue.dequeueEventItem(100);
- } catch (EventQueueClosedException e) {
- fail("Should not throw on second dequeue, should return the event");
- return;
- }
- assertNotNull(item2);
+ // Next dequeue should get the QueueClosedEvent (use callback to wait for async processing)
+ EventQueueItem item2 = dequeueEventWithRetry(queue, () -> queueManager.onReplicatedEvent(replicatedClosedEvent));
+ assertNotNull(item2, "QueueClosedEvent should be available");
assertTrue(item2.getEvent() instanceof QueueClosedEvent,
"Second event should be QueueClosedEvent");
}
@@ -539,4 +600,25 @@ public void setActive(boolean active) {
this.active = active;
}
}
+
+ /**
+ * Helper method to dequeue an event after waiting for MainEventBusProcessor distribution.
+ * Uses callback-based waiting instead of polling for deterministic synchronization.
+ *
+ * @param queue the queue to dequeue from
+ * @param enqueueAction the action that enqueues the event (triggers event processing)
+ * @return the dequeued EventQueueItem, or null if queue is closed
+ */
+ private EventQueueItem dequeueEventWithRetry(EventQueue queue, Runnable enqueueAction) throws InterruptedException {
+ // Wait for event to be processed and distributed
+ waitForEventProcessing(enqueueAction);
+
+ // Event is now available, dequeue directly
+ try {
+ return queue.dequeueEventItem(100);
+ } catch (EventQueueClosedException e) {
+ // Queue closed, return null
+ return null;
+ }
+ }
}
\ No newline at end of file
diff --git a/extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java b/extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java
new file mode 100644
index 000000000..a91575aaa
--- /dev/null
+++ b/extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java
@@ -0,0 +1,11 @@
+package io.a2a.server.events;
+
+public class EventQueueUtil {
+ public static void start(MainEventBusProcessor processor) {
+ processor.start();
+ }
+
+ public static void stop(MainEventBusProcessor processor) {
+ processor.stop();
+ }
+}
diff --git a/reference/jsonrpc/src/test/resources/application.properties b/reference/jsonrpc/src/test/resources/application.properties
index 7b9cea9cc..e612925d4 100644
--- a/reference/jsonrpc/src/test/resources/application.properties
+++ b/reference/jsonrpc/src/test/resources/application.properties
@@ -1 +1,6 @@
quarkus.arc.selected-alternatives=io.a2a.server.apps.common.TestHttpClient
+
+# Debug logging for event processing and request handling
+quarkus.log.category."io.a2a.server.events".level=DEBUG
+quarkus.log.category."io.a2a.server.requesthandlers".level=DEBUG
+quarkus.log.category."io.a2a.server.tasks".level=DEBUG
diff --git a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
index ccf9f0ce9..2d3d49fc8 100644
--- a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
+++ b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java
@@ -58,13 +58,20 @@ public Flow.Publisher consumeAll() {
EventQueueItem item;
Event event;
try {
+ LOGGER.debug("EventConsumer polling queue {} (error={})", System.identityHashCode(queue), error);
item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS);
if (item == null) {
+ LOGGER.debug("EventConsumer poll timeout (null item), continuing");
continue;
}
event = item.getEvent();
+ LOGGER.debug("EventConsumer received event: {} (queue={})",
+ event.getClass().getSimpleName(), System.identityHashCode(queue));
+ // Defensive logging for error handling
if (event instanceof Throwable thr) {
+ LOGGER.info("EventConsumer detected Throwable event: {} - triggering tube.fail()",
+ thr.getClass().getSimpleName());
tube.fail(thr);
return;
}
@@ -121,8 +128,13 @@ public Flow.Publisher consumeAll() {
public EnhancedRunnable.DoneCallback createAgentRunnableDoneCallback() {
return agentRunnable -> {
+ LOGGER.info("EventConsumer: Agent done callback invoked (hasError={}, queue={})",
+ agentRunnable.getError() != null, System.identityHashCode(queue));
if (agentRunnable.getError() != null) {
error = agentRunnable.getError();
+ LOGGER.info("EventConsumer: Set error field from agent callback");
+ } else {
+ LOGGER.info("EventConsumer: Agent completed successfully (no error), continuing consumption");
}
};
}
diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
index 6a8a154ac..c73b26e22 100644
--- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java
+++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java
@@ -21,8 +21,6 @@ public abstract class EventQueue implements AutoCloseable {
public static final int DEFAULT_QUEUE_SIZE = 1000;
private final int queueSize;
- protected final BlockingQueue queue = new LinkedBlockingDeque<>();
- protected final Semaphore semaphore;
private volatile boolean closed = false;
protected EventQueue() {
@@ -34,7 +32,6 @@ protected EventQueue(int queueSize) {
throw new IllegalArgumentException("Queue size must be greater than 0");
}
this.queueSize = queueSize;
- this.semaphore = new Semaphore(queueSize, true);
LOGGER.trace("Creating {} with queue size: {}", this, queueSize);
}
@@ -43,8 +40,8 @@ protected EventQueue(EventQueue parent) {
LOGGER.trace("Creating {}, parent: {}", this, parent);
}
- static EventQueueBuilder builder() {
- return new EventQueueBuilder();
+ static EventQueueBuilder builder(MainEventBus mainEventBus) {
+ return new EventQueueBuilder().mainEventBus(mainEventBus);
}
public static class EventQueueBuilder {
@@ -53,6 +50,7 @@ public static class EventQueueBuilder {
private String taskId;
private List onCloseCallbacks = new java.util.ArrayList<>();
private TaskStateProvider taskStateProvider;
+ private MainEventBus mainEventBus;
public EventQueueBuilder queueSize(int queueSize) {
this.queueSize = queueSize;
@@ -81,12 +79,17 @@ public EventQueueBuilder taskStateProvider(TaskStateProvider taskStateProvider)
return this;
}
+ public EventQueueBuilder mainEventBus(MainEventBus mainEventBus) {
+ this.mainEventBus = mainEventBus;
+ return this;
+ }
+
public EventQueue build() {
- if (hook != null || !onCloseCallbacks.isEmpty() || taskStateProvider != null) {
- return new MainQueue(queueSize, hook, taskId, onCloseCallbacks, taskStateProvider);
- } else {
- return new MainQueue(queueSize);
+ // MainEventBus is now REQUIRED - enforce single architectural path
+ if (mainEventBus == null) {
+ throw new IllegalStateException("MainEventBus is required for EventQueue creation");
}
+ return new MainQueue(queueSize, hook, taskId, onCloseCallbacks, taskStateProvider, mainEventBus);
}
}
@@ -102,22 +105,7 @@ public void enqueueEvent(Event event) {
enqueueItem(new LocalEventQueueItem(event));
}
- public void enqueueItem(EventQueueItem item) {
- Event event = item.getEvent();
- if (closed) {
- LOGGER.warn("Queue is closed. Event will not be enqueued. {} {}", this, event);
- return;
- }
- // Call toString() since for errors we don't really want the full stacktrace
- try {
- semaphore.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e);
- }
- queue.add(item);
- LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
- }
+ public abstract void enqueueItem(EventQueueItem item);
public abstract EventQueue tap();
@@ -127,53 +115,32 @@ public void enqueueItem(EventQueueItem item) {
* This method returns the full EventQueueItem wrapper, allowing callers to check
* metadata like whether the event is replicated via {@link EventQueueItem#isReplicated()}.
*
+ *
+ * Note: MainQueue does not support dequeue operations - only ChildQueues can be consumed.
+ *
*
* @param waitMilliSeconds the maximum time to wait in milliseconds
* @return the EventQueueItem, or null if timeout occurs
* @throws EventQueueClosedException if the queue is closed and empty
+ * @throws UnsupportedOperationException if called on MainQueue
*/
- public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException {
- if (closed && queue.isEmpty()) {
- LOGGER.debug("Queue is closed, and empty. Sending termination message. {}", this);
- throw new EventQueueClosedException();
- }
- try {
- if (waitMilliSeconds <= 0) {
- EventQueueItem item = queue.poll();
- if (item != null) {
- Event event = item.getEvent();
- // Call toString() since for errors we don't really want the full stacktrace
- LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event);
- semaphore.release();
- }
- return item;
- }
- try {
- LOGGER.trace("Polling queue {} (wait={}ms)", System.identityHashCode(this), waitMilliSeconds);
- EventQueueItem item = queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS);
- if (item != null) {
- Event event = item.getEvent();
- // Call toString() since for errors we don't really want the full stacktrace
- LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
- semaphore.release();
- } else {
- LOGGER.trace("Dequeue timeout (null) from queue {}", System.identityHashCode(this));
- }
- return item;
- } catch (InterruptedException e) {
- LOGGER.debug("Interrupted dequeue (waiting) {}", this);
- Thread.currentThread().interrupt();
- return null;
- }
- } finally {
- signalQueuePollerStarted();
- }
- }
+ public abstract EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException;
public void taskDone() {
// TODO Not sure if needed yet. BlockingQueue.poll()/.take() remove the events.
}
+ /**
+ * Returns the current size of the queue.
+ *
+ * For MainQueue: returns the size of the MainEventBus queue (events pending persistence/distribution).
+ * For ChildQueue: returns the size of the local consumption queue.
+ *
+ *
+ * @return the number of events currently in the queue
+ */
+ public abstract int size();
+
public abstract void close();
public abstract void close(boolean immediate);
@@ -205,63 +172,29 @@ protected void doClose(boolean immediate) {
LOGGER.debug("Closing {} (immediate={})", this, immediate);
closed = true;
}
-
- if (immediate) {
- // Immediate close: clear pending events
- queue.clear();
- LOGGER.debug("Cleared queue for immediate close: {}", this);
- }
- // For graceful close, let the queue drain naturally through normal consumption
+ // Subclasses handle immediate close logic (e.g., ChildQueue clears its local queue)
}
static class MainQueue extends EventQueue {
private final List children = new CopyOnWriteArrayList<>();
+ protected final Semaphore semaphore;
private final CountDownLatch pollingStartedLatch = new CountDownLatch(1);
private final AtomicBoolean pollingStarted = new AtomicBoolean(false);
private final EventEnqueueHook enqueueHook;
private final String taskId;
private final List onCloseCallbacks;
private final TaskStateProvider taskStateProvider;
+ private final MainEventBus mainEventBus;
- MainQueue() {
- super();
- this.enqueueHook = null;
- this.taskId = null;
- this.onCloseCallbacks = List.of();
- this.taskStateProvider = null;
- }
-
- MainQueue(int queueSize) {
- super(queueSize);
- this.enqueueHook = null;
- this.taskId = null;
- this.onCloseCallbacks = List.of();
- this.taskStateProvider = null;
- }
-
- MainQueue(EventEnqueueHook hook) {
- super();
- this.enqueueHook = hook;
- this.taskId = null;
- this.onCloseCallbacks = List.of();
- this.taskStateProvider = null;
- }
-
- MainQueue(int queueSize, EventEnqueueHook hook) {
- super(queueSize);
- this.enqueueHook = hook;
- this.taskId = null;
- this.onCloseCallbacks = List.of();
- this.taskStateProvider = null;
- }
-
- MainQueue(int queueSize, EventEnqueueHook hook, String taskId, List onCloseCallbacks, TaskStateProvider taskStateProvider) {
+ MainQueue(int queueSize, EventEnqueueHook hook, String taskId, List onCloseCallbacks, TaskStateProvider taskStateProvider, MainEventBus mainEventBus) {
super(queueSize);
+ this.semaphore = new Semaphore(queueSize, true);
this.enqueueHook = hook;
this.taskId = taskId;
this.onCloseCallbacks = List.copyOf(onCloseCallbacks); // Defensive copy
this.taskStateProvider = taskStateProvider;
- LOGGER.debug("Created MainQueue for task {} with {} onClose callbacks and TaskStateProvider: {}",
+ this.mainEventBus = java.util.Objects.requireNonNull(mainEventBus, "MainEventBus is required");
+ LOGGER.debug("Created MainQueue for task {} with {} onClose callbacks, TaskStateProvider: {}, MainEventBus configured",
taskId, onCloseCallbacks.size(), taskStateProvider != null);
}
@@ -271,6 +204,25 @@ public EventQueue tap() {
return child;
}
+ /**
+ * Returns the current number of child queues.
+ * Useful for debugging and logging event distribution.
+ */
+ public int getChildCount() {
+ return children.size();
+ }
+
+ @Override
+ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException {
+ throw new UnsupportedOperationException("MainQueue cannot be consumed directly - use tap() to create a ChildQueue for consumption");
+ }
+
+ @Override
+ public int size() {
+ // Return size of MainEventBus queue (events pending persistence/distribution)
+ return mainEventBus.size();
+ }
+
@Override
public void enqueueItem(EventQueueItem item) {
// MainQueue must accept events even when closed to support:
@@ -289,14 +241,13 @@ public void enqueueItem(EventQueueItem item) {
throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e);
}
- // Add to this MainQueue's internal queue
- queue.add(item);
LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
- // Distribute to all ChildQueues (they will receive the event even if MainQueue is closed)
- children.forEach(eq -> eq.internalEnqueueItem(item));
+ // Submit to MainEventBus for centralized persistence + distribution
+ // MainEventBus is guaranteed non-null by constructor requirement
+ mainEventBus.submit(taskId, this, item);
- // Trigger replication hook if configured
+ // Trigger replication hook if configured (for inter-process replication)
if (enqueueHook != null) {
enqueueHook.onEnqueue(item);
}
@@ -350,6 +301,36 @@ void childClosing(ChildQueue child, boolean immediate) {
this.doClose(immediate);
}
+ /**
+ * Distribute event to all ChildQueues.
+ * Called by MainEventBusProcessor after TaskStore persistence.
+ */
+ void distributeToChildren(EventQueueItem item) {
+ int childCount = children.size();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children",
+ taskId, item.getEvent().getClass().getSimpleName(), childCount);
+ }
+ children.forEach(child -> {
+ LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue",
+ taskId, item.getEvent().getClass().getSimpleName());
+ child.internalEnqueueItem(item);
+ });
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children",
+ taskId, item.getEvent().getClass().getSimpleName(), childCount);
+ }
+ }
+
+ /**
+ * Release the semaphore after event processing is complete.
+ * Called by MainEventBusProcessor in finally block to ensure release even on exceptions.
+ * Balances the acquire() in enqueueEvent() - protects MainEventBus throughput.
+ */
+ void releaseSemaphore() {
+ semaphore.release();
+ }
+
/**
* Get the count of active child queues.
* Used for testing to verify reference counting mechanism.
@@ -400,6 +381,7 @@ public void close(boolean immediate, boolean notifyParent) {
static class ChildQueue extends EventQueue {
private final MainQueue parent;
+ private final BlockingQueue queue = new LinkedBlockingDeque<>();
public ChildQueue(MainQueue parent) {
this.parent = parent;
@@ -410,8 +392,61 @@ public void enqueueEvent(Event event) {
parent.enqueueEvent(event);
}
+ @Override
+ public void enqueueItem(EventQueueItem item) {
+ // ChildQueue delegates writes to parent MainQueue
+ parent.enqueueItem(item);
+ }
+
private void internalEnqueueItem(EventQueueItem item) {
- super.enqueueItem(item);
+ // Internal method called by MainEventBusProcessor to add to local queue
+ // Note: Semaphore is managed by parent MainQueue (acquire/release), not ChildQueue
+ Event event = item.getEvent();
+ if (isClosed()) {
+ LOGGER.warn("ChildQueue is closed. Event will not be enqueued. {} {}", this, event);
+ return;
+ }
+ if (!queue.offer(item)) {
+ LOGGER.warn("ChildQueue {} is full. Closing immediately.", this);
+ close(true); // immediate close
+ } else {
+ LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this);
+ }
+ }
+
+ @Override
+ public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException {
+ if (isClosed() && queue.isEmpty()) {
+ LOGGER.debug("ChildQueue is closed, and empty. Sending termination message. {}", this);
+ throw new EventQueueClosedException();
+ }
+ try {
+ if (waitMilliSeconds <= 0) {
+ EventQueueItem item = queue.poll();
+ if (item != null) {
+ Event event = item.getEvent();
+ LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event);
+ }
+ return item;
+ }
+ try {
+ LOGGER.trace("Polling ChildQueue {} (wait={}ms)", System.identityHashCode(this), waitMilliSeconds);
+ EventQueueItem item = queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS);
+ if (item != null) {
+ Event event = item.getEvent();
+ LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event);
+ } else {
+ LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this));
+ }
+ return item;
+ } catch (InterruptedException e) {
+ LOGGER.debug("Interrupted dequeue (waiting) {}", this);
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ } finally {
+ signalQueuePollerStarted();
+ }
}
@Override
@@ -419,6 +454,12 @@ public EventQueue tap() {
throw new IllegalStateException("Can only tap the main queue");
}
+ @Override
+ public int size() {
+ // Return size of local consumption queue
+ return queue.size();
+ }
+
@Override
public void awaitQueuePollerStart() throws InterruptedException {
parent.awaitQueuePollerStart();
@@ -429,6 +470,18 @@ public void signalQueuePollerStarted() {
parent.signalQueuePollerStarted();
}
+ @Override
+ protected void doClose(boolean immediate) {
+ super.doClose(immediate); // Sets closed flag
+ if (immediate) {
+ // Immediate close: clear pending events from local queue
+ int clearedCount = queue.size();
+ queue.clear();
+ LOGGER.debug("Cleared {} events from ChildQueue for immediate close: {}", clearedCount, this);
+ }
+ // For graceful close, let the queue drain naturally through normal consumption
+ }
+
@Override
public void close() {
close(false);
diff --git a/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java b/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java
index 1383d058e..4360a6210 100644
--- a/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java
+++ b/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java
@@ -18,16 +18,20 @@ public class InMemoryQueueManager implements QueueManager {
private final EventQueueFactory factory;
private final TaskStateProvider taskStateProvider;
+ MainEventBus mainEventBus;
+
@Inject
- public InMemoryQueueManager(TaskStateProvider taskStateProvider) {
+ public InMemoryQueueManager(TaskStateProvider taskStateProvider, MainEventBus mainEventBus) {
+ this.mainEventBus = mainEventBus;
this.factory = new DefaultEventQueueFactory();
this.taskStateProvider = taskStateProvider;
}
- // For testing with custom factory
- public InMemoryQueueManager(EventQueueFactory factory, TaskStateProvider taskStateProvider) {
+ // For testing/extensions with custom factory and MainEventBus
+ public InMemoryQueueManager(EventQueueFactory factory, TaskStateProvider taskStateProvider, MainEventBus mainEventBus) {
this.factory = factory;
this.taskStateProvider = taskStateProvider;
+ this.mainEventBus = mainEventBus;
}
@Override
@@ -109,6 +113,12 @@ public void awaitQueuePollerStart(EventQueue eventQueue) throws InterruptedExcep
eventQueue.awaitQueuePollerStart();
}
+ @Override
+ public EventQueue.EventQueueBuilder getEventQueueBuilder(String taskId) {
+ // Use the factory to ensure proper configuration (MainEventBus, callbacks, etc.)
+ return factory.builder(taskId);
+ }
+
@Override
public int getActiveChildQueueCount(String taskId) {
EventQueue queue = queues.get(taskId);
@@ -123,6 +133,14 @@ public int getActiveChildQueueCount(String taskId) {
return -1;
}
+ @Override
+ public EventQueue.EventQueueBuilder createBaseEventQueueBuilder(String taskId) {
+ return EventQueue.builder(mainEventBus)
+ .taskId(taskId)
+ .addOnCloseCallback(getCleanupCallback(taskId))
+ .taskStateProvider(taskStateProvider);
+ }
+
/**
* Get the cleanup callback that removes a queue from the map when it closes.
* This is exposed so that subclasses (like ReplicatedQueueManager) can reuse
@@ -162,11 +180,8 @@ public Runnable getCleanupCallback(String taskId) {
private class DefaultEventQueueFactory implements EventQueueFactory {
@Override
public EventQueue.EventQueueBuilder builder(String taskId) {
- // Return builder with callback that removes queue from map when closed
- return EventQueue.builder()
- .taskId(taskId)
- .addOnCloseCallback(getCleanupCallback(taskId))
- .taskStateProvider(taskStateProvider);
+ // Delegate to the base builder creation method
+ return createBaseEventQueueBuilder(taskId);
}
}
}
diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBus.java b/server-common/src/main/java/io/a2a/server/events/MainEventBus.java
new file mode 100644
index 000000000..73500254e
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/events/MainEventBus.java
@@ -0,0 +1,42 @@
+package io.a2a.server.events;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class MainEventBus {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBus.class);
+ private final BlockingQueue queue;
+
+ public MainEventBus() {
+ this.queue = new LinkedBlockingDeque<>();
+ }
+
+ public void submit(String taskId, EventQueue eventQueue, EventQueueItem item) {
+ try {
+ queue.put(new MainEventBusContext(taskId, eventQueue, item));
+ LOGGER.debug("Submitted event for task {} to MainEventBus (queue size: {})",
+ taskId, queue.size());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted submitting to MainEventBus", e);
+ }
+ }
+
+ public MainEventBusContext take() throws InterruptedException {
+ LOGGER.debug("MainEventBus: Waiting to take event (current queue size: {})...", queue.size());
+ MainEventBusContext context = queue.take();
+ LOGGER.debug("MainEventBus: Took event for task {} (remaining queue size: {})",
+ context.taskId(), queue.size());
+ return context;
+ }
+
+ public int size() {
+ return queue.size();
+ }
+}
diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusContext.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusContext.java
new file mode 100644
index 000000000..f8e5e03ec
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusContext.java
@@ -0,0 +1,11 @@
+package io.a2a.server.events;
+
+import java.util.Objects;
+
+record MainEventBusContext(String taskId, EventQueue eventQueue, EventQueueItem eventQueueItem) {
+ MainEventBusContext {
+ Objects.requireNonNull(taskId, "taskId cannot be null");
+ Objects.requireNonNull(eventQueue, "eventQueue cannot be null");
+ Objects.requireNonNull(eventQueueItem, "eventQueueItem cannot be null");
+ }
+}
diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
new file mode 100644
index 000000000..c04578a8b
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java
@@ -0,0 +1,304 @@
+package io.a2a.server.events;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
+import io.a2a.server.tasks.PushNotificationSender;
+import io.a2a.server.tasks.TaskManager;
+import io.a2a.server.tasks.TaskStore;
+import io.a2a.spec.A2AServerException;
+import io.a2a.spec.Event;
+import io.a2a.spec.InternalError;
+import io.a2a.spec.Task;
+import io.a2a.spec.TaskArtifactUpdateEvent;
+import io.a2a.spec.TaskStatusUpdateEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background processor for the MainEventBus.
+ *
+ * This processor runs in a dedicated background thread, consuming events from the MainEventBus
+ * and performing two critical operations in order:
+ *
+ *
+ * - Update TaskStore with event data (persistence FIRST)
+ * - Distribute event to ChildQueues (clients see it AFTER persistence)
+ *
+ *
+ * This architecture ensures clients never receive events before they're persisted,
+ * eliminating race conditions and enabling reliable event replay.
+ *
+ *
+ * Note: This bean is eagerly initialized by {@link MainEventBusProcessorInitializer}
+ * to ensure the background thread starts automatically when the application starts.
+ *
+ */
+@ApplicationScoped
+public class MainEventBusProcessor implements Runnable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBusProcessor.class);
+
+ /**
+ * Callback for testing synchronization with async event processing.
+ * Default is NOOP to avoid null checks in production code.
+ * Tests can inject their own callback via setCallback().
+ */
+ private volatile MainEventBusProcessorCallback callback = MainEventBusProcessorCallback.NOOP;
+
+ private final MainEventBus eventBus;
+
+ private final TaskStore taskStore;
+
+ private final PushNotificationSender pushSender;
+
+ private volatile boolean running = true;
+ private Thread processorThread;
+
+ @Inject
+ public MainEventBusProcessor(MainEventBus eventBus, TaskStore taskStore, PushNotificationSender pushSender) {
+ this.eventBus = eventBus;
+ this.taskStore = taskStore;
+ this.pushSender = pushSender;
+ }
+
+ /**
+ * Set a callback for testing synchronization with async event processing.
+ *
+ * This is primarily intended for tests that need to wait for event processing to complete.
+ * Pass null to reset to the default NOOP callback.
+ *
+ *
+ * @param callback the callback to invoke during event processing, or null for NOOP
+ */
+ public void setCallback(MainEventBusProcessorCallback callback) {
+ this.callback = callback != null ? callback : MainEventBusProcessorCallback.NOOP;
+ }
+
+ @PostConstruct
+ void start() {
+ processorThread = new Thread(this, "MainEventBusProcessor");
+ processorThread.setDaemon(true); // Allow JVM to exit even if this thread is running
+ processorThread.start();
+ LOGGER.info("MainEventBusProcessor started");
+ }
+
+ /**
+ * No-op method to force CDI proxy resolution and ensure @PostConstruct has been called.
+ * Called by MainEventBusProcessorInitializer during application startup.
+ */
+ public void ensureStarted() {
+ // Method intentionally empty - just forces proxy resolution
+ }
+
+ @PreDestroy
+ void stop() {
+ LOGGER.info("MainEventBusProcessor stopping...");
+ running = false;
+ if (processorThread != null) {
+ processorThread.interrupt();
+ try {
+ long start = System.currentTimeMillis();
+ processorThread.join(5000); // Wait up to 5 seconds
+ long elapsed = System.currentTimeMillis() - start;
+ LOGGER.info("MainEventBusProcessor thread stopped in {}ms", elapsed);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted while waiting for MainEventBusProcessor thread to stop");
+ }
+ }
+ LOGGER.info("MainEventBusProcessor stopped");
+ }
+
+ @Override
+ public void run() {
+ LOGGER.info("MainEventBusProcessor processing loop started");
+ while (running) {
+ try {
+ LOGGER.debug("MainEventBusProcessor: Waiting for event from MainEventBus...");
+ MainEventBusContext context = eventBus.take();
+ LOGGER.debug("MainEventBusProcessor: Retrieved event for task {} from MainEventBus",
+ context.taskId());
+ processEvent(context);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.info("MainEventBusProcessor interrupted, shutting down");
+ break;
+ } catch (Exception e) {
+ LOGGER.error("Error processing event from MainEventBus", e);
+ // Continue processing despite errors
+ }
+ }
+ LOGGER.info("MainEventBusProcessor processing loop ended");
+ }
+
+ private void processEvent(MainEventBusContext context) {
+ String taskId = context.taskId();
+ Event event = context.eventQueueItem().getEvent();
+ EventQueue eventQueue = context.eventQueue();
+
+ LOGGER.debug("MainEventBusProcessor: Processing event for task {}: {} (queue type: {})",
+ taskId, event.getClass().getSimpleName(), eventQueue.getClass().getSimpleName());
+
+ Event eventToDistribute = null;
+ try {
+ // Step 1: Update TaskStore FIRST (persistence before clients see it)
+ // If this throws, we distribute an error to ensure "persist before client visibility"
+
+ try {
+ updateTaskStore(taskId, event);
+ eventToDistribute = event; // Success - distribute original event
+ } catch (InternalError e) {
+ // Persistence failed - create error event to distribute instead
+ LOGGER.error("Failed to persist event for task {}, distributing error to clients", taskId, e);
+ String errorMessage = "Failed to persist event: " + e.getMessage();
+ eventToDistribute = e;
+ } catch (Exception e) {
+ LOGGER.error("Failed to persist event for task {}, distributing error to clients", taskId, e);
+ String errorMessage = "Failed to persist event: " + e.getMessage();
+ eventToDistribute = new InternalError(errorMessage);
+ }
+
+ // Step 2: Send push notification AFTER successful persistence
+ if (eventToDistribute == event) {
+ sendPushNotification(taskId);
+ }
+
+ // Step 3: Then distribute to ChildQueues (clients see either event or error AFTER persistence attempt)
+ if (eventToDistribute == null) {
+ LOGGER.error("MainEventBusProcessor: eventToDistribute is NULL for task {} - this should never happen!", taskId);
+ eventToDistribute = new InternalError("Internal error: event processing failed");
+ }
+
+ if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
+ int childCount = mainQueue.getChildCount();
+ LOGGER.debug("MainEventBusProcessor: Distributing {} to {} children for task {}",
+ eventToDistribute.getClass().getSimpleName(), childCount, taskId);
+ // Create new EventQueueItem with the event to distribute (original or error)
+ EventQueueItem itemToDistribute = new LocalEventQueueItem(eventToDistribute);
+ mainQueue.distributeToChildren(itemToDistribute);
+ LOGGER.debug("MainEventBusProcessor: Distributed {} to {} children for task {}",
+ eventToDistribute.getClass().getSimpleName(), childCount, taskId);
+ } else {
+ LOGGER.warn("MainEventBusProcessor: Expected MainQueue but got {} for task {}",
+ eventQueue.getClass().getSimpleName(), taskId);
+ }
+
+ LOGGER.debug("MainEventBusProcessor: Completed processing event for task {}", taskId);
+
+ } finally {
+ try {
+ // Step 4: Notify callback after all processing is complete
+ // Call callback with the distributed event (original or error)
+ if (eventToDistribute != null) {
+ callback.onEventProcessed(taskId, eventToDistribute);
+
+ // Step 5: If this is a final event, notify task finalization
+ // Only for successful persistence (not for errors)
+ if (eventToDistribute == event && isFinalEvent(event)) {
+ callback.onTaskFinalized(taskId);
+ }
+ }
+ } finally {
+ // ALWAYS release semaphore, even if processing fails
+ // Balances the acquire() in MainQueue.enqueueEvent()
+ if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
+ mainQueue.releaseSemaphore();
+ }
+ }
+ }
+ }
+
+ /**
+ * Updates TaskStore using TaskManager.process().
+ *
+ * Creates a temporary TaskManager instance for this event and delegates to its process() method,
+ * which handles all event types (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent).
+ * This leverages existing TaskManager logic for status updates, artifact appending, message history, etc.
+ *
+ *
+ * If persistence fails, the exception is propagated to processEvent() which distributes an
+ * InternalError to clients instead of the original event, ensuring "persist before visibility".
+ * See Gemini's comment: https://github.com/a2aproject/a2a-java/pull/515#discussion_r2604621833
+ *
+ *
+ * @throws InternalError if persistence fails
+ */
+ private void updateTaskStore(String taskId, Event event) throws InternalError {
+ try {
+ // Extract contextId from event (all relevant events have it)
+ String contextId = extractContextId(event);
+
+ // Create temporary TaskManager instance for this event
+ TaskManager taskManager = new TaskManager(taskId, contextId, taskStore, null);
+
+ // Use TaskManager.process() - handles all event types with existing logic
+ taskManager.process(event);
+ LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {}",
+ taskId, event.getClass().getSimpleName());
+ } catch (InternalError e) {
+ LOGGER.error("Error updating TaskStore via TaskManager for task {}", taskId, e);
+ // Rethrow to prevent distributing unpersisted event to clients
+ throw e;
+ } catch (Exception e) {
+ LOGGER.error("Unexpected error updating TaskStore for task {}", taskId, e);
+ // Rethrow to prevent distributing unpersisted event to clients
+ throw new InternalError("TaskStore persistence failed: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Sends push notification for the task AFTER persistence.
+ *
+ * This is called after updateTaskStore() to ensure the notification contains
+ * the latest persisted state, avoiding race conditions.
+ *
+ */
+ private void sendPushNotification(String taskId) {
+ try {
+ Task task = taskStore.get(taskId);
+ if (task != null) {
+ LOGGER.debug("Sending push notification for task {}", taskId);
+ pushSender.sendNotification(task);
+ } else {
+ LOGGER.debug("Skipping push notification - task {} not found in TaskStore", taskId);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error sending push notification for task {}", taskId, e);
+ // Don't rethrow - we still want to distribute to ChildQueues
+ }
+ }
+
+ /**
+ * Extracts contextId from an event.
+ * Returns null if the event type doesn't have a contextId (e.g., Message).
+ */
+ private String extractContextId(Event event) {
+ if (event instanceof Task task) {
+ return task.getContextId();
+ } else if (event instanceof TaskStatusUpdateEvent statusUpdate) {
+ return statusUpdate.getContextId();
+ } else if (event instanceof TaskArtifactUpdateEvent artifactUpdate) {
+ return artifactUpdate.getContextId();
+ }
+ // Message and other events don't have contextId
+ return null;
+ }
+
+ /**
+ * Checks if an event represents a final task state.
+ *
+ * @param event the event to check
+ * @return true if the event represents a final state (COMPLETED, FAILED, CANCELED, REJECTED, UNKNOWN)
+ */
+ private boolean isFinalEvent(Event event) {
+ if (event instanceof Task task) {
+ return task.getStatus() != null && task.getStatus().state() != null
+ && task.getStatus().state().isFinal();
+ } else if (event instanceof TaskStatusUpdateEvent statusUpdate) {
+ return statusUpdate.isFinal();
+ }
+ return false;
+ }
+}
diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java
new file mode 100644
index 000000000..b0a9adbce
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java
@@ -0,0 +1,66 @@
+package io.a2a.server.events;
+
+import io.a2a.spec.Event;
+
+/**
+ * Callback interface for MainEventBusProcessor events.
+ *
+ * This interface is primarily intended for testing, allowing tests to synchronize
+ * with the asynchronous MainEventBusProcessor. Production code should not rely on this.
+ *
+ * Usage in tests:
+ *
+ * {@code
+ * @Inject
+ * MainEventBusProcessor processor;
+ *
+ * @BeforeEach
+ * void setUp() {
+ * CountDownLatch latch = new CountDownLatch(3);
+ * processor.setCallback(new MainEventBusProcessorCallback() {
+ * public void onEventProcessed(String taskId, Event event) {
+ * latch.countDown();
+ * }
+ * });
+ * }
+ *
+ * @AfterEach
+ * void tearDown() {
+ * processor.setCallback(null); // Reset to NOOP
+ * }
+ * }
+ *
+ */
+public interface MainEventBusProcessorCallback {
+
+ /**
+ * Called after an event has been fully processed (persisted, notification sent, distributed to children).
+ *
+ * @param taskId the task ID
+ * @param event the event that was processed
+ */
+ void onEventProcessed(String taskId, Event event);
+
+ /**
+ * Called when a task reaches a final state (COMPLETED, FAILED, CANCELED, REJECTED).
+ *
+ * @param taskId the task ID that was finalized
+ */
+ void onTaskFinalized(String taskId);
+
+ /**
+ * No-op implementation that does nothing.
+ * Used as the default callback to avoid null checks.
+ */
+ MainEventBusProcessorCallback NOOP = new MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, Event event) {
+ // No-op
+ }
+
+ @Override
+ public void onTaskFinalized(String taskId) {
+ // No-op
+ }
+ };
+}
diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java
new file mode 100644
index 000000000..ba4b300be
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java
@@ -0,0 +1,43 @@
+package io.a2a.server.events;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.Initialized;
+import jakarta.enterprise.event.Observes;
+import jakarta.inject.Inject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Portable CDI initializer for MainEventBusProcessor.
+ *
+ * This bean observes the ApplicationScoped initialization event and injects
+ * MainEventBusProcessor, which triggers its eager creation and starts the background thread.
+ *
+ *
+ * This approach is portable across all Jakarta CDI implementations (Weld, OpenWebBeans, Quarkus, etc.)
+ * and ensures MainEventBusProcessor starts automatically when the application starts.
+ *
+ */
+@ApplicationScoped
+public class MainEventBusProcessorInitializer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBusProcessorInitializer.class);
+
+ @Inject
+ MainEventBusProcessor processor;
+
+ /**
+ * Observes ApplicationScoped initialization to force eager creation of MainEventBusProcessor.
+ * The injection of MainEventBusProcessor in this bean triggers its creation, and calling
+ * ensureStarted() forces the CDI proxy to be resolved, which ensures @PostConstruct has been
+ * called and the background thread is running.
+ */
+ void onStart(@Observes @Initialized(ApplicationScoped.class) Object event) {
+ if (processor != null) {
+ // Force proxy resolution to ensure @PostConstruct has been called
+ processor.ensureStarted();
+ LOGGER.info("MainEventBusProcessor initialized and started");
+ } else {
+ LOGGER.error("MainEventBusProcessor is null - initialization failed!");
+ }
+ }
+}
diff --git a/server-common/src/main/java/io/a2a/server/events/QueueManager.java b/server-common/src/main/java/io/a2a/server/events/QueueManager.java
index b0eb517ae..449d30f90 100644
--- a/server-common/src/main/java/io/a2a/server/events/QueueManager.java
+++ b/server-common/src/main/java/io/a2a/server/events/QueueManager.java
@@ -15,7 +15,31 @@ public interface QueueManager {
void awaitQueuePollerStart(EventQueue eventQueue) throws InterruptedException;
default EventQueue.EventQueueBuilder getEventQueueBuilder(String taskId) {
- return EventQueue.builder();
+ throw new UnsupportedOperationException(
+ "QueueManager implementations must override getEventQueueBuilder() to provide MainEventBus"
+ );
+ }
+
+ /**
+ * Creates a base EventQueueBuilder with standard configuration for this QueueManager.
+ * This method provides the foundation for creating event queues with proper configuration
+ * (MainEventBus, TaskStateProvider, cleanup callbacks, etc.).
+ *
+ * QueueManager implementations that use custom factories can call this method directly
+ * to get the base builder without going through the factory (which could cause infinite
+ * recursion if the factory delegates back to getEventQueueBuilder()).
+ *
+ *
+ * Callers can then add additional configuration (hooks, callbacks) before building the queue.
+ *
+ *
+ * @param taskId the task ID for the queue
+ * @return a builder with base configuration specific to this QueueManager implementation
+ */
+ default EventQueue.EventQueueBuilder createBaseEventQueueBuilder(String taskId) {
+ throw new UnsupportedOperationException(
+ "QueueManager implementations must override createBaseEventQueueBuilder() to provide MainEventBus"
+ );
}
/**
diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
index b59a9aedb..dcdac496a 100644
--- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
+++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
@@ -105,23 +105,20 @@ public class DefaultRequestHandler implements RequestHandler {
private final TaskStore taskStore;
private final QueueManager queueManager;
private final PushNotificationConfigStore pushConfigStore;
- private final PushNotificationSender pushSender;
private final Supplier requestContextBuilder;
private final ConcurrentMap> runningAgents = new ConcurrentHashMap<>();
- private final Set> backgroundTasks = ConcurrentHashMap.newKeySet();
private final Executor executor;
@Inject
public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore,
QueueManager queueManager, PushNotificationConfigStore pushConfigStore,
- PushNotificationSender pushSender, @Internal Executor executor) {
+ @Internal Executor executor) {
this.agentExecutor = agentExecutor;
this.taskStore = taskStore;
this.queueManager = queueManager;
this.pushConfigStore = pushConfigStore;
- this.pushSender = pushSender;
this.executor = executor;
// TODO In Python this is also a constructor parameter defaulting to this SimpleRequestContextBuilder
// implementation if the parameter is null. Skip that for now, since otherwise I get CDI errors, and
@@ -143,9 +140,9 @@ void initConfig() {
*/
public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore,
QueueManager queueManager, PushNotificationConfigStore pushConfigStore,
- PushNotificationSender pushSender, Executor executor) {
+ Executor executor) {
DefaultRequestHandler handler =
- new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, pushSender, executor);
+ new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, executor);
handler.agentCompletionTimeoutSeconds = 5;
handler.consumptionCompletionTimeoutSeconds = 2;
return handler;
@@ -229,7 +226,7 @@ public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws
EventQueue queue = queueManager.tap(task.getId());
if (queue == null) {
- queue = queueManager.getEventQueueBuilder(task.getId()).build();
+ queue = queueManager.getEventQueueBuilder(task.getId()).build().tap();
}
agentExecutor.cancel(
requestContextBuilder.get()
@@ -274,15 +271,23 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
blocking = false;
}
+ // Log blocking behavior from client request
+ if (params.configuration() != null && params.configuration().blocking() != null) {
+ LOGGER.info("DefaultRequestHandler: Client requested blocking={} for task {}",
+ params.configuration().blocking(), taskId);
+ } else if (params.configuration() != null) {
+ LOGGER.info("DefaultRequestHandler: Client sent configuration but blocking=null, using default blocking=true for task {}", taskId);
+ } else {
+ LOGGER.info("DefaultRequestHandler: Client sent no configuration, using default blocking=true for task {}", taskId);
+ }
+ LOGGER.info("DefaultRequestHandler: Final blocking decision: {} for task {}", blocking, taskId);
+
boolean interruptedOrNonBlocking = false;
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
ResultAggregator.EventTypeAndInterrupt etai = null;
EventKind kind = null; // Declare outside try block so it's in scope for return
try {
- // Create callback for push notifications during background event processing
- Runnable pushNotificationCallback = () -> sendPushNotification(taskId, resultAggregator);
-
EventConsumer consumer = new EventConsumer(queue);
// This callback must be added before we start consuming. Otherwise,
@@ -298,7 +303,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
throw new InternalError("No result");
}
interruptedOrNonBlocking = etai.interrupted();
- LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking);
+ LOGGER.info("DefaultRequestHandler: interruptedOrNonBlocking={} (blocking={}, eventType={})",
+ interruptedOrNonBlocking, blocking, kind != null ? kind.getClass().getSimpleName() : null);
// For blocking calls that were interrupted (returned on first event),
// wait for agent execution and event processing BEFORE returning to client.
@@ -310,7 +316,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
// Store push notification config for newly created tasks (mirrors streaming logic)
// Only for NEW tasks - existing tasks are handled by initMessageSend()
if (mss.task() == null && kind instanceof Task createdTask && shouldAddPushInfo(params)) {
- LOGGER.debug("Storing push notification config for new task {}", createdTask.getId());
+ LOGGER.debug("Storing push notification config for new task {} (original taskId from params: {})",
+ createdTask.getId(), params.message().getTaskId());
pushConfigStore.setInfo(createdTask.getId(), params.configuration().pushNotificationConfig());
}
@@ -321,16 +328,18 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
// 2. Close the queue to signal consumption can complete
// 3. Wait for consumption to finish processing events
// 4. Fetch final task state from TaskStore
+ LOGGER.info("DefaultRequestHandler: Entering blocking fire-and-forget handling for task {}", taskId);
try {
// Step 1: Wait for agent to finish (with configurable timeout)
if (agentFuture != null) {
try {
agentFuture.get(agentCompletionTimeoutSeconds, SECONDS);
- LOGGER.debug("Agent completed for task {}", taskId);
+ LOGGER.info("DefaultRequestHandler: Step 1 - Agent completed for task {}", taskId);
} catch (java.util.concurrent.TimeoutException e) {
// Agent still running after timeout - that's fine, events already being processed
- LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds);
+ LOGGER.info("DefaultRequestHandler: Step 1 - Agent still running for task {} after {}s timeout",
+ taskId, agentCompletionTimeoutSeconds);
}
}
@@ -338,12 +347,12 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
// For fire-and-forget tasks, there's no final event, so we need to close the queue
// This allows EventConsumer.consumeAll() to exit
queue.close(false, false); // graceful close, don't notify parent yet
- LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);
+ LOGGER.info("DefaultRequestHandler: Step 2 - Closed queue for task {} to allow consumption completion", taskId);
// Step 3: Wait for consumption to complete (now that queue is closed)
if (etai.consumptionFuture() != null) {
etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, SECONDS);
- LOGGER.debug("Consumption completed for task {}", taskId);
+ LOGGER.info("DefaultRequestHandler: Step 3 - Consumption completed for task {}", taskId);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -364,27 +373,29 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
Task updatedTask = taskStore.get(taskId);
if (updatedTask != null) {
kind = updatedTask;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
- taskId, updatedTask.getStatus().state(),
- updatedTask.getArtifacts().size());
- }
+ LOGGER.info("DefaultRequestHandler: Step 4 - Fetched final task for {} with state {} and {} artifacts",
+ taskId, updatedTask.getStatus().state(),
+ updatedTask.getArtifacts().size());
+ } else {
+ LOGGER.warn("DefaultRequestHandler: Step 4 - Task {} not found in TaskStore!", taskId);
}
}
if (kind instanceof Task taskResult && !taskId.equals(taskResult.getId())) {
throw new InternalError("Task ID mismatch in agent response");
}
-
- // Send push notification after initial return (for both blocking and non-blocking)
- pushNotificationCallback.run();
} finally {
// Remove agent from map immediately to prevent accumulation
CompletableFuture agentFuture = runningAgents.remove(taskId);
LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", taskId, runningAgents.size());
- // Track cleanup as background task to avoid blocking Vert.x threads
+ // Cleanup as background task to avoid blocking Vert.x threads
// Pass the consumption future to ensure cleanup waits for background consumption to complete
- trackBackgroundTask(cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false));
+ cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false)
+ .whenComplete((res, err) -> {
+ if (err != null) {
+ LOGGER.error("Error during async cleanup for task {}", taskId, err);
+ }
+ });
}
LOGGER.debug("Returning: {}", kind);
@@ -394,24 +405,31 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
@Override
public Flow.Publisher onMessageSendStream(
MessageSendParams params, ServerCallContext context) throws JSONRPCError {
- LOGGER.debug("onMessageSendStream START - task: {}; context: {}; runningAgents: {}; backgroundTasks: {}",
- params.message().getTaskId(), params.message().getContextId(), runningAgents.size(), backgroundTasks.size());
+ LOGGER.debug("onMessageSendStream START - task: {}; context: {}; runningAgents: {}",
+ params.message().getTaskId(), params.message().getContextId(), runningAgents.size());
MessageSendSetup mss = initMessageSend(params, context);
AtomicReference taskId = new AtomicReference<>(mss.requestContext.getTaskId());
EventQueue queue = queueManager.createOrTap(taskId.get());
LOGGER.debug("Created/tapped queue for task {}: {}", taskId.get(), queue);
+
+ // Store push notification config SYNCHRONOUSLY for new tasks before agent starts
+ // This ensures config is available when MainEventBusProcessor sends push notifications
+ // For existing tasks, config was already stored in initMessageSend()
+ if (mss.task() == null && shouldAddPushInfo(params)) {
+ LOGGER.debug("Storing push notification config for new streaming task {} EARLY (original taskId from params: {})",
+ taskId.get(), params.message().getTaskId());
+ pushConfigStore.setInfo(taskId.get(), params.configuration().pushNotificationConfig());
+ }
+
ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor);
EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId.get(), mss.requestContext, queue);
// Move consumer creation and callback registration outside try block
- // so consumer is available for background consumption on client disconnect
EventConsumer consumer = new EventConsumer(queue);
producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
- AtomicBoolean backgroundConsumeStarted = new AtomicBoolean(false);
-
try {
Flow.Publisher results = resultAggregator.consumeAndEmit(consumer);
@@ -432,22 +450,12 @@ public Flow.Publisher onMessageSendStream(
} catch (TaskQueueExistsException e) {
// TODO Log
}
- if (pushConfigStore != null &&
- params.configuration() != null &&
- params.configuration().pushNotificationConfig() != null) {
- pushConfigStore.setInfo(
- createdTask.getId(),
- params.configuration().pushNotificationConfig());
- }
+ // Push notification config already stored synchronously at start of onMessageSendStream
+ // for new tasks, or in initMessageSend for existing tasks. No need to store again here.
}
- if (pushSender != null && taskId.get() != null) {
- EventKind latest = resultAggregator.getCurrentResult();
- if (latest instanceof Task latestTask) {
- pushSender.sendNotification(latestTask);
- }
- }
+ // Push notifications now sent by MainEventBusProcessor after persistence
return true;
}));
@@ -457,7 +465,8 @@ public Flow.Publisher onMessageSendStream(
Flow.Publisher finalPublisher = convertingProcessor(eventPublisher, event -> (StreamingEventKind) event);
- // Wrap publisher to detect client disconnect and continue background consumption
+ // Wrap publisher to detect client disconnect and immediately close ChildQueue
+ // This prevents ChildQueue backpressure from blocking MainEventBusProcessor
return subscriber -> {
LOGGER.debug("Creating subscription wrapper for task {}", taskId.get());
finalPublisher.subscribe(new Flow.Subscriber() {
@@ -477,8 +486,10 @@ public void request(long n) {
@Override
public void cancel() {
- LOGGER.debug("Client cancelled subscription for task {}, starting background consumption", taskId.get());
- startBackgroundConsumption();
+ LOGGER.debug("Client cancelled subscription for task {}, closing ChildQueue immediately", taskId.get());
+ // Close ChildQueue immediately to prevent backpressure
+ // (clears queue and releases semaphore permits)
+ queue.close(true); // immediate=true
subscription.cancel();
}
});
@@ -503,8 +514,8 @@ public void onComplete() {
subscriber.onComplete();
} catch (IllegalStateException e) {
// Client already disconnected and response closed - this is expected
- // for streaming responses where client disconnect triggers background
- // consumption. Log and ignore.
+ // for streaming responses where client disconnect closes ChildQueue.
+ // Log and ignore.
if (e.getMessage() != null && e.getMessage().contains("Response has already been written")) {
LOGGER.debug("Client disconnected before onComplete, response already closed for task {}", taskId.get());
} else {
@@ -512,36 +523,22 @@ public void onComplete() {
}
}
}
-
- private void startBackgroundConsumption() {
- if (backgroundConsumeStarted.compareAndSet(false, true)) {
- LOGGER.debug("Starting background consumption for task {}", taskId.get());
- // Client disconnected: continue consuming and persisting events in background
- CompletableFuture bgTask = CompletableFuture.runAsync(() -> {
- try {
- LOGGER.debug("Background consumption thread started for task {}", taskId.get());
- resultAggregator.consumeAll(consumer);
- LOGGER.debug("Background consumption completed for task {}", taskId.get());
- } catch (Exception e) {
- LOGGER.error("Error during background consumption for task {}", taskId.get(), e);
- }
- }, executor);
- trackBackgroundTask(bgTask);
- } else {
- LOGGER.debug("Background consumption already started for task {}", taskId.get());
- }
- }
});
};
} finally {
- LOGGER.debug("onMessageSendStream FINALLY - task: {}; runningAgents: {}; backgroundTasks: {}",
- taskId.get(), runningAgents.size(), backgroundTasks.size());
+ LOGGER.debug("onMessageSendStream FINALLY - task: {}; runningAgents: {}",
+ taskId.get(), runningAgents.size());
// Remove agent from map immediately to prevent accumulation
CompletableFuture agentFuture = runningAgents.remove(taskId.get());
LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", taskId.get(), runningAgents.size());
- trackBackgroundTask(cleanupProducer(agentFuture, null, taskId.get(), queue, true));
+ cleanupProducer(agentFuture, null, taskId.get(), queue, true)
+ .whenComplete((res, err) -> {
+ if (err != null) {
+ LOGGER.error("Error during async cleanup for streaming task {}", taskId.get(), err);
+ }
+ });
}
}
@@ -708,47 +705,6 @@ public void run() {
return runnable;
}
- private void trackBackgroundTask(CompletableFuture task) {
- backgroundTasks.add(task);
- LOGGER.debug("Tracking background task (total: {}): {}", backgroundTasks.size(), task);
-
- task.whenComplete((result, throwable) -> {
- try {
- if (throwable != null) {
- // Unwrap CompletionException to check for CancellationException
- Throwable cause = throwable;
- if (throwable instanceof java.util.concurrent.CompletionException && throwable.getCause() != null) {
- cause = throwable.getCause();
- }
-
- if (cause instanceof java.util.concurrent.CancellationException) {
- LOGGER.debug("Background task cancelled: {}", task);
- } else {
- LOGGER.error("Background task failed", throwable);
- }
- }
- } finally {
- backgroundTasks.remove(task);
- LOGGER.debug("Removed background task (remaining: {}): {}", backgroundTasks.size(), task);
- }
- });
- }
-
- /**
- * Wait for all background tasks to complete.
- * Useful for testing to ensure cleanup completes before assertions.
- *
- * @return CompletableFuture that completes when all background tasks finish
- */
- public CompletableFuture waitForBackgroundTasks() {
- CompletableFuture>[] tasks = backgroundTasks.toArray(new CompletableFuture[0]);
- if (tasks.length == 0) {
- return CompletableFuture.completedFuture(null);
- }
- LOGGER.debug("Waiting for {} background tasks to complete", tasks.length);
- return CompletableFuture.allOf(tasks);
- }
-
private CompletableFuture cleanupProducer(CompletableFuture agentFuture, CompletableFuture consumptionFuture, String taskId, EventQueue queue, boolean isStreaming) {
LOGGER.debug("Starting cleanup for task {} (streaming={})", taskId, isStreaming);
logThreadStats("CLEANUP START");
@@ -773,14 +729,20 @@ private CompletableFuture cleanupProducer(CompletableFuture agentFut
LOGGER.debug("Agent and consumption both completed successfully for task {}", taskId);
}
- // Always close the ChildQueue and notify the parent MainQueue
- // The parent will close itself when all children are closed (childClosing logic)
- // This ensures proper cleanup and removal from QueueManager map
- LOGGER.debug("{} call, closing ChildQueue for task {} (immediate=false, notifyParent=true)",
- isStreaming ? "Streaming" : "Non-streaming", taskId);
+ if (isStreaming) {
+ // For streaming: Queue lifecycle managed by EventConsumer
+ // EventConsumer closes queue when it detects final event (or QueueClosedEvent from replication)
+ // For fire-and-forget tasks, MainQueue stays open per architectural principle
+ LOGGER.debug("Streaming call for task {} - queue lifecycle managed by EventConsumer", taskId);
+ } else {
+ // For non-streaming: close the ChildQueue and notify the parent MainQueue
+ // The parent will close itself when all children are closed (childClosing logic)
+ // This ensures proper cleanup and removal from QueueManager map
+ LOGGER.debug("Non-streaming call, closing ChildQueue for task {} (immediate=false, notifyParent=true)", taskId);
- // Always notify parent so MainQueue can clean up when last child closes
- queue.close(false, true);
+ // Always notify parent so MainQueue can clean up when last child closes
+ queue.close(false, true);
+ }
// For replicated environments, the poison pill is now sent via CDI events
// When JpaDatabaseTaskStore.save() persists a final task, it fires TaskFinalizedEvent
@@ -820,15 +782,6 @@ private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallCon
return new MessageSendSetup(taskManager, task, requestContext);
}
- private void sendPushNotification(String taskId, ResultAggregator resultAggregator) {
- if (pushSender != null && taskId != null) {
- EventKind latest = resultAggregator.getCurrentResult();
- if (latest instanceof Task latestTask) {
- pushSender.sendNotification(latestTask);
- }
- }
- }
-
/**
* Log current thread and resource statistics for debugging.
* Only logs when DEBUG level is enabled. Call this from debugger or add strategic
@@ -850,7 +803,6 @@ private void logThreadStats(String label) {
LOGGER.debug("=== THREAD STATS: {} ===", label);
LOGGER.debug("Active threads: {}", activeThreads);
LOGGER.debug("Running agents: {}", runningAgents.size());
- LOGGER.debug("Background tasks: {}", backgroundTasks.size());
LOGGER.debug("Queue manager active queues: {}", queueManager.getClass().getSimpleName());
// List running agents
@@ -861,13 +813,6 @@ private void logThreadStats(String label) {
);
}
- // List background tasks
- if (!backgroundTasks.isEmpty()) {
- LOGGER.debug("Background tasks:");
- backgroundTasks.forEach(task ->
- LOGGER.debug(" - {}: {}", task, task.isDone() ? "DONE" : "RUNNING")
- );
- }
LOGGER.debug("=== END THREAD STATS ===");
}
diff --git a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
index 27de1defb..83ebf14e2 100644
--- a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
+++ b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
@@ -13,7 +13,6 @@
import io.a2a.server.events.EventConsumer;
import io.a2a.server.events.EventQueueItem;
-import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.EventKind;
import io.a2a.spec.JSONRPCError;
@@ -48,18 +47,10 @@ public EventKind getCurrentResult() {
public Flow.Publisher consumeAndEmit(EventConsumer consumer) {
Flow.Publisher allItems = consumer.consumeAll();
- // Process items conditionally - only save non-replicated events to database
+ // Just stream events - no persistence needed
+ // TaskStore update moved to MainEventBusProcessor
return processor(createTubeConfig(), allItems, (errorConsumer, item) -> {
- // Only process non-replicated events to avoid duplicate database writes
- if (!item.isReplicated()) {
- try {
- callTaskManagerProcess(item.getEvent());
- } catch (A2AServerException e) {
- errorConsumer.accept(e);
- return false;
- }
- }
- // Continue processing and emit (both replicated and non-replicated)
+ // Continue processing and emit all events
return true;
});
}
@@ -80,15 +71,7 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError {
return false;
}
}
- // Only process non-replicated events to avoid duplicate database writes
- if (!item.isReplicated()) {
- try {
- callTaskManagerProcess(event);
- } catch (A2AServerException e) {
- error.set(e);
- return false;
- }
- }
+ // TaskStore update moved to MainEventBusProcessor
return true;
},
error::set);
@@ -107,6 +90,7 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError {
public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError {
Flow.Publisher allItems = consumer.consumeAll();
AtomicReference message = new AtomicReference<>();
+ AtomicReference capturedTask = new AtomicReference<>(); // Capture Task events
AtomicBoolean interrupted = new AtomicBoolean(false);
AtomicReference errorRef = new AtomicReference<>();
CompletableFuture completionFuture = new CompletableFuture<>();
@@ -140,25 +124,30 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
return false;
}
- // Process event through TaskManager - only for non-replicated events
- if (!item.isReplicated()) {
- try {
- callTaskManagerProcess(event);
- } catch (A2AServerException e) {
- errorRef.set(e);
- completionFuture.completeExceptionally(e);
- return false;
+ // Capture Task events (especially for new tasks where taskManager.getTask() would return null)
+ // We capture the LATEST task to ensure we get the most up-to-date state
+ if (event instanceof Task t) {
+ Task previousTask = capturedTask.get();
+ capturedTask.set(t);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Captured Task event: id={}, state={} (previous: {})",
+ t.getId(), t.getStatus().state(),
+ previousTask != null ? previousTask.getId() + "/" + previousTask.getStatus().state() : "none");
}
}
+ // TaskStore update moved to MainEventBusProcessor
+
// Determine interrupt behavior
boolean shouldInterrupt = false;
- boolean continueInBackground = false;
boolean isFinalEvent = (event instanceof Task task && task.getStatus().state().isFinal())
|| (event instanceof TaskStatusUpdateEvent tsue && tsue.isFinal());
boolean isAuthRequired = (event instanceof Task task && task.getStatus().state() == TaskState.AUTH_REQUIRED)
|| (event instanceof TaskStatusUpdateEvent tsue && tsue.getStatus().state() == TaskState.AUTH_REQUIRED);
+ LOGGER.info("ResultAggregator: Evaluating interrupt (blocking={}, isFinal={}, isAuth={}, eventType={})",
+ blocking, isFinalEvent, isAuthRequired, event.getClass().getSimpleName());
+
// Always interrupt on auth_required, as it needs external action.
if (isAuthRequired) {
// auth-required is a special state: the message should be
@@ -168,20 +157,19 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer,
// new request is expected in order for the agent to make progress,
// so the agent should exit.
shouldInterrupt = true;
- continueInBackground = true;
+ LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (AUTH_REQUIRED)");
}
else if (!blocking) {
// For non-blocking calls, interrupt as soon as a task is available.
shouldInterrupt = true;
- continueInBackground = true;
+ LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (non-blocking)");
}
else if (blocking) {
// For blocking calls: Interrupt to free Vert.x thread, but continue in background
// Python's async consumption doesn't block threads, but Java's does
// So we interrupt to return quickly, then rely on background consumption
- // DefaultRequestHandler will fetch the final state from TaskStore
shouldInterrupt = true;
- continueInBackground = true;
+ LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (blocking, isFinal={})", isFinalEvent);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Blocking call for task {}: {} event, returning with background consumption",
taskIdForLogging(), isFinalEvent ? "final" : "non-final");
@@ -189,14 +177,14 @@ else if (blocking) {
}
if (shouldInterrupt) {
+ LOGGER.info("ResultAggregator: Interrupting consumption (setting interrupted=true)");
// Complete the future to unblock the main thread
interrupted.set(true);
completionFuture.complete(null);
// For blocking calls, DON'T complete consumptionCompletionFuture here.
// Let it complete naturally when subscription finishes (onComplete callback below).
- // This ensures all events are processed and persisted to TaskStore before
- // DefaultRequestHandler.cleanupProducer() proceeds with cleanup.
+ // This ensures all events are fully processed before cleanup.
//
// For non-blocking and auth-required calls, complete immediately to allow
// cleanup to proceed while consumption continues in background.
@@ -255,16 +243,27 @@ else if (blocking) {
Utils.rethrow(error);
}
+ // Return Message if captured, otherwise Task if captured, otherwise fetch from TaskStore
+ EventKind eventKind = message.get();
+ if (eventKind == null) {
+ eventKind = capturedTask.get();
+ if (LOGGER.isDebugEnabled() && eventKind instanceof Task t) {
+ LOGGER.debug("Returning capturedTask: id={}, state={}", t.getId(), t.getStatus().state());
+ }
+ }
+ if (eventKind == null) {
+ eventKind = taskManager.getTask();
+ if (LOGGER.isDebugEnabled() && eventKind instanceof Task t) {
+ LOGGER.debug("Returning task from TaskStore: id={}, state={}", t.getId(), t.getStatus().state());
+ }
+ }
+
return new EventTypeAndInterrupt(
- message.get() != null ? message.get() : taskManager.getTask(),
+ eventKind,
interrupted.get(),
consumptionCompletionFuture);
}
- private void callTaskManagerProcess(Event event) throws A2AServerException {
- taskManager.process(event);
- }
-
private String taskIdForLogging() {
Task task = taskManager.getTask();
return task != null ? task.getId() : "unknown";
diff --git a/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java b/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java
index 3e5611d9e..725ed33d8 100644
--- a/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java
+++ b/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java
@@ -15,6 +15,8 @@
import java.util.concurrent.atomic.AtomicReference;
import io.a2a.json.JsonProcessingException;
+import io.a2a.server.tasks.InMemoryTaskStore;
+import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.spec.A2AError;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Artifact;
@@ -28,14 +30,19 @@
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.TextPart;
import io.a2a.util.Utils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class EventConsumerTest {
+ private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
+ private static final String TASK_ID = "123"; // Must match MINIMAL_TASK id
+
private EventQueue eventQueue;
private EventConsumer eventConsumer;
-
+ private MainEventBus mainEventBus;
+ private MainEventBusProcessor mainEventBusProcessor;
private static final String MINIMAL_TASK = """
{
@@ -57,10 +64,58 @@ public class EventConsumerTest {
@BeforeEach
public void init() {
- eventQueue = EventQueue.builder().build();
+ // Set up MainEventBus and processor for production-like test environment
+ InMemoryTaskStore taskStore = new InMemoryTaskStore();
+ mainEventBus = new MainEventBus();
+ mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
+ EventQueueUtil.start(mainEventBusProcessor);
+
+ eventQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
+ .taskId(TASK_ID)
+ .mainEventBus(mainEventBus)
+ .build().tap();
eventConsumer = new EventConsumer(eventQueue);
}
+ @AfterEach
+ public void cleanup() {
+ if (mainEventBusProcessor != null) {
+ mainEventBusProcessor.setCallback(null); // Clear any test callbacks
+ EventQueueUtil.stop(mainEventBusProcessor);
+ }
+ }
+
+ /**
+ * Helper to wait for MainEventBusProcessor to process an event.
+ * Replaces polling patterns with deterministic callback-based waiting.
+ *
+ * @param action the action that triggers event processing
+ * @throws InterruptedException if waiting is interrupted
+ * @throws AssertionError if processing doesn't complete within timeout
+ */
+ private void waitForEventProcessing(Runnable action) throws InterruptedException {
+ CountDownLatch processingLatch = new CountDownLatch(1);
+ mainEventBusProcessor.setCallback(new MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, Event event) {
+ processingLatch.countDown();
+ }
+
+ @Override
+ public void onTaskFinalized(String taskId) {
+ // Not needed for basic event processing wait
+ }
+ });
+
+ try {
+ action.run();
+ assertTrue(processingLatch.await(5, TimeUnit.SECONDS),
+ "MainEventBusProcessor should have processed the event within timeout");
+ } finally {
+ mainEventBusProcessor.setCallback(null);
+ }
+ }
+
@Test
public void testConsumeOneTaskEvent() throws Exception {
Task event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class);
@@ -95,7 +150,7 @@ public void testConsumeAllMultipleEvents() throws JsonProcessingException {
List events = List.of(
Utils.unmarshalFrom(MINIMAL_TASK, Task.class),
new TaskArtifactUpdateEvent.Builder()
- .taskId("task-123")
+ .taskId(TASK_ID)
.contextId("session-xyz")
.artifact(new Artifact.Builder()
.artifactId("11")
@@ -103,7 +158,7 @@ public void testConsumeAllMultipleEvents() throws JsonProcessingException {
.build())
.build(),
new TaskStatusUpdateEvent.Builder()
- .taskId("task-123")
+ .taskId(TASK_ID)
.contextId("session-xyz")
.status(new TaskStatus(TaskState.WORKING))
.isFinal(true)
@@ -156,7 +211,7 @@ public void testConsumeUntilMessage() throws Exception {
List events = List.of(
Utils.unmarshalFrom(MINIMAL_TASK, Task.class),
new TaskArtifactUpdateEvent.Builder()
- .taskId("task-123")
+ .taskId(TASK_ID)
.contextId("session-xyz")
.artifact(new Artifact.Builder()
.artifactId("11")
@@ -164,7 +219,7 @@ public void testConsumeUntilMessage() throws Exception {
.build())
.build(),
new TaskStatusUpdateEvent.Builder()
- .taskId("task-123")
+ .taskId(TASK_ID)
.contextId("session-xyz")
.status(new TaskStatus(TaskState.WORKING))
.isFinal(true)
@@ -343,7 +398,9 @@ public void onComplete() {
@Test
public void testConsumeAllStopsOnQueueClosed() throws Exception {
- EventQueue queue = EventQueue.builder().build();
+ EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
+ .mainEventBus(mainEventBus)
+ .build().tap();
EventConsumer consumer = new EventConsumer(queue);
// Close the queue immediately
@@ -389,12 +446,16 @@ public void onComplete() {
@Test
public void testConsumeAllHandlesQueueClosedException() throws Exception {
- EventQueue queue = EventQueue.builder().build();
+ EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
+ .mainEventBus(mainEventBus)
+ .build().tap();
EventConsumer consumer = new EventConsumer(queue);
// Add a message event (which will complete the stream)
Event message = Utils.unmarshalFrom(MESSAGE_PAYLOAD, Message.class);
- queue.enqueueEvent(message);
+
+ // Use callback to wait for event processing
+ waitForEventProcessing(() -> queue.enqueueEvent(message));
// Close the queue before consuming
queue.close();
@@ -439,7 +500,9 @@ public void onComplete() {
@Test
public void testConsumeAllTerminatesOnQueueClosedEvent() throws Exception {
- EventQueue queue = EventQueue.builder().build();
+ EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
+ .mainEventBus(mainEventBus)
+ .build().tap();
EventConsumer consumer = new EventConsumer(queue);
// Enqueue a QueueClosedEvent (poison pill)
@@ -488,8 +551,12 @@ public void onComplete() {
}
private void enqueueAndConsumeOneEvent(Event event) throws Exception {
- eventQueue.enqueueEvent(event);
+ // Use callback to wait for event processing
+ waitForEventProcessing(() -> eventQueue.enqueueEvent(event));
+
+ // Event is now available, consume it directly
Event result = eventConsumer.consumeOne();
+ assertNotNull(result, "Event should be available");
assertSame(event, result);
}
diff --git a/server-common/src/test/java/io/a2a/server/events/EventQueueTest.java b/server-common/src/test/java/io/a2a/server/events/EventQueueTest.java
index 75888f151..14fe31c94 100644
--- a/server-common/src/test/java/io/a2a/server/events/EventQueueTest.java
+++ b/server-common/src/test/java/io/a2a/server/events/EventQueueTest.java
@@ -10,7 +10,11 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import io.a2a.server.tasks.InMemoryTaskStore;
+import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.spec.Artifact;
import io.a2a.spec.Event;
import io.a2a.spec.JSONRPCError;
@@ -23,12 +27,17 @@
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.TextPart;
import io.a2a.util.Utils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class EventQueueTest {
private EventQueue eventQueue;
+ private MainEventBus mainEventBus;
+ private MainEventBusProcessor mainEventBusProcessor;
+
+ private static final String TASK_ID = "123"; // Must match MINIMAL_TASK id
private static final String MINIMAL_TASK = """
{
@@ -48,38 +57,95 @@ public class EventQueueTest {
}
""";
+ private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
@BeforeEach
public void init() {
- eventQueue = EventQueue.builder().build();
+ // Set up MainEventBus and processor for production-like test environment
+ InMemoryTaskStore taskStore = new InMemoryTaskStore();
+ mainEventBus = new MainEventBus();
+ mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
+ EventQueueUtil.start(mainEventBusProcessor);
+
+ eventQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
+ .taskId(TASK_ID)
+ .mainEventBus(mainEventBus)
+ .build().tap();
+ }
+
+ @AfterEach
+ public void cleanup() {
+ if (mainEventBusProcessor != null) {
+ mainEventBusProcessor.setCallback(null); // Clear any test callbacks
+ EventQueueUtil.stop(mainEventBusProcessor);
+ }
+ }
+ /**
+ * Helper to create a queue with MainEventBus configured (for tests that need event distribution).
+ */
+ private EventQueue createQueueWithEventBus(String taskId) {
+ return EventQueueUtil.getEventQueueBuilder(mainEventBus)
+ .taskId(taskId)
+ .build();
+ }
+
+ /**
+ * Helper to wait for MainEventBusProcessor to process an event.
+ * Replaces polling patterns with deterministic callback-based waiting.
+ *
+ * @param action the action that triggers event processing
+ * @throws InterruptedException if waiting is interrupted
+ * @throws AssertionError if processing doesn't complete within timeout
+ */
+ private void waitForEventProcessing(Runnable action) throws InterruptedException {
+ CountDownLatch processingLatch = new CountDownLatch(1);
+ mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
+ processingLatch.countDown();
+ }
+
+ @Override
+ public void onTaskFinalized(String taskId) {
+ // Not needed for basic event processing wait
+ }
+ });
+
+ try {
+ action.run();
+ assertTrue(processingLatch.await(5, TimeUnit.SECONDS),
+ "MainEventBusProcessor should have processed the event within timeout");
+ } finally {
+ mainEventBusProcessor.setCallback(null);
+ }
}
@Test
public void testConstructorDefaultQueueSize() {
- EventQueue queue = EventQueue.builder().build();
+ EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
assertEquals(EventQueue.DEFAULT_QUEUE_SIZE, queue.getQueueSize());
}
@Test
public void testConstructorCustomQueueSize() {
int customSize = 500;
- EventQueue queue = EventQueue.builder().queueSize(customSize).build();
+ EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).queueSize(customSize).build();
assertEquals(customSize, queue.getQueueSize());
}
@Test
public void testConstructorInvalidQueueSize() {
// Test zero queue size
- assertThrows(IllegalArgumentException.class, () -> EventQueue.builder().queueSize(0).build());
+ assertThrows(IllegalArgumentException.class, () -> EventQueueUtil.getEventQueueBuilder(mainEventBus).queueSize(0).build());
// Test negative queue size
- assertThrows(IllegalArgumentException.class, () -> EventQueue.builder().queueSize(-10).build());
+ assertThrows(IllegalArgumentException.class, () -> EventQueueUtil.getEventQueueBuilder(mainEventBus).queueSize(-10).build());
}
@Test
public void testTapCreatesChildQueue() {
- EventQueue parentQueue = EventQueue.builder().build();
+ EventQueue parentQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
EventQueue childQueue = parentQueue.tap();
assertNotNull(childQueue);
@@ -89,7 +155,7 @@ public void testTapCreatesChildQueue() {
@Test
public void testTapOnChildQueueThrowsException() {
- EventQueue parentQueue = EventQueue.builder().build();
+ EventQueue parentQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
EventQueue childQueue = parentQueue.tap();
assertThrows(IllegalStateException.class, () -> childQueue.tap());
@@ -97,69 +163,74 @@ public void testTapOnChildQueueThrowsException() {
@Test
public void testEnqueueEventPropagagesToChildren() throws Exception {
- EventQueue parentQueue = EventQueue.builder().build();
- EventQueue childQueue = parentQueue.tap();
+ EventQueue mainQueue = createQueueWithEventBus(TASK_ID);
+ EventQueue childQueue1 = mainQueue.tap();
+ EventQueue childQueue2 = mainQueue.tap();
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class);
- parentQueue.enqueueEvent(event);
+ mainQueue.enqueueEvent(event);
- // Event should be available in both parent and child queues
- Event parentEvent = parentQueue.dequeueEventItem(-1).getEvent();
- Event childEvent = childQueue.dequeueEventItem(-1).getEvent();
+ // Event should be available in all child queues
+ // Note: MainEventBusProcessor runs async, so we use dequeueEventItem with timeout
+ Event child1Event = childQueue1.dequeueEventItem(5000).getEvent();
+ Event child2Event = childQueue2.dequeueEventItem(5000).getEvent();
- assertSame(event, parentEvent);
- assertSame(event, childEvent);
+ assertSame(event, child1Event);
+ assertSame(event, child2Event);
}
@Test
public void testMultipleChildQueuesReceiveEvents() throws Exception {
- EventQueue parentQueue = EventQueue.builder().build();
- EventQueue childQueue1 = parentQueue.tap();
- EventQueue childQueue2 = parentQueue.tap();
+ EventQueue mainQueue = createQueueWithEventBus(TASK_ID);
+ EventQueue childQueue1 = mainQueue.tap();
+ EventQueue childQueue2 = mainQueue.tap();
+ EventQueue childQueue3 = mainQueue.tap();
Event event1 = Utils.unmarshalFrom(MINIMAL_TASK, Task.class);
Event event2 = Utils.unmarshalFrom(MESSAGE_PAYLOAD, Message.class);
- parentQueue.enqueueEvent(event1);
- parentQueue.enqueueEvent(event2);
+ mainQueue.enqueueEvent(event1);
+ mainQueue.enqueueEvent(event2);
- // All queues should receive both events
- assertSame(event1, parentQueue.dequeueEventItem(-1).getEvent());
- assertSame(event2, parentQueue.dequeueEventItem(-1).getEvent());
+ // All child queues should receive both events
+ // Note: Use timeout for async processing
+ assertSame(event1, childQueue1.dequeueEventItem(5000).getEvent());
+ assertSame(event2, childQueue1.dequeueEventItem(5000).getEvent());
- assertSame(event1, childQueue1.dequeueEventItem(-1).getEvent());
- assertSame(event2, childQueue1.dequeueEventItem(-1).getEvent());
+ assertSame(event1, childQueue2.dequeueEventItem(5000).getEvent());
+ assertSame(event2, childQueue2.dequeueEventItem(5000).getEvent());
- assertSame(event1, childQueue2.dequeueEventItem(-1).getEvent());
- assertSame(event2, childQueue2.dequeueEventItem(-1).getEvent());
+ assertSame(event1, childQueue3.dequeueEventItem(5000).getEvent());
+ assertSame(event2, childQueue3.dequeueEventItem(5000).getEvent());
}
@Test
public void testChildQueueDequeueIndependently() throws Exception {
- EventQueue parentQueue = EventQueue.builder().build();
- EventQueue childQueue1 = parentQueue.tap();
- EventQueue childQueue2 = parentQueue.tap();
+ EventQueue mainQueue = createQueueWithEventBus(TASK_ID);
+ EventQueue childQueue1 = mainQueue.tap();
+ EventQueue childQueue2 = mainQueue.tap();
+ EventQueue childQueue3 = mainQueue.tap();
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class);
- parentQueue.enqueueEvent(event);
+ mainQueue.enqueueEvent(event);
- // Dequeue from child1 first
- Event child1Event = childQueue1.dequeueEventItem(-1).getEvent();
+ // Dequeue from child1 first (use timeout for async processing)
+ Event child1Event = childQueue1.dequeueEventItem(5000).getEvent();
assertSame(event, child1Event);
// child2 should still have the event available
- Event child2Event = childQueue2.dequeueEventItem(-1).getEvent();
+ Event child2Event = childQueue2.dequeueEventItem(5000).getEvent();
assertSame(event, child2Event);
- // Parent should still have the event available
- Event parentEvent = parentQueue.dequeueEventItem(-1).getEvent();
- assertSame(event, parentEvent);
+ // child3 should still have the event available
+ Event child3Event = childQueue3.dequeueEventItem(5000).getEvent();
+ assertSame(event, child3Event);
}
@Test
public void testCloseImmediatePropagationToChildren() throws Exception {
- EventQueue parentQueue = EventQueue.builder().build();
+ EventQueue parentQueue = createQueueWithEventBus(TASK_ID);
EventQueue childQueue = parentQueue.tap();
// Add events to both parent and child
@@ -168,7 +239,7 @@ public void testCloseImmediatePropagationToChildren() throws Exception {
assertFalse(childQueue.isClosed());
try {
- assertNotNull(childQueue.dequeueEventItem(-1)); // Child has the event
+ assertNotNull(childQueue.dequeueEventItem(5000)); // Child has the event (use timeout)
} catch (EventQueueClosedException e) {
// This is fine if queue closed before dequeue
}
@@ -189,27 +260,34 @@ public void testCloseImmediatePropagationToChildren() throws Exception {
@Test
public void testEnqueueEventWhenClosed() throws Exception {
- EventQueue queue = EventQueue.builder().build();
+ EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
+ .taskId(TASK_ID)
+ .build();
+ EventQueue childQueue = mainQueue.tap();
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class);
- queue.close(); // Close the queue first
- assertTrue(queue.isClosed());
+ childQueue.close(); // Close the child queue first
+ assertTrue(childQueue.isClosed());
// MainQueue accepts events even when closed (for replication support)
// This ensures late-arriving replicated events can be enqueued to closed queues
- queue.enqueueEvent(event);
+ mainQueue.enqueueEvent(event);
- // Event should be available for dequeuing
- Event dequeuedEvent = queue.dequeueEventItem(-1).getEvent();
+ // Create a new child queue to receive the event (closed child won't receive it)
+ EventQueue newChildQueue = mainQueue.tap();
+ EventQueueItem item = newChildQueue.dequeueEventItem(5000);
+ assertNotNull(item);
+ Event dequeuedEvent = item.getEvent();
assertSame(event, dequeuedEvent);
- // Now queue is closed and empty, should throw exception
- assertThrows(EventQueueClosedException.class, () -> queue.dequeueEventItem(-1));
+ // Now new child queue is closed and empty, should throw exception
+ newChildQueue.close();
+ assertThrows(EventQueueClosedException.class, () -> newChildQueue.dequeueEventItem(-1));
}
@Test
public void testDequeueEventWhenClosedAndEmpty() throws Exception {
- EventQueue queue = EventQueue.builder().build();
+ EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build().tap();
queue.close();
assertTrue(queue.isClosed());
@@ -219,19 +297,27 @@ public void testDequeueEventWhenClosedAndEmpty() throws Exception {
@Test
public void testDequeueEventWhenClosedButHasEvents() throws Exception {
- EventQueue queue = EventQueue.builder().build();
+ EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
+ .taskId(TASK_ID)
+ .build();
+ EventQueue childQueue = mainQueue.tap();
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class);
- queue.enqueueEvent(event);
- queue.close(); // Graceful close - events should remain
- assertTrue(queue.isClosed());
+ // Use callback to wait for event processing instead of polling
+ waitForEventProcessing(() -> mainQueue.enqueueEvent(event));
- // Should still be able to dequeue existing events
- Event dequeuedEvent = queue.dequeueEventItem(-1).getEvent();
+ // At this point, event has been processed and distributed to childQueue
+ childQueue.close(); // Graceful close - events should remain
+ assertTrue(childQueue.isClosed());
+
+ // Should still be able to dequeue existing events from closed queue
+ EventQueueItem item = childQueue.dequeueEventItem(5000);
+ assertNotNull(item);
+ Event dequeuedEvent = item.getEvent();
assertSame(event, dequeuedEvent);
// Now queue is closed and empty, should throw exception
- assertThrows(EventQueueClosedException.class, () -> queue.dequeueEventItem(-1));
+ assertThrows(EventQueueClosedException.class, () -> childQueue.dequeueEventItem(-1));
}
@Test
@@ -246,7 +332,9 @@ public void testEnqueueAndDequeueEvent() throws Exception {
public void testDequeueEventNoWait() throws Exception {
Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class);
eventQueue.enqueueEvent(event);
- Event dequeuedEvent = eventQueue.dequeueEventItem(-1).getEvent();
+ EventQueueItem item = eventQueue.dequeueEventItem(5000);
+ assertNotNull(item);
+ Event dequeuedEvent = item.getEvent();
assertSame(event, dequeuedEvent);
}
@@ -259,7 +347,7 @@ public void testDequeueEventEmptyQueueNoWait() throws Exception {
@Test
public void testDequeueEventWait() throws Exception {
Event event = new TaskStatusUpdateEvent.Builder()
- .taskId("task-123")
+ .taskId(TASK_ID)
.contextId("session-xyz")
.status(new TaskStatus(TaskState.WORKING))
.isFinal(true)
@@ -273,7 +361,7 @@ public void testDequeueEventWait() throws Exception {
@Test
public void testTaskDone() throws Exception {
Event event = new TaskArtifactUpdateEvent.Builder()
- .taskId("task-123")
+ .taskId(TASK_ID)
.contextId("session-xyz")
.artifact(new Artifact.Builder()
.artifactId("11")
@@ -349,7 +437,7 @@ public void testCloseIdempotent() throws Exception {
assertTrue(eventQueue.isClosed());
// Test with immediate close as well
- EventQueue eventQueue2 = EventQueue.builder().build();
+ EventQueue eventQueue2 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
eventQueue2.close(true);
assertTrue(eventQueue2.isClosed());
@@ -363,19 +451,20 @@ public void testCloseIdempotent() throws Exception {
*/
@Test
public void testCloseChildQueues() throws Exception {
- EventQueue childQueue = eventQueue.tap();
+ EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
+ EventQueue childQueue = mainQueue.tap();
assertTrue(childQueue != null);
// Graceful close - parent closes but children remain open
- eventQueue.close();
- assertTrue(eventQueue.isClosed());
+ mainQueue.close();
+ assertTrue(mainQueue.isClosed());
assertFalse(childQueue.isClosed()); // Child NOT closed on graceful parent close
// Immediate close - parent force-closes all children
- EventQueue parentQueue2 = EventQueue.builder().build();
- EventQueue childQueue2 = parentQueue2.tap();
- parentQueue2.close(true); // immediate=true
- assertTrue(parentQueue2.isClosed());
+ EventQueue mainQueue2 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
+ EventQueue childQueue2 = mainQueue2.tap();
+ mainQueue2.close(true); // immediate=true
+ assertTrue(mainQueue2.isClosed());
assertTrue(childQueue2.isClosed()); // Child IS closed on immediate parent close
}
@@ -385,7 +474,7 @@ public void testCloseChildQueues() throws Exception {
*/
@Test
public void testMainQueueReferenceCountingStaysOpenWithActiveChildren() throws Exception {
- EventQueue mainQueue = EventQueue.builder().build();
+ EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
EventQueue child1 = mainQueue.tap();
EventQueue child2 = mainQueue.tap();
diff --git a/server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java b/server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java
index 39201c1f6..6c9ed4a17 100644
--- a/server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java
+++ b/server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java
@@ -1,8 +1,39 @@
package io.a2a.server.events;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class EventQueueUtil {
- // Since EventQueue.builder() is package protected, add a method to expose it
- public static EventQueue.EventQueueBuilder getEventQueueBuilder() {
- return EventQueue.builder();
+ // Counter for generating unique test taskIds
+ private static final AtomicInteger TASK_ID_COUNTER = new AtomicInteger(0);
+
+ /**
+ * Get an EventQueue builder pre-configured with the shared test MainEventBus and a unique taskId.
+ *
+ * Note: Returns MainQueue - tests should call .tap() if they need to consume events.
+ *
+ *
+ * @return builder with TEST_EVENT_BUS and unique taskId already set
+ */
+ public static EventQueue.EventQueueBuilder getEventQueueBuilder(MainEventBus eventBus) {
+ return EventQueue.builder(eventBus)
+ .taskId("test-task-" + TASK_ID_COUNTER.incrementAndGet());
+ }
+
+ /**
+ * Start a MainEventBusProcessor instance.
+ *
+ * @param processor the processor to start
+ */
+ public static void start(MainEventBusProcessor processor) {
+ processor.start();
+ }
+
+ /**
+ * Stop a MainEventBusProcessor instance.
+ *
+ * @param processor the processor to stop
+ */
+ public static void stop(MainEventBusProcessor processor) {
+ processor.stop();
}
}
diff --git a/server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java b/server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java
index 1eca1b739..808a1107a 100644
--- a/server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java
+++ b/server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java
@@ -14,7 +14,10 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;
+import io.a2a.server.tasks.InMemoryTaskStore;
import io.a2a.server.tasks.MockTaskStateProvider;
+import io.a2a.server.tasks.PushNotificationSender;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -22,17 +25,31 @@ public class InMemoryQueueManagerTest {
private InMemoryQueueManager queueManager;
private MockTaskStateProvider taskStateProvider;
+ private InMemoryTaskStore taskStore;
+ private MainEventBus mainEventBus;
+ private MainEventBusProcessor mainEventBusProcessor;
+ private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
@BeforeEach
public void setUp() {
taskStateProvider = new MockTaskStateProvider();
- queueManager = new InMemoryQueueManager(taskStateProvider);
+ taskStore = new InMemoryTaskStore();
+ mainEventBus = new MainEventBus();
+ mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
+ EventQueueUtil.start(mainEventBusProcessor);
+
+ queueManager = new InMemoryQueueManager(taskStateProvider, mainEventBus);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ EventQueueUtil.stop(mainEventBusProcessor);
}
@Test
public void testAddNewQueue() {
String taskId = "test_task_id";
- EventQueue queue = EventQueue.builder().build();
+ EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
queueManager.add(taskId, queue);
@@ -43,8 +60,8 @@ public void testAddNewQueue() {
@Test
public void testAddExistingQueueThrowsException() {
String taskId = "test_task_id";
- EventQueue queue1 = EventQueue.builder().build();
- EventQueue queue2 = EventQueue.builder().build();
+ EventQueue queue1 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
+ EventQueue queue2 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
queueManager.add(taskId, queue1);
@@ -56,7 +73,7 @@ public void testAddExistingQueueThrowsException() {
@Test
public void testGetExistingQueue() {
String taskId = "test_task_id";
- EventQueue queue = EventQueue.builder().build();
+ EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
queueManager.add(taskId, queue);
EventQueue result = queueManager.get(taskId);
@@ -73,7 +90,7 @@ public void testGetNonexistentQueue() {
@Test
public void testTapExistingQueue() {
String taskId = "test_task_id";
- EventQueue queue = EventQueue.builder().build();
+ EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
queueManager.add(taskId, queue);
EventQueue tappedQueue = queueManager.tap(taskId);
@@ -94,7 +111,7 @@ public void testTapNonexistentQueue() {
@Test
public void testCloseExistingQueue() {
String taskId = "test_task_id";
- EventQueue queue = EventQueue.builder().build();
+ EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
queueManager.add(taskId, queue);
queueManager.close(taskId);
@@ -129,7 +146,7 @@ public void testCreateOrTapNewQueue() {
@Test
public void testCreateOrTapExistingQueue() {
String taskId = "test_task_id";
- EventQueue originalQueue = EventQueue.builder().build();
+ EventQueue originalQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
queueManager.add(taskId, originalQueue);
EventQueue result = queueManager.createOrTap(taskId);
@@ -151,7 +168,7 @@ public void testConcurrentOperations() throws InterruptedException, ExecutionExc
// Add tasks concurrently
List> addFutures = taskIds.stream()
.map(taskId -> CompletableFuture.supplyAsync(() -> {
- EventQueue queue = EventQueue.builder().build();
+ EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build();
queueManager.add(taskId, queue);
return taskId;
}))
diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
index c11330f33..65cdbbf4d 100644
--- a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
+++ b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
@@ -21,7 +21,10 @@
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
import io.a2a.server.events.EventQueueItem;
+import io.a2a.server.events.EventQueueUtil;
import io.a2a.server.events.InMemoryQueueManager;
+import io.a2a.server.events.MainEventBus;
+import io.a2a.server.events.MainEventBusProcessor;
import io.a2a.server.tasks.BasePushNotificationSender;
import io.a2a.server.tasks.InMemoryPushNotificationConfigStore;
import io.a2a.server.tasks.InMemoryTaskStore;
@@ -65,6 +68,8 @@ public class AbstractA2ARequestHandlerTest {
private static final String PREFERRED_TRANSPORT = "preferred-transport";
private static final String A2A_REQUESTHANDLER_TEST_PROPERTIES = "/a2a-requesthandler-test.properties";
+ private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
+
protected AgentExecutor executor;
protected TaskStore taskStore;
protected RequestHandler requestHandler;
@@ -72,6 +77,8 @@ public class AbstractA2ARequestHandlerTest {
protected AgentExecutorMethod agentExecutorCancel;
protected InMemoryQueueManager queueManager;
protected TestHttpClient httpClient;
+ protected MainEventBus mainEventBus;
+ protected MainEventBusProcessor mainEventBusProcessor;
protected final Executor internalExecutor = Executors.newCachedThreadPool();
@@ -95,19 +102,32 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC
InMemoryTaskStore inMemoryTaskStore = new InMemoryTaskStore();
taskStore = inMemoryTaskStore;
- queueManager = new InMemoryQueueManager(inMemoryTaskStore);
+
+ // Create push notification components BEFORE MainEventBusProcessor
httpClient = new TestHttpClient();
PushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore();
PushNotificationSender pushSender = new BasePushNotificationSender(pushConfigStore, httpClient);
+ // Create MainEventBus and MainEventBusProcessor (production code path)
+ mainEventBus = new MainEventBus();
+ mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, pushSender);
+ EventQueueUtil.start(mainEventBusProcessor);
+
+ queueManager = new InMemoryQueueManager(inMemoryTaskStore, mainEventBus);
+
requestHandler = DefaultRequestHandler.create(
- executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor);
+ executor, taskStore, queueManager, pushConfigStore, internalExecutor);
}
@AfterEach
public void cleanup() {
agentExecutorExecute = null;
agentExecutorCancel = null;
+
+ // Stop MainEventBusProcessor background thread
+ if (mainEventBusProcessor != null) {
+ EventQueueUtil.stop(mainEventBusProcessor);
+ }
}
protected static AgentCard createAgentCard(boolean streaming, boolean pushNotifications, boolean stateTransitionHistory) {
diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java
index acaa531ad..04900ba5c 100644
--- a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java
+++ b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java
@@ -19,9 +19,13 @@
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.auth.UnauthenticatedUser;
import io.a2a.server.events.EventQueue;
+import io.a2a.server.events.EventQueueUtil;
import io.a2a.server.events.InMemoryQueueManager;
+import io.a2a.server.events.MainEventBus;
+import io.a2a.server.events.MainEventBusProcessor;
import io.a2a.server.tasks.InMemoryPushNotificationConfigStore;
import io.a2a.server.tasks.InMemoryTaskStore;
+import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.server.tasks.TaskUpdater;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.Message;
@@ -32,6 +36,7 @@
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TextPart;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -50,12 +55,23 @@ public class DefaultRequestHandlerTest {
private InMemoryQueueManager queueManager;
private TestAgentExecutor agentExecutor;
private ServerCallContext serverCallContext;
+ private MainEventBus mainEventBus;
+ private MainEventBusProcessor mainEventBusProcessor;
+
+ private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
@BeforeEach
void setUp() {
taskStore = new InMemoryTaskStore();
+
+ // Create MainEventBus and MainEventBusProcessor (production code path)
+ mainEventBus = new MainEventBus();
+ mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
+ EventQueueUtil.start(mainEventBusProcessor);
+
// Pass taskStore as TaskStateProvider to queueManager for task-aware queue management
- queueManager = new InMemoryQueueManager(taskStore);
+ queueManager = new InMemoryQueueManager(taskStore, mainEventBus);
+
agentExecutor = new TestAgentExecutor();
requestHandler = DefaultRequestHandler.create(
@@ -63,13 +79,52 @@ void setUp() {
taskStore,
queueManager,
null, // pushConfigStore
- null, // pushSender
Executors.newCachedThreadPool()
);
serverCallContext = new ServerCallContext(UnauthenticatedUser.INSTANCE, Map.of(), Set.of());
}
+ @AfterEach
+ void tearDown() {
+ // Stop MainEventBusProcessor background thread
+ if (mainEventBusProcessor != null) {
+ mainEventBusProcessor.setCallback(null); // Clear any test callbacks
+ EventQueueUtil.stop(mainEventBusProcessor);
+ }
+ }
+
+ /**
+ * Helper to wait for MainEventBusProcessor to finalize a task.
+ * Replaces polling patterns with deterministic callback-based waiting.
+ *
+ * @param action the action that triggers task finalization (e.g., enqueuing a final event)
+ * @throws InterruptedException if waiting is interrupted
+ * @throws AssertionError if finalization doesn't complete within timeout
+ */
+ private void waitForTaskFinalization(Runnable action) throws InterruptedException {
+ CountDownLatch finalizationLatch = new CountDownLatch(1);
+ mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
+ // Not used for task finalization wait
+ }
+
+ @Override
+ public void onTaskFinalized(String taskId) {
+ finalizationLatch.countDown();
+ }
+ });
+
+ try {
+ action.run();
+ assertTrue(finalizationLatch.await(5, TimeUnit.SECONDS),
+ "MainEventBusProcessor should have finalized the task within timeout");
+ } finally {
+ mainEventBusProcessor.setCallback(null);
+ }
+ }
+
/**
* Test that multiple blocking messages to the same task work correctly
* when agent doesn't emit final events (fire-and-forget pattern).
@@ -577,32 +632,15 @@ void testNonBlockingMessagePersistsAllEventsInBackground() throws Exception {
// At this point, the non-blocking call has returned, but the agent is still running
- // Allow the agent to emit the final COMPLETED event
- allowCompletion.countDown();
+ // Assertion 2: Wait for the final task to be processed and finalized in background
+ // Use callback to wait for task finalization instead of polling
+ waitForTaskFinalization(() -> allowCompletion.countDown());
- // Assertion 2: Poll for the final task state to be persisted in background
- // Use polling loop instead of fixed sleep for faster and more reliable test
- long timeoutMs = 5000;
- long startTime = System.currentTimeMillis();
- Task persistedTask = null;
- boolean completedStateFound = false;
-
- while (System.currentTimeMillis() - startTime < timeoutMs) {
- persistedTask = taskStore.get(taskId);
- if (persistedTask != null && persistedTask.getStatus().state() == TaskState.COMPLETED) {
- completedStateFound = true;
- break;
- }
- Thread.sleep(100); // Poll every 100ms
- }
-
- assertTrue(persistedTask != null, "Task should be persisted to store");
- assertTrue(
- completedStateFound,
- "Final task state should be COMPLETED (background consumption should have processed it), got: " +
- (persistedTask != null ? persistedTask.getStatus().state() : "null") +
- " after " + (System.currentTimeMillis() - startTime) + "ms"
- );
+ // Verify the task was persisted with COMPLETED state
+ Task persistedTask = taskStore.get(taskId);
+ assertNotNull(persistedTask, "Task should be persisted to store");
+ assertEquals(TaskState.COMPLETED, persistedTask.getStatus().state(),
+ "Final task state should be COMPLETED (background consumption should have processed it)");
}
/**
@@ -779,16 +817,42 @@ void testBlockingCallReturnsCompleteTaskWithArtifacts() throws Exception {
updater.complete();
});
- // Call blocking onMessageSend - should wait for ALL events
- Object result = requestHandler.onMessageSend(params, serverCallContext);
+ // Use callback to ensure task finalization is complete before checking TaskStore
+ // This ensures MainEventBusProcessor has finished persisting the final state
+ CountDownLatch finalizationLatch = new CountDownLatch(1);
+ mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
+ // Not used
+ }
- // The returned result should be a Task with ALL artifacts
- assertTrue(result instanceof Task, "Result should be a Task");
- Task returnedTask = (Task) result;
+ @Override
+ public void onTaskFinalized(String taskId) {
+ finalizationLatch.countDown();
+ }
+ });
+
+ Task returnedTask;
+ try {
+ // Call blocking onMessageSend - should wait for ALL events
+ Object result = requestHandler.onMessageSend(params, serverCallContext);
- // Verify task is completed
- assertEquals(TaskState.COMPLETED, returnedTask.getStatus().state(),
- "Returned task should be COMPLETED");
+ // Wait for finalization callback to ensure TaskStore is fully updated
+ assertTrue(finalizationLatch.await(5, TimeUnit.SECONDS),
+ "Task should be finalized within timeout");
+
+ // The returned result should be a Task with ALL artifacts
+ assertTrue(result instanceof Task, "Result should be a Task");
+ returnedTask = (Task) result;
+
+ // Fetch final state from TaskStore (guaranteed to be persisted after callback)
+ returnedTask = taskStore.get(taskId);
+
+ assertEquals(TaskState.COMPLETED, returnedTask.getStatus().state(),
+ "Returned task should be COMPLETED");
+ } finally {
+ mainEventBusProcessor.setCallback(null);
+ }
// Verify artifacts are included in the returned task
assertNotNull(returnedTask.getArtifacts(),
@@ -823,7 +887,6 @@ void testBlockingMessageStoresPushNotificationConfigForNewTask() throws Exceptio
taskStore,
queueManager,
pushConfigStore, // Add push config store
- null, // pushSender
Executors.newCachedThreadPool()
);
@@ -893,7 +956,6 @@ void testMessageStoresPushNotificationConfigForExistingTask() throws Exception {
taskStore,
queueManager,
pushConfigStore, // Add push config store
- null, // pushSender
Executors.newCachedThreadPool()
);
diff --git a/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java b/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java
index 0db54c373..6c95b494d 100644
--- a/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java
+++ b/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java
@@ -11,18 +11,25 @@
import static org.mockito.Mockito.when;
import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import io.a2a.server.events.EventConsumer;
import io.a2a.server.events.EventQueue;
+import io.a2a.server.events.EventQueueUtil;
import io.a2a.server.events.InMemoryQueueManager;
+import io.a2a.server.events.MainEventBus;
+import io.a2a.server.events.MainEventBusProcessor;
+import io.a2a.spec.Event;
import io.a2a.spec.EventKind;
import io.a2a.spec.Message;
import io.a2a.spec.Task;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TextPart;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
@@ -69,6 +76,38 @@ private Task createSampleTask(String taskId, TaskState statusState, String conte
.build();
}
+ /**
+ * Helper to wait for MainEventBusProcessor to process an event.
+ * Replaces polling patterns with deterministic callback-based waiting.
+ *
+ * @param processor the processor to set callback on
+ * @param action the action that triggers event processing
+ * @throws InterruptedException if waiting is interrupted
+ * @throws AssertionError if processing doesn't complete within timeout
+ */
+ private void waitForEventProcessing(MainEventBusProcessor processor, Runnable action) throws InterruptedException {
+ CountDownLatch processingLatch = new CountDownLatch(1);
+ processor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, Event event) {
+ processingLatch.countDown();
+ }
+
+ @Override
+ public void onTaskFinalized(String taskId) {
+ // Not needed for basic event processing wait
+ }
+ });
+
+ try {
+ action.run();
+ assertTrue(processingLatch.await(5, TimeUnit.SECONDS),
+ "MainEventBusProcessor should have processed the event within timeout");
+ } finally {
+ processor.setCallback(null);
+ }
+ }
+
// Basic functionality tests
@@ -197,17 +236,25 @@ void testGetCurrentResultWithMessageTakesPrecedence() {
@Test
void testConsumeAndBreakNonBlocking() throws Exception {
// Test that with blocking=false, the method returns after the first event
- Task firstEvent = createSampleTask("non_blocking_task", TaskState.WORKING, "ctx1");
+ String taskId = "test-task";
+ Task firstEvent = createSampleTask(taskId, TaskState.WORKING, "ctx1");
// After processing firstEvent, the current result will be that task
when(mockTaskManager.getTask()).thenReturn(firstEvent);
// Create an event queue using QueueManager (which has access to builder)
+ MainEventBus mainEventBus = new MainEventBus();
+ InMemoryTaskStore taskStore = new InMemoryTaskStore();
+ MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, task -> {});
+ EventQueueUtil.start(processor);
+
InMemoryQueueManager queueManager =
- new InMemoryQueueManager(new MockTaskStateProvider());
+ new InMemoryQueueManager(new MockTaskStateProvider(), mainEventBus);
- EventQueue queue = queueManager.getEventQueueBuilder("test-task").build();
- queue.enqueueEvent(firstEvent);
+ EventQueue queue = queueManager.getEventQueueBuilder(taskId).build().tap();
+
+ // Use callback to wait for event processing (replaces polling)
+ waitForEventProcessing(processor, () -> queue.enqueueEvent(firstEvent));
// Create real EventConsumer with the queue
EventConsumer eventConsumer =
@@ -221,11 +268,15 @@ void testConsumeAndBreakNonBlocking() throws Exception {
assertEquals(firstEvent, result.eventType());
assertTrue(result.interrupted());
- verify(mockTaskManager).process(firstEvent);
+ // NOTE: ResultAggregator no longer calls taskManager.process()
+ // That responsibility has moved to MainEventBusProcessor for centralized persistence
// getTask() is called at least once for the return value (line 255)
// May be called once more if debug logging executes in time (line 209)
// The async consumer may or may not execute before verification, so we accept 1-2 calls
verify(mockTaskManager, atLeast(1)).getTask();
verify(mockTaskManager, atMost(2)).getTask();
+
+ // Cleanup: stop the processor
+ EventQueueUtil.stop(processor);
}
}
diff --git a/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java b/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java
index 0e99f57b5..871ba352f 100644
--- a/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java
+++ b/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java
@@ -14,7 +14,10 @@
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.events.EventQueue;
+import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.EventQueueUtil;
+import io.a2a.server.events.MainEventBus;
+import io.a2a.server.events.MainEventBusProcessor;
import io.a2a.spec.Event;
import io.a2a.spec.Message;
import io.a2a.spec.Part;
@@ -22,6 +25,7 @@
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.spec.TextPart;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -38,14 +42,27 @@ public class TaskUpdaterTest {
private static final List> SAMPLE_PARTS = List.of(new TextPart("Test message"));
+ private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
+
EventQueue eventQueue;
+ private MainEventBus mainEventBus;
+ private MainEventBusProcessor mainEventBusProcessor;
private TaskUpdater taskUpdater;
@BeforeEach
public void init() {
- eventQueue = EventQueueUtil.getEventQueueBuilder().build();
+ // Set up MainEventBus and processor for production-like test environment
+ InMemoryTaskStore taskStore = new InMemoryTaskStore();
+ mainEventBus = new MainEventBus();
+ mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
+ EventQueueUtil.start(mainEventBusProcessor);
+
+ eventQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus)
+ .taskId(TEST_TASK_ID)
+ .mainEventBus(mainEventBus)
+ .build().tap();
RequestContext context = new RequestContext.Builder()
.setTaskId(TEST_TASK_ID)
.setContextId(TEST_TASK_CONTEXT_ID)
@@ -53,10 +70,19 @@ public void init() {
taskUpdater = new TaskUpdater(context, eventQueue);
}
+ @AfterEach
+ public void cleanup() {
+ if (mainEventBusProcessor != null) {
+ EventQueueUtil.stop(mainEventBusProcessor);
+ }
+ }
+
@Test
public void testAddArtifactWithCustomIdAndName() throws Exception {
taskUpdater.addArtifact(SAMPLE_PARTS, "custom-artifact-id", "Custom Artifact", null);
- Event event = eventQueue.dequeueEventItem(0).getEvent();
+ EventQueueItem item = eventQueue.dequeueEventItem(5000);
+ assertNotNull(item);
+ Event event = item.getEvent();
assertNotNull(event);
assertInstanceOf(TaskArtifactUpdateEvent.class, event);
@@ -239,7 +265,9 @@ public void testNewAgentMessageWithMetadata() throws Exception {
@Test
public void testAddArtifactWithAppendTrue() throws Exception {
taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, null);
- Event event = eventQueue.dequeueEventItem(0).getEvent();
+ EventQueueItem item = eventQueue.dequeueEventItem(5000);
+ assertNotNull(item);
+ Event event = item.getEvent();
assertNotNull(event);
assertInstanceOf(TaskArtifactUpdateEvent.class, event);
@@ -258,7 +286,9 @@ public void testAddArtifactWithAppendTrue() throws Exception {
@Test
public void testAddArtifactWithLastChunkTrue() throws Exception {
taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, null, true);
- Event event = eventQueue.dequeueEventItem(0).getEvent();
+ EventQueueItem item = eventQueue.dequeueEventItem(5000);
+ assertNotNull(item);
+ Event event = item.getEvent();
assertNotNull(event);
assertInstanceOf(TaskArtifactUpdateEvent.class, event);
@@ -273,7 +303,9 @@ public void testAddArtifactWithLastChunkTrue() throws Exception {
@Test
public void testAddArtifactWithAppendAndLastChunk() throws Exception {
taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, false);
- Event event = eventQueue.dequeueEventItem(0).getEvent();
+ EventQueueItem item = eventQueue.dequeueEventItem(5000);
+ assertNotNull(item);
+ Event event = item.getEvent();
assertNotNull(event);
assertInstanceOf(TaskArtifactUpdateEvent.class, event);
@@ -287,7 +319,9 @@ public void testAddArtifactWithAppendAndLastChunk() throws Exception {
@Test
public void testAddArtifactGeneratesIdWhenNull() throws Exception {
taskUpdater.addArtifact(SAMPLE_PARTS, null, "Test Artifact", null);
- Event event = eventQueue.dequeueEventItem(0).getEvent();
+ EventQueueItem item = eventQueue.dequeueEventItem(5000);
+ assertNotNull(item);
+ Event event = item.getEvent();
assertNotNull(event);
assertInstanceOf(TaskArtifactUpdateEvent.class, event);
@@ -383,7 +417,9 @@ public void testConcurrentCompletionAttempts() throws Exception {
thread2.join();
// Exactly one event should have been queued
- Event event = eventQueue.dequeueEventItem(0).getEvent();
+ EventQueueItem item = eventQueue.dequeueEventItem(5000);
+ assertNotNull(item);
+ Event event = item.getEvent();
assertNotNull(event);
assertInstanceOf(TaskStatusUpdateEvent.class, event);
@@ -396,7 +432,10 @@ public void testConcurrentCompletionAttempts() throws Exception {
}
private TaskStatusUpdateEvent checkTaskStatusUpdateEventOnQueue(boolean isFinal, TaskState state, Message statusMessage) throws Exception {
- Event event = eventQueue.dequeueEventItem(0).getEvent();
+ // Wait up to 5 seconds for event (async MainEventBusProcessor needs time to distribute)
+ EventQueueItem item = eventQueue.dequeueEventItem(5000);
+ assertNotNull(item);
+ Event event = item.getEvent();
assertNotNull(event);
assertInstanceOf(TaskStatusUpdateEvent.class, event);
@@ -408,6 +447,7 @@ private TaskStatusUpdateEvent checkTaskStatusUpdateEventOnQueue(boolean isFinal,
assertEquals(state, tsue.getStatus().state());
assertEquals(statusMessage, tsue.getStatus().message());
+ // Check no additional events (still use 0 timeout for this check)
assertNull(eventQueue.dequeueEventItem(0));
return tsue;
diff --git a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
index 44c465508..dff832ecb 100644
--- a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
+++ b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java
@@ -269,7 +269,7 @@ public void testPushNotificationsNotSupportedError() throws Exception {
public void testOnGetPushNotificationNoPushNotifierConfig() throws Exception {
// Create request handler without a push notifier
DefaultRequestHandler requestHandler =
- new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor);
+ new DefaultRequestHandler(executor, taskStore, queueManager,null, internalExecutor);
AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, false);
GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor);
String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId();
@@ -281,7 +281,7 @@ public void testOnGetPushNotificationNoPushNotifierConfig() throws Exception {
public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception {
// Create request handler without a push notifier
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
- executor, taskStore, queueManager, null, null, internalExecutor);
+ executor, taskStore, queueManager, null, internalExecutor);
AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, false);
GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor);
String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId();
@@ -656,7 +656,7 @@ public void testListPushNotificationConfigNotSupported() throws Exception {
@Test
public void testListPushNotificationConfigNoPushConfigStore() {
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
- executor, taskStore, queueManager, null, null, internalExecutor);
+ executor, taskStore, queueManager, null, internalExecutor);
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK);
agentExecutorExecute = (context, eventQueue) -> {
@@ -729,7 +729,7 @@ public void testDeletePushNotificationConfigNotSupported() throws Exception {
@Test
public void testDeletePushNotificationConfigNoPushConfigStore() {
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
- executor, taskStore, queueManager, null, null, internalExecutor);
+ executor, taskStore, queueManager, null, internalExecutor);
GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor);
String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId();
DeleteTaskPushNotificationConfigRequest request = DeleteTaskPushNotificationConfigRequest.newBuilder()
diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
index dc2088a2d..916bfafd3 100644
--- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
+++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java
@@ -164,36 +164,9 @@ public void testOnMessageNewMessageSuccess() {
SendMessageRequest request = new SendMessageRequest("1", new MessageSendParams(message, null, null));
SendMessageResponse response = handler.onMessageSend(request, callContext);
assertNull(response.getError());
- // The Python implementation returns a Task here, but then again they are using hardcoded mocks and
- // bypassing the whole EventQueue.
- // If we were to send a Task in agentExecutorExecute EventConsumer.consumeAll() would not exit due to
- // the Task not having a 'final' state
- //
- // See testOnMessageNewMessageSuccessMocks() for a test more similar to the Python implementation
Assertions.assertSame(message, response.getResult());
}
- @Test
- public void testOnMessageNewMessageSuccessMocks() {
- JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
-
- Message message = new Message.Builder(MESSAGE)
- .taskId(MINIMAL_TASK.getId())
- .contextId(MINIMAL_TASK.getContextId())
- .build();
-
- SendMessageRequest request = new SendMessageRequest("1", new MessageSendParams(message, null, null));
- SendMessageResponse response;
- try (MockedConstruction mocked = Mockito.mockConstruction(
- EventConsumer.class,
- (mock, context) -> {Mockito.doReturn(ZeroPublisher.fromItems(wrapEvent(MINIMAL_TASK))).when(mock).consumeAll();
- Mockito.doCallRealMethod().when(mock).createAgentRunnableDoneCallback();})){
- response = handler.onMessageSend(request, callContext);
- }
- assertNull(response.getError());
- Assertions.assertSame(MINIMAL_TASK, response.getResult());
- }
-
@Test
public void testOnMessageNewMessageWithExistingTaskSuccess() {
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
@@ -208,37 +181,9 @@ public void testOnMessageNewMessageWithExistingTaskSuccess() {
SendMessageRequest request = new SendMessageRequest("1", new MessageSendParams(message, null, null));
SendMessageResponse response = handler.onMessageSend(request, callContext);
assertNull(response.getError());
- // The Python implementation returns a Task here, but then again they are using hardcoded mocks and
- // bypassing the whole EventQueue.
- // If we were to send a Task in agentExecutorExecute EventConsumer.consumeAll() would not exit due to
- // the Task not having a 'final' state
- //
- // See testOnMessageNewMessageWithExistingTaskSuccessMocks() for a test more similar to the Python implementation
Assertions.assertSame(message, response.getResult());
}
- @Test
- public void testOnMessageNewMessageWithExistingTaskSuccessMocks() {
- JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- taskStore.save(MINIMAL_TASK);
-
- Message message = new Message.Builder(MESSAGE)
- .taskId(MINIMAL_TASK.getId())
- .contextId(MINIMAL_TASK.getContextId())
- .build();
- SendMessageRequest request = new SendMessageRequest("1", new MessageSendParams(message, null, null));
- SendMessageResponse response;
- try (MockedConstruction mocked = Mockito.mockConstruction(
- EventConsumer.class,
- (mock, context) -> {
- Mockito.doReturn(ZeroPublisher.fromItems(wrapEvent(MINIMAL_TASK))).when(mock).consumeAll();})){
- response = handler.onMessageSend(request, callContext);
- }
- assertNull(response.getError());
- Assertions.assertSame(MINIMAL_TASK, response.getResult());
-
- }
-
@Test
public void testOnMessageError() {
// See testMessageOnErrorMocks() for a test more similar to the Python implementation, using mocks for
@@ -338,9 +283,24 @@ public void onComplete() {
@Test
public void testOnMessageStreamNewMessageMultipleEventsSuccess() throws InterruptedException {
- JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
+ // Setup callback to wait for all 3 events to be processed by MainEventBusProcessor
+ CountDownLatch processingLatch = new CountDownLatch(3);
+ mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
+ processingLatch.countDown();
+ }
+
+ @Override
+ public void onTaskFinalized(String taskId) {
+ // Not needed for this test
+ }
+ });
+
+ try {
+ JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- // Create multiple events to be sent during streaming
+ // Create multiple events to be sent during streaming
Task taskEvent = new Task.Builder(MINIMAL_TASK)
.status(new TaskStatus(TaskState.WORKING))
.build();
@@ -415,33 +375,40 @@ public void onComplete() {
}
});
- // Wait for all events to be received
- Assertions.assertTrue(latch.await(2, TimeUnit.SECONDS),
- "Expected to receive 3 events within timeout");
-
- // Assert no error occurred during streaming
- Assertions.assertNull(error.get(), "No error should occur during streaming");
-
- // Verify that all 3 events were received
- assertEquals(3, results.size(), "Should have received exactly 3 events");
-
- // Verify the first event is the task
- Task receivedTask = assertInstanceOf(Task.class, results.get(0), "First event should be a Task");
- assertEquals(MINIMAL_TASK.getId(), receivedTask.getId());
- assertEquals(MINIMAL_TASK.getContextId(), receivedTask.getContextId());
- assertEquals(TaskState.WORKING, receivedTask.getStatus().state());
-
- // Verify the second event is the artifact update
- TaskArtifactUpdateEvent receivedArtifact = assertInstanceOf(TaskArtifactUpdateEvent.class, results.get(1),
- "Second event should be a TaskArtifactUpdateEvent");
- assertEquals(MINIMAL_TASK.getId(), receivedArtifact.getTaskId());
- assertEquals("artifact-1", receivedArtifact.getArtifact().artifactId());
-
- // Verify the third event is the status update
- TaskStatusUpdateEvent receivedStatus = assertInstanceOf(TaskStatusUpdateEvent.class, results.get(2),
- "Third event should be a TaskStatusUpdateEvent");
- assertEquals(MINIMAL_TASK.getId(), receivedStatus.getTaskId());
- assertEquals(TaskState.COMPLETED, receivedStatus.getStatus().state());
+ // Wait for all events to be received (increased timeout for async processing)
+ Assertions.assertTrue(latch.await(10, TimeUnit.SECONDS),
+ "Expected to receive 3 events within timeout");
+
+ // Wait for MainEventBusProcessor to complete processing all 3 events
+ Assertions.assertTrue(processingLatch.await(5, TimeUnit.SECONDS),
+ "MainEventBusProcessor should have processed all 3 events");
+
+ // Assert no error occurred during streaming
+ Assertions.assertNull(error.get(), "No error should occur during streaming");
+
+ // Verify that all 3 events were received
+ assertEquals(3, results.size(), "Should have received exactly 3 events");
+
+ // Verify the first event is the task
+ Task receivedTask = assertInstanceOf(Task.class, results.get(0), "First event should be a Task");
+ assertEquals(MINIMAL_TASK.getId(), receivedTask.getId());
+ assertEquals(MINIMAL_TASK.getContextId(), receivedTask.getContextId());
+ assertEquals(TaskState.WORKING, receivedTask.getStatus().state());
+
+ // Verify the second event is the artifact update
+ TaskArtifactUpdateEvent receivedArtifact = assertInstanceOf(TaskArtifactUpdateEvent.class, results.get(1),
+ "Second event should be a TaskArtifactUpdateEvent");
+ assertEquals(MINIMAL_TASK.getId(), receivedArtifact.getTaskId());
+ assertEquals("artifact-1", receivedArtifact.getArtifact().artifactId());
+
+ // Verify the third event is the status update
+ TaskStatusUpdateEvent receivedStatus = assertInstanceOf(TaskStatusUpdateEvent.class, results.get(2),
+ "Third event should be a TaskStatusUpdateEvent");
+ assertEquals(MINIMAL_TASK.getId(), receivedStatus.getTaskId());
+ assertEquals(TaskState.COMPLETED, receivedStatus.getStatus().state());
+ } finally {
+ mainEventBusProcessor.setCallback(null);
+ }
}
@Test
@@ -710,32 +677,47 @@ public void testGetPushNotificationConfigSuccess() {
@Test
public void testOnMessageStreamNewMessageSendPushNotificationSuccess() throws Exception {
- JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- taskStore.save(MINIMAL_TASK);
-
- List events = List.of(
- MINIMAL_TASK,
- new TaskArtifactUpdateEvent.Builder()
- .taskId(MINIMAL_TASK.getId())
- .contextId(MINIMAL_TASK.getContextId())
- .artifact(new Artifact.Builder()
- .artifactId("11")
- .parts(new TextPart("text"))
- .build())
- .build(),
- new TaskStatusUpdateEvent.Builder()
- .taskId(MINIMAL_TASK.getId())
- .contextId(MINIMAL_TASK.getContextId())
- .status(new TaskStatus(TaskState.COMPLETED))
- .build());
-
+ // Setup callback to wait for all 3 events to be processed by MainEventBusProcessor
+ CountDownLatch processingLatch = new CountDownLatch(3);
+ mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
+ processingLatch.countDown();
+ }
- agentExecutorExecute = (context, eventQueue) -> {
- // Hardcode the events to send here
- for (Event event : events) {
- eventQueue.enqueueEvent(event);
+ @Override
+ public void onTaskFinalized(String taskId) {
+ // Not needed for this test
}
- };
+ });
+
+ try {
+ JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
+ taskStore.save(MINIMAL_TASK);
+
+ List events = List.of(
+ MINIMAL_TASK,
+ new TaskArtifactUpdateEvent.Builder()
+ .taskId(MINIMAL_TASK.getId())
+ .contextId(MINIMAL_TASK.getContextId())
+ .artifact(new Artifact.Builder()
+ .artifactId("11")
+ .parts(new TextPart("text"))
+ .build())
+ .build(),
+ new TaskStatusUpdateEvent.Builder()
+ .taskId(MINIMAL_TASK.getId())
+ .contextId(MINIMAL_TASK.getContextId())
+ .status(new TaskStatus(TaskState.COMPLETED))
+ .build());
+
+
+ agentExecutorExecute = (context, eventQueue) -> {
+ // Hardcode the events to send here
+ for (Event event : events) {
+ eventQueue.enqueueEvent(event);
+ }
+ };
TaskPushNotificationConfig config = new TaskPushNotificationConfig(
@@ -786,6 +768,11 @@ public void onComplete() {
});
Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ // Wait for MainEventBusProcessor to complete processing all 3 events
+ Assertions.assertTrue(processingLatch.await(5, TimeUnit.SECONDS),
+ "MainEventBusProcessor should have processed all 3 events");
+
subscriptionRef.get().cancel();
assertEquals(3, results.size());
assertEquals(3, httpClient.tasks.size());
@@ -811,6 +798,9 @@ public void onComplete() {
assertEquals(1, curr.getArtifacts().size());
assertEquals(1, curr.getArtifacts().get(0).parts().size());
assertEquals("text", ((TextPart)curr.getArtifacts().get(0).parts().get(0)).getText());
+ } finally {
+ mainEventBusProcessor.setCallback(null);
+ }
}
@Test
@@ -1117,7 +1107,7 @@ public void testPushNotificationsNotSupportedError() {
public void testOnGetPushNotificationNoPushNotifierConfig() {
// Create request handler without a push notifier
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
- executor, taskStore, queueManager, null, null, internalExecutor);
+ executor, taskStore, queueManager, null, internalExecutor);
AgentCard card = createAgentCard(false, true, false);
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor);
@@ -1136,7 +1126,7 @@ public void testOnGetPushNotificationNoPushNotifierConfig() {
public void testOnSetPushNotificationNoPushNotifierConfig() {
// Create request handler without a push notifier
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
- executor, taskStore, queueManager, null, null, internalExecutor);
+ executor, taskStore, queueManager, null, internalExecutor);
AgentCard card = createAgentCard(false, true, false);
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor);
@@ -1228,7 +1218,7 @@ public void testDefaultRequestHandlerWithCustomComponents() {
@Test
public void testOnMessageSendErrorHandling() {
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
- executor, taskStore, queueManager, null, null, internalExecutor);
+ executor, taskStore, queueManager, null, internalExecutor);
AgentCard card = createAgentCard(false, true, false);
JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor);
@@ -1274,16 +1264,31 @@ public void testOnMessageSendTaskIdMismatch() {
}
@Test
- public void testOnMessageStreamTaskIdMismatch() {
- JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
- taskStore.save(MINIMAL_TASK);
+ public void testOnMessageStreamTaskIdMismatch() throws InterruptedException {
+ // Setup callback to wait for the 1 event to be processed by MainEventBusProcessor
+ CountDownLatch processingLatch = new CountDownLatch(1);
+ mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
+ processingLatch.countDown();
+ }
- agentExecutorExecute = ((context, eventQueue) -> {
- eventQueue.enqueueEvent(MINIMAL_TASK);
+ @Override
+ public void onTaskFinalized(String taskId) {
+ // Not needed for this test
+ }
});
- SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null));
- Flow.Publisher response = handler.onMessageSendStream(request, callContext);
+ try {
+ JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
+ taskStore.save(MINIMAL_TASK);
+
+ agentExecutorExecute = ((context, eventQueue) -> {
+ eventQueue.enqueueEvent(MINIMAL_TASK);
+ });
+
+ SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null));
+ Flow.Publisher response = handler.onMessageSendStream(request, callContext);
CompletableFuture future = new CompletableFuture<>();
List results = new ArrayList<>();
@@ -1317,11 +1322,18 @@ public void onComplete() {
}
});
- future.join();
+ future.join();
- Assertions.assertNull(error.get());
- Assertions.assertEquals(1, results.size());
- Assertions.assertInstanceOf(InternalError.class, results.get(0).getError());
+ // Wait for MainEventBusProcessor to complete processing the event
+ Assertions.assertTrue(processingLatch.await(5, TimeUnit.SECONDS),
+ "MainEventBusProcessor should have processed the event");
+
+ Assertions.assertNull(error.get());
+ Assertions.assertEquals(1, results.size());
+ Assertions.assertInstanceOf(InternalError.class, results.get(0).getError());
+ } finally {
+ mainEventBusProcessor.setCallback(null);
+ }
}
@Test
@@ -1381,7 +1393,7 @@ public void testListPushNotificationConfigNotSupported() {
@Test
public void testListPushNotificationConfigNoPushConfigStore() {
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
- executor, taskStore, queueManager, null, null, internalExecutor);
+ executor, taskStore, queueManager, null, internalExecutor);
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK);
agentExecutorExecute = (context, eventQueue) -> {
@@ -1473,7 +1485,7 @@ public void testDeletePushNotificationConfigNotSupported() {
@Test
public void testDeletePushNotificationConfigNoPushConfigStore() {
DefaultRequestHandler requestHandler = DefaultRequestHandler.create(
- executor, taskStore, queueManager, null, null, internalExecutor);
+ executor, taskStore, queueManager, null, internalExecutor);
JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor);
taskStore.save(MINIMAL_TASK);
agentExecutorExecute = (context, eventQueue) -> {