diff --git a/.gitignore b/.gitignore index d4c13f3..7fd7dfd 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,7 @@ .CLAUDE/ CLAUDE.md + +# Development artifacts +TODO.md +*.rdb diff --git a/brokers/in-memory/broker.go b/brokers/in-memory/broker.go index 9b8dc00..887d795 100644 --- a/brokers/in-memory/broker.go +++ b/brokers/in-memory/broker.go @@ -84,6 +84,66 @@ func (r *Broker) GetPending(ctx context.Context, queue string) ([]string, error) return pending, nil } +// GetPendingWithPagination returns a paginated list of pending jobs from the in-memory queue +func (r *Broker) GetPendingWithPagination(ctx context.Context, queue string, offset, limit int) ([]string, int64, error) { + r.pmu.RLock() + pending, ok := r.pending[queue] + r.pmu.RUnlock() + + if !ok { + // Queue doesn't exist yet, return empty result + return []string{}, 0, nil + } + + total := int64(len(pending)) + if total == 0 { + return []string{}, 0, nil + } + + // Validate offset + if offset < 0 { + offset = 0 + } + if int64(offset) >= total { + return []string{}, total, nil + } + + // Validate limit + if limit <= 0 { + limit = 100 // Default limit + } + // Cap maximum limit to prevent abuse + if limit > 10000 { + limit = 10000 + } + + // Calculate end index + end := offset + limit + if int64(end) > total { + end = int(total) + } + + // Return slice of pending jobs + result := make([]string, end-offset) + copy(result, pending[offset:end]) + + return result, total, nil +} + +// GetPendingCount returns the count of pending jobs in the in-memory queue +func (r *Broker) GetPendingCount(ctx context.Context, queue string) (int64, error) { + r.pmu.RLock() + pending, ok := r.pending[queue] + r.pmu.RUnlock() + + if !ok { + // Queue doesn't exist yet, return 0 count + return 0, nil + } + + return int64(len(pending)), nil +} + func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error { return fmt.Errorf("in-memory broker does not support this method") } diff --git a/brokers/nats-js/broker.go b/brokers/nats-js/broker.go index 88c7de6..05128bb 100644 --- a/brokers/nats-js/broker.go +++ b/brokers/nats-js/broker.go @@ -129,6 +129,16 @@ func (b *Broker) GetPending(ctx context.Context, queue string) ([]string, error) return nil, fmt.Errorf("nats broker does not support this method") } +// GetPendingWithPagination is not supported for NATS broker +func (b *Broker) GetPendingWithPagination(ctx context.Context, queue string, offset, limit int) ([]string, int64, error) { + return nil, 0, fmt.Errorf("nats broker does not support this method") +} + +// GetPendingCount is not supported for NATS broker +func (b *Broker) GetPendingCount(ctx context.Context, queue string) (int64, error) { + return 0, fmt.Errorf("nats broker does not support this method") +} + func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error { return fmt.Errorf("nats broker does not support this method") } diff --git a/brokers/redis/broker.go b/brokers/redis/broker.go index df8e295..f629b1b 100644 --- a/brokers/redis/broker.go +++ b/brokers/redis/broker.go @@ -102,6 +102,63 @@ func (r *Broker) GetPending(ctx context.Context, queue string) ([]string, error) return rs, nil } +// GetPendingWithPagination returns a paginated list of pending jobs from the Redis queue +func (r *Broker) GetPendingWithPagination(ctx context.Context, queue string, offset, limit int) ([]string, int64, error) { + r.lo.Debug("getting pending jobs with pagination", "queue", queue, "offset", offset, "limit", limit) + + // Get total count + total, err := r.conn.LLen(ctx, queue).Result() + if err != nil { + return nil, 0, err + } + + if total == 0 { + return []string{}, 0, nil + } + + // Validate offset + if offset < 0 { + offset = 0 + } + if int64(offset) >= total { + return []string{}, total, nil + } + + // Validate limit + if limit <= 0 { + limit = 100 // Default limit + } + // Cap maximum limit to prevent abuse + if limit > 10000 { + limit = 10000 + } + + // Calculate end index for LRANGE (inclusive) + end := offset + limit - 1 + + // Get paginated results + rs, err := r.conn.LRange(ctx, queue, int64(offset), int64(end)).Result() + if err == redis.Nil { + return []string{}, total, nil + } else if err != nil { + return nil, 0, err + } + + return rs, total, nil +} + +// GetPendingCount returns the count of pending jobs in the Redis queue +func (r *Broker) GetPendingCount(ctx context.Context, queue string) (int64, error) { + r.lo.Debug("getting pending jobs count", "queue", queue) + + count, err := r.conn.LLen(ctx, queue).Result() + if err != nil { + return 0, err + } + + return count, nil +} + func (b *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error { if b.opts.PipePeriod != 0 { return b.pipe.LPush(ctx, queue, msg).Err() diff --git a/examples/redis/main.go b/examples/redis/main.go index a1433bd..10dddff 100644 --- a/examples/redis/main.go +++ b/examples/redis/main.go @@ -26,10 +26,10 @@ func main() { DB: 0, }, lo), Results: rr.New(rr.Options{ - Addrs: []string{"127.0.0.1:6379"}, - Password: "", - DB: 0, - MetaExpiry: time.Second * 5, + Addrs: []string{"127.0.0.1:6379"}, + Password: "", + DB: 0, + //MetaExpiry: time.Second * 5, }, lo), Logger: lo.Handler(), }) diff --git a/interfaces.go b/interfaces.go index 8324f23..7b27823 100644 --- a/interfaces.go +++ b/interfaces.go @@ -16,6 +16,12 @@ type Results interface { GetSuccess(ctx context.Context) ([]string, error) SetFailed(ctx context.Context, id string) error SetSuccess(ctx context.Context, id string) error + + // Task management methods for external UI access + SetTask(ctx context.Context, name string, task []byte) error + GetTask(ctx context.Context, name string) ([]byte, error) + GetAllTasks(ctx context.Context) ([][]byte, error) + DeleteTask(ctx context.Context, name string) error } type Broker interface { @@ -30,5 +36,15 @@ type Broker interface { Consume(ctx context.Context, work chan []byte, queue string) // GetPending returns a list of stored job messages on the particular queue + // Deprecated: Use GetPendingWithPagination for better performance with large queues GetPending(ctx context.Context, queue string) ([]string, error) + + // GetPendingWithPagination returns a paginated list of stored job messages on the particular queue + // offset: the starting index (0-based) + // limit: maximum number of items to return + // Returns: job messages, total count, error + GetPendingWithPagination(ctx context.Context, queue string, offset, limit int) ([]string, int64, error) + + // GetPendingCount returns the count of pending jobs in the queue without fetching the actual jobs + GetPendingCount(ctx context.Context, queue string) (int64, error) } diff --git a/results/in-memory/results.go b/results/in-memory/results.go index de376c6..c7e9947 100644 --- a/results/in-memory/results.go +++ b/results/in-memory/results.go @@ -11,6 +11,7 @@ type Results struct { store map[string][]byte failed map[string]struct{} success map[string]struct{} + tasks map[string][]byte } func New() *Results { @@ -18,6 +19,7 @@ func New() *Results { store: make(map[string][]byte), failed: make(map[string]struct{}), success: make(map[string]struct{}), + tasks: make(map[string][]byte), } } @@ -93,3 +95,46 @@ func (r *Results) GetFailed(_ context.Context) ([]string, error) { return fl, nil } + +// SetTask stores task metadata in memory +func (r *Results) SetTask(_ context.Context, name string, task []byte) error { + r.mu.Lock() + r.tasks[name] = task + r.mu.Unlock() + + return nil +} + +// GetTask retrieves task metadata from memory +func (r *Results) GetTask(_ context.Context, name string) ([]byte, error) { + r.mu.Lock() + task, ok := r.tasks[name] + r.mu.Unlock() + + if !ok { + return nil, errNotFound + } + + return task, nil +} + +// GetAllTasks retrieves all task metadata from memory +func (r *Results) GetAllTasks(_ context.Context) ([][]byte, error) { + r.mu.Lock() + tasks := make([][]byte, 0, len(r.tasks)) + for _, task := range r.tasks { + tasks = append(tasks, task) + } + r.mu.Unlock() + + return tasks, nil +} + +// DeleteTask removes task metadata from memory +func (r *Results) DeleteTask(_ context.Context, name string) error { + r.mu.Lock() + delete(r.tasks, name) + r.mu.Unlock() + + return nil +} diff --git a/results/nats-js/results.go b/results/nats-js/results.go index 6c247f5..4533589 100644 --- a/results/nats-js/results.go +++ b/results/nats-js/results.go @@ -10,6 +10,8 @@ import ( const ( resultPrefix = "tasqueue-results-" + taskPrefix = "tasqueue-task-" + tasksKey = "tasqueue-tasks-list" kvBucket = "tasqueue" ) @@ -95,3 +97,166 @@ func (r *Results) GetFailed(_ context.Context) ([]string, error) { func (r *Results) DeleteJob(_ context.Context, id string) error { return r.conn.Delete(resultPrefix + id) } + +// SetTask stores task metadata in NATS KV +func (r *Results) SetTask(_ context.Context, name string, task []byte) error { + // Store the task metadata + if _, err := r.conn.Put(taskPrefix+name, task); err != nil { + return err + } + + // Get existing tasks list + entry, err := r.conn.Get(tasksKey) + var tasksList []string + if err != nil && err != nats.ErrKeyNotFound { + return err + } + if err == nil { + // Parse existing tasks list + tasksList = []string{} + for _, line := range string(entry.Value()) { + if line != 0 { + tasksList = append(tasksList, string(line)) + } + } + } + + // Add task name if not already in list + found := false + for _, t := range tasksList { + if t == name { + found = true + break + } + } + if !found { + tasksList = append(tasksList, name) + // Store updated list (simple newline-separated format) + listBytes := []byte{} + for i, t := range tasksList { + if i > 0 { + listBytes = append(listBytes, '\n') + } + listBytes = append(listBytes, []byte(t)...) + } + if _, err := r.conn.Put(tasksKey, listBytes); err != nil { + return err + } + } + + return nil +} + +// GetTask retrieves task metadata from NATS KV +func (r *Results) GetTask(_ context.Context, name string) ([]byte, error) { + entry, err := r.conn.Get(taskPrefix + name) + if err != nil { + return nil, err + } + + return entry.Value(), nil +} + +// GetAllTasks retrieves all task metadata from NATS KV +func (r *Results) GetAllTasks(_ context.Context) ([][]byte, error) { + // Get list of task names + entry, err := r.conn.Get(tasksKey) + if err != nil { + if err == nats.ErrKeyNotFound { + return [][]byte{}, nil + } + return nil, err + } + + // Parse task names + var taskNames []string + current := []byte{} + for _, b := range entry.Value() { + if b == '\n' { + if len(current) > 0 { + taskNames = append(taskNames, string(current)) + current = []byte{} + } + } else { + current = append(current, b) + } + } + if len(current) > 0 { + taskNames = append(taskNames, string(current)) + } + + if len(taskNames) == 0 { + return [][]byte{}, nil + } + + // Fetch all task metadata + tasks := make([][]byte, 0, len(taskNames)) + for _, name := range taskNames { + entry, err := r.conn.Get(taskPrefix + name) + if err != nil { + if err == nats.ErrKeyNotFound { + continue + } + return nil, err + } + tasks = append(tasks, entry.Value()) + } + + return tasks, nil +} + +// DeleteTask removes task metadata from NATS KV +func (r *Results) DeleteTask(_ context.Context, name string) error { + // Delete the task metadata + if err := r.conn.Delete(taskPrefix + name); err != nil { + return err + } + + // Get existing tasks list + entry, err := r.conn.Get(tasksKey) + if err != nil { + if err == nats.ErrKeyNotFound { + return nil + } + return err + } + + // Parse and filter out the deleted task + var taskNames []string + current := []byte{} + for _, b := range entry.Value() { + if b == '\n' { + if len(current) > 0 { + taskNames = append(taskNames, string(current)) + current = []byte{} + } + } else { + current = append(current, b) + } + } + if len(current) > 0 { + taskNames = append(taskNames, string(current)) + } + + // Filter out the task + filtered := []string{} + for _, t := range taskNames { + if t != name { + filtered = append(filtered, t) + } + } + + // Store updated list + listBytes := []byte{} + for i, t := range filtered { + if i > 0 { + listBytes = append(listBytes, '\n') + } + listBytes = append(listBytes, []byte(t)...) + } + if _, err := r.conn.Put(tasksKey, listBytes); err != nil { + return err + } + + return nil +} diff --git a/results/redis/results.go b/results/redis/results.go index f7af666..d0e3142 100644 --- a/results/redis/results.go +++ b/results/redis/results.go @@ -15,6 +15,11 @@ const ( // Suffix for hashmaps storing success/failed job ids success = "success" failed = "failed" + + // Prefix for task metadata + taskPrefix = "tq:task:" + // Key for set of all task names + tasksSet = "tq:tasks" ) type Results struct { @@ -239,3 +244,94 @@ func (r *Results) expireMeta(ttl time.Duration) { func (r *Results) NilError() error { return redis.Nil } + +// SetTask stores task metadata in Redis +func (r *Results) SetTask(ctx context.Context, name string, task []byte) error { + r.lo.Debug("setting task metadata", "name", name) + + pipe := r.conn.Pipeline() + // Store the task metadata + if err := pipe.Set(ctx, taskPrefix+name, task, 0).Err(); err != nil { + return err + } + // Add task name to the set of all tasks + if err := pipe.SAdd(ctx, tasksSet, name).Err(); err != nil { + return err + } + if _, err := pipe.Exec(ctx); err != nil { + return err + } + + return nil +} + +// GetTask retrieves task metadata from Redis +func (r *Results) GetTask(ctx context.Context, name string) ([]byte, error) { + r.lo.Debug("getting task metadata", "name", name) + rs, err := r.conn.Get(ctx, taskPrefix+name).Bytes() + if err != nil { + return nil, err + } + + return rs, nil +} + +// GetAllTasks retrieves all task metadata from Redis +func (r *Results) GetAllTasks(ctx context.Context) ([][]byte, error) { + r.lo.Debug("getting all task metadata") + + // Get all task names from the set + taskNames, err := r.conn.SMembers(ctx, tasksSet).Result() + if err != nil { + return nil, err + } + + if len(taskNames) == 0 { + return [][]byte{}, nil + } + + // Build keys for all tasks + keys := make([]string, len(taskNames)) + for i, name := range taskNames { + keys[i] = taskPrefix + name + } + + // Fetch all task metadata in one go using MGet + results, err := r.conn.MGet(ctx, keys...).Result() + if err != nil { + return nil, err + } + + // Convert results to [][]byte + tasks := make([][]byte, 0, len(results)) + for _, result := range results { + if result == nil { + continue + } + if taskBytes, ok := result.(string); ok { + tasks = append(tasks, []byte(taskBytes)) + } + } + + return tasks, nil +} + +// DeleteTask removes task metadata from Redis +func (r *Results) DeleteTask(ctx context.Context, name string) error { + r.lo.Debug("deleting task metadata", "name", name) + + pipe := r.conn.Pipeline() + // Remove the task metadata + if err := pipe.Del(ctx, taskPrefix+name).Err(); err != nil { + return err + } + // Remove task name from the set + if err := pipe.SRem(ctx, tasksSet, name).Err(); err != nil { + return err + } + if _, err := pipe.Exec(ctx); err != nil { + return err + } + + return nil +} diff --git a/server.go b/server.go index f463c18..776d943 100644 --- a/server.go +++ b/server.go @@ -52,6 +52,14 @@ type Task struct { opts TaskOpts } +// TaskInfo represents the serializable metadata about a registered task. +// This is stored in the results backend for external UI access. +type TaskInfo struct { + Name string `json:"name" msgpack:"name"` + Queue string `json:"queue" msgpack:"queue"` + Concurrency uint32 `json:"concurrency" msgpack:"concurrency"` +} + type TaskOpts struct { Concurrency uint32 Queue string @@ -78,7 +86,9 @@ func (s *Server) RegisterTask(name string, fn handler, opts TaskOpts) error { s.q.RUnlock() if !ok { s.registerQueue(opts.Queue, opts.Concurrency) - s.registerHandler(name, Task{name: name, handler: fn, opts: opts}) + if err := s.registerHandler(name, Task{name: name, handler: fn, opts: opts}); err != nil { + return err + } return nil @@ -87,7 +97,9 @@ func (s *Server) RegisterTask(name string, fn handler, opts TaskOpts) error { // If the queue is already defined and the passed concurrency optional // is same (it can be default queue/conc) so simply register the handler if opts.Concurrency == conc { - s.registerHandler(name, Task{name: name, handler: fn, opts: opts}) + if err := s.registerHandler(name, Task{name: name, handler: fn, opts: opts}); err != nil { + return err + } return nil } @@ -143,17 +155,25 @@ func NewServer(o ServerOpts) (*Server, error) { }, nil } -// GetTasks() returns a list of all tasks registered with the server. -func (s *Server) GetTasks() []string { - s.p.RLock() - defer s.p.RUnlock() +// GetTasks() returns a list of all tasks registered with the server by pulling from the store. +func (s *Server) GetTasks() ([]TaskInfo, error) { + ctx := context.Background() + tasksBytes, err := s.results.GetAllTasks(ctx) + if err != nil { + return nil, fmt.Errorf("error fetching tasks from store: %w", err) + } - t := make([]string, 0, len(s.tasks)) - for name := range s.tasks { - t = append(t, name) + tasks := make([]TaskInfo, 0, len(tasksBytes)) + for _, taskBytes := range tasksBytes { + var taskInfo TaskInfo + if err := msgpack.Unmarshal(taskBytes, &taskInfo); err != nil { + s.log.Error("error unmarshalling task info", "error", err) + continue + } + tasks = append(tasks, taskInfo) } - return t + return tasks, nil } var ErrNotFound = errors.New("result not found") @@ -174,6 +194,7 @@ func (s *Server) GetResult(ctx context.Context, id string) ([]byte, error) { } // GetPending() returns the pending job message's in the broker's queue. +// Deprecated: Use GetPendingWithPagination for better performance with large queues func (s *Server) GetPending(ctx context.Context, queue string) ([]JobMessage, error) { rs, err := s.broker.GetPending(ctx, queue) if err != nil { @@ -190,6 +211,31 @@ func (s *Server) GetPending(ctx context.Context, queue string) ([]JobMessage, er return jobMsg, nil } +// GetPendingWithPagination returns a paginated list of pending job messages in the broker's queue. +// offset: the starting index (0-based) +// limit: maximum number of items to return +// Returns: job messages, total count, error +func (s *Server) GetPendingWithPagination(ctx context.Context, queue string, offset, limit int) ([]JobMessage, int64, error) { + rs, total, err := s.broker.GetPendingWithPagination(ctx, queue, offset, limit) + if err != nil { + return nil, 0, err + } + + var jobMsg = make([]JobMessage, len(rs)) + for i, r := range rs { + if err := msgpack.Unmarshal([]byte(r), &jobMsg[i]); err != nil { + return nil, 0, err + } + } + + return jobMsg, total, nil +} + +// GetPendingCount returns the count of pending jobs in the broker's queue without fetching the actual jobs. +func (s *Server) GetPendingCount(ctx context.Context, queue string) (int64, error) { + return s.broker.GetPendingCount(ctx, queue) +} + // DeleteJob() removes the stored results of a particular job. It does not "dequeue" // an unprocessed job. It is useful for removing the status of old finished jobs. func (s *Server) DeleteJob(ctx context.Context, id string) error { @@ -450,10 +496,28 @@ func (s *Server) registerQueue(name string, conc uint32) { s.q.Unlock() } -func (s *Server) registerHandler(name string, t Task) { +func (s *Server) registerHandler(name string, t Task) error { + // Store in-memory for fast access s.p.Lock() s.tasks[name] = t s.p.Unlock() + + // Persist task metadata to store for external UI access + taskInfo := TaskInfo{ + Name: name, + Queue: t.opts.Queue, + Concurrency: t.opts.Concurrency, + } + taskBytes, err := msgpack.Marshal(taskInfo) + if err != nil { + return fmt.Errorf("error marshalling task info: %w", err) + } + + if err := s.results.SetTask(context.Background(), name, taskBytes); err != nil { + return fmt.Errorf("error persisting task to store: %w", err) + } + + return nil } func (s *Server) getHandler(name string) (Task, error) {