Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 1 addition & 35 deletions plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,41 +295,7 @@ private void extractJavaMCPServer(Method method) throws Exception {
descriptor.getModule(),
JAVA_MCP_SERVER_CLASS_NAME,
new HashMap<>(descriptor.getInitialArguments()));
JavaResourceProvider provider = new JavaResourceProvider(name, MCP_SERVER, descriptor);

addResourceProvider(provider);
Object mcpServer = provider.provide(null);

// Call listTools() via reflection
Method listToolsMethod = mcpServer.getClass().getMethod("listTools");
@SuppressWarnings("unchecked")
Iterable<? extends SerializableResource> tools =
(Iterable<? extends SerializableResource>) listToolsMethod.invoke(mcpServer);

for (SerializableResource tool : tools) {
Method getNameMethod = tool.getClass().getMethod("getName");
String toolName = (String) getNameMethod.invoke(tool);
addResourceProvider(
JavaSerializableResourceProvider.createResourceProvider(toolName, TOOL, tool));
}

// Call listPrompts() via reflection
Method listPromptsMethod = mcpServer.getClass().getMethod("listPrompts");
@SuppressWarnings("unchecked")
Iterable<? extends SerializableResource> prompts =
(Iterable<? extends SerializableResource>) listPromptsMethod.invoke(mcpServer);

for (SerializableResource prompt : prompts) {
Method getNameMethod = prompt.getClass().getMethod("getName");
String promptName = (String) getNameMethod.invoke(prompt);
addResourceProvider(
JavaSerializableResourceProvider.createResourceProvider(
promptName, PROMPT, prompt));
}

// Call close() via reflection
Method closeMethod = mcpServer.getClass().getMethod("close");
closeMethod.invoke(mcpServer);
addResourceProvider(new JavaResourceProvider(name, MCP_SERVER, descriptor));
}

private void extractResourceProvidersFromAgent(Agent agent) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@
import org.apache.flink.agents.api.agents.Agent;
import org.apache.flink.agents.api.annotation.Action;
import org.apache.flink.agents.api.context.RunnerContext;
import org.apache.flink.agents.api.prompt.Prompt;
import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceName;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.tools.Tool;
import org.apache.flink.agents.api.tools.ToolMetadata;
import org.apache.flink.agents.integrations.mcp.MCPPrompt;
import org.apache.flink.agents.integrations.mcp.MCPServer;
import org.apache.flink.agents.integrations.mcp.MCPTool;
import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
import org.junit.jupiter.api.*;
Expand All @@ -48,14 +46,17 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

/**
* Tests for MCP server integration with AgentPlan.
*
* <p>This test verifies that MCP servers, tools, and prompts are properly discovered and registered
* in the agent plan, following the pattern from {@link AgentPlanDeclareToolMethodTest}.
* <p>Verifies that MCP servers are registered in the agent plan at compile time, while tool and
* prompt discovery is deferred to operator startup (runtime). Tool/prompt retrieval tests
* instantiate the MCPServer directly from the plan's provider to simulate what {@code
* JavaMCPResourceDiscovery} does at runtime.
*
* <p>Uses the Python MCP server from python/flink_agents/api/tests/mcp/mcp_server.py.
*/
Expand Down Expand Up @@ -185,16 +186,14 @@ void setup() throws Exception {
agentPlan = new AgentPlan(new TestMCPAgent());
}

/** Resolves a resource directly from its provider. */
private Resource resolveResource(String name, ResourceType type) throws Exception {
return agentPlan
.getResourceProviders()
.get(type)
.get(name)
.provide(
(n, t) -> {
throw new UnsupportedOperationException("No dependencies expected");
});
/**
* Returns an MCPServer instantiated from the plan's provider, simulating what
* JavaMCPResourceDiscovery does at operator startup.
*/
private MCPServer instantiateMCPServer() throws Exception {
ResourceProvider provider =
agentPlan.getResourceProviders().get(ResourceType.MCP_SERVER).get("testMcpServer");
return (MCPServer) provider.provide(null);
}

@AfterAll
Expand All @@ -211,7 +210,7 @@ static void afterAll() {

@Test
@DisabledOnJre(JRE.JAVA_11)
@DisplayName("Discover @MCPServer method and register MCP server")
@DisplayName("Discover @MCPServer method and register MCP server provider in plan")
void discoverMCPServer() {
Map<ResourceType, Map<String, ResourceProvider>> providers =
agentPlan.getResourceProviders();
Expand All @@ -222,116 +221,148 @@ void discoverMCPServer() {

@Test
@DisabledOnJre(JRE.JAVA_11)
@DisplayName("Discover and register tools from MCP server")
@DisplayName("Tools are NOT in AgentPlan providers — discovery is deferred to operator startup")
void discoverToolsFromMCPServer() {
Map<ResourceType, Map<String, ResourceProvider>> providers =
agentPlan.getResourceProviders();
assertTrue(providers.containsKey(ResourceType.TOOL));

Map<String, ?> toolProviders = providers.get(ResourceType.TOOL);
assertTrue(toolProviders.containsKey("add"), "add tool should be discovered");
assertEquals(1, toolProviders.size(), "Should have exactly 1 tool from Python server");
// Tools are discovered at runtime by JavaMCPResourceDiscovery, not during plan construction
assertNull(
providers.get(ResourceType.TOOL),
"TOOL providers should be absent from AgentPlan; discovery is deferred to runtime");
}

@Test
@DisabledOnJre(JRE.JAVA_11)
@DisplayName("Discover and register prompts from MCP server")
@DisplayName(
"Prompts are NOT in AgentPlan providers — discovery is deferred to operator startup")
void discoverPromptsFromMCPServer() {
Map<ResourceType, Map<String, ResourceProvider>> providers =
agentPlan.getResourceProviders();
assertTrue(providers.containsKey(ResourceType.PROMPT));

Map<String, ?> promptProviders = providers.get(ResourceType.PROMPT);
assertTrue(promptProviders.containsKey("ask_sum"), "ask_sum prompt should be discovered");
assertEquals(1, promptProviders.size(), "Should have exactly 1 prompt from Python server");
// Prompts are discovered at runtime by JavaMCPResourceDiscovery, not during plan
// construction
assertNull(
providers.get(ResourceType.PROMPT),
"PROMPT providers should be absent from AgentPlan; discovery is deferred to runtime");
}

@Test
@DisabledOnJre(JRE.JAVA_11)
@DisplayName("Retrieve MCP tool from AgentPlan - add tool")
@DisplayName("Retrieve MCP tool at runtime - add tool")
void retrieveMCPToolAdd() throws Exception {
Tool tool = (Tool) resolveResource("add", ResourceType.TOOL);
assertNotNull(tool);
assertInstanceOf(MCPTool.class, tool);

MCPTool mcpTool = (MCPTool) tool;
assertEquals("add", mcpTool.getName());
// Verify description starts with expected text
assertTrue(
mcpTool.getMetadata()
.getDescription()
.startsWith("Get the detailed information of a specified IP address."),
"Description should start with expected text");
// Verify input schema contains expected parameters
String schema = mcpTool.getMetadata().getInputSchema();
assertTrue(schema.contains("a"), "Schema should contain parameter 'a'");
assertTrue(schema.contains("b"), "Schema should contain parameter 'b'");
MCPServer server = instantiateMCPServer();
try {
MCPTool tool = null;
for (MCPTool t : server.listTools()) {
if ("add".equals(t.getName())) {
tool = t;
break;
}
}
assertNotNull(tool, "add tool should be discoverable from MCPServer");
assertInstanceOf(MCPTool.class, tool);
assertEquals("add", tool.getName());
assertTrue(
tool.getMetadata()
.getDescription()
.startsWith("Get the detailed information of a specified IP address."),
"Description should start with expected text");
String schema = tool.getMetadata().getInputSchema();
assertTrue(schema.contains("a"), "Schema should contain parameter 'a'");
assertTrue(schema.contains("b"), "Schema should contain parameter 'b'");
} finally {
server.close();
}
}

@Test
@DisabledOnJre(JRE.JAVA_11)
@DisplayName("Retrieve MCP prompt from AgentPlan - ask_sum")
@DisplayName("Retrieve MCP prompt at runtime - ask_sum")
void retrieveMCPPromptAskSum() throws Exception {
Prompt prompt = (Prompt) resolveResource("ask_sum", ResourceType.PROMPT);
assertNotNull(prompt);
assertInstanceOf(MCPPrompt.class, prompt);

MCPPrompt mcpPrompt = (MCPPrompt) prompt;
assertEquals("ask_sum", mcpPrompt.getName());
assertEquals("Prompt of add tool.", mcpPrompt.getDescription());
// ask_sum prompt should have 'a' and 'b' as arguments
Map<String, MCPPrompt.PromptArgument> args = mcpPrompt.getPromptArguments();
assertTrue(args.containsKey("a"), "Should have 'a' argument");
assertTrue(args.containsKey("b"), "Should have 'b' argument");
MCPServer server = instantiateMCPServer();
try {
MCPPrompt prompt = null;
for (MCPPrompt p : server.listPrompts()) {
if ("ask_sum".equals(p.getName())) {
prompt = p;
break;
}
}
assertNotNull(prompt, "ask_sum prompt should be discoverable from MCPServer");
assertInstanceOf(MCPPrompt.class, prompt);
assertEquals("ask_sum", prompt.getName());
assertEquals("Prompt of add tool.", prompt.getDescription());
Map<String, MCPPrompt.PromptArgument> args = prompt.getPromptArguments();
assertTrue(args.containsKey("a"), "Should have 'a' argument");
assertTrue(args.containsKey("b"), "Should have 'b' argument");
} finally {
server.close();
}
}

@Test
@DisabledOnJre(JRE.JAVA_11)
@DisplayName("AgentPlan JSON serialization with MCP resources")
@DisplayName(
"AgentPlan JSON serialization contains MCPServer descriptor, not tool/prompt entries")
void testAgentPlanJsonSerializableWithMCP() throws Exception {
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(agentPlan);

// Verify JSON contains MCP resources
assertTrue(json.contains("add"), "JSON should contain add tool");
assertTrue(json.contains("ask_sum"), "JSON should contain ask_sum prompt");
// Serialized plan contains the MCPServer configuration
assertTrue(json.contains("mcp_server"), "JSON should contain mcp_server type");
assertTrue(json.contains("testMcpServer"), "JSON should contain the server provider name");
assertTrue(json.contains(MCP_ENDPOINT), "JSON should contain the endpoint");

// Tools and prompts are NOT serialized into the plan (they are runtime-discovered)
assertFalse(
json.contains("\"add\"") && json.contains("java_serializable"),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertFalse(
    json.contains("\"add\"") && json.contains("java_serializable"),
    "JSON should not contain a serialized 'add' tool provider");

This passes when either substring is missing, so a regression that drops "add" from the JSON for an unrelated reason would silently make this test green — the very class of bug the test is meant to catch. Since the design contract you're locking down is "no java_serializable providers for MCP-discovered resources at all," one alternative in case it helps:

assertFalse(json.contains("java_serializable"),
        "JSON should not contain any java_serializable provider entries (MCP discovery is deferred)");

"JSON should not contain a serialized 'add' tool provider");

// Verify serialization works without errors
// Verify serialization/deserialization roundtrip works without errors
assertNotNull(json);
assertFalse(json.isEmpty());
}

@Test
@DisabledOnJre(JRE.JAVA_11)
@DisplayName("Test MCP server is closed after discovery")
void testMCPServerClosedAfterDiscovery() throws Exception {
// The MCPServer.close() should be called after listTools() and listPrompts()
// We verify this indirectly by checking that the plan was created successfully
assertNotNull(agentPlan);
assertTrue(agentPlan.getResourceProviders().containsKey(ResourceType.MCP_SERVER));
assertTrue(agentPlan.getResourceProviders().containsKey(ResourceType.TOOL));
assertTrue(agentPlan.getResourceProviders().containsKey(ResourceType.PROMPT));
@DisplayName("AgentPlan construction does not make network calls to MCP server")
void testNoNetworkCallsDuringPlanBuild() {
Map<ResourceType, Map<String, ResourceProvider>> providers =
agentPlan.getResourceProviders();
assertNull(providers.get(ResourceType.TOOL), "No TOOL providers expected in plan");
assertNull(providers.get(ResourceType.PROMPT), "No PROMPT providers expected in plan");
assertTrue(
providers.containsKey(ResourceType.MCP_SERVER),
"MCP_SERVER provider must still be in plan for runtime discovery");
}

@Test
@DisabledOnJre(JRE.JAVA_11)
@DisplayName("Test metadata from MCP tool - add")
void testMCPToolMetadata() throws Exception {
Tool tool = (Tool) resolveResource("add", ResourceType.TOOL);
ToolMetadata metadata = tool.getMetadata();

assertEquals("add", metadata.getName());
// Verify description starts with expected text (full docstring includes Args/Returns)
assertTrue(
metadata.getDescription()
.startsWith("Get the detailed information of a specified IP address."),
"Description should start with expected text");
assertNotNull(metadata.getInputSchema());

String schema = metadata.getInputSchema();
// Verify the tool has expected parameters
assertTrue(schema.contains("a"), "Schema should contain 'a' parameter");
assertTrue(schema.contains("b"), "Schema should contain 'b' parameter");
MCPServer server = instantiateMCPServer();
try {
MCPTool tool = null;
for (MCPTool t : server.listTools()) {
if ("add".equals(t.getName())) {
tool = t;
break;
}
}
assertNotNull(tool);
ToolMetadata metadata = tool.getMetadata();

assertEquals("add", metadata.getName());
assertTrue(
metadata.getDescription()
.startsWith("Get the detailed information of a specified IP address."),
"Description should start with expected text");
assertNotNull(metadata.getInputSchema());

String schema = metadata.getInputSchema();
assertTrue(schema.contains("a"), "Schema should contain 'a' parameter");
assertTrue(schema.contains("b"), "Schema should contain 'b' parameter");
} finally {
server.close();
}
}
}
Loading
Loading