Commit 1d3dc6f

craig[bot] committed
156395: kvserver,storage: recreate long-lived iterators in replica consistency check r=sumeerbhola a=sumeerbhola

This capability is enabled by changing the `ComputeStatsVisitors` callbacks to specify a `resumeSoon` bool return value. This is used in `computeStatsForIterWithVisitors` to return before finishing all the work, and the returned resumeKey is used in `ComputeStatsWithVisitors` to resume using a new iterator. Similarly, `computeLockTableStatsWithVisitors` uses resumeSoon to construct new iterators when doing the lock table iteration.

To ensure correctness, the `resumeSoon=true` feature must only be used when the `storage.Reader` returns true from `Reader.ConsistentIterators`, which makes the promise that the different iterators returned by the `Reader` see the same underlying `Engine` state. The replica consistency check uses a Pebble snapshot, for which the iterators are consistent.

Fixes #154533

Epic: none

Release note: None

158216: sql/inspect: record table version in INSPECT job to detect schema drift r=spilchen a=spilchen

Previously, the INSPECT job only recorded the table IDs involved in the job. However, there is a time gap between when the job record is created and when the job is executed, during which schema changes may occur. These changes can invalidate assumptions made at job creation (e.g. index ID existence).

This change introduces table version tracking into the job record. Before executing any checks, the job verifies that the current table versions match those stored at creation. If a mismatch is detected, the job fails early. This mechanism mirrors similar protections implemented in other jobs, such as TTL jobs.

Informs #158197

Release note: none

158330: kvcoord: add traces to TestUnexpectedCommitOnTxnRecovery r=miraradeva a=stevendanna

We recently observed a failure in this test that was hard to debug because the test ran for a long time and then hit an assertion failure inside a goroutine. Here, ensure that we don't fail the test from inside a goroutine, and also try to make the test fail more promptly by setting a context timeout. We've also added traces that are printed in the case of a failure to help debug what happened.

Informs #158194

Release note: None

Co-authored-by: sumeerbhola <sumeer@cockroachlabs.com>
Co-authored-by: Matt Spilchen <matt.spilchen@cockroachlabs.com>
Co-authored-by: Steven Danna <danna@cockroachlabs.com>
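To make the shape of the first change (156395) concrete, here is a minimal, self-contained sketch of the resume pattern it describes. This is not CockroachDB's actual API: memReader, newIterAt, computeChunk, and compute are hypothetical stand-ins for storage.Reader, the ComputeStatsVisitors callbacks, and ComputeStatsWithVisitors. Only the idea comes from the commit message: a visitor may ask to "resume soon", the driver recreates an iterator at the returned resume key, and this is safe only when the reader guarantees consistent iterators.

// Hedged sketch, not CockroachDB's actual API: the resume pattern from
// PR 156395. A visitor may ask to pause; the driver then recreates a fresh
// iterator at the returned resume key. This is only safe when the reader
// promises that successive iterators see the same engine state (the role of
// Reader.ConsistentIterators in the real code).
package main

import "fmt"

// memReader is a hypothetical stand-in for a storage.Reader backed by a
// snapshot, so every iterator it creates observes the same state.
type memReader struct{ keys []string }

func (r memReader) ConsistentIterators() bool { return true }

// newIterAt returns the suffix of keys at or after start (a toy iterator).
func (r memReader) newIterAt(start string) []string {
	for i, k := range r.keys {
		if k >= start {
			return r.keys[i:]
		}
	}
	return nil
}

// visitor mirrors a ComputeStatsVisitors-style callback extended with a
// resumeSoon return value.
type visitor func(key string) (resumeSoon bool)

// computeChunk visits keys until the visitor asks to pause; it returns the
// key to resume from, or "" when iteration is complete.
func computeChunk(keys []string, visit visitor) (resumeKey string) {
	for i, k := range keys {
		if visit(k) && i+1 < len(keys) {
			return keys[i+1]
		}
	}
	return ""
}

// compute drives the chunked iteration, recreating the "iterator" each time
// the visitor asks to resume soon.
func compute(r memReader, visit visitor) error {
	if !r.ConsistentIterators() {
		return fmt.Errorf("resumable iteration requires consistent iterators")
	}
	resumeKey := ""
	for {
		resumeKey = computeChunk(r.newIterAt(resumeKey), visit)
		if resumeKey == "" {
			return nil
		}
	}
}

func main() {
	r := memReader{keys: []string{"a", "b", "c", "d", "e"}}
	n := 0
	err := compute(r, func(key string) bool {
		fmt.Println("visit", key)
		n++
		return n%2 == 0 // ask to pause every two keys
	})
	fmt.Println("err:", err)
}

Each key is visited exactly once even though the "iterator" is rebuilt twice; the consistency guarantee is what makes the rebuilt iterator equivalent to the one that was discarded.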
4 parents 1ef7f73 + 6b92909 + ebe8d76 + cb99431 commit 1d3dc6f

File tree

9 files changed: +549 additions, -193 deletions


pkg/jobs/jobspb/jobs.proto

Lines changed: 8 additions & 0 deletions
@@ -1474,6 +1474,14 @@ message InspectDetails {
       (gogoproto.customname) = "IndexID",
       (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.IndexID"
     ];
+
+    // TableVersion records the descriptor version observed when the check was
+    // planned. It is used at execution time to detect concurrent schema changes
+    // and abort the job before running inconsistent checks.
+    uint64 table_version = 4 [
+      (gogoproto.customname) = "TableVersion",
+      (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.DescriptorVersion"
+    ];
   }

   // Checks is the list of individual checks this job will perform.
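A hedged sketch of the check this new field enables at execution time. The checkSpec, currentVersion, and verifyTableVersions names below are hypothetical, not the INSPECT job's actual code; only the idea is taken from the commit message: before running any checks, compare each table's live descriptor version with the version recorded at planning time and fail early on a mismatch.

// Hedged sketch (hypothetical names): fail an INSPECT-style job early if any
// table's descriptor version drifted since the job record was created.
package main

import (
	"errors"
	"fmt"
)

// checkSpec mirrors the idea of an InspectDetails check: a table ID plus the
// descriptor version observed at planning time.
type checkSpec struct {
	TableID      int64
	TableVersion uint64
}

// currentVersion is a hypothetical lookup of a table's live descriptor version.
type currentVersion func(tableID int64) (uint64, error)

// verifyTableVersions returns an error if any table changed since planning,
// so the job does not run checks against assumptions (e.g. index IDs) that a
// concurrent schema change may have invalidated.
func verifyTableVersions(checks []checkSpec, lookup currentVersion) error {
	for _, c := range checks {
		v, err := lookup(c.TableID)
		if err != nil {
			return err
		}
		if v != c.TableVersion {
			return fmt.Errorf(
				"table %d version changed from %d to %d since the job was created",
				c.TableID, c.TableVersion, v)
		}
	}
	return nil
}

func main() {
	checks := []checkSpec{{TableID: 104, TableVersion: 3}}
	// Simulate a schema change bumping the descriptor version after planning.
	lookup := func(tableID int64) (uint64, error) {
		if tableID == 104 {
			return 4, nil
		}
		return 0, errors.New("table not found")
	}
	if err := verifyTableVersions(checks, lookup); err != nil {
		fmt.Println("failing the job early:", err)
	}
}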

pkg/kv/kvclient/kvcoord/dist_sender_server_test.go

Lines changed: 101 additions & 37 deletions
@@ -41,6 +41,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
   "github.com/cockroachdb/cockroach/pkg/testutils/skip"
   "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+  "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
   "github.com/cockroachdb/cockroach/pkg/util/hlc"
   "github.com/cockroachdb/cockroach/pkg/util/leaktest"
   "github.com/cockroachdb/cockroach/pkg/util/log"
@@ -4438,9 +4439,8 @@ func TestUnexpectedCommitOnTxnRecovery(t *testing.T) {
       return nil
     },
     TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
-      if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
-        t.Logf("failing application for raft cmdID: %s", cmdID)
-
+      if cID := cmdID.Load().(kvserverbase.CmdIDKey); fArgs.CmdID == cID {
+        t.Logf("failing application for raft cmdID: %s", cID)
         return 0, kvpb.NewErrorf("test injected error")
       }
       return 0, nil
@@ -4472,75 +4472,139 @@
   )
   db := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper())

-  startTxn2 := make(chan struct{})
+  startTxn2Ch := make(chan struct{})
+  var startTxn2Once sync.Once
+  startTxn2 := func() {
+    startTxn2Once.Do(func() { close(startTxn2Ch) })
+  }
   blockCh := make(chan struct{})
-  var wg sync.WaitGroup
-  wg.Add(1)

   // Write to keyB so that we can later get a lock on it.
+  valueBytes := []byte("valueB")
   txn := db.NewTxn(ctx, "txn")
-  err := txn.Put(ctx, keyB, "valueB")
+  err := txn.Put(ctx, keyB, valueBytes)
   require.NoError(t, err)
   require.NoError(t, txn.Commit(ctx))

-  go func() {
-    defer wg.Done()
+  ctx, cancel := context.WithTimeout(context.Background(), testutils.SucceedsSoonDuration())
+  defer cancel()
+
+  tracer := s.TracerI().(*tracing.Tracer)
+  txn1Ctx, collectAndFinishTxn1 := tracing.ContextWithRecordingSpan(ctx, tracer, "txn1")
+  defer func() {
+    rec := collectAndFinishTxn1()
+    if t.Failed() {
+      t.Logf("TXN 1 TRACE: %s", rec)
+    }
+  }()

-    err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+  g := ctxgroup.WithContext(txn1Ctx)
+  g.GoCtx(func(ctx context.Context) error {
+    return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+      // If we exit early, unblock main thread to make sure we observe the error.
+      defer startTxn2()
+      t.Logf("txn 1: id=%s epo=%d", txn.ID(), txn.Epoch())
       if txnID := targetTxnIDString.Load(); txnID == "" {
         // Store the txnID for the testing knobs.
+        t.Logf("txn 1: ID set for testing knob")
         targetTxnIDString.Store(txn.ID().String())
-        t.Logf("txn1 ID is: %s", txn.ID())
       } else if txnID != txn.ID() {
         // Since txn recovery aborted us, we get retried again but with an
         // entirely new txnID. This time we just return. Writing nothing.
         return nil
       }

+      expectErrorMatch := func(err error, s string) error {
+        if err == nil {
+          return errors.New("unexpected nil error")
+        } else if !strings.Contains(err.Error(), s) {
+          return errors.Wrapf(err, "unexpected error does not contain %q", s)
+        }
+        return nil
+      }
+      expectResultMatch := func(res kv.KeyValue, value []byte) error {
+        if !bytes.Equal(res.ValueBytes(), value) {
+          return errors.Errorf("unexpected result: %v != %v", res.ValueBytes(), value)
+        }
+        return nil
+      }
+
       switch txn.Epoch() {
       case 0:
         err := txn.Put(ctx, keyA, "value")
         require.NoError(t, err)
         res, err := txn.GetForUpdate(ctx, keyB, kvpb.BestEffort)
-        require.NoError(t, err)
-        require.Equal(t, res.ValueBytes(), []byte("valueB"))
+        if err != nil {
+          return err
+        }
+        if err := expectResultMatch(res, valueBytes); err != nil {
+          return err
+        }
+
         res, err = txn.GetForShare(ctx, keyB, kvpb.GuaranteedDurability)
-        require.NoError(t, err)
-        require.Equal(t, res.ValueBytes(), []byte("valueB"))
+        if err != nil {
+          return err
+        }
+        if err := expectResultMatch(res, valueBytes); err != nil {
+          return err
+        }
+
         err = txn.Commit(ctx)
-        require.Error(t, err)
-        require.ErrorContains(t, err, "RETRY_ASYNC_WRITE_FAILURE")
+        if matchErr := expectErrorMatch(err, "RETRY_ASYNC_WRITE_FAILURE"); matchErr != nil {
+          return matchErr
+        }
         // Transfer the lease to n3.
         transferLease(2)
-        close(startTxn2)
-        // Block until Txn2 recovers us.
-        <-blockCh
+        t.Logf("txn 1: allowing txn 2 to start")
+        startTxn2()
+        // Block until Txn2 recovers (and aborts) us.
+        select {
+        case <-blockCh:
+        case <-ctx.Done():
+          return errors.Wrap(ctx.Err(), "txn1 waiting for unblock from txn2")
+        }
+
+        t.Logf("txn 1: resuming execution")
         return err
       case 1:
         // When retrying the write failure we should discover that txn recovery
         // has aborted this transaction.
         err := txn.Put(ctx, keyA, "value")
-        require.Error(t, err)
-        require.ErrorContains(t, err, "ABORT_REASON_ABORT_SPAN")
+        if matchErr := expectErrorMatch(err, "ABORT_REASON_ABORT_SPAN"); matchErr != nil {
+          return matchErr
+        }
         return err
       default:
-        t.Errorf("unexpected epoch: %d", txn.Epoch())
+        return errors.Errorf("unexpected epoch: %d", txn.Epoch())
       }
-      return nil
     })
-    require.NoError(t, err)
-  }()
-  <-startTxn2
+  })
+
+  // Txn2 just runs on the main goroutine.
+  select {
+  case <-startTxn2Ch:
+  case <-ctx.Done():
+    t.Fatal(errors.Wrap(ctx.Err(), "txn2 waiting for start signal from txn1"))
+  }

-  txn2 := db.NewTxn(ctx, "txn2")
-  res, err := txn2.Get(ctx, keyA)
+  txn2Ctx, collectAndFinishTxn2 := tracing.ContextWithRecordingSpan(context.Background(), tracer, "txn2")
+  defer func() {
+    rec := collectAndFinishTxn2()
+    if t.Failed() {
+      t.Logf("TXN 2 TRACE: %s", rec)
+    }
+  }()
+  txn2 := db.NewTxn(txn2Ctx, "txn2")
+  t.Logf("txn 2: id=%s, epo=%d", txn2.ID(), txn2.Epoch())
+  res, err := txn2.Get(txn2Ctx, keyA)
   require.NoError(t, err)
-  // NB: Nothing should exist on keyA, because txn1 didn't commit at epoch 1 (or
-  // any epoch, for that matter).
+  // NB: Nothing should exist on keyA, because txn1 didn't commit at epoch 1
+  // (or any epoch, for that matter).
   require.False(t, res.Exists())
-
   close(blockCh)
-  wg.Wait()
+  // Txn1 should now complete. We return a nil error after observing an abort.
+  require.NoError(t, g.Wait())
 }

 // TestUnprocessedWritesHaveResumeSpanSet tests that writes that weren't
@@ -4698,8 +4762,8 @@ func TestRollbackAfterRefreshAndFailedCommit(t *testing.T) {
       return nil
     },
     TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
-      if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
-        t.Logf("failing application for raft cmdID: %s", fArgs.CmdID)
+      if cID := cmdID.Load().(kvserverbase.CmdIDKey); fArgs.CmdID == cID {
+        t.Logf("failing application for raft cmdID: %s", cID)
         return 0, kvpb.NewErrorf("test injected error")
       }
       return 0, nil
@@ -4839,8 +4903,8 @@ func TestUnexpectedCommitOnTxnAbortAfterRefresh(t *testing.T) {
       return nil
     },
     TestingApplyCalledTwiceFilter: func(fArgs kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
-      if fArgs.CmdID == cmdID.Load().(kvserverbase.CmdIDKey) {
-        t.Logf("failing application for raft cmdID: %s", fArgs.CmdID)
+      if cID := cmdID.Load().(kvserverbase.CmdIDKey); fArgs.CmdID == cID {
+        t.Logf("failing application for raft cmdID: %s", cID)
         return 0, kvpb.NewErrorf("test injected error")
       }
       return 0, nil
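The test change above leans on two patterns the commit message calls out: failures are returned from the worker goroutine and checked on the main goroutine (via ctxgroup in the real test), and the whole test is bounded by a context timeout so a stuck channel wait fails promptly instead of hanging. Below is a minimal standard-library sketch of those two patterns; it is an illustration under those assumptions, not the test's actual code.

// Hedged sketch: report errors from a goroutine instead of failing inside it,
// and bound the wait with a context timeout so a hang becomes a prompt error.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func run(ctx context.Context, unblock <-chan struct{}) error {
	// Wait for the other participant, but give up if the deadline hits.
	select {
	case <-unblock:
		return nil
	case <-ctx.Done():
		return errors.New("timed out waiting to be unblocked")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	unblock := make(chan struct{})
	errCh := make(chan error, 1)
	go func() {
		// The goroutine reports a failure instead of failing the test itself.
		errCh <- run(ctx, unblock)
	}()

	// Simulate the main goroutine never closing unblock: the context timeout
	// converts an indefinite hang into a prompt, debuggable error.
	if err := <-errCh; err != nil {
		fmt.Println("test would fail on the main goroutine with:", err)
	}
}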
