diff --git a/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java b/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java index 5e90ed00d..517b0cda4 100644 --- a/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java +++ b/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java @@ -17,6 +17,7 @@ import io.a2a.server.events.EventQueueFactory; import io.a2a.server.events.EventQueueItem; import io.a2a.server.events.InMemoryQueueManager; +import io.a2a.server.events.MainEventBus; import io.a2a.server.events.QueueManager; @ApplicationScoped @@ -32,10 +33,12 @@ public class ReplicatedQueueManager implements QueueManager { private TaskStateProvider taskStateProvider; @Inject - public ReplicatedQueueManager(ReplicationStrategy replicationStrategy, TaskStateProvider taskStateProvider) { + public ReplicatedQueueManager(ReplicationStrategy replicationStrategy, + TaskStateProvider taskStateProvider, + MainEventBus mainEventBus) { this.replicationStrategy = replicationStrategy; this.taskStateProvider = taskStateProvider; - this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider); + this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider, mainEventBus); } @@ -139,12 +142,11 @@ public EventQueue.EventQueueBuilder builder(String taskId) { // which sends the QueueClosedEvent after the database transaction commits. // This ensures proper ordering and transactional guarantees. - // Return the builder with callbacks - return delegate.getEventQueueBuilder(taskId) - .taskId(taskId) - .hook(new ReplicationHook(taskId)) - .addOnCloseCallback(delegate.getCleanupCallback(taskId)) - .taskStateProvider(taskStateProvider); + // Call createBaseEventQueueBuilder() directly to avoid infinite recursion + // (getEventQueueBuilder() would delegate back to this factory, creating a loop) + // The base builder already includes: taskId, cleanup callback, taskStateProvider, mainEventBus + return delegate.createBaseEventQueueBuilder(taskId) + .hook(new ReplicationHook(taskId)); } } diff --git a/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java b/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java index 42454ea3b..912564577 100644 --- a/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java +++ b/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java @@ -17,17 +17,23 @@ import java.util.concurrent.atomic.AtomicInteger; import io.a2a.extras.common.events.TaskFinalizedEvent; +import io.a2a.json.JsonUtil; import io.a2a.server.events.EventQueue; import io.a2a.server.events.EventQueueClosedException; import io.a2a.server.events.EventQueueItem; import io.a2a.server.events.EventQueueTestHelper; +import io.a2a.server.events.EventQueueUtil; +import io.a2a.server.events.MainEventBus; +import io.a2a.server.events.MainEventBusProcessor; import io.a2a.server.events.QueueClosedEvent; +import io.a2a.server.tasks.InMemoryTaskStore; +import io.a2a.server.tasks.PushNotificationSender; import io.a2a.spec.Event; import io.a2a.spec.StreamingEventKind; import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatus; import io.a2a.spec.TaskStatusUpdateEvent; -import io.a2a.json.JsonUtil; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -35,10 +41,24 @@ class ReplicatedQueueManagerTest { private ReplicatedQueueManager queueManager; private StreamingEventKind testEvent; + private MainEventBus mainEventBus; + private MainEventBusProcessor mainEventBusProcessor; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; @BeforeEach void setUp() { - queueManager = new ReplicatedQueueManager(new NoOpReplicationStrategy(), new MockTaskStateProvider(true)); + // Create MainEventBus and MainEventBusProcessor for tests + InMemoryTaskStore taskStore = new InMemoryTaskStore(); + mainEventBus = new MainEventBus(); + mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER); + EventQueueUtil.start(mainEventBusProcessor); + + queueManager = new ReplicatedQueueManager( + new NoOpReplicationStrategy(), + new MockTaskStateProvider(true), + mainEventBus + ); + testEvent = new TaskStatusUpdateEvent.Builder() .taskId("test-task") .contextId("test-context") @@ -47,10 +67,65 @@ void setUp() { .build(); } + /** + * Helper to create a test event with the specified taskId. + * This ensures taskId consistency between queue creation and event creation. + */ + private TaskStatusUpdateEvent createEventForTask(String taskId) { + return new TaskStatusUpdateEvent.Builder() + .taskId(taskId) + .contextId("test-context") + .status(new TaskStatus(TaskState.SUBMITTED)) + .isFinal(false) + .build(); + } + + @AfterEach + void tearDown() { + if (mainEventBusProcessor != null) { + mainEventBusProcessor.setCallback(null); // Clear any test callbacks + EventQueueUtil.stop(mainEventBusProcessor); + } + mainEventBusProcessor = null; + mainEventBus = null; + queueManager = null; + } + + /** + * Helper to wait for MainEventBusProcessor to process an event. + * Replaces polling patterns with deterministic callback-based waiting. + * + * @param action the action that triggers event processing + * @throws InterruptedException if waiting is interrupted + * @throws AssertionError if processing doesn't complete within timeout + */ + private void waitForEventProcessing(Runnable action) throws InterruptedException { + CountDownLatch processingLatch = new CountDownLatch(1); + mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, io.a2a.spec.Event event) { + processingLatch.countDown(); + } + + @Override + public void onTaskFinalized(String taskId) { + // Not needed for basic event processing wait + } + }); + + try { + action.run(); + assertTrue(processingLatch.await(5, TimeUnit.SECONDS), + "MainEventBusProcessor should have processed the event within timeout"); + } finally { + mainEventBusProcessor.setCallback(null); + } + } + @Test void testReplicationStrategyTriggeredOnNormalEnqueue() throws InterruptedException { CountingReplicationStrategy strategy = new CountingReplicationStrategy(); - queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true)); + queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus); String taskId = "test-task-1"; EventQueue queue = queueManager.createOrTap(taskId); @@ -65,7 +140,7 @@ void testReplicationStrategyTriggeredOnNormalEnqueue() throws InterruptedExcepti @Test void testReplicationStrategyNotTriggeredOnReplicatedEvent() throws InterruptedException { CountingReplicationStrategy strategy = new CountingReplicationStrategy(); - queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true)); + queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus); String taskId = "test-task-2"; EventQueue queue = queueManager.createOrTap(taskId); @@ -79,7 +154,7 @@ void testReplicationStrategyNotTriggeredOnReplicatedEvent() throws InterruptedEx @Test void testReplicationStrategyWithCountingImplementation() throws InterruptedException { CountingReplicationStrategy countingStrategy = new CountingReplicationStrategy(); - queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true)); + queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true), mainEventBus); String taskId = "test-task-3"; EventQueue queue = queueManager.createOrTap(taskId); @@ -100,46 +175,45 @@ void testReplicationStrategyWithCountingImplementation() throws InterruptedExcep @Test void testReplicatedEventDeliveredToCorrectQueue() throws InterruptedException { String taskId = "test-task-4"; + TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId EventQueue queue = queueManager.createOrTap(taskId); - ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent); - queueManager.onReplicatedEvent(replicatedEvent); + ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask); - Event dequeuedEvent; - try { - dequeuedEvent = queue.dequeueEventItem(100).getEvent(); - } catch (EventQueueClosedException e) { - fail("Queue should not be closed"); - return; - } - assertEquals(testEvent, dequeuedEvent); + // Use callback to wait for event processing + EventQueueItem item = dequeueEventWithRetry(queue, () -> queueManager.onReplicatedEvent(replicatedEvent)); + assertNotNull(item, "Event should be available in queue"); + Event dequeuedEvent = item.getEvent(); + assertEquals(eventForTask, dequeuedEvent); } @Test void testReplicatedEventCreatesQueueIfNeeded() throws InterruptedException { String taskId = "non-existent-task"; + TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId // Verify no queue exists initially assertNull(queueManager.get(taskId)); - ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent); - - // Process the replicated event - assertDoesNotThrow(() -> queueManager.onReplicatedEvent(replicatedEvent)); - - // Verify that a queue was created and the event was enqueued - EventQueue queue = queueManager.get(taskId); - assertNotNull(queue, "Queue should be created when processing replicated event for non-existent task"); - - // Verify the event was enqueued by dequeuing it - Event dequeuedEvent; - try { - dequeuedEvent = queue.dequeueEventItem(100).getEvent(); - } catch (EventQueueClosedException e) { - fail("Queue should not be closed"); - return; - } - assertEquals(testEvent, dequeuedEvent, "The replicated event should be enqueued in the newly created queue"); + // Create a ChildQueue BEFORE processing the replicated event + // This ensures the ChildQueue exists when MainEventBusProcessor distributes the event + EventQueue childQueue = queueManager.createOrTap(taskId); + assertNotNull(childQueue, "ChildQueue should be created"); + + // Verify MainQueue was created + EventQueue mainQueue = queueManager.get(taskId); + assertNotNull(mainQueue, "MainQueue should exist after createOrTap"); + + ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask); + + // Process the replicated event and wait for distribution + // Use callback to wait for event processing + EventQueueItem item = dequeueEventWithRetry(childQueue, () -> { + assertDoesNotThrow(() -> queueManager.onReplicatedEvent(replicatedEvent)); + }); + assertNotNull(item, "Event should be available in queue"); + Event dequeuedEvent = item.getEvent(); + assertEquals(eventForTask, dequeuedEvent, "The replicated event should be enqueued in the newly created queue"); } @Test @@ -170,7 +244,7 @@ void testBasicQueueManagerFunctionality() throws InterruptedException { void testQueueToTaskIdMappingMaintained() throws InterruptedException { String taskId = "test-task-6"; CountingReplicationStrategy countingStrategy = new CountingReplicationStrategy(); - queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true)); + queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true), mainEventBus); EventQueue queue = queueManager.createOrTap(taskId); queue.enqueueEvent(testEvent); @@ -217,7 +291,7 @@ void testReplicatedEventJsonSerialization() throws Exception { @Test void testParallelReplicationBehavior() throws InterruptedException { CountingReplicationStrategy strategy = new CountingReplicationStrategy(); - queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true)); + queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus); String taskId = "parallel-test-task"; EventQueue queue = queueManager.createOrTap(taskId); @@ -297,7 +371,7 @@ void testParallelReplicationBehavior() throws InterruptedException { void testReplicatedEventSkippedWhenTaskInactive() throws InterruptedException { // Create a task state provider that returns false (task is inactive) MockTaskStateProvider stateProvider = new MockTaskStateProvider(false); - queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider); + queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus); String taskId = "inactive-task"; @@ -316,30 +390,32 @@ void testReplicatedEventSkippedWhenTaskInactive() throws InterruptedException { void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException { // Create a task state provider that returns true (task is active) MockTaskStateProvider stateProvider = new MockTaskStateProvider(true); - queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider); + queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus); String taskId = "active-task"; + TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId // Verify no queue exists initially assertNull(queueManager.get(taskId)); - // Process a replicated event for an active task - ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent); - queueManager.onReplicatedEvent(replicatedEvent); + // Create a ChildQueue BEFORE processing the replicated event + // This ensures the ChildQueue exists when MainEventBusProcessor distributes the event + EventQueue childQueue = queueManager.createOrTap(taskId); + assertNotNull(childQueue, "ChildQueue should be created"); - // Queue should be created and event should be enqueued - EventQueue queue = queueManager.get(taskId); - assertNotNull(queue, "Queue should be created for active task"); + // Verify MainQueue was created + EventQueue mainQueue = queueManager.get(taskId); + assertNotNull(mainQueue, "MainQueue should exist after createOrTap"); - // Verify the event was enqueued - Event dequeuedEvent; - try { - dequeuedEvent = queue.dequeueEventItem(100).getEvent(); - } catch (EventQueueClosedException e) { - fail("Queue should not be closed"); - return; - } - assertEquals(testEvent, dequeuedEvent, "Event should be enqueued for active task"); + // Process a replicated event for an active task + ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask); + + // Verify the event was enqueued and distributed to our ChildQueue + // Use callback to wait for event processing + EventQueueItem item = dequeueEventWithRetry(childQueue, () -> queueManager.onReplicatedEvent(replicatedEvent)); + assertNotNull(item, "Event should be available in queue"); + Event dequeuedEvent = item.getEvent(); + assertEquals(eventForTask, dequeuedEvent, "Event should be enqueued for active task"); } @@ -347,7 +423,7 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException { void testReplicatedEventToExistingQueueWhenTaskBecomesInactive() throws InterruptedException { // Create a task state provider that returns true initially MockTaskStateProvider stateProvider = new MockTaskStateProvider(true); - queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider); + queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus); String taskId = "task-becomes-inactive"; @@ -387,7 +463,7 @@ void testReplicatedEventToExistingQueueWhenTaskBecomesInactive() throws Interrup @Test void testPoisonPillSentViaTransactionAwareEvent() throws InterruptedException { CountingReplicationStrategy strategy = new CountingReplicationStrategy(); - queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true)); + queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus); String taskId = "poison-pill-test"; EventQueue queue = queueManager.createOrTap(taskId); @@ -451,36 +527,21 @@ void testQueueClosedEventJsonSerialization() throws Exception { @Test void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedException { String taskId = "remote-close-test"; + TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId EventQueue queue = queueManager.createOrTap(taskId); - // Enqueue a normal event - queue.enqueueEvent(testEvent); - // Simulate receiving QueueClosedEvent from remote node QueueClosedEvent closedEvent = new QueueClosedEvent(taskId); ReplicatedEventQueueItem replicatedClosedEvent = new ReplicatedEventQueueItem(taskId, closedEvent); - queueManager.onReplicatedEvent(replicatedClosedEvent); - // Dequeue the normal event first - EventQueueItem item1; - try { - item1 = queue.dequeueEventItem(100); - } catch (EventQueueClosedException e) { - fail("Should not throw on first dequeue"); - return; - } - assertNotNull(item1); - assertEquals(testEvent, item1.getEvent()); + // Dequeue the normal event first (use callback to wait for async processing) + EventQueueItem item1 = dequeueEventWithRetry(queue, () -> queue.enqueueEvent(eventForTask)); + assertNotNull(item1, "First event should be available"); + assertEquals(eventForTask, item1.getEvent()); - // Next dequeue should get the QueueClosedEvent - EventQueueItem item2; - try { - item2 = queue.dequeueEventItem(100); - } catch (EventQueueClosedException e) { - fail("Should not throw on second dequeue, should return the event"); - return; - } - assertNotNull(item2); + // Next dequeue should get the QueueClosedEvent (use callback to wait for async processing) + EventQueueItem item2 = dequeueEventWithRetry(queue, () -> queueManager.onReplicatedEvent(replicatedClosedEvent)); + assertNotNull(item2, "QueueClosedEvent should be available"); assertTrue(item2.getEvent() instanceof QueueClosedEvent, "Second event should be QueueClosedEvent"); } @@ -539,4 +600,25 @@ public void setActive(boolean active) { this.active = active; } } + + /** + * Helper method to dequeue an event after waiting for MainEventBusProcessor distribution. + * Uses callback-based waiting instead of polling for deterministic synchronization. + * + * @param queue the queue to dequeue from + * @param enqueueAction the action that enqueues the event (triggers event processing) + * @return the dequeued EventQueueItem, or null if queue is closed + */ + private EventQueueItem dequeueEventWithRetry(EventQueue queue, Runnable enqueueAction) throws InterruptedException { + // Wait for event to be processed and distributed + waitForEventProcessing(enqueueAction); + + // Event is now available, dequeue directly + try { + return queue.dequeueEventItem(100); + } catch (EventQueueClosedException e) { + // Queue closed, return null + return null; + } + } } \ No newline at end of file diff --git a/extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java b/extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java new file mode 100644 index 000000000..a91575aaa --- /dev/null +++ b/extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java @@ -0,0 +1,11 @@ +package io.a2a.server.events; + +public class EventQueueUtil { + public static void start(MainEventBusProcessor processor) { + processor.start(); + } + + public static void stop(MainEventBusProcessor processor) { + processor.stop(); + } +} diff --git a/reference/jsonrpc/src/test/resources/application.properties b/reference/jsonrpc/src/test/resources/application.properties index 7b9cea9cc..e612925d4 100644 --- a/reference/jsonrpc/src/test/resources/application.properties +++ b/reference/jsonrpc/src/test/resources/application.properties @@ -1 +1,6 @@ quarkus.arc.selected-alternatives=io.a2a.server.apps.common.TestHttpClient + +# Debug logging for event processing and request handling +quarkus.log.category."io.a2a.server.events".level=DEBUG +quarkus.log.category."io.a2a.server.requesthandlers".level=DEBUG +quarkus.log.category."io.a2a.server.tasks".level=DEBUG diff --git a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java index ccf9f0ce9..2d3d49fc8 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java +++ b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java @@ -58,13 +58,20 @@ public Flow.Publisher consumeAll() { EventQueueItem item; Event event; try { + LOGGER.debug("EventConsumer polling queue {} (error={})", System.identityHashCode(queue), error); item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS); if (item == null) { + LOGGER.debug("EventConsumer poll timeout (null item), continuing"); continue; } event = item.getEvent(); + LOGGER.debug("EventConsumer received event: {} (queue={})", + event.getClass().getSimpleName(), System.identityHashCode(queue)); + // Defensive logging for error handling if (event instanceof Throwable thr) { + LOGGER.info("EventConsumer detected Throwable event: {} - triggering tube.fail()", + thr.getClass().getSimpleName()); tube.fail(thr); return; } @@ -121,8 +128,13 @@ public Flow.Publisher consumeAll() { public EnhancedRunnable.DoneCallback createAgentRunnableDoneCallback() { return agentRunnable -> { + LOGGER.info("EventConsumer: Agent done callback invoked (hasError={}, queue={})", + agentRunnable.getError() != null, System.identityHashCode(queue)); if (agentRunnable.getError() != null) { error = agentRunnable.getError(); + LOGGER.info("EventConsumer: Set error field from agent callback"); + } else { + LOGGER.info("EventConsumer: Agent completed successfully (no error), continuing consumption"); } }; } diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java index 6a8a154ac..c73b26e22 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java +++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java @@ -21,8 +21,6 @@ public abstract class EventQueue implements AutoCloseable { public static final int DEFAULT_QUEUE_SIZE = 1000; private final int queueSize; - protected final BlockingQueue queue = new LinkedBlockingDeque<>(); - protected final Semaphore semaphore; private volatile boolean closed = false; protected EventQueue() { @@ -34,7 +32,6 @@ protected EventQueue(int queueSize) { throw new IllegalArgumentException("Queue size must be greater than 0"); } this.queueSize = queueSize; - this.semaphore = new Semaphore(queueSize, true); LOGGER.trace("Creating {} with queue size: {}", this, queueSize); } @@ -43,8 +40,8 @@ protected EventQueue(EventQueue parent) { LOGGER.trace("Creating {}, parent: {}", this, parent); } - static EventQueueBuilder builder() { - return new EventQueueBuilder(); + static EventQueueBuilder builder(MainEventBus mainEventBus) { + return new EventQueueBuilder().mainEventBus(mainEventBus); } public static class EventQueueBuilder { @@ -53,6 +50,7 @@ public static class EventQueueBuilder { private String taskId; private List onCloseCallbacks = new java.util.ArrayList<>(); private TaskStateProvider taskStateProvider; + private MainEventBus mainEventBus; public EventQueueBuilder queueSize(int queueSize) { this.queueSize = queueSize; @@ -81,12 +79,17 @@ public EventQueueBuilder taskStateProvider(TaskStateProvider taskStateProvider) return this; } + public EventQueueBuilder mainEventBus(MainEventBus mainEventBus) { + this.mainEventBus = mainEventBus; + return this; + } + public EventQueue build() { - if (hook != null || !onCloseCallbacks.isEmpty() || taskStateProvider != null) { - return new MainQueue(queueSize, hook, taskId, onCloseCallbacks, taskStateProvider); - } else { - return new MainQueue(queueSize); + // MainEventBus is now REQUIRED - enforce single architectural path + if (mainEventBus == null) { + throw new IllegalStateException("MainEventBus is required for EventQueue creation"); } + return new MainQueue(queueSize, hook, taskId, onCloseCallbacks, taskStateProvider, mainEventBus); } } @@ -102,22 +105,7 @@ public void enqueueEvent(Event event) { enqueueItem(new LocalEventQueueItem(event)); } - public void enqueueItem(EventQueueItem item) { - Event event = item.getEvent(); - if (closed) { - LOGGER.warn("Queue is closed. Event will not be enqueued. {} {}", this, event); - return; - } - // Call toString() since for errors we don't really want the full stacktrace - try { - semaphore.acquire(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e); - } - queue.add(item); - LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this); - } + public abstract void enqueueItem(EventQueueItem item); public abstract EventQueue tap(); @@ -127,53 +115,32 @@ public void enqueueItem(EventQueueItem item) { * This method returns the full EventQueueItem wrapper, allowing callers to check * metadata like whether the event is replicated via {@link EventQueueItem#isReplicated()}. *

+ *

+ * Note: MainQueue does not support dequeue operations - only ChildQueues can be consumed. + *

* * @param waitMilliSeconds the maximum time to wait in milliseconds * @return the EventQueueItem, or null if timeout occurs * @throws EventQueueClosedException if the queue is closed and empty + * @throws UnsupportedOperationException if called on MainQueue */ - public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException { - if (closed && queue.isEmpty()) { - LOGGER.debug("Queue is closed, and empty. Sending termination message. {}", this); - throw new EventQueueClosedException(); - } - try { - if (waitMilliSeconds <= 0) { - EventQueueItem item = queue.poll(); - if (item != null) { - Event event = item.getEvent(); - // Call toString() since for errors we don't really want the full stacktrace - LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event); - semaphore.release(); - } - return item; - } - try { - LOGGER.trace("Polling queue {} (wait={}ms)", System.identityHashCode(this), waitMilliSeconds); - EventQueueItem item = queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS); - if (item != null) { - Event event = item.getEvent(); - // Call toString() since for errors we don't really want the full stacktrace - LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event); - semaphore.release(); - } else { - LOGGER.trace("Dequeue timeout (null) from queue {}", System.identityHashCode(this)); - } - return item; - } catch (InterruptedException e) { - LOGGER.debug("Interrupted dequeue (waiting) {}", this); - Thread.currentThread().interrupt(); - return null; - } - } finally { - signalQueuePollerStarted(); - } - } + public abstract EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException; public void taskDone() { // TODO Not sure if needed yet. BlockingQueue.poll()/.take() remove the events. } + /** + * Returns the current size of the queue. + *

+ * For MainQueue: returns the size of the MainEventBus queue (events pending persistence/distribution). + * For ChildQueue: returns the size of the local consumption queue. + *

+ * + * @return the number of events currently in the queue + */ + public abstract int size(); + public abstract void close(); public abstract void close(boolean immediate); @@ -205,63 +172,29 @@ protected void doClose(boolean immediate) { LOGGER.debug("Closing {} (immediate={})", this, immediate); closed = true; } - - if (immediate) { - // Immediate close: clear pending events - queue.clear(); - LOGGER.debug("Cleared queue for immediate close: {}", this); - } - // For graceful close, let the queue drain naturally through normal consumption + // Subclasses handle immediate close logic (e.g., ChildQueue clears its local queue) } static class MainQueue extends EventQueue { private final List children = new CopyOnWriteArrayList<>(); + protected final Semaphore semaphore; private final CountDownLatch pollingStartedLatch = new CountDownLatch(1); private final AtomicBoolean pollingStarted = new AtomicBoolean(false); private final EventEnqueueHook enqueueHook; private final String taskId; private final List onCloseCallbacks; private final TaskStateProvider taskStateProvider; + private final MainEventBus mainEventBus; - MainQueue() { - super(); - this.enqueueHook = null; - this.taskId = null; - this.onCloseCallbacks = List.of(); - this.taskStateProvider = null; - } - - MainQueue(int queueSize) { - super(queueSize); - this.enqueueHook = null; - this.taskId = null; - this.onCloseCallbacks = List.of(); - this.taskStateProvider = null; - } - - MainQueue(EventEnqueueHook hook) { - super(); - this.enqueueHook = hook; - this.taskId = null; - this.onCloseCallbacks = List.of(); - this.taskStateProvider = null; - } - - MainQueue(int queueSize, EventEnqueueHook hook) { - super(queueSize); - this.enqueueHook = hook; - this.taskId = null; - this.onCloseCallbacks = List.of(); - this.taskStateProvider = null; - } - - MainQueue(int queueSize, EventEnqueueHook hook, String taskId, List onCloseCallbacks, TaskStateProvider taskStateProvider) { + MainQueue(int queueSize, EventEnqueueHook hook, String taskId, List onCloseCallbacks, TaskStateProvider taskStateProvider, MainEventBus mainEventBus) { super(queueSize); + this.semaphore = new Semaphore(queueSize, true); this.enqueueHook = hook; this.taskId = taskId; this.onCloseCallbacks = List.copyOf(onCloseCallbacks); // Defensive copy this.taskStateProvider = taskStateProvider; - LOGGER.debug("Created MainQueue for task {} with {} onClose callbacks and TaskStateProvider: {}", + this.mainEventBus = java.util.Objects.requireNonNull(mainEventBus, "MainEventBus is required"); + LOGGER.debug("Created MainQueue for task {} with {} onClose callbacks, TaskStateProvider: {}, MainEventBus configured", taskId, onCloseCallbacks.size(), taskStateProvider != null); } @@ -271,6 +204,25 @@ public EventQueue tap() { return child; } + /** + * Returns the current number of child queues. + * Useful for debugging and logging event distribution. + */ + public int getChildCount() { + return children.size(); + } + + @Override + public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException { + throw new UnsupportedOperationException("MainQueue cannot be consumed directly - use tap() to create a ChildQueue for consumption"); + } + + @Override + public int size() { + // Return size of MainEventBus queue (events pending persistence/distribution) + return mainEventBus.size(); + } + @Override public void enqueueItem(EventQueueItem item) { // MainQueue must accept events even when closed to support: @@ -289,14 +241,13 @@ public void enqueueItem(EventQueueItem item) { throw new RuntimeException("Unable to acquire the semaphore to enqueue the event", e); } - // Add to this MainQueue's internal queue - queue.add(item); LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this); - // Distribute to all ChildQueues (they will receive the event even if MainQueue is closed) - children.forEach(eq -> eq.internalEnqueueItem(item)); + // Submit to MainEventBus for centralized persistence + distribution + // MainEventBus is guaranteed non-null by constructor requirement + mainEventBus.submit(taskId, this, item); - // Trigger replication hook if configured + // Trigger replication hook if configured (for inter-process replication) if (enqueueHook != null) { enqueueHook.onEnqueue(item); } @@ -350,6 +301,36 @@ void childClosing(ChildQueue child, boolean immediate) { this.doClose(immediate); } + /** + * Distribute event to all ChildQueues. + * Called by MainEventBusProcessor after TaskStore persistence. + */ + void distributeToChildren(EventQueueItem item) { + int childCount = children.size(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("MainQueue[{}]: Distributing event {} to {} children", + taskId, item.getEvent().getClass().getSimpleName(), childCount); + } + children.forEach(child -> { + LOGGER.debug("MainQueue[{}]: Enqueueing event {} to child queue", + taskId, item.getEvent().getClass().getSimpleName()); + child.internalEnqueueItem(item); + }); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("MainQueue[{}]: Completed distribution of {} to {} children", + taskId, item.getEvent().getClass().getSimpleName(), childCount); + } + } + + /** + * Release the semaphore after event processing is complete. + * Called by MainEventBusProcessor in finally block to ensure release even on exceptions. + * Balances the acquire() in enqueueEvent() - protects MainEventBus throughput. + */ + void releaseSemaphore() { + semaphore.release(); + } + /** * Get the count of active child queues. * Used for testing to verify reference counting mechanism. @@ -400,6 +381,7 @@ public void close(boolean immediate, boolean notifyParent) { static class ChildQueue extends EventQueue { private final MainQueue parent; + private final BlockingQueue queue = new LinkedBlockingDeque<>(); public ChildQueue(MainQueue parent) { this.parent = parent; @@ -410,8 +392,61 @@ public void enqueueEvent(Event event) { parent.enqueueEvent(event); } + @Override + public void enqueueItem(EventQueueItem item) { + // ChildQueue delegates writes to parent MainQueue + parent.enqueueItem(item); + } + private void internalEnqueueItem(EventQueueItem item) { - super.enqueueItem(item); + // Internal method called by MainEventBusProcessor to add to local queue + // Note: Semaphore is managed by parent MainQueue (acquire/release), not ChildQueue + Event event = item.getEvent(); + if (isClosed()) { + LOGGER.warn("ChildQueue is closed. Event will not be enqueued. {} {}", this, event); + return; + } + if (!queue.offer(item)) { + LOGGER.warn("ChildQueue {} is full. Closing immediately.", this); + close(true); // immediate close + } else { + LOGGER.debug("Enqueued event {} {}", event instanceof Throwable ? event.toString() : event, this); + } + } + + @Override + public EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException { + if (isClosed() && queue.isEmpty()) { + LOGGER.debug("ChildQueue is closed, and empty. Sending termination message. {}", this); + throw new EventQueueClosedException(); + } + try { + if (waitMilliSeconds <= 0) { + EventQueueItem item = queue.poll(); + if (item != null) { + Event event = item.getEvent(); + LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event); + } + return item; + } + try { + LOGGER.trace("Polling ChildQueue {} (wait={}ms)", System.identityHashCode(this), waitMilliSeconds); + EventQueueItem item = queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS); + if (item != null) { + Event event = item.getEvent(); + LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event); + } else { + LOGGER.trace("Dequeue timeout (null) from ChildQueue {}", System.identityHashCode(this)); + } + return item; + } catch (InterruptedException e) { + LOGGER.debug("Interrupted dequeue (waiting) {}", this); + Thread.currentThread().interrupt(); + return null; + } + } finally { + signalQueuePollerStarted(); + } } @Override @@ -419,6 +454,12 @@ public EventQueue tap() { throw new IllegalStateException("Can only tap the main queue"); } + @Override + public int size() { + // Return size of local consumption queue + return queue.size(); + } + @Override public void awaitQueuePollerStart() throws InterruptedException { parent.awaitQueuePollerStart(); @@ -429,6 +470,18 @@ public void signalQueuePollerStarted() { parent.signalQueuePollerStarted(); } + @Override + protected void doClose(boolean immediate) { + super.doClose(immediate); // Sets closed flag + if (immediate) { + // Immediate close: clear pending events from local queue + int clearedCount = queue.size(); + queue.clear(); + LOGGER.debug("Cleared {} events from ChildQueue for immediate close: {}", clearedCount, this); + } + // For graceful close, let the queue drain naturally through normal consumption + } + @Override public void close() { close(false); diff --git a/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java b/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java index 1383d058e..4360a6210 100644 --- a/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java +++ b/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java @@ -18,16 +18,20 @@ public class InMemoryQueueManager implements QueueManager { private final EventQueueFactory factory; private final TaskStateProvider taskStateProvider; + MainEventBus mainEventBus; + @Inject - public InMemoryQueueManager(TaskStateProvider taskStateProvider) { + public InMemoryQueueManager(TaskStateProvider taskStateProvider, MainEventBus mainEventBus) { + this.mainEventBus = mainEventBus; this.factory = new DefaultEventQueueFactory(); this.taskStateProvider = taskStateProvider; } - // For testing with custom factory - public InMemoryQueueManager(EventQueueFactory factory, TaskStateProvider taskStateProvider) { + // For testing/extensions with custom factory and MainEventBus + public InMemoryQueueManager(EventQueueFactory factory, TaskStateProvider taskStateProvider, MainEventBus mainEventBus) { this.factory = factory; this.taskStateProvider = taskStateProvider; + this.mainEventBus = mainEventBus; } @Override @@ -109,6 +113,12 @@ public void awaitQueuePollerStart(EventQueue eventQueue) throws InterruptedExcep eventQueue.awaitQueuePollerStart(); } + @Override + public EventQueue.EventQueueBuilder getEventQueueBuilder(String taskId) { + // Use the factory to ensure proper configuration (MainEventBus, callbacks, etc.) + return factory.builder(taskId); + } + @Override public int getActiveChildQueueCount(String taskId) { EventQueue queue = queues.get(taskId); @@ -123,6 +133,14 @@ public int getActiveChildQueueCount(String taskId) { return -1; } + @Override + public EventQueue.EventQueueBuilder createBaseEventQueueBuilder(String taskId) { + return EventQueue.builder(mainEventBus) + .taskId(taskId) + .addOnCloseCallback(getCleanupCallback(taskId)) + .taskStateProvider(taskStateProvider); + } + /** * Get the cleanup callback that removes a queue from the map when it closes. * This is exposed so that subclasses (like ReplicatedQueueManager) can reuse @@ -162,11 +180,8 @@ public Runnable getCleanupCallback(String taskId) { private class DefaultEventQueueFactory implements EventQueueFactory { @Override public EventQueue.EventQueueBuilder builder(String taskId) { - // Return builder with callback that removes queue from map when closed - return EventQueue.builder() - .taskId(taskId) - .addOnCloseCallback(getCleanupCallback(taskId)) - .taskStateProvider(taskStateProvider); + // Delegate to the base builder creation method + return createBaseEventQueueBuilder(taskId); } } } diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBus.java b/server-common/src/main/java/io/a2a/server/events/MainEventBus.java new file mode 100644 index 000000000..73500254e --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/events/MainEventBus.java @@ -0,0 +1,42 @@ +package io.a2a.server.events; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class MainEventBus { + private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBus.class); + private final BlockingQueue queue; + + public MainEventBus() { + this.queue = new LinkedBlockingDeque<>(); + } + + public void submit(String taskId, EventQueue eventQueue, EventQueueItem item) { + try { + queue.put(new MainEventBusContext(taskId, eventQueue, item)); + LOGGER.debug("Submitted event for task {} to MainEventBus (queue size: {})", + taskId, queue.size()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted submitting to MainEventBus", e); + } + } + + public MainEventBusContext take() throws InterruptedException { + LOGGER.debug("MainEventBus: Waiting to take event (current queue size: {})...", queue.size()); + MainEventBusContext context = queue.take(); + LOGGER.debug("MainEventBus: Took event for task {} (remaining queue size: {})", + context.taskId(), queue.size()); + return context; + } + + public int size() { + return queue.size(); + } +} diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusContext.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusContext.java new file mode 100644 index 000000000..f8e5e03ec --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusContext.java @@ -0,0 +1,11 @@ +package io.a2a.server.events; + +import java.util.Objects; + +record MainEventBusContext(String taskId, EventQueue eventQueue, EventQueueItem eventQueueItem) { + MainEventBusContext { + Objects.requireNonNull(taskId, "taskId cannot be null"); + Objects.requireNonNull(eventQueue, "eventQueue cannot be null"); + Objects.requireNonNull(eventQueueItem, "eventQueueItem cannot be null"); + } +} diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java new file mode 100644 index 000000000..c04578a8b --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessor.java @@ -0,0 +1,304 @@ +package io.a2a.server.events; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import io.a2a.server.tasks.PushNotificationSender; +import io.a2a.server.tasks.TaskManager; +import io.a2a.server.tasks.TaskStore; +import io.a2a.spec.A2AServerException; +import io.a2a.spec.Event; +import io.a2a.spec.InternalError; +import io.a2a.spec.Task; +import io.a2a.spec.TaskArtifactUpdateEvent; +import io.a2a.spec.TaskStatusUpdateEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Background processor for the MainEventBus. + *

+ * This processor runs in a dedicated background thread, consuming events from the MainEventBus + * and performing two critical operations in order: + *

+ *
    + *
  1. Update TaskStore with event data (persistence FIRST)
  2. + *
  3. Distribute event to ChildQueues (clients see it AFTER persistence)
  4. + *
+ *

+ * This architecture ensures clients never receive events before they're persisted, + * eliminating race conditions and enabling reliable event replay. + *

+ *

+ * Note: This bean is eagerly initialized by {@link MainEventBusProcessorInitializer} + * to ensure the background thread starts automatically when the application starts. + *

+ */ +@ApplicationScoped +public class MainEventBusProcessor implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBusProcessor.class); + + /** + * Callback for testing synchronization with async event processing. + * Default is NOOP to avoid null checks in production code. + * Tests can inject their own callback via setCallback(). + */ + private volatile MainEventBusProcessorCallback callback = MainEventBusProcessorCallback.NOOP; + + private final MainEventBus eventBus; + + private final TaskStore taskStore; + + private final PushNotificationSender pushSender; + + private volatile boolean running = true; + private Thread processorThread; + + @Inject + public MainEventBusProcessor(MainEventBus eventBus, TaskStore taskStore, PushNotificationSender pushSender) { + this.eventBus = eventBus; + this.taskStore = taskStore; + this.pushSender = pushSender; + } + + /** + * Set a callback for testing synchronization with async event processing. + *

+ * This is primarily intended for tests that need to wait for event processing to complete. + * Pass null to reset to the default NOOP callback. + *

+ * + * @param callback the callback to invoke during event processing, or null for NOOP + */ + public void setCallback(MainEventBusProcessorCallback callback) { + this.callback = callback != null ? callback : MainEventBusProcessorCallback.NOOP; + } + + @PostConstruct + void start() { + processorThread = new Thread(this, "MainEventBusProcessor"); + processorThread.setDaemon(true); // Allow JVM to exit even if this thread is running + processorThread.start(); + LOGGER.info("MainEventBusProcessor started"); + } + + /** + * No-op method to force CDI proxy resolution and ensure @PostConstruct has been called. + * Called by MainEventBusProcessorInitializer during application startup. + */ + public void ensureStarted() { + // Method intentionally empty - just forces proxy resolution + } + + @PreDestroy + void stop() { + LOGGER.info("MainEventBusProcessor stopping..."); + running = false; + if (processorThread != null) { + processorThread.interrupt(); + try { + long start = System.currentTimeMillis(); + processorThread.join(5000); // Wait up to 5 seconds + long elapsed = System.currentTimeMillis() - start; + LOGGER.info("MainEventBusProcessor thread stopped in {}ms", elapsed); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Interrupted while waiting for MainEventBusProcessor thread to stop"); + } + } + LOGGER.info("MainEventBusProcessor stopped"); + } + + @Override + public void run() { + LOGGER.info("MainEventBusProcessor processing loop started"); + while (running) { + try { + LOGGER.debug("MainEventBusProcessor: Waiting for event from MainEventBus..."); + MainEventBusContext context = eventBus.take(); + LOGGER.debug("MainEventBusProcessor: Retrieved event for task {} from MainEventBus", + context.taskId()); + processEvent(context); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.info("MainEventBusProcessor interrupted, shutting down"); + break; + } catch (Exception e) { + LOGGER.error("Error processing event from MainEventBus", e); + // Continue processing despite errors + } + } + LOGGER.info("MainEventBusProcessor processing loop ended"); + } + + private void processEvent(MainEventBusContext context) { + String taskId = context.taskId(); + Event event = context.eventQueueItem().getEvent(); + EventQueue eventQueue = context.eventQueue(); + + LOGGER.debug("MainEventBusProcessor: Processing event for task {}: {} (queue type: {})", + taskId, event.getClass().getSimpleName(), eventQueue.getClass().getSimpleName()); + + Event eventToDistribute = null; + try { + // Step 1: Update TaskStore FIRST (persistence before clients see it) + // If this throws, we distribute an error to ensure "persist before client visibility" + + try { + updateTaskStore(taskId, event); + eventToDistribute = event; // Success - distribute original event + } catch (InternalError e) { + // Persistence failed - create error event to distribute instead + LOGGER.error("Failed to persist event for task {}, distributing error to clients", taskId, e); + String errorMessage = "Failed to persist event: " + e.getMessage(); + eventToDistribute = e; + } catch (Exception e) { + LOGGER.error("Failed to persist event for task {}, distributing error to clients", taskId, e); + String errorMessage = "Failed to persist event: " + e.getMessage(); + eventToDistribute = new InternalError(errorMessage); + } + + // Step 2: Send push notification AFTER successful persistence + if (eventToDistribute == event) { + sendPushNotification(taskId); + } + + // Step 3: Then distribute to ChildQueues (clients see either event or error AFTER persistence attempt) + if (eventToDistribute == null) { + LOGGER.error("MainEventBusProcessor: eventToDistribute is NULL for task {} - this should never happen!", taskId); + eventToDistribute = new InternalError("Internal error: event processing failed"); + } + + if (eventQueue instanceof EventQueue.MainQueue mainQueue) { + int childCount = mainQueue.getChildCount(); + LOGGER.debug("MainEventBusProcessor: Distributing {} to {} children for task {}", + eventToDistribute.getClass().getSimpleName(), childCount, taskId); + // Create new EventQueueItem with the event to distribute (original or error) + EventQueueItem itemToDistribute = new LocalEventQueueItem(eventToDistribute); + mainQueue.distributeToChildren(itemToDistribute); + LOGGER.debug("MainEventBusProcessor: Distributed {} to {} children for task {}", + eventToDistribute.getClass().getSimpleName(), childCount, taskId); + } else { + LOGGER.warn("MainEventBusProcessor: Expected MainQueue but got {} for task {}", + eventQueue.getClass().getSimpleName(), taskId); + } + + LOGGER.debug("MainEventBusProcessor: Completed processing event for task {}", taskId); + + } finally { + try { + // Step 4: Notify callback after all processing is complete + // Call callback with the distributed event (original or error) + if (eventToDistribute != null) { + callback.onEventProcessed(taskId, eventToDistribute); + + // Step 5: If this is a final event, notify task finalization + // Only for successful persistence (not for errors) + if (eventToDistribute == event && isFinalEvent(event)) { + callback.onTaskFinalized(taskId); + } + } + } finally { + // ALWAYS release semaphore, even if processing fails + // Balances the acquire() in MainQueue.enqueueEvent() + if (eventQueue instanceof EventQueue.MainQueue mainQueue) { + mainQueue.releaseSemaphore(); + } + } + } + } + + /** + * Updates TaskStore using TaskManager.process(). + *

+ * Creates a temporary TaskManager instance for this event and delegates to its process() method, + * which handles all event types (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent). + * This leverages existing TaskManager logic for status updates, artifact appending, message history, etc. + *

+ *

+ * If persistence fails, the exception is propagated to processEvent() which distributes an + * InternalError to clients instead of the original event, ensuring "persist before visibility". + * See Gemini's comment: https://github.com/a2aproject/a2a-java/pull/515#discussion_r2604621833 + *

+ * + * @throws InternalError if persistence fails + */ + private void updateTaskStore(String taskId, Event event) throws InternalError { + try { + // Extract contextId from event (all relevant events have it) + String contextId = extractContextId(event); + + // Create temporary TaskManager instance for this event + TaskManager taskManager = new TaskManager(taskId, contextId, taskStore, null); + + // Use TaskManager.process() - handles all event types with existing logic + taskManager.process(event); + LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {}", + taskId, event.getClass().getSimpleName()); + } catch (InternalError e) { + LOGGER.error("Error updating TaskStore via TaskManager for task {}", taskId, e); + // Rethrow to prevent distributing unpersisted event to clients + throw e; + } catch (Exception e) { + LOGGER.error("Unexpected error updating TaskStore for task {}", taskId, e); + // Rethrow to prevent distributing unpersisted event to clients + throw new InternalError("TaskStore persistence failed: " + e.getMessage()); + } + } + + /** + * Sends push notification for the task AFTER persistence. + *

+ * This is called after updateTaskStore() to ensure the notification contains + * the latest persisted state, avoiding race conditions. + *

+ */ + private void sendPushNotification(String taskId) { + try { + Task task = taskStore.get(taskId); + if (task != null) { + LOGGER.debug("Sending push notification for task {}", taskId); + pushSender.sendNotification(task); + } else { + LOGGER.debug("Skipping push notification - task {} not found in TaskStore", taskId); + } + } catch (Exception e) { + LOGGER.error("Error sending push notification for task {}", taskId, e); + // Don't rethrow - we still want to distribute to ChildQueues + } + } + + /** + * Extracts contextId from an event. + * Returns null if the event type doesn't have a contextId (e.g., Message). + */ + private String extractContextId(Event event) { + if (event instanceof Task task) { + return task.getContextId(); + } else if (event instanceof TaskStatusUpdateEvent statusUpdate) { + return statusUpdate.getContextId(); + } else if (event instanceof TaskArtifactUpdateEvent artifactUpdate) { + return artifactUpdate.getContextId(); + } + // Message and other events don't have contextId + return null; + } + + /** + * Checks if an event represents a final task state. + * + * @param event the event to check + * @return true if the event represents a final state (COMPLETED, FAILED, CANCELED, REJECTED, UNKNOWN) + */ + private boolean isFinalEvent(Event event) { + if (event instanceof Task task) { + return task.getStatus() != null && task.getStatus().state() != null + && task.getStatus().state().isFinal(); + } else if (event instanceof TaskStatusUpdateEvent statusUpdate) { + return statusUpdate.isFinal(); + } + return false; + } +} diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java new file mode 100644 index 000000000..b0a9adbce --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java @@ -0,0 +1,66 @@ +package io.a2a.server.events; + +import io.a2a.spec.Event; + +/** + * Callback interface for MainEventBusProcessor events. + *

+ * This interface is primarily intended for testing, allowing tests to synchronize + * with the asynchronous MainEventBusProcessor. Production code should not rely on this. + *

+ * Usage in tests: + *
+ * {@code
+ * @Inject
+ * MainEventBusProcessor processor;
+ *
+ * @BeforeEach
+ * void setUp() {
+ *     CountDownLatch latch = new CountDownLatch(3);
+ *     processor.setCallback(new MainEventBusProcessorCallback() {
+ *         public void onEventProcessed(String taskId, Event event) {
+ *             latch.countDown();
+ *         }
+ *     });
+ * }
+ *
+ * @AfterEach
+ * void tearDown() {
+ *     processor.setCallback(null); // Reset to NOOP
+ * }
+ * }
+ * 
+ */ +public interface MainEventBusProcessorCallback { + + /** + * Called after an event has been fully processed (persisted, notification sent, distributed to children). + * + * @param taskId the task ID + * @param event the event that was processed + */ + void onEventProcessed(String taskId, Event event); + + /** + * Called when a task reaches a final state (COMPLETED, FAILED, CANCELED, REJECTED). + * + * @param taskId the task ID that was finalized + */ + void onTaskFinalized(String taskId); + + /** + * No-op implementation that does nothing. + * Used as the default callback to avoid null checks. + */ + MainEventBusProcessorCallback NOOP = new MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, Event event) { + // No-op + } + + @Override + public void onTaskFinalized(String taskId) { + // No-op + } + }; +} diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java new file mode 100644 index 000000000..ba4b300be --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java @@ -0,0 +1,43 @@ +package io.a2a.server.events; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.context.Initialized; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Portable CDI initializer for MainEventBusProcessor. + *

+ * This bean observes the ApplicationScoped initialization event and injects + * MainEventBusProcessor, which triggers its eager creation and starts the background thread. + *

+ *

+ * This approach is portable across all Jakarta CDI implementations (Weld, OpenWebBeans, Quarkus, etc.) + * and ensures MainEventBusProcessor starts automatically when the application starts. + *

+ */ +@ApplicationScoped +public class MainEventBusProcessorInitializer { + private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBusProcessorInitializer.class); + + @Inject + MainEventBusProcessor processor; + + /** + * Observes ApplicationScoped initialization to force eager creation of MainEventBusProcessor. + * The injection of MainEventBusProcessor in this bean triggers its creation, and calling + * ensureStarted() forces the CDI proxy to be resolved, which ensures @PostConstruct has been + * called and the background thread is running. + */ + void onStart(@Observes @Initialized(ApplicationScoped.class) Object event) { + if (processor != null) { + // Force proxy resolution to ensure @PostConstruct has been called + processor.ensureStarted(); + LOGGER.info("MainEventBusProcessor initialized and started"); + } else { + LOGGER.error("MainEventBusProcessor is null - initialization failed!"); + } + } +} diff --git a/server-common/src/main/java/io/a2a/server/events/QueueManager.java b/server-common/src/main/java/io/a2a/server/events/QueueManager.java index b0eb517ae..449d30f90 100644 --- a/server-common/src/main/java/io/a2a/server/events/QueueManager.java +++ b/server-common/src/main/java/io/a2a/server/events/QueueManager.java @@ -15,7 +15,31 @@ public interface QueueManager { void awaitQueuePollerStart(EventQueue eventQueue) throws InterruptedException; default EventQueue.EventQueueBuilder getEventQueueBuilder(String taskId) { - return EventQueue.builder(); + throw new UnsupportedOperationException( + "QueueManager implementations must override getEventQueueBuilder() to provide MainEventBus" + ); + } + + /** + * Creates a base EventQueueBuilder with standard configuration for this QueueManager. + * This method provides the foundation for creating event queues with proper configuration + * (MainEventBus, TaskStateProvider, cleanup callbacks, etc.). + *

+ * QueueManager implementations that use custom factories can call this method directly + * to get the base builder without going through the factory (which could cause infinite + * recursion if the factory delegates back to getEventQueueBuilder()). + *

+ *

+ * Callers can then add additional configuration (hooks, callbacks) before building the queue. + *

+ * + * @param taskId the task ID for the queue + * @return a builder with base configuration specific to this QueueManager implementation + */ + default EventQueue.EventQueueBuilder createBaseEventQueueBuilder(String taskId) { + throw new UnsupportedOperationException( + "QueueManager implementations must override createBaseEventQueueBuilder() to provide MainEventBus" + ); } /** diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index b59a9aedb..dcdac496a 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -105,23 +105,20 @@ public class DefaultRequestHandler implements RequestHandler { private final TaskStore taskStore; private final QueueManager queueManager; private final PushNotificationConfigStore pushConfigStore; - private final PushNotificationSender pushSender; private final Supplier requestContextBuilder; private final ConcurrentMap> runningAgents = new ConcurrentHashMap<>(); - private final Set> backgroundTasks = ConcurrentHashMap.newKeySet(); private final Executor executor; @Inject public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore, QueueManager queueManager, PushNotificationConfigStore pushConfigStore, - PushNotificationSender pushSender, @Internal Executor executor) { + @Internal Executor executor) { this.agentExecutor = agentExecutor; this.taskStore = taskStore; this.queueManager = queueManager; this.pushConfigStore = pushConfigStore; - this.pushSender = pushSender; this.executor = executor; // TODO In Python this is also a constructor parameter defaulting to this SimpleRequestContextBuilder // implementation if the parameter is null. Skip that for now, since otherwise I get CDI errors, and @@ -143,9 +140,9 @@ void initConfig() { */ public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore, QueueManager queueManager, PushNotificationConfigStore pushConfigStore, - PushNotificationSender pushSender, Executor executor) { + Executor executor) { DefaultRequestHandler handler = - new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, pushSender, executor); + new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, executor); handler.agentCompletionTimeoutSeconds = 5; handler.consumptionCompletionTimeoutSeconds = 2; return handler; @@ -229,7 +226,7 @@ public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws EventQueue queue = queueManager.tap(task.getId()); if (queue == null) { - queue = queueManager.getEventQueueBuilder(task.getId()).build(); + queue = queueManager.getEventQueueBuilder(task.getId()).build().tap(); } agentExecutor.cancel( requestContextBuilder.get() @@ -274,15 +271,23 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte blocking = false; } + // Log blocking behavior from client request + if (params.configuration() != null && params.configuration().blocking() != null) { + LOGGER.info("DefaultRequestHandler: Client requested blocking={} for task {}", + params.configuration().blocking(), taskId); + } else if (params.configuration() != null) { + LOGGER.info("DefaultRequestHandler: Client sent configuration but blocking=null, using default blocking=true for task {}", taskId); + } else { + LOGGER.info("DefaultRequestHandler: Client sent no configuration, using default blocking=true for task {}", taskId); + } + LOGGER.info("DefaultRequestHandler: Final blocking decision: {} for task {}", blocking, taskId); + boolean interruptedOrNonBlocking = false; EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue); ResultAggregator.EventTypeAndInterrupt etai = null; EventKind kind = null; // Declare outside try block so it's in scope for return try { - // Create callback for push notifications during background event processing - Runnable pushNotificationCallback = () -> sendPushNotification(taskId, resultAggregator); - EventConsumer consumer = new EventConsumer(queue); // This callback must be added before we start consuming. Otherwise, @@ -298,7 +303,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte throw new InternalError("No result"); } interruptedOrNonBlocking = etai.interrupted(); - LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking); + LOGGER.info("DefaultRequestHandler: interruptedOrNonBlocking={} (blocking={}, eventType={})", + interruptedOrNonBlocking, blocking, kind != null ? kind.getClass().getSimpleName() : null); // For blocking calls that were interrupted (returned on first event), // wait for agent execution and event processing BEFORE returning to client. @@ -310,7 +316,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // Store push notification config for newly created tasks (mirrors streaming logic) // Only for NEW tasks - existing tasks are handled by initMessageSend() if (mss.task() == null && kind instanceof Task createdTask && shouldAddPushInfo(params)) { - LOGGER.debug("Storing push notification config for new task {}", createdTask.getId()); + LOGGER.debug("Storing push notification config for new task {} (original taskId from params: {})", + createdTask.getId(), params.message().getTaskId()); pushConfigStore.setInfo(createdTask.getId(), params.configuration().pushNotificationConfig()); } @@ -321,16 +328,18 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // 2. Close the queue to signal consumption can complete // 3. Wait for consumption to finish processing events // 4. Fetch final task state from TaskStore + LOGGER.info("DefaultRequestHandler: Entering blocking fire-and-forget handling for task {}", taskId); try { // Step 1: Wait for agent to finish (with configurable timeout) if (agentFuture != null) { try { agentFuture.get(agentCompletionTimeoutSeconds, SECONDS); - LOGGER.debug("Agent completed for task {}", taskId); + LOGGER.info("DefaultRequestHandler: Step 1 - Agent completed for task {}", taskId); } catch (java.util.concurrent.TimeoutException e) { // Agent still running after timeout - that's fine, events already being processed - LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds); + LOGGER.info("DefaultRequestHandler: Step 1 - Agent still running for task {} after {}s timeout", + taskId, agentCompletionTimeoutSeconds); } } @@ -338,12 +347,12 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // For fire-and-forget tasks, there's no final event, so we need to close the queue // This allows EventConsumer.consumeAll() to exit queue.close(false, false); // graceful close, don't notify parent yet - LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId); + LOGGER.info("DefaultRequestHandler: Step 2 - Closed queue for task {} to allow consumption completion", taskId); // Step 3: Wait for consumption to complete (now that queue is closed) if (etai.consumptionFuture() != null) { etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, SECONDS); - LOGGER.debug("Consumption completed for task {}", taskId); + LOGGER.info("DefaultRequestHandler: Step 3 - Consumption completed for task {}", taskId); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -364,27 +373,29 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte Task updatedTask = taskStore.get(taskId); if (updatedTask != null) { kind = updatedTask; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Fetched final task for {} with state {} and {} artifacts", - taskId, updatedTask.getStatus().state(), - updatedTask.getArtifacts().size()); - } + LOGGER.info("DefaultRequestHandler: Step 4 - Fetched final task for {} with state {} and {} artifacts", + taskId, updatedTask.getStatus().state(), + updatedTask.getArtifacts().size()); + } else { + LOGGER.warn("DefaultRequestHandler: Step 4 - Task {} not found in TaskStore!", taskId); } } if (kind instanceof Task taskResult && !taskId.equals(taskResult.getId())) { throw new InternalError("Task ID mismatch in agent response"); } - - // Send push notification after initial return (for both blocking and non-blocking) - pushNotificationCallback.run(); } finally { // Remove agent from map immediately to prevent accumulation CompletableFuture agentFuture = runningAgents.remove(taskId); LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", taskId, runningAgents.size()); - // Track cleanup as background task to avoid blocking Vert.x threads + // Cleanup as background task to avoid blocking Vert.x threads // Pass the consumption future to ensure cleanup waits for background consumption to complete - trackBackgroundTask(cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false)); + cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false) + .whenComplete((res, err) -> { + if (err != null) { + LOGGER.error("Error during async cleanup for task {}", taskId, err); + } + }); } LOGGER.debug("Returning: {}", kind); @@ -394,24 +405,31 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte @Override public Flow.Publisher onMessageSendStream( MessageSendParams params, ServerCallContext context) throws JSONRPCError { - LOGGER.debug("onMessageSendStream START - task: {}; context: {}; runningAgents: {}; backgroundTasks: {}", - params.message().getTaskId(), params.message().getContextId(), runningAgents.size(), backgroundTasks.size()); + LOGGER.debug("onMessageSendStream START - task: {}; context: {}; runningAgents: {}", + params.message().getTaskId(), params.message().getContextId(), runningAgents.size()); MessageSendSetup mss = initMessageSend(params, context); AtomicReference taskId = new AtomicReference<>(mss.requestContext.getTaskId()); EventQueue queue = queueManager.createOrTap(taskId.get()); LOGGER.debug("Created/tapped queue for task {}: {}", taskId.get(), queue); + + // Store push notification config SYNCHRONOUSLY for new tasks before agent starts + // This ensures config is available when MainEventBusProcessor sends push notifications + // For existing tasks, config was already stored in initMessageSend() + if (mss.task() == null && shouldAddPushInfo(params)) { + LOGGER.debug("Storing push notification config for new streaming task {} EARLY (original taskId from params: {})", + taskId.get(), params.message().getTaskId()); + pushConfigStore.setInfo(taskId.get(), params.configuration().pushNotificationConfig()); + } + ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor); EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId.get(), mss.requestContext, queue); // Move consumer creation and callback registration outside try block - // so consumer is available for background consumption on client disconnect EventConsumer consumer = new EventConsumer(queue); producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback()); - AtomicBoolean backgroundConsumeStarted = new AtomicBoolean(false); - try { Flow.Publisher results = resultAggregator.consumeAndEmit(consumer); @@ -432,22 +450,12 @@ public Flow.Publisher onMessageSendStream( } catch (TaskQueueExistsException e) { // TODO Log } - if (pushConfigStore != null && - params.configuration() != null && - params.configuration().pushNotificationConfig() != null) { - pushConfigStore.setInfo( - createdTask.getId(), - params.configuration().pushNotificationConfig()); - } + // Push notification config already stored synchronously at start of onMessageSendStream + // for new tasks, or in initMessageSend for existing tasks. No need to store again here. } - if (pushSender != null && taskId.get() != null) { - EventKind latest = resultAggregator.getCurrentResult(); - if (latest instanceof Task latestTask) { - pushSender.sendNotification(latestTask); - } - } + // Push notifications now sent by MainEventBusProcessor after persistence return true; })); @@ -457,7 +465,8 @@ public Flow.Publisher onMessageSendStream( Flow.Publisher finalPublisher = convertingProcessor(eventPublisher, event -> (StreamingEventKind) event); - // Wrap publisher to detect client disconnect and continue background consumption + // Wrap publisher to detect client disconnect and immediately close ChildQueue + // This prevents ChildQueue backpressure from blocking MainEventBusProcessor return subscriber -> { LOGGER.debug("Creating subscription wrapper for task {}", taskId.get()); finalPublisher.subscribe(new Flow.Subscriber() { @@ -477,8 +486,10 @@ public void request(long n) { @Override public void cancel() { - LOGGER.debug("Client cancelled subscription for task {}, starting background consumption", taskId.get()); - startBackgroundConsumption(); + LOGGER.debug("Client cancelled subscription for task {}, closing ChildQueue immediately", taskId.get()); + // Close ChildQueue immediately to prevent backpressure + // (clears queue and releases semaphore permits) + queue.close(true); // immediate=true subscription.cancel(); } }); @@ -503,8 +514,8 @@ public void onComplete() { subscriber.onComplete(); } catch (IllegalStateException e) { // Client already disconnected and response closed - this is expected - // for streaming responses where client disconnect triggers background - // consumption. Log and ignore. + // for streaming responses where client disconnect closes ChildQueue. + // Log and ignore. if (e.getMessage() != null && e.getMessage().contains("Response has already been written")) { LOGGER.debug("Client disconnected before onComplete, response already closed for task {}", taskId.get()); } else { @@ -512,36 +523,22 @@ public void onComplete() { } } } - - private void startBackgroundConsumption() { - if (backgroundConsumeStarted.compareAndSet(false, true)) { - LOGGER.debug("Starting background consumption for task {}", taskId.get()); - // Client disconnected: continue consuming and persisting events in background - CompletableFuture bgTask = CompletableFuture.runAsync(() -> { - try { - LOGGER.debug("Background consumption thread started for task {}", taskId.get()); - resultAggregator.consumeAll(consumer); - LOGGER.debug("Background consumption completed for task {}", taskId.get()); - } catch (Exception e) { - LOGGER.error("Error during background consumption for task {}", taskId.get(), e); - } - }, executor); - trackBackgroundTask(bgTask); - } else { - LOGGER.debug("Background consumption already started for task {}", taskId.get()); - } - } }); }; } finally { - LOGGER.debug("onMessageSendStream FINALLY - task: {}; runningAgents: {}; backgroundTasks: {}", - taskId.get(), runningAgents.size(), backgroundTasks.size()); + LOGGER.debug("onMessageSendStream FINALLY - task: {}; runningAgents: {}", + taskId.get(), runningAgents.size()); // Remove agent from map immediately to prevent accumulation CompletableFuture agentFuture = runningAgents.remove(taskId.get()); LOGGER.debug("Removed agent for task {} from runningAgents in finally block, size after: {}", taskId.get(), runningAgents.size()); - trackBackgroundTask(cleanupProducer(agentFuture, null, taskId.get(), queue, true)); + cleanupProducer(agentFuture, null, taskId.get(), queue, true) + .whenComplete((res, err) -> { + if (err != null) { + LOGGER.error("Error during async cleanup for streaming task {}", taskId.get(), err); + } + }); } } @@ -708,47 +705,6 @@ public void run() { return runnable; } - private void trackBackgroundTask(CompletableFuture task) { - backgroundTasks.add(task); - LOGGER.debug("Tracking background task (total: {}): {}", backgroundTasks.size(), task); - - task.whenComplete((result, throwable) -> { - try { - if (throwable != null) { - // Unwrap CompletionException to check for CancellationException - Throwable cause = throwable; - if (throwable instanceof java.util.concurrent.CompletionException && throwable.getCause() != null) { - cause = throwable.getCause(); - } - - if (cause instanceof java.util.concurrent.CancellationException) { - LOGGER.debug("Background task cancelled: {}", task); - } else { - LOGGER.error("Background task failed", throwable); - } - } - } finally { - backgroundTasks.remove(task); - LOGGER.debug("Removed background task (remaining: {}): {}", backgroundTasks.size(), task); - } - }); - } - - /** - * Wait for all background tasks to complete. - * Useful for testing to ensure cleanup completes before assertions. - * - * @return CompletableFuture that completes when all background tasks finish - */ - public CompletableFuture waitForBackgroundTasks() { - CompletableFuture[] tasks = backgroundTasks.toArray(new CompletableFuture[0]); - if (tasks.length == 0) { - return CompletableFuture.completedFuture(null); - } - LOGGER.debug("Waiting for {} background tasks to complete", tasks.length); - return CompletableFuture.allOf(tasks); - } - private CompletableFuture cleanupProducer(CompletableFuture agentFuture, CompletableFuture consumptionFuture, String taskId, EventQueue queue, boolean isStreaming) { LOGGER.debug("Starting cleanup for task {} (streaming={})", taskId, isStreaming); logThreadStats("CLEANUP START"); @@ -773,14 +729,20 @@ private CompletableFuture cleanupProducer(CompletableFuture agentFut LOGGER.debug("Agent and consumption both completed successfully for task {}", taskId); } - // Always close the ChildQueue and notify the parent MainQueue - // The parent will close itself when all children are closed (childClosing logic) - // This ensures proper cleanup and removal from QueueManager map - LOGGER.debug("{} call, closing ChildQueue for task {} (immediate=false, notifyParent=true)", - isStreaming ? "Streaming" : "Non-streaming", taskId); + if (isStreaming) { + // For streaming: Queue lifecycle managed by EventConsumer + // EventConsumer closes queue when it detects final event (or QueueClosedEvent from replication) + // For fire-and-forget tasks, MainQueue stays open per architectural principle + LOGGER.debug("Streaming call for task {} - queue lifecycle managed by EventConsumer", taskId); + } else { + // For non-streaming: close the ChildQueue and notify the parent MainQueue + // The parent will close itself when all children are closed (childClosing logic) + // This ensures proper cleanup and removal from QueueManager map + LOGGER.debug("Non-streaming call, closing ChildQueue for task {} (immediate=false, notifyParent=true)", taskId); - // Always notify parent so MainQueue can clean up when last child closes - queue.close(false, true); + // Always notify parent so MainQueue can clean up when last child closes + queue.close(false, true); + } // For replicated environments, the poison pill is now sent via CDI events // When JpaDatabaseTaskStore.save() persists a final task, it fires TaskFinalizedEvent @@ -820,15 +782,6 @@ private MessageSendSetup initMessageSend(MessageSendParams params, ServerCallCon return new MessageSendSetup(taskManager, task, requestContext); } - private void sendPushNotification(String taskId, ResultAggregator resultAggregator) { - if (pushSender != null && taskId != null) { - EventKind latest = resultAggregator.getCurrentResult(); - if (latest instanceof Task latestTask) { - pushSender.sendNotification(latestTask); - } - } - } - /** * Log current thread and resource statistics for debugging. * Only logs when DEBUG level is enabled. Call this from debugger or add strategic @@ -850,7 +803,6 @@ private void logThreadStats(String label) { LOGGER.debug("=== THREAD STATS: {} ===", label); LOGGER.debug("Active threads: {}", activeThreads); LOGGER.debug("Running agents: {}", runningAgents.size()); - LOGGER.debug("Background tasks: {}", backgroundTasks.size()); LOGGER.debug("Queue manager active queues: {}", queueManager.getClass().getSimpleName()); // List running agents @@ -861,13 +813,6 @@ private void logThreadStats(String label) { ); } - // List background tasks - if (!backgroundTasks.isEmpty()) { - LOGGER.debug("Background tasks:"); - backgroundTasks.forEach(task -> - LOGGER.debug(" - {}: {}", task, task.isDone() ? "DONE" : "RUNNING") - ); - } LOGGER.debug("=== END THREAD STATS ==="); } diff --git a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java index 27de1defb..83ebf14e2 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java +++ b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java @@ -13,7 +13,6 @@ import io.a2a.server.events.EventConsumer; import io.a2a.server.events.EventQueueItem; -import io.a2a.spec.A2AServerException; import io.a2a.spec.Event; import io.a2a.spec.EventKind; import io.a2a.spec.JSONRPCError; @@ -48,18 +47,10 @@ public EventKind getCurrentResult() { public Flow.Publisher consumeAndEmit(EventConsumer consumer) { Flow.Publisher allItems = consumer.consumeAll(); - // Process items conditionally - only save non-replicated events to database + // Just stream events - no persistence needed + // TaskStore update moved to MainEventBusProcessor return processor(createTubeConfig(), allItems, (errorConsumer, item) -> { - // Only process non-replicated events to avoid duplicate database writes - if (!item.isReplicated()) { - try { - callTaskManagerProcess(item.getEvent()); - } catch (A2AServerException e) { - errorConsumer.accept(e); - return false; - } - } - // Continue processing and emit (both replicated and non-replicated) + // Continue processing and emit all events return true; }); } @@ -80,15 +71,7 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError { return false; } } - // Only process non-replicated events to avoid duplicate database writes - if (!item.isReplicated()) { - try { - callTaskManagerProcess(event); - } catch (A2AServerException e) { - error.set(e); - return false; - } - } + // TaskStore update moved to MainEventBusProcessor return true; }, error::set); @@ -107,6 +90,7 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError { public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError { Flow.Publisher allItems = consumer.consumeAll(); AtomicReference message = new AtomicReference<>(); + AtomicReference capturedTask = new AtomicReference<>(); // Capture Task events AtomicBoolean interrupted = new AtomicBoolean(false); AtomicReference errorRef = new AtomicReference<>(); CompletableFuture completionFuture = new CompletableFuture<>(); @@ -140,25 +124,30 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, return false; } - // Process event through TaskManager - only for non-replicated events - if (!item.isReplicated()) { - try { - callTaskManagerProcess(event); - } catch (A2AServerException e) { - errorRef.set(e); - completionFuture.completeExceptionally(e); - return false; + // Capture Task events (especially for new tasks where taskManager.getTask() would return null) + // We capture the LATEST task to ensure we get the most up-to-date state + if (event instanceof Task t) { + Task previousTask = capturedTask.get(); + capturedTask.set(t); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Captured Task event: id={}, state={} (previous: {})", + t.getId(), t.getStatus().state(), + previousTask != null ? previousTask.getId() + "/" + previousTask.getStatus().state() : "none"); } } + // TaskStore update moved to MainEventBusProcessor + // Determine interrupt behavior boolean shouldInterrupt = false; - boolean continueInBackground = false; boolean isFinalEvent = (event instanceof Task task && task.getStatus().state().isFinal()) || (event instanceof TaskStatusUpdateEvent tsue && tsue.isFinal()); boolean isAuthRequired = (event instanceof Task task && task.getStatus().state() == TaskState.AUTH_REQUIRED) || (event instanceof TaskStatusUpdateEvent tsue && tsue.getStatus().state() == TaskState.AUTH_REQUIRED); + LOGGER.info("ResultAggregator: Evaluating interrupt (blocking={}, isFinal={}, isAuth={}, eventType={})", + blocking, isFinalEvent, isAuthRequired, event.getClass().getSimpleName()); + // Always interrupt on auth_required, as it needs external action. if (isAuthRequired) { // auth-required is a special state: the message should be @@ -168,20 +157,19 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, // new request is expected in order for the agent to make progress, // so the agent should exit. shouldInterrupt = true; - continueInBackground = true; + LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (AUTH_REQUIRED)"); } else if (!blocking) { // For non-blocking calls, interrupt as soon as a task is available. shouldInterrupt = true; - continueInBackground = true; + LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (non-blocking)"); } else if (blocking) { // For blocking calls: Interrupt to free Vert.x thread, but continue in background // Python's async consumption doesn't block threads, but Java's does // So we interrupt to return quickly, then rely on background consumption - // DefaultRequestHandler will fetch the final state from TaskStore shouldInterrupt = true; - continueInBackground = true; + LOGGER.info("ResultAggregator: Setting shouldInterrupt=true (blocking, isFinal={})", isFinalEvent); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Blocking call for task {}: {} event, returning with background consumption", taskIdForLogging(), isFinalEvent ? "final" : "non-final"); @@ -189,14 +177,14 @@ else if (blocking) { } if (shouldInterrupt) { + LOGGER.info("ResultAggregator: Interrupting consumption (setting interrupted=true)"); // Complete the future to unblock the main thread interrupted.set(true); completionFuture.complete(null); // For blocking calls, DON'T complete consumptionCompletionFuture here. // Let it complete naturally when subscription finishes (onComplete callback below). - // This ensures all events are processed and persisted to TaskStore before - // DefaultRequestHandler.cleanupProducer() proceeds with cleanup. + // This ensures all events are fully processed before cleanup. // // For non-blocking and auth-required calls, complete immediately to allow // cleanup to proceed while consumption continues in background. @@ -255,16 +243,27 @@ else if (blocking) { Utils.rethrow(error); } + // Return Message if captured, otherwise Task if captured, otherwise fetch from TaskStore + EventKind eventKind = message.get(); + if (eventKind == null) { + eventKind = capturedTask.get(); + if (LOGGER.isDebugEnabled() && eventKind instanceof Task t) { + LOGGER.debug("Returning capturedTask: id={}, state={}", t.getId(), t.getStatus().state()); + } + } + if (eventKind == null) { + eventKind = taskManager.getTask(); + if (LOGGER.isDebugEnabled() && eventKind instanceof Task t) { + LOGGER.debug("Returning task from TaskStore: id={}, state={}", t.getId(), t.getStatus().state()); + } + } + return new EventTypeAndInterrupt( - message.get() != null ? message.get() : taskManager.getTask(), + eventKind, interrupted.get(), consumptionCompletionFuture); } - private void callTaskManagerProcess(Event event) throws A2AServerException { - taskManager.process(event); - } - private String taskIdForLogging() { Task task = taskManager.getTask(); return task != null ? task.getId() : "unknown"; diff --git a/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java b/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java index 3e5611d9e..725ed33d8 100644 --- a/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java +++ b/server-common/src/test/java/io/a2a/server/events/EventConsumerTest.java @@ -15,6 +15,8 @@ import java.util.concurrent.atomic.AtomicReference; import io.a2a.json.JsonProcessingException; +import io.a2a.server.tasks.InMemoryTaskStore; +import io.a2a.server.tasks.PushNotificationSender; import io.a2a.spec.A2AError; import io.a2a.spec.A2AServerException; import io.a2a.spec.Artifact; @@ -28,14 +30,19 @@ import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.spec.TextPart; import io.a2a.util.Utils; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class EventConsumerTest { + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + private static final String TASK_ID = "123"; // Must match MINIMAL_TASK id + private EventQueue eventQueue; private EventConsumer eventConsumer; - + private MainEventBus mainEventBus; + private MainEventBusProcessor mainEventBusProcessor; private static final String MINIMAL_TASK = """ { @@ -57,10 +64,58 @@ public class EventConsumerTest { @BeforeEach public void init() { - eventQueue = EventQueue.builder().build(); + // Set up MainEventBus and processor for production-like test environment + InMemoryTaskStore taskStore = new InMemoryTaskStore(); + mainEventBus = new MainEventBus(); + mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER); + EventQueueUtil.start(mainEventBusProcessor); + + eventQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus) + .taskId(TASK_ID) + .mainEventBus(mainEventBus) + .build().tap(); eventConsumer = new EventConsumer(eventQueue); } + @AfterEach + public void cleanup() { + if (mainEventBusProcessor != null) { + mainEventBusProcessor.setCallback(null); // Clear any test callbacks + EventQueueUtil.stop(mainEventBusProcessor); + } + } + + /** + * Helper to wait for MainEventBusProcessor to process an event. + * Replaces polling patterns with deterministic callback-based waiting. + * + * @param action the action that triggers event processing + * @throws InterruptedException if waiting is interrupted + * @throws AssertionError if processing doesn't complete within timeout + */ + private void waitForEventProcessing(Runnable action) throws InterruptedException { + CountDownLatch processingLatch = new CountDownLatch(1); + mainEventBusProcessor.setCallback(new MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, Event event) { + processingLatch.countDown(); + } + + @Override + public void onTaskFinalized(String taskId) { + // Not needed for basic event processing wait + } + }); + + try { + action.run(); + assertTrue(processingLatch.await(5, TimeUnit.SECONDS), + "MainEventBusProcessor should have processed the event within timeout"); + } finally { + mainEventBusProcessor.setCallback(null); + } + } + @Test public void testConsumeOneTaskEvent() throws Exception { Task event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class); @@ -95,7 +150,7 @@ public void testConsumeAllMultipleEvents() throws JsonProcessingException { List events = List.of( Utils.unmarshalFrom(MINIMAL_TASK, Task.class), new TaskArtifactUpdateEvent.Builder() - .taskId("task-123") + .taskId(TASK_ID) .contextId("session-xyz") .artifact(new Artifact.Builder() .artifactId("11") @@ -103,7 +158,7 @@ public void testConsumeAllMultipleEvents() throws JsonProcessingException { .build()) .build(), new TaskStatusUpdateEvent.Builder() - .taskId("task-123") + .taskId(TASK_ID) .contextId("session-xyz") .status(new TaskStatus(TaskState.WORKING)) .isFinal(true) @@ -156,7 +211,7 @@ public void testConsumeUntilMessage() throws Exception { List events = List.of( Utils.unmarshalFrom(MINIMAL_TASK, Task.class), new TaskArtifactUpdateEvent.Builder() - .taskId("task-123") + .taskId(TASK_ID) .contextId("session-xyz") .artifact(new Artifact.Builder() .artifactId("11") @@ -164,7 +219,7 @@ public void testConsumeUntilMessage() throws Exception { .build()) .build(), new TaskStatusUpdateEvent.Builder() - .taskId("task-123") + .taskId(TASK_ID) .contextId("session-xyz") .status(new TaskStatus(TaskState.WORKING)) .isFinal(true) @@ -343,7 +398,9 @@ public void onComplete() { @Test public void testConsumeAllStopsOnQueueClosed() throws Exception { - EventQueue queue = EventQueue.builder().build(); + EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus) + .mainEventBus(mainEventBus) + .build().tap(); EventConsumer consumer = new EventConsumer(queue); // Close the queue immediately @@ -389,12 +446,16 @@ public void onComplete() { @Test public void testConsumeAllHandlesQueueClosedException() throws Exception { - EventQueue queue = EventQueue.builder().build(); + EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus) + .mainEventBus(mainEventBus) + .build().tap(); EventConsumer consumer = new EventConsumer(queue); // Add a message event (which will complete the stream) Event message = Utils.unmarshalFrom(MESSAGE_PAYLOAD, Message.class); - queue.enqueueEvent(message); + + // Use callback to wait for event processing + waitForEventProcessing(() -> queue.enqueueEvent(message)); // Close the queue before consuming queue.close(); @@ -439,7 +500,9 @@ public void onComplete() { @Test public void testConsumeAllTerminatesOnQueueClosedEvent() throws Exception { - EventQueue queue = EventQueue.builder().build(); + EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus) + .mainEventBus(mainEventBus) + .build().tap(); EventConsumer consumer = new EventConsumer(queue); // Enqueue a QueueClosedEvent (poison pill) @@ -488,8 +551,12 @@ public void onComplete() { } private void enqueueAndConsumeOneEvent(Event event) throws Exception { - eventQueue.enqueueEvent(event); + // Use callback to wait for event processing + waitForEventProcessing(() -> eventQueue.enqueueEvent(event)); + + // Event is now available, consume it directly Event result = eventConsumer.consumeOne(); + assertNotNull(result, "Event should be available"); assertSame(event, result); } diff --git a/server-common/src/test/java/io/a2a/server/events/EventQueueTest.java b/server-common/src/test/java/io/a2a/server/events/EventQueueTest.java index 75888f151..14fe31c94 100644 --- a/server-common/src/test/java/io/a2a/server/events/EventQueueTest.java +++ b/server-common/src/test/java/io/a2a/server/events/EventQueueTest.java @@ -10,7 +10,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import io.a2a.server.tasks.InMemoryTaskStore; +import io.a2a.server.tasks.PushNotificationSender; import io.a2a.spec.Artifact; import io.a2a.spec.Event; import io.a2a.spec.JSONRPCError; @@ -23,12 +27,17 @@ import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.spec.TextPart; import io.a2a.util.Utils; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class EventQueueTest { private EventQueue eventQueue; + private MainEventBus mainEventBus; + private MainEventBusProcessor mainEventBusProcessor; + + private static final String TASK_ID = "123"; // Must match MINIMAL_TASK id private static final String MINIMAL_TASK = """ { @@ -48,38 +57,95 @@ public class EventQueueTest { } """; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; @BeforeEach public void init() { - eventQueue = EventQueue.builder().build(); + // Set up MainEventBus and processor for production-like test environment + InMemoryTaskStore taskStore = new InMemoryTaskStore(); + mainEventBus = new MainEventBus(); + mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER); + EventQueueUtil.start(mainEventBusProcessor); + + eventQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus) + .taskId(TASK_ID) + .mainEventBus(mainEventBus) + .build().tap(); + } + + @AfterEach + public void cleanup() { + if (mainEventBusProcessor != null) { + mainEventBusProcessor.setCallback(null); // Clear any test callbacks + EventQueueUtil.stop(mainEventBusProcessor); + } + } + /** + * Helper to create a queue with MainEventBus configured (for tests that need event distribution). + */ + private EventQueue createQueueWithEventBus(String taskId) { + return EventQueueUtil.getEventQueueBuilder(mainEventBus) + .taskId(taskId) + .build(); + } + + /** + * Helper to wait for MainEventBusProcessor to process an event. + * Replaces polling patterns with deterministic callback-based waiting. + * + * @param action the action that triggers event processing + * @throws InterruptedException if waiting is interrupted + * @throws AssertionError if processing doesn't complete within timeout + */ + private void waitForEventProcessing(Runnable action) throws InterruptedException { + CountDownLatch processingLatch = new CountDownLatch(1); + mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, io.a2a.spec.Event event) { + processingLatch.countDown(); + } + + @Override + public void onTaskFinalized(String taskId) { + // Not needed for basic event processing wait + } + }); + + try { + action.run(); + assertTrue(processingLatch.await(5, TimeUnit.SECONDS), + "MainEventBusProcessor should have processed the event within timeout"); + } finally { + mainEventBusProcessor.setCallback(null); + } } @Test public void testConstructorDefaultQueueSize() { - EventQueue queue = EventQueue.builder().build(); + EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); assertEquals(EventQueue.DEFAULT_QUEUE_SIZE, queue.getQueueSize()); } @Test public void testConstructorCustomQueueSize() { int customSize = 500; - EventQueue queue = EventQueue.builder().queueSize(customSize).build(); + EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).queueSize(customSize).build(); assertEquals(customSize, queue.getQueueSize()); } @Test public void testConstructorInvalidQueueSize() { // Test zero queue size - assertThrows(IllegalArgumentException.class, () -> EventQueue.builder().queueSize(0).build()); + assertThrows(IllegalArgumentException.class, () -> EventQueueUtil.getEventQueueBuilder(mainEventBus).queueSize(0).build()); // Test negative queue size - assertThrows(IllegalArgumentException.class, () -> EventQueue.builder().queueSize(-10).build()); + assertThrows(IllegalArgumentException.class, () -> EventQueueUtil.getEventQueueBuilder(mainEventBus).queueSize(-10).build()); } @Test public void testTapCreatesChildQueue() { - EventQueue parentQueue = EventQueue.builder().build(); + EventQueue parentQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); EventQueue childQueue = parentQueue.tap(); assertNotNull(childQueue); @@ -89,7 +155,7 @@ public void testTapCreatesChildQueue() { @Test public void testTapOnChildQueueThrowsException() { - EventQueue parentQueue = EventQueue.builder().build(); + EventQueue parentQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); EventQueue childQueue = parentQueue.tap(); assertThrows(IllegalStateException.class, () -> childQueue.tap()); @@ -97,69 +163,74 @@ public void testTapOnChildQueueThrowsException() { @Test public void testEnqueueEventPropagagesToChildren() throws Exception { - EventQueue parentQueue = EventQueue.builder().build(); - EventQueue childQueue = parentQueue.tap(); + EventQueue mainQueue = createQueueWithEventBus(TASK_ID); + EventQueue childQueue1 = mainQueue.tap(); + EventQueue childQueue2 = mainQueue.tap(); Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class); - parentQueue.enqueueEvent(event); + mainQueue.enqueueEvent(event); - // Event should be available in both parent and child queues - Event parentEvent = parentQueue.dequeueEventItem(-1).getEvent(); - Event childEvent = childQueue.dequeueEventItem(-1).getEvent(); + // Event should be available in all child queues + // Note: MainEventBusProcessor runs async, so we use dequeueEventItem with timeout + Event child1Event = childQueue1.dequeueEventItem(5000).getEvent(); + Event child2Event = childQueue2.dequeueEventItem(5000).getEvent(); - assertSame(event, parentEvent); - assertSame(event, childEvent); + assertSame(event, child1Event); + assertSame(event, child2Event); } @Test public void testMultipleChildQueuesReceiveEvents() throws Exception { - EventQueue parentQueue = EventQueue.builder().build(); - EventQueue childQueue1 = parentQueue.tap(); - EventQueue childQueue2 = parentQueue.tap(); + EventQueue mainQueue = createQueueWithEventBus(TASK_ID); + EventQueue childQueue1 = mainQueue.tap(); + EventQueue childQueue2 = mainQueue.tap(); + EventQueue childQueue3 = mainQueue.tap(); Event event1 = Utils.unmarshalFrom(MINIMAL_TASK, Task.class); Event event2 = Utils.unmarshalFrom(MESSAGE_PAYLOAD, Message.class); - parentQueue.enqueueEvent(event1); - parentQueue.enqueueEvent(event2); + mainQueue.enqueueEvent(event1); + mainQueue.enqueueEvent(event2); - // All queues should receive both events - assertSame(event1, parentQueue.dequeueEventItem(-1).getEvent()); - assertSame(event2, parentQueue.dequeueEventItem(-1).getEvent()); + // All child queues should receive both events + // Note: Use timeout for async processing + assertSame(event1, childQueue1.dequeueEventItem(5000).getEvent()); + assertSame(event2, childQueue1.dequeueEventItem(5000).getEvent()); - assertSame(event1, childQueue1.dequeueEventItem(-1).getEvent()); - assertSame(event2, childQueue1.dequeueEventItem(-1).getEvent()); + assertSame(event1, childQueue2.dequeueEventItem(5000).getEvent()); + assertSame(event2, childQueue2.dequeueEventItem(5000).getEvent()); - assertSame(event1, childQueue2.dequeueEventItem(-1).getEvent()); - assertSame(event2, childQueue2.dequeueEventItem(-1).getEvent()); + assertSame(event1, childQueue3.dequeueEventItem(5000).getEvent()); + assertSame(event2, childQueue3.dequeueEventItem(5000).getEvent()); } @Test public void testChildQueueDequeueIndependently() throws Exception { - EventQueue parentQueue = EventQueue.builder().build(); - EventQueue childQueue1 = parentQueue.tap(); - EventQueue childQueue2 = parentQueue.tap(); + EventQueue mainQueue = createQueueWithEventBus(TASK_ID); + EventQueue childQueue1 = mainQueue.tap(); + EventQueue childQueue2 = mainQueue.tap(); + EventQueue childQueue3 = mainQueue.tap(); Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class); - parentQueue.enqueueEvent(event); + mainQueue.enqueueEvent(event); - // Dequeue from child1 first - Event child1Event = childQueue1.dequeueEventItem(-1).getEvent(); + // Dequeue from child1 first (use timeout for async processing) + Event child1Event = childQueue1.dequeueEventItem(5000).getEvent(); assertSame(event, child1Event); // child2 should still have the event available - Event child2Event = childQueue2.dequeueEventItem(-1).getEvent(); + Event child2Event = childQueue2.dequeueEventItem(5000).getEvent(); assertSame(event, child2Event); - // Parent should still have the event available - Event parentEvent = parentQueue.dequeueEventItem(-1).getEvent(); - assertSame(event, parentEvent); + // child3 should still have the event available + Event child3Event = childQueue3.dequeueEventItem(5000).getEvent(); + assertSame(event, child3Event); } @Test public void testCloseImmediatePropagationToChildren() throws Exception { - EventQueue parentQueue = EventQueue.builder().build(); + EventQueue parentQueue = createQueueWithEventBus(TASK_ID); EventQueue childQueue = parentQueue.tap(); // Add events to both parent and child @@ -168,7 +239,7 @@ public void testCloseImmediatePropagationToChildren() throws Exception { assertFalse(childQueue.isClosed()); try { - assertNotNull(childQueue.dequeueEventItem(-1)); // Child has the event + assertNotNull(childQueue.dequeueEventItem(5000)); // Child has the event (use timeout) } catch (EventQueueClosedException e) { // This is fine if queue closed before dequeue } @@ -189,27 +260,34 @@ public void testCloseImmediatePropagationToChildren() throws Exception { @Test public void testEnqueueEventWhenClosed() throws Exception { - EventQueue queue = EventQueue.builder().build(); + EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus) + .taskId(TASK_ID) + .build(); + EventQueue childQueue = mainQueue.tap(); Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class); - queue.close(); // Close the queue first - assertTrue(queue.isClosed()); + childQueue.close(); // Close the child queue first + assertTrue(childQueue.isClosed()); // MainQueue accepts events even when closed (for replication support) // This ensures late-arriving replicated events can be enqueued to closed queues - queue.enqueueEvent(event); + mainQueue.enqueueEvent(event); - // Event should be available for dequeuing - Event dequeuedEvent = queue.dequeueEventItem(-1).getEvent(); + // Create a new child queue to receive the event (closed child won't receive it) + EventQueue newChildQueue = mainQueue.tap(); + EventQueueItem item = newChildQueue.dequeueEventItem(5000); + assertNotNull(item); + Event dequeuedEvent = item.getEvent(); assertSame(event, dequeuedEvent); - // Now queue is closed and empty, should throw exception - assertThrows(EventQueueClosedException.class, () -> queue.dequeueEventItem(-1)); + // Now new child queue is closed and empty, should throw exception + newChildQueue.close(); + assertThrows(EventQueueClosedException.class, () -> newChildQueue.dequeueEventItem(-1)); } @Test public void testDequeueEventWhenClosedAndEmpty() throws Exception { - EventQueue queue = EventQueue.builder().build(); + EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build().tap(); queue.close(); assertTrue(queue.isClosed()); @@ -219,19 +297,27 @@ public void testDequeueEventWhenClosedAndEmpty() throws Exception { @Test public void testDequeueEventWhenClosedButHasEvents() throws Exception { - EventQueue queue = EventQueue.builder().build(); + EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus) + .taskId(TASK_ID) + .build(); + EventQueue childQueue = mainQueue.tap(); Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class); - queue.enqueueEvent(event); - queue.close(); // Graceful close - events should remain - assertTrue(queue.isClosed()); + // Use callback to wait for event processing instead of polling + waitForEventProcessing(() -> mainQueue.enqueueEvent(event)); - // Should still be able to dequeue existing events - Event dequeuedEvent = queue.dequeueEventItem(-1).getEvent(); + // At this point, event has been processed and distributed to childQueue + childQueue.close(); // Graceful close - events should remain + assertTrue(childQueue.isClosed()); + + // Should still be able to dequeue existing events from closed queue + EventQueueItem item = childQueue.dequeueEventItem(5000); + assertNotNull(item); + Event dequeuedEvent = item.getEvent(); assertSame(event, dequeuedEvent); // Now queue is closed and empty, should throw exception - assertThrows(EventQueueClosedException.class, () -> queue.dequeueEventItem(-1)); + assertThrows(EventQueueClosedException.class, () -> childQueue.dequeueEventItem(-1)); } @Test @@ -246,7 +332,9 @@ public void testEnqueueAndDequeueEvent() throws Exception { public void testDequeueEventNoWait() throws Exception { Event event = Utils.unmarshalFrom(MINIMAL_TASK, Task.class); eventQueue.enqueueEvent(event); - Event dequeuedEvent = eventQueue.dequeueEventItem(-1).getEvent(); + EventQueueItem item = eventQueue.dequeueEventItem(5000); + assertNotNull(item); + Event dequeuedEvent = item.getEvent(); assertSame(event, dequeuedEvent); } @@ -259,7 +347,7 @@ public void testDequeueEventEmptyQueueNoWait() throws Exception { @Test public void testDequeueEventWait() throws Exception { Event event = new TaskStatusUpdateEvent.Builder() - .taskId("task-123") + .taskId(TASK_ID) .contextId("session-xyz") .status(new TaskStatus(TaskState.WORKING)) .isFinal(true) @@ -273,7 +361,7 @@ public void testDequeueEventWait() throws Exception { @Test public void testTaskDone() throws Exception { Event event = new TaskArtifactUpdateEvent.Builder() - .taskId("task-123") + .taskId(TASK_ID) .contextId("session-xyz") .artifact(new Artifact.Builder() .artifactId("11") @@ -349,7 +437,7 @@ public void testCloseIdempotent() throws Exception { assertTrue(eventQueue.isClosed()); // Test with immediate close as well - EventQueue eventQueue2 = EventQueue.builder().build(); + EventQueue eventQueue2 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); eventQueue2.close(true); assertTrue(eventQueue2.isClosed()); @@ -363,19 +451,20 @@ public void testCloseIdempotent() throws Exception { */ @Test public void testCloseChildQueues() throws Exception { - EventQueue childQueue = eventQueue.tap(); + EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); + EventQueue childQueue = mainQueue.tap(); assertTrue(childQueue != null); // Graceful close - parent closes but children remain open - eventQueue.close(); - assertTrue(eventQueue.isClosed()); + mainQueue.close(); + assertTrue(mainQueue.isClosed()); assertFalse(childQueue.isClosed()); // Child NOT closed on graceful parent close // Immediate close - parent force-closes all children - EventQueue parentQueue2 = EventQueue.builder().build(); - EventQueue childQueue2 = parentQueue2.tap(); - parentQueue2.close(true); // immediate=true - assertTrue(parentQueue2.isClosed()); + EventQueue mainQueue2 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); + EventQueue childQueue2 = mainQueue2.tap(); + mainQueue2.close(true); // immediate=true + assertTrue(mainQueue2.isClosed()); assertTrue(childQueue2.isClosed()); // Child IS closed on immediate parent close } @@ -385,7 +474,7 @@ public void testCloseChildQueues() throws Exception { */ @Test public void testMainQueueReferenceCountingStaysOpenWithActiveChildren() throws Exception { - EventQueue mainQueue = EventQueue.builder().build(); + EventQueue mainQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); EventQueue child1 = mainQueue.tap(); EventQueue child2 = mainQueue.tap(); diff --git a/server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java b/server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java index 39201c1f6..6c9ed4a17 100644 --- a/server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java +++ b/server-common/src/test/java/io/a2a/server/events/EventQueueUtil.java @@ -1,8 +1,39 @@ package io.a2a.server.events; +import java.util.concurrent.atomic.AtomicInteger; + public class EventQueueUtil { - // Since EventQueue.builder() is package protected, add a method to expose it - public static EventQueue.EventQueueBuilder getEventQueueBuilder() { - return EventQueue.builder(); + // Counter for generating unique test taskIds + private static final AtomicInteger TASK_ID_COUNTER = new AtomicInteger(0); + + /** + * Get an EventQueue builder pre-configured with the shared test MainEventBus and a unique taskId. + *

+ * Note: Returns MainQueue - tests should call .tap() if they need to consume events. + *

+ * + * @return builder with TEST_EVENT_BUS and unique taskId already set + */ + public static EventQueue.EventQueueBuilder getEventQueueBuilder(MainEventBus eventBus) { + return EventQueue.builder(eventBus) + .taskId("test-task-" + TASK_ID_COUNTER.incrementAndGet()); + } + + /** + * Start a MainEventBusProcessor instance. + * + * @param processor the processor to start + */ + public static void start(MainEventBusProcessor processor) { + processor.start(); + } + + /** + * Stop a MainEventBusProcessor instance. + * + * @param processor the processor to stop + */ + public static void stop(MainEventBusProcessor processor) { + processor.stop(); } } diff --git a/server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java b/server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java index 1eca1b739..808a1107a 100644 --- a/server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java +++ b/server-common/src/test/java/io/a2a/server/events/InMemoryQueueManagerTest.java @@ -14,7 +14,10 @@ import java.util.concurrent.ExecutionException; import java.util.stream.IntStream; +import io.a2a.server.tasks.InMemoryTaskStore; import io.a2a.server.tasks.MockTaskStateProvider; +import io.a2a.server.tasks.PushNotificationSender; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -22,17 +25,31 @@ public class InMemoryQueueManagerTest { private InMemoryQueueManager queueManager; private MockTaskStateProvider taskStateProvider; + private InMemoryTaskStore taskStore; + private MainEventBus mainEventBus; + private MainEventBusProcessor mainEventBusProcessor; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; @BeforeEach public void setUp() { taskStateProvider = new MockTaskStateProvider(); - queueManager = new InMemoryQueueManager(taskStateProvider); + taskStore = new InMemoryTaskStore(); + mainEventBus = new MainEventBus(); + mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER); + EventQueueUtil.start(mainEventBusProcessor); + + queueManager = new InMemoryQueueManager(taskStateProvider, mainEventBus); + } + + @AfterEach + public void tearDown() { + EventQueueUtil.stop(mainEventBusProcessor); } @Test public void testAddNewQueue() { String taskId = "test_task_id"; - EventQueue queue = EventQueue.builder().build(); + EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); queueManager.add(taskId, queue); @@ -43,8 +60,8 @@ public void testAddNewQueue() { @Test public void testAddExistingQueueThrowsException() { String taskId = "test_task_id"; - EventQueue queue1 = EventQueue.builder().build(); - EventQueue queue2 = EventQueue.builder().build(); + EventQueue queue1 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); + EventQueue queue2 = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); queueManager.add(taskId, queue1); @@ -56,7 +73,7 @@ public void testAddExistingQueueThrowsException() { @Test public void testGetExistingQueue() { String taskId = "test_task_id"; - EventQueue queue = EventQueue.builder().build(); + EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); queueManager.add(taskId, queue); EventQueue result = queueManager.get(taskId); @@ -73,7 +90,7 @@ public void testGetNonexistentQueue() { @Test public void testTapExistingQueue() { String taskId = "test_task_id"; - EventQueue queue = EventQueue.builder().build(); + EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); queueManager.add(taskId, queue); EventQueue tappedQueue = queueManager.tap(taskId); @@ -94,7 +111,7 @@ public void testTapNonexistentQueue() { @Test public void testCloseExistingQueue() { String taskId = "test_task_id"; - EventQueue queue = EventQueue.builder().build(); + EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); queueManager.add(taskId, queue); queueManager.close(taskId); @@ -129,7 +146,7 @@ public void testCreateOrTapNewQueue() { @Test public void testCreateOrTapExistingQueue() { String taskId = "test_task_id"; - EventQueue originalQueue = EventQueue.builder().build(); + EventQueue originalQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); queueManager.add(taskId, originalQueue); EventQueue result = queueManager.createOrTap(taskId); @@ -151,7 +168,7 @@ public void testConcurrentOperations() throws InterruptedException, ExecutionExc // Add tasks concurrently List> addFutures = taskIds.stream() .map(taskId -> CompletableFuture.supplyAsync(() -> { - EventQueue queue = EventQueue.builder().build(); + EventQueue queue = EventQueueUtil.getEventQueueBuilder(mainEventBus).build(); queueManager.add(taskId, queue); return taskId; })) diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java index c11330f33..65cdbbf4d 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java @@ -21,7 +21,10 @@ import io.a2a.server.agentexecution.RequestContext; import io.a2a.server.events.EventQueue; import io.a2a.server.events.EventQueueItem; +import io.a2a.server.events.EventQueueUtil; import io.a2a.server.events.InMemoryQueueManager; +import io.a2a.server.events.MainEventBus; +import io.a2a.server.events.MainEventBusProcessor; import io.a2a.server.tasks.BasePushNotificationSender; import io.a2a.server.tasks.InMemoryPushNotificationConfigStore; import io.a2a.server.tasks.InMemoryTaskStore; @@ -65,6 +68,8 @@ public class AbstractA2ARequestHandlerTest { private static final String PREFERRED_TRANSPORT = "preferred-transport"; private static final String A2A_REQUESTHANDLER_TEST_PROPERTIES = "/a2a-requesthandler-test.properties"; + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + protected AgentExecutor executor; protected TaskStore taskStore; protected RequestHandler requestHandler; @@ -72,6 +77,8 @@ public class AbstractA2ARequestHandlerTest { protected AgentExecutorMethod agentExecutorCancel; protected InMemoryQueueManager queueManager; protected TestHttpClient httpClient; + protected MainEventBus mainEventBus; + protected MainEventBusProcessor mainEventBusProcessor; protected final Executor internalExecutor = Executors.newCachedThreadPool(); @@ -95,19 +102,32 @@ public void cancel(RequestContext context, EventQueue eventQueue) throws JSONRPC InMemoryTaskStore inMemoryTaskStore = new InMemoryTaskStore(); taskStore = inMemoryTaskStore; - queueManager = new InMemoryQueueManager(inMemoryTaskStore); + + // Create push notification components BEFORE MainEventBusProcessor httpClient = new TestHttpClient(); PushNotificationConfigStore pushConfigStore = new InMemoryPushNotificationConfigStore(); PushNotificationSender pushSender = new BasePushNotificationSender(pushConfigStore, httpClient); + // Create MainEventBus and MainEventBusProcessor (production code path) + mainEventBus = new MainEventBus(); + mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, pushSender); + EventQueueUtil.start(mainEventBusProcessor); + + queueManager = new InMemoryQueueManager(inMemoryTaskStore, mainEventBus); + requestHandler = DefaultRequestHandler.create( - executor, taskStore, queueManager, pushConfigStore, pushSender, internalExecutor); + executor, taskStore, queueManager, pushConfigStore, internalExecutor); } @AfterEach public void cleanup() { agentExecutorExecute = null; agentExecutorCancel = null; + + // Stop MainEventBusProcessor background thread + if (mainEventBusProcessor != null) { + EventQueueUtil.stop(mainEventBusProcessor); + } } protected static AgentCard createAgentCard(boolean streaming, boolean pushNotifications, boolean stateTransitionHistory) { diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java index acaa531ad..04900ba5c 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java @@ -19,9 +19,13 @@ import io.a2a.server.agentexecution.RequestContext; import io.a2a.server.auth.UnauthenticatedUser; import io.a2a.server.events.EventQueue; +import io.a2a.server.events.EventQueueUtil; import io.a2a.server.events.InMemoryQueueManager; +import io.a2a.server.events.MainEventBus; +import io.a2a.server.events.MainEventBusProcessor; import io.a2a.server.tasks.InMemoryPushNotificationConfigStore; import io.a2a.server.tasks.InMemoryTaskStore; +import io.a2a.server.tasks.PushNotificationSender; import io.a2a.server.tasks.TaskUpdater; import io.a2a.spec.JSONRPCError; import io.a2a.spec.Message; @@ -32,6 +36,7 @@ import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatus; import io.a2a.spec.TextPart; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -50,12 +55,23 @@ public class DefaultRequestHandlerTest { private InMemoryQueueManager queueManager; private TestAgentExecutor agentExecutor; private ServerCallContext serverCallContext; + private MainEventBus mainEventBus; + private MainEventBusProcessor mainEventBusProcessor; + + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; @BeforeEach void setUp() { taskStore = new InMemoryTaskStore(); + + // Create MainEventBus and MainEventBusProcessor (production code path) + mainEventBus = new MainEventBus(); + mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER); + EventQueueUtil.start(mainEventBusProcessor); + // Pass taskStore as TaskStateProvider to queueManager for task-aware queue management - queueManager = new InMemoryQueueManager(taskStore); + queueManager = new InMemoryQueueManager(taskStore, mainEventBus); + agentExecutor = new TestAgentExecutor(); requestHandler = DefaultRequestHandler.create( @@ -63,13 +79,52 @@ void setUp() { taskStore, queueManager, null, // pushConfigStore - null, // pushSender Executors.newCachedThreadPool() ); serverCallContext = new ServerCallContext(UnauthenticatedUser.INSTANCE, Map.of(), Set.of()); } + @AfterEach + void tearDown() { + // Stop MainEventBusProcessor background thread + if (mainEventBusProcessor != null) { + mainEventBusProcessor.setCallback(null); // Clear any test callbacks + EventQueueUtil.stop(mainEventBusProcessor); + } + } + + /** + * Helper to wait for MainEventBusProcessor to finalize a task. + * Replaces polling patterns with deterministic callback-based waiting. + * + * @param action the action that triggers task finalization (e.g., enqueuing a final event) + * @throws InterruptedException if waiting is interrupted + * @throws AssertionError if finalization doesn't complete within timeout + */ + private void waitForTaskFinalization(Runnable action) throws InterruptedException { + CountDownLatch finalizationLatch = new CountDownLatch(1); + mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, io.a2a.spec.Event event) { + // Not used for task finalization wait + } + + @Override + public void onTaskFinalized(String taskId) { + finalizationLatch.countDown(); + } + }); + + try { + action.run(); + assertTrue(finalizationLatch.await(5, TimeUnit.SECONDS), + "MainEventBusProcessor should have finalized the task within timeout"); + } finally { + mainEventBusProcessor.setCallback(null); + } + } + /** * Test that multiple blocking messages to the same task work correctly * when agent doesn't emit final events (fire-and-forget pattern). @@ -577,32 +632,15 @@ void testNonBlockingMessagePersistsAllEventsInBackground() throws Exception { // At this point, the non-blocking call has returned, but the agent is still running - // Allow the agent to emit the final COMPLETED event - allowCompletion.countDown(); + // Assertion 2: Wait for the final task to be processed and finalized in background + // Use callback to wait for task finalization instead of polling + waitForTaskFinalization(() -> allowCompletion.countDown()); - // Assertion 2: Poll for the final task state to be persisted in background - // Use polling loop instead of fixed sleep for faster and more reliable test - long timeoutMs = 5000; - long startTime = System.currentTimeMillis(); - Task persistedTask = null; - boolean completedStateFound = false; - - while (System.currentTimeMillis() - startTime < timeoutMs) { - persistedTask = taskStore.get(taskId); - if (persistedTask != null && persistedTask.getStatus().state() == TaskState.COMPLETED) { - completedStateFound = true; - break; - } - Thread.sleep(100); // Poll every 100ms - } - - assertTrue(persistedTask != null, "Task should be persisted to store"); - assertTrue( - completedStateFound, - "Final task state should be COMPLETED (background consumption should have processed it), got: " + - (persistedTask != null ? persistedTask.getStatus().state() : "null") + - " after " + (System.currentTimeMillis() - startTime) + "ms" - ); + // Verify the task was persisted with COMPLETED state + Task persistedTask = taskStore.get(taskId); + assertNotNull(persistedTask, "Task should be persisted to store"); + assertEquals(TaskState.COMPLETED, persistedTask.getStatus().state(), + "Final task state should be COMPLETED (background consumption should have processed it)"); } /** @@ -779,16 +817,42 @@ void testBlockingCallReturnsCompleteTaskWithArtifacts() throws Exception { updater.complete(); }); - // Call blocking onMessageSend - should wait for ALL events - Object result = requestHandler.onMessageSend(params, serverCallContext); + // Use callback to ensure task finalization is complete before checking TaskStore + // This ensures MainEventBusProcessor has finished persisting the final state + CountDownLatch finalizationLatch = new CountDownLatch(1); + mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, io.a2a.spec.Event event) { + // Not used + } - // The returned result should be a Task with ALL artifacts - assertTrue(result instanceof Task, "Result should be a Task"); - Task returnedTask = (Task) result; + @Override + public void onTaskFinalized(String taskId) { + finalizationLatch.countDown(); + } + }); + + Task returnedTask; + try { + // Call blocking onMessageSend - should wait for ALL events + Object result = requestHandler.onMessageSend(params, serverCallContext); - // Verify task is completed - assertEquals(TaskState.COMPLETED, returnedTask.getStatus().state(), - "Returned task should be COMPLETED"); + // Wait for finalization callback to ensure TaskStore is fully updated + assertTrue(finalizationLatch.await(5, TimeUnit.SECONDS), + "Task should be finalized within timeout"); + + // The returned result should be a Task with ALL artifacts + assertTrue(result instanceof Task, "Result should be a Task"); + returnedTask = (Task) result; + + // Fetch final state from TaskStore (guaranteed to be persisted after callback) + returnedTask = taskStore.get(taskId); + + assertEquals(TaskState.COMPLETED, returnedTask.getStatus().state(), + "Returned task should be COMPLETED"); + } finally { + mainEventBusProcessor.setCallback(null); + } // Verify artifacts are included in the returned task assertNotNull(returnedTask.getArtifacts(), @@ -823,7 +887,6 @@ void testBlockingMessageStoresPushNotificationConfigForNewTask() throws Exceptio taskStore, queueManager, pushConfigStore, // Add push config store - null, // pushSender Executors.newCachedThreadPool() ); @@ -893,7 +956,6 @@ void testMessageStoresPushNotificationConfigForExistingTask() throws Exception { taskStore, queueManager, pushConfigStore, // Add push config store - null, // pushSender Executors.newCachedThreadPool() ); diff --git a/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java b/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java index 0db54c373..6c95b494d 100644 --- a/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java +++ b/server-common/src/test/java/io/a2a/server/tasks/ResultAggregatorTest.java @@ -11,18 +11,25 @@ import static org.mockito.Mockito.when; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import io.a2a.server.events.EventConsumer; import io.a2a.server.events.EventQueue; +import io.a2a.server.events.EventQueueUtil; import io.a2a.server.events.InMemoryQueueManager; +import io.a2a.server.events.MainEventBus; +import io.a2a.server.events.MainEventBusProcessor; +import io.a2a.spec.Event; import io.a2a.spec.EventKind; import io.a2a.spec.Message; import io.a2a.spec.Task; import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatus; import io.a2a.spec.TextPart; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -69,6 +76,38 @@ private Task createSampleTask(String taskId, TaskState statusState, String conte .build(); } + /** + * Helper to wait for MainEventBusProcessor to process an event. + * Replaces polling patterns with deterministic callback-based waiting. + * + * @param processor the processor to set callback on + * @param action the action that triggers event processing + * @throws InterruptedException if waiting is interrupted + * @throws AssertionError if processing doesn't complete within timeout + */ + private void waitForEventProcessing(MainEventBusProcessor processor, Runnable action) throws InterruptedException { + CountDownLatch processingLatch = new CountDownLatch(1); + processor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, Event event) { + processingLatch.countDown(); + } + + @Override + public void onTaskFinalized(String taskId) { + // Not needed for basic event processing wait + } + }); + + try { + action.run(); + assertTrue(processingLatch.await(5, TimeUnit.SECONDS), + "MainEventBusProcessor should have processed the event within timeout"); + } finally { + processor.setCallback(null); + } + } + // Basic functionality tests @@ -197,17 +236,25 @@ void testGetCurrentResultWithMessageTakesPrecedence() { @Test void testConsumeAndBreakNonBlocking() throws Exception { // Test that with blocking=false, the method returns after the first event - Task firstEvent = createSampleTask("non_blocking_task", TaskState.WORKING, "ctx1"); + String taskId = "test-task"; + Task firstEvent = createSampleTask(taskId, TaskState.WORKING, "ctx1"); // After processing firstEvent, the current result will be that task when(mockTaskManager.getTask()).thenReturn(firstEvent); // Create an event queue using QueueManager (which has access to builder) + MainEventBus mainEventBus = new MainEventBus(); + InMemoryTaskStore taskStore = new InMemoryTaskStore(); + MainEventBusProcessor processor = new MainEventBusProcessor(mainEventBus, taskStore, task -> {}); + EventQueueUtil.start(processor); + InMemoryQueueManager queueManager = - new InMemoryQueueManager(new MockTaskStateProvider()); + new InMemoryQueueManager(new MockTaskStateProvider(), mainEventBus); - EventQueue queue = queueManager.getEventQueueBuilder("test-task").build(); - queue.enqueueEvent(firstEvent); + EventQueue queue = queueManager.getEventQueueBuilder(taskId).build().tap(); + + // Use callback to wait for event processing (replaces polling) + waitForEventProcessing(processor, () -> queue.enqueueEvent(firstEvent)); // Create real EventConsumer with the queue EventConsumer eventConsumer = @@ -221,11 +268,15 @@ void testConsumeAndBreakNonBlocking() throws Exception { assertEquals(firstEvent, result.eventType()); assertTrue(result.interrupted()); - verify(mockTaskManager).process(firstEvent); + // NOTE: ResultAggregator no longer calls taskManager.process() + // That responsibility has moved to MainEventBusProcessor for centralized persistence // getTask() is called at least once for the return value (line 255) // May be called once more if debug logging executes in time (line 209) // The async consumer may or may not execute before verification, so we accept 1-2 calls verify(mockTaskManager, atLeast(1)).getTask(); verify(mockTaskManager, atMost(2)).getTask(); + + // Cleanup: stop the processor + EventQueueUtil.stop(processor); } } diff --git a/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java b/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java index 0e99f57b5..871ba352f 100644 --- a/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java +++ b/server-common/src/test/java/io/a2a/server/tasks/TaskUpdaterTest.java @@ -14,7 +14,10 @@ import io.a2a.server.agentexecution.RequestContext; import io.a2a.server.events.EventQueue; +import io.a2a.server.events.EventQueueItem; import io.a2a.server.events.EventQueueUtil; +import io.a2a.server.events.MainEventBus; +import io.a2a.server.events.MainEventBusProcessor; import io.a2a.spec.Event; import io.a2a.spec.Message; import io.a2a.spec.Part; @@ -22,6 +25,7 @@ import io.a2a.spec.TaskState; import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.spec.TextPart; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -38,14 +42,27 @@ public class TaskUpdaterTest { private static final List> SAMPLE_PARTS = List.of(new TextPart("Test message")); + private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {}; + EventQueue eventQueue; + private MainEventBus mainEventBus; + private MainEventBusProcessor mainEventBusProcessor; private TaskUpdater taskUpdater; @BeforeEach public void init() { - eventQueue = EventQueueUtil.getEventQueueBuilder().build(); + // Set up MainEventBus and processor for production-like test environment + InMemoryTaskStore taskStore = new InMemoryTaskStore(); + mainEventBus = new MainEventBus(); + mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER); + EventQueueUtil.start(mainEventBusProcessor); + + eventQueue = EventQueueUtil.getEventQueueBuilder(mainEventBus) + .taskId(TEST_TASK_ID) + .mainEventBus(mainEventBus) + .build().tap(); RequestContext context = new RequestContext.Builder() .setTaskId(TEST_TASK_ID) .setContextId(TEST_TASK_CONTEXT_ID) @@ -53,10 +70,19 @@ public void init() { taskUpdater = new TaskUpdater(context, eventQueue); } + @AfterEach + public void cleanup() { + if (mainEventBusProcessor != null) { + EventQueueUtil.stop(mainEventBusProcessor); + } + } + @Test public void testAddArtifactWithCustomIdAndName() throws Exception { taskUpdater.addArtifact(SAMPLE_PARTS, "custom-artifact-id", "Custom Artifact", null); - Event event = eventQueue.dequeueEventItem(0).getEvent(); + EventQueueItem item = eventQueue.dequeueEventItem(5000); + assertNotNull(item); + Event event = item.getEvent(); assertNotNull(event); assertInstanceOf(TaskArtifactUpdateEvent.class, event); @@ -239,7 +265,9 @@ public void testNewAgentMessageWithMetadata() throws Exception { @Test public void testAddArtifactWithAppendTrue() throws Exception { taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, null); - Event event = eventQueue.dequeueEventItem(0).getEvent(); + EventQueueItem item = eventQueue.dequeueEventItem(5000); + assertNotNull(item); + Event event = item.getEvent(); assertNotNull(event); assertInstanceOf(TaskArtifactUpdateEvent.class, event); @@ -258,7 +286,9 @@ public void testAddArtifactWithAppendTrue() throws Exception { @Test public void testAddArtifactWithLastChunkTrue() throws Exception { taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, null, true); - Event event = eventQueue.dequeueEventItem(0).getEvent(); + EventQueueItem item = eventQueue.dequeueEventItem(5000); + assertNotNull(item); + Event event = item.getEvent(); assertNotNull(event); assertInstanceOf(TaskArtifactUpdateEvent.class, event); @@ -273,7 +303,9 @@ public void testAddArtifactWithLastChunkTrue() throws Exception { @Test public void testAddArtifactWithAppendAndLastChunk() throws Exception { taskUpdater.addArtifact(SAMPLE_PARTS, "artifact-id", "Test Artifact", null, true, false); - Event event = eventQueue.dequeueEventItem(0).getEvent(); + EventQueueItem item = eventQueue.dequeueEventItem(5000); + assertNotNull(item); + Event event = item.getEvent(); assertNotNull(event); assertInstanceOf(TaskArtifactUpdateEvent.class, event); @@ -287,7 +319,9 @@ public void testAddArtifactWithAppendAndLastChunk() throws Exception { @Test public void testAddArtifactGeneratesIdWhenNull() throws Exception { taskUpdater.addArtifact(SAMPLE_PARTS, null, "Test Artifact", null); - Event event = eventQueue.dequeueEventItem(0).getEvent(); + EventQueueItem item = eventQueue.dequeueEventItem(5000); + assertNotNull(item); + Event event = item.getEvent(); assertNotNull(event); assertInstanceOf(TaskArtifactUpdateEvent.class, event); @@ -383,7 +417,9 @@ public void testConcurrentCompletionAttempts() throws Exception { thread2.join(); // Exactly one event should have been queued - Event event = eventQueue.dequeueEventItem(0).getEvent(); + EventQueueItem item = eventQueue.dequeueEventItem(5000); + assertNotNull(item); + Event event = item.getEvent(); assertNotNull(event); assertInstanceOf(TaskStatusUpdateEvent.class, event); @@ -396,7 +432,10 @@ public void testConcurrentCompletionAttempts() throws Exception { } private TaskStatusUpdateEvent checkTaskStatusUpdateEventOnQueue(boolean isFinal, TaskState state, Message statusMessage) throws Exception { - Event event = eventQueue.dequeueEventItem(0).getEvent(); + // Wait up to 5 seconds for event (async MainEventBusProcessor needs time to distribute) + EventQueueItem item = eventQueue.dequeueEventItem(5000); + assertNotNull(item); + Event event = item.getEvent(); assertNotNull(event); assertInstanceOf(TaskStatusUpdateEvent.class, event); @@ -408,6 +447,7 @@ private TaskStatusUpdateEvent checkTaskStatusUpdateEventOnQueue(boolean isFinal, assertEquals(state, tsue.getStatus().state()); assertEquals(statusMessage, tsue.getStatus().message()); + // Check no additional events (still use 0 timeout for this check) assertNull(eventQueue.dequeueEventItem(0)); return tsue; diff --git a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java index 44c465508..dff832ecb 100644 --- a/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java +++ b/transport/grpc/src/test/java/io/a2a/transport/grpc/handler/GrpcHandlerTest.java @@ -269,7 +269,7 @@ public void testPushNotificationsNotSupportedError() throws Exception { public void testOnGetPushNotificationNoPushNotifierConfig() throws Exception { // Create request handler without a push notifier DefaultRequestHandler requestHandler = - new DefaultRequestHandler(executor, taskStore, queueManager, null, null, internalExecutor); + new DefaultRequestHandler(executor, taskStore, queueManager,null, internalExecutor); AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, false); GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId(); @@ -281,7 +281,7 @@ public void testOnGetPushNotificationNoPushNotifierConfig() throws Exception { public void testOnSetPushNotificationNoPushNotifierConfig() throws Exception { // Create request handler without a push notifier DefaultRequestHandler requestHandler = DefaultRequestHandler.create( - executor, taskStore, queueManager, null, null, internalExecutor); + executor, taskStore, queueManager, null, internalExecutor); AgentCard card = AbstractA2ARequestHandlerTest.createAgentCard(false, true, false); GrpcHandler handler = new TestGrpcHandler(card, requestHandler, internalExecutor); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId(); @@ -656,7 +656,7 @@ public void testListPushNotificationConfigNotSupported() throws Exception { @Test public void testListPushNotificationConfigNoPushConfigStore() { DefaultRequestHandler requestHandler = DefaultRequestHandler.create( - executor, taskStore, queueManager, null, null, internalExecutor); + executor, taskStore, queueManager, null, internalExecutor); GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); taskStore.save(AbstractA2ARequestHandlerTest.MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { @@ -729,7 +729,7 @@ public void testDeletePushNotificationConfigNotSupported() throws Exception { @Test public void testDeletePushNotificationConfigNoPushConfigStore() { DefaultRequestHandler requestHandler = DefaultRequestHandler.create( - executor, taskStore, queueManager, null, null, internalExecutor); + executor, taskStore, queueManager, null, internalExecutor); GrpcHandler handler = new TestGrpcHandler(AbstractA2ARequestHandlerTest.CARD, requestHandler, internalExecutor); String NAME = "tasks/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId() + "/pushNotificationConfigs/" + AbstractA2ARequestHandlerTest.MINIMAL_TASK.getId(); DeleteTaskPushNotificationConfigRequest request = DeleteTaskPushNotificationConfigRequest.newBuilder() diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java index dc2088a2d..916bfafd3 100644 --- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java @@ -164,36 +164,9 @@ public void testOnMessageNewMessageSuccess() { SendMessageRequest request = new SendMessageRequest("1", new MessageSendParams(message, null, null)); SendMessageResponse response = handler.onMessageSend(request, callContext); assertNull(response.getError()); - // The Python implementation returns a Task here, but then again they are using hardcoded mocks and - // bypassing the whole EventQueue. - // If we were to send a Task in agentExecutorExecute EventConsumer.consumeAll() would not exit due to - // the Task not having a 'final' state - // - // See testOnMessageNewMessageSuccessMocks() for a test more similar to the Python implementation Assertions.assertSame(message, response.getResult()); } - @Test - public void testOnMessageNewMessageSuccessMocks() { - JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); - - Message message = new Message.Builder(MESSAGE) - .taskId(MINIMAL_TASK.getId()) - .contextId(MINIMAL_TASK.getContextId()) - .build(); - - SendMessageRequest request = new SendMessageRequest("1", new MessageSendParams(message, null, null)); - SendMessageResponse response; - try (MockedConstruction mocked = Mockito.mockConstruction( - EventConsumer.class, - (mock, context) -> {Mockito.doReturn(ZeroPublisher.fromItems(wrapEvent(MINIMAL_TASK))).when(mock).consumeAll(); - Mockito.doCallRealMethod().when(mock).createAgentRunnableDoneCallback();})){ - response = handler.onMessageSend(request, callContext); - } - assertNull(response.getError()); - Assertions.assertSame(MINIMAL_TASK, response.getResult()); - } - @Test public void testOnMessageNewMessageWithExistingTaskSuccess() { JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); @@ -208,37 +181,9 @@ public void testOnMessageNewMessageWithExistingTaskSuccess() { SendMessageRequest request = new SendMessageRequest("1", new MessageSendParams(message, null, null)); SendMessageResponse response = handler.onMessageSend(request, callContext); assertNull(response.getError()); - // The Python implementation returns a Task here, but then again they are using hardcoded mocks and - // bypassing the whole EventQueue. - // If we were to send a Task in agentExecutorExecute EventConsumer.consumeAll() would not exit due to - // the Task not having a 'final' state - // - // See testOnMessageNewMessageWithExistingTaskSuccessMocks() for a test more similar to the Python implementation Assertions.assertSame(message, response.getResult()); } - @Test - public void testOnMessageNewMessageWithExistingTaskSuccessMocks() { - JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); - taskStore.save(MINIMAL_TASK); - - Message message = new Message.Builder(MESSAGE) - .taskId(MINIMAL_TASK.getId()) - .contextId(MINIMAL_TASK.getContextId()) - .build(); - SendMessageRequest request = new SendMessageRequest("1", new MessageSendParams(message, null, null)); - SendMessageResponse response; - try (MockedConstruction mocked = Mockito.mockConstruction( - EventConsumer.class, - (mock, context) -> { - Mockito.doReturn(ZeroPublisher.fromItems(wrapEvent(MINIMAL_TASK))).when(mock).consumeAll();})){ - response = handler.onMessageSend(request, callContext); - } - assertNull(response.getError()); - Assertions.assertSame(MINIMAL_TASK, response.getResult()); - - } - @Test public void testOnMessageError() { // See testMessageOnErrorMocks() for a test more similar to the Python implementation, using mocks for @@ -338,9 +283,24 @@ public void onComplete() { @Test public void testOnMessageStreamNewMessageMultipleEventsSuccess() throws InterruptedException { - JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); + // Setup callback to wait for all 3 events to be processed by MainEventBusProcessor + CountDownLatch processingLatch = new CountDownLatch(3); + mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, io.a2a.spec.Event event) { + processingLatch.countDown(); + } + + @Override + public void onTaskFinalized(String taskId) { + // Not needed for this test + } + }); + + try { + JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); - // Create multiple events to be sent during streaming + // Create multiple events to be sent during streaming Task taskEvent = new Task.Builder(MINIMAL_TASK) .status(new TaskStatus(TaskState.WORKING)) .build(); @@ -415,33 +375,40 @@ public void onComplete() { } }); - // Wait for all events to be received - Assertions.assertTrue(latch.await(2, TimeUnit.SECONDS), - "Expected to receive 3 events within timeout"); - - // Assert no error occurred during streaming - Assertions.assertNull(error.get(), "No error should occur during streaming"); - - // Verify that all 3 events were received - assertEquals(3, results.size(), "Should have received exactly 3 events"); - - // Verify the first event is the task - Task receivedTask = assertInstanceOf(Task.class, results.get(0), "First event should be a Task"); - assertEquals(MINIMAL_TASK.getId(), receivedTask.getId()); - assertEquals(MINIMAL_TASK.getContextId(), receivedTask.getContextId()); - assertEquals(TaskState.WORKING, receivedTask.getStatus().state()); - - // Verify the second event is the artifact update - TaskArtifactUpdateEvent receivedArtifact = assertInstanceOf(TaskArtifactUpdateEvent.class, results.get(1), - "Second event should be a TaskArtifactUpdateEvent"); - assertEquals(MINIMAL_TASK.getId(), receivedArtifact.getTaskId()); - assertEquals("artifact-1", receivedArtifact.getArtifact().artifactId()); - - // Verify the third event is the status update - TaskStatusUpdateEvent receivedStatus = assertInstanceOf(TaskStatusUpdateEvent.class, results.get(2), - "Third event should be a TaskStatusUpdateEvent"); - assertEquals(MINIMAL_TASK.getId(), receivedStatus.getTaskId()); - assertEquals(TaskState.COMPLETED, receivedStatus.getStatus().state()); + // Wait for all events to be received (increased timeout for async processing) + Assertions.assertTrue(latch.await(10, TimeUnit.SECONDS), + "Expected to receive 3 events within timeout"); + + // Wait for MainEventBusProcessor to complete processing all 3 events + Assertions.assertTrue(processingLatch.await(5, TimeUnit.SECONDS), + "MainEventBusProcessor should have processed all 3 events"); + + // Assert no error occurred during streaming + Assertions.assertNull(error.get(), "No error should occur during streaming"); + + // Verify that all 3 events were received + assertEquals(3, results.size(), "Should have received exactly 3 events"); + + // Verify the first event is the task + Task receivedTask = assertInstanceOf(Task.class, results.get(0), "First event should be a Task"); + assertEquals(MINIMAL_TASK.getId(), receivedTask.getId()); + assertEquals(MINIMAL_TASK.getContextId(), receivedTask.getContextId()); + assertEquals(TaskState.WORKING, receivedTask.getStatus().state()); + + // Verify the second event is the artifact update + TaskArtifactUpdateEvent receivedArtifact = assertInstanceOf(TaskArtifactUpdateEvent.class, results.get(1), + "Second event should be a TaskArtifactUpdateEvent"); + assertEquals(MINIMAL_TASK.getId(), receivedArtifact.getTaskId()); + assertEquals("artifact-1", receivedArtifact.getArtifact().artifactId()); + + // Verify the third event is the status update + TaskStatusUpdateEvent receivedStatus = assertInstanceOf(TaskStatusUpdateEvent.class, results.get(2), + "Third event should be a TaskStatusUpdateEvent"); + assertEquals(MINIMAL_TASK.getId(), receivedStatus.getTaskId()); + assertEquals(TaskState.COMPLETED, receivedStatus.getStatus().state()); + } finally { + mainEventBusProcessor.setCallback(null); + } } @Test @@ -710,32 +677,47 @@ public void testGetPushNotificationConfigSuccess() { @Test public void testOnMessageStreamNewMessageSendPushNotificationSuccess() throws Exception { - JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); - taskStore.save(MINIMAL_TASK); - - List events = List.of( - MINIMAL_TASK, - new TaskArtifactUpdateEvent.Builder() - .taskId(MINIMAL_TASK.getId()) - .contextId(MINIMAL_TASK.getContextId()) - .artifact(new Artifact.Builder() - .artifactId("11") - .parts(new TextPart("text")) - .build()) - .build(), - new TaskStatusUpdateEvent.Builder() - .taskId(MINIMAL_TASK.getId()) - .contextId(MINIMAL_TASK.getContextId()) - .status(new TaskStatus(TaskState.COMPLETED)) - .build()); - + // Setup callback to wait for all 3 events to be processed by MainEventBusProcessor + CountDownLatch processingLatch = new CountDownLatch(3); + mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, io.a2a.spec.Event event) { + processingLatch.countDown(); + } - agentExecutorExecute = (context, eventQueue) -> { - // Hardcode the events to send here - for (Event event : events) { - eventQueue.enqueueEvent(event); + @Override + public void onTaskFinalized(String taskId) { + // Not needed for this test } - }; + }); + + try { + JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); + taskStore.save(MINIMAL_TASK); + + List events = List.of( + MINIMAL_TASK, + new TaskArtifactUpdateEvent.Builder() + .taskId(MINIMAL_TASK.getId()) + .contextId(MINIMAL_TASK.getContextId()) + .artifact(new Artifact.Builder() + .artifactId("11") + .parts(new TextPart("text")) + .build()) + .build(), + new TaskStatusUpdateEvent.Builder() + .taskId(MINIMAL_TASK.getId()) + .contextId(MINIMAL_TASK.getContextId()) + .status(new TaskStatus(TaskState.COMPLETED)) + .build()); + + + agentExecutorExecute = (context, eventQueue) -> { + // Hardcode the events to send here + for (Event event : events) { + eventQueue.enqueueEvent(event); + } + }; TaskPushNotificationConfig config = new TaskPushNotificationConfig( @@ -786,6 +768,11 @@ public void onComplete() { }); Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS)); + + // Wait for MainEventBusProcessor to complete processing all 3 events + Assertions.assertTrue(processingLatch.await(5, TimeUnit.SECONDS), + "MainEventBusProcessor should have processed all 3 events"); + subscriptionRef.get().cancel(); assertEquals(3, results.size()); assertEquals(3, httpClient.tasks.size()); @@ -811,6 +798,9 @@ public void onComplete() { assertEquals(1, curr.getArtifacts().size()); assertEquals(1, curr.getArtifacts().get(0).parts().size()); assertEquals("text", ((TextPart)curr.getArtifacts().get(0).parts().get(0)).getText()); + } finally { + mainEventBusProcessor.setCallback(null); + } } @Test @@ -1117,7 +1107,7 @@ public void testPushNotificationsNotSupportedError() { public void testOnGetPushNotificationNoPushNotifierConfig() { // Create request handler without a push notifier DefaultRequestHandler requestHandler = DefaultRequestHandler.create( - executor, taskStore, queueManager, null, null, internalExecutor); + executor, taskStore, queueManager, null, internalExecutor); AgentCard card = createAgentCard(false, true, false); JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor); @@ -1136,7 +1126,7 @@ public void testOnGetPushNotificationNoPushNotifierConfig() { public void testOnSetPushNotificationNoPushNotifierConfig() { // Create request handler without a push notifier DefaultRequestHandler requestHandler = DefaultRequestHandler.create( - executor, taskStore, queueManager, null, null, internalExecutor); + executor, taskStore, queueManager, null, internalExecutor); AgentCard card = createAgentCard(false, true, false); JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor); @@ -1228,7 +1218,7 @@ public void testDefaultRequestHandlerWithCustomComponents() { @Test public void testOnMessageSendErrorHandling() { DefaultRequestHandler requestHandler = DefaultRequestHandler.create( - executor, taskStore, queueManager, null, null, internalExecutor); + executor, taskStore, queueManager, null, internalExecutor); AgentCard card = createAgentCard(false, true, false); JSONRPCHandler handler = new JSONRPCHandler(card, requestHandler, internalExecutor); @@ -1274,16 +1264,31 @@ public void testOnMessageSendTaskIdMismatch() { } @Test - public void testOnMessageStreamTaskIdMismatch() { - JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); - taskStore.save(MINIMAL_TASK); + public void testOnMessageStreamTaskIdMismatch() throws InterruptedException { + // Setup callback to wait for the 1 event to be processed by MainEventBusProcessor + CountDownLatch processingLatch = new CountDownLatch(1); + mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, io.a2a.spec.Event event) { + processingLatch.countDown(); + } - agentExecutorExecute = ((context, eventQueue) -> { - eventQueue.enqueueEvent(MINIMAL_TASK); + @Override + public void onTaskFinalized(String taskId) { + // Not needed for this test + } }); - SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null)); - Flow.Publisher response = handler.onMessageSendStream(request, callContext); + try { + JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); + taskStore.save(MINIMAL_TASK); + + agentExecutorExecute = ((context, eventQueue) -> { + eventQueue.enqueueEvent(MINIMAL_TASK); + }); + + SendStreamingMessageRequest request = new SendStreamingMessageRequest("1", new MessageSendParams(MESSAGE, null, null)); + Flow.Publisher response = handler.onMessageSendStream(request, callContext); CompletableFuture future = new CompletableFuture<>(); List results = new ArrayList<>(); @@ -1317,11 +1322,18 @@ public void onComplete() { } }); - future.join(); + future.join(); - Assertions.assertNull(error.get()); - Assertions.assertEquals(1, results.size()); - Assertions.assertInstanceOf(InternalError.class, results.get(0).getError()); + // Wait for MainEventBusProcessor to complete processing the event + Assertions.assertTrue(processingLatch.await(5, TimeUnit.SECONDS), + "MainEventBusProcessor should have processed the event"); + + Assertions.assertNull(error.get()); + Assertions.assertEquals(1, results.size()); + Assertions.assertInstanceOf(InternalError.class, results.get(0).getError()); + } finally { + mainEventBusProcessor.setCallback(null); + } } @Test @@ -1381,7 +1393,7 @@ public void testListPushNotificationConfigNotSupported() { @Test public void testListPushNotificationConfigNoPushConfigStore() { DefaultRequestHandler requestHandler = DefaultRequestHandler.create( - executor, taskStore, queueManager, null, null, internalExecutor); + executor, taskStore, queueManager, null, internalExecutor); JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> { @@ -1473,7 +1485,7 @@ public void testDeletePushNotificationConfigNotSupported() { @Test public void testDeletePushNotificationConfigNoPushConfigStore() { DefaultRequestHandler requestHandler = DefaultRequestHandler.create( - executor, taskStore, queueManager, null, null, internalExecutor); + executor, taskStore, queueManager, null, internalExecutor); JSONRPCHandler handler = new JSONRPCHandler(CARD, requestHandler, internalExecutor); taskStore.save(MINIMAL_TASK); agentExecutorExecute = (context, eventQueue) -> {