From eede7899da50c0f69539b1288686bc05ba1c5f5b Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 4 Nov 2025 16:32:29 -0800 Subject: [PATCH 1/8] build: add actor module dependency for protofsm In this commit, we add the actor module as a dependency to support request-response patterns in the protofsm state machine. The actor module provides a Future/Promise abstraction that enables asynchronous operations with result handling. We use a local replace directive to point to the ./actor subdirectory, as the actor package is being developed alongside protofsm. This allows us to iterate on both packages simultaneously while maintaining clean module boundaries. --- go.mod | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/go.mod b/go.mod index 7c501f88170..50763762ac0 100644 --- a/go.mod +++ b/go.mod @@ -64,6 +64,8 @@ require ( pgregory.net/rapid v1.2.0 ) +require github.com/lightningnetwork/lnd/actor v0.0.1-alpha + require ( dario.cat/mergo v1.0.1 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect @@ -213,6 +215,9 @@ replace github.com/gogo/protobuf => github.com/gogo/protobuf v1.3.2 // allows us to specify that as an option. replace google.golang.org/protobuf => github.com/lightninglabs/protobuf-go-hex-display v1.33.0-hex-display +// Use the local actor module for development. +replace github.com/lightningnetwork/lnd/actor => ./actor + // If you change this please also update docs/INSTALL.md and GO_VERSION in // Makefile (then run `make lint` to see where else it needs to be updated as // well). From f2c93bd57e141d0a2bb29ce82e74cd9c61bd97c6 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 4 Nov 2025 16:32:54 -0800 Subject: [PATCH 2/8] protofsm: add Outbox field to EmittedEvent In this commit, we extend the EmittedEvent structure with an Outbox field that can hold events to be returned to the caller. This enables nested state machines to emit events that bubble up to their parent state machine for further processing. The Outbox field serves a different purpose than InternalEvent and ExternalEvents. InternalEvent loops events back into the same state machine, while ExternalEvents triggers I/O operations like sending messages or broadcasting transactions. The Outbox, on the other hand, carries events outward to be processed by a parent or calling context. This pattern is particularly useful for composed state machines where a child state machine needs to communicate state changes or results back to its parent without tight coupling between the two machines. --- protofsm/state_machine.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index b3e16f5fd35..74fb8d9333d 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -42,6 +42,12 @@ type EmittedEvent[Event any] struct { // ExternalEvent is an optional external event that is to be sent to // the daemon for dispatch. Usually, this is some form of I/O. ExternalEvents DaemonEventSet + + // Outbox is an optional set of events that are accumulated during event + // processing and returned to the caller for processing into the main + // state machine. This enables nested state machines to emit events that + // bubble up to their parent. + Outbox []Event } // StateTransition is a state transition type. It denotes the next state to go From 98643c0c72dff82928daea6c7b28e19d66ff8feb Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 4 Nov 2025 16:33:44 -0800 Subject: [PATCH 3/8] protofsm: add AskEvent method for request-response pattern In this commit, we introduce the AskEvent method that implements the Ask pattern from actor systems, complementing the existing fire-and- forget SendEvent method. The key distinction is that AskEvent returns a Future that resolves with the accumulated outbox events after the state machine fully processes the event. The implementation uses the Promise/Future abstraction from the actor module to enable asynchronous result delivery. When a caller invokes AskEvent, we create a Promise and send both the event and the promise to the state machine via a dedicated syncEvents channel. The state machine's driveMachine goroutine will process the event and complete the promise with either the accumulated outbox events or an error. --- protofsm/state_machine.go | 75 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 74fb8d9333d..f694d757f96 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -11,6 +11,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" + "github.com/lightningnetwork/lnd/actor" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/lnutils" @@ -130,6 +131,18 @@ type stateQuery[Event any, Env Environment] struct { CurrentState chan State[Event, Env] } +// syncEventRequest is used to send an event to the state machine synchronously, +// waiting for the event processing to complete and returning the accumulated +// outbox events. +type syncEventRequest[Event any] struct { + // event is the event to process. + event Event + + // promise is used to signal completion and return the accumulated + // outbox events or an error. + promise actor.Promise[[]Event] +} + // StateMachine represents an abstract FSM that is able to process new incoming // events and drive a state machine to termination. This implementation uses // type params to abstract over the types of events and environment. Events @@ -146,6 +159,10 @@ type StateMachine[Event any, Env Environment] struct { // FSM. events chan Event + // syncEvents is the channel that will be used to send synchronous event + // requests to the FSM, returning the accumulated outbox events. + syncEvents chan syncEventRequest[Event] + // newStateEvents is an EventDistributor that will be used to notify // any relevant callers of new state transitions that occur. newStateEvents *fn.EventDistributor[State[Event, Env]] @@ -220,6 +237,7 @@ func NewStateMachine[Event any, Env Environment]( fmt.Sprintf("FSM(%v):", cfg.Env.Name()), ), events: make(chan Event, 1), + syncEvents: make(chan syncEventRequest[Event], 1), stateQuery: make(chan stateQuery[Event, Env]), gm: *fn.NewGoroutineManager(), newStateEvents: fn.NewEventDistributor[State[Event, Env]](), @@ -265,6 +283,63 @@ func (s *StateMachine[Event, Env]) SendEvent(ctx context.Context, event Event) { } } +// AskEvent sends a new event to the state machine using the Ask pattern +// (request-response), waiting for the event to be fully processed. It +// returns a Future that will be resolved with the accumulated outbox events +// from all state transitions triggered by this event, including nested +// internal events. The Future's Await method will return fn.Result[[]Event] +// containing either the accumulated outbox events or an error if processing +// failed. +func (s *StateMachine[Event, Env]) AskEvent(ctx context.Context, + event Event) actor.Future[[]Event] { + + s.log.Debugf("Asking event %T", event) + + // Create a promise to signal completion and return results. + promise := actor.NewPromise[[]Event]() + + req := syncEventRequest[Event]{ + event: event, + promise: promise, + } + + // Check for context cancellation or shutdown first to avoid races. + select { + case <-ctx.Done(): + promise.Complete( + fn.Errf[[]Event]("context cancelled: %w", + ctx.Err()), + ) + + return promise.Future() + + case <-s.quit: + promise.Complete(fn.Err[[]Event](ErrStateMachineShutdown)) + + return promise.Future() + + default: + } + + // Send the request to the state machine. If we can't send it due to + // context cancellation or shutdown, complete the promise with an error. + select { + // Successfully sent, the promise will be completed by driveMachine. + case s.syncEvents <- req: + + case <-ctx.Done(): + promise.Complete( + fn.Errf[[]Event]("context cancelled: %w", + ctx.Err()), + ) + + case <-s.quit: + promise.Complete(fn.Err[[]Event](ErrStateMachineShutdown)) + } + + return promise.Future() +} + // CanHandle returns true if the target message can be routed to the state // machine. func (s *StateMachine[Event, Env]) CanHandle(msg msgmux.PeerMsg) bool { From 2aa70d0e43bf650c62e9dfa7336c901e71f7102b Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 4 Nov 2025 16:34:11 -0800 Subject: [PATCH 4/8] protofsm: wire up outbox accumulation and AskEvent handling In this commit, we complete the outbox pattern implementation by modifying applyEvents to accumulate outbox events and extending driveMachine to handle AskEvent requests. The applyEvents function now maintains an outbox slice that accumulates events from every EmittedEvent encountered during the event processing loop. This includes events from the initial state transition as well as all nested internal events, ensuring callers receive the complete set of outbox messages regardless of how deeply nested the state transitions become. The function signature changes from returning (State, error) to (State, []Event, error) to carry this information. The driveMachine event loop now handles the syncEvents channel alongside the existing events channel. When processing an AskEvent request, we call applyEvents and capture the returned outbox slice. If processing succeeds, we complete the promise with the accumulated events using fn.Ok(outbox). If an error occurs, we complete the promise with the error using fn.Err[[]Event](err) before shutting down the state machine, maintaining our invariant that processing errors are fatal. The regular SendEvent path remains unchanged except for discarding the outbox return value with an underscore, preserving backward compatibility for fire-and-forget event processing. --- protofsm/state_machine.go | 51 +++++++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index f694d757f96..2c7afe21fd4 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -644,13 +644,19 @@ func (s *StateMachine[Event, Env]) executeDaemonEvent(ctx context.Context, // applyEvents applies a new event to the state machine. This will continue // until no further events are emitted by the state machine. Along the way, -// we'll also ensure to execute any daemon events that are emitted. +// we'll also ensure to execute any daemon events that are emitted. The +// function returns the final state, any accumulated outbox events, and an +// error if one occurred. func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context, currentState State[Event, Env], newEvent Event) (State[Event, Env], - error) { + []Event, error) { eventQueue := fn.NewQueue(newEvent) + // outbox accumulates all outbox events from state transitions during + // the entire event processing chain. + var outbox []Event + // Given the next event to handle, we'll process the event, then add // any new emitted internal events to our event queue. This continues // until we reach a terminal state, or we run out of internal events to @@ -694,6 +700,10 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context, eventQueue.Enqueue(inEvent) } + // Accumulate any outbox events from this state + // transition. + outbox = append(outbox, events.Outbox...) + return nil }) if err != nil { @@ -717,11 +727,11 @@ func (s *StateMachine[Event, Env]) applyEvents(ctx context.Context, return nil }) if err != nil { - return currentState, err + return currentState, nil, err } } - return currentState, nil + return currentState, outbox, nil } // driveMachine is the main event loop of the state machine. It accepts any new @@ -752,7 +762,7 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) { // machine forward until we either run out of internal events, // or we reach a terminal state. case newEvent := <-s.events: - newState, err := s.applyEvents( + newState, _, err := s.applyEvents( ctx, currentState, newEvent, ) if err != nil { @@ -769,6 +779,37 @@ func (s *StateMachine[Event, Env]) driveMachine(ctx context.Context) { currentState = newState + // We have a synchronous event request that expects the + // accumulated outbox events to be returned via the promise. + case syncReq := <-s.syncEvents: + newState, outbox, err := s.applyEvents( + ctx, currentState, syncReq.event, + ) + if err != nil { + s.cfg.ErrorReporter.ReportError(err) + + s.log.ErrorS(ctx, "Unable to apply sync event", + err) + + // Complete the promise with the error. + // + // TODO(roasbeef): distinguish between error + // types? state vs processing + syncReq.promise.Complete(fn.Err[[]Event](err)) + + // An error occurred, so we'll tear down the + // entire state machine as we can't proceed. + go s.Stop() + + return + } + + currentState = newState + + // Complete the promise with the accumulated outbox + // events. + syncReq.promise.Complete(fn.Ok(outbox)) + // An outside caller is querying our state, so we'll return the // latest state. case stateQuery := <-s.stateQuery: From 1387ac1e1da486e217c0542e09963c360785610a Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 4 Nov 2025 16:34:34 -0800 Subject: [PATCH 5/8] protofsm/test: add tests for AskEvent and Outbox --- protofsm/state_machine_test.go | 345 +++++++++++++++++++++++++++++++++ 1 file changed, 345 insertions(+) diff --git a/protofsm/state_machine_test.go b/protofsm/state_machine_test.go index ca060614f3b..469b76a7756 100644 --- a/protofsm/state_machine_test.go +++ b/protofsm/state_machine_test.go @@ -1,6 +1,7 @@ package protofsm import ( + "context" "encoding/hex" "fmt" "sync/atomic" @@ -868,3 +869,347 @@ func TestStateMachineMsgMapper(t *testing.T) { adapters.AssertExpectations(t) env.AssertExpectations(t) } + +// outboxEvent is a test event type that gets added to the outbox. +type outboxEvent struct { + id int +} + +func (o *outboxEvent) dummy() { +} + +// emitOutbox is a test event that triggers a state to emit outbox events. +type emitOutbox struct { + numOutbox int + numInternal int + shouldGoToFin bool +} + +func (e *emitOutbox) dummy() { +} + +// dummyStateOutbox is a test state that emits outbox events during +// transitions. +type dummyStateOutbox struct { + counter int +} + +func (d *dummyStateOutbox) String() string { + return fmt.Sprintf("dummyStateOutbox(%d)", d.counter) +} + +func (d *dummyStateOutbox) ProcessEvent(event dummyEvents, env *dummyEnv, +) (*StateTransition[dummyEvents, *dummyEnv], error) { + + switch newEvent := event.(type) { + case *emitOutbox: + // Create outbox events based on the request. + outbox := make([]dummyEvents, newEvent.numOutbox) + for i := 0; i < newEvent.numOutbox; i++ { + outbox[i] = &outboxEvent{ + id: d.counter*100 + i, + } + } + + // Create internal events that will also emit outbox events. + internalEvents := make([]dummyEvents, newEvent.numInternal) + for i := 0; i < newEvent.numInternal; i++ { + internalEvents[i] = &emitOutbox{ + numOutbox: 1, + numInternal: 0, + shouldGoToFin: false, + } + } + + var nextState State[dummyEvents, *dummyEnv] + if newEvent.shouldGoToFin { + nextState = &dummyStateFin{} + } else { + nextState = &dummyStateOutbox{counter: d.counter + 1} + } + + return &StateTransition[dummyEvents, *dummyEnv]{ + NextState: nextState, + NewEvents: fn.Some(EmittedEvent[dummyEvents]{ + InternalEvent: internalEvents, + Outbox: outbox, + }), + }, nil + + case *goToFin: + return &StateTransition[dummyEvents, *dummyEnv]{ + NextState: &dummyStateFin{}, + }, nil + + case *outboxEvent: + // When processing an outbox event (shouldn't happen in normal + // flow), just stay in current state. + return &StateTransition[dummyEvents, *dummyEnv]{ + NextState: d, + }, nil + } + + return nil, fmt.Errorf("unknown event: %T", event) +} + +func (d *dummyStateOutbox) IsTerminal() bool { + return false +} + +// TestStateMachineAskEvent tests the AskEvent method and outbox event +// accumulation functionality. +func TestStateMachineAskEvent(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + event dummyEvents + expectedOutboxCount int + expectError bool + }{ + { + name: "basic outbox accumulation", + event: &emitOutbox{ + numOutbox: 3, + numInternal: 0, + shouldGoToFin: false, + }, + expectedOutboxCount: 3, + expectError: false, + }, + + // 2 from top-level + 3 from internal events (1 each). + { + name: "nested internal events with outbox", + event: &emitOutbox{ + numOutbox: 2, + numInternal: 3, + shouldGoToFin: false, + }, + expectedOutboxCount: 5, + expectError: false, + }, + + { + name: "empty outbox", + event: &emitOutbox{ + numOutbox: 0, + numInternal: 0, + shouldGoToFin: false, + }, + expectedOutboxCount: 0, + expectError: false, + }, + + // 1 from top-level + 5 from internal events. + { + name: "deeply nested outbox", + event: &emitOutbox{ + numOutbox: 1, + numInternal: 5, + shouldGoToFin: false, + }, + expectedOutboxCount: 6, + expectError: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctx := t.Context() + + // Create our state machine with the outbox test state. + env := &dummyEnv{} + startingState := &dummyStateOutbox{counter: 0} + adapters := newDaemonAdapters() + + cfg := StateMachineCfg[dummyEvents, *dummyEnv]{ + Daemon: adapters, + InitialState: startingState, + Env: env, + } + stateMachine := NewStateMachine(cfg) + + stateSub := stateMachine.RegisterStateEvents() + defer stateMachine.RemoveStateSub(stateSub) + + stateMachine.Start(ctx) + defer stateMachine.Stop() + + // Wait for initial state. + expectedStates := []State[dummyEvents, *dummyEnv]{ + &dummyStateOutbox{}, + } + assertStateTransitions(t, stateSub, expectedStates) + + // Send the event using Ask pattern. + future := stateMachine.AskEvent(ctx, tc.event) + require.NotNil(t, future) + + result := future.Await(ctx) + + if tc.expectError { + require.True(t, result.IsErr()) + } else { + require.True(t, result.IsOk()) + + // Extract the outbox events. + outbox := result.UnwrapOr(nil) + require.Len(t, outbox, tc.expectedOutboxCount) + + // Verify outbox events are of the correct type. + for _, event := range outbox { + _, ok := event.(*outboxEvent) + require.True(t, ok, + "expected outboxEvent, got %T", + event) + } + } + + adapters.AssertExpectations(t) + env.AssertExpectations(t) + }) + } +} + +// TestStateMachineOutboxWithMixedEvents tests that outbox accumulation works +// correctly when mixed with regular SendEvent calls. +func TestStateMachineOutboxWithMixedEvents(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + // Create our state machine with the outbox test state. + env := &dummyEnv{} + startingState := &dummyStateOutbox{counter: 0} + adapters := newDaemonAdapters() + + cfg := StateMachineCfg[dummyEvents, *dummyEnv]{ + Daemon: adapters, + InitialState: startingState, + Env: env, + } + stateMachine := NewStateMachine(cfg) + + // Subscribe to state transitions, then start the main state machine. + stateSub := stateMachine.RegisterStateEvents() + defer stateMachine.RemoveStateSub(stateSub) + + stateMachine.Start(ctx) + defer stateMachine.Stop() + + expectedStates := []State[dummyEvents, *dummyEnv]{ + &dummyStateOutbox{}, + } + assertStateTransitions(t, stateSub, expectedStates) + + // Send a regular async event first. + stateMachine.SendEvent(ctx, &emitOutbox{ + numOutbox: 1, + numInternal: 0, + shouldGoToFin: false, + }) + + // Wait for state transition from async event. + expectedStates = []State[dummyEvents, *dummyEnv]{ + &dummyStateOutbox{counter: 1}, + } + assertStateTransitions(t, stateSub, expectedStates) + + // Now send an event using Ask pattern. + future := stateMachine.AskEvent(ctx, &emitOutbox{ + numOutbox: 2, + numInternal: 1, + shouldGoToFin: false, + }) + + result := future.Await(ctx) + require.True(t, result.IsOk()) + + // We should have 3 outbox events (2 from top-level + 1 from internal). + outbox := result.UnwrapOr(nil) + require.Len(t, outbox, 3) + + adapters.AssertExpectations(t) + env.AssertExpectations(t) +} + +// TestStateMachineAskEventContextCancellation tests that context cancellation +// is properly handled in AskEvent. +func TestStateMachineAskEventContextCancellation(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + env := &dummyEnv{} + startingState := &dummyStateOutbox{counter: 0} + adapters := newDaemonAdapters() + + cfg := StateMachineCfg[dummyEvents, *dummyEnv]{ + Daemon: adapters, + InitialState: startingState, + Env: env, + } + stateMachine := NewStateMachine(cfg) + + stateMachine.Start(ctx) + defer stateMachine.Stop() + + // Create a context that's already cancelled. + cancelledCtx, cancel := context.WithCancel(t.Context()) + cancel() + + // Try to send an event with a cancelled context. + future := stateMachine.AskEvent(cancelledCtx, &emitOutbox{ + numOutbox: 1, + numInternal: 0, + shouldGoToFin: false, + }) + + // The future should be completed with an error. + result := future.Await(ctx) + require.True(t, result.IsErr()) + + adapters.AssertExpectations(t) + env.AssertExpectations(t) +} + +// TestStateMachineAskEventAfterShutdown tests that AskEvent properly handles +// the case where the state machine has been shut down. +func TestStateMachineAskEventAfterShutdown(t *testing.T) { + t.Parallel() + + ctx := t.Context() + + // Create our state machine. + env := &dummyEnv{} + startingState := &dummyStateOutbox{counter: 0} + adapters := newDaemonAdapters() + + cfg := StateMachineCfg[dummyEvents, *dummyEnv]{ + Daemon: adapters, + InitialState: startingState, + Env: env, + } + stateMachine := NewStateMachine(cfg) + + stateMachine.Start(ctx) + + // Stop the state machine. + stateMachine.Stop() + + // Try to send an event after shutdown. + future := stateMachine.AskEvent(ctx, &emitOutbox{ + numOutbox: 1, + numInternal: 0, + shouldGoToFin: false, + }) + + // The future should be completed with a shutdown error. + result := future.Await(ctx) + require.True(t, result.IsErr()) + require.ErrorIs(t, result.Err(), ErrStateMachineShutdown) + + adapters.AssertExpectations(t) + env.AssertExpectations(t) +} From eb118f8ab6b5c72de8ea32334058d88ef33a731f Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 16 May 2025 17:23:06 -0700 Subject: [PATCH 6/8] protofsm: implement the actor.ActorBehavior interface for StateMachine In this commit, we implement the actor.ActorBehavior interface for StateMachine. This enables the state machine executor to be registered as an actor, and have messages be sent to it via a unique ServiceKey that a concrete instance will set. --- protofsm/actor_wrapper.go | 23 +++++++++++++++++++++++ protofsm/state_machine.go | 20 ++++++++++++++++++++ 2 files changed, 43 insertions(+) create mode 100644 protofsm/actor_wrapper.go diff --git a/protofsm/actor_wrapper.go b/protofsm/actor_wrapper.go new file mode 100644 index 00000000000..b0be9a07eb4 --- /dev/null +++ b/protofsm/actor_wrapper.go @@ -0,0 +1,23 @@ +package protofsm + +import ( + "fmt" + + "github.com/lightningnetwork/lnd/actor" +) + +// ActorMessage wraps an Event, in order to create a new message that can be +// used with the actor package. +type ActorMessage[Event any] struct { + actor.BaseMessage + + // Event is the event that is being sent to the actor. + Event Event +} + +// MessageType returns the type of the message. +// +// NOTE: This implements the actor.Message interface. +func (a ActorMessage[Event]) MessageType() string { + return fmt.Sprintf("ActorMessage(%T)", a.Event) +} diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index 2c7afe21fd4..c8d7850ba3b 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -340,6 +340,26 @@ func (s *StateMachine[Event, Env]) AskEvent(ctx context.Context, return promise.Future() } +// Receive processes a message and returns a Result. The provided context is the +// actor's internal context, which can be used to detect actor shutdown +// requests. +// +// NOTE: This implements the actor.ActorBehavior interface. +func (s *StateMachine[Event, Env]) Receive(ctx context.Context, + e ActorMessage[Event]) fn.Result[bool] { + + select { + case s.events <- e.Event: + return fn.Ok(true) + + case <-ctx.Done(): + return fn.Err[bool](ctx.Err()) + + case <-s.quit: + return fn.Err[bool](ErrStateMachineShutdown) + } +} + // CanHandle returns true if the target message can be routed to the state // machine. func (s *StateMachine[Event, Env]) CanHandle(msg msgmux.PeerMsg) bool { From 11d0375eb0ee735d1a397bc046c8917cd0bcd82a Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 7 Nov 2025 15:51:08 -0800 Subject: [PATCH 7/8] protofsm: change Receive to use AskEvent and return outbox In this commit, we modify the Receive method that implements the actor.ActorBehavior interface to use the AskEvent pattern instead of the fire-and-forget SendEvent approach. This fundamental change enables proper event propagation through actor hierarchies when state machines are composed. Previously, Receive would send events directly to the events channel and return a boolean indicating success. This provided no visibility into what happened during event processing or what events were emitted as a result. The fire-and-forget nature meant that parent actors had no way to receive outbox events from nested state machines. With this change, Receive now delegates to AskEvent and awaits the Future, returning fn.Result[[]Event] instead of fn.Result[bool]. This means the actor system can now propagate events upward through the actor hierarchy. When a state machine is embedded as an actor and processes a message, any outbox events it emits are returned to the calling actor for further processing. The implementation becomes remarkably simple as AskEvent already handles all the complexity of context cancellation, shutdown detection, and promise completion. We simply invoke AskEvent with the actor's context and the incoming event, then await and return the result directly. This change completes the outbox pattern implementation by ensuring that state machines used as actors can participate in event propagation, enabling clean composition of state machines in actor-based systems. --- protofsm/state_machine.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/protofsm/state_machine.go b/protofsm/state_machine.go index c8d7850ba3b..984abe7ee76 100644 --- a/protofsm/state_machine.go +++ b/protofsm/state_machine.go @@ -340,24 +340,25 @@ func (s *StateMachine[Event, Env]) AskEvent(ctx context.Context, return promise.Future() } -// Receive processes a message and returns a Result. The provided context is the -// actor's internal context, which can be used to detect actor shutdown -// requests. +// Receive processes a message and returns a Result containing the accumulated +// outbox events from the state machine. The provided context is the actor's +// internal context, which can be used to detect actor shutdown requests. +// +// This method uses the AskEvent pattern to wait for the event to be fully +// processed and collect any outbox events emitted during state transitions. +// This enables the actor system to propagate events from nested state machines +// up through the actor hierarchy. // // NOTE: This implements the actor.ActorBehavior interface. func (s *StateMachine[Event, Env]) Receive(ctx context.Context, - e ActorMessage[Event]) fn.Result[bool] { - - select { - case s.events <- e.Event: - return fn.Ok(true) + e ActorMessage[Event]) fn.Result[[]Event] { - case <-ctx.Done(): - return fn.Err[bool](ctx.Err()) + // Use AskEvent to process the event and get the outbox events back. + future := s.AskEvent(ctx, e.Event) - case <-s.quit: - return fn.Err[bool](ErrStateMachineShutdown) - } + // Await the result which will contain the accumulated outbox events + // from all state transitions triggered by this event. + return future.Await(ctx) } // CanHandle returns true if the target message can be routed to the state From 6d762b68af217e634c979faa4dee450ab969ce0b Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 12 Nov 2025 17:42:46 -0800 Subject: [PATCH 8/8] actor: add transform function for tall refs This is useful when you want to be able to take a message type A from an actor, and adapt it to message type B. --- actor/transform.go | 58 ++++++++++++ actor/transform_test.go | 201 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 259 insertions(+) create mode 100644 actor/transform.go create mode 100644 actor/transform_test.go diff --git a/actor/transform.go b/actor/transform.go new file mode 100644 index 00000000000..055e0f3382a --- /dev/null +++ b/actor/transform.go @@ -0,0 +1,58 @@ +package actor + +import "context" + +// MapInputRef wraps a TellOnlyRef and transforms incoming messages before +// forwarding them to the target ref. This allows adapting a ref that expects +// message type Out to accept message type In, eliminating the need for +// intermediate adapter actors. +// +// This is particularly useful for notification patterns where a source actor +// sends events of a specific type, but consumers want to receive events in +// their own domain-specific type. +// +// Example usage: +// +// // roundActorRef accepts round.ConfirmationEvent +// // chainsource sends chainsource.ConfirmationEvent +// adaptedRef := actor.NewMapInputRef( +// roundActorRef, +// func(cs chainsource.ConfirmationEvent) round.ConfirmationEvent { +// return round.ConfirmationEvent{ +// TxID: cs.Txid, +// BlockHeight: cs.BlockHeight, +// // ... transform fields +// } +// }, +// ) +// // Now adaptedRef can be used as TellOnlyRef[chainsource.ConfirmationEvent] +type MapInputRef[In Message, Out Message] struct { + targetRef TellOnlyRef[Out] + mapFn func(In) Out +} + +// NewMapInputRef creates a new message-transforming wrapper around a +// TellOnlyRef. The mapFn function is called for each message to transform it +// from type In to type Out before forwarding to targetRef. +func NewMapInputRef[In Message, Out Message]( + targetRef TellOnlyRef[Out], mapFn func(In) Out) *MapInputRef[In, Out] { + + return &MapInputRef[In, Out]{ + targetRef: targetRef, + mapFn: mapFn, + } +} + +// Tell transforms the incoming message using the map function and forwards it +// to the target ref. If the context is cancelled before the message can be sent +// to the target actor's mailbox, the message may be dropped. +func (m *MapInputRef[In, Out]) Tell(ctx context.Context, msg In) { + transformed := m.mapFn(msg) + m.targetRef.Tell(ctx, transformed) +} + +// ID returns a unique identifier for this actor. The ID includes the +// "map-input-" prefix to indicate this is a transformation wrapper. +func (m *MapInputRef[In, Out]) ID() string { + return "map-input-" + m.targetRef.ID() +} diff --git a/actor/transform_test.go b/actor/transform_test.go new file mode 100644 index 00000000000..93f3915c482 --- /dev/null +++ b/actor/transform_test.go @@ -0,0 +1,201 @@ +package actor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +// testMessageA is a test message type for transformation testing. +type testMessageA struct { + BaseMessage + Value int + Text string +} + +// MessageType returns the message type identifier. +func (m testMessageA) MessageType() string { + return "testMessageA" +} + +// testMessageB is another test message type for transformation testing. +type testMessageB struct { + BaseMessage + DoubledValue int + UpperText string +} + +// MessageType returns the message type identifier. +func (m testMessageB) MessageType() string { + return "testMessageB" +} + +// mockTellOnlyRef is a mock implementation of TellOnlyRef for testing. +type mockTellOnlyRef[M Message] struct { + id string + received []M +} + +func (m *mockTellOnlyRef[M]) Tell(ctx context.Context, msg M) { + m.received = append(m.received, msg) +} + +func (m *mockTellOnlyRef[M]) ID() string { + return m.id +} + +// TestMapInputRefBasicTransformation tests that messages are correctly +// transformed when sent through a MapInputRef. +func TestMapInputRefBasicTransformation(t *testing.T) { + t.Parallel() + + // Create a mock target ref that expects testMessageB. + targetRef := &mockTellOnlyRef[testMessageB]{ + id: "test-target", + } + + // Create a transformation function from A to B. + transformFn := func(a testMessageA) testMessageB { + return testMessageB{ + DoubledValue: a.Value * 2, + UpperText: a.Text + "-TRANSFORMED", + } + } + + // Create the MapInputRef that accepts testMessageA. + adaptedRef := NewMapInputRef(targetRef, transformFn) + + // Send a message of type A. + ctx := context.Background() + inputMsg := testMessageA{ + Value: 42, + Text: "hello", + } + adaptedRef.Tell(ctx, inputMsg) + + // Verify the target received the transformed message. + require.Len(t, targetRef.received, 1) + received := targetRef.received[0] + require.Equal(t, 84, received.DoubledValue) + require.Equal(t, "hello-TRANSFORMED", received.UpperText) +} + +// TestMapInputRefMultipleMessages tests that multiple messages are all +// correctly transformed. +func TestMapInputRefMultipleMessages(t *testing.T) { + t.Parallel() + + targetRef := &mockTellOnlyRef[testMessageB]{ + id: "test-target", + } + + transformFn := func(a testMessageA) testMessageB { + return testMessageB{ + DoubledValue: a.Value * 2, + UpperText: a.Text, + } + } + + adaptedRef := NewMapInputRef(targetRef, transformFn) + ctx := context.Background() + + // Send multiple messages. + messages := []testMessageA{ + {Value: 1, Text: "one"}, + {Value: 2, Text: "two"}, + {Value: 3, Text: "three"}, + } + + for _, msg := range messages { + adaptedRef.Tell(ctx, msg) + } + + // Verify all messages were transformed and received. + require.Len(t, targetRef.received, 3) + require.Equal(t, 2, targetRef.received[0].DoubledValue) + require.Equal(t, "one", targetRef.received[0].UpperText) + require.Equal(t, 4, targetRef.received[1].DoubledValue) + require.Equal(t, "two", targetRef.received[1].UpperText) + require.Equal(t, 6, targetRef.received[2].DoubledValue) + require.Equal(t, "three", targetRef.received[2].UpperText) +} + +// TestMapInputRefID tests that the ID method returns a prefixed version of +// the target ref's ID. +func TestMapInputRefID(t *testing.T) { + t.Parallel() + + targetRef := &mockTellOnlyRef[testMessageB]{ + id: "my-target-actor", + } + + transformFn := func(a testMessageA) testMessageB { + return testMessageB{} + } + + adaptedRef := NewMapInputRef(targetRef, transformFn) + + // Verify the ID includes the target ID with a prefix. + expectedID := "map-input-my-target-actor" + require.Equal(t, expectedID, adaptedRef.ID()) +} + +// TestMapInputRefTypeSafety tests that the generic type constraints ensure +// compile-time type safety. +func TestMapInputRefTypeSafety(t *testing.T) { + t.Parallel() + + // This test verifies that the type system works correctly. If this + // compiles, it proves type safety is maintained. + targetRef := &mockTellOnlyRef[testMessageB]{ + id: "test-target", + } + + // Create a MapInputRef[A, B]. + var adaptedRef TellOnlyRef[testMessageA] = NewMapInputRef( + targetRef, + func(a testMessageA) testMessageB { + return testMessageB{ + DoubledValue: a.Value, + } + }, + ) + + // Verify we can use it as a TellOnlyRef[testMessageA]. + ctx := context.Background() + adaptedRef.Tell(ctx, testMessageA{Value: 10}) + + // The fact that this compiles and runs proves type safety. + require.Len(t, targetRef.received, 1) +} + +// TestMapInputRefIdentityTransform tests that MapInputRef works when the +// input and output types are the same (identity transformation). +func TestMapInputRefIdentityTransform(t *testing.T) { + t.Parallel() + + targetRef := &mockTellOnlyRef[testMessageA]{ + id: "test-target", + } + + // Identity transformation: A -> A with modified value. + transformFn := func(a testMessageA) testMessageA { + a.Value = a.Value + 100 + return a + } + + adaptedRef := NewMapInputRef(targetRef, transformFn) + + ctx := context.Background() + inputMsg := testMessageA{ + Value: 5, + Text: "test", + } + adaptedRef.Tell(ctx, inputMsg) + + // Verify the transformation was applied. + require.Len(t, targetRef.received, 1) + require.Equal(t, 105, targetRef.received[0].Value) + require.Equal(t, "test", targetRef.received[0].Text) +}