Skip to content

Commit 6b92909

Browse files
committed
kvserver,storage: recreate long-lived iterators in replica consistency check
This capability is enabled by changing the ComputeStatsVisitors callbacks to specify a resumeSoon bool return value. This is used in computeStatsForIterWithVisitors to return before finishing all the work and the returned resumeKey is used in ComputeStatsWithVisitors to resume using a new iterator. Similarly, computeLockTableStatsWithVisitors uses the resumeSoon to construct new iterators when doing the lock table iteration. To ensure correctness, the resumeSoon=true feature must only be used when the storage.Reader returns true from Reader.ConsistentIterators, which makes the promise that the different Iterators returned by the Reader see the same underlying Engine state. The replica consistency check uses a Pebble snapshot, for which the iterators are consistent. Fixes #154533 Epic: none Release note: None
1 parent 076a595 commit 6b92909

File tree

2 files changed

+314
-142
lines changed

2 files changed

+314
-142
lines changed

pkg/kv/kvserver/replica_consistency.go

Lines changed: 84 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/cockroachdb/cockroach/pkg/util/stop"
3838
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3939
"github.com/cockroachdb/cockroach/pkg/util/uuid"
40+
"github.com/cockroachdb/crlib/crtime"
4041
"github.com/cockroachdb/errors"
4142
"github.com/cockroachdb/redact"
4243
)
@@ -488,133 +489,164 @@ func CalcReplicaDigest(
488489
// amortize the overhead of the limiter when reading many small KVs.
489490
var batchSize int64
490491
const targetBatchSize = int64(256 << 10) // 256 KiB
491-
wait := func(size int64) error {
492+
const uninitializedResumeSoonTime crtime.Mono = crtime.Mono(-1)
493+
// lastResumeSoonTime is used to return resumeSoon=true. It is oblivious to
494+
// the fact that ComputeStatsWithVisitors first uses an iterator over lock
495+
// table keys and subsequently a different iterator over point and range
496+
// keys. If we spend 5s on the first, it will deduct from the resumption
497+
// duration of the second iteration. We are ok with this imprecision
498+
// (instead of complicating the callback interface), since the bulk of the
499+
// iteration in the common case will be spent on the point and range key
500+
// iteration.
501+
lastResumeSoonTime := uninitializedResumeSoonTime
502+
wait := func(size int64) (resumeSoon bool, _ error) {
503+
if lastResumeSoonTime == uninitializedResumeSoonTime {
504+
lastResumeSoonTime = crtime.NowMono()
505+
}
492506
if batchSize += size; batchSize < targetBatchSize {
493-
return nil
507+
return false, nil
494508
}
495509
tokens := batchSize
496510
batchSize = 0
497-
return limiter.WaitN(ctx, tokens)
511+
if err := limiter.WaitN(ctx, tokens); err != nil {
512+
return false, err
513+
}
514+
if crtime.NowMono().Sub(lastResumeSoonTime) > storage.SnapshotRecreateIterDuration.Get(&settings.SV) ||
515+
util.RaceEnabled {
516+
// NB: we will start returning resumeSoon=false in subsequent calls,
517+
// even if the caller has not resumed yet. This is permitted by the
518+
// contract documented in ComputeStatsVisitors. Also, the next call to
519+
// wait will initialize lastResumeSoonTime, which can be earlier than
520+
// the actual resumption, so it isn't very precise, but it is the best
521+
// we can do. And this imprecision should not matter given the coarse
522+
// granularity of SnapshotRecreateIterDuration.
523+
lastResumeSoonTime = uninitializedResumeSoonTime
524+
return true, nil
525+
}
526+
return false, nil
498527
}
499528

500529
var visitors storage.ComputeStatsVisitors
501530

502-
visitors.PointKey = func(unsafeKey storage.MVCCKey, unsafeValue []byte) error {
531+
visitors.PointKey = func(unsafeKey storage.MVCCKey, unsafeValue []byte) (resumeSoon bool, _ error) {
503532
// Rate limit the scan through the range.
504-
if err := wait(int64(len(unsafeKey.Key) + len(unsafeValue))); err != nil {
505-
return err
533+
var err error
534+
if resumeSoon, err = wait(int64(len(unsafeKey.Key) + len(unsafeValue))); err != nil {
535+
return false, err
506536
}
507537
// Encode the length of the key and value.
508538
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(unsafeKey.Key)))
509-
if _, err := hasher.Write(intBuf[:]); err != nil {
510-
return err
539+
if _, err = hasher.Write(intBuf[:]); err != nil {
540+
return false, err
511541
}
512542
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(unsafeValue)))
513-
if _, err := hasher.Write(intBuf[:]); err != nil {
514-
return err
543+
if _, err = hasher.Write(intBuf[:]); err != nil {
544+
return false, err
515545
}
516546
// Encode the key.
517-
if _, err := hasher.Write(unsafeKey.Key); err != nil {
518-
return err
547+
if _, err = hasher.Write(unsafeKey.Key); err != nil {
548+
return false, err
519549
}
520550
timestamp = unsafeKey.Timestamp
521551
if size := timestamp.Size(); size > cap(timestampBuf) {
522552
timestampBuf = make([]byte, size)
523553
} else {
524554
timestampBuf = timestampBuf[:size]
525555
}
526-
if _, err := protoutil.MarshalToSizedBuffer(&timestamp, timestampBuf); err != nil {
527-
return err
556+
if _, err = protoutil.MarshalToSizedBuffer(&timestamp, timestampBuf); err != nil {
557+
return false, err
528558
}
529-
if _, err := hasher.Write(timestampBuf); err != nil {
530-
return err
559+
if _, err = hasher.Write(timestampBuf); err != nil {
560+
return false, err
531561
}
532562
// Encode the value.
533-
_, err := hasher.Write(unsafeValue)
534-
return err
563+
_, err = hasher.Write(unsafeValue)
564+
return resumeSoon, err
535565
}
536566

537-
visitors.RangeKey = func(rangeKV storage.MVCCRangeKeyValue) error {
567+
visitors.RangeKey = func(rangeKV storage.MVCCRangeKeyValue) (resumeSoon bool, _ error) {
538568
// Rate limit the scan through the range.
539-
err := wait(
569+
var err error
570+
resumeSoon, err = wait(
540571
int64(len(rangeKV.RangeKey.StartKey) + len(rangeKV.RangeKey.EndKey) + len(rangeKV.Value)))
541572
if err != nil {
542-
return err
573+
return false, err
543574
}
544575
// Encode the length of the start key and end key.
545576
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(rangeKV.RangeKey.StartKey)))
546-
if _, err := hasher.Write(intBuf[:]); err != nil {
547-
return err
577+
if _, err = hasher.Write(intBuf[:]); err != nil {
578+
return false, err
548579
}
549580
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(rangeKV.RangeKey.EndKey)))
550-
if _, err := hasher.Write(intBuf[:]); err != nil {
551-
return err
581+
if _, err = hasher.Write(intBuf[:]); err != nil {
582+
return false, err
552583
}
553584
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(rangeKV.Value)))
554-
if _, err := hasher.Write(intBuf[:]); err != nil {
555-
return err
585+
if _, err = hasher.Write(intBuf[:]); err != nil {
586+
return false, err
556587
}
557588
// Encode the key.
558-
if _, err := hasher.Write(rangeKV.RangeKey.StartKey); err != nil {
559-
return err
589+
if _, err = hasher.Write(rangeKV.RangeKey.StartKey); err != nil {
590+
return false, err
560591
}
561-
if _, err := hasher.Write(rangeKV.RangeKey.EndKey); err != nil {
562-
return err
592+
if _, err = hasher.Write(rangeKV.RangeKey.EndKey); err != nil {
593+
return false, err
563594
}
564595
timestamp = rangeKV.RangeKey.Timestamp
565596
if size := timestamp.Size(); size > cap(timestampBuf) {
566597
timestampBuf = make([]byte, size)
567598
} else {
568599
timestampBuf = timestampBuf[:size]
569600
}
570-
if _, err := protoutil.MarshalToSizedBuffer(&timestamp, timestampBuf); err != nil {
571-
return err
601+
if _, err = protoutil.MarshalToSizedBuffer(&timestamp, timestampBuf); err != nil {
602+
return false, err
572603
}
573-
if _, err := hasher.Write(timestampBuf); err != nil {
574-
return err
604+
if _, err = hasher.Write(timestampBuf); err != nil {
605+
return false, err
575606
}
576607
// Encode the value.
577608
_, err = hasher.Write(rangeKV.Value)
578-
return err
609+
return resumeSoon, err
579610
}
580611

581-
visitors.LockTableKey = func(unsafeKey storage.LockTableKey, unsafeValue []byte) error {
612+
visitors.LockTableKey = func(unsafeKey storage.LockTableKey, unsafeValue []byte) (resumeSoon bool, _ error) {
582613
// Assert that the lock is not an intent. Intents are handled by the
583614
// PointKey visitor function, not by the LockTableKey visitor function.
584615
if unsafeKey.Strength == lock.Intent {
585-
return errors.AssertionFailedf("unexpected intent lock in LockTableKey visitor: %s", unsafeKey)
616+
return false, errors.AssertionFailedf("unexpected intent lock in LockTableKey visitor: %s", unsafeKey)
586617
}
587618
// Rate limit the scan through the lock table.
588-
if err := wait(int64(len(unsafeKey.Key) + len(unsafeValue))); err != nil {
589-
return err
619+
var err error
620+
if resumeSoon, err = wait(int64(len(unsafeKey.Key) + len(unsafeValue))); err != nil {
621+
return false, err
590622
}
591623
// Encode the length of the key and value.
592624
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(unsafeKey.Key)))
593-
if _, err := hasher.Write(intBuf[:]); err != nil {
594-
return err
625+
if _, err = hasher.Write(intBuf[:]); err != nil {
626+
return false, err
595627
}
596628
binary.LittleEndian.PutUint64(intBuf[:], uint64(len(unsafeValue)))
597-
if _, err := hasher.Write(intBuf[:]); err != nil {
598-
return err
629+
if _, err = hasher.Write(intBuf[:]); err != nil {
630+
return false, err
599631
}
600632
// Encode the key.
601-
if _, err := hasher.Write(unsafeKey.Key); err != nil {
602-
return err
633+
if _, err = hasher.Write(unsafeKey.Key); err != nil {
634+
return false, err
603635
}
604636
// NOTE: this is not the same strength encoding that the actual lock
605637
// table version uses. For that, see getByteForReplicatedLockStrength.
606638
strengthBuf := intBuf[:1]
607639
strengthBuf[0] = byte(unsafeKey.Strength)
608-
if _, err := hasher.Write(strengthBuf); err != nil {
609-
return err
640+
if _, err = hasher.Write(strengthBuf); err != nil {
641+
return false, err
610642
}
611643
copy(uuidBuf[:], unsafeKey.TxnUUID.GetBytes())
612-
if _, err := hasher.Write(uuidBuf[:]); err != nil {
613-
return err
644+
if _, err = hasher.Write(uuidBuf[:]); err != nil {
645+
return false, err
614646
}
615647
// Encode the value.
616-
_, err := hasher.Write(unsafeValue)
617-
return err
648+
_, err = hasher.Write(unsafeValue)
649+
return resumeSoon, err
618650
}
619651

620652
// In statsOnly mode, we hash only the RangeAppliedState. In regular mode, hash

0 commit comments

Comments
 (0)