@@ -41,6 +41,7 @@ import (
4141 "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
4242 "github.com/cockroachdb/cockroach/pkg/testutils/skip"
4343 "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
44+ "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
4445 "github.com/cockroachdb/cockroach/pkg/util/hlc"
4546 "github.com/cockroachdb/cockroach/pkg/util/leaktest"
4647 "github.com/cockroachdb/cockroach/pkg/util/log"
@@ -4438,9 +4439,8 @@ func TestUnexpectedCommitOnTxnRecovery(t *testing.T) {
44384439 return nil
44394440 },
44404441 TestingApplyCalledTwiceFilter : func (fArgs kvserverbase.ApplyFilterArgs ) (int , * kvpb.Error ) {
4441- if fArgs .CmdID == cmdID .Load ().(kvserverbase.CmdIDKey ) {
4442- t .Logf ("failing application for raft cmdID: %s" , cmdID )
4443-
4442+ if cID := cmdID .Load ().(kvserverbase.CmdIDKey ); fArgs .CmdID == cID {
4443+ t .Logf ("failing application for raft cmdID: %s" , cID )
44444444 return 0 , kvpb .NewErrorf ("test injected error" )
44454445 }
44464446 return 0 , nil
@@ -4472,75 +4472,139 @@ func TestUnexpectedCommitOnTxnRecovery(t *testing.T) {
44724472 )
44734473 db := kv .NewDB (ambient , tsf , s .Clock (), s .Stopper ())
44744474
4475- startTxn2 := make (chan struct {})
4475+ startTxn2Ch := make (chan struct {})
4476+ var startTxn2Once sync.Once
4477+ startTxn2 := func () {
4478+ startTxn2Once .Do (func () { close (startTxn2Ch ) })
4479+ }
44764480 blockCh := make (chan struct {})
4477- var wg sync.WaitGroup
4478- wg .Add (1 )
44794481
44804482 // Write to keyB so that we can later get a lock on it.
4483+ valueBytes := []byte ("valueB" )
44814484 txn := db .NewTxn (ctx , "txn" )
4482- err := txn .Put (ctx , keyB , "valueB" )
4485+ err := txn .Put (ctx , keyB , valueBytes )
44834486 require .NoError (t , err )
44844487 require .NoError (t , txn .Commit (ctx ))
44854488
4486- go func () {
4487- defer wg .Done ()
4489+ ctx , cancel := context .WithTimeout (context .Background (), testutils .SucceedsSoonDuration ())
4490+ defer cancel ()
4491+
4492+ tracer := s .TracerI ().(* tracing.Tracer )
4493+ txn1Ctx , collectAndFinishTxn1 := tracing .ContextWithRecordingSpan (ctx , tracer , "txn1" )
4494+ defer func () {
4495+ rec := collectAndFinishTxn1 ()
4496+ if t .Failed () {
4497+ t .Logf ("TXN 1 TRACE: %s" , rec )
4498+ }
4499+ }()
44884500
4489- err := db .Txn (ctx , func (ctx context.Context , txn * kv.Txn ) error {
4501+ g := ctxgroup .WithContext (txn1Ctx )
4502+ g .GoCtx (func (ctx context.Context ) error {
4503+ return db .Txn (ctx , func (ctx context.Context , txn * kv.Txn ) error {
4504+ // If we exit early, unblock main thread to make sure we observe the error.
4505+ defer startTxn2 ()
4506+ t .Logf ("txn 1: id=%s epo=%d" , txn .ID (), txn .Epoch ())
44904507 if txnID := targetTxnIDString .Load (); txnID == "" {
44914508 // Store the txnID for the testing knobs.
4509+ t .Logf ("txn 1: ID set for testing knob" )
44924510 targetTxnIDString .Store (txn .ID ().String ())
4493- t .Logf ("txn1 ID is: %s" , txn .ID ())
44944511 } else if txnID != txn .ID () {
44954512 // Since txn recovery aborted us, we get retried again but with an
44964513 // entirely new txnID. This time we just return. Writing nothing.
44974514 return nil
44984515 }
44994516
4517+ expectErrorMatch := func (err error , s string ) error {
4518+ if err == nil {
4519+ return errors .New ("unexpected nil error" )
4520+ } else if ! strings .Contains (err .Error (), s ) {
4521+ return errors .Wrapf (err , "unexpected error does not contain %q" , s )
4522+ }
4523+ return nil
4524+ }
4525+ expectResultMatch := func (res kv.KeyValue , value []byte ) error {
4526+ if ! bytes .Equal (res .ValueBytes (), valueBytes ) {
4527+ return errors .Errorf ("unexpected result: %v != %v" , res .ValueBytes (), valueBytes )
4528+ }
4529+ return nil
4530+ }
4531+
45004532 switch txn .Epoch () {
45014533 case 0 :
45024534 err := txn .Put (ctx , keyA , "value" )
45034535 require .NoError (t , err )
45044536 res , err := txn .GetForUpdate (ctx , keyB , kvpb .BestEffort )
4505- require .NoError (t , err )
4506- require .Equal (t , res .ValueBytes (), []byte ("valueB" ))
4537+ if err != nil {
4538+ return err
4539+ }
4540+ if err := expectResultMatch (res , valueBytes ); err != nil {
4541+ return err
4542+ }
4543+
45074544 res , err = txn .GetForShare (ctx , keyB , kvpb .GuaranteedDurability )
4508- require .NoError (t , err )
4509- require .Equal (t , res .ValueBytes (), []byte ("valueB" ))
4545+ if err != nil {
4546+ return err
4547+
4548+ }
4549+ if err := expectResultMatch (res , valueBytes ); err != nil {
4550+ return err
4551+ }
4552+
45104553 err = txn .Commit (ctx )
4511- require .Error (t , err )
4512- require .ErrorContains (t , err , "RETRY_ASYNC_WRITE_FAILURE" )
4554+ if matchErr := expectErrorMatch (err , "RETRY_ASYNC_WRITE_FAILURE" ); matchErr != nil {
4555+ return matchErr
4556+ }
45134557 // Transfer the lease to n3.
45144558 transferLease (2 )
4515- close (startTxn2 )
4516- // Block until Txn2 recovers us.
4517- <- blockCh
4559+ t .Logf ("txn 1: allowing txn 2 to start" )
4560+ startTxn2 ()
4561+ // Block until Txn2 recover (and aborts) us.
4562+ select {
4563+ case <- blockCh :
4564+ case <- ctx .Done ():
4565+ return errors .Wrap (ctx .Err (), "txn1 waiting for unblock from txn2" )
4566+ }
4567+
4568+ t .Logf ("txn 1: resuming execution" )
45184569 return err
45194570 case 1 :
45204571 // When retrying the write failure we should discover that txn recovery
45214572 // has aborted this transaction.
45224573 err := txn .Put (ctx , keyA , "value" )
4523- require .Error (t , err )
4524- require .ErrorContains (t , err , "ABORT_REASON_ABORT_SPAN" )
4574+ if matchErr := expectErrorMatch (err , "ABORT_REASON_ABORT_SPAN" ); matchErr != nil {
4575+ return matchErr
4576+ }
45254577 return err
45264578 default :
4527- t .Errorf ("unexpected epoch: %d" , txn .Epoch ())
4579+ return errors .Errorf ("unexpected epoch: %d" , txn .Epoch ())
45284580 }
4529- return nil
45304581 })
4531- require .NoError (t , err )
4532- }()
4533- <- startTxn2
4582+ })
4583+
4584+ // Txn2 just runs on the main goroutine.
4585+ select {
4586+ case <- startTxn2Ch :
4587+ case <- ctx .Done ():
4588+ t .Fatal (errors .Wrap (ctx .Err (), "txn2 waiting for start signal from txn1" ))
4589+ }
45344590
4535- txn2 := db .NewTxn (ctx , "txn2" )
4536- res , err := txn2 .Get (ctx , keyA )
4591+ txn2Ctx , collectAndFinishTxn2 := tracing .ContextWithRecordingSpan (context .Background (), tracer , "txn1" )
4592+ defer func () {
4593+ rec := collectAndFinishTxn2 ()
4594+ if t .Failed () {
4595+ t .Logf ("TXN 2 TRACE: %s" , rec )
4596+ }
4597+ }()
4598+ txn2 := db .NewTxn (txn2Ctx , "txn2" )
4599+ t .Logf ("txn 2: id=%s, epo=%d" , txn2 .ID (), txn2 .Epoch ())
4600+ res , err := txn2 .Get (txn2Ctx , keyA )
45374601 require .NoError (t , err )
4538- // NB: Nothing should exist on keyA, because txn1 didn't commit at epoch 1 (or
4539- // any epoch, for that matter).
4602+ // NB: Nothing should exist on keyA, because txn1 didn't commit at epoch 1
4603+ // (or any epoch, for that matter).
45404604 require .False (t , res .Exists ())
4541-
45424605 close (blockCh )
4543- wg .Wait ()
4606+ // Txn1 should now complete. We return a nil error after observing an abort.
4607+ require .NoError (t , g .Wait ())
45444608}
45454609
45464610// TestUnprocessedWritesHaveResumeSpanSet tests that writes that weren't
@@ -4698,8 +4762,8 @@ func TestRollbackAfterRefreshAndFailedCommit(t *testing.T) {
46984762 return nil
46994763 },
47004764 TestingApplyCalledTwiceFilter : func (fArgs kvserverbase.ApplyFilterArgs ) (int , * kvpb.Error ) {
4701- if fArgs . CmdID == cmdID .Load ().(kvserverbase.CmdIDKey ) {
4702- t .Logf ("failing application for raft cmdID: %s" , fArgs . CmdID )
4765+ if cID := cmdID .Load ().(kvserverbase.CmdIDKey ); fArgs . CmdID == cID {
4766+ t .Logf ("failing application for raft cmdID: %s" , cID )
47034767 return 0 , kvpb .NewErrorf ("test injected error" )
47044768 }
47054769 return 0 , nil
@@ -4839,8 +4903,8 @@ func TestUnexpectedCommitOnTxnAbortAfterRefresh(t *testing.T) {
48394903 return nil
48404904 },
48414905 TestingApplyCalledTwiceFilter : func (fArgs kvserverbase.ApplyFilterArgs ) (int , * kvpb.Error ) {
4842- if fArgs . CmdID == cmdID .Load ().(kvserverbase.CmdIDKey ) {
4843- t .Logf ("failing application for raft cmdID: %s" , fArgs . CmdID )
4906+ if cID := cmdID .Load ().(kvserverbase.CmdIDKey ); fArgs . CmdID == cID {
4907+ t .Logf ("failing application for raft cmdID: %s" , cID )
48444908 return 0 , kvpb .NewErrorf ("test injected error" )
48454909 }
48464910 return 0 , nil
0 commit comments