diff --git a/api/src/main/java/org/apache/flink/agents/api/skills/SkillSourceSpec.java b/api/src/main/java/org/apache/flink/agents/api/skills/SkillSourceSpec.java
new file mode 100644
index 000000000..edaa4316c
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/skills/SkillSourceSpec.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.skills;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A single entry inside {@link Skills#getSources()}. {@code scheme} identifies the source type
+ * (e.g. {@code "local"}, {@code "url"}, {@code "classpath"}); {@code params} carries the
+ * scheme-specific configuration (e.g. {@code {"path": "/data/skills"}}).
+ *
+ *
The {@code scheme} is normalized to lowercase. Unknown schemes deserialize successfully — the
+ * registry is the fail point at load time.
+ */
+public final class SkillSourceSpec {
+
+ private final String scheme;
+ private final Map params;
+
+ @JsonCreator
+ public SkillSourceSpec(
+ @JsonProperty("scheme") String scheme,
+ @JsonProperty("params") Map params) {
+ this.scheme = scheme == null ? null : scheme.toLowerCase(Locale.ROOT);
+ this.params = params == null ? Collections.emptyMap() : Map.copyOf(params);
+ }
+
+ @JsonProperty("scheme")
+ public String getScheme() {
+ return scheme;
+ }
+
+ @JsonProperty("params")
+ public Map getParams() {
+ return params;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof SkillSourceSpec)) return false;
+ SkillSourceSpec that = (SkillSourceSpec) o;
+ return Objects.equals(scheme, that.scheme) && params.equals(that.params);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(scheme, params);
+ }
+
+ @Override
+ public String toString() {
+ return "SkillSourceSpec{scheme=" + scheme + ", params=" + params + "}";
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/skills/Skills.java b/api/src/main/java/org/apache/flink/agents/api/skills/Skills.java
index c4a8de7be..49cfb5824 100644
--- a/api/src/main/java/org/apache/flink/agents/api/skills/Skills.java
+++ b/api/src/main/java/org/apache/flink/agents/api/skills/Skills.java
@@ -28,14 +28,29 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
/**
* Configuration resource describing where to load agent skills from.
*
- * Mirrors the Python {@code flink_agents.api.skills.Skills}. Use {@link
- * #fromLocalDir(String...)} to construct.
+ *
The single field {@code sources} holds an ordered list of {@link SkillSourceSpec} entries.
+ * Each entry has a {@code scheme} (e.g. {@code "local"}, {@code "url"}, {@code "classpath"}, {@code
+ * "package"}) and a scheme-specific {@code params} map. Use one of the factory methods to construct
+ * a {@link Skills} resource:
*
- *
Multiple {@code @Skills} declarations on the same agent are merged at plan-build time.
+ *
+ * - {@link #fromLocalDir(String...)} for local directories or {@code .zip} files
+ *
- {@link #fromUrl(String...)} for http(s) URLs pointing to a {@code .zip}
+ *
- {@link #fromClasspath(String...)} for resources on the classpath
+ *
+ *
+ * The {@code "package"} scheme exists on the Python side only (Java has no analogous concept). A
+ * plan written by Python with {@code scheme=package} deserializes successfully on Java, but {@code
+ * SkillManager} will fail fast at load time with the registered-scheme list.
+ *
+ *
Multiple {@code @Skills} declarations on the same agent are merged at plan-build time;
+ * duplicate {@link SkillSourceSpec} entries (same {@code scheme} and {@code params}) are collapsed.
*/
@JsonIgnoreProperties(
ignoreUnknown = true,
@@ -51,31 +66,60 @@ public class Skills extends SerializableResource {
/** Reserved name of the built-in bash tool used to execute skill scripts. */
public static final String BASH_TOOL = "bash";
- private List paths;
+ private final List sources;
/** Required by Jackson. */
public Skills() {
- this.paths = Collections.emptyList();
+ this.sources = Collections.emptyList();
}
@JsonCreator
- public Skills(@JsonProperty("paths") List paths) {
- this.paths = paths == null ? Collections.emptyList() : List.copyOf(paths);
+ public Skills(@JsonProperty("sources") List sources) {
+ this.sources = sources == null ? Collections.emptyList() : List.copyOf(sources);
}
/**
- * Create a {@link Skills} resource from one or more local filesystem directories.
+ * Create a {@link Skills} resource from one or more local paths.
*
- * Each path points to a directory whose immediate subdirectories each contain a {@code
- * SKILL.md} file.
+ *
Each path may be a directory whose immediate subdirectories each contain a {@code
+ * SKILL.md} file, or a {@code .zip} file whose top-level entries are the skill subdirectories.
*/
public static Skills fromLocalDir(String... paths) {
- return new Skills(Arrays.asList(paths));
+ return new Skills(
+ Arrays.stream(paths)
+ .map(p -> new SkillSourceSpec("local", Map.of("path", p)))
+ .collect(Collectors.toList()));
+ }
+
+ /**
+ * Create a {@link Skills} resource from one or more http(s) URLs.
+ *
+ *
Each URL must point to a {@code .zip} whose top level is the baseDir.
+ */
+ public static Skills fromUrl(String... urls) {
+ return new Skills(
+ Arrays.stream(urls)
+ .map(u -> new SkillSourceSpec("url", Map.of("url", u)))
+ .collect(Collectors.toList()));
+ }
+
+ /**
+ * Create a {@link Skills} resource from one or more classpath resource paths.
+ *
+ *
Each resource may be a directory (e.g. under {@code src/main/resources/skills}) or a
+ * {@code .zip} file. When packaged into a JAR, the resource is loaded via the thread context
+ * class loader and materialized to a temp directory at runtime.
+ */
+ public static Skills fromClasspath(String... resources) {
+ return new Skills(
+ Arrays.stream(resources)
+ .map(r -> new SkillSourceSpec("classpath", Map.of("resource", r)))
+ .collect(Collectors.toList()));
}
- @JsonProperty("paths")
- public List getPaths() {
- return paths;
+ @JsonProperty("sources")
+ public List getSources() {
+ return sources;
}
@JsonIgnore
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/YamlLoader.java b/api/src/main/java/org/apache/flink/agents/api/yaml/YamlLoader.java
index 8189c0d38..3de375b75 100644
--- a/api/src/main/java/org/apache/flink/agents/api/yaml/YamlLoader.java
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/YamlLoader.java
@@ -29,12 +29,14 @@
import org.apache.flink.agents.api.prompt.Prompt;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.skills.SkillSourceSpec;
import org.apache.flink.agents.api.skills.Skills;
import org.apache.flink.agents.api.tools.FunctionTool;
import org.apache.flink.agents.api.yaml.spec.ActionSpec;
import org.apache.flink.agents.api.yaml.spec.AgentActionRef;
import org.apache.flink.agents.api.yaml.spec.AgentSpec;
import org.apache.flink.agents.api.yaml.spec.DescriptorSpec;
+import org.apache.flink.agents.api.yaml.spec.PackageSkillSpec;
import org.apache.flink.agents.api.yaml.spec.PromptSpec;
import org.apache.flink.agents.api.yaml.spec.SkillsSpec;
import org.apache.flink.agents.api.yaml.spec.ToolSpec;
@@ -167,7 +169,27 @@ public static Prompt buildPrompt(PromptSpec spec) {
/** Build a {@link Skills} resource from a parsed {@link SkillsSpec}. */
public static Skills buildSkills(SkillsSpec spec) {
- return new Skills(new ArrayList<>(spec.getPaths()));
+ List sources = new ArrayList<>();
+ for (String p : spec.getPaths()) {
+ sources.add(new SkillSourceSpec("local", Map.of("path", p)));
+ }
+ for (String u : spec.getUrls()) {
+ sources.add(new SkillSourceSpec("url", Map.of("url", u)));
+ }
+ for (String r : spec.getClasspath()) {
+ sources.add(new SkillSourceSpec("classpath", Map.of("resource", r)));
+ }
+ for (PackageSkillSpec pkg : spec.getPackageEntries()) {
+ sources.add(
+ new SkillSourceSpec(
+ "package",
+ Map.of(
+ "package",
+ pkg.getPackageName(),
+ "resource",
+ pkg.getResource())));
+ }
+ return new Skills(sources);
}
/**
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/PackageSkillSpec.java b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/PackageSkillSpec.java
new file mode 100644
index 000000000..e64378d8d
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/PackageSkillSpec.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.yaml.spec;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A single {@code package} skill source entry: a Python package name plus a resource path relative
+ * to that package's root. The {@code package} scheme is Python-only at runtime — a YAML using this
+ * field deserializes on Java but fails at skill load time.
+ */
+@JsonIgnoreProperties(ignoreUnknown = false)
+public final class PackageSkillSpec {
+ private final String packageName;
+ private final String resource;
+
+ @JsonCreator
+ public PackageSkillSpec(
+ @JsonProperty(value = "package", required = true) String packageName,
+ @JsonProperty(value = "resource", required = true) String resource) {
+ this.packageName = packageName;
+ this.resource = resource;
+ }
+
+ @JsonProperty("package")
+ public String getPackageName() {
+ return packageName;
+ }
+
+ @JsonProperty("resource")
+ public String getResource() {
+ return resource;
+ }
+}
diff --git a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/SkillsSpec.java b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/SkillsSpec.java
index 61136e049..567bdddb8 100644
--- a/api/src/main/java/org/apache/flink/agents/api/yaml/spec/SkillsSpec.java
+++ b/api/src/main/java/org/apache/flink/agents/api/yaml/spec/SkillsSpec.java
@@ -22,20 +22,54 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Collections;
import java.util.List;
-/** Declarative Skills resource. */
+/**
+ * Declarative Skills resource. Each list below maps to a skill source scheme:
+ *
+ *
+ * - {@code paths} — {@code local} scheme: directories or {@code .zip} files on the filesystem
+ *
- {@code urls} — {@code url} scheme: {@code http(s)} URLs pointing to a {@code .zip}
+ *
- {@code classpath} — {@code classpath} scheme: resource paths on the Java classpath
+ *
- {@code package} — {@code package} scheme (Python-only at runtime): {@code (package,
+ * resource)} pairs pointing at resources inside an installed Python package
+ *
+ *
+ * At least one of the four lists must be non-empty. {@code package} is exposed on Java for YAML
+ * schema parity with Python — it deserializes successfully but {@code SkillManager} on Java will
+ * fail at load time because Java does not register a {@code package} handler.
+ */
@JsonIgnoreProperties(ignoreUnknown = false)
public final class SkillsSpec {
private final String name;
private final List paths;
+ private final List urls;
+ private final List classpath;
+ private final List packageEntries;
@JsonCreator
public SkillsSpec(
@JsonProperty(value = "name", required = true) String name,
- @JsonProperty(value = "paths", required = true) List paths) {
+ @JsonProperty("paths") List paths,
+ @JsonProperty("urls") List urls,
+ @JsonProperty("classpath") List classpath,
+ @JsonProperty("package") List packageEntries) {
this.name = name;
- this.paths = paths;
+ this.paths = paths == null ? Collections.emptyList() : List.copyOf(paths);
+ this.urls = urls == null ? Collections.emptyList() : List.copyOf(urls);
+ this.classpath = classpath == null ? Collections.emptyList() : List.copyOf(classpath);
+ this.packageEntries =
+ packageEntries == null ? Collections.emptyList() : List.copyOf(packageEntries);
+ if (this.paths.isEmpty()
+ && this.urls.isEmpty()
+ && this.classpath.isEmpty()
+ && this.packageEntries.isEmpty()) {
+ throw new IllegalArgumentException(
+ "skills '"
+ + name
+ + "': at least one of paths/urls/classpath/package must be non-empty.");
+ }
}
public String getName() {
@@ -45,4 +79,17 @@ public String getName() {
public List getPaths() {
return paths;
}
+
+ public List getUrls() {
+ return urls;
+ }
+
+ public List getClasspath() {
+ return classpath;
+ }
+
+ @JsonProperty("package")
+ public List getPackageEntries() {
+ return packageEntries;
+ }
}
diff --git a/api/src/test/java/org/apache/flink/agents/api/skills/SkillSourceSpecTest.java b/api/src/test/java/org/apache/flink/agents/api/skills/SkillSourceSpecTest.java
new file mode 100644
index 000000000..1b29066c0
--- /dev/null
+++ b/api/src/test/java/org/apache/flink/agents/api/skills/SkillSourceSpecTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.skills;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+class SkillSourceSpecTest {
+
+ @Test
+ void schemeIsLowercased() {
+ SkillSourceSpec spec = new SkillSourceSpec("LOCAL", Map.of("path", "/x"));
+ assertEquals("local", spec.getScheme());
+ }
+
+ @Test
+ void equalityIgnoresSchemeCase() {
+ assertEquals(
+ new SkillSourceSpec("local", Map.of("path", "/x")),
+ new SkillSourceSpec("LOCAL", Map.of("path", "/x")));
+ }
+
+ @Test
+ void equalityRequiresSameParams() {
+ assertNotEquals(
+ new SkillSourceSpec("local", Map.of("path", "/x")),
+ new SkillSourceSpec("local", Map.of("path", "/y")));
+ }
+
+ @Test
+ void hashCodeMatchesEquals() {
+ SkillSourceSpec a = new SkillSourceSpec("local", Map.of("path", "/x"));
+ SkillSourceSpec b = new SkillSourceSpec("LOCAL", Map.of("path", "/x"));
+ assertEquals(a, b);
+ assertEquals(a.hashCode(), b.hashCode());
+ }
+
+ @Test
+ void jsonRoundTrip() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ SkillSourceSpec original =
+ new SkillSourceSpec("package", Map.of("package", "p", "resource", "r"));
+ String json = mapper.writeValueAsString(original);
+ SkillSourceSpec restored = mapper.readValue(json, SkillSourceSpec.class);
+ assertEquals(original, restored);
+ }
+
+ @Test
+ void unknownSchemeDeserializesSuccessfully() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ String json = "{\"scheme\":\"future-scheme\",\"params\":{\"k\":\"v\"}}";
+ SkillSourceSpec spec = mapper.readValue(json, SkillSourceSpec.class);
+ assertEquals("future-scheme", spec.getScheme());
+ assertEquals(Map.of("k", "v"), spec.getParams());
+ }
+}
diff --git a/api/src/test/java/org/apache/flink/agents/api/skills/SkillsResourceTest.java b/api/src/test/java/org/apache/flink/agents/api/skills/SkillsResourceTest.java
index 864eae42c..7b42b7d79 100644
--- a/api/src/test/java/org/apache/flink/agents/api/skills/SkillsResourceTest.java
+++ b/api/src/test/java/org/apache/flink/agents/api/skills/SkillsResourceTest.java
@@ -23,25 +23,46 @@
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
class SkillsResourceTest {
@Test
- void fromLocalDirCarriesPaths() {
+ void fromLocalDirEmitsLocalScheme() {
Skills skills = Skills.fromLocalDir("/tmp/a", "/tmp/b");
- assertEquals(List.of("/tmp/a", "/tmp/b"), skills.getPaths());
+ assertEquals(
+ List.of(
+ new SkillSourceSpec("local", Map.of("path", "/tmp/a")),
+ new SkillSourceSpec("local", Map.of("path", "/tmp/b"))),
+ skills.getSources());
assertEquals(ResourceType.SKILLS, skills.getResourceType());
}
+ @Test
+ void fromUrlEmitsUrlScheme() {
+ Skills skills = Skills.fromUrl("https://example.com/x.zip");
+ assertEquals(
+ List.of(new SkillSourceSpec("url", Map.of("url", "https://example.com/x.zip"))),
+ skills.getSources());
+ }
+
+ @Test
+ void fromClasspathEmitsClasspathScheme() {
+ Skills skills = Skills.fromClasspath("skills");
+ assertEquals(
+ List.of(new SkillSourceSpec("classpath", Map.of("resource", "skills"))),
+ skills.getSources());
+ }
+
@Test
void roundTripsThroughJackson() throws Exception {
Skills original = Skills.fromLocalDir("/tmp/skill1", "/tmp/skill2");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(original);
Skills restored = mapper.readValue(json, Skills.class);
- assertEquals(original.getPaths(), restored.getPaths());
+ assertEquals(original.getSources(), restored.getSources());
}
@Test
diff --git a/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderBuildAgentsTest.java b/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderBuildAgentsTest.java
index 2209dafda..802622c21 100644
--- a/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderBuildAgentsTest.java
+++ b/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderBuildAgentsTest.java
@@ -27,6 +27,7 @@
import org.apache.flink.agents.api.prompt.Prompt;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.skills.SkillSourceSpec;
import org.apache.flink.agents.api.skills.Skills;
import org.apache.flink.agents.api.tools.FunctionTool;
import org.apache.flink.agents.api.yaml.YamlLoader.LoadedFile;
@@ -123,11 +124,15 @@ void skillsPerAgentAndShared() {
LoadedFile out = YamlLoader.buildAgents(FIXTURES.resolve("with_skills.yaml"));
Agent agent = out.getAgents().get("skills_agent");
Skills own = (Skills) agent.getResources().get(ResourceType.SKILLS).get("agent_skills");
- assertThat(own.getPaths()).containsExactly("./agent_skill_dir");
+ assertThat(own.getSources())
+ .containsExactly(new SkillSourceSpec("local", Map.of("path", "./agent_skill_dir")));
Skills shared =
(Skills) out.getSharedResources().get(ResourceType.SKILLS).get("shared_skills");
- assertThat(shared.getPaths()).containsExactly("./shared_skill_dir", "./more");
+ assertThat(shared.getSources())
+ .containsExactly(
+ new SkillSourceSpec("local", Map.of("path", "./shared_skill_dir")),
+ new SkillSourceSpec("local", Map.of("path", "./more")));
}
@Test
diff --git a/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderBuildersTest.java b/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderBuildersTest.java
index 84560008f..9ec57c9fc 100644
--- a/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderBuildersTest.java
+++ b/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderBuildersTest.java
@@ -26,6 +26,7 @@
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceName;
import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.skills.SkillSourceSpec;
import org.apache.flink.agents.api.skills.Skills;
import org.apache.flink.agents.api.tools.FunctionTool;
import org.apache.flink.agents.api.yaml.spec.DescriptorSpec;
@@ -142,6 +143,24 @@ void buildPromptMessages() throws Exception {
void buildSkills() throws Exception {
SkillsSpec spec = M.readValue("name: s\npaths: [./a]\n", SkillsSpec.class);
Skills s = YamlLoader.buildSkills(spec);
- assertThat(s.getPaths()).containsExactly("./a");
+ assertThat(s.getSources())
+ .containsExactly(new SkillSourceSpec("local", Map.of("path", "./a")));
+ }
+
+ @Test
+ void buildSkillsMergesAllSchemes() throws Exception {
+ SkillsSpec spec =
+ M.readValue(
+ "name: s\n"
+ + "paths: [./a]\n"
+ + "urls: [https://x/s.zip]\n"
+ + "classpath: [com/example/s]\n",
+ SkillsSpec.class);
+ Skills s = YamlLoader.buildSkills(spec);
+ assertThat(s.getSources())
+ .containsExactly(
+ new SkillSourceSpec("local", Map.of("path", "./a")),
+ new SkillSourceSpec("url", Map.of("url", "https://x/s.zip")),
+ new SkillSourceSpec("classpath", Map.of("resource", "com/example/s")));
}
}
diff --git a/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderLoadYamlTest.java b/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderLoadYamlTest.java
index 5101d068c..0fba37e48 100644
--- a/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderLoadYamlTest.java
+++ b/api/src/test/java/org/apache/flink/agents/api/yaml/YamlLoaderLoadYamlTest.java
@@ -25,6 +25,7 @@
import org.apache.flink.agents.api.function.Function;
import org.apache.flink.agents.api.function.JavaFunction;
import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.skills.SkillSourceSpec;
import org.apache.flink.agents.api.skills.Skills;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -110,7 +111,10 @@ void sharedSkillsRegistered() {
AgentsExecutionEnvironment env = new TestEnv();
env.loadYaml(FIXTURES.resolve("with_skills.yaml"));
Skills shared = (Skills) env.getResources().get(ResourceType.SKILLS).get("shared_skills");
- assertThat(shared.getPaths()).containsExactly("./shared_skill_dir", "./more");
+ assertThat(shared.getSources())
+ .containsExactly(
+ new SkillSourceSpec("local", Map.of("path", "./shared_skill_dir")),
+ new SkillSourceSpec("local", Map.of("path", "./more")));
}
/** Minimal env stub — we can't instantiate LocalExecutionEnvironment from the api module. */
diff --git a/api/src/test/java/org/apache/flink/agents/api/yaml/spec/SchemaParityTest.java b/api/src/test/java/org/apache/flink/agents/api/yaml/spec/SchemaParityTest.java
index ba733cf98..ef4caa5f6 100644
--- a/api/src/test/java/org/apache/flink/agents/api/yaml/spec/SchemaParityTest.java
+++ b/api/src/test/java/org/apache/flink/agents/api/yaml/spec/SchemaParityTest.java
@@ -74,6 +74,7 @@ class SchemaParityTest {
m.put("PromptMessage", PromptMessage.class);
m.put("PromptSpec", PromptSpec.class);
m.put("SkillsSpec", SkillsSpec.class);
+ m.put("PackageSkillSpec", PackageSkillSpec.class);
m.put("ToolSpec", ToolSpec.class);
SPEC_CLASSES = Map.copyOf(m);
}
diff --git a/api/src/test/java/org/apache/flink/agents/api/yaml/spec/SkillsSpecTest.java b/api/src/test/java/org/apache/flink/agents/api/yaml/spec/SkillsSpecTest.java
index 388e33033..132ecba41 100644
--- a/api/src/test/java/org/apache/flink/agents/api/yaml/spec/SkillsSpecTest.java
+++ b/api/src/test/java/org/apache/flink/agents/api/yaml/spec/SkillsSpecTest.java
@@ -33,11 +33,30 @@ void parsesNameAndPaths() throws Exception {
SkillsSpec spec = M.readValue("name: s\npaths: [./a, ./b]\n", SkillsSpec.class);
assertThat(spec.getName()).isEqualTo("s");
assertThat(spec.getPaths()).containsExactly("./a", "./b");
+ assertThat(spec.getUrls()).isEmpty();
+ assertThat(spec.getClasspath()).isEmpty();
+ }
+
+ @Test
+ void parsesUrlsAndClasspath() throws Exception {
+ SkillsSpec spec =
+ M.readValue(
+ "name: s\nurls: [https://x/skills.zip]\nclasspath: [com/example/s]\n",
+ SkillsSpec.class);
+ assertThat(spec.getPaths()).isEmpty();
+ assertThat(spec.getUrls()).containsExactly("https://x/skills.zip");
+ assertThat(spec.getClasspath()).containsExactly("com/example/s");
}
@Test
void rejectsUnknownProperty() {
- assertThatThrownBy(() -> M.readValue("name: s\npaths: []\nextra: 1\n", SkillsSpec.class))
+ assertThatThrownBy(() -> M.readValue("name: s\npaths: [./a]\nextra: 1\n", SkillsSpec.class))
.hasMessageContaining("extra");
}
+
+ @Test
+ void rejectsAllEmpty() {
+ assertThatThrownBy(() -> M.readValue("name: s\n", SkillsSpec.class))
+ .hasMessageContaining("at least one of paths/urls/classpath");
+ }
}
diff --git a/docs/yaml-schema.json b/docs/yaml-schema.json
index 905fa4b65..ac62bb17e 100644
--- a/docs/yaml-schema.json
+++ b/docs/yaml-schema.json
@@ -216,6 +216,26 @@
"title": "MessageRole",
"type": "string"
},
+ "PackageSkillSpec": {
+ "additionalProperties": false,
+ "description": "A single ``package`` skill source entry: a Python package name plus a\nresource path relative to that package's root.",
+ "properties": {
+ "package": {
+ "title": "Package",
+ "type": "string"
+ },
+ "resource": {
+ "title": "Resource",
+ "type": "string"
+ }
+ },
+ "required": [
+ "package",
+ "resource"
+ ],
+ "title": "PackageSkillSpec",
+ "type": "object"
+ },
"PromptMessage": {
"additionalProperties": false,
"description": "One message in a multi-turn prompt template.",
@@ -279,23 +299,43 @@
},
"SkillsSpec": {
"additionalProperties": false,
- "description": "Declarative Skills resource pointing at one or more skill source\ndirectories on the local filesystem.",
+ "description": "Declarative Skills resource: one or more skill sources grouped by scheme.\n\nEach list below maps to a skill source scheme:\n\n- ``paths`` \u2014 ``local`` scheme: directories or ``.zip`` files\n- ``urls`` \u2014 ``url`` scheme: ``http(s)`` URLs pointing to a ``.zip``\n- ``classpath`` \u2014 ``classpath`` scheme (Java-only at runtime): resource\n paths on the Java classpath\n- ``package`` \u2014 ``package`` scheme (Python-only at runtime): resources\n inside installed Python packages, given as ``{package, resource}`` pairs\n\nAt least one of the four must be non-empty. ``classpath`` is exposed on\nPython for YAML schema parity with Java \u2014 it deserializes successfully\nbut ``SkillManager`` on Python will fail at load time because Python does\nnot register a ``classpath`` handler.",
"properties": {
+ "classpath": {
+ "items": {
+ "type": "string"
+ },
+ "title": "Classpath",
+ "type": "array"
+ },
"name": {
"title": "Name",
"type": "string"
},
+ "package": {
+ "items": {
+ "$ref": "#/$defs/PackageSkillSpec"
+ },
+ "title": "Package",
+ "type": "array"
+ },
"paths": {
"items": {
"type": "string"
},
"title": "Paths",
"type": "array"
+ },
+ "urls": {
+ "items": {
+ "type": "string"
+ },
+ "title": "Urls",
+ "type": "array"
}
},
"required": [
- "name",
- "paths"
+ "name"
],
"title": "SkillsSpec",
"type": "object"
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index 62eb0876e..605500ca1 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -28,6 +28,7 @@
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.resource.SerializableResource;
import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import org.apache.flink.agents.api.skills.SkillSourceSpec;
import org.apache.flink.agents.api.skills.Skills;
import org.apache.flink.agents.api.tools.ToolMetadata;
import org.apache.flink.agents.plan.actions.Action;
@@ -57,6 +58,7 @@
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -572,11 +574,15 @@ private void addSkills(Map skillsObjects) throws Exception {
ResourceType.TOOL,
new ResourceDescriptor(BashTool.class.getName(), new HashMap<>())));
- LinkedHashSet paths = new LinkedHashSet<>();
- for (Skills s : skillsObjects.values()) {
- paths.addAll(s.getPaths());
+ // Sort by key before merging: getDeclaredMethods() makes no order guarantee, so without
+ // this the winner on a duplicate skill name would vary across JDK / class layout.
+ List orderedKeys = new ArrayList<>(skillsObjects.keySet());
+ Collections.sort(orderedKeys);
+ LinkedHashSet sources = new LinkedHashSet<>();
+ for (String key : orderedKeys) {
+ sources.addAll(skillsObjects.get(key).getSources());
}
- Skills merged = Skills.fromLocalDir(paths.toArray(new String[0]));
+ Skills merged = new Skills(new ArrayList<>(sources));
addResourceProvider(
JavaSerializableResourceProvider.createResourceProvider(
Skills.SKILLS_CONFIG, ResourceType.SKILLS, merged));
diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareSkillsTest.java b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareSkillsTest.java
index e7605368a..c2cd4f7e2 100644
--- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareSkillsTest.java
+++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareSkillsTest.java
@@ -20,12 +20,14 @@
import org.apache.flink.agents.api.agents.Agent;
import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.skills.SkillSourceSpec;
import org.apache.flink.agents.api.skills.Skills;
import org.apache.flink.agents.plan.resourceprovider.JavaResourceProvider;
import org.apache.flink.agents.plan.resourceprovider.JavaSerializableResourceProvider;
import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
import org.junit.jupiter.api.Test;
+import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -54,6 +56,47 @@ public static Skills second() {
}
}
+ public static class MultiSkillsUrlAgent extends Agent {
+ @org.apache.flink.agents.api.annotation.Skills
+ public static Skills first() {
+ return Skills.fromUrl("https://a.example/x.zip", "https://b.example/x.zip");
+ }
+
+ @org.apache.flink.agents.api.annotation.Skills
+ public static Skills second() {
+ return Skills.fromUrl("https://b.example/x.zip", "https://c.example/x.zip");
+ }
+ }
+
+ public static class MultiSkillsClasspathAgent extends Agent {
+ @org.apache.flink.agents.api.annotation.Skills
+ public static Skills first() {
+ return Skills.fromClasspath("skills-a", "skills-b");
+ }
+
+ @org.apache.flink.agents.api.annotation.Skills
+ public static Skills second() {
+ return Skills.fromClasspath("skills-b", "skills-c");
+ }
+ }
+
+ public static class MixedSchemeAgent extends Agent {
+ @org.apache.flink.agents.api.annotation.Skills
+ public static Skills locals() {
+ return Skills.fromLocalDir("/tmp/skill-a");
+ }
+
+ @org.apache.flink.agents.api.annotation.Skills
+ public static Skills urls() {
+ return Skills.fromUrl("https://a.example/x.zip");
+ }
+
+ @org.apache.flink.agents.api.annotation.Skills
+ public static Skills classpaths() {
+ return Skills.fromClasspath("skills-a");
+ }
+ }
+
public static class NoSkillsAgent extends Agent {}
@Test
@@ -83,21 +126,49 @@ void singleSkillsRegistersConfigAndBuiltInTools() throws Exception {
}
@Test
- void multipleSkillsMethodsMergePathsWithDeduplication() throws Exception {
- AgentPlan plan = new AgentPlan(new MultiSkillsAgent());
- ResourceProvider configProvider =
- plan.getResourceProviders().get(ResourceType.SKILLS).get(Skills.SKILLS_CONFIG);
- Skills merged =
- (Skills)
- ((JavaSerializableResourceProvider) configProvider)
- .provide(
- org.apache.flink.agents.api.resource.ResourceContext
- .fromGetResource((n, t) -> null));
- // Order is preserved; "/tmp/skill-b" appears once.
- assertEquals(3, merged.getPaths().size());
- assertTrue(merged.getPaths().contains("/tmp/skill-a"));
- assertTrue(merged.getPaths().contains("/tmp/skill-b"));
- assertTrue(merged.getPaths().contains("/tmp/skill-c"));
+ void multipleSkillsMethodsMergeLocalPathsWithDeduplication() throws Exception {
+ Skills merged = mergedSkillsOf(new MultiSkillsAgent());
+ assertEquals(
+ java.util.Set.of(
+ new SkillSourceSpec("local", Map.of("path", "/tmp/skill-a")),
+ new SkillSourceSpec("local", Map.of("path", "/tmp/skill-b")),
+ new SkillSourceSpec("local", Map.of("path", "/tmp/skill-c"))),
+ java.util.Set.copyOf(merged.getSources()));
+ }
+
+ @Test
+ void multipleSkillsMethodsMergeUrlsWithDeduplication() throws Exception {
+ Skills merged = mergedSkillsOf(new MultiSkillsUrlAgent());
+ assertEquals(
+ java.util.Set.of(
+ new SkillSourceSpec("url", Map.of("url", "https://a.example/x.zip")),
+ new SkillSourceSpec("url", Map.of("url", "https://b.example/x.zip")),
+ new SkillSourceSpec("url", Map.of("url", "https://c.example/x.zip"))),
+ java.util.Set.copyOf(merged.getSources()));
+ }
+
+ @Test
+ void multipleSkillsMethodsMergeClasspathResourcesWithDeduplication() throws Exception {
+ Skills merged = mergedSkillsOf(new MultiSkillsClasspathAgent());
+ assertEquals(
+ java.util.Set.of(
+ new SkillSourceSpec("classpath", Map.of("resource", "skills-a")),
+ new SkillSourceSpec("classpath", Map.of("resource", "skills-b")),
+ new SkillSourceSpec("classpath", Map.of("resource", "skills-c"))),
+ java.util.Set.copyOf(merged.getSources()));
+ }
+
+ @Test
+ void threeSchemeMixMergesInLexicalKeyOrder() throws Exception {
+ // MixedSchemeAgent's @Skills methods are classpaths/locals/urls — merging by sorted
+ // method name fixes the source order regardless of getDeclaredMethods()'s whims.
+ Skills merged = mergedSkillsOf(new MixedSchemeAgent());
+ assertEquals(
+ java.util.List.of(
+ new SkillSourceSpec("classpath", Map.of("resource", "skills-a")),
+ new SkillSourceSpec("local", Map.of("path", "/tmp/skill-a")),
+ new SkillSourceSpec("url", Map.of("url", "https://a.example/x.zip"))),
+ merged.getSources());
}
@Test
@@ -116,16 +187,20 @@ void noSkillsLeavesNoConfigProvider() throws Exception {
void programmaticSkillsAddResourceParticipates() throws Exception {
Agent agent = new NoSkillsAgent();
agent.addResource("more", ResourceType.SKILLS, Skills.fromLocalDir("/tmp/skill-d"));
+ Skills merged = mergedSkillsOf(agent);
+ assertEquals(
+ List.of(new SkillSourceSpec("local", Map.of("path", "/tmp/skill-d"))),
+ merged.getSources());
+ }
+
+ private static Skills mergedSkillsOf(Agent agent) throws Exception {
AgentPlan plan = new AgentPlan(agent);
ResourceProvider configProvider =
plan.getResourceProviders().get(ResourceType.SKILLS).get(Skills.SKILLS_CONFIG);
- assertNotNull(configProvider);
- Skills merged =
- (Skills)
- ((JavaSerializableResourceProvider) configProvider)
- .provide(
- org.apache.flink.agents.api.resource.ResourceContext
- .fromGetResource((n, t) -> null));
- assertEquals(java.util.List.of("/tmp/skill-d"), merged.getPaths());
+ return (Skills)
+ ((JavaSerializableResourceProvider) configProvider)
+ .provide(
+ org.apache.flink.agents.api.resource.ResourceContext
+ .fromGetResource((n, t) -> null));
}
}
diff --git a/python/flink_agents/api/skills.py b/python/flink_agents/api/skills.py
index 0306d9832..f1acfec68 100644
--- a/python/flink_agents/api/skills.py
+++ b/python/flink_agents/api/skills.py
@@ -17,7 +17,14 @@
#################################################################################
"""Skills configuration resource for agent skills discovery.
-Use :meth:`Skills.from_local_dir` to construct a :class:`Skills` resource.
+Each :class:`Skills` resource carries a single ordered list of
+:class:`SkillSourceSpec` entries. Each entry has a ``scheme`` (e.g.
+``"local"``, ``"url"``, ``"package"``) and a scheme-specific ``params`` map.
+Use one of the factory methods to construct a :class:`Skills` resource:
+
+* :meth:`Skills.from_local_dir` for local directories or local ``.zip`` files
+* :meth:`Skills.from_url` for http(s) URLs pointing to a ``.zip``
+* :meth:`Skills.from_package` for resources inside installed packages
Example::
@@ -26,39 +33,115 @@
def my_skills() -> Skills:
return Skills.from_local_dir("./skills")
+
+ @skills
+ @staticmethod
+ def remote_skills() -> Skills:
+ return Skills.from_url("https://example.com/skills.zip")
+
+
+ @skills
+ @staticmethod
+ def packaged_skills() -> Skills:
+ return Skills.from_package(("my_skills_pkg", "skills"))
+
+The ``"classpath"`` scheme is Java-only; a plan written by Java with
+``scheme=classpath`` deserializes successfully on Python but
+:class:`SkillManager` will fail fast at load time with the registered-scheme
+list.
+
Declare more than one ``@skills`` function on the same agent to combine
-sources; the runtime merges them.
+sources; the runtime merges them and de-duplicates identical
+:class:`SkillSourceSpec` entries.
"""
from __future__ import annotations
-from typing import List
+from typing import Dict, List, Tuple
-from pydantic import Field
+from pydantic import BaseModel, ConfigDict, Field, field_validator
from typing_extensions import override
from flink_agents.api.resource import ResourceType, SerializableResource
+class SkillSourceSpec(BaseModel):
+ """One entry in :attr:`Skills.sources`.
+
+ ``scheme`` identifies the source type; ``params`` carries the
+ scheme-specific configuration. The ``scheme`` is normalized to lowercase.
+ Unknown schemes deserialize successfully — the registry is the fail point
+ at load time.
+ """
+
+ scheme: str
+ params: Dict[str, str] = Field(default_factory=dict)
+
+ model_config = ConfigDict(frozen=True)
+
+ @field_validator("scheme")
+ @classmethod
+ def _lower(cls, v: str) -> str:
+ return v.lower()
+
+ def __hash__(self) -> int:
+ return hash((self.scheme, tuple(sorted(self.params.items()))))
+
+
class Skills(SerializableResource):
"""A resource describing where to load agent skills from.
- Use :meth:`from_local_dir` to construct — direct field construction is
- reserved for internal serialization and not part of the public API.
+ Use one of the ``from_*`` factory methods to construct — direct field
+ construction is reserved for internal serialization and not part of the
+ public API.
"""
- # Internal location list populated through the factory methods; kept
- # as a public pydantic field so it can be serialized/deserialized.
- paths: List[str] = Field(default_factory=list)
+ sources: List[SkillSourceSpec] = Field(default_factory=list)
@classmethod
def from_local_dir(cls, *paths: str) -> Skills:
- """Create a Skills resource from one or more local filesystem directories.
+ """Create a Skills resource from one or more local paths.
- Each path points to a directory whose immediate subdirectories each
- contain a ``SKILL.md`` file.
+ Each path may be a directory or a ``.zip`` file. For a directory, its
+ immediate subdirectories must each contain a ``SKILL.md`` file. For
+ a zip, its top-level entries are the skill subdirectories.
+ """
+ return cls(
+ sources=[
+ SkillSourceSpec(scheme="local", params={"path": p}) for p in paths
+ ]
+ )
+
+ @classmethod
+ def from_url(cls, *urls: str) -> Skills:
+ """Create a Skills resource from one or more http(s) URLs.
+
+ Each URL must point to a ``.zip`` whose top level is the baseDir
+ (i.e. skill subdirectories sit at the top of the zip).
+ """
+ return cls(
+ sources=[SkillSourceSpec(scheme="url", params={"url": u}) for u in urls]
+ )
+
+ @classmethod
+ def from_package(cls, *pairs: Tuple[str, str]) -> Skills:
+ """Create a Skills resource from resources inside installed packages.
+
+ Args:
+ *pairs: One or more ``(package, resource)`` tuples. ``package`` is
+ a dotted Python package name (e.g. ``"my_skills_pkg"``);
+ ``resource`` is a path inside the package, relative to the
+ package root. The resource may refer to a directory or a
+ ``.zip`` file.
"""
- return cls(paths=list(paths))
+ return cls(
+ sources=[
+ SkillSourceSpec(
+ scheme="package", params={"package": pkg, "resource": res}
+ )
+ for pkg, res in pairs
+ ]
+ )
@classmethod
@override
diff --git a/python/flink_agents/api/tests/test_skills.py b/python/flink_agents/api/tests/test_skills.py
new file mode 100644
index 000000000..7b8829c16
--- /dev/null
+++ b/python/flink_agents/api/tests/test_skills.py
@@ -0,0 +1,99 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""Tests for the Skills resource API."""
+
+from flink_agents.api.skills import Skills, SkillSourceSpec
+
+
+class TestSkillsFactories:
+ def test_from_local_dir_emits_local_scheme(self) -> None:
+ s = Skills.from_local_dir("/a", "/b.zip")
+ assert s.sources == [
+ SkillSourceSpec(scheme="local", params={"path": "/a"}),
+ SkillSourceSpec(scheme="local", params={"path": "/b.zip"}),
+ ]
+
+ def test_from_url_emits_url_scheme(self) -> None:
+ s = Skills.from_url("https://example.com/x.zip")
+ assert s.sources == [
+ SkillSourceSpec(
+ scheme="url", params={"url": "https://example.com/x.zip"}
+ )
+ ]
+
+ def test_from_package_single_pair(self) -> None:
+ s = Skills.from_package(("my_pkg", "skills"))
+ assert s.sources == [
+ SkillSourceSpec(
+ scheme="package", params={"package": "my_pkg", "resource": "skills"}
+ )
+ ]
+
+ def test_from_package_varargs(self) -> None:
+ s = Skills.from_package(("pkg_a", "skills"), ("pkg_b", "other"))
+ assert s.sources == [
+ SkillSourceSpec(
+ scheme="package", params={"package": "pkg_a", "resource": "skills"}
+ ),
+ SkillSourceSpec(
+ scheme="package", params={"package": "pkg_b", "resource": "other"}
+ ),
+ ]
+
+ def test_serialize_roundtrip(self) -> None:
+ s = Skills(
+ sources=[
+ SkillSourceSpec(scheme="local", params={"path": "/a"}),
+ SkillSourceSpec(
+ scheme="url", params={"url": "https://e.com/x.zip"}
+ ),
+ SkillSourceSpec(
+ scheme="package",
+ params={"package": "p", "resource": "skills"},
+ ),
+ ]
+ )
+ dumped = s.model_dump()
+ restored = Skills.model_validate(dumped)
+ assert restored.sources == s.sources
+
+
+class TestSkillSourceSpec:
+ def test_scheme_is_lowercased(self) -> None:
+ spec = SkillSourceSpec(scheme="LOCAL", params={"path": "/x"})
+ assert spec.scheme == "local"
+
+ def test_equality_ignores_scheme_case(self) -> None:
+ a = SkillSourceSpec(scheme="LOCAL", params={"path": "/x"})
+ b = SkillSourceSpec(scheme="local", params={"path": "/x"})
+ assert a == b
+
+ def test_hashable(self) -> None:
+ a = SkillSourceSpec(scheme="local", params={"path": "/x"})
+ b = SkillSourceSpec(scheme="LOCAL", params={"path": "/x"})
+ assert hash(a) == hash(b)
+ # Spec is usable as a set / dict key, supporting de-duplication during merge.
+ assert len({a, b}) == 1
+
+ def test_unknown_scheme_deserializes_successfully(self) -> None:
+ # The registry — not the model — is the fail point.
+ spec = SkillSourceSpec.model_validate(
+ {"scheme": "future-scheme", "params": {"k": "v"}}
+ )
+ assert spec.scheme == "future-scheme"
+ assert spec.params == {"k": "v"}
diff --git a/python/flink_agents/api/yaml/loader.py b/python/flink_agents/api/yaml/loader.py
index 599220533..4ec97a954 100644
--- a/python/flink_agents/api/yaml/loader.py
+++ b/python/flink_agents/api/yaml/loader.py
@@ -32,7 +32,7 @@
from flink_agents.api.function import Function, JavaFunction, PythonFunction
from flink_agents.api.prompts.prompt import Prompt
from flink_agents.api.resource import ResourceDescriptor, ResourceType
-from flink_agents.api.skills import Skills
+from flink_agents.api.skills import Skills, SkillSourceSpec
from flink_agents.api.tools.function_tool import FunctionTool
from flink_agents.api.yaml.aliases import (
JAVA_WRAPPER_CLAZZ,
@@ -199,7 +199,24 @@ def _build_prompt(spec: PromptSpec) -> Prompt:
def _build_skills(spec: SkillsSpec) -> Skills:
- return Skills(paths=list(spec.paths))
+ sources: List[SkillSourceSpec] = [
+ SkillSourceSpec(scheme="local", params={"path": p}) for p in spec.paths
+ ]
+ sources.extend(
+ SkillSourceSpec(scheme="url", params={"url": u}) for u in spec.urls
+ )
+ sources.extend(
+ SkillSourceSpec(scheme="classpath", params={"resource": r})
+ for r in spec.classpath
+ )
+ sources.extend(
+ SkillSourceSpec(
+ scheme="package",
+ params={"package": entry.package, "resource": entry.resource},
+ )
+ for entry in spec.package
+ )
+ return Skills(sources=sources)
def _build_agent(agent_spec: AgentSpec) -> Agent:
diff --git a/python/flink_agents/api/yaml/specs.py b/python/flink_agents/api/yaml/specs.py
index f806bf29d..220eccf35 100644
--- a/python/flink_agents/api/yaml/specs.py
+++ b/python/flink_agents/api/yaml/specs.py
@@ -123,15 +123,52 @@ class ToolSpec(BaseModel):
parameter_types: List[str] | None = None
+class PackageSkillSpec(BaseModel):
+ """A single ``package`` skill source entry: a Python package name plus a
+ resource path relative to that package's root.
+ """
+
+ model_config = ConfigDict(extra="forbid")
+
+ package: str
+ resource: str
+
+
class SkillsSpec(BaseModel):
- """Declarative Skills resource pointing at one or more skill source
- directories on the local filesystem.
+ """Declarative Skills resource: one or more skill sources grouped by scheme.
+
+ Each list below maps to a skill source scheme:
+
+ - ``paths`` — ``local`` scheme: directories or ``.zip`` files
+ - ``urls`` — ``url`` scheme: ``http(s)`` URLs pointing to a ``.zip``
+ - ``classpath`` — ``classpath`` scheme (Java-only at runtime): resource
+ paths on the Java classpath
+ - ``package`` — ``package`` scheme (Python-only at runtime): resources
+ inside installed Python packages, given as ``{package, resource}`` pairs
+
+ At least one of the four must be non-empty. ``classpath`` is exposed on
+ Python for YAML schema parity with Java — it deserializes successfully
+ but ``SkillManager`` on Python will fail at load time because Python does
+ not register a ``classpath`` handler.
"""
model_config = ConfigDict(extra="forbid")
name: str
- paths: List[str]
+ paths: List[str] = Field(default_factory=list)
+ urls: List[str] = Field(default_factory=list)
+ classpath: List[str] = Field(default_factory=list)
+ package: List[PackageSkillSpec] = Field(default_factory=list)
+
+ @model_validator(mode="after")
+ def _require_one_source(self) -> "SkillsSpec":
+ if not (self.paths or self.urls or self.classpath or self.package):
+ msg = (
+ f"skills '{self.name}': at least one of "
+ "paths/urls/classpath/package must be non-empty."
+ )
+ raise ValueError(msg)
+ return self
class ActionSpec(BaseModel):
diff --git a/python/flink_agents/api/yaml/tests/test_loader.py b/python/flink_agents/api/yaml/tests/test_loader.py
index 7569c9bd7..a605cc386 100644
--- a/python/flink_agents/api/yaml/tests/test_loader.py
+++ b/python/flink_agents/api/yaml/tests/test_loader.py
@@ -27,9 +27,15 @@
from flink_agents.api.function import JavaFunction, PythonFunction
from flink_agents.api.prompts.prompt import LocalPrompt
from flink_agents.api.resource import ResourceDescriptor, ResourceName, ResourceType
-from flink_agents.api.skills import Skills
+from flink_agents.api.skills import Skills, SkillSourceSpec
from flink_agents.api.tools.function_tool import FunctionTool
-from flink_agents.api.yaml.loader import build_agents, load_yaml, resolve_function
+from flink_agents.api.yaml.loader import (
+ _build_skills,
+ build_agents,
+ load_yaml,
+ resolve_function,
+)
+from flink_agents.api.yaml.specs import SkillsSpec
from flink_agents.api.yaml.tests.fixtures import loader_targets
_FIXTURES = Path(__file__).parent / "fixtures"
@@ -336,11 +342,37 @@ def test_build_agents_loads_skills_per_agent_and_shared() -> None:
own = agent.resources[ResourceType.SKILLS]["agent_skills"]
assert isinstance(own, Skills)
- assert own.paths == ["./agent_skill_dir"]
+ assert own.sources == [
+ SkillSourceSpec(scheme="local", params={"path": "./agent_skill_dir"})
+ ]
shared = shared_resources[ResourceType.SKILLS]["shared_skills"]
assert isinstance(shared, Skills)
- assert shared.paths == ["./shared_skill_dir", "./more"]
+ assert shared.sources == [
+ SkillSourceSpec(scheme="local", params={"path": "./shared_skill_dir"}),
+ SkillSourceSpec(scheme="local", params={"path": "./more"}),
+ ]
+
+
+def test_build_skills_merges_all_schemes() -> None:
+ spec = SkillsSpec.model_validate(
+ {
+ "name": "s",
+ "paths": ["./a"],
+ "urls": ["https://x/s.zip"],
+ "classpath": ["com/example/s"],
+ "package": [{"package": "my_pkg", "resource": "skills/"}],
+ }
+ )
+ skills = _build_skills(spec)
+ assert skills.sources == [
+ SkillSourceSpec(scheme="local", params={"path": "./a"}),
+ SkillSourceSpec(scheme="url", params={"url": "https://x/s.zip"}),
+ SkillSourceSpec(scheme="classpath", params={"resource": "com/example/s"}),
+ SkillSourceSpec(
+ scheme="package", params={"package": "my_pkg", "resource": "skills/"}
+ ),
+ ]
def test_load_yaml_registers_shared_skills_on_env() -> None:
@@ -348,7 +380,10 @@ def test_load_yaml_registers_shared_skills_on_env() -> None:
load_yaml(env, _FIXTURES / "with_skills.yaml")
shared = env.resources[ResourceType.SKILLS]["shared_skills"]
assert isinstance(shared, Skills)
- assert shared.paths == ["./shared_skill_dir", "./more"]
+ assert shared.sources == [
+ SkillSourceSpec(scheme="local", params={"path": "./shared_skill_dir"}),
+ SkillSourceSpec(scheme="local", params={"path": "./more"}),
+ ]
def test_build_agents_supports_type_java(tmp_path: Path) -> None:
diff --git a/python/flink_agents/api/yaml/tests/test_specs.py b/python/flink_agents/api/yaml/tests/test_specs.py
index a7c89dbde..e70285881 100644
--- a/python/flink_agents/api/yaml/tests/test_specs.py
+++ b/python/flink_agents/api/yaml/tests/test_specs.py
@@ -250,7 +250,7 @@ def test_yaml_document_with_shared_resources_and_actions() -> None:
assert doc.actions[0].name == "shared"
-def test_skills_spec_requires_paths() -> None:
+def test_skills_spec_requires_at_least_one_source() -> None:
with pytest.raises(ValidationError):
SkillsSpec.model_validate({"name": "s"})
@@ -258,6 +258,25 @@ def test_skills_spec_requires_paths() -> None:
def test_skills_spec_with_paths() -> None:
spec = SkillsSpec.model_validate({"name": "s", "paths": ["./a", "./b"]})
assert spec.paths == ["./a", "./b"]
+ assert spec.urls == []
+ assert spec.package == []
+
+
+def test_skills_spec_with_urls_classpath_package() -> None:
+ spec = SkillsSpec.model_validate(
+ {
+ "name": "s",
+ "urls": ["https://x/s.zip"],
+ "classpath": ["com/example/s"],
+ "package": [{"package": "my_pkg", "resource": "skills/"}],
+ }
+ )
+ assert spec.paths == []
+ assert spec.urls == ["https://x/s.zip"]
+ assert spec.classpath == ["com/example/s"]
+ assert len(spec.package) == 1
+ assert spec.package[0].package == "my_pkg"
+ assert spec.package[0].resource == "skills/"
def test_skills_spec_forbids_extras() -> None:
diff --git a/python/flink_agents/plan/agent_plan.py b/python/flink_agents/plan/agent_plan.py
index f38c0fc77..01e946e4a 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -499,11 +499,11 @@ def _add_skills(
# skill names and which skill manager they belong to when declaring a chat
# model setup. MCP prompts and tools face the same situation, we can refactor
# them as a whole.
- paths: List[str] = []
+ sources = []
for skills_obj in skills_objects.values():
- paths.extend(skills_obj.paths)
+ sources.extend(skills_obj.sources)
- merged = Skills.from_local_dir(*dict.fromkeys(paths))
+ merged = Skills(sources=list(dict.fromkeys(sources)))
resource_providers.append(
PythonSerializableResourceProvider.from_resource(
diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py
index 387e327b7..ada5b8e30 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -52,7 +52,6 @@
Mem0LongTermMemory,
)
from flink_agents.runtime.resource_cache import ResourceCache
-from flink_agents.runtime.resource_context import ResourceContextImpl
logger = logging.getLogger(__name__)
@@ -274,9 +273,6 @@ def __init__(
self.__agent_plan.resource_providers, self.__agent_plan.config
)
self.__resource_cache.set_java_resource_adapter(j_resource_adapter)
- self.__resource_cache.set_resource_context(
- ResourceContextImpl(self.__resource_cache)
- )
self.executor = executor
def set_long_term_memory(self, ltm: InternalBaseLongTermMemory) -> None:
diff --git a/python/flink_agents/runtime/local_runner.py b/python/flink_agents/runtime/local_runner.py
index c575a6ec3..078fd7f1d 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -35,7 +35,6 @@
from flink_agents.runtime.agent_runner import AgentRunner
from flink_agents.runtime.local_memory_object import LocalMemoryObject
from flink_agents.runtime.resource_cache import ResourceCache
-from flink_agents.runtime.resource_context import ResourceContextImpl
if TYPE_CHECKING:
from flink_agents.plan.agent_plan import AgentPlan
@@ -88,9 +87,6 @@ def __init__(
self.__resource_cache = ResourceCache(
agent_plan.resource_providers, agent_plan.config
)
- self.__resource_cache.set_resource_context(
- ResourceContextImpl(self.__resource_cache)
- )
self.__key = key
self.events = deque()
self._sensory_mem_store = {}
diff --git a/python/flink_agents/runtime/resource_cache.py b/python/flink_agents/runtime/resource_cache.py
index f66cc7588..3f98a8b7e 100644
--- a/python/flink_agents/runtime/resource_cache.py
+++ b/python/flink_agents/runtime/resource_cache.py
@@ -18,11 +18,11 @@
from typing import Any, Dict
from flink_agents.api.resource import Resource, ResourceType
-from flink_agents.api.resource_context import ResourceContext
from flink_agents.plan.configuration import AgentConfiguration
from flink_agents.plan.function import JavaFunction
from flink_agents.plan.resource_provider import JavaResourceProvider, ResourceProvider
from flink_agents.plan.tools.function_tool import FunctionTool
+from flink_agents.runtime.resource_context import ResourceContextImpl
class ResourceCache:
@@ -54,11 +54,11 @@ def __init__(
self._config = config
self._cache: Dict[ResourceType, Dict[str, Resource]] = {}
self._j_resource_adapter: Any = None
- self._resource_context = None
+ self._resource_context = ResourceContextImpl(self)
- def set_resource_context(self, resource_context: ResourceContext) -> None:
- """Set the resource context for accessing other resource in runtime."""
- self._resource_context = resource_context
+ def get_resource_context(self) -> ResourceContextImpl:
+ """Return the long-lived ResourceContext owned by this cache."""
+ return self._resource_context
def set_java_resource_adapter(self, j_resource_adapter: Any) -> None:
"""Set Java resource adapter for Java resource providers."""
@@ -94,8 +94,15 @@ def get_resource(self, name: str, type: ResourceType) -> Resource:
return resource
def close(self) -> None:
- """Clean up all cached resources."""
+ """Clean up all cached resources and close the injected ResourceContext.
+
+ Cascades to ``ResourceContextImpl.close()`` which in turn closes the
+ cached ``SkillManager`` (releasing materialized skill temp dirs). This
+ is what releases skill resources on operator close, including Flink
+ failover when the JVM stays up.
+ """
for typed in self._cache.values():
for resource in typed.values():
resource.close()
self._cache.clear()
+ self._resource_context.close()
diff --git a/python/flink_agents/runtime/resource_context.py b/python/flink_agents/runtime/resource_context.py
index 1cfb24b95..11fb5d9ff 100644
--- a/python/flink_agents/runtime/resource_context.py
+++ b/python/flink_agents/runtime/resource_context.py
@@ -75,6 +75,19 @@ def get_skill_manager(self) -> SkillManager | None:
self._skill_manager = self._create_skill_manager()
return self._skill_manager
+ def close(self) -> None:
+ """Close the lazily-cached SkillManager, releasing materialized temp
+ directories owned by its repositories. Called via
+ ``ResourceCache.close()`` on operator close (including Flink failover
+ when the JVM stays up). Idempotent.
+ """
+ if self._skill_manager is not None:
+ try:
+ self._skill_manager.close()
+ finally:
+ self._skill_manager = None
+ self._skill_manager_initialized = False
+
def _create_skill_manager(self) -> SkillManager | None:
try:
skills_config = cast(
diff --git a/python/flink_agents/runtime/skill/agent_skill.py b/python/flink_agents/runtime/skill/agent_skill.py
index df44d48e1..d94b09c5c 100644
--- a/python/flink_agents/runtime/skill/agent_skill.py
+++ b/python/flink_agents/runtime/skill/agent_skill.py
@@ -20,6 +20,23 @@
from pydantic import BaseModel, Field, PrivateAttr
+class SkillOrigin(BaseModel):
+ """Identifies the source from which an :class:`AgentSkill` was loaded.
+
+ Attached to each skill at registration time and used for logging (e.g.
+ duplicate-name WARN) and debugging. ``scheme`` mirrors the
+ ``SkillSourceSpec`` scheme (``"local"`` / ``"url"`` / ``"package"`` /
+ ``"classpath"``); ``location`` is a human-readable identifier such as a
+ filesystem path, URL, ``/``, or classpath resource name.
+ """
+
+ scheme: str
+ location: str
+
+ def __str__(self) -> str:
+ return f"{self.scheme}:{self.location}"
+
+
class AgentSkill(BaseModel):
"""Represents an agent skill that can be loaded and used by agents.
@@ -53,6 +70,7 @@ class AgentSkill(BaseModel):
compatibility: str | None = Field(default=None, max_length=500)
metadata: Dict[str, str] | None = Field(default=None)
resources: Dict[str, str] | None = None
+ origin: SkillOrigin | None = Field(default=None)
_resource_loader: Callable[[], Dict[str, str]] | None = PrivateAttr(default=None)
_activated: bool = PrivateAttr(default=False)
@@ -61,6 +79,12 @@ def set_resource_loader(self, loader: Callable[[], Dict[str, str]]) -> None:
"""Set a lazy resource loader for this skill."""
self._resource_loader = loader
+ def set_origin(self, origin: SkillOrigin) -> None:
+ """Set the source this skill was loaded from. Called by ``SkillManager`` at
+ registration time; used for duplicate-name WARN and debugging.
+ """
+ self.origin = origin
+
def _activate(self) -> None:
"""Load resources lazily on first access."""
if not self._activated and self._resource_loader is not None:
diff --git a/python/flink_agents/runtime/skill/repository/_materialize.py b/python/flink_agents/runtime/skill/repository/_materialize.py
new file mode 100644
index 000000000..4119f6177
--- /dev/null
+++ b/python/flink_agents/runtime/skill/repository/_materialize.py
@@ -0,0 +1,166 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""Internal helpers for materializing skills from non-filesystem sources."""
+
+from __future__ import annotations
+
+import atexit
+import os
+import shutil
+import tempfile
+import zipfile
+from pathlib import Path
+from typing import TYPE_CHECKING
+from urllib.request import Request, urlopen
+
+if TYPE_CHECKING:
+ from typing_extensions import Self
+
+_TEMP_DIR_PREFIX = "flink-agents-skills-"
+
+
+class Materialized:
+ """Owns one temp directory plus a fallback atexit cleanup handler.
+
+ :meth:`close` unregisters the handler and removes the dir eagerly;
+ it is idempotent. Mirrors Java's ``SkillMaterializer.Materialized``.
+ """
+
+ def __init__(self, dir_: Path, *, borrowed: bool = False) -> None:
+ """Wrap ``dir_`` as an owned (or borrowed) handle.
+
+ Args:
+ dir_: The directory to wrap.
+ borrowed: If True, the caller owns the dir; ``close()`` is a
+ no-op and no atexit handler is registered.
+ """
+ self.dir = dir_
+ self._closed = False
+ self._borrowed = borrowed
+ if not borrowed:
+ atexit.register(self._cleanup)
+
+ @classmethod
+ def borrowed(cls, existing_dir: Path) -> Materialized:
+ """Wrap an existing directory the caller does not own."""
+ return cls(existing_dir, borrowed=True)
+
+ def _cleanup(self) -> None:
+ shutil.rmtree(self.dir, ignore_errors=True)
+
+ def close(self) -> None:
+ """Release the temp dir eagerly. Idempotent."""
+ if self._closed:
+ return
+ self._closed = True
+ if self._borrowed:
+ return
+ # atexit.unregister matches by identity; passing the bound method works because
+ # the same bound-method instance was registered in __init__.
+ atexit.unregister(self._cleanup)
+ self._cleanup()
+
+ def __enter__(self) -> Self:
+ return self
+
+ def __exit__(self, *exc: object) -> None:
+ self.close()
+
+
+def copy_dir_to_temp(src_dir: Path) -> Materialized:
+ """Copy ``src_dir`` into a fresh owned tempdir and return a Materialized.
+
+ Used when the caller has transient access to a directory (e.g. inside an
+ ``importlib.resources.as_file`` context) and wants a lasting owned copy
+ whose lifetime is independent of the original.
+
+ Args:
+ src_dir: Source directory to copy.
+
+ Returns:
+ A :class:`Materialized` handle owning the copied directory.
+ """
+ extract_dir = Path(tempfile.mkdtemp(prefix=_TEMP_DIR_PREFIX)).resolve()
+ materialized = Materialized(extract_dir)
+ try:
+ shutil.copytree(src_dir, extract_dir, dirs_exist_ok=True)
+ except Exception:
+ materialized.close()
+ raise
+ return materialized
+
+
+def extract_zip_safely(zip_path: Path) -> Materialized:
+ """Extract a zip into a fresh temp dir, returning a :class:`Materialized`.
+
+ Each entry is validated against zip-slip. ``close()`` the returned handle
+ to free the dir eagerly; an atexit cleanup is the fallback.
+
+ Args:
+ zip_path: Path to the zip file to extract.
+
+ Returns:
+ A :class:`Materialized` handle owning the extraction directory.
+
+ Raises:
+ ValueError: if any zip entry resolves outside the extraction directory.
+ """
+ extract_dir = Path(tempfile.mkdtemp(prefix=_TEMP_DIR_PREFIX)).resolve()
+ # Construct the handle before validation so the (empty) tempdir is always reclaimed,
+ # even if validation raises.
+ materialized = Materialized(extract_dir)
+ with zipfile.ZipFile(zip_path) as zf:
+ for member in zf.infolist():
+ target = (extract_dir / member.filename).resolve()
+ if not target.is_relative_to(extract_dir):
+ msg = f"Unsafe zip entry: {member.filename}"
+ raise ValueError(msg)
+ zf.extractall(extract_dir)
+ return materialized
+
+
+def download_to_tempfile(url: str, timeout: int = 90) -> Path:
+ """Download ``url`` to a temp file and return its path.
+
+ Uses ``urllib.request`` from the standard library. ``timeout`` is the
+ socket-level timeout passed to ``urlopen`` and applies to both the
+ connection and the read phases.
+
+ Args:
+ url: The URL to download.
+ timeout: Socket timeout in seconds.
+
+ Returns:
+ Path to the downloaded temp file (caller is responsible for deletion).
+
+ Raises:
+ urllib.error.HTTPError / URLError on HTTP or transport failures.
+ """
+ req = Request(url, method="GET")
+ # The .zip suffix is load-bearing: FileSystemSkillRepository uses
+ # path.suffix == ".zip" to detect zip input. Do not change it.
+ fd, tmp_path_str = tempfile.mkstemp(prefix=_TEMP_DIR_PREFIX, suffix=".zip")
+ os.close(fd)
+ tmp_path = Path(tmp_path_str)
+ try:
+ with urlopen(req, timeout=timeout) as resp, tmp_path.open("wb") as out:
+ shutil.copyfileobj(resp, out)
+ except Exception:
+ tmp_path.unlink(missing_ok=True)
+ raise
+ return tmp_path
diff --git a/python/flink_agents/runtime/skill/repository/filesystem_repository.py b/python/flink_agents/runtime/skill/repository/filesystem_repository.py
index 6e6eaa047..8fc7d67ee 100644
--- a/python/flink_agents/runtime/skill/repository/filesystem_repository.py
+++ b/python/flink_agents/runtime/skill/repository/filesystem_repository.py
@@ -15,196 +15,60 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import logging
-import os
-from pathlib import Path
-from typing import Dict, List
-
-from typing_extensions import override
+"""Filesystem-backed :class:`SkillRepository`.
-from flink_agents.runtime.skill.agent_skill import AgentSkill
-from flink_agents.runtime.skill.skill_parser import SkillParser
-from flink_agents.runtime.skill.skill_repository import (
- SkillRepository,
-)
+Accepts either a directory whose immediate subdirectories each contain a
+``SKILL.md``, or a ``.zip`` file that expands into such a layout.
+"""
-logger = logging.getLogger(__name__)
+from __future__ import annotations
+from pathlib import Path
-class FileSystemSkillRepository(SkillRepository):
- """File system based implementation of SkillRepository.
+from flink_agents.runtime.skill.repository._materialize import (
+ Materialized,
+ extract_zip_safely,
+)
+from flink_agents.runtime.skill.repository.materialized_skill_repository import (
+ MaterializedSkillRepository,
+)
- This repository stores skills in a local file system directory structure
- where each skill is stored in its own subdirectory containing a SKILL.md
- file and optional resource files.
- Directory structure:
+class FileSystemSkillRepository(MaterializedSkillRepository):
+ """Filesystem-backed :class:`SkillRepository`.
- baseDir/
- ├── skill-name-1/
- │ ├── SKILL.md # Required: Entry file with YAML frontmatter
- │ ├── references/ # Optional: Reference documentation
- │ ├── examples/ # Optional: Example files
- │ └── scripts/ # Optional: Script files
- └── skill-name-2/
- └── SKILL.md
+ Accepts a directory whose immediate subdirectories each contain a
+ ``SKILL.md``, or a ``.zip`` that extracts into such a layout.
"""
- SKILL_MD_FILE = "SKILL.md"
-
- def __init__(
- self,
- base_dir: Path | str,
- ) -> None:
- """Create a FileSystemSkillRepository.
-
- Args:
- base_dir: The base directory containing skill subdirectories.
- skip_dirs: Optional set of directory names to skip.
- skip_patterns: Optional set of file patterns to skip.
+ def __init__(self, base_dir: Path | str) -> None:
+ """Open a directory or ``.zip`` of skills.
Raises:
- ValueError: If base_dir is None, doesn't exist, or is not a directory.
+ ValueError: If ``base_dir`` is None, doesn't exist, or is neither
+ a directory nor a ``.zip`` file.
"""
if base_dir is None:
msg = "Base directory cannot be None"
raise ValueError(msg)
- # Convert to Path and normalize
- self._base_dir = Path(base_dir).resolve()
+ path = Path(base_dir).resolve()
- # Validate directory exists
- if not self._base_dir.exists():
- msg = f"Base directory does not exist: {self._base_dir}"
+ if not path.exists():
+ msg = f"Path does not exist: {path}"
raise ValueError(msg)
- # Validate it's a directory
- if not self._base_dir.is_dir():
- msg = f"Base directory is not a directory: {self._base_dir}"
+ if path.is_dir():
+ materialization = Materialized.borrowed(path)
+ elif path.is_file() and path.suffix.lower() == ".zip":
+ materialization = extract_zip_safely(path)
+ else:
+ msg = f"Path must be a directory or a .zip file: {path}"
raise ValueError(msg)
+ super().__init__(materialization)
+
@property
def base_dir(self) -> Path:
- """Get the base directory.
-
- Returns:
- The base directory path.
- """
- return self._base_dir
-
- @override
- def get_skill(self, name: str) -> AgentSkill | None:
- """Get a skill by name.
-
- Args:
- name: The skill name.
-
- Returns:
- The skill, or None if not found.
- """
- skill_dir = self._base_dir / name
- skill_md_path = skill_dir / self.SKILL_MD_FILE
-
- if not skill_md_path.exists():
- return None
-
- return self._load_skill(skill_dir)
-
- @override
- def get_resources(self, name: str) -> Dict[str, str]:
- skill_dir = self._base_dir / name
- return self._load_resources(skill_dir)
-
- @override
- def get_skills(self) -> List[AgentSkill]:
- """Get all skills in this repository.
-
- Returns:
- List of all skills.
- """
- skills = []
- for skill_name in self._get_all_skill_names():
- skill = self.get_skill(skill_name)
- if skill is not None:
- skills.append(skill)
- return skills
-
- def _get_all_skill_names(self) -> List[str]:
- """Get all skill names in this repository.
-
- Returns:
- List of skill names.
- """
- return sorted(
- [
- entry.name
- for entry in self._base_dir.iterdir()
- if entry.is_dir() and (entry / self.SKILL_MD_FILE).exists()
- ]
- )
-
- def _load_skill(self, skill_dir: Path) -> AgentSkill | None:
- """Load a skill from a directory.
-
- Args:
- skill_dir: Path to the skill directory.
-
- Returns:
- The loaded skill, or None if loading failed.
- """
- skill_md_path = skill_dir / self.SKILL_MD_FILE
-
- if not skill_md_path.exists():
- return None
-
- try:
- skill_md_content = skill_md_path.read_text()
-
- skill = SkillParser.parse_skill(skill_md_content)
-
- if skill.name != skill_dir.name:
- logger.warning(
- f"The skill name {skill.name} is different from the base directory {skill_dir.name}."
- )
-
- except Exception as e:
- err_msg = f"Failed to load skill from {skill_dir}"
- raise ValueError(err_msg) from e
- else:
- return skill
-
- def _load_resources(self, skill_dir: Path) -> dict[str, str]:
- """Load all resources from a skill directory.
-
- Args:
- skill_dir: Path to the skill directory.
-
- Returns:
- Map of relative path to content.
- """
- resources = {}
-
- for root, _dirs, files in os.walk(skill_dir):
- root_path = Path(root)
-
- for file_name in files:
- # Skip SKILL.md (handled separately)
- if file_name == self.SKILL_MD_FILE:
- continue
-
- file_path = root_path / file_name
- relative_path = str(file_path.relative_to(skill_dir))
-
- try:
- # Try to read as text
- content = file_path.read_text()
- resources[relative_path] = content
- except UnicodeDecodeError:
- content = file_path.read_bytes()
- resources[relative_path] = f"base64: {content}"
- except Exception:
- logging.warning(
- f"Failed to read resource file {file_path}", exc_info=True
- )
-
- return resources
+ """Absolute base directory."""
+ return self._materialization.dir
diff --git a/python/flink_agents/runtime/skill/repository/materialized_skill_repository.py b/python/flink_agents/runtime/skill/repository/materialized_skill_repository.py
new file mode 100644
index 000000000..634f4559a
--- /dev/null
+++ b/python/flink_agents/runtime/skill/repository/materialized_skill_repository.py
@@ -0,0 +1,72 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Base for SkillRepository implementations whose skills live on disk under a
+single :class:`Materialized` directory. Subclasses contribute only the
+source-specific I/O that produces that handle in their constructor; reading
+and close lifecycle are handled here.
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Dict, List
+
+from typing_extensions import override
+
+from flink_agents.runtime.skill.repository.skill_directory_reader import (
+ SkillDirectoryReader,
+)
+from flink_agents.runtime.skill.skill_repository import SkillRepository
+
+if TYPE_CHECKING:
+ from pathlib import Path
+
+ from flink_agents.runtime.skill.agent_skill import AgentSkill
+ from flink_agents.runtime.skill.repository._materialize import Materialized
+
+
+class MaterializedSkillRepository(SkillRepository):
+ """SkillRepository whose backing data is a single :class:`Materialized`
+ directory — borrowed (already on disk) or owned (extracted from a zip /
+ download). :meth:`close` is unconditional because ``Materialized.close``
+ collapses both cases.
+ """
+
+ def __init__(self, materialization: Materialized) -> None:
+ """Initialize a :class:`MaterializedSkillRepository`."""
+ self._materialization = materialization
+ self._reader = SkillDirectoryReader(materialization.dir)
+
+ @override
+ def get_skill(self, name: str) -> AgentSkill | None:
+ return self._reader.get_skill(name)
+
+ @override
+ def get_skills(self) -> List[AgentSkill]:
+ return self._reader.get_skills()
+
+ @override
+ def get_resources(self, name: str) -> Dict[str, str]:
+ return self._reader.get_resources(name)
+
+ @override
+ def get_skill_dir(self, name: str) -> Path:
+ return self._reader.get_skill_dir(name)
+
+ @override
+ def close(self) -> None:
+ self._materialization.close()
diff --git a/python/flink_agents/runtime/skill/repository/package_repository.py b/python/flink_agents/runtime/skill/repository/package_repository.py
new file mode 100644
index 000000000..7547b6077
--- /dev/null
+++ b/python/flink_agents/runtime/skill/repository/package_repository.py
@@ -0,0 +1,94 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Package-resource based :class:`SkillRepository`.
+
+Resolves a resource (directory or zip) inside an installed Python package via
+:mod:`importlib.resources` and materializes it once at construction time into
+a process-owned temp directory.
+"""
+
+from __future__ import annotations
+
+from importlib.resources import as_file, files
+from pathlib import Path
+
+from flink_agents.runtime.skill.repository._materialize import (
+ copy_dir_to_temp,
+ extract_zip_safely,
+)
+from flink_agents.runtime.skill.repository.materialized_skill_repository import (
+ MaterializedSkillRepository,
+)
+
+
+class PackageSkillRepository(MaterializedSkillRepository):
+ """Skill repository backed by a resource inside an installed Python package.
+
+ The resource (directory or ``.zip``) is copied / extracted into a
+ process-owned temp directory at construction time. The
+ ``importlib.resources.as_file`` context is released immediately after
+ materialization, so close lifecycle reduces to releasing that single owned
+ temp directory.
+ """
+
+ def __init__(self, package: str, resource: str) -> None:
+ """Open ``resource`` inside ``package``.
+
+ Args:
+ package: Dotted package name.
+ resource: Path inside the package; directory or ``.zip``.
+
+ Raises:
+ ValueError: If the resource is missing, or is neither a directory
+ nor a ``.zip`` file.
+ """
+ traversable = files(package).joinpath(resource)
+ if not traversable.is_dir() and not traversable.is_file():
+ msg = f"Resource {resource!r} not found in package {package!r}"
+ raise ValueError(msg)
+
+ self._package = package
+ self._resource = resource
+
+ # Materialize inside the as_file context so the path is valid, then
+ # let the context release; the resulting Materialized owns its own
+ # temp dir and outlives the context.
+ with as_file(traversable) as path:
+ path = Path(path)
+ if path.is_dir():
+ materialization = copy_dir_to_temp(path)
+ elif path.is_file() and path.suffix.lower() == ".zip":
+ materialization = extract_zip_safely(path)
+ else:
+ msg = (
+ f"Package resource must be a directory or a .zip: "
+ f"{package}/{resource}"
+ )
+ raise ValueError(msg)
+
+ super().__init__(materialization)
+
+ @property
+ def package(self) -> str:
+ """Source package this repo was loaded from."""
+ return self._package
+
+ @property
+ def resource(self) -> str:
+ """Resource path inside :attr:`package`."""
+ return self._resource
diff --git a/python/flink_agents/runtime/skill/repository/skill_directory_reader.py b/python/flink_agents/runtime/skill/repository/skill_directory_reader.py
new file mode 100644
index 000000000..983df5f17
--- /dev/null
+++ b/python/flink_agents/runtime/skill/repository/skill_directory_reader.py
@@ -0,0 +1,139 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Read-only accessor for an on-disk directory of skills.
+
+Composed by :class:`SkillRepository` implementations to handle the
+"parse SKILL.md under base_dir" half of their work, leaving each repo
+free to manage its own materialization and ``close()`` story.
+"""
+
+from __future__ import annotations
+
+import base64
+import logging
+import os
+from pathlib import Path
+from typing import TYPE_CHECKING, Dict, List
+
+from flink_agents.runtime.skill.skill_parser import SkillParser
+
+if TYPE_CHECKING:
+ from flink_agents.runtime.skill.agent_skill import AgentSkill
+
+logger = logging.getLogger(__name__)
+
+
+class SkillDirectoryReader:
+ """Reads skills from an already-materialized directory. No lifecycle."""
+
+ SKILL_MD_FILE = "SKILL.md"
+
+ def __init__(self, base_dir: Path) -> None:
+ """Wrap ``base_dir`` (must be an existing directory)."""
+ if base_dir is None:
+ msg = "Base directory cannot be None"
+ raise ValueError(msg)
+ resolved = Path(base_dir).resolve()
+ if not resolved.exists():
+ msg = f"Path does not exist: {resolved}"
+ raise ValueError(msg)
+ if not resolved.is_dir():
+ msg = f"Path must be a directory: {resolved}"
+ raise ValueError(msg)
+ self._base_dir = resolved
+
+ @property
+ def base_dir(self) -> Path:
+ """Absolute base directory."""
+ return self._base_dir
+
+ def get_skill_dir(self, name: str) -> Path:
+ """``base_dir / name``; existence not checked."""
+ return self._base_dir / name
+
+ def get_skill(self, name: str) -> AgentSkill | None:
+ """Parse ``base_dir/name/SKILL.md``; ``None`` if absent."""
+ skill_dir = self._base_dir / name
+ skill_md_path = skill_dir / self.SKILL_MD_FILE
+ if not skill_md_path.exists():
+ return None
+ return self._load_skill(skill_dir)
+
+ def get_skills(self) -> List[AgentSkill]:
+ """All skills under ``base_dir``, sorted by name."""
+ skills: List[AgentSkill] = []
+ for skill_name in self._list_skill_names():
+ skill = self.get_skill(skill_name)
+ if skill is not None:
+ skills.append(skill)
+ return skills
+
+ def get_resources(self, name: str) -> Dict[str, str]:
+ """All non-``SKILL.md`` files under the named skill, keyed by relative path."""
+ skill_dir = self._base_dir / name
+ if not skill_dir.is_dir():
+ return {}
+ return self._load_resources(skill_dir)
+
+ def _list_skill_names(self) -> List[str]:
+ return sorted(
+ [
+ entry.name
+ for entry in self._base_dir.iterdir()
+ if entry.is_dir() and (entry / self.SKILL_MD_FILE).exists()
+ ]
+ )
+
+ def _load_skill(self, skill_dir: Path) -> AgentSkill | None:
+ skill_md_path = skill_dir / self.SKILL_MD_FILE
+ if not skill_md_path.exists():
+ return None
+ try:
+ content = skill_md_path.read_text()
+ skill = SkillParser.parse_skill(content)
+ if skill.name != skill_dir.name:
+ logger.warning(
+ "The skill name %s is different from the base directory %s.",
+ skill.name,
+ skill_dir.name,
+ )
+ except Exception as e:
+ err_msg = f"Failed to load skill from {skill_dir}"
+ raise ValueError(err_msg) from e
+ else:
+ return skill
+
+ def _load_resources(self, skill_dir: Path) -> Dict[str, str]:
+ resources: Dict[str, str] = {}
+ for root, _dirs, files in os.walk(skill_dir):
+ root_path = Path(root)
+ for file_name in files:
+ if file_name == self.SKILL_MD_FILE:
+ continue
+ file_path = root_path / file_name
+ relative_path = str(file_path.relative_to(skill_dir))
+ try:
+ resources[relative_path] = file_path.read_text()
+ except UnicodeDecodeError:
+ encoded = base64.b64encode(file_path.read_bytes()).decode("ascii")
+ resources[relative_path] = f"base64: {encoded}"
+ except Exception:
+ logger.warning(
+ "Failed to read resource file %s", file_path, exc_info=True
+ )
+ return resources
diff --git a/python/flink_agents/runtime/skill/repository/url_repository.py b/python/flink_agents/runtime/skill/repository/url_repository.py
new file mode 100644
index 000000000..b4e4f8886
--- /dev/null
+++ b/python/flink_agents/runtime/skill/repository/url_repository.py
@@ -0,0 +1,66 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""URL-based :class:`SkillRepository`.
+
+Downloads a zip from an http(s) URL into a temp file, extracts it into a
+process-local temp directory, and reads skills from there.
+"""
+
+from __future__ import annotations
+
+from flink_agents.runtime.skill.repository._materialize import (
+ download_to_tempfile,
+ extract_zip_safely,
+)
+from flink_agents.runtime.skill.repository.materialized_skill_repository import (
+ MaterializedSkillRepository,
+)
+
+_REQUEST_TIMEOUT_SEC = 90
+
+
+class URLSkillRepository(MaterializedSkillRepository):
+ """Skill repository backed by an http(s) URL pointing to a zip.
+
+ The zip is downloaded then extracted into a process-local temp directory
+ (released eagerly via :meth:`close` or at process exit).
+ """
+
+ def __init__(self, url: str) -> None:
+ """Download and extract the zip at ``url``.
+
+ Raises:
+ ValueError: If the URL is not http(s).
+ urllib.error.HTTPError / URLError: On transport/HTTP failures.
+ """
+ if not url.startswith(("http://", "https://")):
+ msg = f"Only http(s) URLs are supported: {url}"
+ raise ValueError(msg)
+
+ self._url = url
+ tmp_zip = download_to_tempfile(url, timeout=_REQUEST_TIMEOUT_SEC)
+ try:
+ materialization = extract_zip_safely(tmp_zip)
+ finally:
+ tmp_zip.unlink(missing_ok=True)
+ super().__init__(materialization)
+
+ @property
+ def url(self) -> str:
+ """Source URL this repo was loaded from."""
+ return self._url
diff --git a/python/flink_agents/runtime/skill/skill_manager.py b/python/flink_agents/runtime/skill/skill_manager.py
index deb9ce82b..c5ccf5925 100644
--- a/python/flink_agents/runtime/skill/skill_manager.py
+++ b/python/flink_agents/runtime/skill/skill_manager.py
@@ -15,19 +15,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
+import contextlib
+import logging
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List
-from flink_agents.api.skills import Skills
-from flink_agents.runtime.skill.agent_skill import AgentSkill
-from flink_agents.runtime.skill.repository.filesystem_repository import (
- FileSystemSkillRepository,
-)
+from flink_agents.api.skills import Skills, SkillSourceSpec
+from flink_agents.runtime.skill import skill_source_registry
+from flink_agents.runtime.skill.agent_skill import AgentSkill, SkillOrigin
from flink_agents.runtime.skill.skill_prompt_provider import SkillPromptProvider
if TYPE_CHECKING:
from flink_agents.runtime.skill.skill_repository import SkillRepository
+logger = logging.getLogger(__name__)
+
+
+def _origin_of(spec: SkillSourceSpec) -> SkillOrigin:
+ """Build a SkillOrigin from a spec for diagnostics.
+
+ Delegates location description to the handler so a new scheme is one
+ ``register()`` call, not a parallel switch here.
+ """
+ handler = skill_source_registry.get(spec.scheme)
+ return SkillOrigin(
+ scheme=spec.scheme,
+ location=handler.describe_location(spec.params),
+ )
+
class SkillManager:
"""Internal runtime component for loading, parsing, and managing skills.
@@ -45,8 +60,12 @@ def __init__(self, skills_config: Skills) -> None:
"""Initialize the SkillManager from a Skills configuration."""
self._skills: Dict[str, AgentSkill] = {}
self._repos: Dict[str, SkillRepository] = {}
+ # Every opened repo in load order, kept separately from `_repos` because that
+ # map is keyed by skill name — duplicate names overwrite the earlier repo's
+ # reference, so close() iterates this list (id-deduped) instead.
+ self._opened_repos: List[SkillRepository] = []
self._config = skills_config
- self._load_skills_from_paths()
+ self._load_skills()
@property
def size(self) -> int:
@@ -93,23 +112,30 @@ def get_skill_dirs(self, *names: str) -> List[str]:
"""Return absolute directory paths for the given skill names.
If no names are provided, returns directories for all filesystem-backed
- skills. Unknown names and skills not backed by a filesystem repo are
- silently skipped.
+ skills. Skills not backed by a filesystem repo are silently skipped.
+
+ Raises:
+ ValueError: If any provided name is not a registered skill.
"""
selected = names if names else tuple(self._repos.keys())
dirs: List[str] = []
for skill_name in selected:
repo = self._repos.get(skill_name)
- if isinstance(repo, FileSystemSkillRepository):
- dirs.append(str(repo.base_dir / skill_name))
+ if repo is None:
+ msg = (
+ f"Skill {skill_name} not found, "
+ f"available skill names are: {list(self._repos.keys())}"
+ )
+ raise ValueError(msg)
+ dir_path = repo.get_skill_dir(skill_name)
+ if dir_path is not None:
+ dirs.append(str(dir_path))
return dirs
def get_skill_dir(self, skill_name: str) -> Path | None:
"""Return absolute directory path for a single skill, if filesystem-backed."""
repo = self._repos.get(skill_name)
- if isinstance(repo, FileSystemSkillRepository):
- return repo.base_dir / skill_name
- return None
+ return None if repo is None else repo.get_skill_dir(skill_name)
def resolve_resource_path(self, skill_name: str, resource_path: str) -> Path | None:
"""Resolve a skill resource's relative path to an absolute filesystem path.
@@ -117,18 +143,68 @@ def resolve_resource_path(self, skill_name: str, resource_path: str) -> Path | N
Returns None if the skill's repository doesn't support path resolution.
"""
repo = self._repos.get(skill_name)
- if isinstance(repo, FileSystemSkillRepository):
- resolved = repo.base_dir / skill_name / resource_path
- if resolved.exists() and resolved.is_file():
- return resolved
- return None
-
- def _load_skills_from_paths(self) -> None:
- for path in self._config.paths:
- repo = FileSystemSkillRepository(path)
- for skill in repo.get_skills():
- skill.set_resource_loader(
- lambda name=skill.name, r=repo: r.get_resources(name)
+ if repo is None:
+ return None
+ dir_path = repo.get_skill_dir(skill_name)
+ if dir_path is None:
+ return None
+ resolved = dir_path / resource_path
+ return resolved if resolved.is_file() else None
+
+ def _load_skills(self) -> None:
+ for spec in self._config.sources:
+ try:
+ handler = skill_source_registry.get(spec.scheme)
+ repo = handler.open(spec.params)
+ self._opened_repos.append(repo)
+ except (OSError, ValueError) as e:
+ # Release repos opened by earlier iterations — the caller never
+ # receives a SkillManager reference to clean them up via close()
+ # itself, so without this their temp dirs / atexit handlers leak
+ # until interpreter exit.
+ self.close()
+ msg = (
+ f"Failed to load skills from {spec.scheme}:{spec.params}"
)
- self._skills[skill.name] = skill
- self._repos[skill.name] = repo
+ raise RuntimeError(msg) from e
+ self._register_repo(repo, _origin_of(spec))
+
+ def _register_repo(self, repo: "SkillRepository", origin: SkillOrigin) -> None:
+ for skill in repo.get_skills():
+ skill.set_resource_loader(
+ lambda name=skill.name, r=repo: r.get_resources(name)
+ )
+ skill.set_origin(origin)
+ previous = self._skills.get(skill.name)
+ if previous is not None:
+ logger.warning(
+ "Skill '%s' from %s overrides earlier registration from %s",
+ skill.name,
+ origin,
+ previous.origin if previous.origin is not None else "",
+ )
+ self._skills[skill.name] = skill
+ self._repos[skill.name] = repo
+
+ def close(self) -> None:
+ """Close every opened :class:`SkillRepository`, releasing any temp directory
+ materialized for URL / classpath-zip / package sources. Idempotent.
+
+ Iterates ``_opened_repos`` rather than ``_repos.values()``: duplicate skill
+ names overwrite the earlier repo's reference in ``_repos``, but the displaced
+ repo is still owned and must be closed. Dedup by identity in case the same
+ repo contributes multiple skills.
+ """
+ seen: set[int] = set()
+ for repo in self._opened_repos:
+ if id(repo) in seen:
+ continue
+ seen.add(id(repo))
+ with contextlib.suppress(Exception):
+ repo.close()
+
+ def __enter__(self) -> "SkillManager":
+ return self
+
+ def __exit__(self, *exc: object) -> None:
+ self.close()
diff --git a/python/flink_agents/runtime/skill/skill_repository.py b/python/flink_agents/runtime/skill/skill_repository.py
index 1d2cd4073..5cca1f8b0 100644
--- a/python/flink_agents/runtime/skill/skill_repository.py
+++ b/python/flink_agents/runtime/skill/skill_repository.py
@@ -17,6 +17,7 @@
#################################################################################
from abc import ABC, abstractmethod
from dataclasses import dataclass
+from pathlib import Path
from typing import Dict, List
from flink_agents.runtime.skill.agent_skill import AgentSkill
@@ -42,43 +43,31 @@ class SkillRepositoryInfo:
class SkillRepository(ABC):
- """Abstract interface for skill repositories.
+ """Source of skills, loaded from filesystem / classpath / URL / package.
- A SkillRepository is responsible for loading and optionally storing skills
- from a specific source (filesystem, classpath, URL, etc.).
-
- Each skill is stored in its own subdirectory containing a SKILL.md file
- and optional resource files:
-
- baseDir/
- ├── skill-name-1/
- │ ├── SKILL.md # Required: Entry file with YAML frontmatter
- │ ├── references/ # Optional: Reference documentation
- │ ├── examples/ # Optional: Example files
- │ └── scripts/ # Optional: Script files
- └── skill-name-2/
- └── SKILL.md
+ Each skill lives under ``base_dir//`` with a required ``SKILL.md``
+ and optional resource files (``references/``, ``scripts/``, ...).
"""
@abstractmethod
- def get_skill(self, name: str) -> str:
- """Get a skill by name.
-
- Args:
- name: The skill name.
-
- Returns:
- The skill, or None if not found.
- """
+ def get_skill(self, name: str) -> AgentSkill | None:
+ """Return the named skill, or ``None`` if absent."""
@abstractmethod
def get_skills(self) -> List[AgentSkill]:
- """Get all skills in this repository.
-
- Returns:
- List of all skills.
- """
+ """Return all skills in this repository."""
@abstractmethod
def get_resources(self, name: str) -> Dict[str, str]:
- """Get resources for the specified skill."""
+ """Return resources for the named skill, keyed by relative path."""
+
+ def get_skill_dir(self, name: str) -> Path | None:
+ """Absolute on-disk directory for the named skill.
+
+ Filesystem-backed implementations return ``base_dir / name``
+ without checking existence. Non-filesystem-backed return ``None``.
+ """
+ return None
+
+ def close(self) -> None: # noqa: B027
+ """Release any owned temp directory. Default no-op; idempotent."""
diff --git a/python/flink_agents/runtime/skill/skill_source_registry.py b/python/flink_agents/runtime/skill/skill_source_registry.py
new file mode 100644
index 000000000..c193126a6
--- /dev/null
+++ b/python/flink_agents/runtime/skill/skill_source_registry.py
@@ -0,0 +1,131 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""Scheme-keyed registry mapping a skill source scheme (e.g. ``"local"``,
+``"url"``, ``"package"``) to a handler that opens a :class:`SkillRepository`.
+Built-ins are registered at import time; external code may add custom schemes
+via :func:`register`.
+
+Adding a new source = one :func:`register` call plus one ``SkillRepository``
+implementation. ``SkillManager`` need not change.
+
+The Java side has an analogous ``SkillSourceRegistry`` registering ``local``
+/ ``url`` / ``classpath``. Cross-language unsupported schemes (Java's
+``classpath`` on Python, Python's ``package`` on Java) hit the registry's
+fail-loud path at load time.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Callable, Dict, Mapping
+
+from flink_agents.runtime.skill.repository.filesystem_repository import (
+ FileSystemSkillRepository,
+)
+from flink_agents.runtime.skill.repository.package_repository import (
+ PackageSkillRepository,
+)
+from flink_agents.runtime.skill.repository.url_repository import URLSkillRepository
+
+if TYPE_CHECKING:
+ from flink_agents.runtime.skill.skill_repository import SkillRepository
+
+
+@dataclass(frozen=True)
+class SkillSourceHandler:
+ """Pair of (open, describe_location) bound to a scheme.
+
+ ``describe_location`` returns the human-readable source location for
+ :class:`SkillOrigin`. The default falls back to the raw params dict;
+ built-ins override it to point at the relevant param (e.g. ``path`` for
+ local). Keeping description on the handler removes the parallel scheme
+ ladder ``SkillManager`` would otherwise need.
+ """
+
+ open: Callable[[Mapping[str, str]], SkillRepository]
+ describe_location: Callable[[Mapping[str, str]], str] = field(
+ default=lambda params: str(dict(params)),
+ )
+
+
+_HANDLERS: Dict[str, SkillSourceHandler] = {}
+
+
+def register(
+ scheme: str,
+ open_fn: Callable[[Mapping[str, str]], SkillRepository],
+ describe_location: Callable[[Mapping[str, str]], str] | None = None,
+) -> None:
+ """Register a handler under ``scheme``. Scheme is normalized to lowercase.
+
+ ``describe_location`` defaults to the raw params dict; pass a tighter
+ function (e.g. ``lambda p: p.get("path", "")``) to produce a clean origin.
+ """
+ handler = (
+ SkillSourceHandler(open=open_fn)
+ if describe_location is None
+ else SkillSourceHandler(open=open_fn, describe_location=describe_location)
+ )
+ _HANDLERS[scheme.lower()] = handler
+
+
+def get(scheme: str) -> SkillSourceHandler:
+ """Return the handler for ``scheme``, or raise if unknown.
+
+ The error message lists the currently registered schemes to aid debugging
+ cross-language plan mismatches.
+ """
+ handler = _HANDLERS.get(scheme.lower())
+ if handler is None:
+ msg = (
+ f"Unknown skill source scheme: {scheme}. "
+ f"Registered schemes: {sorted(_HANDLERS.keys())}"
+ )
+ raise ValueError(msg)
+ return handler
+
+
+def _require(params: Mapping[str, str], scheme: str, key: str) -> str:
+ value = params.get(key)
+ if value is None:
+ msg = (
+ f"Missing required param '{key}' for skill source scheme "
+ f"'{scheme}'. Got: {dict(params)}"
+ )
+ raise ValueError(msg)
+ return value
+
+
+register(
+ "local",
+ lambda params: FileSystemSkillRepository(_require(params, "local", "path")),
+ lambda params: params.get("path", ""),
+)
+register(
+ "url",
+ lambda params: URLSkillRepository(_require(params, "url", "url")),
+ lambda params: params.get("url", ""),
+)
+register(
+ "package",
+ lambda params: PackageSkillRepository(
+ _require(params, "package", "package"),
+ _require(params, "package", "resource"),
+ ),
+ lambda params: f"{params.get('package', '')}/{params.get('resource', '')}",
+)
diff --git a/python/flink_agents/runtime/skill/tests/test_skill_repository.py b/python/flink_agents/runtime/skill/tests/test_filesystem_repository.py
similarity index 64%
rename from python/flink_agents/runtime/skill/tests/test_skill_repository.py
rename to python/flink_agents/runtime/skill/tests/test_filesystem_repository.py
index 8a0f1bd2a..51d55b5a8 100644
--- a/python/flink_agents/runtime/skill/tests/test_skill_repository.py
+++ b/python/flink_agents/runtime/skill/tests/test_filesystem_repository.py
@@ -17,6 +17,7 @@
################################################################################
"""Unit tests for Skill Repository components."""
+import zipfile
from pathlib import Path
import pytest
@@ -47,9 +48,9 @@ def test_create_repository_invalid_path(self) -> None:
FileSystemSkillRepository("/nonexistent/path")
def test_get_all_skill_names(self, skills_dir: Path) -> None:
- """Test getting all skill names."""
+ """Test getting all skills (verifies skill discovery)."""
repo = FileSystemSkillRepository(skills_dir)
- names = repo._get_all_skill_names()
+ names = [s.name for s in repo.get_skills()]
assert len(names) == 2
assert "github" in names
assert "nano-banana-pro" in names
@@ -94,3 +95,48 @@ def test_get_all_skills(self, skills_dir: Path) -> None:
names = {s.name for s in skills}
assert "github" in names
assert "nano-banana-pro" in names
+
+
+def _zip_dir(src: Path, dst_zip: Path) -> None:
+ """Zip ``src`` so that ``src``'s immediate children are at the zip top level."""
+ with zipfile.ZipFile(dst_zip, "w", zipfile.ZIP_DEFLATED) as zf:
+ for path in src.rglob("*"):
+ if path.is_file():
+ zf.write(path, arcname=path.relative_to(src))
+
+
+class TestFileSystemSkillRepositoryZip:
+ @pytest.fixture
+ def skills_zip(self, tmp_path: Path) -> Path:
+ # Reuse the existing on-disk skills directory; package it into a zip
+ # whose top level is the baseDir (skill subdirs at the top).
+ src = Path(__file__).parent / "resources" / "skills"
+ zip_path = tmp_path / "skills.zip"
+ _zip_dir(src, zip_path)
+ return zip_path
+
+ def test_create_repository_from_zip(self, skills_zip: Path) -> None:
+ repo = FileSystemSkillRepository(skills_zip)
+
+ # base_dir resolves to a temp directory containing the extracted layout
+ assert repo.base_dir.is_dir()
+ names = {p.name for p in repo.base_dir.iterdir()}
+ assert "github" in names
+ assert "nano-banana-pro" in names
+
+ def test_get_skills_from_zip(self, skills_zip: Path) -> None:
+ repo = FileSystemSkillRepository(skills_zip)
+ skills = repo.get_skills()
+ names = {s.name for s in skills}
+ assert names == {"github", "nano-banana-pro"}
+
+ def test_get_resources_from_zip(self, skills_zip: Path) -> None:
+ repo = FileSystemSkillRepository(skills_zip)
+ resources = repo.get_resources("nano-banana-pro")
+ assert "_meta.json" in resources
+
+ def test_invalid_path_kind(self, tmp_path: Path) -> None:
+ non_zip_file = tmp_path / "data.txt"
+ non_zip_file.write_text("not a zip")
+ with pytest.raises(ValueError, match=r"must be a directory or a \.zip file"):
+ FileSystemSkillRepository(non_zip_file)
diff --git a/python/flink_agents/runtime/skill/tests/test_manager.py b/python/flink_agents/runtime/skill/tests/test_manager.py
index 3e4cb2534..4944f846b 100644
--- a/python/flink_agents/runtime/skill/tests/test_manager.py
+++ b/python/flink_agents/runtime/skill/tests/test_manager.py
@@ -17,11 +17,20 @@
################################################################################
"""Unit tests for SkillManager and skill tools."""
+import importlib
+import shutil
+import sys
+import threading
+import zipfile
+from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
import pytest
-from flink_agents.api.skills import Skills
+from flink_agents.api.skills import Skills, SkillSourceSpec
+from flink_agents.runtime.skill.repository.package_repository import (
+ PackageSkillRepository,
+)
from flink_agents.runtime.skill.skill_manager import SkillManager
base_dir = Path(__file__).parent
@@ -75,6 +84,35 @@ def test_get_skill(self, skills_dir: Path) -> None:
== "Generate/edit images with Nano Banana Pro (Gemini 3 Pro Image). Use for image create/modify requests incl. edits. Supports text-to-image + image-to-image; 1K/2K/4K; use --input-image."
)
+ def test_origin_is_attached_after_load(self, skills_dir: Path) -> None:
+ manager = SkillManager(Skills.from_local_dir(str(skills_dir)))
+ origin = manager.get_skill("github").origin
+ assert origin is not None
+ assert origin.scheme == "local"
+ assert origin.location == str(skills_dir)
+
+ def test_duplicate_skill_name_last_write_wins_with_new_origin(
+ self, tmp_path: Path
+ ) -> None:
+ # Two distinct local source dirs each contain a single skill named "dup".
+ dir_a = tmp_path / "a"
+ dir_b = tmp_path / "b"
+ for d, tag in [(dir_a, "from-a"), (dir_b, "from-b")]:
+ skill_dir = d / "dup"
+ skill_dir.mkdir(parents=True)
+ (skill_dir / "SKILL.md").write_text(
+ f"---\nname: dup\ndescription: dummy ({tag})\n---\nbody {tag}"
+ )
+ config = Skills(
+ sources=[
+ SkillSourceSpec(scheme="local", params={"path": str(dir_a)}),
+ SkillSourceSpec(scheme="local", params={"path": str(dir_b)}),
+ ]
+ )
+ manager = SkillManager(config)
+ # Second source wins on collision; check via origin.
+ assert manager.get_skill("dup").origin.location == str(dir_b)
+
def test_load_skill_resource(self, skills_dir: Path) -> None:
"""Test loading a skill resource."""
manager = SkillManager(Skills.from_local_dir(str(skills_dir)))
@@ -86,3 +124,236 @@ def test_load_skill_resource(self, skills_dir: Path) -> None:
nonexistent = skill.get_resource("nonexistent")
assert nonexistent is None
+
+ def test_get_skill_dirs_raises_for_unknown_name(self, skills_dir: Path) -> None:
+ manager = SkillManager(Skills.from_local_dir(str(skills_dir)))
+ with pytest.raises(ValueError) as exc_info:
+ manager.get_skill_dirs("does-not-exist")
+ assert "does-not-exist" in str(exc_info.value)
+ assert "github" in str(exc_info.value)
+
+
+def _zip_dir(src: Path, dst_zip: Path) -> None:
+ with zipfile.ZipFile(dst_zip, "w", zipfile.ZIP_DEFLATED) as zf:
+ for path in src.rglob("*"):
+ if path.is_file():
+ zf.write(path, arcname=path.relative_to(src))
+
+
+class _ZipHandler(BaseHTTPRequestHandler):
+ zip_bytes: bytes = b""
+
+ def do_GET(self) -> None:
+ self.send_response(200)
+ self.send_header("Content-Type", "application/zip")
+ self.send_header("Content-Length", str(len(type(self).zip_bytes)))
+ self.end_headers()
+ self.wfile.write(type(self).zip_bytes)
+
+ def log_message(self, *_args: object) -> None:
+ pass
+
+
+@pytest.fixture
+def mixed_sources(tmp_path: Path):
+ """Yields (dir_path, zip_url, pkg_name, pkg_resource).
+
+ Spins up a local http.server, builds a tmp package on sys.path with
+ a skills/ subdirectory, and zips the existing skills tree for the
+ URL fixture. Cleans up all three on teardown.
+ """
+ src = base_dir / "resources" / "skills"
+
+ dir_path = str(src)
+
+ zip_path = tmp_path / "skills.zip"
+ _zip_dir(src, zip_path)
+ _ZipHandler.zip_bytes = zip_path.read_bytes()
+ server = HTTPServer(("127.0.0.1", 0), _ZipHandler)
+ port = server.server_address[1]
+ thread = threading.Thread(target=server.serve_forever, daemon=True)
+ thread.start()
+
+ pkg_root = tmp_path / "pkg_root"
+ pkg_root.mkdir()
+ pkg_name = "_flink_agents_test_mixed_pkg"
+ pkg_dir = pkg_root / pkg_name
+ pkg_dir.mkdir()
+ (pkg_dir / "__init__.py").write_text("")
+ shutil.copytree(src, pkg_dir / "skills")
+ sys.path.insert(0, str(pkg_root))
+ importlib.invalidate_caches()
+
+ try:
+ yield (
+ dir_path,
+ f"http://127.0.0.1:{port}/skills.zip",
+ pkg_name,
+ "skills",
+ )
+ finally:
+ server.shutdown()
+ server.server_close()
+ _ZipHandler.zip_bytes = b""
+ sys.path.remove(str(pkg_root))
+ sys.modules.pop(pkg_name, None)
+
+
+class TestSkillManagerMixedSources:
+ def test_url_only_loads_skills(self, mixed_sources) -> None:
+ _dir, url, _pkg, _resource = mixed_sources
+ config = Skills.from_url(url)
+ manager = SkillManager(config)
+ assert set(manager.get_all_skill_names()) == {"github", "nano-banana-pro"}
+
+ def test_package_only_loads_skills(self, mixed_sources) -> None:
+ _dir, _url, pkg, resource = mixed_sources
+ config = Skills.from_package((pkg, resource))
+ manager = SkillManager(config)
+ assert set(manager.get_all_skill_names()) == {"github", "nano-banana-pro"}
+
+ def test_loads_from_all_sources(self, mixed_sources) -> None:
+ dir_path, url, pkg, resource = mixed_sources
+ # All three sources expose the same skills. Dispatch runs in source-list
+ # order (local -> url -> package); registration is last-wins, so the
+ # final repo for each skill must be a PackageSkillRepository if all
+ # three sources actually executed.
+ config = Skills(
+ sources=[
+ SkillSourceSpec(scheme="local", params={"path": dir_path}),
+ SkillSourceSpec(scheme="url", params={"url": url}),
+ SkillSourceSpec(
+ scheme="package",
+ params={"package": pkg, "resource": resource},
+ ),
+ ]
+ )
+ manager = SkillManager(config)
+ assert set(manager.get_all_skill_names()) == {"github", "nano-banana-pro"}
+ assert all(
+ isinstance(r, PackageSkillRepository) for r in manager._repos.values()
+ )
+
+ def test_close_releases_url_repo_temp_dir(self, mixed_sources) -> None:
+ _dir, url, _pkg, _resource = mixed_sources
+ config = Skills.from_url(url)
+ with SkillManager(config) as manager:
+ skill_dir = manager.get_skill_dir("github")
+ assert skill_dir is not None
+ extract_root = skill_dir.parent
+ assert skill_dir.exists()
+ assert extract_root.exists()
+ # Context manager exit triggered close.
+ assert not extract_root.exists(), (
+ "SkillManager.close() must release the URL repo's temp dir"
+ )
+
+ def test_unknown_scheme_fails_loud(self) -> None:
+ config = Skills(
+ sources=[SkillSourceSpec(scheme="future-scheme", params={"k": "v"})]
+ )
+ with pytest.raises(RuntimeError) as exc_info:
+ SkillManager(config)
+ assert "future-scheme" in str(exc_info.value)
+ # The registered-scheme list comes from the chained ValueError.
+ assert "local" in str(exc_info.value.__cause__)
+ assert "package" in str(exc_info.value.__cause__)
+
+ def test_close_releases_repo_displaced_by_duplicate_skill_name(self) -> None:
+ from typing import Dict, List
+
+ from flink_agents.runtime.skill import skill_source_registry
+ from flink_agents.runtime.skill.agent_skill import AgentSkill
+ from flink_agents.runtime.skill.skill_repository import SkillRepository
+
+ closed: List[int] = []
+
+ class FakeRepo(SkillRepository):
+ def __init__(self, tag: int) -> None:
+ self._tag = tag
+
+ def get_skill(self, name: str) -> AgentSkill | None:
+ return self.get_skills()[0] if name == "dup" else None
+
+ def get_skills(self) -> List[AgentSkill]:
+ return [AgentSkill(name="dup", description="dummy", content="body")]
+
+ def get_resources(self, name: str) -> Dict[str, str]:
+ return {}
+
+ def close(self) -> None:
+ closed.append(self._tag)
+
+ counter = {"n": 0}
+
+ def opener(params) -> SkillRepository:
+ counter["n"] += 1
+ return FakeRepo(counter["n"])
+
+ skill_source_registry.register("test-dup-close", opener)
+
+ config = Skills(
+ sources=[
+ SkillSourceSpec(scheme="test-dup-close", params={}),
+ SkillSourceSpec(scheme="test-dup-close", params={}),
+ ]
+ )
+ manager = SkillManager(config)
+ manager.close()
+
+ assert sorted(closed) == [1, 2], (
+ "duplicate-name registration must not orphan the displaced repo"
+ )
+
+ def test_partial_load_failure_closes_already_opened_repos(self) -> None:
+ from typing import Dict, List
+
+ from flink_agents.runtime.skill import skill_source_registry
+ from flink_agents.runtime.skill.agent_skill import AgentSkill
+ from flink_agents.runtime.skill.skill_repository import SkillRepository
+
+ closed: List[str] = []
+
+ class FakeRepo(SkillRepository):
+ def __init__(self, skill_name: str) -> None:
+ self._skill_name = skill_name
+
+ def get_skill(self, name: str) -> AgentSkill | None:
+ return self.get_skills()[0] if name == self._skill_name else None
+
+ def get_skills(self) -> List[AgentSkill]:
+ return [
+ AgentSkill(
+ name=self._skill_name, description="dummy", content="body"
+ )
+ ]
+
+ def get_resources(self, name: str) -> Dict[str, str]:
+ return {}
+
+ def close(self) -> None:
+ closed.append(self._skill_name)
+
+ counter = {"n": 0}
+
+ def opener(params) -> SkillRepository:
+ counter["n"] += 1
+ if counter["n"] == 2:
+ msg = "boom"
+ raise OSError(msg)
+ return FakeRepo(f"skill-{counter['n']}")
+
+ skill_source_registry.register("test-partial-load", opener)
+
+ config = Skills(
+ sources=[
+ SkillSourceSpec(scheme="test-partial-load", params={}),
+ SkillSourceSpec(scheme="test-partial-load", params={}),
+ ]
+ )
+
+ with pytest.raises(RuntimeError):
+ SkillManager(config)
+ assert closed == ["skill-1"], (
+ "the repo opened before the partial-load failure must be closed"
+ )
diff --git a/python/flink_agents/runtime/skill/tests/test_materialize.py b/python/flink_agents/runtime/skill/tests/test_materialize.py
new file mode 100644
index 000000000..2285e7a2c
--- /dev/null
+++ b/python/flink_agents/runtime/skill/tests/test_materialize.py
@@ -0,0 +1,152 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""Unit tests for the _materialize utility module."""
+
+import threading
+import zipfile
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from pathlib import Path
+from urllib.error import HTTPError
+
+import pytest
+
+from flink_agents.runtime.skill.repository._materialize import (
+ Materialized,
+ download_to_tempfile,
+ extract_zip_safely,
+)
+
+
+def _make_zip(zip_path: Path, entries: dict[str, str]) -> None:
+ with zipfile.ZipFile(zip_path, "w") as zf:
+ for name, content in entries.items():
+ zf.writestr(name, content)
+
+
+class TestExtractZipSafely:
+ def test_extracts_top_level_entries(self, tmp_path: Path) -> None:
+ zip_path = tmp_path / "skills.zip"
+ _make_zip(
+ zip_path,
+ {
+ "skill-a/SKILL.md": "---\nname: skill-a\n---\nbody",
+ "skill-b/SKILL.md": "---\nname: skill-b\n---\nbody",
+ },
+ )
+
+ with extract_zip_safely(zip_path) as m:
+ extract_dir = m.dir
+ assert extract_dir.is_dir()
+ assert (extract_dir / "skill-a" / "SKILL.md").read_text().startswith("---")
+ assert (extract_dir / "skill-b" / "SKILL.md").is_file()
+
+ def test_rejects_zip_slip_relative(self, tmp_path: Path) -> None:
+ zip_path = tmp_path / "evil.zip"
+ _make_zip(zip_path, {"../evil.txt": "pwn"})
+
+ with pytest.raises(ValueError, match="Unsafe zip entry"):
+ extract_zip_safely(zip_path)
+
+ def test_rejects_zip_slip_absolute(self, tmp_path: Path) -> None:
+ # Defense-in-depth: CPython's extractall already strips leading slashes,
+ # but we reject absolute entries explicitly so we don't depend on that.
+ zip_path = tmp_path / "evil.zip"
+ _make_zip(zip_path, {"/etc/evil.txt": "pwn"})
+
+ with pytest.raises(ValueError, match="Unsafe zip entry"):
+ extract_zip_safely(zip_path)
+
+
+class TestMaterialized:
+ def test_close_removes_dir(self, tmp_path: Path) -> None:
+ zip_path = tmp_path / "skills.zip"
+ _make_zip(zip_path, {"skill-a/SKILL.md": "---\nname: skill-a\n---\nbody"})
+ m = extract_zip_safely(zip_path)
+ extracted = m.dir
+ assert extracted.exists()
+
+ m.close()
+ assert not extracted.exists(), "close() must remove the temp dir"
+
+ # Idempotent.
+ m.close()
+
+ def test_borrowed_does_not_remove_dir(self, tmp_path: Path) -> None:
+ target = tmp_path / "borrowed"
+ target.mkdir()
+ m = Materialized.borrowed(target)
+ m.close()
+ assert target.exists(), "borrowed dirs must not be deleted on close"
+
+
+class _StaticHandler(BaseHTTPRequestHandler):
+ payload: bytes = b""
+ status: int = 200
+
+ def do_GET(self) -> None:
+ self.send_response(type(self).status)
+ self.send_header("Content-Length", str(len(type(self).payload)))
+ self.end_headers()
+ self.wfile.write(type(self).payload)
+
+ def log_message(self, *_args: object) -> None:
+ pass
+
+
+@pytest.fixture
+def static_server() -> "tuple[str, type[_StaticHandler]]":
+ _StaticHandler.payload = b""
+ _StaticHandler.status = 200
+ server = HTTPServer(("127.0.0.1", 0), _StaticHandler)
+ port = server.server_address[1]
+ thread = threading.Thread(target=server.serve_forever, daemon=True)
+ thread.start()
+ try:
+ yield f"http://127.0.0.1:{port}", _StaticHandler
+ finally:
+ server.shutdown()
+ server.server_close()
+ _StaticHandler.payload = b""
+ _StaticHandler.status = 200
+
+
+class TestDownloadToTempfile:
+ def test_downloads_bytes(
+ self, static_server: "tuple[str, type[_StaticHandler]]"
+ ) -> None:
+ base_url, handler = static_server
+ handler.payload = b"hello-zip-bytes"
+ handler.status = 200
+
+ path = download_to_tempfile(f"{base_url}/anything", timeout=10)
+
+ try:
+ assert path.is_file()
+ assert path.read_bytes() == b"hello-zip-bytes"
+ finally:
+ path.unlink(missing_ok=True)
+
+ def test_raises_on_http_error(
+ self, static_server: "tuple[str, type[_StaticHandler]]"
+ ) -> None:
+ base_url, handler = static_server
+ handler.payload = b""
+ handler.status = 404
+
+ with pytest.raises(HTTPError):
+ download_to_tempfile(f"{base_url}/missing", timeout=10)
diff --git a/python/flink_agents/runtime/skill/tests/test_package_repository.py b/python/flink_agents/runtime/skill/tests/test_package_repository.py
new file mode 100644
index 000000000..5b1490f57
--- /dev/null
+++ b/python/flink_agents/runtime/skill/tests/test_package_repository.py
@@ -0,0 +1,84 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""Unit tests for PackageSkillRepository."""
+
+import importlib
+import shutil
+import sys
+import zipfile
+from pathlib import Path
+
+import pytest
+
+from flink_agents.runtime.skill.repository.package_repository import (
+ PackageSkillRepository,
+)
+
+
+def _zip_dir(src: Path, dst_zip: Path) -> None:
+ with zipfile.ZipFile(dst_zip, "w", zipfile.ZIP_DEFLATED) as zf:
+ for path in src.rglob("*"):
+ if path.is_file():
+ zf.write(path, arcname=path.relative_to(src))
+
+
+@pytest.fixture
+def installed_pkg(tmp_path: Path) -> str:
+ """Create a tmp Python package on sys.path with skills as both a directory
+ and a zip resource. Yields the package name.
+ """
+ pkg_root = tmp_path / "pkg_root"
+ pkg_root.mkdir()
+ pkg_name = "_flink_agents_test_skills_pkg"
+ pkg_dir = pkg_root / pkg_name
+ pkg_dir.mkdir()
+ (pkg_dir / "__init__.py").write_text("")
+
+ # Copy the existing skills/ directory into the package as a directory resource.
+ src_skills = Path(__file__).parent / "resources" / "skills"
+ shutil.copytree(src_skills, pkg_dir / "skills")
+
+ # Build a zip resource alongside the directory.
+ _zip_dir(src_skills, pkg_dir / "skills.zip")
+
+ sys.path.insert(0, str(pkg_root))
+ importlib.invalidate_caches()
+ try:
+ yield pkg_name
+ finally:
+ sys.path.remove(str(pkg_root))
+ # Remove cached module so it doesn't leak across tests.
+ sys.modules.pop(pkg_name, None)
+
+
+class TestPackageSkillRepository:
+ def test_load_directory_resource(self, installed_pkg: str) -> None:
+ repo = PackageSkillRepository(installed_pkg, "skills")
+
+ names = {s.name for s in repo.get_skills()}
+ assert names == {"github", "nano-banana-pro"}
+
+ def test_load_zip_resource(self, installed_pkg: str) -> None:
+ repo = PackageSkillRepository(installed_pkg, "skills.zip")
+
+ names = {s.name for s in repo.get_skills()}
+ assert names == {"github", "nano-banana-pro"}
+
+ def test_missing_resource(self, installed_pkg: str) -> None:
+ with pytest.raises(ValueError, match="not found in package"):
+ PackageSkillRepository(installed_pkg, "no_such_resource")
diff --git a/python/flink_agents/runtime/skill/tests/test_skill_directory_reader.py b/python/flink_agents/runtime/skill/tests/test_skill_directory_reader.py
new file mode 100644
index 000000000..6143919e5
--- /dev/null
+++ b/python/flink_agents/runtime/skill/tests/test_skill_directory_reader.py
@@ -0,0 +1,106 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+"""Unit tests for SkillDirectoryReader."""
+
+import base64
+from pathlib import Path
+
+import pytest
+
+from flink_agents.runtime.skill.repository.skill_directory_reader import (
+ SkillDirectoryReader,
+)
+
+
+def _write_skill(base_dir: Path, name: str, body: str) -> None:
+ skill_dir = base_dir / name
+ skill_dir.mkdir(parents=True)
+ (skill_dir / "SKILL.md").write_text(
+ f"---\nname: {name}\ndescription: dummy skill\n---\n{body}"
+ )
+
+
+class TestSkillDirectoryReader:
+ def test_rejects_none(self) -> None:
+ with pytest.raises(ValueError):
+ SkillDirectoryReader(None) # type: ignore[arg-type]
+
+ def test_rejects_nonexistent_path(self, tmp_path: Path) -> None:
+ with pytest.raises(ValueError):
+ SkillDirectoryReader(tmp_path / "missing")
+
+ def test_rejects_non_directory(self, tmp_path: Path) -> None:
+ file_path = tmp_path / "not-a-dir.txt"
+ file_path.write_text("x")
+ with pytest.raises(ValueError):
+ SkillDirectoryReader(file_path)
+
+ def test_get_skill_dir_returns_base_slash_name(self, tmp_path: Path) -> None:
+ reader = SkillDirectoryReader(tmp_path)
+ # Returns even for missing names — caller verifies existence.
+ assert reader.get_skill_dir("anything") == (tmp_path.resolve() / "anything")
+
+ def test_get_skill_returns_none_for_missing(self, tmp_path: Path) -> None:
+ reader = SkillDirectoryReader(tmp_path)
+ assert reader.get_skill("missing") is None
+
+ def test_get_skills_lists_every_skill_subdir(self, tmp_path: Path) -> None:
+ _write_skill(tmp_path, "alpha", "body alpha")
+ _write_skill(tmp_path, "beta", "body beta")
+ # A non-skill subdirectory must be ignored.
+ (tmp_path / "not-a-skill").mkdir()
+
+ reader = SkillDirectoryReader(tmp_path)
+ names = [s.name for s in reader.get_skills()]
+ assert names == ["alpha", "beta"]
+
+ def test_get_resources_exposes_non_skill_md_files(self, tmp_path: Path) -> None:
+ _write_skill(tmp_path, "gamma", "body gamma")
+ skill_dir = tmp_path / "gamma"
+ (skill_dir / "notes.txt").write_text("hello")
+ (skill_dir / "scripts").mkdir()
+ (skill_dir / "scripts" / "run.sh").write_text("echo hi")
+
+ reader = SkillDirectoryReader(tmp_path)
+ resources = reader.get_resources("gamma")
+ assert resources == {
+ "notes.txt": "hello",
+ "scripts/run.sh": "echo hi",
+ }
+
+ def test_get_resources_empty_for_missing_skill(self, tmp_path: Path) -> None:
+ reader = SkillDirectoryReader(tmp_path)
+ assert reader.get_resources("missing") == {}
+
+ def test_binary_resource_round_trips_through_base64(self, tmp_path: Path) -> None:
+ # PNG signature + a few non-UTF-8 bytes — must come back identical after
+ # b64-decoding the value with the "base64: " prefix stripped, otherwise
+ # the encoder is silently emitting something other than base64
+ # (e.g. the f-string-of-bytes-repr bug that survived the first review).
+ _write_skill(tmp_path, "binary-skill", "body")
+ original = bytes(
+ [0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, 0xF8, 0x88, 0x80, 0x80]
+ )
+ (tmp_path / "binary-skill" / "blob.bin").write_bytes(original)
+
+ reader = SkillDirectoryReader(tmp_path)
+ encoded = reader.get_resources("binary-skill")["blob.bin"]
+
+ assert encoded.startswith("base64: ")
+ decoded = base64.b64decode(encoded.removeprefix("base64: "))
+ assert decoded == original
diff --git a/python/flink_agents/runtime/skill/tests/test_url_repository.py b/python/flink_agents/runtime/skill/tests/test_url_repository.py
new file mode 100644
index 000000000..6eaf1eae0
--- /dev/null
+++ b/python/flink_agents/runtime/skill/tests/test_url_repository.py
@@ -0,0 +1,99 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+"""Unit tests for URLSkillRepository."""
+
+import threading
+import zipfile
+from http.server import BaseHTTPRequestHandler, HTTPServer
+from pathlib import Path
+from urllib.error import HTTPError
+
+import pytest
+
+from flink_agents.runtime.skill.repository.url_repository import URLSkillRepository
+
+
+def _zip_dir(src: Path, dst_zip: Path) -> None:
+ with zipfile.ZipFile(dst_zip, "w", zipfile.ZIP_DEFLATED) as zf:
+ for path in src.rglob("*"):
+ if path.is_file():
+ zf.write(path, arcname=path.relative_to(src))
+
+
+@pytest.fixture
+def skills_zip_path(tmp_path: Path) -> Path:
+ src = Path(__file__).parent / "resources" / "skills"
+ zip_path = tmp_path / "skills.zip"
+ _zip_dir(src, zip_path)
+ return zip_path
+
+
+class _ZipHandler(BaseHTTPRequestHandler):
+ zip_bytes: bytes = b""
+ status: int = 200
+
+ def do_GET(self) -> None:
+ self.send_response(type(self).status)
+ self.send_header("Content-Type", "application/zip")
+ self.send_header("Content-Length", str(len(type(self).zip_bytes)))
+ self.end_headers()
+ self.wfile.write(type(self).zip_bytes)
+
+ def log_message(self, *_args: object) -> None:
+ pass
+
+
+@pytest.fixture
+def zip_server(skills_zip_path: Path) -> "tuple[str, type[_ZipHandler]]":
+ _ZipHandler.zip_bytes = skills_zip_path.read_bytes()
+ _ZipHandler.status = 200
+ server = HTTPServer(("127.0.0.1", 0), _ZipHandler)
+ port = server.server_address[1]
+ thread = threading.Thread(target=server.serve_forever, daemon=True)
+ thread.start()
+ try:
+ yield f"http://127.0.0.1:{port}/skills.zip", _ZipHandler
+ finally:
+ server.shutdown()
+ server.server_close()
+ _ZipHandler.zip_bytes = b""
+ _ZipHandler.status = 200
+
+
+class TestURLSkillRepository:
+ def test_load_from_url(self, zip_server: "tuple[str, type[_ZipHandler]]") -> None:
+ url, _handler = zip_server
+ repo = URLSkillRepository(url)
+
+ skills = repo.get_skills()
+ names = {s.name for s in skills}
+ assert names == {"github", "nano-banana-pro"}
+
+ def test_non_http_url_rejected(self) -> None:
+ with pytest.raises(ValueError, match="Only http"):
+ URLSkillRepository("file:///tmp/skills.zip")
+
+ with pytest.raises(ValueError, match="Only http"):
+ URLSkillRepository("ftp://example.com/skills.zip")
+
+ def test_404_error(self, zip_server: "tuple[str, type[_ZipHandler]]") -> None:
+ url, handler = zip_server
+ handler.status = 404
+
+ with pytest.raises(HTTPError):
+ URLSkillRepository(url)
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/PythonMCPResourceDiscovery.java b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonMCPResourceDiscovery.java
index 32af34280..356ec67be 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/PythonMCPResourceDiscovery.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonMCPResourceDiscovery.java
@@ -25,7 +25,6 @@
import org.apache.flink.agents.plan.resource.python.PythonMCPTool;
import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
-import org.apache.flink.agents.runtime.resource.ResourceContextImpl;
import java.util.Map;
@@ -71,17 +70,7 @@ public static void discoverPythonMCPResources(
PythonResourceProvider provider = (PythonResourceProvider) rp;
provider.setPythonResourceAdapter(adapter);
- PythonMCPServer server =
- (PythonMCPServer)
- provider.provide(
- new ResourceContextImpl(
- (name, type) -> {
- try {
- return cache.getResource(name, type);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }));
+ PythonMCPServer server = (PythonMCPServer) provider.provide(cache.getResourceContext());
for (PythonMCPTool tool : server.listTools()) {
cache.put(tool.getName(), TOOL, tool);
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/ResourceCache.java b/runtime/src/main/java/org/apache/flink/agents/runtime/ResourceCache.java
index 8e56bb988..01b920373 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/ResourceCache.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/ResourceCache.java
@@ -46,20 +46,49 @@ public class ResourceCache implements AutoCloseable {
private final Map> resourceProviders;
private final Map> cache = new ConcurrentHashMap<>();
private volatile PythonResourceAdapter pythonResourceAdapter;
+ private final ResourceContextImpl resourceContext;
- public ResourceCache(Map> resourceProviders) {
+ /**
+ * Construct a cache that resolves {@code classpath:} skill sources via {@code classLoader}.
+ * Production code passes the Flink user-code class loader (from {@code
+ * ActionExecutionOperator.getRuntimeContext().getUserCodeClassLoader()}); tests may call {@link
+ * #ResourceCache(Map)}.
+ */
+ public ResourceCache(
+ Map> resourceProviders,
+ ClassLoader classLoader) {
// Defensive copy: the cache must not be affected by later mutations to the source map.
this.resourceProviders = new HashMap<>();
for (Map.Entry> entry :
resourceProviders.entrySet()) {
this.resourceProviders.put(entry.getKey(), new HashMap<>(entry.getValue()));
}
+
+ this.resourceContext =
+ new ResourceContextImpl(
+ (name, type) -> {
+ try {
+ return this.getResource(name, type);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ },
+ classLoader);
+ }
+
+ /** Convenience overload that uses the current thread's context class loader. */
+ public ResourceCache(Map> resourceProviders) {
+ this(resourceProviders, Thread.currentThread().getContextClassLoader());
}
void setPythonResourceAdapter(PythonResourceAdapter adapter) {
this.pythonResourceAdapter = adapter;
}
+ public ResourceContextImpl getResourceContext() {
+ return resourceContext;
+ }
+
/**
* Resolves a resource by name and type, creating it from its provider if not cached.
*
@@ -87,16 +116,7 @@ public synchronized Resource getResource(String name, ResourceType type) throws
((PythonResourceProvider) provider).setPythonResourceAdapter(pythonResourceAdapter);
}
- Resource resource =
- provider.provide(
- new ResourceContextImpl(
- (anotherName, anotherType) -> {
- try {
- return this.getResource(anotherName, anotherType);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }));
+ Resource resource = provider.provide(resourceContext);
if (pythonResourceAdapter != null && resource instanceof FunctionTool) {
((FunctionTool) resource).setPythonResourceAdapter(pythonResourceAdapter);
@@ -135,6 +155,15 @@ public void close() throws Exception {
}
}
cache.clear();
+ try {
+ resourceContext.close();
+ } catch (Exception e) {
+ if (firstException == null) {
+ firstException = e;
+ } else {
+ firstException.addSuppressed(e);
+ }
+ }
if (firstException != null) {
throw firstException;
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 899f463d6..a0a3813da 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -144,7 +144,16 @@ public void open() throws Exception {
stateManager.initializeKeyedStates(getRuntimeContext());
stateManager.initializeOperatorStates(getOperatorStateBackend());
- resourceCache = new ResourceCache(agentPlan.getResourceProviders());
+ // ResourceCache constructs its own long-lived ResourceContextImpl internally; on
+ // close() the cache cascades close to it and to the cached SkillManager, covering
+ // Flink failover when the JVM does not exit. The user-code class loader is threaded
+ // down so classpath: skill sources resolve against the Flink user JAR regardless of
+ // which thread (mailbox / Python interpreter / async pool) later triggers the lazy
+ // SkillManager construction.
+ resourceCache =
+ new ResourceCache(
+ agentPlan.getResourceProviders(),
+ getRuntimeContext().getUserCodeClassLoader());
metricGroup = new FlinkAgentsMetricGroupImpl(getMetricGroup());
builtInMetrics = new BuiltInMetrics(metricGroup, agentPlan);
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java
index 73eb6a500..cea9cce54 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java
@@ -18,7 +18,6 @@
package org.apache.flink.agents.runtime.operator;
import org.apache.flink.agents.api.memory.LongTermMemoryOptions;
-import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.plan.JavaFunction;
import org.apache.flink.agents.plan.PythonFunction;
@@ -33,7 +32,6 @@
import org.apache.flink.agents.runtime.python.utils.JavaResourceAdapter;
import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
import org.apache.flink.agents.runtime.python.utils.PythonResourceAdapterImpl;
-import org.apache.flink.agents.runtime.resource.ResourceContextImpl;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.python.env.PythonDependencyInfo;
@@ -165,14 +163,7 @@ void open(
javaResourceAdapter =
new JavaResourceAdapter(
- new ResourceContextImpl(
- (name, type) -> {
- try {
- return resourceCache.getResource(name, type);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }),
+ resourceCache.getResourceContext(),
pythonInterpreter,
userCodeClassLoader);
if (containPythonResource || mem0Configured) {
@@ -262,16 +253,7 @@ private void initPythonResourceAdapter(AgentPlan agentPlan, ResourceCache resour
throws Exception {
pythonResourceAdapter =
new PythonResourceAdapterImpl(
- new ResourceContextImpl(
- (String anotherName, ResourceType anotherType) -> {
- try {
- return resourceCache.getResource(anotherName, anotherType);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }),
- pythonInterpreter,
- javaResourceAdapter);
+ resourceCache.getResourceContext(), pythonInterpreter, javaResourceAdapter);
pythonResourceAdapter.open();
PythonMCPResourceDiscovery.discoverPythonMCPResources(
agentPlan.getResourceProviders(), pythonResourceAdapter, resourceCache);
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/resource/ResourceContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/resource/ResourceContextImpl.java
index b7dc0eeda..9b9dfd794 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/resource/ResourceContextImpl.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/resource/ResourceContextImpl.java
@@ -38,15 +38,28 @@
* skill methods lazily build a {@link SkillManager} from the {@code _skills_config} resource — if
* no such resource is registered they return safe defaults (empty string / empty list).
*/
-public class ResourceContextImpl implements ResourceContext {
+public class ResourceContextImpl implements ResourceContext, AutoCloseable {
private final BiFunction getResource;
+ private final ClassLoader classLoader;
- @Nullable private volatile SkillManager skillManager;
- @Nullable private volatile Skills cachedSkillsConfig;
+ @Nullable private SkillManager skillManager;
+ private boolean skillManagerInitialized;
- public ResourceContextImpl(BiFunction getResource) {
+ /**
+ * Construct a context that resolves {@code classpath:} skill sources via {@code classLoader}.
+ * Production code passes the Flink user-code class loader (threaded through {@code
+ * ResourceCache}); standalone use may use {@link #ResourceContextImpl(BiFunction)}.
+ */
+ public ResourceContextImpl(
+ BiFunction getResource, ClassLoader classLoader) {
this.getResource = getResource;
+ this.classLoader = classLoader;
+ }
+
+ /** Convenience overload that uses the current thread's context class loader. */
+ public ResourceContextImpl(BiFunction getResource) {
+ this(getResource, Thread.currentThread().getContextClassLoader());
}
@Override
@@ -83,6 +96,15 @@ public synchronized SkillManager getSkillManager() throws Exception {
@Nullable
private synchronized SkillManager ensureSkillManager() throws Exception {
+ if (!skillManagerInitialized) {
+ skillManagerInitialized = true;
+ skillManager = createSkillManager();
+ }
+ return skillManager;
+ }
+
+ @Nullable
+ private SkillManager createSkillManager() throws Exception {
Skills config;
try {
Resource r = getResource(Skills.SKILLS_CONFIG, ResourceType.SKILLS);
@@ -94,10 +116,23 @@ private synchronized SkillManager ensureSkillManager() throws Exception {
// No skills config registered — that's fine, return null.
return null;
}
- if (config != cachedSkillsConfig) {
- cachedSkillsConfig = config;
- skillManager = new SkillManager(config);
+ return new SkillManager(config, classLoader);
+ }
+
+ /**
+ * Close the lazily-cached {@link SkillManager}, releasing every materialized temp directory
+ * owned by its repositories. Idempotent. Called via {@code ResourceCache.close()} on operator
+ * close, including during Flink failover when the JVM stays up.
+ */
+ @Override
+ public synchronized void close() throws Exception {
+ if (skillManager != null) {
+ try {
+ skillManager.close();
+ } finally {
+ skillManager = null;
+ skillManagerInitialized = false;
+ }
}
- return skillManager;
}
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/skill/AgentSkill.java b/runtime/src/main/java/org/apache/flink/agents/runtime/skill/AgentSkill.java
index 7b32ade79..822e9bc2a 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/skill/AgentSkill.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/skill/AgentSkill.java
@@ -44,6 +44,7 @@ public final class AgentSkill {
@Nullable private volatile Map resources;
@Nullable private Supplier