Skip to content

Commit 9e82cc6

Browse files
committed
support parallel agent
1 parent b3ca86e commit 9e82cc6

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

core/src/main/java/com/google/adk/agents/ParallelAgent.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.adk.agents.ConfigAgentUtils.ConfigurationException;
2121
import com.google.adk.events.Event;
2222
import io.reactivex.rxjava3.core.Flowable;
23+
import io.reactivex.rxjava3.schedulers.Schedulers;
2324
import java.util.ArrayList;
2425
import java.util.List;
2526
import org.slf4j.Logger;
@@ -131,7 +132,7 @@ protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
131132

132133
List<Flowable<Event>> agentFlowables = new ArrayList<>();
133134
for (BaseAgent subAgent : currentSubAgents) {
134-
agentFlowables.add(subAgent.runAsync(invocationContext));
135+
agentFlowables.add(subAgent.runAsync(invocationContext).subscribeOn(Schedulers.io()));
135136
}
136137
return Flowable.merge(agentFlowables);
137138
}

core/src/test/java/com/google/adk/agents/ParallelAgentTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,52 @@ public void runAsync_noSubAgents_returnsEmptyFlowable() {
110110

111111
assertThat(events).isEmpty();
112112
}
113+
114+
static class BlockingAgent extends BaseAgent {
115+
private final long sleepMillis;
116+
117+
private BlockingAgent(String name, long sleepMillis) {
118+
super(name, "Blocking Agent", ImmutableList.of(), null, null);
119+
this.sleepMillis = sleepMillis;
120+
}
121+
122+
@Override
123+
protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
124+
return Flowable.fromCallable(
125+
() -> {
126+
Thread.sleep(sleepMillis);
127+
return Event.builder()
128+
.author(name())
129+
.branch(invocationContext.branch().orElse(null))
130+
.invocationId(invocationContext.invocationId())
131+
.content(Content.fromParts(Part.fromText("Done")))
132+
.build();
133+
});
134+
}
135+
136+
@Override
137+
protected Flowable<Event> runLiveImpl(InvocationContext invocationContext) {
138+
throw new UnsupportedOperationException("Not implemented");
139+
}
140+
}
141+
142+
@Test
143+
public void runAsync_blockingSubAgents_shouldExecuteInParallel() {
144+
long sleepTime = 1000;
145+
BlockingAgent agent1 = new BlockingAgent("agent1", sleepTime);
146+
BlockingAgent agent2 = new BlockingAgent("agent2", sleepTime);
147+
148+
ParallelAgent parallelAgent =
149+
ParallelAgent.builder().name("parallel_agent").subAgents(agent1, agent2).build();
150+
151+
InvocationContext invocationContext = createInvocationContext(parallelAgent);
152+
153+
long startTime = System.currentTimeMillis();
154+
List<Event> events = parallelAgent.runAsync(invocationContext).toList().blockingGet();
155+
long duration = System.currentTimeMillis() - startTime;
156+
157+
assertThat(events).hasSize(2);
158+
// If parallel, duration should be less than 2 * sleepTime (2000ms).
159+
assertThat(duration).isLessThan((long) (2.0 * sleepTime));
160+
}
113161
}

0 commit comments

Comments
 (0)