diff --git a/pkg/ds/daemon_set.go b/pkg/ds/daemon_set.go
index f98c722e3..cb1e653a2 100644
--- a/pkg/ds/daemon_set.go
+++ b/pkg/ds/daemon_set.go
@@ -361,6 +361,7 @@ func (ds *daemonSet) WatchDesires(
 					// so that the timer would be stopped after
 					err = nil
 				case <-ctx.Done():
+					ds.logger.Warnln("goroutine exiting due to canceled context")
 					return
 				}
 			} else {
@@ -380,6 +381,7 @@ func (ds *daemonSet) WatchDesires(
 			case newDS, ok := <-updatedCh:
 				if !ok {
 					// channel closed
+					ds.logger.Warnln("goroutine exiting because updatedCh has closed")
 					return
 				}
 				if ds.ID() != newDS.ID {
@@ -462,6 +464,7 @@ func (ds *daemonSet) WatchDesires(
 
 			case deleteDS, ok := <-deletedCh:
 				if !ok {
+					ds.logger.Warnln("goroutine exiting because deletedCh has closed")
 					return
 				}
 				// Deleting a daemon sets has no effect
@@ -471,6 +474,7 @@ func (ds *daemonSet) WatchDesires(
 			case _, ok := <-nodesChangedCh:
 				if !ok {
 					// channel closed
+					ds.logger.Warnln("goroutine exiting because nodesChangedCh has closed")
 					return
 				}
 				if reportErr := ds.reportEligible(); reportErr != nil {
@@ -530,6 +534,7 @@ func (ds *daemonSet) WatchDesires(
 				nodesToAdd <- addedNodes
 
 			case <-ctx.Done():
+				ds.logger.Warnln("goroutine exiting because context was canceled")
 				return
 			}
 		}
diff --git a/pkg/ds/farm.go b/pkg/ds/farm.go
index dab525b82..e81a32fd5 100644
--- a/pkg/ds/farm.go
+++ b/pkg/ds/farm.go
@@ -87,7 +87,7 @@ type Farm struct {
 type childDS struct {
 	ds        DaemonSet
 	cancel    context.CancelFunc
-	updatedCh chan<- ds_fields.DaemonSet
+	updatedCh chan ds_fields.DaemonSet
 	deletedCh chan<- ds_fields.DaemonSet
 	errCh     <-chan error
 	unlocker  consul.TxnUnlocker
@@ -310,6 +310,18 @@ func (dsf *Farm) closeChild(dsID fields.ID) {
 	if child, ok := dsf.children[dsID]; ok {
 		dsf.logger.WithField("ds", dsID).Infoln("Releasing daemon set")
 		child.cancel()
+
+		// Drain the buffered updatedCh without blocking. A "for range" over
+		// the channel only terminates once the channel is closed, and it is
+		// not closed until just below, so ranging here would hang forever.
+	drain:
+		for {
+			select {
+			case <-child.updatedCh:
+			default:
+				break drain
+			}
+		}
 		close(child.updatedCh)
 		close(child.deletedCh)
 
@@ -560,7 +572,9 @@ func (dsf *Farm) spawnDaemonSet(
 		dsf.statusWritingInterval,
 	)
 
-	updatedCh := make(chan ds_fields.DaemonSet)
+	// updatedCh is buffered by 1 to protect the control loop from slow (or
+	// dead) readers
+	updatedCh := make(chan ds_fields.DaemonSet, 1)
 	deletedCh := make(chan ds_fields.DaemonSet)
 
 	ctx, cancel := context.WithCancel(ctx)
@@ -733,6 +747,14 @@ func (dsf *Farm) lockAndSpawn(ctx context.Context, dsFields ds_fields.DaemonSet,
 
 	// If we already are running the daemon set, just pass the update. Otherwise spawn one
 	if ok {
+		// try to drain the buffered value off the updatedCh if there is one (which
+		// indicates the worker goroutine was slow to read it or is dead)
+		select {
+		case <-child.updatedCh:
+			dsf.logger.Warnln("daemon set worker missed an update, sending a newer one")
+		default:
+		}
+
 		child.updatedCh <- dsFields
 	} else {
 		dsf.children[dsFields.ID] = dsf.spawnDaemonSet(