Skip to content

Commit a3237b8

Browse files
kv, sql: Accumulate CPU time from KV BatchResponses
This commit implements a solution for recording KV CPU time on every query execution, similar to the other fields recorded in topLevelQueryStats. This is done by including a new CPUTime field in the BatchResponse Header from KV. The CPUTime field tracks the time reported in Replica.Send, which is representative of the "synchronous" work done by KV for the request, i.e. it does not include other asynchronous replication-related work. GetKVCPUTime is added to the KVBatchFetcher interface. GetKVResponseCPUTime is added to the KVReader interface. kvCPUTime is added to the mutationPlanNode interface. Epic: https://cockroachlabs.atlassian.net/browse/CRDB-55080 Part of: https://cockroachlabs.atlassian.net/browse/CRDB-55922 Release note: None
1 parent 9510b99 commit a3237b8

37 files changed

+254
-28
lines changed

pkg/cli/debug_send_kv_batch_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ func TestSendKVBatch(t *testing.T) {
7878
{
7979
"header": {
8080
"Timestamp": {},
81-
"now": {}
81+
"now": {},
82+
"cpuTime": {}
8283
},
8384
"responses": [
8485
{"put": {
@@ -112,9 +113,11 @@ func TestSendKVBatch(t *testing.T) {
112113
require.NoError(t, err)
113114

114115
// Clean and check the BatchResponse output, by removing first line
115-
// (contains input command) and emptying out all HLC timestamp objects.
116+
// (contains input command), emptying out all HLC timestamp objects,
117+
// and replacing the cpuTime value with an empty object.
116118
output = strings.SplitN(output, "\n", 2)[1]
117119
output = regexp.MustCompile(`(?s)\{\s*"wallTime":.*?\}`).ReplaceAllString(output, "{}")
120+
output = regexp.MustCompile(`"cpuTime"\s*:\s*("[^"]*"|\d+)`).ReplaceAllString(output, `"cpuTime": {}`)
118121
require.JSONEq(t, jsonResponse, output)
119122

120123
// Check that a structured log event was emitted.

pkg/kv/kvclient/kvstreamer/streamer.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,9 @@ type Streamer struct {
307307
type streamerStatistics struct {
308308
atomics struct {
309309
kvPairsRead *int64
310+
// kvCPUTime tracks the cumulative CPU time (in nanoseconds) that KV
311+
// reports in the BatchResponse headers.
312+
kvCPUTime *int64
310313
// batchRequestsIssued tracks the number of BatchRequests issued by the
311314
// Streamer. Note that this number is only used for the logging done by
312315
// the Streamer itself and is separate from any possible bookkeeping
@@ -381,6 +384,8 @@ type sendFn func(context.Context, *kvpb.BatchRequest) (*kvpb.BatchResponse, erro
381384
//
382385
// kvPairsRead should be incremented atomically with the sum of NumKeys
383386
// parameters of all received responses.
387+
// kvCPUTime should be incremented atomically with the CPUTime from BatchResponse
388+
// headers.
384389
func NewStreamer(
385390
distSender *kvcoord.DistSender,
386391
metrics *Metrics,
@@ -393,6 +398,7 @@ func NewStreamer(
393398
limitBytes int64,
394399
acc *mon.BoundAccount,
395400
kvPairsRead *int64,
401+
kvCPUTime *int64,
396402
lockStrength lock.Strength,
397403
lockDurability lock.Durability,
398404
reverse bool,
@@ -430,6 +436,10 @@ func NewStreamer(
430436
kvPairsRead = new(int64)
431437
}
432438
s.atomics.kvPairsRead = kvPairsRead
439+
if kvCPUTime == nil {
440+
kvCPUTime = new(int64)
441+
}
442+
s.atomics.kvCPUTime = kvCPUTime
433443
s.coordinator = workerCoordinator{
434444
s: s,
435445
sendFn: sendFn,
@@ -1452,6 +1462,11 @@ func (w *workerCoordinator) performRequestAsync(
14521462
}
14531463
atomic.AddInt64(&w.s.atomics.batchRequestsIssued, 1)
14541464

1465+
// Accumulate CPU time from the BatchResponse header.
1466+
if br.CPUTime > 0 {
1467+
atomic.AddInt64(w.s.atomics.kvCPUTime, br.CPUTime)
1468+
}
1469+
14551470
// First, we have to reconcile the memory budget. We do it
14561471
// separately from processing the results because we want to know
14571472
// how many Gets and Scans need to be allocated for the ResumeSpans,

pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func TestStreamerMemoryAccounting(t *testing.T) {
100100
math.MaxInt64,
101101
&acc,
102102
nil, /* kvPairsRead */
103+
nil, /* kvCPUTime */
103104
lock.None,
104105
lock.Unreplicated,
105106
reverse,

pkg/kv/kvclient/kvstreamer/streamer_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func getStreamer(
7070
limitBytes,
7171
acc,
7272
nil, /* kvPairsRead */
73+
nil, /* kvCPUTime */
7374
lock.None,
7475
lock.Unreplicated,
7576
reverse,
@@ -131,6 +132,7 @@ func TestStreamerLimitations(t *testing.T) {
131132
math.MaxInt64, /* limitBytes */
132133
nil, /* acc */
133134
nil, /* kvPairsRead */
135+
nil, /* kvCPUTime */
134136
lock.None,
135137
lock.Unreplicated,
136138
false, /* reverse */

pkg/kv/kvpb/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,7 @@ func (h *BatchResponse_Header) combine(o BatchResponse_Header) error {
754754
h.Now.Forward(o.Now)
755755
h.RangeInfos = append(h.RangeInfos, o.RangeInfos...)
756756
h.CollectedSpans = append(h.CollectedSpans, o.CollectedSpans...)
757+
h.CPUTime += o.CPUTime
757758
return nil
758759
}
759760

pkg/kv/kvpb/api.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3207,6 +3207,10 @@ message BatchResponse {
32073207
// The field is cleared by the DistSender because it refers to routing
32083208
// information not exposed by the KV API.
32093209
repeated RangeInfo range_infos = 7 [(gogoproto.nullable) = false];
3210+
// CPUTime is tracked in Replica.Send, meaning it only tracks the CPU time
3211+
// for the request evaluation goroutine, and not the other replication related
3212+
// work. The value is in nanoseconds and is either zero (unset) or positive.
3213+
int64 cpu_time = 8 [(gogoproto.customname) = "CPUTime"];
32103214
// NB: if you add a field here, don't forget to update combine().
32113215
}
32123216
Header header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];

pkg/kv/kvpb/batch_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,8 @@ func TestBatchResponseCombine(t *testing.T) {
705705
)
706706
brTxn := &BatchResponse{
707707
BatchResponse_Header: BatchResponse_Header{
708-
Txn: &txn,
708+
Txn: &txn,
709+
CPUTime: 123,
709710
},
710711
}
711712
if err := br.Combine(context.Background(), brTxn, nil, &BatchRequest{}); err != nil {
@@ -714,6 +715,9 @@ func TestBatchResponseCombine(t *testing.T) {
714715
if br.Txn.Name != "test" {
715716
t.Fatal("Combine() did not update the header")
716717
}
718+
if br.CPUTime != 123 {
719+
t.Fatalf("Combine() did not accumulate CPUTime: expected 123, got %d", br.CPUTime)
720+
}
717721
}
718722

719723
br.Responses = make([]ResponseUnion, 1)

pkg/kv/kvserver/replica_send.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,14 +206,18 @@ func (r *Replica) SendWithWriteBytes(
206206
}
207207
}
208208

209+
cpuTime := grunning.Difference(startCPU, grunning.Time())
210+
if br != nil {
211+
br.CPUTime = int64(cpuTime)
212+
}
209213
if pErr == nil {
210214
// Return range information if it was requested. Note that we don't return it
211215
// on errors because the code doesn't currently support returning both a br
212216
// and a pErr here. Also, some errors (e.g. NotLeaseholderError) have custom
213217
// ways of returning range info.
214218
r.maybeAddRangeInfoToResponse(ctx, ba, br)
215219
// Handle load-based splitting, if necessary.
216-
r.recordBatchForLoadBasedSplitting(ctx, ba, br, int(grunning.Difference(startCPU, grunning.Time())))
220+
r.recordBatchForLoadBasedSplitting(ctx, ba, br, int(cpuTime))
217221
}
218222

219223
// Record summary throughput information about the batch request for

pkg/sql/colexecop/operator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ type KVReader interface {
9292
// KV requests. It must be safe for concurrent use. It is used to calculate
9393
// the SQL CPU time.
9494
GetKVCPUTime() time.Duration
95+
// GetKVResponseCPUTime returns the CPU time as reported by KV BatchResponses
96+
// processed by the KVReader throughout its lifetime so far.
97+
GetKVResponseCPUTime() int64
9598
// UsedStreamer returns whether the Streamer API was used by the KVReader.
9699
UsedStreamer() bool
97100
}

pkg/sql/colfetcher/cfetcher.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,16 +271,17 @@ type cFetcher struct {
271271
// stableKVs indicates whether the KVs returned by nextKVer are stable (i.e.
272272
// are not invalidated) across NextKV() calls.
273273
stableKVs bool
274-
// bytesRead, kvPairsRead, and batchRequestsIssued store the total number of
275-
// bytes read, key-values pairs read, and of BatchRequests issued,
276-
// respectively, by this cFetcher throughout its lifetime in case when the
277-
// underlying row.KVFetcher has already been closed and nil-ed out.
274+
// bytesRead, kvPairsRead, kvCPUTime, and batchRequestsIssued store the total bytes
275+
// read, key-value pairs read, CPU time reported by KV BatchResponses, and
276+
// BatchRequests issued, respectively, by this cFetcher throughout its lifetime, for
277+
// the case when the underlying row.KVFetcher has already been closed and nil-ed out.
278278
//
279279
// The fields should not be accessed directly by the users of the cFetcher -
280-
// getBytesRead(), getKVPairsRead(), and getBatchRequestsIssued() should be
280+
// getBytesRead(), getKVPairsRead(), getKVCPUTime(), and getBatchRequestsIssued() should be
281281
// used instead.
282282
bytesRead int64
283283
kvPairsRead int64
284+
kvCPUTime int64
284285
batchRequestsIssued int64
285286
// cpuStopWatch tracks the CPU time spent by this cFetcher while fulfilling KV
286287
// requests *in the current goroutine*.
@@ -1484,6 +1485,15 @@ func (cf *cFetcher) getKVPairsRead() int64 {
14841485
return cf.kvPairsRead
14851486
}
14861487

1488+
// getKVCPUTime returns the CPU time as reported by KV BatchResponses processed
1489+
// by the cFetcher throughout its lifetime so far.
1490+
func (cf *cFetcher) getKVCPUTime() int64 {
1491+
if cf.fetcher != nil {
1492+
return cf.fetcher.GetKVCPUTime()
1493+
}
1494+
return cf.kvCPUTime
1495+
}
1496+
14871497
// getBatchRequestsIssued returns the number of BatchRequests issued by the
14881498
// cFetcher throughout its lifetime so far.
14891499
func (cf *cFetcher) getBatchRequestsIssued() int64 {
@@ -1522,6 +1532,7 @@ func (cf *cFetcher) Close(ctx context.Context) {
15221532
if cf.fetcher != nil {
15231533
cf.bytesRead = cf.fetcher.GetBytesRead()
15241534
cf.kvPairsRead = cf.fetcher.GetKVPairsRead()
1535+
cf.kvCPUTime = cf.fetcher.GetKVCPUTime()
15251536
cf.batchRequestsIssued = cf.fetcher.GetBatchRequestsIssued()
15261537
cf.fetcher.Close(ctx)
15271538
cf.fetcher = nil

0 commit comments

Comments
 (0)