Durable Execution Reconcile
Motivation
Background
flink-agents already supports durable execution in two important ways:
If a durable call has already completed and its result or exception has been persisted, recovery can replay the persisted terminal state without re-executing the external call.
If the entire action has already completed, the framework skips the whole action and only replays persisted memory updates and output events.
Problem
There is still a recovery gap while a durable call is in progress: the runtime may have already started the external call, but the call result has not yet been persisted in a replayable form. If the job fails in that window, recovery cannot determine whether the external system has already produced side effects, so the call may be executed again. For non-idempotent external systems, re-executing the call can duplicate those side effects.
Goal
This proposal improves recovery semantics for that in-progress window without changing the existing durable execution abstraction. During recovery, an in-progress durable call can optionally provide a reconciler hook so user code or an external system can recover a terminal outcome directly when it can be determined reliably.
Design
Overview
DurableCallable remains the only durable call abstraction.
A durable call opts into reconcile support by returning an optional reconciler callable from DurableCallable.reconciler(). Calls that do so are referred to below as reconcilable durable calls.
Only reconcilable durable calls use a two-phase state model: persist PENDING before execution, then finalize the slot as SUCCEEDED or FAILED.
During recovery, if the runtime hits a matching PENDING record for the current call, it executes the reconciler callable first. If that callable returns a result, the runtime recovers a successful terminal state directly. If it throws, the runtime recovers a failed terminal state directly.
API Design
Java API
Notes:
reconciler() returns an optional Callable<T>. Returning null disables reconcile for this durable call and falls back to the existing durable execution behavior. During recovery, the runtime replays a previously completed durable result when one is available; otherwise it executes the original call().
The runtime invokes the reconciler callable only when recovery revisits this durable call and finds a matching PENDING record, meaning that the original execution result has not yet been persisted.
Returning a result provides the recovered successful outcome for this durable call. The runtime persists and replays that recovered result.
Throwing an exception provides the recovered failed outcome for this durable call. The runtime persists and replays that recovered failure.
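To make the contract above concrete, here is a minimal sketch of a call and its reconciler, written in Python for brevity. `FakePaymentService`, `make_charge_call`, and the request-id lookup are hypothetical stand-ins and not part of flink-agents; the sketch assumes the external system can be queried by a stable request id.

```python
from typing import Callable, Dict, Optional, Tuple


class FakePaymentService:
    """Hypothetical external system that records charges by request id."""

    def __init__(self) -> None:
        self._receipts: Dict[str, str] = {}

    def charge(self, request_id: str, amount_cents: int) -> str:
        # A real service would move money here, so repeating it is unsafe.
        receipt = f"receipt-{request_id}-{amount_cents}"
        self._receipts[request_id] = receipt
        return receipt

    def lookup(self, request_id: str) -> Optional[str]:
        return self._receipts.get(request_id)


def make_charge_call(
    service: FakePaymentService, request_id: str, amount_cents: int
) -> Tuple[Callable[[], str], Callable[[], str]]:
    """Build the original call and its reconciler for one charge."""

    def call() -> str:
        return service.charge(request_id, amount_cents)

    def reconciler() -> str:
        # Ask the external system what happened instead of charging again.
        receipt = service.lookup(request_id)
        if receipt is None:
            # Raising makes the runtime record a recovered failure.
            raise RuntimeError(f"charge {request_id} was not applied")
        return receipt

    return call, reconciler
```

Whether a "not found" lookup should raise, as here, or safely perform the charge inside the reconciler is a design choice for the user code; the proposal only fixes how the runtime treats a returned result versus a thrown exception.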
State Model
CallResult
Add a status field to represent the execution state of a reconcilable durable call:
PENDING: the current slot has not yet been finalized to a terminal state. Importantly, PENDING means only that no terminal state has been persisted yet; it does not guarantee that the external call has already started or produced side effects.
SUCCEEDED: a replayable successful result is available.
FAILED: a replayable failure is available.
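The three states can be modelled as a small enum on the call record. The sketch below is illustrative only: `CallStatus` and `CallResultSketch` are hypothetical names, and the field layout is an assumption rather than the actual `CallResult` class.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class CallStatus(Enum):
    PENDING = "PENDING"      # no terminal state persisted yet
    SUCCEEDED = "SUCCEEDED"  # replayable result available
    FAILED = "FAILED"        # replayable failure available


@dataclass(frozen=True)
class CallResultSketch:
    """Hypothetical stand-in for a persisted call record."""

    function_id: str
    args_digest: str
    status: CallStatus
    result: Any = None
    error: Optional[str] = None

    def is_terminal(self) -> bool:
        # Only terminal records can be replayed directly.
        return self.status in (CallStatus.SUCCEEDED, CallStatus.FAILED)

    def matches(self, function_id: str, args_digest: str) -> bool:
        # Matching uses both the function id and the argument digest.
        return self.function_id == function_id and self.args_digest == args_digest
```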
Runtime Behavior
Execution Path
For a reconcilable durable call, the runtime performs the following steps:
Persist a PENDING record at the current call index.
Execute call().
If call() succeeds, finalize the current slot as SUCCEEDED.
If call() throws, finalize the current slot as FAILED.
Durable calls without a reconciler callable keep the existing completion-only behavior.
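The execution steps for a reconcilable call can be sketched as follows. This is a simplified Python simulation, not the runtime code: records are plain dicts, and `persist` is a hypothetical callback standing in for whatever the runtime uses to store `ActionState`.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


def execute_reconcilable(
    records: List[Record],
    index: int,
    function_id: str,
    args_digest: str,
    call: Callable[[], Any],
    persist: Callable[[List[Record]], None],
) -> Any:
    """Run one reconcilable call using the PENDING -> terminal sequence."""
    pending = {
        "function_id": function_id,
        "args_digest": args_digest,
        "status": "PENDING",
    }
    # Step 1: write PENDING at the current index and persist it first.
    if index == len(records):
        records.append(pending)
    else:
        records[index] = pending
    persist(records)

    # Step 2: only now run the external call.
    try:
        result = call()
    except Exception as exc:
        # Step 4: overwrite the same slot as FAILED (no new record appended).
        records[index] = {**pending, "status": "FAILED", "error": str(exc)}
        persist(records)
        raise

    # Step 3: overwrite the same slot as SUCCEEDED.
    records[index] = {**pending, "status": "SUCCEEDED", "result": result}
    persist(records)
    return result
```

The point of the ordering is that a crash between the two persists leaves a `PENDING` slot behind, which is exactly what the recovery path looks for.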
Recovery Path
For a reconcilable durable call, the runtime evaluates the current slot using the existing sequential matching rule driven by currentCallIndex:
If there is no record at the current index:
  Persist PENDING, execute call(), and persist the resulting terminal state.
If the record at the current index does not match the current functionId and argsDigest:
  Clear the current and subsequent records, persist PENDING, execute call(), and persist the resulting terminal state.
If the current slot is SUCCEEDED:
  Replay the result directly.
If the current slot is FAILED:
  Replay the exception directly.
If the current slot is PENDING:
  Execute the reconciler callable.
  If the reconciler callable returns a result, finalize the current slot as SUCCEEDED and replay the result.
  If the reconciler callable throws, finalize the current slot as FAILED and replay the exception.
Two points are worth noting:
Recovery matching is still driven by currentCallIndex together with the current slot's functionId and argsDigest, not by getId() alone.
The reconcile branch is entered only when the current index matches a PENDING slot for the current call.
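The recovery branches above can be summarised as one small state machine. The following Python sketch is a simulation under simplifying assumptions: records are dicts, persistence is omitted, `index` never exceeds `len(records)`, and a replayed failure is surfaced as a `RuntimeError` carrying the stored message. `recover_or_execute` is a hypothetical name.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


def _run_and_finalize(records: List[Record], index: int, fn: Callable[[], Any]) -> Any:
    """Run fn and overwrite the slot at index with the terminal state."""
    slot = records[index]
    try:
        result = fn()
    except Exception as exc:
        records[index] = {**slot, "status": "FAILED", "error": str(exc)}
        raise
    records[index] = {**slot, "status": "SUCCEEDED", "result": result}
    return result


def recover_or_execute(
    records: List[Record],
    index: int,
    function_id: str,
    args_digest: str,
    call: Callable[[], Any],
    reconciler: Callable[[], Any],
) -> Any:
    if index < len(records):
        slot = records[index]
        if (slot["function_id"], slot["args_digest"]) != (function_id, args_digest):
            # Mismatch: drop the current slot and everything after it.
            del records[index:]
        elif slot["status"] == "SUCCEEDED":
            return slot["result"]  # replay the result
        elif slot["status"] == "FAILED":
            raise RuntimeError(slot["error"])  # replay the failure
        else:
            # PENDING: ask the reconciler instead of re-running call().
            return _run_and_finalize(records, index, reconciler)
    # No usable record: write PENDING, then run the original call.
    records.append(
        {"function_id": function_id, "args_digest": args_digest, "status": "PENDING"}
    )
    return _run_and_finalize(records, index, call)
```

Note that the original `call()` is never invoked on the `PENDING` branch; only the reconciler decides the terminal state there.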
Cleanup on Call-Order Mismatch
Cleanup behavior remains consistent with the current implementation: when call order no longer matches, the runtime clears the current slot and all subsequent call results. Two points are worth noting:
PENDING records follow the same cleanup rule.
After cleanup, the trimmed ActionState should be persisted promptly so the same stale suffix does not keep participating in repeated recovery attempts.
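A minimal sketch of the cleanup rule, assuming dict-based records and a hypothetical `persist` callback: on a mismatch the stale suffix is removed and the trimmed state is persisted right away, regardless of whether the dropped records were `PENDING` or terminal.

```python
from typing import Any, Callable, Dict, List

Record = Dict[str, Any]


def trim_on_mismatch(
    records: List[Record],
    index: int,
    function_id: str,
    args_digest: str,
    persist: Callable[[List[Record]], None],
) -> bool:
    """Drop records[index:] if the slot at index belongs to a different call.

    Returns True when a stale suffix was removed and persisted.
    """
    if index >= len(records):
        return False  # nothing recorded at this index
    slot = records[index]
    if slot["function_id"] == function_id and slot["args_digest"] == args_digest:
        return False  # call order still matches
    # PENDING and terminal records are trimmed alike.
    del records[index:]
    # Persist promptly so later recovery attempts do not see the stale suffix.
    persist(records)
    return True
```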
Implementation
API
Java: add a default reconciler() method to DurableCallable that returns an optional reconciler callable.
Python: add a keyword-only reconciler parameter to RunnerContext.durable_execute(...) and durable_execute_async(...).
State Model
CallResult adds a status field with the three states PENDING, SUCCEEDED, and FAILED.
Also add ActionState.replaceCallResult(int index, CallResult callResult) so the current slot can be updated in place from PENDING to SUCCEEDED or FAILED.
Runtime
RunnerContextImpl.durableExecute(...) selects either the completion-only path or the reconcile-aware path based on whether the durable call provides a reconciler callable.
RunnerContextImpl.durableExecuteAsync(...) remains a base-class fallback. It does not provide true async durable execution, and instead explicitly falls back to durableExecute(...). Real async durable execution remains only in JavaRunnerContextImpl.
    @Override
    public <T> T durableExecuteAsync(DurableCallable<T> durableCallable) throws Exception {
        // The base class has no true async support, so it runs the call synchronously.
        LOG.debug(
                "Async durable execution is not supported in RunnerContextImpl; falling back to durableExecute for {}",
                durableCallable.getId());
        return durableExecute(durableCallable);
    }
JavaRunnerContextImpl.durableExecuteAsync(...) continues to reuse the same state machine as the sync path. The only variation point is how the original call() is executed.
Key implementation constraints:
On the first execution of a reconcilable durable call, the runtime appends a PENDING record at the current index. The later terminal state must overwrite that slot instead of appending a new call result.
currentCallIndex advances only when a terminal state is replayed or the current slot is finalized as terminal. Writing PENDING itself does not advance the index.
When the reconciler callable throws, the runtime finalizes the current slot as FAILED.
Sync and async share the same reconcile state machine. Their only difference is how the original call() is executed. In particular, the async path must guarantee that PENDING is already persisted before the underlying call actually begins.
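The ordering and index constraints can be illustrated with a small asyncio simulation. `SketchContext` is a hypothetical class, not the real runner context; it assumes an awaitable `persist` callback and models only the first execution of a call.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Record = Dict[str, Any]


class SketchContext:
    """Hypothetical async runner context that tracks a call index."""

    def __init__(self, persist: Callable[[List[Record]], Awaitable[None]]) -> None:
        self.records: List[Record] = []
        self.current_call_index = 0
        self._persist = persist

    async def durable_execute_async(
        self, function_id: str, args_digest: str, call: Callable[[], Awaitable[Any]]
    ) -> Any:
        index = self.current_call_index
        slot = {"function_id": function_id, "args_digest": args_digest, "status": "PENDING"}
        self.records.append(slot)
        # PENDING must be durably stored before the underlying call begins.
        await self._persist(self.records)
        try:
            result = await call()
        except Exception as exc:
            # Overwrite the same slot; the index advances only on a terminal state.
            self.records[index] = {**slot, "status": "FAILED", "error": str(exc)}
            await self._persist(self.records)
            self.current_call_index += 1
            raise
        self.records[index] = {**slot, "status": "SUCCEEDED", "result": result}
        await self._persist(self.records)
        self.current_call_index += 1
        return result
```

Awaiting the `PENDING` persist before awaiting `call()` is what enforces the ordering here; an implementation that scheduled both concurrently would reopen the recovery gap.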
Storage and Serialization
ActionStateKafkaSeder must remain compatible with both old and new CallResult formats. New code must be able to read older persisted records that do not contain the status field and interpret them as terminal records, that is, as SUCCEEDED or FAILED under the previous completion-only semantics and never as PENDING.
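One way to read both formats is to treat a missing status as a legacy terminal record. The sketch below assumes a JSON encoding and a field named `exceptionPayload` that is set only for failed calls; both are assumptions for illustration and may not match the actual serialized layout.

```python
import json

VALID_STATUSES = {"PENDING", "SUCCEEDED", "FAILED"}


def read_status(raw: bytes) -> str:
    """Return the status of a serialized call record, old or new format."""
    record = json.loads(raw)
    status = record.get("status")
    if status is not None:
        if status not in VALID_STATUSES:
            raise ValueError(f"unknown status: {status}")
        return status
    # Legacy record: written only after completion, so it is always terminal.
    # Assumed convention: an exception payload marks a failed call.
    if record.get("exceptionPayload") is not None:
        return "FAILED"
    return "SUCCEEDED"
```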
Documentation
Document reconcilable durable calls and provide an end-to-end example.
Explain the recovery semantics and underlying rationale.
Show how reconciler logic can query external systems and provide a recovered terminal outcome.
Testing
ActionState / CallResult serialization and deserialization
reconcilable durable call sync/async execution and mismatch handling
PENDING -> recovered success
PENDING -> recovered failure
mixed ordering between plain durable calls and reconcilable durable calls
Python API
Notes:
reconciler is an optional keyword-only reconciler callable. If it is not provided, the existing completion-only semantics remain unchanged.
If reconciler returns a result, the runtime persists and replays that recovered successful outcome.
If reconciler raises, the runtime persists and replays that recovered failure.