[Feature] Cross Language Actions #709
…thonfunction(...)) and Java->Python dispatch

Add symmetrical Java surface for declaring an action whose body runs in the other language, plus the wire-format alignment needed for events to round-trip across Java<->Python.

API:
- @Action gains target() = @PythonFunction(...) so a Java action can declare a Python body.
- New @PythonFunction annotation mirrors plan.PythonFunction (module + qualname).
- Event/ChatMessage/MessageRole/ToolResponse/Document get the Jackson @JsonCreator/@JsonProperty plumbing needed for cross-language SerDe.

Plan:
- AgentPlan compile honors @Action(target) and promotes it to a plan.PythonFunction.
- ActionJsonDeserializer handles the new PythonFunction config shape.
- plan.PythonFunction error messages tightened.

Runtime:
- JavaResourceAdapter.invokeJavaAction exposes Java action invocation to the Python side via Pemja.

Co-authored-by: Cursor <cursoragent@cursor.com>
Coverage for the Java side of cross-language actions and event SerDe:
- API: CrossLanguageEventSnapshotTest reads python/*.json fixtures and asserts Java can deserialize each built-in event. AgentAddActionTest expanded for @Action(target=@PythonFunction(...)).
- Plan: AgentPlanCrossLanguageTest covers compile-time promotion of @Action(target) into plan.PythonFunction and AgentPlan JSON shape. PlanFunctionDispatchTest exercises plan-layer Function invocation. ActionJsonDeserializerTest covers the new PythonFunction config.
- Runtime: CrossLanguageActionRuntimeTest covers Python-bodied action dispatch via ActionExecutionOperator.
- E2E: JavaAgentWithPythonActionTest runs a Java agent whose action body is Python on a Flink mini-cluster.

Fixtures committed under e2e-test/cross-language-{event,agent-plan}-snapshots/java/.

Co-authored-by: Cursor <cursoragent@cursor.com>
…on(...))

Add symmetrical Python surface for declaring an action whose body runs in Java, plus the wire-format alignment needed for events to round-trip across Python<->Java.

API:
- @action gains target=JavaFunction(...) so a Python action can declare a Java body.
- Event/ChatEvent/ContextRetrievalEvent/ToolEvent model fields aligned with Java's Jackson shape so model_dump_json matches Java JSON.

Plan:
- AgentPlan compile honors @action(target=JavaFunction(...)) and stores a plan.JavaFunction.
- JavaFunction.__call__ distinguishes positional vs keyword args and check_signature verifies arity.

Co-authored-by: Cursor <cursoragent@cursor.com>
Coverage for the Python side of cross-language actions and event SerDe:
- API: test_cross_language_event_snapshots reads java/*.json fixtures and asserts Python can deserialize each built-in event. test_agent_add_action and test_decorators cover @action(target=...).
- Plan: test_agent_plan_cross_language covers compile-time storage of @action(target=JavaFunction(...)) and AgentPlan JSON shape. test_action, test_agent_plan, test_function, test_resource_provider updated for the new fields.
- Runtime: test_local_runner_cross_language covers local-runner fast-fail when given a JavaFunction body.
- E2E: python_agent_with_java_action_test runs a Python agent whose action body is Java on a Flink mini-cluster, mirroring the Java->Python case.

Fixtures committed under e2e-test/cross-language-{event,agent-plan}-snapshots/python/.

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
Hi @wzhero1, could you please review when you get a chance. Noticed that cross-language actions is planned to be part of
    OutputEvent typedEvent =
            (event instanceof OutputEvent) ? (OutputEvent) event : OutputEvent.fromEvent(event);
    if (inputIsJava) {
        return (OUT) typedEvent.getOutput();
For the inputIsJava == true && event is unified Event case (e.g. a Python action emitting _output_event in a Java pipeline), this now calls OutputEvent.fromEvent(event).getOutput() and bypasses PythonActionExecutor entirely. The old branch routed everything that wasn't instanceof OutputEvent through Python — so this is a deliberate behavior change to a hot path, not just a refactor, and it affects every output event in every pipeline, not just cross-language ones.
Two asks:
- Could this be called out in the PR description so reviewers (and the 0.3 release notes) don't miss it?
- Is there a test under runtime/ that specifically pins the Java-pipeline + Python-action output path so a future Pemja change doesn't silently regress payload reconstruction? CrossLanguageActionRuntimeTest exercises dispatch; point me at the one covering the recovered output type/value if I missed it.
I had missed a few tests. Added 3 new unit tests in EventRouterTest; JavaAgentWithPythonActionTest serves as the Pemja regression guard. Also added a call-outs section to the PR description.
    actionAnnotation.target();

    org.apache.flink.agents.plan.Function execFunction;
    if (target.module().isEmpty()) {
The module().isEmpty() branch picks Java vs Python, but qualname() is read unconditionally on the Python path without a non-empty check. @Action(target = @PythonFunction(module = "pkg")) (no qualname) would build a PythonFunction("pkg", "") and fail much later inside the interpreter with an obscure error.
Would it make sense to validate here — e.g. both empty → Java; exactly one set → reject with a message naming the action; both set → Python? Catching it at extraction time would surface the typo at the layer where the user can actually correct it. (Same question on the Python side — does @action(target=...) reject a JavaFunction with an empty method_name or parameter_types?)
Agreed. Validation in AgentPlan now checks both fields: both empty is Java, both set is Python, anything in between throws with the action name. Same check on the Python decorator for both PythonFunction and JavaFunction targets. Also added tests for the reject cases on both sides.
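The three-way rule agreed on above (both empty is Java, both set is Python, anything in between is rejected with the action name) can be sketched as follows. This is a minimal illustration, not the code in the PR: `classify_target` and its parameters are hypothetical names chosen for the example.

```python
from typing import Literal


def classify_target(
    action_name: str, module: str, qualname: str
) -> Literal["java", "python"]:
    """Decide which language an action body runs in from its target fields.

    Hypothetical helper: the name and signature are assumptions made for
    this sketch, not the API added by the PR.
    """
    has_module = bool(module)
    has_qualname = bool(qualname)

    # Neither field set: no Python target was declared, so the body is Java.
    if not has_module and not has_qualname:
        return "java"

    # Both fields set: a complete Python target.
    if has_module and has_qualname:
        return "python"

    # Exactly one field set: reject early and name the offending action,
    # so the typo surfaces where the user can correct it.
    missing = "qualname" if has_module else "module"
    raise ValueError(
        f"Action '{action_name}': Python target is missing '{missing}'; "
        "set both module and qualname, or neither."
    )
```

Rejecting at extraction time keeps the error next to the declaration instead of deferring it to the interpreter.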
    JsonNode actual = MAPPER.readTree(actualJson);

    Path committed = snapshotDir.resolve("java/" + fileName);
    assumeTrue(
assumeTrue here means the test reports as "skipped, not failed" if chat_request_event.json (or any other committed snapshot) gets renamed or deleted. The whole point of the snapshot suite is to catch silent wire-format drift, so silently-skipped "stable" tests undercut that guarantee.
The regenerate* tests genuinely need conditional skipping (the system property is the gate). For *JavaSnapshotIsStable and javaCanDeserialize*FromPythonSnapshot, was there a reason for the symmetric assumeTrue — any case where a missing snapshot should soft-pass? If not, would a hard assertTrue(Files.exists(...)) be better? Same question for readPythonSnapshot:111.
    def _assert_python_snapshot_stable(name: str, event: Event) -> None:
        actual = json.loads(event.model_dump_json())
        committed_path = _SNAPSHOT_DIR / "python" / name
        if not committed_path.exists():
Mirror of the same concern on the Java side: pytest.skip when the committed snapshot is missing turns a real wire-format regression into a silent pass-as-skipped. The REGENERATE_SNAPSHOTS=1 skips on the regenerate tests are legitimate (the env var is the gate).
For the stability / round-trip helpers at lines 64 and 76, was the symmetric pytest.skip intentional — any case where a missing snapshot should soft-pass? If not, would pytest.fail(...) or a plain assert committed_path.exists() be a tighter fit?
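A stability helper that fails hard on a missing snapshot might look like the sketch below. It assumes the snapshot is a JSON file compared against an already-serialized dict; `assert_snapshot_stable` is a hypothetical name, and the real helper takes an Event rather than a dict.

```python
import json
from pathlib import Path
from typing import Any, Dict


def assert_snapshot_stable(
    snapshot_dir: Path, name: str, actual: Dict[str, Any]
) -> None:
    """Compare serialized output against a committed snapshot file.

    Hypothetical helper for illustration only.
    """
    committed_path = snapshot_dir / name

    # A missing snapshot is a failure, not a skip: a renamed or deleted
    # fixture must not turn the stability check into a silent pass.
    assert committed_path.exists(), f"Committed snapshot missing: {committed_path}"

    committed = json.loads(committed_path.read_text(encoding="utf-8"))
    assert actual == committed, f"Wire-format drift detected in {name}"
```

Only the regenerate path would keep a conditional skip, since there the environment variable is the gate.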
…n and Java Function Args
wzhero1 left a comment
Thanks for pushing this through before the freeze — the api/plan descriptor split and the Java↔Python snapshot pairs are a solid foundation. Most of my notes are inline: a couple of the snapshot tests pin a cross-language gap as the expected contract (silent data loss / wire incompatibility on the core path), there's an event-identity question, and a few latent dispatch/registration traps.
    }

    @Test
    void pythonToolResponseEventLosesDataWhenConsumedByJava() throws Exception {
This test pins the data loss as the expected contract: ToolResponseEvent.fromEvent only handles ToolResponse/Map values (no String branch), so a Python action emitting responses={"call_id": "pong"} (string-valued, which Python does emit) yields an empty responses map on the Java side with no exception — the core Python-action → Java-action path. Because this asserts the loss, the suite stays green while the data is gone and a fix has to break this test first.
Could we add a String branch in fromEvent so string responses survive, or — if they're genuinely out of scope for 0.3 — rename this to make the gap explicit (e.g. ..._knownGap_NotYetSupported) and note it in the PR description rather than encoding silent loss as the expected behavior?
Fixed. ToolResponseEvent.fromEvent now wraps non-ToolResponse/non-Map values via ToolResponse.success(v), so primitives (string/number/bool) round-trip. Renamed the test to reflect that and extended the Python snapshot with numeric and boolean entries to pin all three.
Root cause: responses is Dict[UUID, Any] on Python but Map<String, ToolResponse> on Java, so raw scalars satisfied Python's schema with nowhere to land on Java. Tightening Python's type to Dict[UUID, ToolResponse] would be the cleaner long-term fix/follow-up.
One caveat: when Python encodes an error case as a string in responses (e.g. tool_call_action.py:48), Java now wraps it as success(error_string) because the Python wire shape doesn't distinguish success from error. The cleaner fix would be tightening Python's responses in a follow-up.
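The wrapping behavior and its caveat can be illustrated in Python terms. This is a sketch under assumptions: the `ToolResponse` dataclass below is a hypothetical stand-in for the Java type, and the `success`/`result`/`error` map keys are invented for the example, not taken from the real wire format.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class ToolResponse:
    """Hypothetical stand-in for the Java-side ToolResponse."""

    success: bool
    result: Any = None
    error: Optional[str] = None


def normalize_responses(raw: Dict[str, Any]) -> Dict[str, ToolResponse]:
    """Coerce every value in a responses map to a ToolResponse."""
    normalized: Dict[str, ToolResponse] = {}
    for call_id, value in raw.items():
        if isinstance(value, ToolResponse):
            normalized[call_id] = value
        elif isinstance(value, dict):
            # Assumed map shape; the real field names may differ.
            normalized[call_id] = ToolResponse(
                success=bool(value.get("success", True)),
                result=value.get("result"),
                error=value.get("error"),
            )
        else:
            # Scalars (str, int, float, bool) are wrapped as successes so
            # they survive the hop instead of being dropped.
            normalized[call_id] = ToolResponse(success=True, result=value)
    return normalized
```

Note that an error encoded as a plain string takes the scalar branch and comes out as a success, which is exactly the caveat described above.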
    }

    @Test
    void chatRequestOutputSchemaWireFormatIsJavaShaped() throws Exception {
This (and the Python mirror test_chat_request_output_schema_wire_format_is_python_shaped) pins a wire incompatibility rather than reconciling it: for a RowTypeInfo-typed output_schema, Java emits {"fieldNames": [...], "types": [<Class>]} while Python emits {"names": [...], "types": [<BasicType int>]} — both the keys and the type encodings differ, so a ChatRequestEvent carrying a non-null RowTypeInfo output_schema can't be deserialized on the other side. (The BaseModel module/class path does look symmetric; this is specifically the RowTypeInfo branch.)
Is structured RowTypeInfo output across the boundary in scope for 0.3? If yes the two sides need a shared key/format; if no, same ask — make it an explicit documented limitation instead of a green test asserting the mismatch.
Renamed the test (and Python mirror) to ...RowTypeInfoOutputSchemaIsNotPortableAcrossLanguages_knownGap with Javadoc spelling out the divergence: Java's {"fieldNames", types:[<Class>]} vs Python's {"names", types:[<BasicTypeInfo ordinal>]}. The BaseModel branch is symmetric and works; only RowTypeInfo is affected.

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private static final UUID FIXED_EVENT_ID =
Forcing a fixed id here masks a real cross-language divergence: Java assigns event ids with UUID.randomUUID() (Event.java) while Python derives a deterministic content-based id (event.py _generate_content_based_id). Because the two sides use different identity models, the committed snapshots only line up if the id is forced — so they don't reflect what either side naturally emits, and the same logical event gets a different id depending on its producer, which breaks id-based dedup/replay/correlation across languages.
Is cross-language event-id stability a requirement? If so the two sides should agree on one scheme (content-derived on both, most likely); if not, worth documenting that ids are language-local and must not be used for cross-boundary correlation.
- ID survives the language hop via from_event. There's a fix in this PR. InputEvent.from_event and OutputEvent.from_event in event.py now do result.id = event.id instead of letting Event.__setattr__ regenerate a content-based ID.
  The assertEquals(FIXED_EVENT_ID, typed.getId(), ...) lines in the round-trip tests would fail without that fix, so FIXED_EVENT_ID isn't masking. It's just the fixture that makes the snapshot JSON byte-stable so the preservation assertion can exist.
- Independent Java and Python producers don't agree on the ID for logically equivalent events. Java uses UUID.randomUUID() so two identical-payload events get different IDs anyway. Python uses content-derived UUIDv3 so two identical-payload events get the same ID, plus the ID regenerates on every __setattr__ write (maybe a bug?). So cross-producer dedup/replay across the boundary doesn't work today. Aligning that is a real design call, out of scope for 0.3, unless you'd rather tackle it here.
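The two identity schemes and the ID-preserving conversion can be contrasted in a few lines. This is a simplified sketch: the `Event` dataclass, the `content_based_id` function, the JSON canonicalization, and the choice of `uuid.NAMESPACE_OID` are all assumptions for illustration and do not reproduce the real `event.py`.

```python
import json
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict


def content_based_id(payload: Dict[str, Any]) -> uuid.UUID:
    """Derive a deterministic UUIDv3 from the payload (Python-style scheme).

    The namespace and canonicalization are assumptions for this sketch.
    """
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return uuid.uuid3(uuid.NAMESPACE_OID, canonical)


@dataclass
class Event:
    """Simplified event whose default ID is random (Java-style scheme)."""

    payload: Dict[str, Any]
    id: uuid.UUID = field(default_factory=uuid.uuid4)


def from_event(event: Event) -> Event:
    """Rebuild a typed event while keeping the original ID."""
    typed = Event(payload=dict(event.payload))
    # Copy the producer's ID explicitly rather than letting the new
    # object generate its own.
    typed.id = event.id
    return typed
```

With a random default, two events carrying the same payload differ in ID; with the content-derived scheme they collide by design. Preserving the ID in `from_event` addresses the hop, not the disagreement between independent producers.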
            throws Exception {
        Method method = resolveMethod(className, methodName, parameterTypes);
        Object[] args = arguments == null ? new Object[0] : arguments.toArray();
        return method.invoke(null, args);
invoke(null, args) assumes the resolved target is static, but Class.getMethod(...) doesn't enforce that. A JavaFunction whose method_name points at an instance method resolves fine here and then throws NullPointerException from Method.invoke with a message that says nothing about the static requirement — hard to diagnose from the user side.
Could we check Modifier.isStatic(method.getModifiers()) at resolution time and throw a message naming the method (mirroring FunctionTool's static check), so the misconfiguration surfaces where the user can act on it?
    actions = []
    for name, value in agent.__class__.__dict__.items():
        if isinstance(value, staticmethod) and hasattr(value, "_listen_events"):
            exec_ = (
hasattr(value, "_listen_events") is checked on the staticmethod wrapper, which makes this order-sensitive. With @action outermost over @staticmethod (the order the examples use), the attribute lands on the staticmethod object and this branch hits. With the conventional @staticmethod outermost over @action, the attribute is on __func__, hasattr(wrapper, ...) is False (3.9), and the elif callable(value) below also misses (staticmethod isn't callable pre-3.10) — so the action is silently dropped with no error.
Unwrapping first (inner = value.__func__ if isinstance(value, staticmethod) else value, then check inner) would make both orders work. No example hits the broken order today, so it's preventive — but it's an easy silent-drop trap.
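The unwrap-first approach can be sketched as below. The `action` decorator and `collect_actions` function are simplified, hypothetical stand-ins for the real ones; the only behavior taken from the thread is that the decorator tags its target with `_listen_events`.

```python
from typing import Any, Callable, Dict, Tuple


def action(*event_types: type) -> Callable[[Any], Any]:
    """Hypothetical stand-in decorator: tags its target with _listen_events."""

    def decorate(target: Any) -> Any:
        target._listen_events = event_types
        return target

    return decorate


def collect_actions(cls: type) -> Dict[str, Tuple[type, ...]]:
    """Find tagged actions regardless of decorator order."""
    found: Dict[str, Tuple[type, ...]] = {}
    for name, value in cls.__dict__.items():
        # Unwrap first so the check does not depend on which object
        # (the staticmethod wrapper or the plain function) carries the tag.
        inner = value.__func__ if isinstance(value, staticmethod) else value
        events = getattr(value, "_listen_events", None)
        if events is None:
            events = getattr(inner, "_listen_events", None)
        if events is not None and callable(inner):
            found[name] = events
    return found


class InputEvent:
    """Placeholder event type for the demo."""


class DemoAgent:
    @action(InputEvent)
    @staticmethod
    def action_outermost(event: Any, ctx: Any) -> None:
        """Tag lands on the staticmethod wrapper."""

    @staticmethod
    @action(InputEvent)
    def staticmethod_outermost(event: Any, ctx: Any) -> None:
        """Tag lands on the underlying function."""
```

Checking both the wrapper and the unwrapped function means neither ordering is silently dropped.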
    Objects.requireNonNull(actionAnnotation).listenEventTypes();

    org.apache.flink.agents.plan.JavaFunction javaFunction =
    if (!method.isAnnotationPresent(org.apache.flink.agents.api.annotation.Action.class)) {
getDeclaredMethods() returns only methods declared on the concrete class, so @Action methods inherited from a parent agent class aren't registered. Every agent today is a single-level extends Agent with @Action on the final class, so nothing triggers this — but factoring shared actions into an intermediate base class (a natural refactor) would make them silently stop registering.
Note getMethods() only half-fixes it (catches public @Action by convention but misses protected/package-private); walking the superclass chain calling getDeclaredMethods() is the visibility-insensitive fix. Low urgency — worth a guard or a documented constraint.
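The comment above concerns Java reflection, but the same shape of problem exists for any lookup that reads only the concrete class's own members. As a loose Python analogue (an assumption-laden sketch, not the Java fix), walking the class hierarchy instead of a single `__dict__` looks like this; `tag` and `collect_tagged_members` are hypothetical names.

```python
from typing import Any, Callable, Dict


def tag(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical marker decorator standing in for an action annotation."""
    fn._listen_events = ()
    return fn


def collect_tagged_members(cls: type, marker: str = "_listen_events") -> Dict[str, Any]:
    """Collect tagged members declared anywhere in the class hierarchy."""
    found: Dict[str, Any] = {}
    # Walk base classes first so that subclass declarations override them.
    for klass in reversed(cls.__mro__):
        for name, value in vars(klass).items():
            inner = value.__func__ if isinstance(value, staticmethod) else value
            if hasattr(value, marker) or hasattr(inner, marker):
                found[name] = inner
            else:
                # An untagged override hides an inherited tagged member.
                found.pop(name, None)
    return found


class BaseAgent:
    @tag
    def shared_action(self) -> str:
        return "shared"


class ConcreteAgent(BaseAgent):
    @tag
    def own_action(self) -> str:
        return "own"
```

Reading only `vars(ConcreteAgent)` would find `own_action` and miss `shared_action`, which mirrors the silent non-registration described above.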
        "set_java_resource_adapter before invocation."
    )
    raise RuntimeError(msg)
    if args:
Routing on if args: (positional → invokeJavaAction, else → invokeJavaTool) leaves the "positional = action, keyword = tool" contract enforced only by the docstring. A mixed call func(event, x=1) would go to invokeJavaAction and silently drop kwargs; a no-arg call falls to the tool path. Both real call sites are clean today (the action path passes exactly (event, context) positionally, the tool path passes pure kwargs), so this isn't a live bug — but it's an unenforced convention that a future caller or refactor could misroute silently.
A cheap assertion (reject simultaneous args and kwargs, or dispatch on an explicit flag) would make the contract enforced rather than assumed.
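One way to enforce the contract is to reject mixed calls before routing. The sketch below is illustrative only: `route_call` is a hypothetical function that returns a label instead of invoking anything, and it keeps the existing behavior of sending a no-argument call to the tool path.

```python
from typing import Any, Dict, Tuple


def route_call(args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> str:
    """Pick the invocation path for a cross-language call.

    Hypothetical helper: returns "action" or "tool" rather than calling
    invokeJavaAction / invokeJavaTool.
    """
    # Mixed calls are ambiguous under the "positional = action,
    # keyword = tool" convention, so fail loudly instead of dropping kwargs.
    if args and kwargs:
        raise TypeError(
            "Pass either positional arguments (action call) or keyword "
            "arguments (tool call), not both."
        )
    # A call with no arguments at all falls through to the tool path,
    # matching the current routing described above.
    return "action" if args else "tool"
```

An explicit flag on the function object would remove the no-argument ambiguity as well, at the cost of one more field to keep in sync.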
…llout with tests, ToolResponseEvent datatypes from_event
Thanks for the update! LGTM. @xintongsong could you take a final look before merge?
                + "' is not supported; declare on the concrete agent.");
            }
        }
    }
Ideally, inherited actions should also be included. Given that this issue is not introduced by this PR, I'm okay with not fixing it in this PR. We'd better create an issue to track and fix this in future.
Linked issue: #622
Purpose of change
Add symmetrical cross-language action support so a Java agent can declare a Python-bodied action via @Action(target=@PythonFunction(...)) and a Python agent can declare a Java-bodied action via @action(target=JavaFunction(...)). Aligns built-in event SerDe (Event, ChatMessage, MessageRole, ToolResponse, Document) so events round-trip across the Java<->Python wire.

Tests
- … (AgentPlanCrossLanguageTest, test_agent_plan_cross_language).
- … e2e-test/cross-language-event-snapshots/{java,python}/.
- … (CrossLanguageActionRuntimeTest, test_local_runner_cross_language).
- … JavaAgentWithPythonActionTest and python_agent_with_java_action_test.

API
Yes: @Action.target() field, new @PythonFunction annotation. @action(target=...) kwarg.

Call-outs
- Output extraction dispatches on pipeline wire format: EventRouter.getOutputFromOutputEvent now branches on inputIsJava instead of event instanceof OutputEvent. Fixes a ClassCastException on Java pipelines whose action body is Python (the action's _output_event deserialized as base Event, not typed OutputEvent). Same-language pipelines are unchanged. Pinned in EventRouterTest by three new tests.
- RowTypeInfo output_schema known gap (no behavior change): Java emits {"fieldNames", types:[<Class>]}, Python emits {"names", types:[<BasicTypeInfo ordinal>]}, doesn't round-trip. Has snapshot tests with *_knownGap with docstrings.

Documentation
doc-not-needed (Docs covered in a follow-up)