diff --git a/callback.go b/callback.go index 0ad378d..f80008c 100644 --- a/callback.go +++ b/callback.go @@ -63,6 +63,11 @@ func processCallback[Type any, Status StatusType]( return nil } + if wr.RunState.Stopped() { + // Skip processing of stopped workflow records to match step consumer behaviour. + return nil + } + run, err := buildRun[Type, Status](w.newRunObj(), store, wr) if err != nil { return err diff --git a/callback_internal_test.go b/callback_internal_test.go index ca2a941..93b469b 100644 --- a/callback_internal_test.go +++ b/callback_internal_test.go @@ -201,4 +201,54 @@ func TestProcessCallback(t *testing.T) { err := processCallback(ctx, w, statusStart, nil, current.ForeignID, nil, latestLookup, nil, nil) require.NoError(t, err) }) + + t.Run("Skip if record is stopped (paused)", func(t *testing.T) { + stoppedRecord := &Record{ + WorkflowName: "example", + ForeignID: "32948623984623", + RunID: "JHFJDS-LSFKHJSLD-KSJDBLSL", + RunState: RunStatePaused, + Status: int(statusStart), + Object: b, + } + + callbackCalled := false + callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Run[string, testStatus], reader io.Reader) (testStatus, error) { + callbackCalled = true + return statusEnd, nil + }) + + latestLookup := func(ctx context.Context, workflowName, foreignID string) (*Record, error) { + return stoppedRecord, nil + } + + err := processCallback(ctx, w, statusStart, callbackFn, current.ForeignID, nil, latestLookup, nil, nil) + require.NoError(t, err) + require.False(t, callbackCalled, "callback should not be called for stopped records") + }) + + t.Run("Skip if record is stopped (cancelled)", func(t *testing.T) { + cancelledRecord := &Record{ + WorkflowName: "example", + ForeignID: "32948623984623", + RunID: "JHFJDS-LSFKHJSLD-KSJDBLSL", + RunState: RunStateCancelled, + Status: int(statusStart), + Object: b, + } + + callbackCalled := false + callbackFn := CallbackFunc[string, testStatus](func(ctx context.Context, r *Run[string, testStatus], reader io.Reader) (testStatus, error) { + callbackCalled = true + return statusEnd, nil + }) + + latestLookup := func(ctx context.Context, workflowName, foreignID string) (*Record, error) { + return cancelledRecord, nil + } + + err := processCallback(ctx, w, statusStart, callbackFn, current.ForeignID, nil, latestLookup, nil, nil) + require.NoError(t, err) + require.False(t, callbackCalled, "callback should not be called for cancelled records") + }) }