Skip to content

Commit 30a4919

Browse files
authored
chore(poller): avoid warn log on shutdown (#1480)
What changed? poller will no longer poll on permit failure poller will no longer emit warn log on shutdown Why? red herring of warning log on poller acquire permit failures. How did you test it? Unit Test --------- Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
1 parent b9b333e commit 30a4919

File tree

2 files changed

+70
-4
lines changed

2 files changed

+70
-4
lines changed

internal/internal_worker_base.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ type (
139139
pollLimiter *rate.Limiter
140140
taskLimiter *rate.Limiter
141141
limiterContext context.Context
142-
limiterContextCancel func()
142+
limiterContextCancel context.CancelCauseFunc
143143
retrier *backoff.ConcurrentRetrier // Service errors back off retrier
144144
logger *zap.Logger
145145
metricsScope tally.Scope
@@ -168,7 +168,7 @@ func createPollRetryPolicy() backoff.RetryPolicy {
168168
}
169169

170170
func newBaseWorker(options baseWorkerOptions, logger *zap.Logger, metricsScope tally.Scope, sessionTokenBucket *sessionTokenBucket) *baseWorker {
171-
ctx, cancel := context.WithCancel(context.Background())
171+
ctx, cancel := context.WithCancelCause(context.Background())
172172
logger = logger.With(zap.String(tagWorkerType, options.workerType))
173173
metricsScope = tagScope(metricsScope, tagWorkerType, options.workerType)
174174

@@ -324,7 +324,10 @@ func (bw *baseWorker) pollTask() {
324324
if pErr := bw.concurrency.PollerPermit.Acquire(bw.limiterContext); pErr == nil {
325325
defer bw.concurrency.PollerPermit.Release()
326326
} else {
327-
bw.logger.Warn("poller permit acquire error", zap.Error(pErr))
327+
if !errors.Is(context.Cause(bw.limiterContext), errShutdown) { // don't log on shutdown
328+
bw.logger.Warn("poller permit acquire error", zap.Error(pErr))
329+
}
330+
return // don't try to poll without a permit, will retry on next poll
328331
}
329332

330333
bw.retrier.Throttle()
@@ -418,7 +421,7 @@ func (bw *baseWorker) Stop() {
418421
return
419422
}
420423
close(bw.shutdownCh)
421-
bw.limiterContextCancel()
424+
bw.limiterContextCancel(errShutdown)
422425
bw.concurrencyAutoScaler.Stop()
423426

424427
if success := util.AwaitWaitGroup(&bw.shutdownWG, bw.options.shutdownTimeout); !success {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package internal
2+
3+
import (
4+
"errors"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/uber-go/tally"
9+
"go.uber.org/zap"
10+
"go.uber.org/zap/zapcore"
11+
"go.uber.org/zap/zaptest/observer"
12+
13+
"go.uber.org/cadence/internal/common/debug"
14+
)
15+
16+
func TestBaseWorker_pollTask_no_warnLogOnShutdown(t *testing.T) {
17+
core, observed := observer.New(zapcore.InfoLevel)
18+
logger := zap.New(core, zap.Development())
19+
worker := newBaseWorker(baseWorkerOptions{
20+
maxConcurrentTask: 1,
21+
pollerCountWithoutAutoScaling: 1,
22+
identity: "test-identity",
23+
pollerTracker: debug.NewNoopPollerTracker(),
24+
taskWorker: &testTaskWorker{},
25+
}, logger, tally.NoopScope, nil)
26+
27+
// mock the worker started
28+
worker.Start()
29+
worker.Stop()
30+
worker.pollTask()
31+
32+
assert.Equal(t, 0, observed.FilterMessage("poller permit acquire error").Len())
33+
}
34+
35+
func TestBaseWorker_processTask_warnLogOnOtherError(t *testing.T) {
36+
core, observed := observer.New(zapcore.InfoLevel)
37+
logger := zap.New(core, zap.Development())
38+
worker := newBaseWorker(baseWorkerOptions{
39+
maxConcurrentTask: 1,
40+
pollerCountWithoutAutoScaling: 1,
41+
identity: "test-identity",
42+
pollerTracker: debug.NewNoopPollerTracker(),
43+
taskWorker: &testTaskWorker{},
44+
}, logger, tally.NoopScope, nil)
45+
46+
// mock the worker started
47+
worker.Start()
48+
worker.limiterContextCancel(errors.New("test error"))
49+
worker.pollTask()
50+
worker.Stop()
51+
52+
assert.GreaterOrEqual(t, observed.FilterMessage("poller permit acquire error").Len(), 1)
53+
}
54+
55+
type testTaskWorker struct{}
56+
57+
func (t *testTaskWorker) PollTask() (interface{}, error) {
58+
return nil, errors.New("poll in test will fail")
59+
}
60+
61+
func (t *testTaskWorker) ProcessTask(task interface{}) error {
62+
return nil
63+
}

0 commit comments

Comments
 (0)