From ed9763805cf4699c0212e438390f3cb1b655b7c2 Mon Sep 17 00:00:00 2001
From: Andrew Wormald
Date: Tue, 10 Mar 2026 09:37:12 +0000
Subject: [PATCH 1/2] Replace SCAN-based outbox delete with reverse index in Redis store

Co-Authored-By: Claude Opus 4.6
---
 adapters/wredis/store.go | 79 +++++++++++++++++----------------------
 1 file changed, 33 insertions(+), 46 deletions(-)

diff --git a/adapters/wredis/store.go b/adapters/wredis/store.go
index d248559..0d2fc7c 100644
--- a/adapters/wredis/store.go
+++ b/adapters/wredis/store.go
@@ -13,9 +13,10 @@ import (
 const (
 	defaultListLimit = 25
 	// Key prefixes
-	recordKeyPrefix = "workflow:record:"
-	indexKeyPrefix  = "workflow:index:"
-	outboxKeyPrefix = "workflow:outbox:"
+	recordKeyPrefix        = "workflow:record:"
+	indexKeyPrefix         = "workflow:index:"
+	outboxKeyPrefix        = "workflow:outbox:"
+	outboxReverseKeyPrefix = "workflow:outbox-reverse:"
 )
 
 type Store struct {
@@ -38,11 +39,13 @@ var (
 		local list_key = KEYS[3]
 		local global_list_key = KEYS[4]
 		local outbox_key = KEYS[5]
+		local reverse_index_key = KEYS[6]
 
 		local record_data = ARGV[1]
 		local run_id = ARGV[2]
 		local score = ARGV[3]
 		local outbox_data = ARGV[4]
+		local outbox_event_id = ARGV[5]
 
 		-- Store record
 		redis.call('SET', record_key, record_data)
@@ -57,6 +60,9 @@ var (
 		-- Add to outbox
 		redis.call('LPUSH', outbox_key, outbox_data)
 
+		-- Store reverse index: outbox event ID -> workflow outbox key
+		redis.call('SET', reverse_index_key, outbox_key)
+
 		return 'OK'
 	`)
 
@@ -114,13 +120,14 @@ func (s *Store) Store(ctx context.Context, record *workflow.Record) error {
 	listKey := "workflow:list:" + record.WorkflowName
 	globalListKey := "workflow:list:all"
 	outboxKey := outboxKeyPrefix + record.WorkflowName
+	reverseIndexKey := outboxReverseKeyPrefix + eventData.ID
 
 	score := strconv.FormatFloat(float64(record.CreatedAt.Unix()), 'f', -1, 64)
 
 	// Execute Lua script atomically
 	return storeScript.Run(ctx, s.client,
-		[]string{recordKey, indexKey, listKey, globalListKey, outboxKey},
-		string(recordData), record.RunID, score, string(outboxData)).Err()
+		[]string{recordKey, indexKey, listKey, globalListKey, outboxKey, reverseIndexKey},
+		string(recordData), record.RunID, score, string(outboxData), eventData.ID).Err()
 }
 
 // Lookup implements the RecordStore interface
@@ -283,48 +290,28 @@ func (s *Store) ListOutboxEvents(ctx context.Context, workflowName string, limit
 
-// DeleteOutboxEvent implements the RecordStore interface
+// DeleteOutboxEvent implements the RecordStore interface. It removes the outbox
+// event with the given id, using the reverse index written by Store to find the
+// outbox list that holds it. An id with no reverse index, or one whose event is
+// no longer in the list, is treated as already deleted and returns nil.
 func (s *Store) DeleteOutboxEvent(ctx context.Context, id string) error {
-	// We need to check all workflow outbox lists to find the event with this ID
-	// Since we don't know which workflow this event belongs to, we'll need to check all of them
-	// For production use, you should maintain a reverse index (outboxEventID → workflowName)
-	// or include workflowName in the event ID to target the single outbox key directly
-
-	// Use SCAN instead of KEYS to avoid blocking Redis
-	pattern := outboxKeyPrefix + "*"
-	var cursor uint64
-	var luaErrors []error
-
-	for {
-		keys, nextCursor, err := s.client.Scan(ctx, cursor, pattern, 10).Result()
-		if err != nil {
-			return err
-		}
+	reverseKey := outboxReverseKeyPrefix + id
 
-		// Try to delete from each outbox list using the Lua script
-		for _, outboxKey := range keys {
-			result, err := deleteOutboxScript.Run(ctx, s.client, []string{outboxKey}, id).Int()
-			if err != nil {
-				// Log Lua script execution errors instead of silently ignoring them
-				luaErrors = append(luaErrors, err)
-				continue
-			}
-			if result == 1 {
-				// Successfully found and deleted the event
-				return nil
-			}
-		}
+	// Look up which outbox key this event belongs to via the reverse index.
+	outboxKey, err := s.client.Get(ctx, reverseKey).Result()
+	if err == redis.Nil {
+		// No reverse index found — event may have already been deleted.
+		return nil
+	}
+	if err != nil {
+		return err
+	}
 
-		cursor = nextCursor
-		if cursor == 0 {
-			break
-		}
+	// Remove the event from its outbox list. The script's result is ignored:
+	// an event that is no longer in the list counts as already deleted.
+	if _, err := deleteOutboxScript.Run(ctx, s.client, []string{outboxKey}, id).Int(); err != nil {
+		return err
 	}
 
-	// If we had Lua script errors but didn't find the event, report the errors
-	if len(luaErrors) > 0 {
-		// Return the first Lua error to make failures visible
-		return luaErrors[0]
-	}
-
-	// Event not found in any outbox - this is not necessarily an error
-	return nil
+	// Drop the reverse index whether or not the event was still in the list, and
+	// report a failure so the caller can retry instead of leaking the key.
+	return s.client.Del(ctx, reverseKey).Err()
 }

From 8946426292afa651b49e4d6c2f621cc72555bba3 Mon Sep 17 00:00:00 2001
From: Andrew Wormald
Date: Tue, 10 Mar 2026 10:16:32 +0000
Subject: [PATCH 2/2] Add tests for reverse-index-based DeleteOutboxEvent

Co-Authored-By: Claude Opus 4.6
---
 adapters/wredis/store_test.go | 114 ++++++++++++++++++++++++++++++++++
 1 file changed, 114 insertions(+)

diff --git a/adapters/wredis/store_test.go b/adapters/wredis/store_test.go
index 5ba1822..ec55ce7 100644
--- a/adapters/wredis/store_test.go
+++ b/adapters/wredis/store_test.go
@@ -40,3 +40,117 @@ func TestRedisRecordStore(t *testing.T) {
 
 	adaptertest.RunRecordStoreTest(t, factory)
 }
+
+func TestDeleteOutboxEvent_NotFound(t *testing.T) {
+	ctx := t.Context()
+
+	redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine")
+	testcontainers.CleanupContainer(t, redisInstance)
+	require.NoError(t, err)
+
+	host, err := redisInstance.Host(ctx)
+	require.NoError(t, err)
+
+	port, err := redisInstance.MappedPort(ctx, "6379/tcp")
+	require.NoError(t, err)
+
+	client := redis.NewClient(&redis.Options{
+		Addr: host + ":" + port.Port(),
+	})
+
+	store := wredis.New(client)
+
+	// Deleting a non-existent event should not error (reverse index miss).
+	err = store.DeleteOutboxEvent(ctx, "non-existent-id")
+	require.NoError(t, err)
+}
+
+func TestDeleteOutboxEvent_DoubleDelete(t *testing.T) {
+	ctx := t.Context()
+
+	redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine")
+	testcontainers.CleanupContainer(t, redisInstance)
+	require.NoError(t, err)
+
+	host, err := redisInstance.Host(ctx)
+	require.NoError(t, err)
+
+	port, err := redisInstance.MappedPort(ctx, "6379/tcp")
+	require.NoError(t, err)
+
+	client := redis.NewClient(&redis.Options{
+		Addr: host + ":" + port.Port(),
+	})
+
+	store := wredis.New(client)
+
+	// Store a record to create an outbox event.
+	record := &workflow.Record{
+		WorkflowName: "test-workflow",
+		ForeignID:    "foreign-1",
+		RunID:        "run-1",
+		RunState:     1,
+		Status:       1,
+		Object:       []byte(`{}`),
+	}
+	err = store.Store(ctx, record)
+	require.NoError(t, err)
+
+	// List the outbox events to get the ID.
+	events, err := store.ListOutboxEvents(ctx, "test-workflow", 10)
+	require.NoError(t, err)
+	require.Len(t, events, 1)
+
+	eventID := events[0].ID
+
+	// First delete should succeed.
+	err = store.DeleteOutboxEvent(ctx, eventID)
+	require.NoError(t, err)
+
+	// Second delete should also succeed (idempotent — reverse index cleaned up,
+	// so this hits the redis.Nil path).
+	err = store.DeleteOutboxEvent(ctx, eventID)
+	require.NoError(t, err)
+
+	// Outbox should be empty.
+	events, err = store.ListOutboxEvents(ctx, "test-workflow", 10)
+	require.NoError(t, err)
+	require.Empty(t, events)
+}
+
+func TestDeleteOutboxEvent_StaleReverseIndex(t *testing.T) {
+	ctx := t.Context()
+
+	redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine")
+	testcontainers.CleanupContainer(t, redisInstance)
+	require.NoError(t, err)
+
+	host, err := redisInstance.Host(ctx)
+	require.NoError(t, err)
+
+	port, err := redisInstance.MappedPort(ctx, "6379/tcp")
+	require.NoError(t, err)
+
+	client := redis.NewClient(&redis.Options{
+		Addr: host + ":" + port.Port(),
+	})
+
+	// Simulate a stale reverse index: the reverse key exists but the event
+	// is no longer in the outbox list (e.g. manually removed or race condition).
+	outboxKey := "workflow:outbox:test-workflow"
+	reverseKey := "workflow:outbox-reverse:stale-event-id"
+	err = client.Set(ctx, reverseKey, outboxKey, 0).Err()
+	require.NoError(t, err)
+
+	store := wredis.New(client)
+
+	// Should succeed — the Lua script returns 0 (not found in list) which is
+	// handled gracefully.
+	err = store.DeleteOutboxEvent(ctx, "stale-event-id")
+	require.NoError(t, err)
+
+	// Reverse index should be cleaned up even though the event wasn't in the list.
+	exists, err := client.Exists(ctx, reverseKey).Result()
+	require.NoError(t, err)
+	require.Equal(t, int64(0), exists)
+}