From 84d02785608045db12c9097accadaf266101e04c Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 12:30:27 +0900 Subject: [PATCH 01/13] feat(python): add EventContext class for python sdk --- python/flink_agents/api/event_context.py | 34 ++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 python/flink_agents/api/event_context.py diff --git a/python/flink_agents/api/event_context.py b/python/flink_agents/api/event_context.py new file mode 100644 index 000000000..cf54912b0 --- /dev/null +++ b/python/flink_agents/api/event_context.py @@ -0,0 +1,34 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from pydantic import BaseModel + + +class EventContext(BaseModel): + """Contextual information about an event, such as its type and timestamp. + + Attributes: + ---------- + eventType : str + The routing key for the event, matching the ``EVENT_TYPE`` constant or type string. + timestamp : str + Timestamp of when the event occurred. + """ + + eventType: str + + timestamp: str From 243d8a03e4fdb5c6786cb97c510d2cd54f0aef2e Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 12:31:44 +0900 Subject: [PATCH 02/13] feat(python): add `EVENT_LISTENER` config in AgentConfigOptions --- python/flink_agents/api/core_options.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/flink_agents/api/core_options.py b/python/flink_agents/api/core_options.py index 5b575c3f6..dcd21ab6b 100644 --- a/python/flink_agents/api/core_options.py +++ b/python/flink_agents/api/core_options.py @@ -134,6 +134,12 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta): default=5, ) + EVENT_LISTENERS = ConfigOption( + key="event-listeners", + config_type=list[str], + default=None + ) + class AgentExecutionOptions: """Execution options for Flink Agents.""" From 630228269c8fb0570c0e5d6c73433a4e2cb93519 Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 12:43:29 +0900 Subject: [PATCH 03/13] feat(python): added `EventListener` and `EventListenerMeta` class --- python/flink_agents/api/listener/__init__.py | 0 .../api/listener/event_listener.py | 116 ++++++++++++++++++ .../api/tests/test_event_listener.py | 99 +++++++++++++++ 3 files changed, 215 insertions(+) create mode 100644 python/flink_agents/api/listener/__init__.py create mode 100644 python/flink_agents/api/listener/event_listener.py create mode 100644 python/flink_agents/api/tests/test_event_listener.py diff --git a/python/flink_agents/api/listener/__init__.py b/python/flink_agents/api/listener/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/flink_agents/api/listener/event_listener.py b/python/flink_agents/api/listener/event_listener.py new file mode 100644 index 000000000..2089bfbc0 --- /dev/null +++ b/python/flink_agents/api/listener/event_listener.py @@ -0,0 +1,116 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import inspect +from abc import ABC, abstractmethod, ABCMeta + +from flink_agents.api.event_context import EventContext +from flink_agents.api.events.event import Event + + +class EventListenerMeta(ABCMeta): + """Metaclass for EventListener that provides a specialized string representation. + + This metaclass overrides the ``__str__`` method for classes that implement + ``EventListener``. The resulting string format is ``module:class_path.on_event_processed``, + which is specifically designed to be parsed by the agent's runtime (e.g., in Java + via Pemja) to dynamically instantiate the listener class. + + The string representation handles: + + - Standard module-level classes. + - Nested classes (using their full qualified name). + - Classes defined in the ``__main__`` module (attempting to resolve the actual + filename if available). + - Validation to ensure classes are not defined in a local scope (which would + make them inaccessible for remote/dynamic instantiation). + """ + def __str__(cls): + """Return a string representation of the listener class for dynamic instantiation. + + The format is ``module:class_path.on_event_processed``. + Example: ``my_module:MyListener.on_event_processed`` or + ``my_module:Outer.Inner.on_event_processed``. + + Returns: + ------- + str + A string identifier for the class and its handler method. + + Raises: + ------ + ValueError + If the class is defined within a local scope. + """ + class_qualname = cls.__qualname__ + + if "" in class_qualname: + raise ValueError( + f"Cannot instantiate local class in '{class_qualname}'. " + f"Classes defined within a local scope (indicated by '') " + f"are not accessible via module attributes. Move the class to the module level." + ) + + module_obj = inspect.getmodule(cls) + if module_obj is None: + module_name = cls.__module__ + else: + module_name = module_obj.__name__ + + if module_name == "__main__": + if hasattr(module_obj, "__file__"): + import os + file_path = module_obj.__file__ + module_name = os.path.splitext(os.path.basename(file_path))[0] + + return f"{module_name}:{class_qualname}.on_event_processed" + + +class EventListener(ABC, metaclass=EventListenerMeta): + """Interface for event listeners that are notified when events are received for processing. + + EventListener provides a callback mechanism triggered at the beginning of event processing. + This is useful for monitoring, metrics collection, debugging, or triggering side effects based on + event reception. + + Event listeners are executed synchronously when an event is received, before any actions are + triggered. Implementations should be lightweight and avoid blocking operations to prevent + impacting agent performance. + + **Note:** Implementing classes must provide a public no-argument constructor to + allow for dynamic instantiation by the agent. + """ + + @abstractmethod + def on_event_processed(self, context: EventContext, event: Event) -> None: + """Called when an event is being processed. + + This method is invoked when an event is received by the agent, before it is processed by + any actions. The listener can inspect the event and its context to perform additional + processing such as logging, metrics collection, or triggering external notifications. + + **Important:** This method should not throw exceptions as they will be caught + and logged but will not affect the main event processing flow. Implementations should handle + their own error recovery. + + Parameters: + ---------- + context : EventContext + The context associated with the event + event : Event + The event that is being processed + """ \ No newline at end of file diff --git a/python/flink_agents/api/tests/test_event_listener.py b/python/flink_agents/api/tests/test_event_listener.py new file mode 100644 index 000000000..f99d416e4 --- /dev/null +++ b/python/flink_agents/api/tests/test_event_listener.py @@ -0,0 +1,99 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import unittest + +from flink_agents.api.event_context import EventContext +from flink_agents.api.events.event import Event +from flink_agents.api.listener.event_listener import EventListener + + +class GlobalListener(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + +class MainListenerMock(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + +class TestEventListener(unittest.TestCase): + def test_global_listener_str(self): + """Test the string representation of a global EventListener class.""" + module_name = GlobalListener.__module__ + expected = f"{module_name}:GlobalListener.on_event_processed" + self.assertEqual(str(GlobalListener), expected) + + def test_top_level_nested_listener_str(self): + """Test the string representation of a nested EventListener class defined at module level.""" + global TopOuter + + class TopOuter: + class TopInner(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + module_name = TopOuter.TopInner.__module__ + expected = f"{module_name}:TopOuter.TopInner.on_event_processed" + self.assertEqual(str(TopOuter.TopInner), expected) + + def test_local_listener_raises_error(self): + """Test that defining an EventListener in a local scope raises a ValueError.""" + def some_function(): + class LocalListener(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + pass + + return LocalListener + + LocalListener = some_function() + with self.assertRaisesRegex(ValueError, "Cannot instantiate local class"): + str(LocalListener) + + def test_main_module_with_file_handling(self): + """Test string representation when the module is '__main__' and has a '__file__' attribute.""" + from unittest.mock import patch, MagicMock + + mock_module = MagicMock() + mock_module.__name__ = "__main__" + mock_module.__file__ = "/path/to/my_script.py" + + with patch("inspect.getmodule", return_value=mock_module): + self.assertEqual(str(MainListenerMock), "my_script:MainListenerMock.on_event_processed") + + def test_main_module_without_file_handling(self): + """Test string representation when the module is '__main__' but lacks a '__file__' attribute.""" + from unittest.mock import patch, MagicMock + + mock_module = MagicMock() + mock_module.__name__ = "__main__" + # __file__ attribute is missing + + with patch("inspect.getmodule", return_value=mock_module): + # Should fallback to "__main__" if __file__ is missing + self.assertEqual(str(MainListenerMock), "__main__:MainListenerMock.on_event_processed") + + def test_inspect_getmodule_none_fallback(self): + """Test fallback to '__module__' when 'inspect.getmodule' returns None.""" + from unittest.mock import patch + + with patch("inspect.getmodule", return_value=None): + # Should fallback to cls.__module__ + module_name = MainListenerMock.__module__ + expected = f"{module_name}:MainListenerMock.on_event_processed" + self.assertEqual(str(MainListenerMock), expected) From 072f43f4c41ca89daa0caebf7e1c699da4ba95ef Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 21:24:19 +0900 Subject: [PATCH 04/13] chore : formatting --- python/flink_agents/api/event_context.py | 3 +- .../api/listener/event_listener.py | 53 ++++++++++--------- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/python/flink_agents/api/event_context.py b/python/flink_agents/api/event_context.py index cf54912b0..3602d33a3 100644 --- a/python/flink_agents/api/event_context.py +++ b/python/flink_agents/api/event_context.py @@ -24,7 +24,8 @@ class EventContext(BaseModel): Attributes: ---------- eventType : str - The routing key for the event, matching the ``EVENT_TYPE`` constant or type string. + The routing key for the event, matching the ``EVENT_TYPE`` constant or + type string. timestamp : str Timestamp of when the event occurred. """ diff --git a/python/flink_agents/api/listener/event_listener.py b/python/flink_agents/api/listener/event_listener.py index 2089bfbc0..4404b960f 100644 --- a/python/flink_agents/api/listener/event_listener.py +++ b/python/flink_agents/api/listener/event_listener.py @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################# import inspect -from abc import ABC, abstractmethod, ABCMeta +from abc import ABC, ABCMeta, abstractmethod from flink_agents.api.event_context import EventContext from flink_agents.api.events.event import Event @@ -26,9 +26,10 @@ class EventListenerMeta(ABCMeta): """Metaclass for EventListener that provides a specialized string representation. This metaclass overrides the ``__str__`` method for classes that implement - ``EventListener``. The resulting string format is ``module:class_path.on_event_processed``, - which is specifically designed to be parsed by the agent's runtime (e.g., in Java - via Pemja) to dynamically instantiate the listener class. + ``EventListener``. The resulting string format is + ``module:class_path.on_event_processed``, which is specifically designed to be + parsed by the agent's runtime (e.g., in Java via Pemja) to dynamically + instantiate the listener class. The string representation handles: @@ -39,8 +40,9 @@ class EventListenerMeta(ABCMeta): - Validation to ensure classes are not defined in a local scope (which would make them inaccessible for remote/dynamic instantiation). """ - def __str__(cls): - """Return a string representation of the listener class for dynamic instantiation. + def __str__(cls) -> str: + """Return a string representation of the listener class for dynamic + instantiation. The format is ``module:class_path.on_event_processed``. Example: ``my_module:MyListener.on_event_processed`` or @@ -59,11 +61,12 @@ def __str__(cls): class_qualname = cls.__qualname__ if "" in class_qualname: - raise ValueError( + err_msg = ( f"Cannot instantiate local class in '{class_qualname}'. " f"Classes defined within a local scope (indicated by '') " f"are not accessible via module attributes. Move the class to the module level." ) + raise ValueError(err_msg) module_obj = inspect.getmodule(cls) if module_obj is None: @@ -73,23 +76,24 @@ def __str__(cls): if module_name == "__main__": if hasattr(module_obj, "__file__"): - import os - file_path = module_obj.__file__ - module_name = os.path.splitext(os.path.basename(file_path))[0] + from pathlib import Path + file_path = Path(module_obj.__file__) + module_name = file_path.stem return f"{module_name}:{class_qualname}.on_event_processed" class EventListener(ABC, metaclass=EventListenerMeta): - """Interface for event listeners that are notified when events are received for processing. + """Interface for event listeners that are notified when events are received + for processing. - EventListener provides a callback mechanism triggered at the beginning of event processing. - This is useful for monitoring, metrics collection, debugging, or triggering side effects based on - event reception. + EventListener provides a callback mechanism triggered at the beginning of + event processing. This is useful for monitoring, metrics collection, + debugging, or triggering side effects based on event reception. - Event listeners are executed synchronously when an event is received, before any actions are - triggered. Implementations should be lightweight and avoid blocking operations to prevent - impacting agent performance. + Event listeners are executed synchronously when an event is received, + before any actions are triggered. Implementations should be lightweight + and avoid blocking operations to prevent impacting agent performance. **Note:** Implementing classes must provide a public no-argument constructor to allow for dynamic instantiation by the agent. @@ -99,13 +103,14 @@ class EventListener(ABC, metaclass=EventListenerMeta): def on_event_processed(self, context: EventContext, event: Event) -> None: """Called when an event is being processed. - This method is invoked when an event is received by the agent, before it is processed by - any actions. The listener can inspect the event and its context to perform additional - processing such as logging, metrics collection, or triggering external notifications. + This method is invoked when an event is received by the agent, before + it is processed by any actions. The listener can inspect the event and + its context to perform additional processing such as logging, metrics + collection, or triggering external notifications. - **Important:** This method should not throw exceptions as they will be caught - and logged but will not affect the main event processing flow. Implementations should handle - their own error recovery. + **Important:** This method should not throw exceptions as they will be + caught and logged but will not affect the main event processing flow. + Implementations should handle their own error recovery. Parameters: ---------- @@ -113,4 +118,4 @@ def on_event_processed(self, context: EventContext, event: Event) -> None: The context associated with the event event : Event The event that is being processed - """ \ No newline at end of file + """ From 50e15af3582940c8117a5cd12a8e0b98f95642ec Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 21:31:25 +0900 Subject: [PATCH 05/13] feat(python): add event listener instantiation and context conversion in python_java_utils --- .../flink_agents/runtime/python_java_utils.py | 70 ++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/python/flink_agents/runtime/python_java_utils.py b/python/flink_agents/runtime/python_java_utils.py index 23ed4f5c1..2176df33d 100644 --- a/python/flink_agents/runtime/python_java_utils.py +++ b/python/flink_agents/runtime/python_java_utils.py @@ -22,6 +22,7 @@ import cloudpickle from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.event_context import EventContext from flink_agents.api.events.event import Event, InputEvent, OutputEvent from flink_agents.api.memory.long_term_memory import MemorySet, MemorySetItem from flink_agents.api.resource import Resource, ResourceType, get_resource_class @@ -76,7 +77,7 @@ def get_output_from_output_event(event_json: str) -> Any: def create_resource( - resource_module: str, resource_clazz: str, func_kwargs: Dict[str, Any] + resource_module: str, resource_clazz: str, func_kwargs: Dict[str, Any] ) -> Resource: """Dynamically create a resource instance from module and class name. @@ -321,6 +322,73 @@ def get_java_tool_metadata_from_tool(tool: Tool) -> typing.Dict[str, str]: } +def from_java_event_context(eventType: str, timestamp: str) -> EventContext: + """Create an ``EventContext`` from Java ``EventContext``'s property.""" + return EventContext(eventType=eventType, timestamp=timestamp) + + +def _resolve_module(module_name: str, class_qualname: str) -> Any: + """Resolve the module for the given module name and class qualname. + + This helper handles special cases like '__main__' and falls back to + searching in sys.modules if standard import fails. + """ + import importlib + import sys + + if module_name == "__main__": + module = sys.modules["__main__"] + first_part = class_qualname.split(".")[0] + if not hasattr(module, first_part): + for m in list(sys.modules.values()): + if (m and hasattr(m, "__name__") and m.__name__ != "__main__" + and hasattr(m, first_part)): + return m + return importlib.import_module(module_name) + return module + + try: + return importlib.import_module(module_name) + except ImportError: + if module_name in sys.modules: + return sys.modules[module_name] + + main_module = sys.modules.get("__main__") + first_part = class_qualname.split(".")[0] + if main_module and hasattr(main_module, first_part): + return main_module + raise + + +def instantiate_python_event_listener(target_string: str) -> Any: + """Instantiate a Python event listener from a target string. + + The target string should be in the format ``module:class_path.method_name``. + The method name (typically ``on_event_processed``) is stripped to find the class, + which is then instantiated using its no-argument constructor. + + This method handles various module resolution scenarios, including: + - Standard modules available in ``sys.path`` + - The ``__main__`` module and its imported sub-modules + - Nested classes within a module + """ + if ":" not in target_string: + err_msg = f"Invalid format: '{target_string}'. Expected 'module:path'." + raise ValueError(err_msg) + + module_name, path_str = target_string.split(":", 1) + + # Strip the method name (last part after the last dot) to get the class qualname + class_qualname = path_str.rsplit(".", 1)[0] if "." in path_str else path_str + + module = _resolve_module(module_name, class_qualname) + + target_class = module + for part in class_qualname.split("."): + target_class = getattr(target_class, part) + + return target_class() + def get_long_term_memory(ctx: Any) -> Any: """Return ``ctx.long_term_memory`` (or ``None``). Used by the Java side to avoid relying on Pemja's ``PyObject.getAttr`` semantics for attributes that From b0f600ab8fa8a376ce3e9af208cbf495d81d8b83 Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 21:32:38 +0900 Subject: [PATCH 06/13] refactor : refact test_event_listener test code --- .../api/tests/test_event_listener.py | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/python/flink_agents/api/tests/test_event_listener.py b/python/flink_agents/api/tests/test_event_listener.py index f99d416e4..831c5a8c6 100644 --- a/python/flink_agents/api/tests/test_event_listener.py +++ b/python/flink_agents/api/tests/test_event_listener.py @@ -37,7 +37,7 @@ def test_global_listener_str(self): """Test the string representation of a global EventListener class.""" module_name = GlobalListener.__module__ expected = f"{module_name}:GlobalListener.on_event_processed" - self.assertEqual(str(GlobalListener), expected) + assert str(GlobalListener) == expected def test_top_level_nested_listener_str(self): """Test the string representation of a nested EventListener class defined at module level.""" @@ -50,35 +50,37 @@ def on_event_processed(self, context: EventContext, event: Event) -> None: module_name = TopOuter.TopInner.__module__ expected = f"{module_name}:TopOuter.TopInner.on_event_processed" - self.assertEqual(str(TopOuter.TopInner), expected) + assert str(TopOuter.TopInner) == expected def test_local_listener_raises_error(self): """Test that defining an EventListener in a local scope raises a ValueError.""" - def some_function(): + import pytest + + def some_function() -> type: class LocalListener(EventListener): def on_event_processed(self, context: EventContext, event: Event) -> None: pass return LocalListener - LocalListener = some_function() - with self.assertRaisesRegex(ValueError, "Cannot instantiate local class"): - str(LocalListener) + local_listener_cls = some_function() + with pytest.raises(ValueError, match="Cannot instantiate local class"): + str(local_listener_cls) def test_main_module_with_file_handling(self): """Test string representation when the module is '__main__' and has a '__file__' attribute.""" - from unittest.mock import patch, MagicMock + from unittest.mock import MagicMock, patch mock_module = MagicMock() mock_module.__name__ = "__main__" mock_module.__file__ = "/path/to/my_script.py" with patch("inspect.getmodule", return_value=mock_module): - self.assertEqual(str(MainListenerMock), "my_script:MainListenerMock.on_event_processed") + assert str(MainListenerMock) == "my_script:MainListenerMock.on_event_processed" def test_main_module_without_file_handling(self): """Test string representation when the module is '__main__' but lacks a '__file__' attribute.""" - from unittest.mock import patch, MagicMock + from unittest.mock import MagicMock, patch mock_module = MagicMock() mock_module.__name__ = "__main__" @@ -86,7 +88,7 @@ def test_main_module_without_file_handling(self): with patch("inspect.getmodule", return_value=mock_module): # Should fallback to "__main__" if __file__ is missing - self.assertEqual(str(MainListenerMock), "__main__:MainListenerMock.on_event_processed") + assert str(MainListenerMock) == "__main__:MainListenerMock.on_event_processed" def test_inspect_getmodule_none_fallback(self): """Test fallback to '__module__' when 'inspect.getmodule' returns None.""" @@ -96,4 +98,4 @@ def test_inspect_getmodule_none_fallback(self): # Should fallback to cls.__module__ module_name = MainListenerMock.__module__ expected = f"{module_name}:MainListenerMock.on_event_processed" - self.assertEqual(str(MainListenerMock), expected) + assert str(MainListenerMock) == expected From 6db6fa2e27ae3542bc5a72c91ca1a6f0825756bf Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 21:38:29 +0900 Subject: [PATCH 07/13] feat(java): implement Java-side adapter to bridge Python event listener utilities # Conflicts: # api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java # runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java # runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java --- .../resource/python/PythonResourceAdapter.java | 17 +++++++++++++++++ .../utils/PythonResourceAdapterImpl.java | 18 ++++++++++++++++++ .../agents/runtime/ResourceCacheTest.java | 11 +++++++++++ 3 files changed, 46 insertions(+) diff --git a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java index 03eb8248c..1de29d501 100644 --- a/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java +++ b/api/src/main/java/org/apache/flink/agents/api/resource/python/PythonResourceAdapter.java @@ -18,6 +18,7 @@ package org.apache.flink.agents.api.resource.python; +import org.apache.flink.agents.api.EventContext; import org.apache.flink.agents.api.chat.messages.ChatMessage; import org.apache.flink.agents.api.tools.Tool; import org.apache.flink.agents.api.vectorstores.Document; @@ -160,4 +161,20 @@ public interface PythonResourceAdapter { * @return the raw return value from the Python callable */ Object invokePythonTool(String module, String qualName, Map kwargs); + + /** + * Converts a Java {@link EventContext} object to its Python equivalent. + * + * @param context the Java event context to convert + * @return the Python representation of the event context + */ + Object toPythonEventContext(EventContext context); + + /** + * Initializes a Python event listener instance from the specified descriptor. + * + * @param target the listener descriptor in "module:class" format + * @return the initialized Python listener object + */ + Object initPythonEventListener(String target); } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java index f4284e48e..3db958706 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonResourceAdapterImpl.java @@ -17,6 +17,7 @@ */ package org.apache.flink.agents.runtime.python.utils; +import org.apache.flink.agents.api.EventContext; import org.apache.flink.agents.api.chat.messages.ChatMessage; import org.apache.flink.agents.api.chat.messages.MessageRole; import org.apache.flink.agents.api.prompt.Prompt; @@ -77,6 +78,11 @@ public class PythonResourceAdapterImpl implements PythonResourceAdapter { static final String INVOKE_PYTHON_TOOL = PYTHON_MODULE_PREFIX + "invoke_python_tool"; + static final String FROM_JAVA_EVENT_CONTEXT = PYTHON_MODULE_PREFIX + "from_java_event_context"; + + static final String INSTANTIATE_PYTHON_EVENT_LISTER = + PYTHON_MODULE_PREFIX + "instantiate_python_event_listener"; + private final ResourceContext resourceContext; private final PythonInterpreter interpreter; private final JavaResourceAdapter javaResourceAdapter; @@ -159,6 +165,18 @@ public Object toPythonDocuments(List documents) { return pythonDocuments; } + @Override + public Object toPythonEventContext(EventContext context) { + final String eventType = context.getEventType(); + final String timestamp = context.getTimestamp(); + return interpreter.invoke(FROM_JAVA_EVENT_CONTEXT, eventType, timestamp); + } + + @Override + public Object initPythonEventListener(String target) { + return interpreter.invoke(INSTANTIATE_PYTHON_EVENT_LISTER, target); + } + @Override public List fromPythonDocuments(List pythonDocuments) { List documents = new ArrayList<>(); diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java index a48f618e5..1b6a70a54 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java @@ -19,6 +19,7 @@ package org.apache.flink.agents.runtime; import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.EventContext; import org.apache.flink.agents.api.InputEvent; import org.apache.flink.agents.api.agents.Agent; import org.apache.flink.agents.api.annotation.ChatModelSetup; @@ -198,6 +199,16 @@ public Map getPythonToolMetadata(String module, String qualName) public Object invokePythonTool(String module, String qualName, Map kwargs) { return null; } + + @Override + public Object toPythonEventContext(EventContext context) { + return null; + } + + @Override + public Object initPythonEventListener(String target) { + return null; + } } @Test From 5b4042ed85949f0d5f3c934abb20497ff73b6d94 Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 21:39:29 +0900 Subject: [PATCH 08/13] refactor: add utility method in AgentPlan --- .../apache/flink/agents/plan/AgentPlan.java | 21 ++++++++++++++++++ .../runtime/operator/PythonBridgeManager.java | 22 +++---------------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java index 605500ca1..6102f6928 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java @@ -168,6 +168,27 @@ public Map getConfigData() { return config.getConfData(); } + public boolean containsPythonAction() { + return this.actions.values().stream() + .anyMatch(action -> action.getExec() instanceof PythonFunction); + } + + public boolean containsJavaAction() { + return this.actions.values().stream() + .anyMatch(action -> action.getExec() instanceof JavaFunction); + } + + public boolean containsPythonResource() { + return this.resourceProviders.values().stream() + .anyMatch( + resourceProviderMap -> + resourceProviderMap.values().stream() + .anyMatch( + resourceProvider -> + resourceProvider + instanceof PythonResourceProvider)); + } + private void writeObject(ObjectOutputStream out) throws IOException { String serializedStr = new ObjectMapper().writeValueAsString(this); out.writeUTF(serializedStr); diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java index cea9cce54..7bea695e2 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java @@ -125,23 +125,9 @@ void open( String jobIdentifier, ClassLoader userCodeClassLoader) throws Exception { - boolean containPythonAction = - agentPlan.getActions().values().stream() - .anyMatch(action -> action.getExec() instanceof PythonFunction); - - boolean containPythonResource = - agentPlan.getResourceProviders().values().stream() - .anyMatch( - resourceProviderMap -> - resourceProviderMap.values().stream() - .anyMatch( - resourceProvider -> - resourceProvider - instanceof - PythonResourceProvider)); - + boolean containPythonAction = agentPlan.containsPythonAction(); + boolean containPythonResource = agentPlan.containsPythonResource(); boolean mem0Configured = isMem0Configured(agentPlan); - if (containPythonAction || containPythonResource || mem0Configured) { LOG.debug("Begin initialize PythonEnvironmentManager."); PythonDependencyInfo dependencyInfo = @@ -198,9 +184,7 @@ private boolean isMem0Configured(AgentPlan agentPlan) { && config.get(LongTermMemoryOptions.Mem0.EMBEDDING_MODEL_SETUP) != null && config.get(LongTermMemoryOptions.Mem0.VECTOR_STORE) != null; - boolean containJavaAction = - agentPlan.getActions().values().stream() - .anyMatch(action -> action.getExec() instanceof JavaFunction); + boolean containJavaAction = agentPlan.containsJavaAction(); // Mem0 will call chat model and embedding model in its own thread executor, this behavior // is same as the async execution for cross-language resources, and also requires the fix From d62bb2d8fc0ed030202a1faf671cf8dd8cfc436c Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 21:40:56 +0900 Subject: [PATCH 09/13] refactor(java): extract event conversion logic in PythonActionExecutor - Move event conversion to a public method 'convertJsonToPythonEvent' for listener support - Optimize performance by reusing a static ObjectMapper instance - Clean up redundant ObjectMapper instantiations --- .../runtime/python/utils/PythonActionExecutor.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java index 67c80f38d..a23857210 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java @@ -68,6 +68,8 @@ public class PythonActionExecutor { private static final String GET_OUTPUT_FROM_OUTPUT_EVENT = "python_java_utils.get_output_from_output_event"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final PythonInterpreter interpreter; private final AgentPlan agentPlan; private final PythonRunnerContextImpl runnerContext; @@ -108,7 +110,7 @@ public void open() throws Exception { interpreter.invoke( CREATE_FLINK_RUNNER_CONTEXT, runnerContext, - new ObjectMapper().writeValueAsString(agentPlan), + OBJECT_MAPPER.writeValueAsString(agentPlan), pythonAsyncThreadPool, javaResourceAdapter, jobIdentifier); @@ -132,8 +134,7 @@ public String executePythonFunction(PythonFunction function, Event event, int ha interpreter.invoke( FLINK_RUNNER_CONTEXT_SWITCH_ACTION_CONTEXT, pythonRunnerContext, hashOfKey); - String eventJson = new ObjectMapper().writeValueAsString(event); - Object pythonEventObject = interpreter.invoke(CONVERT_JSON_TO_PYTHON_EVENT, eventJson); + final Object pythonEventObject = this.convertJsonToPythonEvent(event); try { Object calledResult = function.call(pythonEventObject, pythonRunnerContext); @@ -153,6 +154,11 @@ public String executePythonFunction(PythonFunction function, Event event, int ha } } + public Object convertJsonToPythonEvent(Event event) throws JsonProcessingException { + String eventJson = OBJECT_MAPPER.writeValueAsString(event); + return interpreter.invoke(CONVERT_JSON_TO_PYTHON_EVENT, eventJson); + } + public Event wrapToInputEvent(Object eventData) throws IOException { checkState(eventData instanceof byte[]); // wrap_to_input_event returns a JSON string From a23239cf11b8cec6d854bb0be1237c52e0e9c266 Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 21:46:56 +0900 Subject: [PATCH 10/13] feat(runtime): wire up Python event listeners in ActionExecutionOperator - Implement PythonEventListenerWrapper in PythonBridgeManager to bridge events to Python - Add logic to initialize and register Python listeners in ActionExecutionOperator - Optimize Java-to-Python event conversion by performing it once per event notification --- .../operator/ActionExecutionOperator.java | 15 +++- .../runtime/operator/PythonBridgeManager.java | 79 ++++++++++++++++++- 2 files changed, 91 insertions(+), 3 deletions(-) diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index a0a3813da..d4f54ebd9 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -21,6 +21,7 @@ import org.apache.flink.agents.api.OutputEvent; import org.apache.flink.agents.api.agents.AgentExecutionOptions; import org.apache.flink.agents.api.context.MemoryUpdate; +import org.apache.flink.agents.api.listener.EventListener; import org.apache.flink.agents.plan.AgentPlan; import org.apache.flink.agents.plan.JavaFunction; import org.apache.flink.agents.plan.PythonFunction; @@ -58,6 +59,7 @@ import java.util.List; import java.util.Optional; +import static org.apache.flink.agents.api.configuration.AgentConfigOptions.EVENT_LISTENERS; import static org.apache.flink.agents.api.configuration.AgentConfigOptions.JOB_IDENTIFIER; import static org.apache.flink.util.Preconditions.checkState; @@ -192,8 +194,17 @@ public void open() throws Exception { // Initialize the event logger if it is set. eventRouter.initEventLogger(getRuntimeContext()); - // Initialize user event listeners from configuration - eventRouter.initEventListeners(getRuntimeContext()); + if (pythonBridge.isInitialized()) { + final EventListener pythonEventListenerWrapper = + this.pythonBridge.initForPythonEventListeners( + agentPlan.getConfig().get(EVENT_LISTENERS)); + if (pythonEventListenerWrapper != null) { + this.eventRouter.addEventListener(pythonEventListenerWrapper); + } + } else { + // Initialize user event listeners from configuration + eventRouter.initEventListeners(getRuntimeContext()); + } // Since an operator restart may change the key range it manages due to changes in // parallelism, diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java index 7bea695e2..6e21ffab7 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/PythonBridgeManager.java @@ -17,9 +17,11 @@ */ package org.apache.flink.agents.runtime.operator; +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.EventContext; +import org.apache.flink.agents.api.listener.EventListener; import org.apache.flink.agents.api.memory.LongTermMemoryOptions; import org.apache.flink.agents.plan.AgentPlan; -import org.apache.flink.agents.plan.JavaFunction; import org.apache.flink.agents.plan.PythonFunction; import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider; import org.apache.flink.agents.runtime.PythonMCPResourceDiscovery; @@ -42,7 +44,10 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.apache.flink.agents.plan.actions.Utils.requiredVersions; import static org.apache.flink.agents.plan.actions.Utils.supportAsync; @@ -285,4 +290,76 @@ public void close() throws Exception { pythonEnvironmentManager.close(); } } + + /** + * A wrapper class that implements {@link EventListener} to delegate events to Python-side + * listeners. + * + *

Similar to {@link EventRouter#notifyEventProcessed(Event)}, this wrapper handles event + * notification, but specifically for Python listeners. To optimize resource usage and avoid + * redundant Java-to-Python conversions, this wrapper converts the {@link Event} and {@link + * EventContext} into Python objects only once per event notification. It then iterates through + * all registered Python listener entries to invoke their respective methods using the + * pre-converted Python objects. + */ + final class PythonEventListenerWrapper implements EventListener { + + private final List> listenerEntries; + + PythonEventListenerWrapper(List> listenerEntries) { + this.listenerEntries = listenerEntries; + } + + /** + * Processes the event by converting it and its context to Python objects once, then + * delegating to all registered Python listeners. + * + * @param context the event context + * @param event the event to process + */ + @Override + public void onEventProcessed(EventContext context, Event event) { + try { + // Convert java event to python event + final Object pythonEvent = pythonActionExecutor.convertJsonToPythonEvent(event); + // Convert java event context to python event context + final Object pythonEventContext = + pythonResourceAdapter.toPythonEventContext(context); + for (Map.Entry entry : this.listenerEntries) { + final PythonFunction listenerFunction = entry.getValue(); + final Object listenerObject = entry.getKey(); + listenerFunction.call(listenerObject, pythonEventContext, pythonEvent); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Initializes the event listener for Python-defined event listeners. + * + * @param pythonEventListenerStrings a list of strings representing the Python event listeners, + * in the format "module:class". + * @return an {@link EventListener} that delegates events to the specified Python listeners, or + * {@code null} if the list is null or empty. + */ + public EventListener initForPythonEventListeners(List pythonEventListenerStrings) { + if (pythonEventListenerStrings != null && !pythonEventListenerStrings.isEmpty()) { + final List> listenerEntries = new ArrayList<>(); + for (String listenerDescriptor : pythonEventListenerStrings) { + final String[] parts = listenerDescriptor.split(":"); + final String module = parts[0]; + final String qualName = parts[1]; + final Object pythonListenerObject = + pythonResourceAdapter.initPythonEventListener(listenerDescriptor); + final PythonFunction pythonFunction = new PythonFunction(module, qualName); + pythonFunction.setInterpreter(this.pythonInterpreter); + listenerEntries.add(Map.entry(pythonListenerObject, pythonFunction)); + } + + return new PythonEventListenerWrapper(listenerEntries); + } + return null; + } } From b880ad29f324959d94ded41923a75544fb1e9e3d Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 21:51:21 +0900 Subject: [PATCH 11/13] feat(python): support event listeners in LocalRunner --- python/flink_agents/runtime/local_runner.py | 32 ++++++ .../tests/test_event_listener_runtime.py | 105 ++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 python/flink_agents/runtime/tests/test_event_listener_runtime.py diff --git a/python/flink_agents/runtime/local_runner.py b/python/flink_agents/runtime/local_runner.py index 078fd7f1d..21642a42b 100644 --- a/python/flink_agents/runtime/local_runner.py +++ b/python/flink_agents/runtime/local_runner.py @@ -16,6 +16,7 @@ # limitations under the License. ################################################################################# import asyncio +import datetime import logging import uuid from collections import deque @@ -25,7 +26,10 @@ from typing_extensions import override from flink_agents.api.agents.agent import Agent +from flink_agents.api.core_options import AgentConfigOptions +from flink_agents.api.event_context import EventContext from flink_agents.api.events.event import Event, InputEvent, OutputEvent +from flink_agents.api.listener.event_listener import EventListener from flink_agents.api.memory.long_term_memory import BaseLongTermMemory from flink_agents.api.memory_object import MemoryObject, MemoryType from flink_agents.api.metric_group import MetricGroup @@ -281,6 +285,7 @@ class LocalRunner(AgentRunner): __keyed_contexts: Dict[Any, LocalRunnerContext] __outputs: List[Dict[str, Any]] __config: AgentConfiguration + __listeners: List[EventListener] def __init__(self, agent: Agent, config: AgentConfiguration) -> None: """Initialize the runner with the provided agent. @@ -291,11 +296,19 @@ def __init__(self, agent: Agent, config: AgentConfiguration) -> None: The agent class to convert and run. """ from flink_agents.plan.agent_plan import AgentPlan + from flink_agents.runtime.python_java_utils import ( + instantiate_python_event_listener, + ) self.__agent_plan = AgentPlan.from_agent(agent, config) self.__keyed_contexts = {} self.__outputs = [] self.__config = config + self.__listeners = [] + listener_paths = config.get(AgentConfigOptions.EVENT_LISTENERS) + if listener_paths: + for path in listener_paths: + self.__listeners.append(instantiate_python_event_listener(path)) @override def run(self, **data: Dict[str, Any]) -> Any: @@ -337,6 +350,16 @@ def run(self, **data: Dict[str, Any]) -> Any: while len(context.events) > 0: event = context.events.popleft() + + # Trigger listeners before processing the event + if self.__listeners: + event_context = EventContext( + eventType=event.get_type(), + timestamp=datetime.datetime.now().isoformat(), + ) + for listener in self.__listeners: + self._trigger_listener(listener, event_context, event) + if isinstance(event, OutputEvent): self.__outputs.append({key: event.output}) continue @@ -358,6 +381,15 @@ def run(self, **data: Dict[str, Any]) -> Any: raise return key + def _trigger_listener( + self, listener: EventListener, event_context: EventContext, event: Event + ) -> None: + """Trigger a single listener with error handling.""" + try: + listener.on_event_processed(event_context, event) + except Exception: + logger.exception("Error in EventListener execution") + def get_outputs(self) -> List[Dict[str, Any]]: """Get the outputs generated by agent execution. diff --git a/python/flink_agents/runtime/tests/test_event_listener_runtime.py b/python/flink_agents/runtime/tests/test_event_listener_runtime.py new file mode 100644 index 000000000..e5ce0a36a --- /dev/null +++ b/python/flink_agents/runtime/tests/test_event_listener_runtime.py @@ -0,0 +1,105 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import sys +import unittest + +from flink_agents.api.agents.agent import Agent +from flink_agents.api.decorators import action +from flink_agents.api.event_context import EventContext +from flink_agents.api.events.event import Event, InputEvent +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.api.listener.event_listener import EventListener +from flink_agents.api.runner_context import RunnerContext +from flink_agents.runtime.python_java_utils import instantiate_python_event_listener + + +class MockListener(EventListener): + def __init__(self) -> None: + self.called_events = [] + + def on_event_processed(self, context: EventContext, event: Event) -> None: + self.called_events.append(event) + + +class SimpleAgent(Agent): + @action(InputEvent.EVENT_TYPE) + @staticmethod + def process(event: Event, ctx: RunnerContext) -> None: + pass + + +class TestEventListenerRuntime(unittest.TestCase): + def test_instantiate_main_module(self): + """Test instantiation of a listener from the __main__ module.""" + # Add MockListener to sys.modules['__main__'] for testing purposes + main_mod = sys.modules['__main__'] + main_mod.MockListener = MockListener + + target = "__main__:MockListener.on_event_processed" + listener = instantiate_python_event_listener(target) + assert isinstance(listener, EventListener) + assert listener.__class__.__name__ == "MockListener" + + def test_instantiate_invalid_format(self): + """Test that invalid format raises ValueError.""" + import pytest + with pytest.raises(ValueError, match="Invalid format"): + instantiate_python_event_listener("InvalidFormat") + + def test_instantiate_module_not_found(self): + """Test that non-existent module raises ImportError.""" + import pytest + with pytest.raises(ImportError): + instantiate_python_event_listener("non_existent_module:MyListener.on_event_processed") + + def test_instantiate_class_not_found(self): + """Test that non-existent class raises AttributeError.""" + import pytest + with pytest.raises(AttributeError): + instantiate_python_event_listener("flink_agents.api.listener.event_listener:NonExistent.on_event_processed") + + def test_listener_integration_with_local_runner(self): + """Test listener integration with LocalRunner. + Note: Currently LocalRunner might not support event listeners directly + as they are often handled on the Java side in a real Flink job. + But we can test if the configuration is correctly passed. + """ + from flink_agents.api.core_options import AgentConfigOptions + + env = AgentsExecutionEnvironment.get_execution_environment() + config = env.get_config() + + # Define a listener in this module + global MyTestListener + + class MyTestListener(EventListener): + call_count = 0 + + def on_event_processed(self, context: EventContext, event: Event) -> None: + MyTestListener.call_count += 1 + + listener_str = f"{__name__}:MyTestListener.on_event_processed" + config.set(AgentConfigOptions.EVENT_LISTENERS, [listener_str]) + + input_list = [{"key": "k1", "value": "v1"}] + agent = SimpleAgent() + env.from_list(input_list).apply(agent) + env.execute() + + # Now LocalRunner supports listeners, we can assert call_count + assert MyTestListener.call_count > 0 From 8fe6ee69b822f2f323e33ee88faedd594e6f0e05 Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 22:05:31 +0900 Subject: [PATCH 12/13] doc : add event listener doc for python --- .../docs/development/event_listener.md | 113 ++++++++++++++++++ docs/content/docs/operations/configuration.md | 2 +- 2 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 docs/content/docs/development/event_listener.md diff --git a/docs/content/docs/development/event_listener.md b/docs/content/docs/development/event_listener.md new file mode 100644 index 000000000..b86b1ef39 --- /dev/null +++ b/docs/content/docs/development/event_listener.md @@ -0,0 +1,113 @@ +--- +title: Event Listener +weight: 8 +type: docs +--- + + +## Overview + +`EventListener` is a callback mechanism that allows you to monitor and react to events as they are processed by the agent. It is triggered at the beginning of event processing, before any actions are executed. + +Common use cases include: +- **Monitoring & Metrics**: Tracking event throughput, latency, or specific event types. +- **Logging**: Capturing event details for auditing or debugging. +- **Side Effects**: Triggering external notifications or system updates based on event reception. + +## Implementation + +You can implement `EventListener` in either Java or Python. + +{{< tabs "EventListener Implementation" >}} + +{{< tab "Python" >}} +In Python, inherit from the `EventListener` class and implement the `on_event_processed` method. + +```python +from flink_agents.api.listener.event_listener import EventListener +from flink_agents.api.event_context import EventContext +from flink_agents.api.events.event import Event + +class MyPythonListener(EventListener): + def on_event_processed(self, context: EventContext, event: Event) -> None: + print(f"Received event: {event.get_type()} at {context.timestamp}") +``` +{{< /tab >}} + +{{< tab "Java" >}} +In Java, implement the `EventListener` interface. + +```java +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.EventContext; +import org.apache.flink.agents.api.listener.EventListener; + +public class MyJavaListener implements EventListener { + @Override + public void onEventProcessed(EventContext context, Event event) { + System.out.println("Received event: " + event.getType() + + " at " + context.getTimestamp()); + } +} +``` +{{< /tab >}} + +{{< /tabs >}} + +## Configuration + +Register your listeners in the `AgentPlan` configuration using the `event-listeners` option. + +{{< tabs "Configuration" >}} + +{{< tab "Python" >}} + +```python +agents_env = AgentsExecutionEnvironment.get_execution_environment(env) + +# Register listeners using str(ClassName) +agents_env.get_config().set( + AgentConfigOptions.EVENT_LISTENERS, + [str(MyPythonListener)] +) +``` +{{< /tab >}} + +{{< tab "Java" >}} + +```java +AgentsExecutionEnvironment agentsEnv = AgentsExecutionEnvironment.getExecutionEnvironment(env); + +// Register listeners using fully qualified class names +agentsEnv.getConfig().set( + AgentConfigOptions.EVENT_LISTENERS, + Collections.singletonList(MyJavaListener.class.getName()) +); +``` +{{< /tab >}} + +{{< /tabs >}} + +## Best Practices & Limitations + +- **Performance**: Listeners are executed **synchronously**. Keep them lightweight and avoid long-running or blocking operations. +- **No-Arg Constructor**: Implementing classes must have a public no-argument constructor for dynamic instantiation. +- **Error Handling**: Implementations must handle their own error recovery. Any unhandled exceptions thrown by a listener will disrupt the main event processing flow and may cause the agent to fail. +- **Cross-Language Support**: If you are using the Java runtime, you can still register Python listeners. The framework will handle the Java-to-Python conversion of `Event` and `EventContext` objects. To optimize performance, this conversion happens only once per event notification, even if multiple Python listeners are registered. diff --git a/docs/content/docs/operations/configuration.md b/docs/content/docs/operations/configuration.md index ac9fab55e..66502ecec 100644 --- a/docs/content/docs/operations/configuration.md +++ b/docs/content/docs/operations/configuration.md @@ -132,7 +132,7 @@ Here is the list of all built-in core configuration options. | `eventLoggerType` | `SLF4J` | LoggerType | Which built-in event logger to use. Valid values: `SLF4J` (writes JSON through a dedicated SLF4J logger so events show up in Flink's Web UI **Logs** tab) and `FILE` (writes per-subtask `.log` files under `baseLogDir`). Setting `baseLogDir` overrides this and forces `FILE`. | | `baseLogDir` | (none) | String | Base directory for file-based event logs. If not set, uses `java.io.tmpdir/flink-agents`. Setting this value also implicitly switches `eventLoggerType` to `file`. | | `prettyPrint` | false | boolean | Whether to enable pretty-printed JSON format for event logs. When set to `true`, each event is written as formatted multi-line JSON instead of JSONL (JSON Lines) format. {{< hint info >}}Note: enabling this option makes the log file no longer valid JSONL format. {{< /hint >}} | -| `event-listeners` | none | `List` | The list of event listener class names. Each class must implement the EventListener interface and provide a public no-argument constructor. {{< hint warning >}} Note: Currently, custom event listeners are only supported in Java. {{< /hint >}} | +| `event-listeners` | none | `List` | The list of event listener class names. Each class must implement the EventListener interface and provide a public no-argument constructor. | | `error-handling-strategy` | ErrorHandlingStrategy.FAIL | ErrorHandlingStrategy | Strategy for handling errors during model requests, include timeout and unexpected output schema.
The option value could be:

  • `ErrorHandlingStrategy.FAIL`
  • `ErrorHandlingStrategy.RETRY`
  • `ErrorHandlingStrategy.IGNORE`
  • | | `max-retries` | 3 | int | Number of retries when using `ErrorHandlingStrategy.RETRY`. | | `retry-wait-interval` | 1 | int | Base wait interval in seconds between retries when using `ErrorHandlingStrategy.RETRY`. Uses exponential backoff: the actual wait time for the Nth retry is `retry-wait-interval * 2^(N-1)` seconds. For example, with default 1s, waits are 1s, 2s, 4s, etc. Retry count and total wait time are reported in `ChatResponseEvent` and recorded as metrics (`retryCount`, `retryWaitSec`) under the connection name. | From 680c8663bae63f55bd2bcaa6fbd57409c369fc80 Mon Sep 17 00:00:00 2001 From: twosom Date: Mon, 18 May 2026 23:55:53 +0900 Subject: [PATCH 13/13] chore : add license header --- python/flink_agents/api/listener/__init__.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/python/flink_agents/api/listener/__init__.py b/python/flink_agents/api/listener/__init__.py index e69de29bb..e154fadd3 100644 --- a/python/flink_agents/api/listener/__init__.py +++ b/python/flink_agents/api/listener/__init__.py @@ -0,0 +1,17 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +#################################################################################