Skip to content

Commit 1be5978

Browse files
craig[bot]alyshanjahani-crlwenyihu6
committed
156895: kv, sql: Accumulate CPU time from KV BatchResponses r=alyshanjahani-crl a=alyshanjahani-crl This commit implements a solution to recording kv cpu time on every query execution - similar to other fields recorded in topLevelQueryStats. This is done by including a new cpuTime field in the BatchResponse Header from KV. The cpuTime field tracks the time reported in Replica.Send which is representative of the "synchronous" work done by KV for the request, i.e. it does not include other async replication related work. GetKVCpuTime is added to the KVBatchFetcher interface. GetKVResponseCpuTime is added to the KVReader interface. kvCpuTime is added to the mutationPlanNode interface. Epic: https://cockroachlabs.atlassian.net/browse/CRDB-55080 Part of: https://cockroachlabs.atlassian.net/browse/CRDB-55922 Release note: None 158175: mmaprototype: extract analyzeFunc r=wenyihu6 a=wenyihu6 Epic: CRDB-55052 Release note: none --- **mmaprototype: extract analyzeFunc** This commit moves analyzeFunc out of finishInit into its own helper improve readability. --- **mmaprototype: extract doneFunc** This commit moves doneFunc out of analyzeFunc into its own helper function to improve readability. --- **mmaprototype: add more comments for finishInit** This commit adds more comments for rangeAnalyzedConstraints.finishInit. --- **mmaprototype: extract diversity score computation** This commit moves diversity score computation out of analyzeFunc into its own helper function to improve readability. --- **mmaprototype: clean up diversityScore** Previously, we were inefficiently invoking diversityFunc twice over voters X voters diversity score calculation when refactoring the code. This change reverts it back so diversity score is computed once and reused. --- **mmaprototype: rename diversityFunc to diversityOfTwoStoreSets** --- **mmaprototype: rename ac.analyzeFunc to ac.initialize** This commit renames ac.analyzeFunc to ac.initialize and updates the signature so constraints are passed directly to initialize, rather than being populated by caller. --- **mmaprototype: move isConstraintSatisfied to a closure** This commit moves isConstraintSatisfied from a struct method to a helper closure inside ac.initialize. --- **mmaprototype: clean up comments** Co-authored-by: alyshanjahani-crl <alyshan@cockroachlabs.com> Co-authored-by: wenyihu6 <wenyi@cockroachlabs.com>
3 parents b2158da + a3237b8 + 1f2576b commit 1be5978

39 files changed

+529
-159
lines changed

pkg/cli/debug_send_kv_batch_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ func TestSendKVBatch(t *testing.T) {
7878
{
7979
"header": {
8080
"Timestamp": {},
81-
"now": {}
81+
"now": {},
82+
"cpuTime": {}
8283
},
8384
"responses": [
8485
{"put": {
@@ -112,9 +113,11 @@ func TestSendKVBatch(t *testing.T) {
112113
require.NoError(t, err)
113114

114115
// Clean and check the BatchResponse output, by removing first line
115-
// (contains input command) and emptying out all HLC timestamp objects.
116+
// (contains input command), emptying out all HLC timestamp objects,
117+
// and zeroing out the cpuTime value.
116118
output = strings.SplitN(output, "\n", 2)[1]
117119
output = regexp.MustCompile(`(?s)\{\s*"wallTime":.*?\}`).ReplaceAllString(output, "{}")
120+
output = regexp.MustCompile(`"cpuTime"\s*:\s*("[^"]*"|\d+)`).ReplaceAllString(output, `"cpuTime": {}`)
118121
require.JSONEq(t, jsonResponse, output)
119122

120123
// Check that a structured log event was emitted.

pkg/kv/kvclient/kvstreamer/streamer.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,9 @@ type Streamer struct {
307307
type streamerStatistics struct {
308308
atomics struct {
309309
kvPairsRead *int64
310+
// kvCPUTime tracks the cumulative CPU time (in nanoseconds) that KV
311+
// reports in the BatchResponse headers.
312+
kvCPUTime *int64
310313
// batchRequestsIssued tracks the number of BatchRequests issued by the
311314
// Streamer. Note that this number is only used for the logging done by
312315
// the Streamer itself and is separate from any possible bookkeeping
@@ -381,6 +384,8 @@ type sendFn func(context.Context, *kvpb.BatchRequest) (*kvpb.BatchResponse, erro
381384
//
382385
// kvPairsRead should be incremented atomically with the sum of NumKeys
383386
// parameters of all received responses.
387+
// kvCPUTime should be incremented atomically with the CPUTime from BatchResponse
388+
// headers.
384389
func NewStreamer(
385390
distSender *kvcoord.DistSender,
386391
metrics *Metrics,
@@ -393,6 +398,7 @@ func NewStreamer(
393398
limitBytes int64,
394399
acc *mon.BoundAccount,
395400
kvPairsRead *int64,
401+
kvCPUTime *int64,
396402
lockStrength lock.Strength,
397403
lockDurability lock.Durability,
398404
reverse bool,
@@ -430,6 +436,10 @@ func NewStreamer(
430436
kvPairsRead = new(int64)
431437
}
432438
s.atomics.kvPairsRead = kvPairsRead
439+
if kvCPUTime == nil {
440+
kvCPUTime = new(int64)
441+
}
442+
s.atomics.kvCPUTime = kvCPUTime
433443
s.coordinator = workerCoordinator{
434444
s: s,
435445
sendFn: sendFn,
@@ -1452,6 +1462,11 @@ func (w *workerCoordinator) performRequestAsync(
14521462
}
14531463
atomic.AddInt64(&w.s.atomics.batchRequestsIssued, 1)
14541464

1465+
// Accumulate CPU time from the BatchResponse header.
1466+
if br.CPUTime > 0 {
1467+
atomic.AddInt64(w.s.atomics.kvCPUTime, br.CPUTime)
1468+
}
1469+
14551470
// First, we have to reconcile the memory budget. We do it
14561471
// separately from processing the results because we want to know
14571472
// how many Gets and Scans need to be allocated for the ResumeSpans,

pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func TestStreamerMemoryAccounting(t *testing.T) {
100100
math.MaxInt64,
101101
&acc,
102102
nil, /* kvPairsRead */
103+
nil, /* kvCPUTime */
103104
lock.None,
104105
lock.Unreplicated,
105106
reverse,

pkg/kv/kvclient/kvstreamer/streamer_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func getStreamer(
7070
limitBytes,
7171
acc,
7272
nil, /* kvPairsRead */
73+
nil, /* kvCPUTime */
7374
lock.None,
7475
lock.Unreplicated,
7576
reverse,
@@ -131,6 +132,7 @@ func TestStreamerLimitations(t *testing.T) {
131132
math.MaxInt64, /* limitBytes */
132133
nil, /* acc */
133134
nil, /* kvPairsRead */
135+
nil, /* kvCpuTime */
134136
lock.None,
135137
lock.Unreplicated,
136138
false, /* reverse */

pkg/kv/kvpb/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,7 @@ func (h *BatchResponse_Header) combine(o BatchResponse_Header) error {
754754
h.Now.Forward(o.Now)
755755
h.RangeInfos = append(h.RangeInfos, o.RangeInfos...)
756756
h.CollectedSpans = append(h.CollectedSpans, o.CollectedSpans...)
757+
h.CPUTime += o.CPUTime
757758
return nil
758759
}
759760

pkg/kv/kvpb/api.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3207,6 +3207,10 @@ message BatchResponse {
32073207
// The field is cleared by the DistSender because it refers routing
32083208
// information not exposed by the KV API.
32093209
repeated RangeInfo range_infos = 7 [(gogoproto.nullable) = false];
3210+
// CPUTime is tracked in Replica.Send, meaning it only tracks the CPU time
3211+
// for the request evaluation goroutine, and not the other replication related
3212+
// work. The value is either zero (unset) or positive.
3213+
int64 cpu_time = 8 [(gogoproto.customname) = "CPUTime"];
32103214
// NB: if you add a field here, don't forget to update combine().
32113215
}
32123216
Header header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];

pkg/kv/kvpb/batch_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,8 @@ func TestBatchResponseCombine(t *testing.T) {
705705
)
706706
brTxn := &BatchResponse{
707707
BatchResponse_Header: BatchResponse_Header{
708-
Txn: &txn,
708+
Txn: &txn,
709+
CPUTime: 123,
709710
},
710711
}
711712
if err := br.Combine(context.Background(), brTxn, nil, &BatchRequest{}); err != nil {
@@ -714,6 +715,9 @@ func TestBatchResponseCombine(t *testing.T) {
714715
if br.Txn.Name != "test" {
715716
t.Fatal("Combine() did not update the header")
716717
}
718+
if br.CPUTime != 123 {
719+
t.Fatalf("Combine() did not accumulate CPUTime: expected 123, got %d", br.CPUTime)
720+
}
717721
}
718722

719723
br.Responses = make([]ResponseUnion, 1)

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,11 @@ func sortTargetCandidateSetAndPick(
703703
return cands.candidates[j].StoreID
704704
}
705705

706+
// ensureAnalyzedConstraints ensures that the constraints field of rangeState is
707+
// populated. It uses rangeState.{replicas,conf} as inputs to the computation.
708+
//
709+
// NB: Caller is responsible for calling clearAnalyzedConstraints when rstate or
710+
// the rstate.constraints is no longer needed.
706711
func (cs *clusterState) ensureAnalyzedConstraints(rstate *rangeState) {
707712
if rstate.constraints != nil {
708713
return

0 commit comments

Comments
 (0)