Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@

# Dependency directories (remove the comment below to include it)
# vendor/

.CLAUDE/
CLAUDE.md
3 changes: 3 additions & 0 deletions brokers/in-memory/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ func New() *Broker {
}

func (r *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
// Ensure work channel is closed when producer (broker) is done
defer close(work)

r.mu.RLock()
ch, ok := r.queues[queue]
r.mu.RUnlock()
Expand Down
17 changes: 17 additions & 0 deletions brokers/nats-js/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,23 @@ func (b *Broker) Enqueue(_ context.Context, msg []byte, queue string) error {
return nil
}

// TODO: CRITICAL - This implementation has goroutine leak issues that prevent graceful shutdown:
//
// 1. SUBSCRIPTION NOT CLEANED UP: The NATS subscription is never unsubscribed when context
// is cancelled, meaning the subscription callback continues running indefinitely.
//
// 2. CALLBACK GOROUTINE HANGS: The callback function may block forever on "work <- msg.Data"
// if the work channel is full or no longer being read, preventing graceful shutdown.
//
// 3. NO DRAIN/UNSUBSCRIBE: Should call subscription.Drain() or subscription.Unsubscribe()
// when context is cancelled to properly clean up NATS resources.
//
// This causes Server.Start() WaitGroup to hang indefinitely because s.consume() never exits.
// A proper fix would store the subscription and clean it up:
//
// sub, err := b.conn.Subscribe(queue, callback, opts...)
// defer sub.Drain() // or sub.Unsubscribe()
// <-ctx.Done()
func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
_, err := b.conn.Subscribe(queue, func(msg *nats.Msg) {
work <- msg.Data
Expand Down
2 changes: 2 additions & 0 deletions brokers/redis/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string,
}

func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
// Ensure work channel is closed when producer (broker) is done
defer close(work)
go b.consumeScheduled(ctx, queue)

for {
Expand Down
9 changes: 5 additions & 4 deletions chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (s *Server) GetChain(ctx context.Context, id string) (ChainMessage, error)
// Fetch the current job, to check its status
currJob, err := s.GetJob(ctx, c.JobID)
if err != nil {
return ChainMessage{}, nil
return ChainMessage{}, fmt.Errorf("failed to get current job %s in chain %s: %w", c.JobID, id, err)
}

checkJobs:
Expand All @@ -117,16 +117,17 @@ checkJobs:
if len(currJob.OnSuccessIDs) == 0 {
c.Status = StatusDone
} else {
currJob, err = s.GetJob(ctx, currJob.OnSuccessIDs[0])
nextJobID := currJob.OnSuccessIDs[0]
currJob, err = s.GetJob(ctx, nextJobID)
if err != nil {
return ChainMessage{}, nil
return ChainMessage{}, fmt.Errorf("failed to get next job %s in chain %s: %w", nextJobID, id, err)
}
goto checkJobs
}
}

if err = s.setChainMessage(ctx, c); err != nil {
return ChainMessage{}, nil
return ChainMessage{}, fmt.Errorf("failed to save chain message for chain %s: %w", id, err)
}

return c, nil
Expand Down
102 changes: 70 additions & 32 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,14 @@ func (s *Server) GetSuccess(ctx context.Context) ([]string, error) {

// Start() starts the job consumer and processor. It is a blocking function.
func (s *Server) Start(ctx context.Context) {
go s.cron.Start()
// Loop over each registered queue.
s.q.RLock()
queues := s.queues
s.q.RUnlock()

var wg sync.WaitGroup

s.startCronScheduler(ctx, &wg)
for q, conc := range queues {
q := q // Hack to fix the loop variable capture issue.
if s.traceProv != nil {
Expand Down Expand Up @@ -258,41 +259,43 @@ func (s *Server) process(ctx context.Context, w chan []byte) {
defer span.End()
}

select {
case <-ctx.Done():
s.log.Info("shutting down processor..")
// Channel-only shutdown: wait for work or channel closure
work, ok := <-w
if !ok {
// Channel closed by broker - clean shutdown
s.log.Info("work channel closed, shutting down processor..")
return
case work := <-w:
var (
msg JobMessage
err error
)
// Decode the bytes into a job message
if err = msgpack.Unmarshal(work, &msg); err != nil {
s.spanError(span, err)
s.log.Error("error unmarshalling task", "error", err)
break
}
}

// Fetch the registered task handler.
task, err := s.getHandler(msg.Job.Task)
if err != nil {
s.spanError(span, err)
s.log.Error("handler not found", "error", err)
break
}
var (
msg JobMessage
err error
)
// Decode the bytes into a job message
if err = msgpack.Unmarshal(work, &msg); err != nil {
s.spanError(span, err)
s.log.Error("error unmarshalling task", "error", err)
continue
}

// Set the job status as being "processed"
if err := s.statusProcessing(ctx, msg); err != nil {
s.spanError(span, err)
s.log.Error("error setting the status to processing", "error", err)
break
}
// Fetch the registered task handler.
task, err := s.getHandler(msg.Job.Task)
if err != nil {
s.spanError(span, err)
s.log.Error("handler not found", "error", err)
continue
}

if err := s.execJob(ctx, msg, task); err != nil {
s.spanError(span, err)
s.log.Error("could not execute job", "error", err)
}
// Set the job status as being "processed"
if err := s.statusProcessing(ctx, msg); err != nil {
s.spanError(span, err)
s.log.Error("error setting the status to processing", "error", err)
continue
}

if err := s.execJob(ctx, msg, task); err != nil {
s.spanError(span, err)
s.log.Error("could not execute job", "error", err)
}
}
}
Expand Down Expand Up @@ -570,3 +573,38 @@ func (s *Server) spanError(sp spans.Span, err error) {
sp.SetStatus(codes.Error, err.Error())
}
}

// startCronScheduler launches the cron scheduler in its own goroutine,
// registered on wg, and shuts it down gracefully once ctx is cancelled.
// If ctx is already cancelled, the scheduler is never started.
func (s *Server) startCronScheduler(ctx context.Context, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()

		// Bail out early if shutdown was requested before we even started.
		if ctx.Err() != nil {
			s.log.Debug("context cancelled before cron start")
			return
		}

		s.cron.Start()
		s.log.Debug("cron scheduler started")

		// Block until the server is asked to shut down.
		<-ctx.Done()
		s.log.Info("shutting down cron scheduler...")

		// Stop returns a context whose Done channel signals that the
		// scheduler has finished; cap the wait so a stuck job cannot
		// hang the whole shutdown.
		const shutdownTimeout = 5 * time.Second
		stopped := s.cron.Stop()

		select {
		case <-stopped.Done():
			s.log.Debug("cron scheduler stopped gracefully")
		case <-time.After(shutdownTimeout):
			s.log.Warn("cron scheduler stop timeout - forcing exit")
		}
	}()
}
59 changes: 59 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tasqueue

import (
"context"
"encoding/json"
"fmt"
"log/slog"
Expand Down Expand Up @@ -58,3 +59,61 @@ func MockHandlerWithSleep(msg []byte, _ JobCtx) error {

return nil
}

// TestCronLifecycleManagement verifies that cron scheduler starts and stops properly
func TestCronLifecycleManagement(t *testing.T) {
	// Spin up a server backed by the in-memory broker/results with a no-op handler.
	srv := newServer(t, "test", func(b []byte, ctx JobCtx) error {
		return nil
	})

	ctx, cancel := context.WithCancel(context.Background())

	// Run the blocking Start in the background and signal when it returns.
	stopped := make(chan struct{})
	go func() {
		srv.Start(ctx)
		close(stopped)
	}()

	// Allow the server a brief window to spin up before cancelling.
	time.Sleep(100 * time.Millisecond)
	cancel()

	// The server must return promptly once the context is cancelled.
	select {
	case <-stopped:
		t.Log("Server shut down gracefully")
	case <-time.After(10 * time.Second):
		t.Fatal("Server failed to shut down within timeout - cron leak likely exists")
	}
}

// TestCronContextCancellationBeforeStart verifies that cron handles early cancellation
func TestCronContextCancellationBeforeStart(t *testing.T) {
	srv := newServer(t, "test", func(b []byte, ctx JobCtx) error {
		return nil
	})

	// Cancel before Start so the server observes an already-dead context.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	stopped := make(chan struct{})
	go func() {
		srv.Start(ctx)
		close(stopped)
	}()

	// Start must return almost immediately with a pre-cancelled context.
	select {
	case <-stopped:
		t.Log("Server handled pre-cancelled context gracefully")
	case <-time.After(5 * time.Second):
		t.Fatal("Server failed to handle pre-cancelled context")
	}
}
Loading