diff --git a/api/pom.xml b/api/pom.xml
index 2fb06f467..f7bdbbabd 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -38,6 +38,10 @@ under the License.
com.fasterxml.jackson.corejackson-databind
+
+ com.fasterxml.jackson.dataformat
+ jackson-dataformat-yaml
+ org.apache.flinkflink-streaming-java
diff --git a/api/src/main/java/org/apache/flink/agents/api/AgentBuilder.java b/api/src/main/java/org/apache/flink/agents/api/AgentBuilder.java
index 662114274..4aac8b5ee 100644
--- a/api/src/main/java/org/apache/flink/agents/api/AgentBuilder.java
+++ b/api/src/main/java/org/apache/flink/agents/api/AgentBuilder.java
@@ -42,6 +42,21 @@ public interface AgentBuilder {
*/
AgentBuilder apply(Agent agent);
+ /**
+ * Apply an agent previously registered on the environment (typically via {@code
+ * env.loadYaml(...)}) by name.
+ *
+ *
Default implementation throws — concrete builders that have access to the environment
+ * override this to look up the named agent and delegate to {@link #apply(Agent)}.
+ *
+ * @param agentName the name under which the agent was registered on the environment.
+ * @return a configured AgentBuilder for method chaining.
+ */
+ default AgentBuilder apply(String agentName) {
+ throw new UnsupportedOperationException(
+ "apply(String) is not supported by this AgentBuilder; only Agent instances accepted.");
+ }
+
/**
* Get output list of agent execution.
*
diff --git a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
index 37f2c3dce..b1555f069 100644
--- a/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
+++ b/api/src/main/java/org/apache/flink/agents/api/AgentsExecutionEnvironment.java
@@ -18,6 +18,7 @@
package org.apache.flink.agents.api;
+import org.apache.flink.agents.api.agents.Agent;
import org.apache.flink.agents.api.configuration.Configuration;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
@@ -42,6 +43,7 @@
*/
public abstract class AgentsExecutionEnvironment {
protected final Map> resources;
+ protected final Map agents = new HashMap<>();
protected AgentsExecutionEnvironment() {
this.resources = new HashMap<>();
@@ -50,6 +52,25 @@ protected AgentsExecutionEnvironment() {
}
}
+ /**
+ * Returns the agents registered on this environment, keyed by name.
+ *
+ *
Populated by {@link #loadYaml(java.nio.file.Path...)} and friends.
+ */
+ public Map getAgents() {
+ return agents;
+ }
+
+ /**
+ * Returns the resources registered on this environment, grouped by {@link ResourceType}.
+ *
+ *
Exposed primarily so YAML loading code (in a sibling package) and tests can inspect
+ * registered shared resources without subclassing.
+ */
+ public Map> getResources() {
+ return resources;
+ }
+
/**
* Get agents execution environment.
*
@@ -229,4 +250,20 @@ public AgentsExecutionEnvironment addResource(String name, ResourceType type, Ob
}
return this;
}
+
+ /**
+ * Load one or more YAML files and register their agents and shared resources on this
+ * environment. Duplicate names — both within a single file and across the current environment —
+ * raise {@link IllegalArgumentException}.
+ */
+ public void loadYaml(java.nio.file.Path... paths) {
+ org.apache.flink.agents.api.yaml.YamlLoader.loadYaml(this, java.util.Arrays.asList(paths));
+ }
+
+ /**
+ * Load multiple YAML files and register their agents and shared resources on this environment.
+ */
+ public void loadYaml(java.util.List paths) {
+ org.apache.flink.agents.api.yaml.YamlLoader.loadYaml(this, paths);
+ }
}
diff --git a/api/src/main/java/org/apache/flink/agents/api/agents/Agent.java b/api/src/main/java/org/apache/flink/agents/api/agents/Agent.java
index 230e5a7bb..f73372488 100644
--- a/api/src/main/java/org/apache/flink/agents/api/agents/Agent.java
+++ b/api/src/main/java/org/apache/flink/agents/api/agents/Agent.java
@@ -18,6 +18,8 @@
package org.apache.flink.agents.api.agents;
+import org.apache.flink.agents.api.function.Function;
+import org.apache.flink.agents.api.function.JavaFunction;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.resource.SerializableResource;
@@ -31,7 +33,7 @@
/** Base class for defining agent logic. */
public class Agent {
- private final Map>> actions;
+ private final Map>> actions;
private final Map> resources;
@@ -43,7 +45,7 @@ public Agent() {
this.actions = new HashMap<>();
}
- public Map>> getActions() {
+ public Map>> getActions() {
return actions;
}
@@ -60,12 +62,7 @@ public Map> getResources() {
*/
public Agent addAction(
String[] eventTypes, Method method, @Nullable Map config) {
- String name = method.getName();
- if (actions.containsKey(name)) {
- throw new IllegalArgumentException(String.format("Action %s already defined.", name));
- }
- actions.put(name, new Tuple3<>(eventTypes, method, config));
- return this;
+ return addAction(method.getName(), eventTypes, JavaFunction.fromMethod(method), config);
}
/**
@@ -78,6 +75,27 @@ public Agent addAction(String[] eventTypes, Method method) {
return addAction(eventTypes, method, null);
}
+ /**
+ * Add action to agent.
+ *
+ * @param name The action name. Must be unique within this agent.
+ * @param eventTypes The event type strings this action listens to.
+ * @param function The api-layer function descriptor; will be promoted to a plan-layer
+ * executable at {@code AgentPlan} construction.
+ * @param config Optional config for this action.
+ */
+ public Agent addAction(
+ String name,
+ String[] eventTypes,
+ Function function,
+ @Nullable Map config) {
+ if (actions.containsKey(name)) {
+ throw new IllegalArgumentException(String.format("Action %s already defined.", name));
+ }
+ actions.put(name, new Tuple3<>(eventTypes, function, config));
+ return this;
+ }
+
public void addResourcesIfAbsent(Map> resources) {
for (ResourceType type : resources.keySet()) {
Map typedResources = resources.get(type);
diff --git a/api/src/main/java/org/apache/flink/agents/api/function/Function.java b/api/src/main/java/org/apache/flink/agents/api/function/Function.java
new file mode 100644
index 000000000..097a78ee3
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/function/Function.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.function;
+
+/**
+ * Pure-data marker for user-defined function descriptors carried on the api layer.
+ *
+ *
Implementations describe which function ({@link PythonFunction}, {@link
+ * JavaFunction}) but do not execute it. The plan-layer twins ({@code
+ * org.apache.flink.agents.plan.Function} and friends) own execution; the conversion from api → plan
+ * happens during {@code AgentPlan} construction.
+ */
+public interface Function {}
diff --git a/api/src/main/java/org/apache/flink/agents/api/function/JavaFunction.java b/api/src/main/java/org/apache/flink/agents/api/function/JavaFunction.java
new file mode 100644
index 000000000..2e438d9ce
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/function/JavaFunction.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.function;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Pure-data descriptor for a Java method, identified by declaring class FQN, method name, and
+ * parameter types as strings.
+ *
+ *
Parameter types are strings — JVM primitive names ({@code int}, {@code long}, {@code boolean},
+ * …) or fully-qualified reference type names ({@code java.lang.String}, {@code java.util.List}). No
+ * generic parameters. The wire form keeps the descriptor pure data; class resolution is deferred to
+ * the plan-layer twin.
+ */
+public final class JavaFunction implements Function, Serializable {
+
+ private static final String FIELD_QUAL_NAME = "qualName";
+ private static final String FIELD_METHOD_NAME = "methodName";
+ private static final String FIELD_PARAMETER_TYPES = "parameterTypes";
+
+ @JsonProperty(FIELD_QUAL_NAME)
+ private final String qualName;
+
+ @JsonProperty(FIELD_METHOD_NAME)
+ private final String methodName;
+
+ @JsonProperty(FIELD_PARAMETER_TYPES)
+ private final List parameterTypes;
+
+ @JsonCreator
+ public JavaFunction(
+ @JsonProperty(FIELD_QUAL_NAME) String qualName,
+ @JsonProperty(FIELD_METHOD_NAME) String methodName,
+ @JsonProperty(FIELD_PARAMETER_TYPES) List parameterTypes) {
+ this.qualName = Objects.requireNonNull(qualName, "qualName");
+ this.methodName = Objects.requireNonNull(methodName, "methodName");
+ this.parameterTypes =
+ parameterTypes == null
+ ? Collections.emptyList()
+ : Collections.unmodifiableList(new ArrayList<>(parameterTypes));
+ }
+
+ /**
+ * Build a descriptor from a reflected {@link Method}. Each parameter type is captured via
+ * {@link Class#getName()} — the same form {@link Class#forName(String)} accepts when the api
+ * descriptor is later promoted to its plan-layer twin. For primitives this is the keyword
+ * ({@code int}, {@code long}); for reference types the fully-qualified name; for array types
+ * the JVM-internal descriptor ({@code [I}, {@code [Ljava.lang.String;}).
+ */
+ public static JavaFunction fromMethod(Method method) {
+ List params = new ArrayList<>(method.getParameterCount());
+ for (Class> p : method.getParameterTypes()) {
+ params.add(p.getName());
+ }
+ return new JavaFunction(method.getDeclaringClass().getName(), method.getName(), params);
+ }
+
+ public String getQualName() {
+ return qualName;
+ }
+
+ public String getMethodName() {
+ return methodName;
+ }
+
+ public List getParameterTypes() {
+ return parameterTypes;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof JavaFunction)) return false;
+ JavaFunction that = (JavaFunction) o;
+ return qualName.equals(that.qualName)
+ && methodName.equals(that.methodName)
+ && parameterTypes.equals(that.parameterTypes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(qualName, methodName, parameterTypes);
+ }
+
+ @Override
+ public String toString() {
+ return "JavaFunction{" + qualName + "#" + methodName + parameterTypes + "}";
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/function/PythonFunction.java b/api/src/main/java/org/apache/flink/agents/api/function/PythonFunction.java
new file mode 100644
index 000000000..e249a693f
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/function/PythonFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.api.function;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Pure-data descriptor for a Python callable, identified by its module and qualified name.
+ *
+ *
Carries no execution behavior — the plan-layer {@code
+ * org.apache.flink.agents.plan.PythonFunction} owns invocation via the Pemja interpreter.
+ */
+public final class PythonFunction implements Function, Serializable {
+
+ private static final String FIELD_MODULE = "module";
+ private static final String FIELD_QUAL_NAME = "qualName";
+
+ @JsonProperty(FIELD_MODULE)
+ private final String module;
+
+ @JsonProperty(FIELD_QUAL_NAME)
+ private final String qualName;
+
+ @JsonCreator
+ public PythonFunction(
+ @JsonProperty(FIELD_MODULE) String module,
+ @JsonProperty(FIELD_QUAL_NAME) String qualName) {
+ this.module = Objects.requireNonNull(module, "module");
+ this.qualName = Objects.requireNonNull(qualName, "qualName");
+ }
+
+ public String getModule() {
+ return module;
+ }
+
+ public String getQualName() {
+ return qualName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof PythonFunction)) return false;
+ PythonFunction that = (PythonFunction) o;
+ return module.equals(that.module) && qualName.equals(that.qualName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(module, qualName);
+ }
+
+ @Override
+ public String toString() {
+ return "PythonFunction{" + module + ":" + qualName + "}";
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
index a28006f87..03eb8248c 100644
--- a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
+++ b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java
@@ -128,4 +128,36 @@ public interface PythonResourceAdapter {
* @return the result of the method invocation
*/
Object invoke(String name, Object... args);
+
+ /**
+ * Look up tool metadata for a Python function across the JVM→Python bridge.
+ *
+ *
The Java side asks the Python side to introspect a callable identified by {@code module} +
+ * {@code qualName}, and returns a flat {@code Map} with keys {@code "name"},
+ * {@code "description"}, and {@code "inputSchema"} (a JSON schema string compatible with {@code
+ * ToolMetadata.inputSchema}).
+ *
+ *
The return shape is intentionally flat — pemja can SIGSEGV when returning arbitrary Python
+ * objects to Java on non-main-interpreter threads.
+ *
+ * @param module the Python module containing the callable
+ * @param qualName the qualified name of the callable inside the module (e.g. {@code "fn"} or
+ * {@code "MyClass.method"})
+ * @return flat map with keys "name", "description", "inputSchema"
+ */
+ Map getPythonToolMetadata(String module, String qualName);
+
+ /**
+ * Invoke a Python callable as a tool, passing keyword arguments. Used when a Java chat model's
+ * tool list contains a {@code plan.FunctionTool} whose function descriptor is a {@code
+ * PythonFunction}: instead of routing the invocation through Java reflection, dispatch it
+ * across the bridge so the underlying Python function runs in the Pemja interpreter.
+ *
+ * @param module the Python module containing the callable
+ * @param qualName the qualified name of the callable inside the module
+ * @param kwargs keyword arguments to pass to the callable; LLM tool calls always arrive as
+ * keyword arguments
+ * @return the raw return value from the Python callable
+ */
+ Object invokePythonTool(String module, String qualName, Map kwargs);
}
diff --git a/api/src/main/java/org/apache/flink/agents/api/tools/FunctionTool.java b/api/src/main/java/org/apache/flink/agents/api/tools/FunctionTool.java
index 3ccba6745..cf9dbd8a9 100644
--- a/api/src/main/java/org/apache/flink/agents/api/tools/FunctionTool.java
+++ b/api/src/main/java/org/apache/flink/agents/api/tools/FunctionTool.java
@@ -18,25 +18,38 @@
package org.apache.flink.agents.api.tools;
+import org.apache.flink.agents.api.function.Function;
+import org.apache.flink.agents.api.function.JavaFunction;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.resource.SerializableResource;
import java.lang.reflect.Method;
+import java.util.Objects;
-/** Tool keeps a method, will be converted to tool after compile. */
+/**
+ * Pure-data tool descriptor: carries an {@link Function} reference. Used at agent-construction
+ * time; compiled to the plan-layer executable {@code plan.tools.FunctionTool} when the agent
+ * becomes an {@code AgentPlan}.
+ */
public class FunctionTool extends SerializableResource {
- private final Method method;
- public FunctionTool(Method method) {
- this.method = method;
+ private final Function func;
+
+ public FunctionTool(Function func) {
+ this.func = Objects.requireNonNull(func, "func");
+ }
+
+ /** Convenience factory: derive a {@link JavaFunction} from a reflected method. */
+ public static FunctionTool fromMethod(Method method) {
+ return new FunctionTool(JavaFunction.fromMethod(method));
+ }
+
+ public Function getFunc() {
+ return func;
}
@Override
public ResourceType getResourceType() {
return ResourceType.TOOL;
}
-
- public Method getMethod() {
- return method;
- }
}
diff --git a/api/src/main/java/org/apache/flink/agents/api/tools/Tool.java b/api/src/main/java/org/apache/flink/agents/api/tools/Tool.java
index a02384005..11f0356d0 100644
--- a/api/src/main/java/org/apache/flink/agents/api/tools/Tool.java
+++ b/api/src/main/java/org/apache/flink/agents/api/tools/Tool.java
@@ -30,12 +30,21 @@
*/
public abstract class Tool extends SerializableResource {
- protected final ToolMetadata metadata;
+ protected ToolMetadata metadata;
protected Tool(ToolMetadata metadata) {
this.metadata = java.util.Objects.requireNonNull(metadata, "metadata cannot be null");
}
+ /**
+ * Replace this tool's metadata. Intended for subclasses that derive metadata lazily once a
+ * runtime bridge becomes available (e.g. {@code FunctionTool} backed by a {@code
+ * PythonFunction} refreshing placeholder metadata via the JVM→Python adapter).
+ */
+ protected void setMetadata(ToolMetadata metadata) {
+ this.metadata = java.util.Objects.requireNonNull(metadata, "metadata cannot be null");
+ }
+
/** Get the metadata of this tool. */
public final ToolMetadata getMetadata() {
return metadata;
@@ -68,6 +77,6 @@ public final String getDescription() {
/** Get tool keeps a method. */
public static FunctionTool fromMethod(Method method) {
- return new FunctionTool(method);
+ return FunctionTool.fromMethod(method);
}
}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/Aliases.java b/api/src/main/java/org/apache/flink/agents/api/yaml/Aliases.java
new file mode 100644
index 000000000..f7b2479dd
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/Aliases.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml;
+
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalRequestEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
+import org.apache.flink.agents.api.event.ToolRequestEvent;
+import org.apache.flink.agents.api.event.ToolResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceName;
+import org.apache.flink.agents.api.resource.ResourceType;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static alias tables for the YAML loader.
+ *
+ *
Two tables:
+ *
+ *
+ *
{@link #EVENT_ALIASES} maps short event names to {@code EVENT_TYPE} constants.
+ *
{@link #CLAZZ_ALIASES} maps short provider names to fully-qualified class paths, keyed on
+ * resource type and implementation language so the same alias (e.g. {@code ollama})
+ * can refer to different classes across sections and languages.
+ *
+ *
+ *
For Python resources, the loader resolves the alias to the Python FQN and wraps it in a
+ * Java-side wrapper class (see {@link #PYTHON_WRAPPER_CLAZZ}).
+ */
+public final class Aliases {
+
+ /** Short event alias to fully-qualified {@code EVENT_TYPE} string. */
+ public static final Map EVENT_ALIASES;
+
+ /** ResourceType to Language to alias to fully-qualified class path. */
+ public static final Map>> CLAZZ_ALIASES;
+
+ /**
+ * ResourceType to Java-side wrapper FQN that embeds a Python implementation. Used when a YAML
+ * resource declares {@code type: python} so the Java host wraps the Python class through an
+ * existing PythonResourceWrapper implementation.
+ */
+ public static final Map PYTHON_WRAPPER_CLAZZ;
+
+ static {
+ Map ev = new HashMap<>();
+ ev.put("input", InputEvent.EVENT_TYPE);
+ ev.put("output", OutputEvent.EVENT_TYPE);
+ ev.put("chat_request", ChatRequestEvent.EVENT_TYPE);
+ ev.put("chat_response", ChatResponseEvent.EVENT_TYPE);
+ ev.put("tool_request", ToolRequestEvent.EVENT_TYPE);
+ ev.put("tool_response", ToolResponseEvent.EVENT_TYPE);
+ ev.put("context_retrieval_request", ContextRetrievalRequestEvent.EVENT_TYPE);
+ ev.put("context_retrieval_response", ContextRetrievalResponseEvent.EVENT_TYPE);
+ EVENT_ALIASES = Collections.unmodifiableMap(ev);
+
+ Map>> ca =
+ new EnumMap<>(ResourceType.class);
+
+ // CHAT_MODEL_CONNECTION
+ Map chatConnJava = new HashMap<>();
+ chatConnJava.put("ollama", ResourceName.ChatModel.OLLAMA_CONNECTION);
+ chatConnJava.put(
+ "openai_completions", ResourceName.ChatModel.OPENAI_COMPLETIONS_CONNECTION);
+ chatConnJava.put("openai_responses", ResourceName.ChatModel.OPENAI_RESPONSES_CONNECTION);
+ chatConnJava.put("anthropic", ResourceName.ChatModel.ANTHROPIC_CONNECTION);
+ chatConnJava.put("azure", ResourceName.ChatModel.AZURE_CONNECTION);
+ Map chatConnPython = new HashMap<>();
+ chatConnPython.put("ollama", ResourceName.ChatModel.Python.OLLAMA_CONNECTION);
+ chatConnPython.put("openai", ResourceName.ChatModel.Python.OPENAI_COMPLETIONS_CONNECTION);
+ chatConnPython.put("anthropic", ResourceName.ChatModel.Python.ANTHROPIC_CONNECTION);
+ chatConnPython.put("tongyi", ResourceName.ChatModel.Python.TONGYI_CONNECTION);
+ chatConnPython.put("azure_openai", ResourceName.ChatModel.Python.AZURE_OPENAI_CONNECTION);
+ ca.put(ResourceType.CHAT_MODEL_CONNECTION, buildLangBuckets(chatConnJava, chatConnPython));
+
+ // CHAT_MODEL
+ Map chatJava = new HashMap<>();
+ chatJava.put("ollama", ResourceName.ChatModel.OLLAMA_SETUP);
+ chatJava.put("openai_completions", ResourceName.ChatModel.OPENAI_COMPLETIONS_SETUP);
+ chatJava.put("openai_responses", ResourceName.ChatModel.OPENAI_RESPONSES_SETUP);
+ chatJava.put("anthropic", ResourceName.ChatModel.ANTHROPIC_SETUP);
+ chatJava.put("azure", ResourceName.ChatModel.AZURE_SETUP);
+ Map chatPython = new HashMap<>();
+ chatPython.put("ollama", ResourceName.ChatModel.Python.OLLAMA_SETUP);
+ chatPython.put("openai", ResourceName.ChatModel.Python.OPENAI_COMPLETIONS_SETUP);
+ chatPython.put("anthropic", ResourceName.ChatModel.Python.ANTHROPIC_SETUP);
+ chatPython.put("tongyi", ResourceName.ChatModel.Python.TONGYI_SETUP);
+ chatPython.put("azure_openai", ResourceName.ChatModel.Python.AZURE_OPENAI_SETUP);
+ ca.put(ResourceType.CHAT_MODEL, buildLangBuckets(chatJava, chatPython));
+
+ // EMBEDDING_MODEL_CONNECTION
+ Map embConnJava = new HashMap<>();
+ embConnJava.put("ollama", ResourceName.EmbeddingModel.OLLAMA_CONNECTION);
+ Map embConnPython = new HashMap<>();
+ embConnPython.put("ollama", ResourceName.EmbeddingModel.Python.OLLAMA_CONNECTION);
+ embConnPython.put("openai", ResourceName.EmbeddingModel.Python.OPENAI_CONNECTION);
+ embConnPython.put("tongyi", ResourceName.EmbeddingModel.Python.TONGYI_CONNECTION);
+ ca.put(
+ ResourceType.EMBEDDING_MODEL_CONNECTION,
+ buildLangBuckets(embConnJava, embConnPython));
+
+ // EMBEDDING_MODEL
+ Map embJava = new HashMap<>();
+ embJava.put("ollama", ResourceName.EmbeddingModel.OLLAMA_SETUP);
+ Map embPython = new HashMap<>();
+ embPython.put("ollama", ResourceName.EmbeddingModel.Python.OLLAMA_SETUP);
+ embPython.put("openai", ResourceName.EmbeddingModel.Python.OPENAI_SETUP);
+ embPython.put("tongyi", ResourceName.EmbeddingModel.Python.TONGYI_SETUP);
+ ca.put(ResourceType.EMBEDDING_MODEL, buildLangBuckets(embJava, embPython));
+
+ // VECTOR_STORE
+ Map vsJava = new HashMap<>();
+ vsJava.put("elasticsearch", ResourceName.VectorStore.ELASTICSEARCH_VECTOR_STORE);
+ Map vsPython = new HashMap<>();
+ vsPython.put("chroma", ResourceName.VectorStore.Python.CHROMA_VECTOR_STORE);
+ ca.put(ResourceType.VECTOR_STORE, buildLangBuckets(vsJava, vsPython));
+
+ CLAZZ_ALIASES = Collections.unmodifiableMap(ca);
+
+ Map wrap = new EnumMap<>(ResourceType.class);
+ wrap.put(
+ ResourceType.CHAT_MODEL_CONNECTION,
+ ResourceName.ChatModel.PYTHON_WRAPPER_CONNECTION);
+ wrap.put(ResourceType.CHAT_MODEL, ResourceName.ChatModel.PYTHON_WRAPPER_SETUP);
+ wrap.put(
+ ResourceType.EMBEDDING_MODEL_CONNECTION,
+ ResourceName.EmbeddingModel.PYTHON_WRAPPER_CONNECTION);
+ wrap.put(ResourceType.EMBEDDING_MODEL, ResourceName.EmbeddingModel.PYTHON_WRAPPER_SETUP);
+ wrap.put(ResourceType.VECTOR_STORE, ResourceName.VectorStore.PYTHON_WRAPPER_VECTOR_STORE);
+ PYTHON_WRAPPER_CLAZZ = Collections.unmodifiableMap(wrap);
+ }
+
+ private Aliases() {}
+
+ private static Map> buildLangBuckets(
+ Map javaBucket, Map pythonBucket) {
+ Map> out = new EnumMap<>(Language.class);
+ out.put(Language.JAVA, Collections.unmodifiableMap(new HashMap<>(javaBucket)));
+ out.put(Language.PYTHON, Collections.unmodifiableMap(new HashMap<>(pythonBucket)));
+ return Collections.unmodifiableMap(out);
+ }
+
+ /** Look up an event alias; return {@code name} unchanged on miss. */
+ public static String resolveEventType(String name) {
+ return EVENT_ALIASES.getOrDefault(name, name);
+ }
+
+ /**
+ * Look up a class alias for {@code (resourceType, language)}; return {@code name} unchanged on
+ * miss.
+ */
+ public static String resolveClazz(String name, ResourceType resourceType, Language language) {
+ Map> byLang = CLAZZ_ALIASES.get(resourceType);
+ if (byLang == null) {
+ return name;
+ }
+ Map bucket = byLang.get(language);
+ if (bucket == null) {
+ return name;
+ }
+ return bucket.getOrDefault(name, name);
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/Language.java b/api/src/main/java/org/apache/flink/agents/api/yaml/Language.java
new file mode 100644
index 000000000..5eaee39b7
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/Language.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Implementation language of a YAML-declared resource, action, or tool.
+ *
+ *
The JSON/YAML wire form is the lowercase string ({@code "python"} or {@code "java"}); the
+ * loader supplies the host-default when the field is omitted.
+ */
+public enum Language {
+ PYTHON("python"),
+ JAVA("java");
+
+ private final String value;
+
+ Language(String value) {
+ this.value = value;
+ }
+
+ @JsonValue
+ public String getValue() {
+ return value;
+ }
+
+ @JsonCreator
+ public static Language fromValue(String value) {
+ for (Language l : values()) {
+ if (l.value.equals(value)) return l;
+ }
+ throw new IllegalArgumentException("Unknown language: " + value);
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/YamlLoader.java b/api/src/main/java/org/apache/flink/agents/api/yaml/YamlLoader.java
new file mode 100644
index 000000000..8189c0d38
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/YamlLoader.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.function.Function;
+import org.apache.flink.agents.api.function.JavaFunction;
+import org.apache.flink.agents.api.function.PythonFunction;
+import org.apache.flink.agents.api.prompt.Prompt;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.skills.Skills;
+import org.apache.flink.agents.api.tools.FunctionTool;
+import org.apache.flink.agents.api.yaml.spec.ActionSpec;
+import org.apache.flink.agents.api.yaml.spec.AgentActionRef;
+import org.apache.flink.agents.api.yaml.spec.AgentSpec;
+import org.apache.flink.agents.api.yaml.spec.DescriptorSpec;
+import org.apache.flink.agents.api.yaml.spec.PromptSpec;
+import org.apache.flink.agents.api.yaml.spec.SkillsSpec;
+import org.apache.flink.agents.api.yaml.spec.ToolSpec;
+import org.apache.flink.agents.api.yaml.spec.YamlAgentsDocument;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** YAML loader entry points and helpers. */
+public final class YamlLoader {
+
+ private YamlLoader() {}
+
+ private static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory());
+
+ /** Default Java parameter types for an action method: (Event, RunnerContext). */
+ private static final List JAVA_ACTION_PARAMETER_TYPES =
+ Collections.unmodifiableList(
+ List.of(
+ "org.apache.flink.agents.api.Event",
+ "org.apache.flink.agents.api.context.RunnerContext"));
+
+ /**
+ * Resolve a YAML function reference into a pure-data {@link Function}.
+ *
+ *
{@code function} must be {@code :}. For {@link Language#PYTHON}
+ * the right side is a qualified Python name (which may contain dots for class methods). For
+ * {@link Language#JAVA} the left side is a class FQN and the right side is a method name; the
+ * caller must pass {@code parameterTypes}.
+ */
+ public static Function resolveFunction(
+ String name, String function, Language language, List parameterTypes) {
+ if (function == null) {
+ throw new IllegalArgumentException(
+ "Action/tool '"
+ + name
+ + "': 'function' is required and must be of the form "
+ + "':'.");
+ }
+ int firstColon = function.indexOf(':');
+ int lastColon = function.lastIndexOf(':');
+ if (firstColon <= 0 || firstColon != lastColon || lastColon == function.length() - 1) {
+ String kind = language == Language.JAVA ? "java" : "python";
+ throw new IllegalArgumentException(
+ "Action/tool '"
+ + name
+ + "': "
+ + kind
+ + " function '"
+ + function
+ + "' must be of the form ':' (e.g. "
+ + "'pkg.tools:add', 'pkg.tools:MyTools.add', 'com.example.X:method').");
+ }
+ String left = function.substring(0, firstColon);
+ String right = function.substring(firstColon + 1);
+ if (language == Language.JAVA) {
+ return new JavaFunction(
+ left, right, parameterTypes == null ? Collections.emptyList() : parameterTypes);
+ }
+ return new PythonFunction(left, right);
+ }
+
+ /**
+ * Build a {@link ResourceDescriptor} from a parsed {@link DescriptorSpec}, resolving the alias
+ * and applying cross-language wrapping when {@code type: python}.
+ *
+ *
For {@code type: python} the resulting descriptor's {@code clazz} is the Java-side wrapper
+ * FQN (looked up in {@link Aliases#PYTHON_WRAPPER_CLAZZ}) and a {@code pythonClazz} init
+ * argument carries the Python implementation FQN — matching what {@code PythonResourceProvider}
+ * already expects.
+ */
+ public static ResourceDescriptor buildDescriptor(
+ DescriptorSpec spec, ResourceType resourceType) {
+ Language language = spec.getType() == null ? Language.PYTHON : spec.getType();
+ Map extras = new LinkedHashMap<>(spec.getExtras());
+
+ if (language == Language.PYTHON) {
+ String wrapper = Aliases.PYTHON_WRAPPER_CLAZZ.get(resourceType);
+ if (wrapper == null) {
+ throw new IllegalArgumentException(
+ "Resource '"
+ + spec.getName()
+ + "': type='python' is not supported for "
+ + resourceType.getValue()
+ + " (no Java-side Python wrapper).");
+ }
+ String pythonFqn = Aliases.resolveClazz(spec.getClazz(), resourceType, Language.PYTHON);
+ extras.put("pythonClazz", pythonFqn);
+ return new ResourceDescriptor(wrapper, extras);
+ }
+ String javaFqn = Aliases.resolveClazz(spec.getClazz(), resourceType, Language.JAVA);
+ return new ResourceDescriptor(javaFqn, extras);
+ }
+
+ /** Build a {@link FunctionTool} from a parsed {@link ToolSpec}. */
+ public static FunctionTool buildTool(ToolSpec spec) {
+ Language language = spec.getType() == null ? Language.PYTHON : spec.getType();
+ if (language == Language.JAVA && spec.getParameterTypes() == null) {
+ throw new IllegalArgumentException(
+ "Tool '"
+ + spec.getName()
+ + "': java tools must declare 'parameter_types' in YAML.");
+ }
+ Function fn =
+ resolveFunction(
+ spec.getName(), spec.getFunction(), language, spec.getParameterTypes());
+ return new FunctionTool(fn);
+ }
+
+ /** Build a {@link Prompt} from a parsed {@link PromptSpec}. */
+ public static Prompt buildPrompt(PromptSpec spec) {
+ if (spec.getText() != null) {
+ return Prompt.fromText(spec.getText());
+ }
+ List messages =
+ spec.getMessages().stream()
+ .map(m -> new ChatMessage(m.getRole(), m.getContent()))
+ .collect(Collectors.toList());
+ return Prompt.fromMessages(messages);
+ }
+
+ /** Build a {@link Skills} resource from a parsed {@link SkillsSpec}. */
+ public static Skills buildSkills(SkillsSpec spec) {
+ return new Skills(new ArrayList<>(spec.getPaths()));
+ }
+
+ /**
+ * Output of {@link #buildAgents(Path)}. Holds in-file state without touching any environment.
+ */
+ public static final class LoadedFile {
+ private final Map agents;
+ private final Map> sharedResources;
+ private final Map sharedActions;
+ private final Map agentSpecs;
+
+ LoadedFile(
+ Map agents,
+ Map> sharedResources,
+ Map sharedActions,
+ Map agentSpecs) {
+ this.agents = agents;
+ this.sharedResources = sharedResources;
+ this.sharedActions = sharedActions;
+ this.agentSpecs = agentSpecs;
+ }
+
+ public Map getAgents() {
+ return agents;
+ }
+
+ public Map> getSharedResources() {
+ return sharedResources;
+ }
+
+ public Map getSharedActions() {
+ return sharedActions;
+ }
+
+ /** Package-private — used by {@code loadYaml(...)} when it lands. */
+ Map getAgentSpecs() {
+ return agentSpecs;
+ }
+ }
+
+ /** Parse one YAML file and build the agents it declares. */
+ public static LoadedFile buildAgents(Path path) {
+ YamlAgentsDocument doc;
+ try {
+ doc = YAML_MAPPER.readValue(Files.readString(path), YamlAgentsDocument.class);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to read YAML " + path, e);
+ }
+ if (doc == null) {
+ throw new IllegalArgumentException("YAML file " + path + " is empty");
+ }
+
+ Map agents = new LinkedHashMap<>();
+ Map agentSpecs = new LinkedHashMap<>();
+ for (AgentSpec spec : doc.getAgents()) {
+ if (agents.containsKey(spec.getName())) {
+ throw new IllegalArgumentException(
+ "Duplicate agent name '" + spec.getName() + "' in " + path);
+ }
+ agentSpecs.put(spec.getName(), spec);
+ agents.put(spec.getName(), buildAgent(spec));
+ }
+
+ Map> sharedResources = new EnumMap<>(ResourceType.class);
+ for (ResourceType t : ResourceType.values()) {
+ sharedResources.put(t, new LinkedHashMap<>());
+ }
+
+ addSharedDescriptors(
+ sharedResources,
+ ResourceType.CHAT_MODEL_CONNECTION,
+ doc.getChatModelConnections(),
+ path);
+ addSharedDescriptors(
+ sharedResources, ResourceType.CHAT_MODEL, doc.getChatModelSetups(), path);
+ addSharedDescriptors(
+ sharedResources,
+ ResourceType.EMBEDDING_MODEL_CONNECTION,
+ doc.getEmbeddingModelConnections(),
+ path);
+ addSharedDescriptors(
+ sharedResources, ResourceType.EMBEDDING_MODEL, doc.getEmbeddingModelSetups(), path);
+ addSharedDescriptors(
+ sharedResources, ResourceType.VECTOR_STORE, doc.getVectorStores(), path);
+ addSharedDescriptors(sharedResources, ResourceType.MCP_SERVER, doc.getMcpServers(), path);
+
+ for (ToolSpec t : doc.getTools()) {
+ if (sharedResources.get(ResourceType.TOOL).put(t.getName(), buildTool(t)) != null) {
+ throw new IllegalArgumentException(
+ "Duplicate shared tool name '" + t.getName() + "' in " + path);
+ }
+ }
+ for (PromptSpec p : doc.getPrompts()) {
+ if (sharedResources.get(ResourceType.PROMPT).put(p.getName(), buildPrompt(p)) != null) {
+ throw new IllegalArgumentException(
+ "Duplicate shared prompt name '" + p.getName() + "' in " + path);
+ }
+ }
+ for (SkillsSpec s : doc.getSkills()) {
+ if (sharedResources.get(ResourceType.SKILLS).put(s.getName(), buildSkills(s)) != null) {
+ throw new IllegalArgumentException(
+ "Duplicate shared skills name '" + s.getName() + "' in " + path);
+ }
+ }
+
+ Map sharedActions = new LinkedHashMap<>();
+ for (ActionSpec a : doc.getActions()) {
+ if (sharedActions.containsKey(a.getName())) {
+ throw new IllegalArgumentException(
+ "Duplicate shared action name '" + a.getName() + "' in " + path);
+ }
+ sharedActions.put(a.getName(), a);
+ }
+
+ return new LoadedFile(agents, sharedResources, sharedActions, agentSpecs);
+ }
+
+ /** Load one YAML file and register its agents and shared resources on the environment. */
+ public static void loadYaml(AgentsExecutionEnvironment env, Path path) {
+ loadYaml(env, List.of(path));
+ }
+
+ /**
+ * Load multiple YAML files. Multiple calls accumulate. Duplicate names — both within a single
+ * file (caught by {@link #buildAgents(Path)}) and across the current environment — raise {@link
+ * IllegalArgumentException}.
+ */
+ public static void loadYaml(AgentsExecutionEnvironment env, List paths) {
+ for (Path path : paths) {
+ LoadedFile loaded = buildAgents(path);
+
+ // Resolve shared-action string refs first so a bad ref doesn't leave partial state on
+ // the env.
+ for (Map.Entry entry : loaded.getAgents().entrySet()) {
+ AgentSpec spec = loaded.getAgentSpecs().get(entry.getKey());
+ for (AgentActionRef ref : spec.getActions()) {
+ if (!ref.isReference()) {
+ continue;
+ }
+ ActionSpec shared = loaded.getSharedActions().get(ref.getReference());
+ if (shared == null) {
+ throw new IllegalArgumentException(
+ "Agent '"
+ + entry.getKey()
+ + "' references shared action '"
+ + ref.getReference()
+ + "' in "
+ + path
+ + ", but no shared action with that name is defined at"
+ + " the file level.");
+ }
+ addActionToAgent(entry.getValue(), shared);
+ }
+ }
+
+ // Cross-environment agent name uniqueness check.
+ for (String name : loaded.getAgents().keySet()) {
+ if (env.getAgents().containsKey(name)) {
+ throw new IllegalArgumentException(
+ "Duplicate agent name '" + name + "' (loading " + path + ")");
+ }
+ }
+
+ // Commit shared resources — env.addResource enforces dedup with its own message.
+ for (Map.Entry> e :
+ loaded.getSharedResources().entrySet()) {
+ for (Map.Entry r : e.getValue().entrySet()) {
+ env.addResource(r.getKey(), e.getKey(), r.getValue());
+ }
+ }
+ env.getAgents().putAll(loaded.getAgents());
+ }
+ }
+
+ private static void addSharedDescriptors(
+ Map> sharedResources,
+ ResourceType type,
+ List specs,
+ Path path) {
+ Map bucket = sharedResources.get(type);
+ for (DescriptorSpec s : specs) {
+ if (bucket.put(s.getName(), buildDescriptor(s, type)) != null) {
+ throw new IllegalArgumentException(
+ "Duplicate shared resource name '" + s.getName() + "' in " + path);
+ }
+ }
+ }
+
+ private static Agent buildAgent(AgentSpec spec) {
+ Agent agent = new Agent();
+ addAgentDescriptors(
+ agent, ResourceType.CHAT_MODEL_CONNECTION, spec.getChatModelConnections());
+ addAgentDescriptors(agent, ResourceType.CHAT_MODEL, spec.getChatModelSetups());
+ addAgentDescriptors(
+ agent,
+ ResourceType.EMBEDDING_MODEL_CONNECTION,
+ spec.getEmbeddingModelConnections());
+ addAgentDescriptors(agent, ResourceType.EMBEDDING_MODEL, spec.getEmbeddingModelSetups());
+ addAgentDescriptors(agent, ResourceType.VECTOR_STORE, spec.getVectorStores());
+ addAgentDescriptors(agent, ResourceType.MCP_SERVER, spec.getMcpServers());
+
+ for (ToolSpec t : spec.getTools()) {
+ agent.addResource(t.getName(), ResourceType.TOOL, buildTool(t));
+ }
+ for (PromptSpec p : spec.getPrompts()) {
+ agent.addResource(p.getName(), ResourceType.PROMPT, buildPrompt(p));
+ }
+ for (SkillsSpec s : spec.getSkills()) {
+ agent.addResource(s.getName(), ResourceType.SKILLS, buildSkills(s));
+ }
+ for (AgentActionRef ref : spec.getActions()) {
+ if (ref.isReference()) {
+ continue; // resolved later in loadYaml
+ }
+ addActionToAgent(agent, ref.getSpec());
+ }
+ return agent;
+ }
+
+ private static void addAgentDescriptors(
+ Agent agent, ResourceType type, List specs) {
+ for (DescriptorSpec s : specs) {
+ agent.addResource(s.getName(), type, buildDescriptor(s, type));
+ }
+ }
+
+ /**
+ * Resolve an ActionSpec's function reference. Java actions always use the standard {@code
+ * (Event, RunnerContext)} parameter types — ActionSpec has no {@code parameter_types} field
+ * because action method signatures are fixed by the framework.
+ */
+ static Function resolveActionFunction(ActionSpec action) {
+ Language language = action.getType() == null ? Language.PYTHON : action.getType();
+ List paramTypes = language == Language.JAVA ? JAVA_ACTION_PARAMETER_TYPES : null;
+ return resolveFunction(action.getName(), action.getFunction(), language, paramTypes);
+ }
+
+ /** Register an action on the agent with event aliases resolved. */
+ static void addActionToAgent(Agent agent, ActionSpec action) {
+ Function fn = resolveActionFunction(action);
+ String[] events =
+ action.getListenTo().stream().map(Aliases::resolveEventType).toArray(String[]::new);
+ Map config = action.getConfig();
+ agent.addAction(action.getName(), events, fn, config);
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/ActionSpec.java b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/ActionSpec.java
new file mode 100644
index 000000000..4a8b07cf6
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/ActionSpec.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml.spec;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.agents.api.yaml.Language;
+
+import java.util.List;
+import java.util.Map;
+
+/** Action referencing a user function plus the event types it listens to. */
+@JsonIgnoreProperties(ignoreUnknown = false)
+public final class ActionSpec {
+ private final String name;
+ private final String function;
+ private final List listenTo;
+ private final Map config;
+ private final Language type;
+
+ @JsonCreator
+ public ActionSpec(
+ @JsonProperty(value = "name", required = true) String name,
+ @JsonProperty("function") String function,
+ @JsonProperty(value = "listen_to", required = true) List listenTo,
+ @JsonProperty("config") Map config,
+ @JsonProperty("type") Language type) {
+ if (listenTo == null || listenTo.isEmpty()) {
+ throw new IllegalArgumentException("listen_to must not be empty");
+ }
+ this.name = name;
+ this.function = function;
+ this.listenTo = listenTo;
+ this.config = config;
+ this.type = type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getFunction() {
+ return function;
+ }
+
+ public List getListenTo() {
+ return listenTo;
+ }
+
+ public Map getConfig() {
+ return config;
+ }
+
+ public Language getType() {
+ return type;
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/AgentActionRef.java b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/AgentActionRef.java
new file mode 100644
index 000000000..a7fdbff26
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/AgentActionRef.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml.spec;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import java.io.IOException;
+
+/**
+ * An item under {@code agents[].actions:} — either a string reference to a shared action, or a full
+ * {@link ActionSpec}.
+ */
+@JsonDeserialize(using = AgentActionRef.Deserializer.class)
+public final class AgentActionRef {
+ private final String reference;
+ private final ActionSpec spec;
+
+ private AgentActionRef(String reference, ActionSpec spec) {
+ this.reference = reference;
+ this.spec = spec;
+ }
+
+ public static AgentActionRef of(String reference) {
+ return new AgentActionRef(reference, null);
+ }
+
+ public static AgentActionRef of(ActionSpec spec) {
+ return new AgentActionRef(null, spec);
+ }
+
+ public boolean isReference() {
+ return reference != null;
+ }
+
+ public String getReference() {
+ return reference;
+ }
+
+ public ActionSpec getSpec() {
+ return spec;
+ }
+
+ static final class Deserializer extends JsonDeserializer {
+ @Override
+ public AgentActionRef deserialize(JsonParser p, DeserializationContext ctxt)
+ throws IOException {
+ if (p.currentToken().isScalarValue()) {
+ return AgentActionRef.of(p.getValueAsString());
+ }
+ ActionSpec spec = ctxt.readValue(p, ActionSpec.class);
+ return AgentActionRef.of(spec);
+ }
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/AgentSpec.java b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/AgentSpec.java
new file mode 100644
index 000000000..0e37a8aad
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/AgentSpec.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml.spec;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+
+/** One agent inside a YAML file's {@code agents:} list. */
+@JsonIgnoreProperties(ignoreUnknown = false)
+public final class AgentSpec {
+ private final String name;
+ private final String description;
+ private final List prompts;
+ private final List tools;
+ private final List skills;
+ private final List actions;
+ private final List chatModelConnections;
+ private final List chatModelSetups;
+ private final List embeddingModelConnections;
+ private final List embeddingModelSetups;
+ private final List vectorStores;
+ private final List mcpServers;
+
+ @JsonCreator
+ public AgentSpec(
+ @JsonProperty(value = "name", required = true) String name,
+ @JsonProperty("description") String description,
+ @JsonProperty("prompts") List prompts,
+ @JsonProperty("tools") List tools,
+ @JsonProperty("skills") List skills,
+ @JsonProperty("actions") List actions,
+ @JsonProperty("chat_model_connections") List chatModelConnections,
+ @JsonProperty("chat_model_setups") List chatModelSetups,
+ @JsonProperty("embedding_model_connections")
+ List embeddingModelConnections,
+ @JsonProperty("embedding_model_setups") List embeddingModelSetups,
+ @JsonProperty("vector_stores") List vectorStores,
+ @JsonProperty("mcp_servers") List mcpServers) {
+ this.name = name;
+ this.description = description;
+ this.prompts = orEmpty(prompts);
+ this.tools = orEmpty(tools);
+ this.skills = orEmpty(skills);
+ this.actions = orEmpty(actions);
+ this.chatModelConnections = orEmpty(chatModelConnections);
+ this.chatModelSetups = orEmpty(chatModelSetups);
+ this.embeddingModelConnections = orEmpty(embeddingModelConnections);
+ this.embeddingModelSetups = orEmpty(embeddingModelSetups);
+ this.vectorStores = orEmpty(vectorStores);
+ this.mcpServers = orEmpty(mcpServers);
+ }
+
+ private static List orEmpty(List list) {
+ return list == null ? Collections.emptyList() : list;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public List getPrompts() {
+ return prompts;
+ }
+
+ public List getTools() {
+ return tools;
+ }
+
+ public List getSkills() {
+ return skills;
+ }
+
+ public List getActions() {
+ return actions;
+ }
+
+ public List getChatModelConnections() {
+ return chatModelConnections;
+ }
+
+ public List getChatModelSetups() {
+ return chatModelSetups;
+ }
+
+ public List getEmbeddingModelConnections() {
+ return embeddingModelConnections;
+ }
+
+ public List getEmbeddingModelSetups() {
+ return embeddingModelSetups;
+ }
+
+ public List getVectorStores() {
+ return vectorStores;
+ }
+
+ public List getMcpServers() {
+ return mcpServers;
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/DescriptorSpec.java b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/DescriptorSpec.java
new file mode 100644
index 000000000..8a72c7aba
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/DescriptorSpec.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml.spec;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.agents.api.yaml.Language;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Schema for any ResourceDescriptor-backed resource. Required: {@code name} and {@code clazz}.
+ * {@code type} optionally pins the language. All remaining properties are captured into {@link
+ * #getExtras()} and forwarded as ResourceDescriptor init args by the loader.
+ */
+public final class DescriptorSpec {
+ private final String name;
+ private final String clazz;
+ private final Language type;
+ private final Map extras = new LinkedHashMap<>();
+
+ @JsonCreator
+ public DescriptorSpec(
+ @JsonProperty(value = "name", required = true) String name,
+ @JsonProperty(value = "clazz", required = true) String clazz,
+ @JsonProperty("type") Language type) {
+ this.name = name;
+ this.clazz = clazz;
+ this.type = type;
+ }
+
+ @JsonAnySetter
+ void putExtra(String key, Object value) {
+ extras.put(key, value);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getClazz() {
+ return clazz;
+ }
+
+ public Language getType() {
+ return type;
+ }
+
+ @JsonAnyGetter
+ public Map getExtras() {
+ return extras;
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/PromptMessage.java b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/PromptMessage.java
new file mode 100644
index 000000000..c2887539b
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/PromptMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml.spec;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+
+/** One message in a multi-turn prompt template. */
+@JsonIgnoreProperties(ignoreUnknown = false)
+public final class PromptMessage {
+ private final MessageRole role;
+ private final String content;
+
+ @JsonCreator
+ public PromptMessage(
+ @JsonProperty("role") String role,
+ @JsonProperty(value = "content", required = true) String content) {
+ this.role = role == null ? MessageRole.USER : MessageRole.fromValue(role);
+ this.content = content;
+ }
+
+ public MessageRole getRole() {
+ return role;
+ }
+
+ public String getContent() {
+ return content;
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/PromptSpec.java b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/PromptSpec.java
new file mode 100644
index 000000000..54a5ee2e7
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/PromptSpec.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml.spec;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Declarative prompt: either a single text template or a list of role-tagged messages. */
+@JsonIgnoreProperties(ignoreUnknown = false)
+public final class PromptSpec {
+ private final String name;
+ private final String text;
+ private final List messages;
+
+ @JsonCreator
+ public PromptSpec(
+ @JsonProperty(value = "name", required = true) String name,
+ @JsonProperty("text") String text,
+ @JsonProperty("messages") List messages) {
+ this.name = name;
+ this.text = text;
+ this.messages = messages;
+ boolean hasText = text != null && !text.isEmpty();
+ boolean hasMessages = messages != null && !messages.isEmpty();
+ if (hasText == hasMessages) {
+ throw new IllegalArgumentException(
+ "prompt must define exactly one non-empty 'text' or 'messages'");
+ }
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getText() {
+ return text;
+ }
+
+ public List getMessages() {
+ return messages;
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/SkillsSpec.java b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/SkillsSpec.java
new file mode 100644
index 000000000..61136e049
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/SkillsSpec.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml.spec;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Declarative Skills resource. */
+@JsonIgnoreProperties(ignoreUnknown = false)
+public final class SkillsSpec {
+ private final String name;
+ private final List paths;
+
+ @JsonCreator
+ public SkillsSpec(
+ @JsonProperty(value = "name", required = true) String name,
+ @JsonProperty(value = "paths", required = true) List paths) {
+ this.name = name;
+ this.paths = paths;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public List getPaths() {
+ return paths;
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/ToolSpec.java b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/ToolSpec.java
new file mode 100644
index 000000000..b42f90cf2
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/ToolSpec.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml.spec;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.agents.api.yaml.Language;
+
+import java.util.List;
+
+/** Declarative function tool. */
+@JsonIgnoreProperties(ignoreUnknown = false)
+public final class ToolSpec {
+ private final String name;
+ private final String function;
+ private final Language type;
+ private final List parameterTypes;
+
+ @JsonCreator
+ public ToolSpec(
+ @JsonProperty(value = "name", required = true) String name,
+ @JsonProperty("function") String function,
+ @JsonProperty("type") Language type,
+ @JsonProperty("parameter_types") List parameterTypes) {
+ this.name = name;
+ this.function = function;
+ this.type = type;
+ this.parameterTypes = parameterTypes;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getFunction() {
+ return function;
+ }
+
+ public Language getType() {
+ return type;
+ }
+
+ public List getParameterTypes() {
+ return parameterTypes;
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/YamlAgentsDocument.java b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/YamlAgentsDocument.java
new file mode 100644
index 000000000..cc0874d05
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/YamlAgentsDocument.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml.spec;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Top-level YAML document. */
+@JsonIgnoreProperties(ignoreUnknown = false)
+public final class YamlAgentsDocument {
+ private final List agents;
+ private final List prompts;
+ private final List tools;
+ private final List skills;
+ private final List actions;
+ private final List chatModelConnections;
+ private final List chatModelSetups;
+ private final List embeddingModelConnections;
+ private final List embeddingModelSetups;
+ private final List vectorStores;
+ private final List mcpServers;
+
+ @JsonCreator
+ public YamlAgentsDocument(
+ @JsonProperty(value = "agents", required = true) List agents,
+ @JsonProperty("prompts") List prompts,
+ @JsonProperty("tools") List tools,
+ @JsonProperty("skills") List skills,
+ @JsonProperty("actions") List actions,
+ @JsonProperty("chat_model_connections") List chatModelConnections,
+ @JsonProperty("chat_model_setups") List chatModelSetups,
+ @JsonProperty("embedding_model_connections")
+ List embeddingModelConnections,
+ @JsonProperty("embedding_model_setups") List embeddingModelSetups,
+ @JsonProperty("vector_stores") List vectorStores,
+ @JsonProperty("mcp_servers") List mcpServers) {
+ this.agents = agents;
+ this.prompts = orEmpty(prompts);
+ this.tools = orEmpty(tools);
+ this.skills = orEmpty(skills);
+ this.actions = orEmpty(actions);
+ this.chatModelConnections = orEmpty(chatModelConnections);
+ this.chatModelSetups = orEmpty(chatModelSetups);
+ this.embeddingModelConnections = orEmpty(embeddingModelConnections);
+ this.embeddingModelSetups = orEmpty(embeddingModelSetups);
+ this.vectorStores = orEmpty(vectorStores);
+ this.mcpServers = orEmpty(mcpServers);
+ }
+
+ private static List orEmpty(List list) {
+ return list == null ? Collections.emptyList() : list;
+ }
+
+ public List getAgents() {
+ return agents;
+ }
+
+ public List getPrompts() {
+ return prompts;
+ }
+
+ public List getTools() {
+ return tools;
+ }
+
+ public List getSkills() {
+ return skills;
+ }
+
+ public List getActions() {
+ return actions;
+ }
+
+ public List getChatModelConnections() {
+ return chatModelConnections;
+ }
+
+ public List getChatModelSetups() {
+ return chatModelSetups;
+ }
+
+ public List getEmbeddingModelConnections() {
+ return embeddingModelConnections;
+ }
+
+ public List getEmbeddingModelSetups() {
+ return embeddingModelSetups;
+ }
+
+ public List getVectorStores() {
+ return vectorStores;
+ }
+
+ public List getMcpServers() {
+ return mcpServers;
+ }
+}
diff --git a/api/src/test/java/org/apache/flink/agents/api/AgentBuilderApplyByNameTest.java b/api/src/test/java/org/apache/flink/agents/api/AgentBuilderApplyByNameTest.java
new file mode 100644
index 000000000..8e3d6d3fd
--- /dev/null
+++ b/api/src/test/java/org/apache/flink/agents/api/AgentBuilderApplyByNameTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api;
+
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class AgentBuilderApplyByNameTest {
+
+ @Test
+ void defaultApplyByNameThrows() {
+ AgentBuilder b = new SimpleStubBuilder();
+ assertThatThrownBy(() -> b.apply("any")).isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ @Test
+ void overrideResolvesFromEnv() {
+ Agent inc = new Agent();
+ AgentBuilder b = new RegistryStubBuilder("inc", inc);
+ b.apply("inc");
+ assertThatThrownBy(() -> b.apply("ghost")).hasMessageContaining("ghost");
+ }
+
+ /** Stub builder that doesn't override apply(String). */
+ private static final class SimpleStubBuilder implements AgentBuilder {
+ @Override
+ public AgentBuilder apply(Agent agent) {
+ return this;
+ }
+
+ @Override
+ public List