diff --git a/docs/content/docs/development/chat_models.md b/docs/content/docs/development/chat_models.md index 99ac9d7e5..aef8983d8 100644 --- a/docs/content/docs/development/chat_models.md +++ b/docs/content/docs/development/chat_models.md @@ -1,6 +1,6 @@ --- title: Chat Models -weight: 3 +weight: 4 type: docs --- + +## Overview + +Beyond the Python and Java programmatic APIs, Flink Agents offers a **declarative YAML API** for describing agents. Compared with the programmatic APIs, the YAML API has the following advantages: + +- **Human-friendly**: low entry barrier and easy to templatize across deployments. +- **Coding-agent-friendly**: a fixed schema reduces token cost, enables strict schema validation, and decouples configuration changes (declared parameters) from logic changes (action code). + +A Flink Agents application is composed of three concepts; the YAML API exposes all three as declarative sections: + +- **Events**: the messages flowing inside an agent. Every event has a type used for routing. +- **Actions**: code snippets triggered by events and emitting new events. They carry the agent's business logic. +- **Resources**: reusable components — Chat Models, Tools, Prompts, Vector Stores, MCP servers, etc. — referenced by actions at runtime. + +A YAML file describes **Resources** and **Actions** declaratively; the **Event** types are referenced by name (built-in event aliases like `input` / `chat_response`, or your own event-type strings). The implementation classes (action functions, tool functions, custom types) still live in your Python or Java code; the YAML file only describes *how* those pieces are wired together. + +If you'd rather declare agents directly in code, see [Workflow Agent]({{< ref "docs/development/workflow_agent" >}}) and [ReAct Agent]({{< ref "docs/development/react_agent" >}}). + +## Declaring an Agent + +This section walks through: + +1. Declaring a single agent in one YAML file. +2. Declaring multiple agents in one YAML file. +3. Declaring shared Resources and Actions that any agent in the file (or any later-loaded file) can reuse. + +### Declaring a single agent + +The example below declares one agent named `review_analysis_agent` with a chat-model connection, a chat-model setup, a function tool, and two actions. It demonstrates every field you typically use. + +```yaml +agents: + - name: review_analysis_agent + description: Analyze product reviews and emit a satisfaction score. + + # ---- Actions ---- + actions: + - name: process_input + function: my_pkg.actions:process_input + listen_to: [input] + type: python + - name: process_chat_response + function: my_pkg.actions:process_chat_response + listen_to: [chat_response] + type: python + + # ---- Resources ---- + chat_model_connections: + - name: ollama_server + clazz: ollama + type: python + base_url: http://localhost:11434 + + chat_model_setups: + - name: review_analysis_model + clazz: ollama + type: python + connection: ollama_server + model: qwen3:8b + prompt: review_analysis_prompt + tools: [notify_shipping_manager] + extract_reasoning: true + + prompts: + - name: review_analysis_prompt + messages: + - role: system + content: "Analyze the review and return JSON with score + reasons." + - role: user + content: "{input}" + + tools: + - name: notify_shipping_manager + function: my_pkg.actions:notify_shipping_manager + type: python +``` + +#### Agent properties + +| Field | Required | Description | +|-------|----------|-------------| +| `name` | yes | Agent name. Must be unique across the environment. Used to apply the agent by name later. | +| `description` | no | Free-form description of what the agent does. Not surfaced at runtime. | + +#### Resources + +All resource sections are declared as lists under the agent. Each entry has a `name` unique within its section. + +**Prompt** — declarative prompt template; pick exactly one of `text` or `messages`. + +| Field | Required | Description | +|-------|----------|-------------| +| `name` | yes | Prompt name (referenced by chat-model setups). | +| `text` | one-of | Single-string prompt template. Corresponds to `Prompt.from_text` / `Prompt.fromText`. | +| `messages` | one-of | Multi-turn message template. Corresponds to `Prompt.from_messages` / `Prompt.fromMessages`. Each entry has `role` (`system` / `user` / `assistant` / `tool`) and `content`. | + +```yaml +prompts: + - name: prompt1 + messages: + - {role: system, content: "..."} + - {role: user, content: "{input}"} + - name: prompt2 + text: "this is the {value}" +``` + +**Tool** — points at a callable that the chat model can invoke. + +| Field | Required | Description | +|-------|----------|-------------| +| `name` | yes | Tool name (referenced from `chat_model_setups[].tools`). | +| `function` | yes | Fully-qualified callable in the form `:`. See [Function references](#function-references). | +| `type` | no | Implementation language: `python` or `java`. Defaults to `python` (see [Selecting the implementation language](#selecting-the-implementation-language)). | +| `parameter_types` | java only | Required for Java tools — one Java type FQN per declared parameter, in order. Forbidden for Python tools (the signature is reflected from the callable). | + +```yaml +tools: + - name: my_tool + function: my_pkg.tools:my_tool + type: python +``` + +For **Java tools**, `parameter_types` is required because Java methods can be overloaded — list one FQN per declared parameter, in order. Generic type arguments are not part of the JVM method descriptor and must not be included (`java.util.List`, not `java.util.List`). Boxed primitives (`java.lang.Integer`, etc.) and bare primitives (`int`, `boolean`, ...) are both accepted. + +```yaml +tools: + - name: add + type: java + function: com.example.MyTools:add + parameter_types: [java.lang.Integer, java.lang.Integer] +``` + +**Skills** — bundles of agent skill assets loaded from one or more sources. At least one of `paths` / `urls` / `classpath` / `package` must be non-empty; multiple sources can coexist. + +| Field | Required | Description | +|-------|----------|-------------| +| `name` | yes | Skills resource name. | +| `paths` | one-of | `local` scheme: list of directories or `.zip` files. | +| `urls` | one-of | `url` scheme: list of `http(s)` URLs pointing to `.zip` archives. | +| `classpath` | one-of | `classpath` scheme (Java runtime only): list of classpath resource paths. | +| `package` | one-of | `package` scheme (Python runtime only): list of `{package, resource}` pairs. | + +```yaml +skills: + - name: agent_skills + paths: + - ./skills + urls: + - https://example.com/skills.zip + classpath: + - skills/my-skills + package: + - package: my_pkg + resource: skills/ +``` + +`classpath` is accepted by the Python parser for schema parity with Java but rejected at runtime, because the Python runtime does not register a `classpath` handler. + +**ResourceDescriptor-backed resources** — `chat_model_connections`, `chat_model_setups`, `embedding_model_connections`, `embedding_model_setups`, `vector_stores`, `mcp_servers`. Unlike the resources above (which are constructed inline by the loader), these are described by a `ResourceDescriptor` and instantiated by the framework at runtime. They all share the same shape: + +| Field | Required | Description | +|-------|----------|-------------| +| `name` | yes | Resource name (referenced by other resources or actions). | +| `clazz` | yes | Either a built-in [alias](#class-aliases) (e.g. `ollama`, `openai`) or a fully-qualified class path. | +| `type` | no | Implementation language: `python` or `java`. Defaults to `python` (see [Selecting the implementation language](#selecting-the-implementation-language)). | +| *extras* | no | Any additional keys are forwarded verbatim as `ResourceDescriptor` init arguments (e.g. `base_url`, `endpoint`, `model`, `request_timeout`). | + +```yaml +chat_model_connections: + - name: my_connection + clazz: ollama + type: python + base_url: http://localhost:11434 + +mcp_servers: + - name: my_mcp + clazz: mcp + type: python + endpoint: http://127.0.0.1:8000/mcp +``` + +#### Actions + +`actions:` is a list. Each entry is either an inline action **map** or, when reusing a shared action, a bare **string** referring to the shared action's name. + +Inline action (map) fields: + +| Field | Required | Description | +|-------|----------|-------------| +| `name` | yes | Action name (unique within the agent). | +| `function` | yes | Fully-qualified callable in the form `:`. See [Function references](#function-references). | +| `listen_to` | yes | List of event types the action listens to. Built-in [event aliases](#event-aliases) (`input`, `chat_request`, ...) or your own event-type strings. | +| `type` | no | Implementation language: `python` or `java`. Defaults to `python` (see [Selecting the implementation language](#selecting-the-implementation-language)). | +| `config` | no | Free-form configuration map passed to the action at runtime. | + +```yaml +actions: + - name: action1 + function: my_pkg.actions:action1 + listen_to: [input] + type: python + - name: action2 + function: my_pkg.actions:action2 + listen_to: [chat_response] + type: python + - action3 # shared action reference (declared at file level) +``` + +Action method signatures are fixed (`(Event, RunnerContext)`), so there is no `parameter_types` field on actions. + +### Declaring multiple agents in one file + +A single YAML file can declare more than one agent under `agents:`. Each agent is independent and gets its own resource set; agent names must be unique within the file (and across the whole environment, once loaded). + +```yaml +agents: + - name: agent1 + description: The first agent + # actions + # resources + + - name: agent2 + description: The second agent + # actions + # resources +``` + +### Declaring and reusing shared Resources and Actions + +Any resource section (`prompts`, `tools`, `chat_model_connections`, ...) or `actions:` declared as a **top-level sibling** of `agents:` is **shared**: it is registered on the `AgentsExecutionEnvironment` itself, and any agent in the file — or in any file later loaded into the same environment — can reference it by name. + +**Shared resources** are referenced inside an agent simply by name: + +```yaml +agents: + - name: agent1 + description: The first agent + chat_model_setups: + - name: my_llm + clazz: ollama + connection: my_connection # references the shared connection + thinking: false + + - name: agent2 + description: The second agent + chat_model_setups: + - name: my_llm + clazz: ollama + connection: my_connection # same shared connection reused + thinking: true + +# shared resource at the file level +chat_model_connections: + - name: my_connection + clazz: ollama + base_url: http://localhost:11434 +``` + +**Shared actions** are referenced inside an agent's `actions:` list by writing the action's name as a bare string: + +```yaml +agents: + - name: agent1 + description: The first agent + actions: + - action1 # shared action + - name: my_action + function: my_pkg.actions:my_action + listen_to: [input] + + - name: agent2 + description: The second agent + actions: + - action1 # same shared action reused + +# shared actions at the file level +actions: + - name: action1 + function: my_pkg.actions:action1 + listen_to: [input] + type: python + - name: action2 + function: my_pkg.actions:action2 + listen_to: [chat_response] + type: python +``` + +{{< hint info >}} +Composing one agent across multiple YAML files is **not** supported. If two files loaded into the same environment declare the same agent name, or the same shared resource/action name, the second `load_yaml` call raises an error. +{{< /hint >}} + +## Loading and Running + +`AgentsExecutionEnvironment` exposes a `load_yaml` / `loadYaml` method that parses one or more YAML files and: + +- registers all shared resources on the environment, and +- registers all declared agents on the environment. + +Once loaded, an agent declared in YAML is applied **by name** through the same `AgentBuilder` as a code-defined agent — `.apply(...)` accepts either an `Agent` instance or the name of an agent previously registered on the environment. + +{{< tabs "Load and Apply" >}} + +{{< tab "Python" >}} +```python +agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env) +agents_env.load_yaml("path/to/agents.yaml") + +review_analysis_res_stream = ( + agents_env.from_datastream( + input=product_review_stream, key_selector=lambda x: x.id + ) + .apply("review_analysis_agent") # look up the agent by name + .to_datastream() +) +``` +{{< /tab >}} + +{{< tab "Java" >}} +```java +AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); +agentsEnv.loadYaml(Paths.get("path/to/agents.yaml")); + +DataStream outputStream = + agentsEnv + .fromDataStream(inputStream, (KeySelector) MyPojo::getId) + .apply("review_analysis_agent") // look up the agent by name + .toDataStream(); +``` +{{< /tab >}} + +{{< /tabs >}} + +`load_yaml` / `loadYaml` accepts either a single path or a list of paths, and **can be called multiple times**. Multiple calls accumulate on the same environment; the same uniqueness rules apply across the combined state. Duplicate agent or resource names — within a file or across files — raise an error. + +{{< tabs "Multi-File Loading" >}} + +{{< tab "Python" >}} +```python +# Load multiple files in one call +agents_env.load_yaml(["./agents.yaml", "./shared.yaml"]) + +# Or call repeatedly — same result +agents_env.load_yaml("./agents.yaml") +agents_env.load_yaml("./shared.yaml") +``` +{{< /tab >}} + +{{< tab "Java" >}} +```java +// Load multiple files in one call +agentsEnv.loadYaml(Paths.get("./agents.yaml"), Paths.get("./shared.yaml")); + +// Or call repeatedly — same result +agentsEnv.loadYaml(Paths.get("./agents.yaml")); +agentsEnv.loadYaml(Paths.get("./shared.yaml")); +``` +{{< /tab >}} + +{{< /tabs >}} + +A common pattern is to split a topology file (the agents themselves) from an infrastructure file (chat-model connections, vector stores, ...). The infrastructure file can be swapped per environment (dev / staging / prod) without touching the agent definitions. + +For an end-to-end runnable walkthrough that loads a YAML-declared agent and runs it on Flink, see [YAML Agent Quickstart]({{< ref "docs/get-started/quickstart/yaml_agent" >}}). + +## Advanced Topics + +### Function references + +Actions and function tools point at user code through a single `function:` string in the form: + +``` +: +``` + +The colon separates the **left side** — a Python module or a Java class FQN — from the **right side** — the attribute path inside it. + +{{< tabs "Function Reference Examples" >}} + +{{< tab "Python" >}} +```yaml +# Top-level function in a module +function: my_pkg.actions:process_input + +# Static method on a class (nested via dots in the qualname) +function: my_pkg.agents:ReviewAnalysisAgent.process_input +``` +{{< /tab >}} + +{{< tab "Java" >}} +```yaml +# Static method on a class +function: com.example.MyActions:processInput + +# Static method on a nested/inner class +function: com.example.Outer$Inner:processInput +``` +{{< /tab >}} + +{{< /tabs >}} + +### Selecting the implementation language + +Every resource, tool, and action accepts an optional `type:` field with values `python` or `java`. When omitted, the default is **`python`** — both the Python loader and the Java loader treat a missing `type:` as `python`. + +This means a YAML file loaded by the **Java loader** must set `type: java` explicitly on every resource, tool, and action whose implementation is in Java. Omitting `type:` on a Java resource in a Java-loaded YAML will make the loader try to build a Python-wrapping descriptor — usually not what you want. + +When `type:` resolves to the **opposite** language of the loader, the loader builds a cross-language descriptor that delegates to the other-language implementation — see [Cross-language agents](#cross-language-agents). + +### Provider aliases + +For `clazz:` on resource descriptors and for event names in `listen_to:`, you can use a short alias instead of a fully-qualified class path. + +#### Event aliases + +| Alias | Event type | +| ------------------------------ | -------------------------------- | +| `input` | `InputEvent` | +| `output` | `OutputEvent` | +| `chat_request` | `ChatRequestEvent` | +| `chat_response` | `ChatResponseEvent` | +| `tool_request` | `ToolRequestEvent` | +| `tool_response` | `ToolResponseEvent` | +| `context_retrieval_request` | `ContextRetrievalRequestEvent` | +| `context_retrieval_response` | `ContextRetrievalResponseEvent` | + +For custom event types defined in your code, write the event's full `EVENT_TYPE` string instead of an alias. + +#### Class aliases + +Aliases for `clazz:` are keyed on resource type **and** implementation language. The same alias (e.g. `ollama`) resolves to a different class for `chat_model_connections` vs `chat_model_setups`, and for `python` vs `java`. + +Common chat-model aliases: + +| Alias | `type: python` | `type: java` | +| -------------------- | --------------------------- | --------------------------- | +| `ollama` | Ollama (Python) | Ollama (Java) | +| `openai` | OpenAI Completions (Python) | — | +| `openai_completions` | — | OpenAI Completions (Java) | +| `openai_responses` | — | OpenAI Responses (Java) | +| `anthropic` | Anthropic | Anthropic | +| `azure_openai` | Azure OpenAI (Python) | — | +| `azure` | — | Azure OpenAI (Java) | +| `tongyi` | Tongyi (Python) | — | + +Embedding-model and vector-store aliases follow the same scheme. The full alias tables live in `flink_agents.api.yaml.aliases` (Python) and `org.apache.flink.agents.api.yaml.Aliases` (Java). + +If `clazz:` is not a known alias, the loader passes it through as-is — write a fully-qualified class path (e.g. `flink_agents.integrations.chat_models.ollama_chat_model.OllamaChatModelSetup`) for providers you've added yourself. + +### Cross-language agents + +Setting `type:` to the **opposite** language of the loader bridges Python and Java pieces in the same agent. The loader resolves the alias against the cross-language bucket and wraps the resource in the appropriate cross-language proxy. + +Example — a Python-side agent whose chat model uses a **Java** Ollama setup that also calls a **Java** function tool: + +```yaml +agents: + - name: cross_language_agent + actions: + - name: process_input + function: my_pkg.actions:process_input + listen_to: [input] + - name: process_chat_response + function: my_pkg.actions:process_chat_response + listen_to: [chat_response] + + chat_model_connections: + # Python Ollama connection used by the math chat model + - name: ollama_connection + clazz: ollama + request_timeout: 240.0 + # Java Ollama connection used by the creative chat model + - name: ollama_connection_java + clazz: ollama + type: java + endpoint: http://localhost:11434 + requestTimeout: 240 + + chat_model_setups: + - name: math_chat_model + clazz: ollama + connection: ollama_connection + model: qwen3:1.7b + tools: [calculate_bmi] # python -> java tool via the bridge + + - name: creative_chat_model + clazz: ollama + type: java + connection: ollama_connection_java + model: qwen3:1.7b + + tools: + - name: calculate_bmi + type: java + function: com.example.HealthTools:calculateBMI + parameter_types: [java.lang.Double, java.lang.Double] +``` + +Loaded with `agents_env.load_yaml(...)` on the Python side, this produces an agent where: + +- The Python `process_input` and `process_chat_response` actions are Python functions. +- `math_chat_model` is a Python Ollama setup that calls a Java function tool through the cross-language tool bridge. +- `creative_chat_model` is a Java Ollama setup driven from the Python loader via the Java chat-model wrapper. + +Not every resource type is cross-language. Currently `chat_model_connections`, `chat_model_setups`, `embedding_model_connections`, `embedding_model_setups`, and `vector_stores` support `type:` on the opposite language; others (e.g. `mcp_servers`) do not. + +## YAML API Specification + +To help users and coding agents understand the YAML format and validate YAML files, Flink Agents publishes a language-neutral **JSON Schema** for the YAML document. The schema is checked in at [`docs/yaml-schema.json`](https://github.com/apache/flink-agents/blob/main/docs/yaml-schema.json) — point your IDE's YAML language server at it for inline validation and autocompletion. + +The schema benefits both humans and machines: + +- It defines an interface contract, so LLMs and external systems can interact with Flink Agents declaratively. +- It is language-neutral, so it doubles as the compatibility contract between the Python and the Java loader. + +### Python and Java mapping + +When loading a YAML file, both runtimes parse it into typed in-memory representations and validate it against them using the framework's native validation: + +- The **Python loader** parses YAML into Pydantic `BaseModel`s declared in `flink_agents.api.yaml.specs`. Pydantic enforces the schema (required fields, value types, `extra="forbid"` on unknown keys, mutually-exclusive `text`/`messages` on prompts, ...). +- The **Java loader** parses YAML into POJOs declared in `org.apache.flink.agents.api.yaml.spec`, validated by Jackson with equivalent rules. + +### Consistency guarantees + +Because Pydantic models are easier to author and evolve than raw JSON Schema, the **ground truth** is the Pydantic spec — the checked-in `docs/yaml-schema.json` is exported from it. Continuous tests then verify cross-runtime consistency: + +- the JSON Schema exported by the Pydantic specs matches the checked-in `docs/yaml-schema.json`; +- the JSON Schema exported by the Java POJOs matches the checked-in `docs/yaml-schema.json`; +- the Pydantic specs stay aligned with the Python `Agent` API; +- the Java POJOs stay aligned with the Java `Agent` API. + +This keeps the YAML API a true cross-language contract: a YAML file that validates against the schema is guaranteed to load and run on either the Python or the Java loader (subject only to the language-specific differences documented above, such as `parameter_types` on Java tools). diff --git a/docs/content/docs/get-started/overview.md b/docs/content/docs/get-started/overview.md index fb31c74fa..4172d14e9 100644 --- a/docs/content/docs/get-started/overview.md +++ b/docs/content/docs/get-started/overview.md @@ -45,3 +45,4 @@ To get started with Apache Flink Agents, you can checkout the following quicksta - [Workflow Agent Quickstart]({{< ref "docs/get-started/quickstart/workflow_agent" >}}) - [ReAct Agent Quickstart]({{< ref "docs/get-started/quickstart/react_agent" >}}) +- [YAML Agent Quickstart]({{< ref "docs/get-started/quickstart/yaml_agent" >}}) diff --git a/docs/content/docs/get-started/quickstart/yaml_agent.md b/docs/content/docs/get-started/quickstart/yaml_agent.md new file mode 100644 index 000000000..233d71286 --- /dev/null +++ b/docs/content/docs/get-started/quickstart/yaml_agent.md @@ -0,0 +1,375 @@ +--- +title: 'YAML Agent' +weight: 3 +type: docs +--- + + +## Overview + +The YAML API lets you declare a Flink Agents application — the agent's actions, tools, prompts, chat models, and other resources — as a single declarative file, and load it into an `AgentsExecutionEnvironment` with one call. Your action and tool implementations still live in Python or Java; the YAML file only describes how those pieces are wired together. + +This quickstart runs the same **Review Analysis** workflow as the [Workflow Agent Quickstart]({{< ref "docs/get-started/quickstart/workflow_agent" >}}) — extracting a satisfaction score and dissatisfaction reasons from a stream of product reviews — but declares the agent in YAML and loads it with `load_yaml` / `loadYaml`. The action functions and the `notify_shipping_manager` tool are reused from the workflow-agent example as static methods, so this quickstart shows the **smallest possible delta** between a code-defined agent and a YAML-declared agent. + +For the full reference of the YAML format, see the [YAML API]({{< ref "docs/development/yaml" >}}) documentation. + +## Code Walkthrough + +### Declare the Agent in YAML + +The whole agent — chat model, prompt, tool, and the two actions — is declared in `yaml_review_analysis_agent.yaml`. The `function:` strings point at static methods on the existing `ReviewAnalysisAgent` class from the workflow-agent quickstart. + +{{< tabs "Declare Agent YAML" >}} + +{{< tab "Python" >}} +```yaml +agents: + - name: review_analysis_agent + description: | + YAML-declared review analysis agent. Reuses the static methods of + ReviewAnalysisAgent from the workflow_agent quickstart as actions + and tool, but wires everything together through this YAML file + instead of class-level decorators. + + actions: + - name: process_input + function: flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.process_input + listen_to: [input] + - name: process_chat_response + function: flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.process_chat_response + listen_to: [chat_response] + + chat_model_connections: + - name: ollama_server + clazz: ollama + request_timeout: 120 + + prompts: + - name: review_analysis_prompt + messages: + - role: system + content: | + Analyze the user review and product information to determine a + satisfaction score (1-5) and potential reasons for dissatisfaction. + + Example input format: + { + "id": "12345", + "review": "The headphones broke after one week. Very poor quality." + } + + Ensure your response can be parsed by Python JSON, using this format + as an example: + { + "id": "12345", + "score": 1, + "reasons": ["poor quality"] + } + + If the review mentions shipping dissatisfaction, first call + notify_shipping_manager, then return the JSON above with no + mention of the tool call. + - role: user + content: | + "input": + {input} + + chat_model_setups: + - name: review_analysis_model + clazz: ollama + connection: ollama_server + model: qwen3:8b + prompt: review_analysis_prompt + tools: [notify_shipping_manager] + extract_reasoning: true + + tools: + - name: notify_shipping_manager + function: flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.notify_shipping_manager +``` +{{< /tab >}} + +{{< tab "Java" >}} +```yaml +agents: + - name: review_analysis_agent + description: | + YAML-declared review analysis agent. Reuses the static methods of + ReviewAnalysisAgent from the workflow_agent quickstart as actions + and tool, but wires everything together through this YAML file + instead of class-level annotations. + + actions: + - name: processInput + type: java + function: org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:processInput + listen_to: [input] + - name: processChatResponse + type: java + function: org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:processChatResponse + listen_to: [chat_response] + + chat_model_connections: + - name: ollama_server + clazz: ollama + type: java + endpoint: http://localhost:11434 + requestTimeout: 120 + + prompts: + - name: review_analysis_prompt + messages: + - role: system + content: | + Analyze the user review and product information to determine a + satisfaction score (1-5) and potential reasons for dissatisfaction. + + Example input format: + { + "id": "12345", + "review": "The headphones broke after one week. Very poor quality." + } + + Ensure your response can be parsed by Java JSON, using this format + as an example: + { + "id": "12345", + "score": 1, + "reasons": ["poor quality"] + } + + If the review mentions shipping dissatisfaction, first call + notifyShippingManager, then return the JSON above with no + mention of the tool call. + - role: user + content: | + "input": + {input} + + # Action ``processInput`` sends ``ChatRequestEvent("reviewAnalysisModel", ...)``, + # so the chat-model setup MUST be named ``reviewAnalysisModel``. + chat_model_setups: + - name: reviewAnalysisModel + clazz: ollama + type: java + connection: ollama_server + model: qwen3:8b + prompt: review_analysis_prompt + tools: [notifyShippingManager] + extract_reasoning: true + + tools: + - name: notifyShippingManager + type: java + function: org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:notifyShippingManager + parameter_types: [java.lang.String, java.lang.String] +``` +{{< /tab >}} + +{{< /tabs >}} + +A few things to notice in the YAML above: + +- `clazz: ollama` is an alias resolved by the loader to the full Ollama chat-model class — see the alias table in the [YAML API]({{< ref "docs/development/yaml#class-aliases" >}}) doc. +- `listen_to: [input]` / `[chat_response]` use **event aliases** for the framework's built-in events. +- `function:` strings use the `:` format. The right side is the class-qualified method name, so the YAML reuses the same `process_input` / `processInput` static methods the original `ReviewAnalysisAgent` already defines. +- The prompt is declared inline as a `messages:` list and referenced from the chat-model setup by name. + +### Load and Run + +In the entry-point script, build the Flink stream as usual, then load the YAML and apply the agent **by name**. + +{{< tabs "Load and Run" >}} + +{{< tab "Python" >}} +```python +# Set up the Flink streaming environment and the Agents execution environment. +env = StreamExecutionEnvironment.get_execution_environment() +agents_env = AgentsExecutionEnvironment.get_execution_environment(env) + +# limit async request to avoid overwhelming ollama server +agents_env.get_config().set(AgentExecutionOptions.NUM_ASYNC_THREADS, 2) + +# Load the YAML — agents and shared resources are registered on the env. +agents_env.load_yaml(current_dir / "yaml_review_analysis_agent.yaml") + +# Read product reviews from a text file as a streaming source. +product_review_stream = env.from_source( + source=FileSource.for_record_stream_format( + StreamFormat.text_line_format(), f"file:///{current_dir}/resources" + ) + .monitor_continuously(Duration.of_minutes(1)) + .build(), + watermark_strategy=WatermarkStrategy.no_watermarks(), + source_name="yaml_review_analysis_example", +).map(lambda x: ProductReview.model_validate_json(x)) + +# Apply the YAML-declared agent BY NAME. +review_analysis_res_stream = ( + agents_env.from_datastream( + input=product_review_stream, key_selector=lambda x: x.id + ) + .apply("review_analysis_agent") + .to_datastream() +) + +review_analysis_res_stream.print() +agents_env.execute() +``` +{{< /tab >}} + +{{< tab "Java" >}} +```java +// Set up the Flink streaming environment and the Agents execution environment. +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + +// limit async request to avoid overwhelming ollama server +agentsEnv.getConfig().set(AgentExecutionOptions.NUM_ASYNC_THREADS, 2); + +// Load the YAML — agents and shared resources are registered on the env. +Path yamlPath = copyResource("yaml/yaml_review_analysis_agent.yaml").toPath(); +agentsEnv.loadYaml(yamlPath); + +// Read product reviews from input_data.txt file as a streaming source. +File inputDataFile = copyResource("input_data.txt"); +DataStream productReviewStream = + env.fromSource( + FileSource.forRecordStreamFormat( + new TextLineInputFormat(), + new Path(inputDataFile.getAbsolutePath())) + .build(), + WatermarkStrategy.noWatermarks(), + "yaml-review-analysis-example"); + +// Apply the YAML-declared agent BY NAME. +DataStream reviewAnalysisResStream = + agentsEnv + .fromDataStream(productReviewStream) + .apply("review_analysis_agent") + .toDataStream(); + +reviewAnalysisResStream.print(); +agentsEnv.execute(); +``` +{{< /tab >}} + +{{< /tabs >}} + +The key difference from the code-defined workflow agent quickstart is the pair of calls: + +1. `agents_env.load_yaml(...)` / `agentsEnv.loadYaml(...)` — parses the YAML and registers the declared agent(s) and shared resources on the environment. +2. `.apply("review_analysis_agent")` — looks the agent up **by name** instead of passing an `Agent` instance. + +Everything else — the Flink source, the key selector, the sink — is identical. + +## Run the Example + +### Prerequisites + +* Unix-like environment (we use Linux, Mac OS X, Cygwin, WSL) +* Git +* Java 11+ +* Python 3.10, 3.11 or 3.12 + +### Preparation + +#### Prepare Flink and Flink Agents + +Follow the [installation]({{< ref "docs/get-started/installation" >}}) instructions to set up Flink and Flink Agents. + +#### Clone the Flink Agents Repository (if not done already) + +```bash +git clone https://github.com/apache/flink-agents.git +cd flink-agents +``` + +{{< hint info >}} +For python examples, you can skip this step and submit the python file in installed flink-agents wheel. +{{< /hint >}} + +#### Deploy a Standalone Flink Cluster + +You can deploy a standalone Flink cluster in your local environment with the following command. + +{{< tabs "Deploy a Standalone Flink Cluster" >}} + +{{< tab "Python" >}} +```bash +export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])') +$FLINK_HOME/bin/start-cluster.sh +``` +{{< /tab >}} + +{{< tab "Java" >}} +1. Build Flink Agents from source to generate example jar. See [installation]({{< ref "docs/get-started/installation" >}}) for more details. +2. Start the Flink cluster + ```bash + $FLINK_HOME/bin/start-cluster.sh + ``` + +{{< hint info >}} +To run example on JDK 21+, append jvm option `--add-exports=java.base/jdk.internal.vm=ALL-UNNAMED` to [env.java.opts.all](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#env-java-opts-all) in `$FLINK_HOME/conf/config.yaml` before start the flink cluster. +{{< /hint >}} +{{< /tab >}} + +{{< /tabs >}} + +#### Prepare Ollama + +Download and install Ollama from the official [website](https://ollama.com/download). + +{{< hint info >}} +Ollama server **0.9.0** or higher is required. +{{< /hint >}} + +Then pull the `qwen3:8b` model: + +```bash +ollama pull qwen3:8b +``` + +### Submit Flink Agents Job to Standalone Flink Cluster + +{{< tabs "Submit YAML Example" >}} + +{{< tab "Python" >}} +```bash +export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])') + +# Run the YAML-declared review analysis example +$FLINK_HOME/bin/flink run -py ./flink-agents/python/flink_agents/examples/quickstart/yaml_workflow_agent_example.py +# or submit the example python file in installed flink-agents wheel +$FLINK_HOME/bin/flink run -py $PYTHONPATH/flink_agents/examples/quickstart/yaml_workflow_agent_example.py +``` +{{< /tab >}} + +{{< tab "Java" >}} +```bash +$FLINK_HOME/bin/flink run -c org.apache.flink.agents.examples.YamlWorkflowAgentExample ./flink-agents/examples/target/flink-agents-examples-$VERSION.jar +``` +{{< /tab >}} + +{{< /tabs >}} + +You should see a Flink job submitted to the Flink Cluster in the Flink web UI at [localhost:8081](localhost:8081). After a few minutes, the analysis results — one JSON record per input review — appear in the TaskManager output log. diff --git a/examples/src/main/java/org/apache/flink/agents/examples/YamlWorkflowAgentExample.java b/examples/src/main/java/org/apache/flink/agents/examples/YamlWorkflowAgentExample.java new file mode 100644 index 000000000..60cfbb256 --- /dev/null +++ b/examples/src/main/java/org/apache/flink/agents/examples/YamlWorkflowAgentExample.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.examples; + +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.agents.api.agents.AgentExecutionOptions; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.connector.file.src.reader.TextLineInputFormat; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.io.File; + +/** + * Java example demonstrating a YAML-declared workflow agent. + * + *

The agent (chat model, prompt, tool, and actions) is declared in {@code + * yaml_review_analysis_agent.yaml} on the classpath. The YAML file's {@code function:} fields point + * at the static methods of {@link org.apache.flink.agents.examples.agents.ReviewAnalysisAgent} + * already defined for the code-only quickstart, so this example demonstrates the minimal delta + * between a code-defined and a YAML-declared agent. + * + *

Pipeline: + * + *

{@code
+ * FileSource (input_data.txt) -> AgentsEnv.apply("review_analysis_agent") -> print
+ * }
+ */ +public class YamlWorkflowAgentExample { + + /** Runs the example pipeline. */ + public static void main(String[] args) throws Exception { + // Set up the Flink streaming environment and the Agents execution environment. + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + + // limit async request to avoid overwhelming ollama server + agentsEnv.getConfig().set(AgentExecutionOptions.NUM_ASYNC_THREADS, 2); + + // Load the YAML — the declared agent and its chat-model connection, + // chat-model setup, prompt, tool, and actions are all registered on + // the environment in this single call. + File yamlFile = + WorkflowSingleAgentExample.copyResource("yaml/yaml_review_analysis_agent.yaml"); + agentsEnv.loadYaml(yamlFile.toPath()); + + // Read product reviews from input_data.txt file as a streaming source. + // Each element represents a ProductReview. + File inputDataFile = WorkflowSingleAgentExample.copyResource("input_data.txt"); + DataStream productReviewStream = + env.fromSource( + FileSource.forRecordStreamFormat( + new TextLineInputFormat(), + new Path(inputDataFile.getAbsolutePath())) + .build(), + WatermarkStrategy.noWatermarks(), + "yaml-review-analysis-example"); + + // Apply the YAML-declared agent BY NAME — ``apply`` accepts either an + // Agent instance or the name of an agent registered on the environment. + DataStream reviewAnalysisResStream = + agentsEnv + .fromDataStream(productReviewStream) + .apply("review_analysis_agent") + .toDataStream(); + + // Print the analysis results to stdout. + reviewAnalysisResStream.print(); + + // Execute the Flink pipeline. + agentsEnv.execute(); + } +} diff --git a/examples/src/main/resources/yaml/yaml_review_analysis_agent.yaml b/examples/src/main/resources/yaml/yaml_review_analysis_agent.yaml new file mode 100644 index 000000000..723c1df98 --- /dev/null +++ b/examples/src/main/resources/yaml/yaml_review_analysis_agent.yaml @@ -0,0 +1,72 @@ +agents: + - name: review_analysis_agent + description: | + YAML-declared review analysis agent. Reuses the static methods of + ReviewAnalysisAgent from the workflow_agent quickstart as actions + and tool, but wires everything together through this YAML file + instead of class-level annotations. + + actions: + - name: processInput + type: java + function: org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:processInput + listen_to: [input] + - name: processChatResponse + type: java + function: org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:processChatResponse + listen_to: [chat_response] + + chat_model_connections: + - name: ollama_server + clazz: ollama + type: java + endpoint: http://localhost:11434 + requestTimeout: 120 + + prompts: + - name: review_analysis_prompt + messages: + - role: system + content: | + Analyze the user review and product information to determine a + satisfaction score (1-5) and potential reasons for dissatisfaction. + + Example input format: + { + "id": "12345", + "review": "The headphones broke after one week. Very poor quality." + } + + Ensure your response can be parsed by Java JSON, using this + format as an example: + { + "id": "12345", + "score": 1, + "reasons": ["poor quality"] + } + + If the review mentions shipping dissatisfaction, first call + notifyShippingManager, then return the JSON above with no + mention of the tool call. + - role: user + content: | + "input": + {input} + + # Action ``processInput`` sends ``ChatRequestEvent("reviewAnalysisModel", ...)``, + # so the chat-model setup MUST be named ``reviewAnalysisModel``. + chat_model_setups: + - name: reviewAnalysisModel + clazz: ollama + type: java + connection: ollama_server + model: qwen3:8b + prompt: review_analysis_prompt + tools: [notifyShippingManager] + extract_reasoning: true + + tools: + - name: notifyShippingManager + type: java + function: org.apache.flink.agents.examples.agents.ReviewAnalysisAgent:notifyShippingManager + parameter_types: [java.lang.String, java.lang.String] diff --git a/python/flink_agents/examples/quickstart/yaml_review_analysis_agent.yaml b/python/flink_agents/examples/quickstart/yaml_review_analysis_agent.yaml new file mode 100644 index 000000000..f6aff0a87 --- /dev/null +++ b/python/flink_agents/examples/quickstart/yaml_review_analysis_agent.yaml @@ -0,0 +1,65 @@ +agents: + - name: review_analysis_agent + description: | + YAML-declared review analysis agent. Reuses the static methods of + ReviewAnalysisAgent from the workflow_agent quickstart as actions + and tool, but wires everything together through this YAML file + instead of class-level decorators. + + actions: + - name: process_input + function: flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.process_input + listen_to: [input] + - name: process_chat_response + function: flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.process_chat_response + listen_to: [chat_response] + + chat_model_connections: + - name: ollama_server + clazz: ollama + request_timeout: 120 + + prompts: + - name: review_analysis_prompt + messages: + - role: system + content: | + Analyze the user review and product information to determine a + satisfaction score (1-5) and potential reasons for dissatisfaction. + + Example input format: + { + "id": "12345", + "review": "The headphones broke after one week. Very poor quality." + } + + Ensure your response can be parsed by Python JSON, using this + format as an example: + { + "id": "12345", + "score": 1, + "reasons": ["poor quality"] + } + + If the review mentions shipping dissatisfaction, first call + notify_shipping_manager, then return the JSON above with no + mention of the tool call. + - role: user + content: | + "input": + {input} + + # Action ``process_input`` sends ``ChatRequestEvent(model="review_analysis_model", ...)``, + # so the chat-model setup MUST be named ``review_analysis_model``. + chat_model_setups: + - name: review_analysis_model + clazz: ollama + connection: ollama_server + model: qwen3:8b + prompt: review_analysis_prompt + tools: [notify_shipping_manager] + extract_reasoning: true + + tools: + - name: notify_shipping_manager + function: flink_agents.examples.quickstart.agents.review_analysis_agent:ReviewAnalysisAgent.notify_shipping_manager diff --git a/python/flink_agents/examples/quickstart/yaml_workflow_agent_example.py b/python/flink_agents/examples/quickstart/yaml_workflow_agent_example.py new file mode 100644 index 000000000..9291de66d --- /dev/null +++ b/python/flink_agents/examples/quickstart/yaml_workflow_agent_example.py @@ -0,0 +1,88 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from pathlib import Path + +from pyflink.common import Duration, WatermarkStrategy +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.datastream.connectors.file_system import FileSource, StreamFormat + +from flink_agents.api.core_options import AgentExecutionOptions +from flink_agents.api.execution_environment import AgentsExecutionEnvironment +from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( + ProductReview, +) + +current_dir = Path(__file__).parent + + +def main() -> None: + """Run the product review analysis quickstart with the agent declared in YAML. + + The agent (chat model, prompt, tool, and actions) is declared in + ``yaml_review_analysis_agent.yaml``. The YAML file's ``function:`` + fields point at the static methods of ``ReviewAnalysisAgent`` already + defined for the code-only quickstart, so this example demonstrates the + minimal delta between a code-defined and a YAML-declared agent. + """ + # Set up the Flink streaming environment and the Agents execution environment. + env = StreamExecutionEnvironment.get_execution_environment() + agents_env = AgentsExecutionEnvironment.get_execution_environment(env) + + # limit async request to avoid overwhelming ollama server + agents_env.get_config().set(AgentExecutionOptions.NUM_ASYNC_THREADS, 2) + + # Load the YAML — the declared agent and its chat-model connection, + # chat-model setup, prompt, tool, and actions are all registered on + # the environment in this single call. + agents_env.load_yaml(current_dir / "yaml_review_analysis_agent.yaml") + + # Read product reviews from a text file as a streaming source. + # Each line in the file should be a JSON string representing a ProductReview. + product_review_stream = env.from_source( + source=FileSource.for_record_stream_format( + StreamFormat.text_line_format(), f"file:///{current_dir}/resources" + ) + .monitor_continuously(Duration.of_minutes(1)) + .build(), + watermark_strategy=WatermarkStrategy.no_watermarks(), + source_name="yaml_review_analysis_example", + ).map( + lambda x: ProductReview.model_validate_json( + x + ) # Deserialize JSON to ProductReview. + ) + + # Apply the YAML-declared agent BY NAME — ``apply`` accepts either an + # Agent instance or the name of an agent registered on the environment. + review_analysis_res_stream = ( + agents_env.from_datastream( + input=product_review_stream, key_selector=lambda x: x.id + ) + .apply("review_analysis_agent") + .to_datastream() + ) + + # Print the analysis results to stdout. + review_analysis_res_stream.print() + + # Execute the Flink pipeline. + agents_env.execute() + + +if __name__ == "__main__": + main() diff --git a/python/pyproject.toml b/python/pyproject.toml index de969a15f..0c8f74e5d 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -70,6 +70,7 @@ exclude = ["_build_backend*"] [tool.setuptools.package-data] "flink_agents.lib" = ["**/*.jar"] +"flink_agents.examples.quickstart" = ["**/*.yaml"] "flink_agents.examples.quickstart.resources" = ["**/*.txt"] # Optional dependencies (dependency groups)