@@ -97,12 +97,11 @@ func StartSampler(
9797 case <- stopper .ShouldQuiesce ():
9898 return
9999 case <- ticker .C :
100- lastIntervalHistogram := s .lastIntervalHistogram ()
101- if lastIntervalHistogram == nil {
100+ schedulingLatenciesHistogram := s .getAndClearLastStatsHistogram ()
101+ if schedulingLatenciesHistogram == nil {
102102 continue
103103 }
104-
105- schedulerLatencyHistogram .update (lastIntervalHistogram )
104+ schedulerLatencyHistogram .update (schedulingLatenciesHistogram )
106105 }
107106 }
108107 })
@@ -148,8 +147,10 @@ type sampler struct {
148147 listener LatencyObserver
149148 mu struct {
150149 syncutil.Mutex
151- ringBuffer ring.Buffer [* metrics.Float64Histogram ]
152- lastIntervalHistogram * metrics.Float64Histogram
150+ ringBuffer ring.Buffer [* metrics.Float64Histogram ]
151+ // schedulerLatencyAccumulator accumulates per-sample histogram deltas of
152+ // the go scheduler latencies.
153+ schedulerLatencyAccumulator * metrics.Float64Histogram
153154 }
154155}
155156
@@ -169,7 +170,7 @@ func (s *sampler) setPeriodAndDuration(period, duration time.Duration) {
169170 numSamples = 1 // we need at least one sample to compare (also safeguards against integer division)
170171 }
171172 s .mu .ringBuffer .Resize (numSamples )
172- s .mu .lastIntervalHistogram = nil
173+ s .mu .schedulerLatencyAccumulator = nil
173174}
174175
175176// sampleOnTickAndInvokeCallbacks samples scheduler latency stats as the ticker
@@ -178,16 +179,32 @@ func (s *sampler) sampleOnTickAndInvokeCallbacks(period time.Duration) {
178179 s .mu .Lock ()
179180 defer s .mu .Unlock ()
180181
182+ // Capture the previous sample before adding the new one.
183+ var prevSample * metrics.Float64Histogram
184+ if s .mu .ringBuffer .Len () > 0 {
185+ prevSample = s .mu .ringBuffer .GetFirst ()
186+ }
187+
181188 latestCumulative := sample ()
182189 oldestCumulative , ok := s .recordLocked (latestCumulative )
183190 if ! ok {
184191 return
185192 }
186- s .mu .lastIntervalHistogram = sub (latestCumulative , oldestCumulative )
187- p99 := time .Duration (int64 (percentile (s .mu .lastIntervalHistogram , 0.99 ) * float64 (time .Second .Nanoseconds ())))
193+
194+ // Compute the delta since the previous sample for stats accumulation.
195+ if prevSample != nil {
196+ sampleDelta := sub (latestCumulative , prevSample )
197+ if s .mu .schedulerLatencyAccumulator == nil {
198+ s .mu .schedulerLatencyAccumulator = sampleDelta
199+ } else {
200+ s .mu .schedulerLatencyAccumulator = add (s .mu .schedulerLatencyAccumulator , sampleDelta )
201+ }
202+ }
188203
189204 // Perform the callback if there's a listener.
190205 if s .listener != nil {
206+ p99 := time .Duration (int64 (percentile (sub (latestCumulative , oldestCumulative ),
207+ 0.99 ) * float64 (time .Second .Nanoseconds ())))
191208 s .listener .SchedulerLatency (p99 , period )
192209 }
193210}
@@ -203,10 +220,12 @@ func (s *sampler) recordLocked(
203220 return oldest , oldest != nil
204221}
205222
206- func (s * sampler ) lastIntervalHistogram () * metrics.Float64Histogram {
223+ func (s * sampler ) getAndClearLastStatsHistogram () * metrics.Float64Histogram {
207224 s .mu .Lock ()
208225 defer s .mu .Unlock ()
209- return s .mu .lastIntervalHistogram
226+ res := s .mu .schedulerLatencyAccumulator
227+ s .mu .schedulerLatencyAccumulator = nil
228+ return res
210229}
211230
212231// sample the cumulative (since process start) scheduler latency histogram from
@@ -248,6 +267,16 @@ func sub(a, b *metrics.Float64Histogram) *metrics.Float64Histogram {
248267 return res
249268}
250269
270+ // add adds the counts of one histogram to another, assuming the bucket
271+ // boundaries are the same.
272+ func add (a , b * metrics.Float64Histogram ) * metrics.Float64Histogram {
273+ res := clone (a )
274+ for i := 0 ; i < len (res .Counts ); i ++ {
275+ res .Counts [i ] += b .Counts [i ]
276+ }
277+ return res
278+ }
279+
251280// percentile computes a specific percentile value of the given histogram.
252281//
253282// TODO(irfansharif): Deduplicate this with the quantile computation in
0 commit comments