Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.agents.api.resource.python;

import org.apache.flink.agents.api.EventContext;
import org.apache.flink.agents.api.chat.messages.ChatMessage;
import org.apache.flink.agents.api.tools.Tool;
import org.apache.flink.agents.api.vectorstores.Document;
Expand Down Expand Up @@ -160,4 +161,20 @@ public interface PythonResourceAdapter {
* @return the raw return value from the Python callable
*/
Object invokePythonTool(String module, String qualName, Map<String, Object> kwargs);

/**
* Converts a Java {@link EventContext} object to its Python equivalent.
*
* @param context the Java event context to convert
* @return the Python representation of the event context
*/
Object toPythonEventContext(EventContext context);

/**
* Initializes a Python event listener instance from the specified descriptor.
*
* @param target the listener descriptor in "module:class" format
* @return the initialized Python listener object
*/
Object initPythonEventListener(String target);
}
113 changes: 113 additions & 0 deletions docs/content/docs/development/event_listener.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
---
title: Event Listener
weight: 8
type: docs
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Overview

`EventListener` is a callback mechanism that allows you to monitor and react to events as they are processed by the agent. It is triggered at the beginning of event processing, before any actions are executed.

Common use cases include:
- **Monitoring & Metrics**: Tracking event throughput, latency, or specific event types.
- **Logging**: Capturing event details for auditing or debugging.
- **Side Effects**: Triggering external notifications or system updates based on event reception.

## Implementation

You can implement `EventListener` in either Java or Python.

{{< tabs "EventListener Implementation" >}}

{{< tab "Python" >}}
In Python, inherit from the `EventListener` class and implement the `on_event_processed` method.

```python
from flink_agents.api.listener.event_listener import EventListener
from flink_agents.api.event_context import EventContext
from flink_agents.api.events.event import Event

class MyPythonListener(EventListener):
def on_event_processed(self, context: EventContext, event: Event) -> None:
print(f"Received event: {event.get_type()} at {context.timestamp}")
```
{{< /tab >}}

{{< tab "Java" >}}
In Java, implement the `EventListener` interface.

```java
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.EventContext;
import org.apache.flink.agents.api.listener.EventListener;

public class MyJavaListener implements EventListener {
@Override
public void onEventProcessed(EventContext context, Event event) {
System.out.println("Received event: " + event.getType()
+ " at " + context.getTimestamp());
}
}
```
{{< /tab >}}

{{< /tabs >}}

## Configuration

Register your listeners in the `AgentPlan` configuration using the `event-listeners` option.

{{< tabs "Configuration" >}}

{{< tab "Python" >}}

```python
agents_env = AgentsExecutionEnvironment.get_execution_environment(env)

# Register listeners using str(ClassName)
agents_env.get_config().set(
AgentConfigOptions.EVENT_LISTENERS,
[str(MyPythonListener)]
)
```
{{< /tab >}}

{{< tab "Java" >}}

```java
AgentsExecutionEnvironment agentsEnv = AgentsExecutionEnvironment.getExecutionEnvironment(env);

// Register listeners using fully qualified class names
agentsEnv.getConfig().set(
AgentConfigOptions.EVENT_LISTENERS,
Collections.singletonList(MyJavaListener.class.getName())
);
```
{{< /tab >}}

{{< /tabs >}}

## Best Practices & Limitations

- **Performance**: Listeners are executed **synchronously**. Keep them lightweight and avoid long-running or blocking operations.
- **No-Arg Constructor**: Implementing classes must have a public no-argument constructor for dynamic instantiation.
- **Error Handling**: Implementations must handle their own error recovery. Any unhandled exceptions thrown by a listener will disrupt the main event processing flow and may cause the agent to fail.
- **Cross-Language Support**: If you are using the Java runtime, you can still register Python listeners. The framework will handle the Java-to-Python conversion of `Event` and `EventContext` objects. To optimize performance, this conversion happens only once per event notification, even if multiple Python listeners are registered.
2 changes: 1 addition & 1 deletion docs/content/docs/operations/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ Here is the list of all built-in core configuration options.
| `eventLoggerType` | `SLF4J` | LoggerType | Which built-in event logger to use. Valid values: `SLF4J` (writes JSON through a dedicated SLF4J logger so events show up in Flink's Web UI **Logs** tab) and `FILE` (writes per-subtask `.log` files under `baseLogDir`). Setting `baseLogDir` overrides this and forces `FILE`. |
| `baseLogDir` | (none) | String | Base directory for file-based event logs. If not set, uses `java.io.tmpdir/flink-agents`. Setting this value also implicitly switches `eventLoggerType` to `file`. |
| `prettyPrint` | false | boolean | Whether to enable pretty-printed JSON format for event logs. When set to `true`, each event is written as formatted multi-line JSON instead of JSONL (JSON Lines) format. {{< hint info >}}Note: enabling this option makes the log file no longer valid JSONL format. {{< /hint >}} |
| `event-listeners` | none | `List<String>` | The list of event listener class names. Each class must implement the EventListener interface and provide a public no-argument constructor. {{< hint warning >}} Note: Currently, custom event listeners are only supported in Java. {{< /hint >}} |
| `event-listeners` | none | `List<String>` | The list of event listener class names. Each class must implement the EventListener interface and provide a public no-argument constructor. |
| `error-handling-strategy` | ErrorHandlingStrategy.FAIL | ErrorHandlingStrategy | Strategy for handling errors during model requests, include timeout and unexpected output schema. <br/>The option value could be:<br/> <ul><li>`ErrorHandlingStrategy.FAIL`</li> <li>`ErrorHandlingStrategy.RETRY`</li> <li>`ErrorHandlingStrategy.IGNORE`</li> |
| `max-retries` | 3 | int | Number of retries when using `ErrorHandlingStrategy.RETRY`. |
| `retry-wait-interval` | 1 | int | Base wait interval in seconds between retries when using `ErrorHandlingStrategy.RETRY`. Uses exponential backoff: the actual wait time for the Nth retry is `retry-wait-interval * 2^(N-1)` seconds. For example, with default 1s, waits are 1s, 2s, 4s, etc. Retry count and total wait time are reported in `ChatResponseEvent` and recorded as metrics (`retryCount`, `retryWaitSec`) under the connection name. |
Expand Down
21 changes: 21 additions & 0 deletions plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,27 @@ public Map<String, Object> getConfigData() {
return config.getConfData();
}

public boolean containsPythonAction() {
return this.actions.values().stream()
.anyMatch(action -> action.getExec() instanceof PythonFunction);
}

public boolean containsJavaAction() {
return this.actions.values().stream()
.anyMatch(action -> action.getExec() instanceof JavaFunction);
}

public boolean containsPythonResource() {
return this.resourceProviders.values().stream()
.anyMatch(
resourceProviderMap ->
resourceProviderMap.values().stream()
.anyMatch(
resourceProvider ->
resourceProvider
instanceof PythonResourceProvider));
}

private void writeObject(ObjectOutputStream out) throws IOException {
String serializedStr = new ObjectMapper().writeValueAsString(this);
out.writeUTF(serializedStr);
Expand Down
6 changes: 6 additions & 0 deletions python/flink_agents/api/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
default=5,
)

EVENT_LISTENERS = ConfigOption(
key="event-listeners",
config_type=list[str],
default=None
)


class AgentExecutionOptions:
"""Execution options for Flink Agents."""
Expand Down
35 changes: 35 additions & 0 deletions python/flink_agents/api/event_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
from pydantic import BaseModel


class EventContext(BaseModel):
"""Contextual information about an event, such as its type and timestamp.

Attributes:
----------
eventType : str
The routing key for the event, matching the ``EVENT_TYPE`` constant or
type string.
timestamp : str
Timestamp of when the event occurred.
"""

eventType: str

timestamp: str
17 changes: 17 additions & 0 deletions python/flink_agents/api/listener/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
121 changes: 121 additions & 0 deletions python/flink_agents/api/listener/event_listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
import inspect
from abc import ABC, ABCMeta, abstractmethod

from flink_agents.api.event_context import EventContext
from flink_agents.api.events.event import Event


class EventListenerMeta(ABCMeta):
"""Metaclass for EventListener that provides a specialized string representation.

This metaclass overrides the ``__str__`` method for classes that implement
``EventListener``. The resulting string format is
``module:class_path.on_event_processed``, which is specifically designed to be
parsed by the agent's runtime (e.g., in Java via Pemja) to dynamically
instantiate the listener class.

The string representation handles:

- Standard module-level classes.
- Nested classes (using their full qualified name).
- Classes defined in the ``__main__`` module (attempting to resolve the actual
filename if available).
- Validation to ensure classes are not defined in a local scope (which would
make them inaccessible for remote/dynamic instantiation).
"""
def __str__(cls) -> str:
"""Return a string representation of the listener class for dynamic
instantiation.

The format is ``module:class_path.on_event_processed``.
Example: ``my_module:MyListener.on_event_processed`` or
``my_module:Outer.Inner.on_event_processed``.

Returns:
-------
str
A string identifier for the class and its handler method.

Raises:
------
ValueError
If the class is defined within a local scope.
"""
class_qualname = cls.__qualname__

if "<locals>" in class_qualname:
err_msg = (
f"Cannot instantiate local class in '{class_qualname}'. "
f"Classes defined within a local scope (indicated by '<locals>') "
f"are not accessible via module attributes. Move the class to the module level."
)
raise ValueError(err_msg)

module_obj = inspect.getmodule(cls)
if module_obj is None:
module_name = cls.__module__
else:
module_name = module_obj.__name__

if module_name == "__main__":
if hasattr(module_obj, "__file__"):
from pathlib import Path
file_path = Path(module_obj.__file__)
module_name = file_path.stem

return f"{module_name}:{class_qualname}.on_event_processed"


class EventListener(ABC, metaclass=EventListenerMeta):
"""Interface for event listeners that are notified when events are received
for processing.

EventListener provides a callback mechanism triggered at the beginning of
event processing. This is useful for monitoring, metrics collection,
debugging, or triggering side effects based on event reception.

Event listeners are executed synchronously when an event is received,
before any actions are triggered. Implementations should be lightweight
and avoid blocking operations to prevent impacting agent performance.

**Note:** Implementing classes must provide a public no-argument constructor to
allow for dynamic instantiation by the agent.
"""

@abstractmethod
def on_event_processed(self, context: EventContext, event: Event) -> None:
"""Called when an event is being processed.

This method is invoked when an event is received by the agent, before
it is processed by any actions. The listener can inspect the event and
its context to perform additional processing such as logging, metrics
collection, or triggering external notifications.

**Important:** This method should not throw exceptions as they will be
caught and logged but will not affect the main event processing flow.
Implementations should handle their own error recovery.

Parameters:
----------
context : EventContext
The context associated with the event
event : Event
The event that is being processed
"""
Loading
Loading