Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ under the License.
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
Expand Down
15 changes: 15 additions & 0 deletions api/src/main/java/org/apache/flink/agents/api/AgentBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,21 @@ public interface AgentBuilder {
*/
AgentBuilder apply(Agent agent);

/**
* Apply an agent previously registered on the environment (typically via {@code
* env.loadYaml(...)}) by name.
*
* <p>Default implementation throws — concrete builders that have access to the environment
* override this to look up the named agent and delegate to {@link #apply(Agent)}.
*
* @param agentName the name under which the agent was registered on the environment.
* @return a configured AgentBuilder for method chaining.
*/
default AgentBuilder apply(String agentName) {
throw new UnsupportedOperationException(
"apply(String) is not supported by this AgentBuilder; only Agent instances accepted.");
}

/**
* Get output list of agent execution.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.agents.api;

import org.apache.flink.agents.api.agents.Agent;
import org.apache.flink.agents.api.configuration.Configuration;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
Expand All @@ -42,6 +43,7 @@
*/
public abstract class AgentsExecutionEnvironment {
protected final Map<ResourceType, Map<String, Object>> resources;
protected final Map<String, Agent> agents = new HashMap<>();

protected AgentsExecutionEnvironment() {
this.resources = new HashMap<>();
Expand All @@ -50,6 +52,25 @@ protected AgentsExecutionEnvironment() {
}
}

/**
* Returns the agents registered on this environment, keyed by name.
*
* <p>Populated by {@link #loadYaml(java.nio.file.Path...)} and friends.
*/
public Map<String, Agent> getAgents() {
return agents;
}

/**
* Returns the resources registered on this environment, grouped by {@link ResourceType}.
*
* <p>Exposed primarily so YAML loading code (in a sibling package) and tests can inspect
* registered shared resources without subclassing.
*/
public Map<ResourceType, Map<String, Object>> getResources() {
return resources;
}

/**
* Get agents execution environment.
*
Expand Down Expand Up @@ -229,4 +250,20 @@ public AgentsExecutionEnvironment addResource(String name, ResourceType type, Ob
}
return this;
}

/**
* Load one or more YAML files and register their agents and shared resources on this
* environment. Duplicate names — both within a single file and across the current environment —
* raise {@link IllegalArgumentException}.
*/
public void loadYaml(java.nio.file.Path... paths) {
org.apache.flink.agents.api.yaml.YamlLoader.loadYaml(this, java.util.Arrays.asList(paths));
}

/**
* Load multiple YAML files and register their agents and shared resources on this environment.
*/
public void loadYaml(java.util.List<java.nio.file.Path> paths) {
org.apache.flink.agents.api.yaml.YamlLoader.loadYaml(this, paths);
}
}
34 changes: 26 additions & 8 deletions api/src/main/java/org/apache/flink/agents/api/agents/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.agents.api.agents;

import org.apache.flink.agents.api.function.Function;
import org.apache.flink.agents.api.function.JavaFunction;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.resource.SerializableResource;
Expand All @@ -31,7 +33,7 @@

/** Base class for defining agent logic. */
public class Agent {
private final Map<String, Tuple3<String[], Method, Map<String, Object>>> actions;
private final Map<String, Tuple3<String[], Function, Map<String, Object>>> actions;

private final Map<ResourceType, Map<String, Object>> resources;

Expand All @@ -43,7 +45,7 @@ public Agent() {
this.actions = new HashMap<>();
}

public Map<String, Tuple3<String[], Method, Map<String, Object>>> getActions() {
public Map<String, Tuple3<String[], Function, Map<String, Object>>> getActions() {
return actions;
}

Expand All @@ -60,12 +62,7 @@ public Map<ResourceType, Map<String, Object>> getResources() {
*/
public Agent addAction(
String[] eventTypes, Method method, @Nullable Map<String, Object> config) {
String name = method.getName();
if (actions.containsKey(name)) {
throw new IllegalArgumentException(String.format("Action %s already defined.", name));
}
actions.put(name, new Tuple3<>(eventTypes, method, config));
return this;
return addAction(method.getName(), eventTypes, JavaFunction.fromMethod(method), config);
}

/**
Expand All @@ -78,6 +75,27 @@ public Agent addAction(String[] eventTypes, Method method) {
return addAction(eventTypes, method, null);
}

/**
* Add action to agent.
*
* @param name The action name. Must be unique within this agent.
* @param eventTypes The event type strings this action listens to.
* @param function The api-layer function descriptor; will be promoted to a plan-layer
* executable at {@code AgentPlan} construction.
* @param config Optional config for this action.
*/
public Agent addAction(
String name,
String[] eventTypes,
Function function,
@Nullable Map<String, Object> config) {
if (actions.containsKey(name)) {
throw new IllegalArgumentException(String.format("Action %s already defined.", name));
}
actions.put(name, new Tuple3<>(eventTypes, function, config));
return this;
}

public void addResourcesIfAbsent(Map<ResourceType, Map<String, Object>> resources) {
for (ResourceType type : resources.keySet()) {
Map<String, Object> typedResources = resources.get(type);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.agents.api.function;

/**
* Pure-data marker for user-defined function descriptors carried on the api layer.
*
* <p>Implementations describe <em>which</em> function ({@link PythonFunction}, {@link
* JavaFunction}) but do not execute it. The plan-layer twins ({@code
* org.apache.flink.agents.plan.Function} and friends) own execution; the conversion from api → plan
* happens during {@code AgentPlan} construction.
*/
public interface Function {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.agents.api.function;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
* Pure-data descriptor for a Java method, identified by declaring class FQN, method name, and
* parameter types as strings.
*
* <p>Parameter types are strings — JVM primitive names ({@code int}, {@code long}, {@code boolean},
* …) or fully-qualified reference type names ({@code java.lang.String}, {@code java.util.List}). No
* generic parameters. The wire form keeps the descriptor pure data; class resolution is deferred to
* the plan-layer twin.
*/
public final class JavaFunction implements Function, Serializable {

private static final String FIELD_QUAL_NAME = "qualName";
private static final String FIELD_METHOD_NAME = "methodName";
private static final String FIELD_PARAMETER_TYPES = "parameterTypes";

@JsonProperty(FIELD_QUAL_NAME)
private final String qualName;

@JsonProperty(FIELD_METHOD_NAME)
private final String methodName;

@JsonProperty(FIELD_PARAMETER_TYPES)
private final List<String> parameterTypes;

@JsonCreator
public JavaFunction(
@JsonProperty(FIELD_QUAL_NAME) String qualName,
@JsonProperty(FIELD_METHOD_NAME) String methodName,
@JsonProperty(FIELD_PARAMETER_TYPES) List<String> parameterTypes) {
this.qualName = Objects.requireNonNull(qualName, "qualName");
this.methodName = Objects.requireNonNull(methodName, "methodName");
this.parameterTypes =
parameterTypes == null
? Collections.emptyList()
: Collections.unmodifiableList(new ArrayList<>(parameterTypes));
}

/**
* Build a descriptor from a reflected {@link Method}. Each parameter type is captured via
* {@link Class#getName()} — the same form {@link Class#forName(String)} accepts when the api
* descriptor is later promoted to its plan-layer twin. For primitives this is the keyword
* ({@code int}, {@code long}); for reference types the fully-qualified name; for array types
* the JVM-internal descriptor ({@code [I}, {@code [Ljava.lang.String;}).
*/
public static JavaFunction fromMethod(Method method) {
List<String> params = new ArrayList<>(method.getParameterCount());
for (Class<?> p : method.getParameterTypes()) {
params.add(p.getName());
}
return new JavaFunction(method.getDeclaringClass().getName(), method.getName(), params);
}

public String getQualName() {
return qualName;
}

public String getMethodName() {
return methodName;
}

public List<String> getParameterTypes() {
return parameterTypes;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof JavaFunction)) return false;
JavaFunction that = (JavaFunction) o;
return qualName.equals(that.qualName)
&& methodName.equals(that.methodName)
&& parameterTypes.equals(that.parameterTypes);
}

@Override
public int hashCode() {
return Objects.hash(qualName, methodName, parameterTypes);
}

@Override
public String toString() {
return "JavaFunction{" + qualName + "#" + methodName + parameterTypes + "}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.agents.api.function;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.io.Serializable;
import java.util.Objects;

/**
* Pure-data descriptor for a Python callable, identified by its module and qualified name.
*
* <p>Carries no execution behavior — the plan-layer {@code
* org.apache.flink.agents.plan.PythonFunction} owns invocation via the Pemja interpreter.
*/
public final class PythonFunction implements Function, Serializable {

private static final String FIELD_MODULE = "module";
private static final String FIELD_QUAL_NAME = "qualName";

@JsonProperty(FIELD_MODULE)
private final String module;

@JsonProperty(FIELD_QUAL_NAME)
private final String qualName;

@JsonCreator
public PythonFunction(
@JsonProperty(FIELD_MODULE) String module,
@JsonProperty(FIELD_QUAL_NAME) String qualName) {
this.module = Objects.requireNonNull(module, "module");
this.qualName = Objects.requireNonNull(qualName, "qualName");
}

public String getModule() {
return module;
}

public String getQualName() {
return qualName;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof PythonFunction)) return false;
PythonFunction that = (PythonFunction) o;
return module.equals(that.module) && qualName.equals(that.qualName);
}

@Override
public int hashCode() {
return Objects.hash(module, qualName);
}

@Override
public String toString() {
return "PythonFunction{" + module + ":" + qualName + "}";
}
}
Loading
Loading