Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 31 additions & 42 deletions adapters/wredis/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (
const (
defaultListLimit = 25
// Key prefixes
recordKeyPrefix = "workflow:record:"
indexKeyPrefix = "workflow:index:"
outboxKeyPrefix = "workflow:outbox:"
recordKeyPrefix = "workflow:record:"
indexKeyPrefix = "workflow:index:"
outboxKeyPrefix = "workflow:outbox:"
outboxReverseKeyPrefix = "workflow:outbox-reverse:"
)

type Store struct {
Expand All @@ -38,11 +39,13 @@ var (
local list_key = KEYS[3]
local global_list_key = KEYS[4]
local outbox_key = KEYS[5]
local reverse_index_key = KEYS[6]

local record_data = ARGV[1]
local run_id = ARGV[2]
local score = ARGV[3]
local outbox_data = ARGV[4]
local outbox_event_id = ARGV[5]

-- Store record
redis.call('SET', record_key, record_data)
Expand All @@ -57,6 +60,9 @@ var (
-- Add to outbox
redis.call('LPUSH', outbox_key, outbox_data)

-- Store reverse index: outbox event ID -> workflow outbox key
redis.call('SET', reverse_index_key, outbox_key)

return 'OK'
`)

Expand Down Expand Up @@ -114,13 +120,14 @@ func (s *Store) Store(ctx context.Context, record *workflow.Record) error {
listKey := "workflow:list:" + record.WorkflowName
globalListKey := "workflow:list:all"
outboxKey := outboxKeyPrefix + record.WorkflowName
reverseIndexKey := outboxReverseKeyPrefix + eventData.ID

score := strconv.FormatFloat(float64(record.CreatedAt.Unix()), 'f', -1, 64)

// Execute Lua script atomically
return storeScript.Run(ctx, s.client,
[]string{recordKey, indexKey, listKey, globalListKey, outboxKey},
string(recordData), record.RunID, score, string(outboxData)).Err()
[]string{recordKey, indexKey, listKey, globalListKey, outboxKey, reverseIndexKey},
string(recordData), record.RunID, score, string(outboxData), eventData.ID).Err()
}

// Lookup implements the RecordStore interface
Expand Down Expand Up @@ -283,48 +290,30 @@ func (s *Store) ListOutboxEvents(ctx context.Context, workflowName string, limit

// DeleteOutboxEvent implements the RecordStore interface
func (s *Store) DeleteOutboxEvent(ctx context.Context, id string) error {
// We need to check all workflow outbox lists to find the event with this ID
// Since we don't know which workflow this event belongs to, we'll need to check all of them
// For production use, you should maintain a reverse index (outboxEventID → workflowName)
// or include workflowName in the event ID to target the single outbox key directly

// Use SCAN instead of KEYS to avoid blocking Redis
pattern := outboxKeyPrefix + "*"
var cursor uint64
var luaErrors []error

for {
keys, nextCursor, err := s.client.Scan(ctx, cursor, pattern, 10).Result()
if err != nil {
return err
}
reverseKey := outboxReverseKeyPrefix + id

// Try to delete from each outbox list using the Lua script
for _, outboxKey := range keys {
result, err := deleteOutboxScript.Run(ctx, s.client, []string{outboxKey}, id).Int()
if err != nil {
// Log Lua script execution errors instead of silently ignoring them
luaErrors = append(luaErrors, err)
continue
}
if result == 1 {
// Successfully found and deleted the event
return nil
}
}
// Look up which outbox key this event belongs to via the reverse index.
outboxKey, err := s.client.Get(ctx, reverseKey).Result()
if err == redis.Nil {
// No reverse index found — event may have already been deleted.
return nil
} else if err != nil {
return err
}

cursor = nextCursor
if cursor == 0 {
break
}
// Delete the event from the outbox list.
result, err := deleteOutboxScript.Run(ctx, s.client, []string{outboxKey}, id).Int()
if err != nil {
return err
}

// If we had Lua script errors but didn't find the event, report the errors
if len(luaErrors) > 0 {
// Return the first Lua error to make failures visible
return luaErrors[0]
// Clean up the reverse index regardless of whether the event was found.
s.client.Del(ctx, reverseKey)

if result == 0 {
// Event was not found in the outbox — already deleted.
return nil
}

// Event not found in any outbox - this is not necessarily an error
return nil
}
114 changes: 114 additions & 0 deletions adapters/wredis/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,117 @@ func TestRedisRecordStore(t *testing.T) {

adaptertest.RunRecordStoreTest(t, factory)
}

func TestDeleteOutboxEvent_NotFound(t *testing.T) {
ctx := t.Context()

redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine")
testcontainers.CleanupContainer(t, redisInstance)
require.NoError(t, err)

host, err := redisInstance.Host(ctx)
require.NoError(t, err)

port, err := redisInstance.MappedPort(ctx, "6379/tcp")
require.NoError(t, err)

client := redis.NewClient(&redis.Options{
Addr: host + ":" + port.Port(),
})

store := wredis.New(client)

// Deleting a non-existent event should not error (reverse index miss).
err = store.DeleteOutboxEvent(ctx, "non-existent-id")
require.NoError(t, err)
}

func TestDeleteOutboxEvent_DoubleDelete(t *testing.T) {
ctx := t.Context()

redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine")
testcontainers.CleanupContainer(t, redisInstance)
require.NoError(t, err)

host, err := redisInstance.Host(ctx)
require.NoError(t, err)

port, err := redisInstance.MappedPort(ctx, "6379/tcp")
require.NoError(t, err)

client := redis.NewClient(&redis.Options{
Addr: host + ":" + port.Port(),
})

store := wredis.New(client)

// Store a record to create an outbox event.
record := &workflow.Record{
WorkflowName: "test-workflow",
ForeignID: "foreign-1",
RunID: "run-1",
RunState: 1,
Status: 1,
Object: []byte(`{}`),
}
err = store.Store(ctx, record)
require.NoError(t, err)

// List the outbox events to get the ID.
events, err := store.ListOutboxEvents(ctx, "test-workflow", 10)
require.NoError(t, err)
require.Len(t, events, 1)

eventID := events[0].ID

// First delete should succeed.
err = store.DeleteOutboxEvent(ctx, eventID)
require.NoError(t, err)

// Second delete should also succeed (idempotent — reverse index cleaned up,
// so this hits the redis.Nil path).
err = store.DeleteOutboxEvent(ctx, eventID)
require.NoError(t, err)

// Outbox should be empty.
events, err = store.ListOutboxEvents(ctx, "test-workflow", 10)
require.NoError(t, err)
require.Empty(t, events)
}

func TestDeleteOutboxEvent_StaleReverseIndex(t *testing.T) {
ctx := t.Context()

redisInstance, err := rediscontainer.Run(ctx, "redis:7-alpine")
testcontainers.CleanupContainer(t, redisInstance)
require.NoError(t, err)

host, err := redisInstance.Host(ctx)
require.NoError(t, err)

port, err := redisInstance.MappedPort(ctx, "6379/tcp")
require.NoError(t, err)

client := redis.NewClient(&redis.Options{
Addr: host + ":" + port.Port(),
})

// Simulate a stale reverse index: the reverse key exists but the event
// is no longer in the outbox list (e.g. manually removed or race condition).
outboxKey := "workflow:outbox:test-workflow"
reverseKey := "workflow:outbox-reverse:stale-event-id"
err = client.Set(ctx, reverseKey, outboxKey, 0).Err()
require.NoError(t, err)

store := wredis.New(client)

// Should succeed — the Lua script returns 0 (not found in list) which is
// handled gracefully.
err = store.DeleteOutboxEvent(ctx, "stale-event-id")
require.NoError(t, err)

// Reverse index should be cleaned up even though the event wasn't in the list.
exists, err := client.Exists(ctx, reverseKey).Result()
require.NoError(t, err)
require.Equal(t, int64(0), exists)
}
Loading