From 1ec60b61757d15ac365c15b8070c9d40bf434cb8 Mon Sep 17 00:00:00 2001 From: daken Date: Tue, 12 May 2026 00:15:52 +0800 Subject: [PATCH 1/2] [Feature] Support TTL on ShortTermMemory --- .../api/agents/AgentExecutionOptions.java | 18 +++ .../OpenAIResponsesModelConnection.java | 1 + .../operator/ActionExecutionOperator.java | 2 +- .../operator/OperatorStateManager.java | 48 ++++++- .../ShortTermMemoryTTLIntegrationTest.java | 130 ++++++++++++++++++ 5 files changed, 194 insertions(+), 5 deletions(-) create mode 100644 runtime/src/test/java/org/apache/flink/agents/runtime/memory/ShortTermMemoryTTLIntegrationTest.java diff --git a/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java b/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java index 2b8751d49..0f3389614 100644 --- a/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java +++ b/api/src/main/java/org/apache/flink/agents/api/agents/AgentExecutionOptions.java @@ -19,6 +19,7 @@ package org.apache.flink.agents.api.agents; import org.apache.flink.agents.api.configuration.ConfigOption; +import org.apache.flink.api.common.state.StateTtlConfig; public class AgentExecutionOptions { public static final ConfigOption ERROR_HANDLING_STRATEGY = @@ -47,4 +48,21 @@ public class AgentExecutionOptions { public static final ConfigOption RAG_ASYNC = new ConfigOption<>("rag.async", Boolean.class, true); + + public static final ConfigOption SHORT_TERM_MEMORY_STATE_TTL_MS = + new ConfigOption<>("short-term-memory.state-ttl.ms", Long.class, 0L); + + public static final ConfigOption + SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE = + new ConfigOption<>( + "short-term-memory.state-ttl.update-type", + StateTtlConfig.UpdateType.class, + StateTtlConfig.UpdateType.OnReadAndWrite); + + public static final ConfigOption + SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY = + new ConfigOption<>( + "short-term-memory.state-ttl.visibility", + StateTtlConfig.StateVisibility.class, + StateTtlConfig.StateVisibility.NeverReturnExpired); } diff --git a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java index f185d65f0..b080ac127 100644 --- a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java +++ b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java @@ -74,6 +74,7 @@ * public static ResourceDesc openAIResponses() { * return ResourceDescriptor.Builder.newBuilder(OpenAIResponsesModelConnection.class.getName()) * .addInitialArgument("api_key", System.getenv("OPENAI_API_KEY")) + * .addInitialArgument("api_base_url", System.getenv("OPENAI_API_URL")) * .addInitialArgument("timeout", 120) * .addInitialArgument("max_retries", 3) * .build(); diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index 719db92d1..b0ee590a7 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -141,7 +141,7 @@ public void setup( public void open() throws Exception { super.open(); - stateManager.initializeKeyedStates(getRuntimeContext()); + stateManager.initializeKeyedStates(getRuntimeContext(), agentPlan); stateManager.initializeOperatorStates(getOperatorStateBackend()); resourceCache = new ResourceCache(agentPlan.getResourceProviders()); diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java index 843a02078..6ee287f44 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java @@ -19,11 +19,14 @@ package org.apache.flink.agents.runtime.operator; import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.agents.AgentExecutionOptions; +import org.apache.flink.agents.plan.AgentPlan; import org.apache.flink.agents.runtime.memory.MemoryObjectImpl; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -37,6 +40,7 @@ import javax.annotation.Nullable; +import java.time.Duration; import static org.apache.flink.agents.runtime.utils.StateUtil.*; /** @@ -56,9 +60,9 @@ * *

Lifecycle: instantiated by the operator's {@code initializeState()} (the Flink lifecycle runs * {@code initializeState} before {@code open}). Both {@link - * #initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext)} and {@link - * #initializeOperatorStates(OperatorStateBackend)} are invoked later from the operator's {@code - * open()}. There is no explicit close — the underlying state handles are owned by Flink. + * #initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext, AgentPlan)} and + * {@link #initializeOperatorStates(OperatorStateBackend)} are invoked later from the operator's + * {@code open()}. There is no explicit close — the underlying state handles are owned by Flink. * *

Design constraint: package-private; no manager-to-manager held references. Cross-cutting data * flows via method parameters (see for example {@link ActionTaskContextManager#transferContexts} @@ -87,7 +91,9 @@ class OperatorStateManager { * * @param runtimeContext the operator's runtime context, used to obtain keyed state handles. */ - void initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext runtimeContext) + void initializeKeyedStates( + org.apache.flink.api.common.functions.RuntimeContext runtimeContext, + AgentPlan agentPlan) throws Exception { // init sensoryMemState MapStateDescriptor sensoryMemStateDescriptor = @@ -103,6 +109,7 @@ void initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext "shortTermMemory", TypeInformation.of(String.class), TypeInformation.of(MemoryObjectImpl.MemoryItem.class)); + maybeEnableShortTermMemoryTTL(shortTermMemStateDescriptor, agentPlan); shortTermMemState = runtimeContext.getMapState(shortTermMemStateDescriptor); // init sequence number state for per key message ordering @@ -121,6 +128,39 @@ void initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext PENDING_INPUT_EVENT_STATE_NAME, TypeInformation.of(Event.class))); } + /** + * When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is positive, attaches Flink + * {@link StateTtlConfig} to the short-term memory {@link MapStateDescriptor}. Unset, null, or + * non-positive values disable TTL (Flink does not allow zero/negative TTL). + */ + private void maybeEnableShortTermMemoryTTL( + MapStateDescriptor descriptor, + AgentPlan agentPlan) { + Long ttlMs = + agentPlan.getConfig().get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS); + if (ttlMs == null || ttlMs <= 0) { + return; + } + + StateTtlConfig.UpdateType updateType = + agentPlan + .getConfig() + .get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE); + + StateTtlConfig.StateVisibility stateVisibility = + agentPlan + .getConfig() + .get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY); + + StateTtlConfig ttlConfig = + StateTtlConfig.newBuilder(Duration.ofMillis(ttlMs)) + .setUpdateType(updateType) + .setStateVisibility(stateVisibility) + .cleanupFullSnapshot() + .build(); + descriptor.enableTimeToLive(ttlConfig); + } + /** * Registers operator-level (non-keyed) state. * diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/memory/ShortTermMemoryTTLIntegrationTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/ShortTermMemoryTTLIntegrationTest.java new file mode 100644 index 000000000..0773b0d2f --- /dev/null +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/memory/ShortTermMemoryTTLIntegrationTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.runtime.memory; + +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.agents.Agent; +import org.apache.flink.agents.api.agents.AgentExecutionOptions; +import org.apache.flink.agents.api.annotation.Action; +import org.apache.flink.agents.api.context.MemoryObject; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.plan.AgentConfiguration; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Integration test for Short-Term Memory TTL functionality. */ +class ShortTermMemoryTTLIntegrationTest { + + private static final String MEMORY_KEY = "test_key"; + + private static final class TestInput { + public String eventKey; + public long sleepMs; + + private TestInput() {} + + private TestInput(String eventKey, long sleepMs) { + this.eventKey = eventKey; + this.sleepMs = sleepMs; + } + } + + public static class TTLTestAgent extends Agent { + + @Action(listenEventTypes = {InputEvent.EVENT_TYPE}) + public static void input(org.apache.flink.agents.api.Event event, RunnerContext ctx) + throws Exception { + InputEvent inputEvent = (InputEvent) event; + TestInput input = (TestInput) inputEvent.getInput(); + + MemoryObject shortTermMemory = ctx.getShortTermMemory(); + MemoryObject memoryObject = shortTermMemory.get(input.eventKey); + + Object existingValue = null; + int currentCount = 0; + if (memoryObject != null && !memoryObject.isNestedObject()) { + existingValue = memoryObject.getValue(); + if (existingValue instanceof Integer) { + currentCount = (Integer) existingValue; + } else if (existingValue instanceof Number) { + currentCount = ((Number) existingValue).intValue(); + } + } + + shortTermMemory.set(input.eventKey, currentCount + 1); + Thread.sleep(input.sleepMs); + ctx.sendEvent( + new OutputEvent( + input.eventKey + "|" + (existingValue == null ? "NEW" : "EXISTING"))); + } + } + + @Test + void testTTLConfigurationNotApplied() throws Exception { + List results = runScenario(1000L, 0L); + + assertEquals(List.of("event1|NEW", "event2|NEW", "event1|EXISTING"), results); + } + + @Test + void testTTLConfigurationApplied() throws Exception { + List results = runScenario(1000L, 2000L); + + assertEquals(List.of("event1|NEW", "event2|NEW", "event1|NEW"), results); + } + + private static List runScenario(long ttlMs, long sleepMs) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + AgentsExecutionEnvironment agentEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + AgentConfiguration agentsConfig = (AgentConfiguration) agentEnv.getConfig(); + agentsConfig.set(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS, ttlMs); + agentsConfig.set( + AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE, + StateTtlConfig.UpdateType.OnCreateAndWrite); + agentsConfig.set( + AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY, + StateTtlConfig.StateVisibility.NeverReturnExpired); + + List testData = new ArrayList<>(); + testData.add(new TestInput("event1", sleepMs)); + testData.add(new TestInput("event2", sleepMs)); + testData.add(new TestInput("event1", sleepMs)); + + DataStream inputStream = env.fromCollection(testData); + DataStream outputStream = + agentEnv.fromDataStream(inputStream, x -> MEMORY_KEY) + .apply(new TTLTestAgent()) + .toDataStream(); + + List results = new ArrayList<>(); + outputStream.map(Object::toString).executeAndCollect().forEachRemaining(results::add); + return results; + } +} From 6e9196a5d16f5e8e9924792737f4ce038ac85d41 Mon Sep 17 00:00:00 2001 From: daken Date: Tue, 12 May 2026 23:24:03 +0800 Subject: [PATCH 2/2] [Feature] mvn spotless:apply --- .../flink/agents/runtime/operator/OperatorStateManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java index 6ee287f44..2bfe43ea5 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java @@ -41,6 +41,7 @@ import javax.annotation.Nullable; import java.time.Duration; + import static org.apache.flink.agents.runtime.utils.StateUtil.*; /**