@@ -13,6 +13,7 @@ import (
1313 pg "github.com/djthorpe/go-pg"
1414 httpresponse "github.com/mutablelogic/go-server/pkg/httpresponse"
1515 schema "github.com/mutablelogic/go-server/pkg/pgqueue/schema"
16+ ref "github.com/mutablelogic/go-server/pkg/ref"
1617 types "github.com/mutablelogic/go-server/pkg/types"
1718)
1819
@@ -462,26 +463,27 @@ func (manager *Manager) RunTaskLoop(ctx context.Context, ch chan<- *schema.Task)
462463
463464// RunNotificationLoop runs a loop to process database notifications, until the context is cancelled
464465// or an error occurs.
465- func (manager * Manager ) RunNotificationLoop (ctx context.Context , ch chan <- * pg.Notification ) error {
466+ func (manager * Manager ) RunNotificationLoop (parent context.Context , ch chan <- * pg.Notification ) error {
466467 // Subscribe to topics
467468 for _ , topic := range manager .topics {
468- if err := manager .listener .Listen (ctx , topic ); err != nil {
469+ if err := manager .listener .Listen (parent , topic ); err != nil {
469470 return err
470471 }
471472 }
472473 defer func () {
474+ ctx := ref .WithProvider (context .Background (), ref .Provider (parent ))
473475 for _ , topic := range manager .topics {
474- manager .listener .Unlisten (context . TODO () , topic )
476+ manager .listener .Unlisten (ctx , topic )
475477 }
476478 }()
477479
478480 // Loop until context is cancelled
479481 for {
480482 select {
481- case <- ctx .Done ():
483+ case <- parent .Done ():
482484 return nil
483485 default :
484- if notification , err := manager .listener .WaitForNotification (ctx ); err != nil {
486+ if notification , err := manager .listener .WaitForNotification (parent ); err != nil {
485487 if ! errors .Is (err , context .Canceled ) && ! errors .Is (err , context .DeadlineExceeded ) {
486488 return err
487489 }
0 commit comments