126 changes: 83 additions & 43 deletions sequencers/single/queue.go
@@ -2,9 +2,11 @@ package single

import (
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"strconv"
"sync"

ds "github.com/ipfs/go-datastore"
@@ -20,30 +22,56 @@ import (
// ErrQueueFull is returned when the batch queue has reached its maximum size
var ErrQueueFull = errors.New("batch queue is full")

// initialSeqNum is the starting sequence number for new queues.
// It is set to the middle of the uint64 range to allow for both
// appending (incrementing) and prepending (decrementing) transactions.
const initialSeqNum = uint64(0x8000000000000000)

func newPrefixKV(kvStore ds.Batching, prefix string) ds.Batching {
return ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)})
}

// queuedItem holds a batch and its associated persistence key
type queuedItem struct {
Batch coresequencer.Batch
Key string
}

// BatchQueue implements a persistent queue for transaction batches
type BatchQueue struct {
queue []coresequencer.Batch
queue []queuedItem
head int // index of the first element in the queue
maxQueueSize int // maximum number of batches allowed in queue (0 = unlimited)
mu sync.Mutex
db ds.Batching

// Sequence numbers for generating new keys
nextAddSeq uint64
nextPrependSeq uint64

mu sync.Mutex
db ds.Batching
}

// NewBatchQueue creates a new BatchQueue with the specified maximum size.
// If maxSize is 0, the queue will be unlimited.
func NewBatchQueue(db ds.Batching, prefix string, maxSize int) *BatchQueue {
return &BatchQueue{
queue: make([]coresequencer.Batch, 0),
head: 0,
maxQueueSize: maxSize,
db: newPrefixKV(db, prefix),
queue: make([]queuedItem, 0),
head: 0,
maxQueueSize: maxSize,
db: newPrefixKV(db, prefix),
nextAddSeq: initialSeqNum,
nextPrependSeq: initialSeqNum - 1,
}
}

// seqToKey converts a sequence number to a hex-encoded big-endian key.
// We use big-endian so that lexicographical sort order matches numeric order.
func seqToKey(seq uint64) string {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, seq)
return hex.EncodeToString(b)
}

// AddBatch adds a new transaction to the queue and writes it to the WAL.
// Returns ErrQueueFull if the queue has reached its maximum size.
func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) error {
@@ -57,12 +85,14 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
return ErrQueueFull
}

if err := bq.persistBatch(ctx, batch); err != nil {
key := seqToKey(bq.nextAddSeq)
if err := bq.persistBatch(ctx, batch, key); err != nil {
return err
}
bq.nextAddSeq++

// Then add to in-memory queue
bq.queue = append(bq.queue, batch)
bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key})

return nil
}
@@ -72,25 +102,27 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
// The batch is persisted to the DB to ensure durability in case of crashes.
//
// NOTE: Prepend intentionally bypasses the maxQueueSize limit to ensure high-priority
// transactions can always be re-queued. This means the effective queue size may temporarily
// exceed the configured maximum when Prepend is used. This is by design to prevent loss
// of transactions that have already been accepted but couldn't fit in the current batch.
// transactions can always be re-queued.
func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error {
bq.mu.Lock()
defer bq.mu.Unlock()

if err := bq.persistBatch(ctx, batch); err != nil {
key := seqToKey(bq.nextPrependSeq)
if err := bq.persistBatch(ctx, batch, key); err != nil {
return err
}
bq.nextPrependSeq--

item := queuedItem{Batch: batch, Key: key}

// Then add to in-memory queue
// If we have room before head, use it
if bq.head > 0 {
bq.head--
bq.queue[bq.head] = batch
bq.queue[bq.head] = item
} else {
// Need to expand the queue at the front
bq.queue = append([]coresequencer.Batch{batch}, bq.queue...)
bq.queue = append([]queuedItem{item}, bq.queue...)
}

return nil
@@ -106,8 +138,9 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
return &coresequencer.Batch{Transactions: nil}, nil
}

batch := bq.queue[bq.head]
bq.queue[bq.head] = coresequencer.Batch{} // Release memory for the dequeued element
item := bq.queue[bq.head]
// Release memory for the dequeued element
bq.queue[bq.head] = queuedItem{}
bq.head++

// Compact when head gets too large to prevent memory leaks
@@ -116,59 +149,72 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
// frequent compactions on small queues
if bq.head > len(bq.queue)/2 && bq.head > 100 {
remaining := copy(bq.queue, bq.queue[bq.head:])
// Zero out the rest of the slice to release memory
// Zero out the rest of the slice
for i := remaining; i < len(bq.queue); i++ {
bq.queue[i] = coresequencer.Batch{}
bq.queue[i] = queuedItem{}
}
bq.queue = bq.queue[:remaining]
bq.head = 0
}

hash, err := batch.Hash()
if err != nil {
return &coresequencer.Batch{Transactions: nil}, err
}
key := hex.EncodeToString(hash)

// Delete the batch from the WAL since it's been processed
err = bq.db.Delete(ctx, ds.NewKey(key))
if err != nil {
// Use the stored key directly
if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil {
// Log the error but continue
fmt.Printf("Error deleting processed batch: %v\n", err)
}

return &batch, nil
return &item.Batch, nil
}

// Load reloads all batches from WAL file into the in-memory queue after a crash or restart
func (bq *BatchQueue) Load(ctx context.Context) error {
bq.mu.Lock()
defer bq.mu.Unlock()

// Clear the current queue
bq.queue = make([]coresequencer.Batch, 0)
// Clear the current queue and reset sequences
bq.queue = make([]queuedItem, 0)
bq.head = 0
bq.nextAddSeq = initialSeqNum
bq.nextPrependSeq = initialSeqNum - 1

q := query.Query{}
q := query.Query{
Orders: []query.Order{query.OrderByKey{}},
}
results, err := bq.db.Query(ctx, q)
if err != nil {
return fmt.Errorf("error querying datastore: %w", err)
}
defer results.Close()

// Load each batch
for result := range results.Next() {
if result.Error != nil {
fmt.Printf("Error reading entry from datastore: %v\n", result.Error)
continue
}
// We care about the last part of the key (the sequence number)
// ds.Key usually has a leading slash.
key := ds.NewKey(result.Key).Name()

pbBatch := &pb.Batch{}
err := proto.Unmarshal(result.Value, pbBatch)
if err != nil {
fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", result.Key, err)
fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", key, err)
continue
}
bq.queue = append(bq.queue, coresequencer.Batch{Transactions: pbBatch.Txs})

batch := coresequencer.Batch{Transactions: pbBatch.Txs}
bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key})

// Update sequences based on loaded keys to avoid collisions
if seq, err := strconv.ParseUint(key, 16, 64); err == nil {
if seq >= bq.nextAddSeq {
bq.nextAddSeq = seq + 1
}
if seq <= bq.nextPrependSeq {
bq.nextPrependSeq = seq - 1
}
}
}
Comment on lines 187 to 244
Contributor
Severity: medium

Throughout the Load and Next functions, errors are logged using fmt.Printf. In a library or production service, it's better to use a structured logging library (e.g., log/slog, zap, zerolog). This provides several benefits:

  • Controllable Output: Consumers of the package can direct logs to files, network services, etc.
  • Log Levels: You can assign severity levels (e.g., error, warn, info) to messages.
  • Structured Data: You can include key-value pairs (like the key in this case) for easier parsing and filtering by log analysis tools.

Consider injecting a logger into the BatchQueue to handle these cases.
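
As a rough illustration of that suggestion (not part of this PR), injection could look like the sketch below, using the standard log/slog package. The loggingBatchQueue type, its field names, and the extra constructor parameter are all hypothetical:

package single

import (
	"context"
	"log/slog"

	ds "github.com/ipfs/go-datastore"
)

// Sketch only: a hypothetical logger-injected variant of BatchQueue,
// illustrating the review suggestion. None of these names exist in the PR.
type loggingBatchQueue struct {
	db     ds.Batching
	logger *slog.Logger
}

func newLoggingBatchQueue(db ds.Batching, logger *slog.Logger) *loggingBatchQueue {
	if logger == nil {
		logger = slog.Default() // fall back to the process-wide default logger
	}
	return &loggingBatchQueue{db: db, logger: logger}
}

// deleteProcessed mirrors the Delete call in Next, but reports the failure
// through the injected logger instead of fmt.Printf, with the key attached
// as structured data.
func (q *loggingBatchQueue) deleteProcessed(ctx context.Context, key string) {
	if err := q.db.Delete(ctx, ds.NewKey(key)); err != nil {
		q.logger.Error("error deleting processed batch", "key", key, "err", err)
	}
}

With this shape, callers that don't care can pass nil and get the default logger, while services can route queue errors into their existing logging pipeline.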


return nil
@@ -182,14 +228,8 @@ func (bq *BatchQueue) Size() int {
return len(bq.queue) - bq.head
}

// persistBatch persists a batch to the datastore
func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch) error {
hash, err := batch.Hash()
if err != nil {
return err
}
key := hex.EncodeToString(hash)

// persistBatch persists a batch to the datastore with the given key
func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch, key string) error {
pbBatch := &pb.Batch{
Txs: batch.Transactions,
}
@@ -199,7 +239,7 @@ func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batc
return err
}

// First write to DB for durability
// Write to DB
if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
return err
}
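For reference outside the diff: the key scheme works because hex-encoding a big-endian uint64 yields fixed-length strings whose lexicographic order matches the numeric order of the sequence numbers, which is what lets Load's OrderByKey query replay batches in queue order. A minimal, self-contained sketch (reproducing the PR's seqToKey helper; the main function is just for demonstration):

package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
)

// seqToKey matches the helper added in this PR: big-endian bytes, hex-encoded,
// so plain string comparison of keys agrees with numeric comparison of seqs.
func seqToKey(seq uint64) string {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, seq)
	return hex.EncodeToString(b)
}

func main() {
	const initialSeqNum = uint64(0x8000000000000000)
	// Keys produced by two Prepends (decrementing) and two AddBatches
	// (incrementing), listed in queue order.
	keys := []string{
		seqToKey(initialSeqNum - 2), // second Prepend
		seqToKey(initialSeqNum - 1), // first Prepend
		seqToKey(initialSeqNum),     // first AddBatch
		seqToKey(initialSeqNum + 1), // second AddBatch
	}
	for i := 1; i < len(keys); i++ {
		// Each comparison prints true: string order == numeric order.
		fmt.Println(keys[i-1] < keys[i])
	}
}

Starting both counters at the midpoint of the uint64 range gives each direction roughly 2^63 identifiers before overflow, so in practice neither side runs out.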
49 changes: 44 additions & 5 deletions sequencers/single/queue_test.go
@@ -227,24 +227,27 @@ func TestLoad_WithMixedData(t *testing.T) {
require.NotNil(bq)

// 1. Add valid batch data under the correct prefix
// Use valid hex sequence keys to ensure Load parses them correctly if needed
key1 := "8000000000000001"
validBatch1 := createTestBatch(t, 3)
hash1, err := validBatch1.Hash()
require.NoError(err)
hexHash1 := hex.EncodeToString(hash1)
pbBatch1 := &pb.Batch{Txs: validBatch1.Transactions}
encodedBatch1, err := proto.Marshal(pbBatch1)
require.NoError(err)
err = rawDB.Put(ctx, ds.NewKey(queuePrefix+hexHash1), encodedBatch1)
err = rawDB.Put(ctx, ds.NewKey(queuePrefix+key1), encodedBatch1)
require.NoError(err)

key2 := "8000000000000002"
validBatch2 := createTestBatch(t, 5)
hash2, err := validBatch2.Hash()
require.NoError(err)
hexHash2 := hex.EncodeToString(hash2)
pbBatch2 := &pb.Batch{Txs: validBatch2.Transactions}
encodedBatch2, err := proto.Marshal(pbBatch2)
require.NoError(err)
err = rawDB.Put(ctx, ds.NewKey(queuePrefix+hexHash2), encodedBatch2)
err = rawDB.Put(ctx, ds.NewKey(queuePrefix+key2), encodedBatch2)
require.NoError(err)

// 3. Add data outside the queue's prefix
Expand All @@ -257,8 +260,8 @@ func TestLoad_WithMixedData(t *testing.T) {

// Ensure all data is initially present in the raw DB
initialKeys := map[string]bool{
queuePrefix + hexHash1: true,
queuePrefix + hexHash2: true,
queuePrefix + key1: true,
queuePrefix + key2: true,
otherDataKey1.String(): true,
otherDataKey2.String(): true,
}
@@ -286,7 +289,7 @@ func TestLoad_WithMixedData(t *testing.T) {
loadedHashes := make(map[string]bool)
bq.mu.Lock()
for i := bq.head; i < len(bq.queue); i++ {
h, _ := bq.queue[i].Hash()
h, _ := bq.queue[i].Batch.Hash()
loadedHashes[hex.EncodeToString(h)] = true
}
bq.mu.Unlock()
@@ -302,6 +305,42 @@ func TestLoad_WithMixedData(t *testing.T) {
require.Equal([]byte("more data"), val)
}

func TestBatchQueue_Load_SetsSequencesProperly(t *testing.T) {
ctx := context.Background()
db := ds.NewMapDatastore()
prefix := "test-load-sequences"

// Build some persisted state with both AddBatch and Prepend so we have
// keys on both sides of the initialSeqNum.
q1 := NewBatchQueue(db, prefix, 0)
require.NoError(t, q1.Load(ctx))

require.NoError(t, q1.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("add-1")}})) // initialSeqNum
require.NoError(t, q1.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("add-2")}})) // initialSeqNum+1

require.NoError(t, q1.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-1")}})) // initialSeqNum-1
require.NoError(t, q1.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-2")}})) // initialSeqNum-2

// Simulate restart.
q2 := NewBatchQueue(db, prefix, 0)
require.NoError(t, q2.Load(ctx))

// After Load(), the sequencers should be positioned to avoid collisions:
// - nextAddSeq should be (maxSeq + 1)
// - nextPrependSeq should be (minSeq - 1)
require.Equal(t, initialSeqNum+2, q2.nextAddSeq, "nextAddSeq should continue after the max loaded key")
require.Equal(t, initialSeqNum-3, q2.nextPrependSeq, "nextPrependSeq should continue before the min loaded key")

// Verify we actually use those sequences when persisting new items.
require.NoError(t, q2.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("add-after-load")}}))
_, err := q2.db.Get(ctx, ds.NewKey(seqToKey(initialSeqNum+2)))
require.NoError(t, err, "expected AddBatch after Load to persist using nextAddSeq key")

require.NoError(t, q2.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-after-load")}}))
_, err = q2.db.Get(ctx, ds.NewKey(seqToKey(initialSeqNum-3)))
require.NoError(t, err, "expected Prepend after Load to persist using nextPrependSeq key")
}

func TestConcurrency(t *testing.T) {
bq := setupTestQueue(t)
ctx := context.Background()