Skip to content

Commit dac332e

Browse files
committed
distsql: emit cancellation error in MoveToDraining
Previously if the context used to run a processor was canceled, most of the time the emitted error would be that context cancellation, however this depended on the exact implementation of each processor and the code it called reliably returning the context cancelled error exactly in this case. In at least some places, this is not reliably done. This changes DistSQL's base processor to ensure that if the context it is using to run a processor is canceled, the error emitted in MoveToDraining is always the context cancellation error, by overriding the error passed with the context error in MoveToDraining if the former is set. This ensures callers inspecting errors to determine if they are due to cancellation can always rely a cancelled flow's emitted error being a cancellation error. Fixes #156337. Release note: none. Epic: none.
1 parent 5134873 commit dac332e

File tree

1 file changed

+15
-1
lines changed

1 file changed

+15
-1
lines changed

pkg/sql/execinfra/processorsbase.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,9 @@ const (
526526
// at init() time), then we move straight to the StateTrailingMeta.
527527
//
528528
// An error can be optionally passed. It will be the first piece of metadata
529-
// returned by DrainHelper().
529+
// returned by DrainHelper(), unless the processor's context has already been
530+
// canceled, in which case the context's error is returned instead (as often an
531+
// error passed to MoveToDraining() is a consequence of context cancellation).
530532
//
531533
// MoveToDraining should only be called from the main goroutine of the
532534
// processor.
@@ -547,6 +549,18 @@ func (pb *ProcessorBaseNoHelper) MoveToDraining(err error) {
547549
}
548550

549551
if err != nil {
552+
// If processor ctx was canceled, reply with that err rather than whatever
553+
// error was passed to MoveToDraining by a processor running on top of a
554+
// canceled context, which is expected to error. Generally the error passed
555+
// in this case will be context.Canceled anyway, but doing this ensures that
556+
// distsql can promise that if it cancels a context, the emitted error will
557+
// reflect that, making cancellation detectable by callers.
558+
if pb.Ctx().Err() != nil {
559+
if !errors.Is(err, pb.Ctx().Err()) {
560+
log.Dev.Warningf(pb.Ctx(), "overriding non-cancelation emitted after context cancellation: %+v", err)
561+
}
562+
err = pb.Ctx().Err()
563+
}
550564
pb.trailingMeta = append(pb.trailingMeta, execinfrapb.ProducerMetadata{Err: err})
551565
}
552566
if pb.curInputToDrain < len(pb.inputsToDrain) {

0 commit comments

Comments
 (0)