Skip to content

Commit 44a50a0

Browse files
craig[bot] and dt committed
Merge #157145
157145: backup, jobs: reliably emit context cancelled on drain, treat as retryable r=dt a=dt

Closes #156337.

Co-authored-by: David Taylor <davidt@davidt.io>
2 parents 8b9e545 + dac332e commit 44a50a0

File tree

3 files changed

+21
-4
lines changed

3 files changed

+21
-4
lines changed

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -814,10 +814,10 @@ func (ca *changeAggregator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMet
814814
err = kvFeedErr
815815
}
816816
}
817-
// Shut down the poller if it wasn't already.
818-
ca.cancel()
819817
log.Changefeed.Warningf(ca.Ctx(), "moving to draining due to error from tick: %v", err)
820818
ca.MoveToDraining(err)
819+
// Shut down the poller if it wasn't already.
820+
ca.cancel()
821821
break
822822
}
823823
}

pkg/jobs/joberror/errors.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package joberror
77

88
import (
9+
"context"
10+
911
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
1012
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
1113
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
@@ -32,5 +34,6 @@ func IsPermanentBulkJobError(err error) bool {
3234
!errors.Is(err, circuit.ErrBreakerOpen) &&
3335
!sysutil.IsErrConnectionReset(err) &&
3436
!sysutil.IsErrConnectionRefused(err) &&
35-
!errors.Is(err, sqlinstance.NonExistentInstanceError)
37+
!errors.Is(err, sqlinstance.NonExistentInstanceError) &&
38+
!errors.Is(err, context.Canceled)
3639
}

pkg/sql/execinfra/processorsbase.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,9 @@ const (
526526
// at init() time), then we move straight to the StateTrailingMeta.
527527
//
528528
// An error can be optionally passed. It will be the first piece of metadata
529-
// returned by DrainHelper().
529+
// returned by DrainHelper(), unless the processor's context has already been
530+
// canceled, in which case the context's error is returned instead (as often an
531+
// error passed to MoveToDraining() is a consequence of context cancellation).
530532
//
531533
// MoveToDraining should only be called from the main goroutine of the
532534
// processor.
@@ -547,6 +549,18 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) {
547549
}
548550

549551
if err != nil {
552+
// If processor ctx was canceled, reply with that err rather than whatever
553+
// error was passed to MoveToDraining by a processor running on top of a
554+
// canceled context, which is expected to error. Generally the error passed
555+
// in this case will be context.Canceled anyway, but doing this ensures that
556+
// distsql can promise that if it cancels a context, the emitted error will
557+
// reflect that, making cancellation detectable by callers.
558+
if pb.Ctx().Err() != nil {
559+
if !errors.Is(err, pb.Ctx().Err()) {
560+
log.Dev.Warningf(pb.Ctx(), "overriding non-cancelation emitted after context cancellation: %+v", err)
561+
}
562+
err = pb.Ctx().Err()
563+
}
550564
pb.trailingMeta = append(pb.trailingMeta, execinfrapb.ProducerMetadata{Err: err})
551565
}
552566
if pb.curInputToDrain < len(pb.inputsToDrain) {

0 commit comments

Comments
 (0)