Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.a2a.server.events.EventQueueFactory;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.InMemoryQueueManager;
import io.a2a.server.events.MainEventBus;
import io.a2a.server.events.QueueManager;

@ApplicationScoped
Expand All @@ -32,10 +33,12 @@ public class ReplicatedQueueManager implements QueueManager {
private TaskStateProvider taskStateProvider;

@Inject
public ReplicatedQueueManager(ReplicationStrategy replicationStrategy, TaskStateProvider taskStateProvider) {
public ReplicatedQueueManager(ReplicationStrategy replicationStrategy,
TaskStateProvider taskStateProvider,
MainEventBus mainEventBus) {
this.replicationStrategy = replicationStrategy;
this.taskStateProvider = taskStateProvider;
this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider);
this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider, mainEventBus);
}


Expand Down Expand Up @@ -139,12 +142,11 @@ public EventQueue.EventQueueBuilder builder(String taskId) {
// which sends the QueueClosedEvent after the database transaction commits.
// This ensures proper ordering and transactional guarantees.

// Return the builder with callbacks
return delegate.getEventQueueBuilder(taskId)
.taskId(taskId)
.hook(new ReplicationHook(taskId))
.addOnCloseCallback(delegate.getCleanupCallback(taskId))
.taskStateProvider(taskStateProvider);
// Call createBaseEventQueueBuilder() directly to avoid infinite recursion
// (getEventQueueBuilder() would delegate back to this factory, creating a loop)
// The base builder already includes: taskId, cleanup callback, taskStateProvider, mainEventBus
return delegate.createBaseEventQueueBuilder(taskId)
.hook(new ReplicationHook(taskId));
}
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.a2a.server.events;

public class EventQueueUtil {
public static void start(MainEventBusProcessor processor) {
processor.start();
}

public static void stop(MainEventBusProcessor processor) {
processor.stop();
}
}
5 changes: 5 additions & 0 deletions reference/jsonrpc/src/test/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
quarkus.arc.selected-alternatives=io.a2a.server.apps.common.TestHttpClient

# Debug logging for event processing and request handling
quarkus.log.category."io.a2a.server.events".level=DEBUG
quarkus.log.category."io.a2a.server.requesthandlers".level=DEBUG
quarkus.log.category."io.a2a.server.tasks".level=DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,20 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
EventQueueItem item;
Event event;
try {
LOGGER.debug("EventConsumer polling queue {} (error={})", System.identityHashCode(queue), error);
item = queue.dequeueEventItem(QUEUE_WAIT_MILLISECONDS);
if (item == null) {
LOGGER.debug("EventConsumer poll timeout (null item), continuing");
continue;
}
event = item.getEvent();
LOGGER.debug("EventConsumer received event: {} (queue={})",
event.getClass().getSimpleName(), System.identityHashCode(queue));

// Defensive logging for error handling
if (event instanceof Throwable thr) {
LOGGER.info("EventConsumer detected Throwable event: {} - triggering tube.fail()",
thr.getClass().getSimpleName());
tube.fail(thr);
return;
}
Expand Down Expand Up @@ -121,8 +128,13 @@ public Flow.Publisher<EventQueueItem> consumeAll() {

public EnhancedRunnable.DoneCallback createAgentRunnableDoneCallback() {
return agentRunnable -> {
LOGGER.info("EventConsumer: Agent done callback invoked (hasError={}, queue={})",
agentRunnable.getError() != null, System.identityHashCode(queue));
if (agentRunnable.getError() != null) {
error = agentRunnable.getError();
LOGGER.info("EventConsumer: Set error field from agent callback");
} else {
LOGGER.info("EventConsumer: Agent completed successfully (no error), continuing consumption");
}
};
}
Expand Down
Loading
Loading