diff --git a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java index 03eb8248c..1de29d501 100644 --- a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java +++ b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java @@ -18,6 +18,7 @@ package org.apache.flink.agents.api.resource.python; +import org.apache.flink.agents.api.EventContext; import org.apache.flink.agents.api.chat.messages.ChatMessage; import org.apache.flink.agents.api.tools.Tool; import org.apache.flink.agents.api.vectorstores.Document; @@ -160,4 +161,20 @@ public interface PythonResourceAdapter { * @return the raw return value from the Python callable */ Object invokePythonTool(String module, String qualName, Map kwargs); + + /** + * Converts a Java {@link EventContext} object to its Python equivalent. + * + * @param context the Java event context to convert + * @return the Python representation of the event context + */ + Object toPythonEventContext(EventContext context); + + /** + * Initializes a Python event listener instance from the specified descriptor. + * + * @param target the listener descriptor in "module:class" format + * @return the initialized Python listener object + */ + Object initPythonEventListener(String target); } diff --git a/docs/content/docs/development/event_listener.md b/docs/content/docs/development/event_listener.md new file mode 100644 index 000000000..b86b1ef39 --- /dev/null +++ b/docs/content/docs/development/event_listener.md @@ -0,0 +1,113 @@ +--- +title: Event Listener +weight: 8 +type: docs +--- + + +## Overview + +`EventListener` is a callback mechanism that allows you to monitor and react to events as they are processed by the agent. It is triggered at the beginning of event processing, before any actions are executed. + +Common use cases include: +- **Monitoring & Metrics**: Tracking event throughput, latency, or specific event types. +- **Logging**: Capturing event details for auditing or debugging. +- **Side Effects**: Triggering external notifications or system updates based on event reception. + +## Implementation + +You can implement `EventListener` in either Java or Python. + +{{< tabs "EventListener Implementation" >}} + +{{< tab "Python" >}} +In Python, inherit from the `EventListener` class and implement the `on_event_processed` method. + +```python +from flink_agents.api.listener.event_listener import EventListener +from flink_agents.api.event_context import EventContext +from flink_agents.api.events.event import Event + +class MyPythonListener(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + print(f"Received event: {event.get_type()} at {context.timestamp}") +``` +{{< /tab >}} + +{{< tab "Java" >}} +In Java, implement the `EventListener` interface. + +```java +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.EventContext; +import org.apache.flink.agents.api.listener.EventListener; + +public class MyJavaListener implements EventListener { + @Override + public void onEventProcessed(EventContext context, Event event) { + System.out.println("Received event: " + event.getType() + + " at " + context.getTimestamp()); + } +} +``` +{{< /tab >}} + +{{< /tabs >}} + +## Configuration + +Register your listeners in the `AgentPlan` configuration using the `event-listeners` option. + +{{< tabs "Configuration" >}} + +{{< tab "Python" >}} + +```python +agents_env = AgentsExecutionEnvironment.get_execution_environment(env) + +# Register listeners using str(ClassName) +agents_env.get_config().set( + AgentConfigOptions.EVENT_LISTENERS, + [str(MyPythonListener)] +) +``` +{{< /tab >}} + +{{< tab "Java" >}} + +```java +AgentsExecutionEnvironment agentsEnv = AgentsExecutionEnvironment.getExecutionEnvironment(env); + +// Register listeners using fully qualified class names +agentsEnv.getConfig().set( + AgentConfigOptions.EVENT_LISTENERS, + Collections.singletonList(MyJavaListener.class.getName()) +); +``` +{{< /tab >}} + +{{< /tabs >}} + +## Best Practices & Limitations + +- **Performance**: Listeners are executed **synchronously**. Keep them lightweight and avoid long-running or blocking operations. +- **No-Arg Constructor**: Implementing classes must have a public no-argument constructor for dynamic instantiation. +- **Error Handling**: Implementations must handle their own error recovery. Any unhandled exceptions thrown by a listener will disrupt the main event processing flow and may cause the agent to fail. +- **Cross-Language Support**: If you are using the Java runtime, you can still register Python listeners. The framework will handle the Java-to-Python conversion of `Event` and `EventContext` objects. To optimize performance, this conversion happens only once per event notification, even if multiple Python listeners are registered. diff --git a/docs/content/docs/operations/configuration.md b/docs/content/docs/operations/configuration.md index ac9fab55e..66502ecec 100644 --- a/docs/content/docs/operations/configuration.md +++ b/docs/content/docs/operations/configuration.md @@ -132,7 +132,7 @@ Here is the list of all built-in core configuration options. | `eventLoggerType` | `SLF4J` | LoggerType | Which built-in event logger to use. Valid values: `SLF4J` (writes JSON through a dedicated SLF4J logger so events show up in Flink's Web UI **Logs** tab) and `FILE` (writes per-subtask `.log` files under `baseLogDir`). Setting `baseLogDir` overrides this and forces `FILE`. | | `baseLogDir` | (none) | String | Base directory for file-based event logs. If not set, uses `java.io.tmpdir/flink-agents`. Setting this value also implicitly switches `eventLoggerType` to `file`. | | `prettyPrint` | false | boolean | Whether to enable pretty-printed JSON format for event logs. When set to `true`, each event is written as formatted multi-line JSON instead of JSONL (JSON Lines) format. {{< hint info >}}Note: enabling this option makes the log file no longer valid JSONL format. {{< /hint >}} | -| `event-listeners` | none | `List` | The list of event listener class names. Each class must implement the EventListener interface and provide a public no-argument constructor. {{< hint warning >}} Note: Currently, custom event listeners are only supported in Java. {{< /hint >}} | +| `event-listeners` | none | `List` | The list of event listener class names. Each class must implement the EventListener interface and provide a public no-argument constructor. | | `error-handling-strategy` | ErrorHandlingStrategy.FAIL | ErrorHandlingStrategy | Strategy for handling errors during model requests, include timeout and unexpected output schema.
The option value could be:
  • `ErrorHandlingStrategy.FAIL`
  • `ErrorHandlingStrategy.RETRY`
  • `ErrorHandlingStrategy.IGNORE`
  • | | `max-retries` | 3 | int | Number of retries when using `ErrorHandlingStrategy.RETRY`. | | `retry-wait-interval` | 1 | int | Base wait interval in seconds between retries when using `ErrorHandlingStrategy.RETRY`. Uses exponential backoff: the actual wait time for the Nth retry is `retry-wait-interval * 2^(N-1)` seconds. For example, with default 1s, waits are 1s, 2s, 4s, etc. Retry count and total wait time are reported in `ChatResponseEvent` and recorded as metrics (`retryCount`, `retryWaitSec`) under the connection name. | diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java index 605500ca1..6102f6928 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java @@ -168,6 +168,27 @@ public Map getConfigData() { return config.getConfData(); } + public boolean containsPythonAction() { + return this.actions.values().stream() + .anyMatch(action -> action.getExec() instanceof PythonFunction); + } + + public boolean containsJavaAction() { + return this.actions.values().stream() + .anyMatch(action -> action.getExec() instanceof JavaFunction); + } + + public boolean containsPythonResource() { + return this.resourceProviders.values().stream() + .anyMatch( + resourceProviderMap -> + resourceProviderMap.values().stream() + .anyMatch( + resourceProvider -> + resourceProvider + instanceof PythonResourceProvider)); + } + private void writeObject(ObjectOutputStream out) throws IOException { String serializedStr = new ObjectMapper().writeValueAsString(this); out.writeUTF(serializedStr); diff --git a/python/flink_agents/api/core_options.py b/python/flink_agents/api/core_options.py index 5b575c3f6..dcd21ab6b 100644 --- a/python/flink_agents/api/core_options.py +++ b/python/flink_agents/api/core_options.py @@ -134,6 +134,12 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta): default=5, ) + EVENT_LISTENERS = ConfigOption( + key="event-listeners", + config_type=list[str], + default=None + ) + class AgentExecutionOptions: """Execution options for Flink Agents.""" diff --git a/python/flink_agents/api/event_context.py b/python/flink_agents/api/event_context.py new file mode 100644 index 000000000..3602d33a3 --- /dev/null +++ b/python/flink_agents/api/event_context.py @@ -0,0 +1,35 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from pydantic import BaseModel + + +class EventContext(BaseModel): + """Contextual information about an event, such as its type and timestamp. + + Attributes: + ---------- + eventType : str + The routing key for the event, matching the ``EVENT_TYPE`` constant or + type string. + timestamp : str + Timestamp of when the event occurred. + """ + + eventType: str + + timestamp: str diff --git a/python/flink_agents/api/listener/__init__.py b/python/flink_agents/api/listener/__init__.py new file mode 100644 index 000000000..e154fadd3 --- /dev/null +++ b/python/flink_agents/api/listener/__init__.py @@ -0,0 +1,17 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# diff --git a/python/flink_agents/api/listener/event_listener.py b/python/flink_agents/api/listener/event_listener.py new file mode 100644 index 000000000..4404b960f --- /dev/null +++ b/python/flink_agents/api/listener/event_listener.py @@ -0,0 +1,121 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import inspect +from abc import ABC, ABCMeta, abstractmethod + +from flink_agents.api.event_context import EventContext +from flink_agents.api.events.event import Event + + +class EventListenerMeta(ABCMeta): + """Metaclass for EventListener that provides a specialized string representation. + + This metaclass overrides the ``__str__`` method for classes that implement + ``EventListener``. The resulting string format is + ``module:class_path.on_event_processed``, which is specifically designed to be + parsed by the agent's runtime (e.g., in Java via Pemja) to dynamically + instantiate the listener class. + + The string representation handles: + + - Standard module-level classes. + - Nested classes (using their full qualified name). + - Classes defined in the ``__main__`` module (attempting to resolve the actual + filename if available). + - Validation to ensure classes are not defined in a local scope (which would + make them inaccessible for remote/dynamic instantiation). + """ + def __str__(cls) -> str: + """Return a string representation of the listener class for dynamic + instantiation. + + The format is ``module:class_path.on_event_processed``. + Example: ``my_module:MyListener.on_event_processed`` or + ``my_module:Outer.Inner.on_event_processed``. + + Returns: + ------- + str + A string identifier for the class and its handler method. + + Raises: + ------ + ValueError + If the class is defined within a local scope. + """ + class_qualname = cls.__qualname__ + + if "" in class_qualname: + err_msg = ( + f"Cannot instantiate local class in '{class_qualname}'. " + f"Classes defined within a local scope (indicated by '') " + f"are not accessible via module attributes. Move the class to the module level." + ) + raise ValueError(err_msg) + + module_obj = inspect.getmodule(cls) + if module_obj is None: + module_name = cls.__module__ + else: + module_name = module_obj.__name__ + + if module_name == "__main__": + if hasattr(module_obj, "__file__"): + from pathlib import Path + file_path = Path(module_obj.__file__) + module_name = file_path.stem + + return f"{module_name}:{class_qualname}.on_event_processed" + + +class EventListener(ABC, metaclass=EventListenerMeta): + """Interface for event listeners that are notified when events are received + for processing. + + EventListener provides a callback mechanism triggered at the beginning of + event processing. This is useful for monitoring, metrics collection, + debugging, or triggering side effects based on event reception. + + Event listeners are executed synchronously when an event is received, + before any actions are triggered. Implementations should be lightweight + and avoid blocking operations to prevent impacting agent performance. + + **Note:** Implementing classes must provide a public no-argument constructor to + allow for dynamic instantiation by the agent. + """ + + @abstractmethod + def on_event_processed(self, context: EventContext, event: Event) -> None: + """Called when an event is being processed. + + This method is invoked when an event is received by the agent, before + it is processed by any actions. The listener can inspect the event and + its context to perform additional processing such as logging, metrics + collection, or triggering external notifications. + + **Important:** This method should not throw exceptions as they will be + caught and logged but will not affect the main event processing flow. + Implementations should handle their own error recovery. + + Parameters: + ---------- + context : EventContext + The context associated with the event + event : Event + The event that is being processed + """ diff --git a/python/flink_agents/api/tests/test_event_listener.py b/python/flink_agents/api/tests/test_event_listener.py new file mode 100644 index 000000000..831c5a8c6 --- /dev/null +++ b/python/flink_agents/api/tests/test_event_listener.py @@ -0,0 +1,101 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import unittest + +from flink_agents.api.event_context import EventContext +from flink_agents.api.events.event import Event +from flink_agents.api.listener.event_listener import EventListener + + +class GlobalListener(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + +class MainListenerMock(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + +class TestEventListener(unittest.TestCase): + def test_global_listener_str(self): + """Test the string representation of a global EventListener class.""" + module_name = GlobalListener.__module__ + expected = f"{module_name}:GlobalListener.on_event_processed" + assert str(GlobalListener) == expected + + def test_top_level_nested_listener_str(self): + """Test the string representation of a nested EventListener class defined at module level.""" + global TopOuter + + class TopOuter: + class TopInner(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + module_name = TopOuter.TopInner.__module__ + expected = f"{module_name}:TopOuter.TopInner.on_event_processed" + assert str(TopOuter.TopInner) == expected + + def test_local_listener_raises_error(self): + """Test that defining an EventListener in a local scope raises a ValueError.""" + import pytest + + def some_function() -> type: + class LocalListener(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + return LocalListener + + local_listener_cls = some_function() + with pytest.raises(ValueError, match="Cannot instantiate local class"): + str(local_listener_cls) + + def test_main_module_with_file_handling(self): + """Test string representation when the module is '__main__' and has a '__file__' attribute.""" + from unittest.mock import MagicMock, patch + + mock_module = MagicMock() + mock_module.__name__ = "__main__" + mock_module.__file__ = "/path/to/my_script.py" + + with patch("inspect.getmodule", return_value=mock_module): + assert str(MainListenerMock) == "my_script:MainListenerMock.on_event_processed" + + def test_main_module_without_file_handling(self): + """Test string representation when the module is '__main__' but lacks a '__file__' attribute.""" + from unittest.mock import MagicMock, patch + + mock_module = MagicMock() + mock_module.__name__ = "__main__" + # __file__ attribute is missing + + with patch("inspect.getmodule", return_value=mock_module): + # Should fallback to "__main__" if __file__ is missing + assert str(MainListenerMock) == "__main__:MainListenerMock.on_event_processed" + + def test_inspect_getmodule_none_fallback(self): + """Test fallback to '__module__' when 'inspect.getmodule' returns None.""" + from unittest.mock import patch + + with patch("inspect.getmodule", return_value=None): + # Should fallback to cls.__module__ + module_name = MainListenerMock.__module__ + expected = f"{module_name}:MainListenerMock.on_event_processed" + assert str(MainListenerMock) == expected diff --git a/python/flink_agents/runtime/local_runner.py b/python/flink_agents/runtime/local_runner.py index 078fd7f1d..21642a42b 100644 --- a/python/flink_agents/runtime/local_runner.py +++ b/python/flink_agents/runtime/local_runner.py @@ -16,6 +16,7 @@ # limitations under the License. ################################################################################# import asyncio +import datetime import logging import uuid from collections import deque @@ -25,7 +26,10 @@ from typing_extensions import override from flink_agents.api.agents.agent import Agent +from flink_agents.api.core_options import AgentConfigOptions +from flink_agents.api.event_context import EventContext from flink_agents.api.events.event import Event, InputEvent, OutputEvent +from flink_agents.api.listener.event_listener import EventListener from flink_agents.api.memory.long_term_memory import BaseLongTermMemory from flink_agents.api.memory_object import MemoryObject, MemoryType from flink_agents.api.metric_group import MetricGroup @@ -281,6 +285,7 @@ class LocalRunner(AgentRunner): __keyed_contexts: Dict[Any, LocalRunnerContext] __outputs: List[Dict[str, Any]] __config: AgentConfiguration + __listeners: List[EventListener] def __init__(self, agent: Agent, config: AgentConfiguration) -> None: """Initialize the runner with the provided agent. @@ -291,11 +296,19 @@ def __init__(self, agent: Agent, config: AgentConfiguration) -> None: The agent class to convert and run. """ from flink_agents.plan.agent_plan import AgentPlan + from flink_agents.runtime.python_java_utils import ( + instantiate_python_event_listener, + ) self.__agent_plan = AgentPlan.from_agent(agent, config) self.__keyed_contexts = {} self.__outputs = [] self.__config = config + self.__listeners = [] + listener_paths = config.get(AgentConfigOptions.EVENT_LISTENERS) + if listener_paths: + for path in listener_paths: + self.__listeners.append(instantiate_python_event_listener(path)) @override def run(self, **data: Dict[str, Any]) -> Any: @@ -337,6 +350,16 @@ def run(self, **data: Dict[str, Any]) -> Any: while len(context.events) > 0: event = context.events.popleft() + + # Trigger listeners before processing the event + if self.__listeners: + event_context = EventContext( + eventType=event.get_type(), + timestamp=datetime.datetime.now().isoformat(), + ) + for listener in self.__listeners: + self._trigger_listener(listener, event_context, event) + if isinstance(event, OutputEvent): self.__outputs.append({key: event.output}) continue @@ -358,6 +381,15 @@ def run(self, **data: Dict[str, Any]) -> Any: raise return key + def _trigger_listener( + self, listener: EventListener, event_context: EventContext, event: Event + ) -> None: + """Trigger a single listener with error handling.""" + try: + listener.on_event_processed(event_context, event) + except Exception: + logger.exception("Error in EventListener execution") + def get_outputs(self) -> List[Dict[str, Any]]: """Get the outputs generated by agent execution. diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py index 23ed4f5c1..2176df33d 100644 --- a/python/flink_agents/runtime/python_java_utils.py +++ b/python/flink_agents/runtime/python_java_utils.py @@ -22,6 +22,7 @@ import cloudpickle from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.event_context import EventContext from flink_agents.api.events.event import Event, InputEvent, OutputEvent from flink_agents.api.memory.long_term_memory import MemorySet, MemorySetItem from flink_agents.api.resource import Resource, ResourceType, get_resource_class @@ -76,7 +77,7 @@ def get_output_from_output_event(event_json: str) -> Any: def create_resource( - resource_module: str, resource_clazz: str, func_kwargs: Dict[str, Any] + resource_module: str, resource_clazz: str, func_kwargs: Dict[str, Any] ) -> Resource: """Dynamically create a resource instance from module and class name. @@ -321,6 +322,73 @@ def get_java_tool_metadata_from_tool(tool: Tool) -> typing.Dict[str, str]: } +def from_java_event_context(eventType: str, timestamp: str) -> EventContext: + """Create an ``EventContext`` from Java ``EventContext``'s property.""" + return EventContext(eventType=eventType, timestamp=timestamp) + + +def _resolve_module(module_name: str, class_qualname: str) -> Any: + """Resolve the module for the given module name and class qualname. + + This helper handles special cases like '__main__' and falls back to + searching in sys.modules if standard import fails. + """ + import importlib + import sys + + if module_name == "__main__": + module = sys.modules["__main__"] + first_part = class_qualname.split(".")[0] + if not hasattr(module, first_part): + for m in list(sys.modules.values()): + if (m and hasattr(m, "__name__") and m.__name__ != "__main__" + and hasattr(m, first_part)): + return m + return importlib.import_module(module_name) + return module + + try: + return importlib.import_module(module_name) + except ImportError: + if module_name in sys.modules: + return sys.modules[module_name] + + main_module = sys.modules.get("__main__") + first_part = class_qualname.split(".")[0] + if main_module and hasattr(main_module, first_part): + return main_module + raise + + +def instantiate_python_event_listener(target_string: str) -> Any: + """Instantiate a Python event listener from a target string. + + The target string should be in the format ``module:class_path.method_name``. + The method name (typically ``on_event_processed``) is stripped to find the class, + which is then instantiated using its no-argument constructor. + + This method handles various module resolution scenarios, including: + - Standard modules available in ``sys.path`` + - The ``__main__`` module and its imported sub-modules + - Nested classes within a module + """ + if ":" not in target_string: + err_msg = f"Invalid format: '{target_string}'. Expected 'module:path'." + raise ValueError(err_msg) + + module_name, path_str = target_string.split(":", 1) + + # Strip the method name (last part after the last dot) to get the class qualname + class_qualname = path_str.rsplit(".", 1)[0] if "." in path_str else path_str + + module = _resolve_module(module_name, class_qualname) + + target_class = module + for part in class_qualname.split("."): + target_class = getattr(target_class, part) + + return target_class() + def get_long_term_memory(ctx: Any) -> Any: """Return ``ctx.long_term_memory`` (or ``None``). Used by the Java side to avoid relying on Pemja's ``PyObject.getAttr`` semantics for attributes that diff --git a/python/flink_agents/runtime/tests/test_event_listener_runtime.py b/python/flink_agents/runtime/tests/test_event_listener_runtime.py new file mode 100644 index 000000000..e5ce0a36a --- /dev/null +++ b/python/flink_agents/runtime/tests/test_event_listener_runtime.py @@ -0,0 +1,105 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import sys +import unittest + +from flink_agents.api.agents.agent import Agent +from flink_agents.api.decorators import action +from flink_agents.api.event_context import EventContext +from flink_agents.api.events.event import Event, InputEvent +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.listener.event_listener import EventListener +from flink_agents.api.runner_context import RunnerContext +from flink_agents.runtime.python_java_utils import instantiate_python_event_listener + + +class MockListener(EventListener): + def __init__(self) -> None: + self.called_events = [] + + def on_event_processed(self, context: EventContext, event: Event) -> None: + self.called_events.append(event) + + +class SimpleAgent(Agent): + @action(InputEvent.EVENT_TYPE) + @staticmethod + def process(event: Event, ctx: RunnerContext) -> None: + pass + + +class TestEventListenerRuntime(unittest.TestCase): + def test_instantiate_main_module(self): + """Test instantiation of a listener from the __main__ module.""" + # Add MockListener to sys.modules['__main__'] for testing purposes + main_mod = sys.modules['__main__'] + main_mod.MockListener = MockListener + + target = "__main__:MockListener.on_event_processed" + listener = instantiate_python_event_listener(target) + assert isinstance(listener, EventListener) + assert listener.__class__.__name__ == "MockListener" + + def test_instantiate_invalid_format(self): + """Test that invalid format raises ValueError.""" + import pytest + with pytest.raises(ValueError, match="Invalid format"): + instantiate_python_event_listener("InvalidFormat") + + def test_instantiate_module_not_found(self): + """Test that non-existent module raises ImportError.""" + import pytest + with pytest.raises(ImportError): + instantiate_python_event_listener("non_existent_module:MyListener.on_event_processed") + + def test_instantiate_class_not_found(self): + """Test that non-existent class raises AttributeError.""" + import pytest + with pytest.raises(AttributeError): + instantiate_python_event_listener("flink_agents.api.listener.event_listener:NonExistent.on_event_processed") + + def test_listener_integration_with_local_runner(self): + """Test listener integration with LocalRunner. + Note: Currently LocalRunner might not support event listeners directly + as they are often handled on the Java side in a real Flink job. + But we can test if the configuration is correctly passed. + """ + from flink_agents.api.core_options import AgentConfigOptions + + env = AgentsExecutionEnvironment.get_execution_environment() + config = env.get_config() + + # Define a listener in this module + global MyTestListener + + class MyTestListener(EventListener): + call_count = 0 + + def on_event_processed(self, context: EventContext, event: Event) -> None: + MyTestListener.call_count += 1 + + listener_str = f"{__name__}:MyTestListener.on_event_processed" + config.set(AgentConfigOptions.EVENT_LISTENERS, [listener_str]) + + input_list = [{"key": "k1", "value": "v1"}] + agent = SimpleAgent() + env.from_list(input_list).apply(agent) + env.execute() + + # Now LocalRunner supports listeners, we can assert call_count + assert MyTestListener.call_count > 0 diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index a0a3813da..d4f54ebd9 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -21,6 +21,7 @@ import org.apache.flink.agents.api.OutputEvent; import org.apache.flink.agents.api.agents.AgentExecutionOptions; import org.apache.flink.agents.api.context.MemoryUpdate; +import org.apache.flink.agents.api.listener.EventListener; import org.apache.flink.agents.plan.AgentPlan; import org.apache.flink.agents.plan.JavaFunction; import org.apache.flink.agents.plan.PythonFunction; @@ -58,6 +59,7 @@ import java.util.List; import java.util.Optional; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.EVENT_LISTENERS; import static org.apache.flink.agents.api.configuration.AgentConfigOptions.JOB_IDENTIFIER; import static org.apache.flink.util.Preconditions.checkState; @@ -192,8 +194,17 @@ public void open() throws Exception { // Initialize the event logger if it is set. eventRouter.initEventLogger(getRuntimeContext()); - // Initialize user event listeners from configuration - eventRouter.initEventListeners(getRuntimeContext()); + if (pythonBridge.isInitialized()) { + final EventListener pythonEventListenerWrapper = + this.pythonBridge.initForPythonEventListeners( + agentPlan.getConfig().get(EVENT_LISTENERS)); + if (pythonEventListenerWrapper != null) { + this.eventRouter.addEventListener(pythonEventListenerWrapper); + } + } else { + // Initialize user event listeners from configuration + eventRouter.initEventListeners(getRuntimeContext()); + } // Since an operator restart may change the key range it manages due to changes in // parallelism, diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java index cea9cce54..6e21ffab7 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java @@ -17,9 +17,11 @@ */ package org.apache.flink.agents.runtime.operator; +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.EventContext; +import org.apache.flink.agents.api.listener.EventListener; import org.apache.flink.agents.api.memory.LongTermMemoryOptions; import org.apache.flink.agents.plan.AgentPlan; -import org.apache.flink.agents.plan.JavaFunction; import org.apache.flink.agents.plan.PythonFunction; import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider; import org.apache.flink.agents.runtime.PythonMCPResourceDiscovery; @@ -42,7 +44,10 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.apache.flink.agents.plan.actions.Utils.requiredVersions; import static org.apache.flink.agents.plan.actions.Utils.supportAsync; @@ -125,23 +130,9 @@ void open( String jobIdentifier, ClassLoader userCodeClassLoader) throws Exception { - boolean containPythonAction = - agentPlan.getActions().values().stream() - .anyMatch(action -> action.getExec() instanceof PythonFunction); - - boolean containPythonResource = - agentPlan.getResourceProviders().values().stream() - .anyMatch( - resourceProviderMap -> - resourceProviderMap.values().stream() - .anyMatch( - resourceProvider -> - resourceProvider - instanceof - PythonResourceProvider)); - + boolean containPythonAction = agentPlan.containsPythonAction(); + boolean containPythonResource = agentPlan.containsPythonResource(); boolean mem0Configured = isMem0Configured(agentPlan); - if (containPythonAction || containPythonResource || mem0Configured) { LOG.debug("Begin initialize PythonEnvironmentManager."); PythonDependencyInfo dependencyInfo = @@ -198,9 +189,7 @@ private boolean isMem0Configured(AgentPlan agentPlan) { && config.get(LongTermMemoryOptions.Mem0.EMBEDDING_MODEL_SETUP) != null && config.get(LongTermMemoryOptions.Mem0.VECTOR_STORE) != null; - boolean containJavaAction = - agentPlan.getActions().values().stream() - .anyMatch(action -> action.getExec() instanceof JavaFunction); + boolean containJavaAction = agentPlan.containsJavaAction(); // Mem0 will call chat model and embedding model in its own thread executor, this behavior // is same as the async execution for cross-language resources, and also requires the fix @@ -301,4 +290,76 @@ public void close() throws Exception { pythonEnvironmentManager.close(); } } + + /** + * A wrapper class that implements {@link EventListener} to delegate events to Python-side + * listeners. + * + *

    Similar to {@link EventRouter#notifyEventProcessed(Event)}, this wrapper handles event + * notification, but specifically for Python listeners. To optimize resource usage and avoid + * redundant Java-to-Python conversions, this wrapper converts the {@link Event} and {@link + * EventContext} into Python objects only once per event notification. It then iterates through + * all registered Python listener entries to invoke their respective methods using the + * pre-converted Python objects. + */ + final class PythonEventListenerWrapper implements EventListener { + + private final List> listenerEntries; + + PythonEventListenerWrapper(List> listenerEntries) { + this.listenerEntries = listenerEntries; + } + + /** + * Processes the event by converting it and its context to Python objects once, then + * delegating to all registered Python listeners. + * + * @param context the event context + * @param event the event to process + */ + @Override + public void onEventProcessed(EventContext context, Event event) { + try { + // Convert java event to python event + final Object pythonEvent = pythonActionExecutor.convertJsonToPythonEvent(event); + // Convert java event context to python event context + final Object pythonEventContext = + pythonResourceAdapter.toPythonEventContext(context); + for (Map.Entry entry : this.listenerEntries) { + final PythonFunction listenerFunction = entry.getValue(); + final Object listenerObject = entry.getKey(); + listenerFunction.call(listenerObject, pythonEventContext, pythonEvent); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Initializes the event listener for Python-defined event listeners. + * + * @param pythonEventListenerStrings a list of strings representing the Python event listeners, + * in the format "module:class". + * @return an {@link EventListener} that delegates events to the specified Python listeners, or + * {@code null} if the list is null or empty. + */ + public EventListener initForPythonEventListeners(List pythonEventListenerStrings) { + if (pythonEventListenerStrings != null && !pythonEventListenerStrings.isEmpty()) { + final List> listenerEntries = new ArrayList<>(); + for (String listenerDescriptor : pythonEventListenerStrings) { + final String[] parts = listenerDescriptor.split(":"); + final String module = parts[0]; + final String qualName = parts[1]; + final Object pythonListenerObject = + pythonResourceAdapter.initPythonEventListener(listenerDescriptor); + final PythonFunction pythonFunction = new PythonFunction(module, qualName); + pythonFunction.setInterpreter(this.pythonInterpreter); + listenerEntries.add(Map.entry(pythonListenerObject, pythonFunction)); + } + + return new PythonEventListenerWrapper(listenerEntries); + } + return null; + } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java index 67c80f38d..a23857210 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java @@ -68,6 +68,8 @@ public class PythonActionExecutor { private static final String GET_OUTPUT_FROM_OUTPUT_EVENT = "python_java_utils.get_output_from_output_event"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final PythonInterpreter interpreter; private final AgentPlan agentPlan; private final PythonRunnerContextImpl runnerContext; @@ -108,7 +110,7 @@ public void open() throws Exception { interpreter.invoke( CREATE_FLINK_RUNNER_CONTEXT, runnerContext, - new ObjectMapper().writeValueAsString(agentPlan), + OBJECT_MAPPER.writeValueAsString(agentPlan), pythonAsyncThreadPool, javaResourceAdapter, jobIdentifier); @@ -132,8 +134,7 @@ public String executePythonFunction(PythonFunction function, Event event, int ha interpreter.invoke( FLINK_RUNNER_CONTEXT_SWITCH_ACTION_CONTEXT, pythonRunnerContext, hashOfKey); - String eventJson = new ObjectMapper().writeValueAsString(event); - Object pythonEventObject = interpreter.invoke(CONVERT_JSON_TO_PYTHON_EVENT, eventJson); + final Object pythonEventObject = this.convertJsonToPythonEvent(event); try { Object calledResult = function.call(pythonEventObject, pythonRunnerContext); @@ -153,6 +154,11 @@ public String executePythonFunction(PythonFunction function, Event event, int ha } } + public Object convertJsonToPythonEvent(Event event) throws JsonProcessingException { + String eventJson = OBJECT_MAPPER.writeValueAsString(event); + return interpreter.invoke(CONVERT_JSON_TO_PYTHON_EVENT, eventJson); + } + public Event wrapToInputEvent(Object eventData) throws IOException { checkState(eventData instanceof byte[]); // wrap_to_input_event returns a JSON string diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java index f4284e48e..3db958706 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java @@ -17,6 +17,7 @@ */ package org.apache.flink.agents.runtime.python.utils; +import org.apache.flink.agents.api.EventContext; import org.apache.flink.agents.api.chat.messages.ChatMessage; import org.apache.flink.agents.api.chat.messages.MessageRole; import org.apache.flink.agents.api.prompt.Prompt; @@ -77,6 +78,11 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter { static final String INVOKE_PYTHON_TOOL = PYTHON_MODULE_PREFIX + "invoke_python_tool"; + static final String FROM_JAVA_EVENT_CONTEXT = PYTHON_MODULE_PREFIX + "from_java_event_context"; + + static final String INSTANTIATE_PYTHON_EVENT_LISTER = + PYTHON_MODULE_PREFIX + "instantiate_python_event_listener"; + private final ResourceContext resourceContext; private final PythonInterpreter interpreter; private final JavaResourceAdapter javaResourceAdapter; @@ -159,6 +165,18 @@ public Object toPythonDocuments(List documents) { return pythonDocuments; } + @Override + public Object toPythonEventContext(EventContext context) { + final String eventType = context.getEventType(); + final String timestamp = context.getTimestamp(); + return interpreter.invoke(FROM_JAVA_EVENT_CONTEXT, eventType, timestamp); + } + + @Override + public Object initPythonEventListener(String target) { + return interpreter.invoke(INSTANTIATE_PYTHON_EVENT_LISTER, target); + } + @Override public List fromPythonDocuments(List pythonDocuments) { List documents = new ArrayList<>(); diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java index a48f618e5..1b6a70a54 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java @@ -19,6 +19,7 @@ package org.apache.flink.agents.runtime; import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.EventContext; import org.apache.flink.agents.api.InputEvent; import org.apache.flink.agents.api.agents.Agent; import org.apache.flink.agents.api.annotation.ChatModelSetup; @@ -198,6 +199,16 @@ public Map getPythonToolMetadata(String module, String qualName) public Object invokePythonTool(String module, String qualName, Map kwargs) { return null; } + + @Override + public Object toPythonEventContext(EventContext context) { + return null; + } + + @Override + public Object initPythonEventListener(String target) { + return null; + } } @Test