Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@

.CLAUDE/
CLAUDE.md

# Development artifacts
TODO.md
*.rdb
60 changes: 60 additions & 0 deletions brokers/in-memory/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,66 @@ func (r *Broker) GetPending(ctx context.Context, queue string) ([]string, error)
return pending, nil
}

// GetPendingWithPagination returns a paginated list of pending jobs from the in-memory queue
func (r *Broker) GetPendingWithPagination(ctx context.Context, queue string, offset, limit int) ([]string, int64, error) {
r.pmu.RLock()
pending, ok := r.pending[queue]
r.pmu.RUnlock()

if !ok {
// Queue doesn't exist yet, return empty result
return []string{}, 0, nil
}

total := int64(len(pending))
if total == 0 {
return []string{}, 0, nil
}

// Validate offset
if offset < 0 {
offset = 0
}
if int64(offset) >= total {
return []string{}, total, nil
}

// Validate limit
if limit <= 0 {
limit = 100 // Default limit
}
// Cap maximum limit to prevent abuse
if limit > 10000 {
limit = 10000
}

// Calculate end index
end := offset + limit
if int64(end) > total {
end = int(total)
}

// Return slice of pending jobs
result := make([]string, end-offset)
copy(result, pending[offset:end])

return result, total, nil
}

// GetPendingCount returns the count of pending jobs in the in-memory queue
func (r *Broker) GetPendingCount(ctx context.Context, queue string) (int64, error) {
r.pmu.RLock()
pending, ok := r.pending[queue]
r.pmu.RUnlock()

if !ok {
// Queue doesn't exist yet, return 0 count
return 0, nil
}

return int64(len(pending)), nil
}

func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error {
return fmt.Errorf("in-memory broker does not support this method")
}
10 changes: 10 additions & 0 deletions brokers/nats-js/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ func (b *Broker) GetPending(ctx context.Context, queue string) ([]string, error)
return nil, fmt.Errorf("nats broker does not support this method")
}

// GetPendingWithPagination is not supported for NATS broker
func (b *Broker) GetPendingWithPagination(ctx context.Context, queue string, offset, limit int) ([]string, int64, error) {
return nil, 0, fmt.Errorf("nats broker does not support this method")
}

// GetPendingCount is not supported for NATS broker
func (b *Broker) GetPendingCount(ctx context.Context, queue string) (int64, error) {
return 0, fmt.Errorf("nats broker does not support this method")
}

func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error {
return fmt.Errorf("nats broker does not support this method")
}
57 changes: 57 additions & 0 deletions brokers/redis/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,63 @@ func (r *Broker) GetPending(ctx context.Context, queue string) ([]string, error)
return rs, nil
}

// GetPendingWithPagination returns a paginated list of pending jobs from the Redis queue
func (r *Broker) GetPendingWithPagination(ctx context.Context, queue string, offset, limit int) ([]string, int64, error) {
r.lo.Debug("getting pending jobs with pagination", "queue", queue, "offset", offset, "limit", limit)

// Get total count
total, err := r.conn.LLen(ctx, queue).Result()
if err != nil {
return nil, 0, err
}

if total == 0 {
return []string{}, 0, nil
}

// Validate offset
if offset < 0 {
offset = 0
}
if int64(offset) >= total {
return []string{}, total, nil
}

// Validate limit
if limit <= 0 {
limit = 100 // Default limit
}
// Cap maximum limit to prevent abuse
if limit > 10000 {
limit = 10000
}

// Calculate end index for LRANGE (inclusive)
end := offset + limit - 1

// Get paginated results
rs, err := r.conn.LRange(ctx, queue, int64(offset), int64(end)).Result()
if err == redis.Nil {
return []string{}, total, nil
} else if err != nil {
return nil, 0, err
}

return rs, total, nil
}

// GetPendingCount returns the count of pending jobs in the Redis queue
func (r *Broker) GetPendingCount(ctx context.Context, queue string) (int64, error) {
r.lo.Debug("getting pending jobs count", "queue", queue)

count, err := r.conn.LLen(ctx, queue).Result()
if err != nil {
return 0, err
}

return count, nil
}

func (b *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error {
if b.opts.PipePeriod != 0 {
return b.pipe.LPush(ctx, queue, msg).Err()
Expand Down
8 changes: 4 additions & 4 deletions examples/redis/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ func main() {
DB: 0,
}, lo),
Results: rr.New(rr.Options{
Addrs: []string{"127.0.0.1:6379"},
Password: "",
DB: 0,
MetaExpiry: time.Second * 5,
Addrs: []string{"127.0.0.1:6379"},
Password: "",
DB: 0,
//MetaExpiry: time.Second * 5,
}, lo),
Logger: lo.Handler(),
})
Expand Down
16 changes: 16 additions & 0 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ type Results interface {
GetSuccess(ctx context.Context) ([]string, error)
SetFailed(ctx context.Context, id string) error
SetSuccess(ctx context.Context, id string) error

// Task management methods for external UI access
SetTask(ctx context.Context, name string, task []byte) error
GetTask(ctx context.Context, name string) ([]byte, error)
GetAllTasks(ctx context.Context) ([][]byte, error)
DeleteTask(ctx context.Context, name string) error
}

type Broker interface {
Expand All @@ -30,5 +36,15 @@ type Broker interface {
Consume(ctx context.Context, work chan []byte, queue string)

// GetPending returns a list of stored job messages on the particular queue
// Deprecated: Use GetPendingWithPagination for better performance with large queues
GetPending(ctx context.Context, queue string) ([]string, error)

// GetPendingWithPagination returns a paginated list of stored job messages on the particular queue
// offset: the starting index (0-based)
// limit: maximum number of items to return
// Returns: job messages, total count, error
GetPendingWithPagination(ctx context.Context, queue string, offset, limit int) ([]string, int64, error)

// GetPendingCount returns the count of pending jobs in the queue without fetching the actual jobs
GetPendingCount(ctx context.Context, queue string) (int64, error)
}
45 changes: 45 additions & 0 deletions results/in-memory/results.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ type Results struct {
store map[string][]byte
failed map[string]struct{}
success map[string]struct{}
tasks map[string][]byte
}

func New() *Results {
return &Results{
store: make(map[string][]byte),
failed: make(map[string]struct{}),
success: make(map[string]struct{}),
tasks: make(map[string][]byte),
}
}

Expand Down Expand Up @@ -93,3 +95,46 @@ func (r *Results) GetFailed(_ context.Context) ([]string, error) {

return fl, nil
}

// SetTask stores task metadata in memory
func (r *Results) SetTask(_ context.Context, name string, task []byte) error {
r.mu.Lock()
r.tasks[name] = task
r.mu.Unlock()

return nil
}

// GetTask retrieves task metadata from memory
func (r *Results) GetTask(_ context.Context, name string) ([]byte, error) {
r.mu.Lock()
task, ok := r.tasks[name]
r.mu.Unlock()

if !ok {
return nil, errNotFound
}

return task, nil
}

// GetAllTasks retrieves all task metadata from memory
func (r *Results) GetAllTasks(_ context.Context) ([][]byte, error) {
r.mu.Lock()
tasks := make([][]byte, 0, len(r.tasks))
for _, task := range r.tasks {
tasks = append(tasks, task)
}
r.mu.Unlock()

return tasks, nil
}

// DeleteTask removes task metadata from memory
func (r *Results) DeleteTask(_ context.Context, name string) error {
r.mu.Lock()
delete(r.tasks, name)
r.mu.Unlock()

return nil
}
Loading