[SPARK-53339][CONNECT] Fix interrupt on pending operations by moving postStarted() and allowing Pending to Canceled/Failed transition #54774
Open
sarutak wants to merge 3 commits into apache:master
What changes were proposed in this pull request?
This PR aims to solve SPARK-53339 using a different approach than #52083.
The issue is that interrupting an operation in `Pending` state causes an `IllegalStateException` and leaves the operation in a broken state where subsequent interrupts never work.
The root cause is that in `SparkConnectExecutionManager#createExecuteHolderAndAttach`, there was a window between `createExecuteHolder` (which registers the operation) and `postStarted()` where the operation was registered but still in `Pending` state. If an interrupt arrived during this window:
1. `ExecuteThreadRunner#interrupt()` transitioned `state` from `notStarted` to `interrupted` via CAS.
2. `ErrorUtils.handleError` was called with `isInterrupted=true`, which called `postCanceled()`.
3. `postCanceled()` threw `IllegalStateException` because `Pending` was not in its allowed source statuses.
4. `ExecuteThreadRunner.state` was already in the terminal `interrupted` state, so subsequent interrupts had no effect.
This issue can be reproduced by inserting `Thread.sleep(100)` into `SparkConnectExecutionManager#createExecuteHolderAndAttach` to widen this window, and then running the test `interrupt all - background queries, foreground interrupt` in `SparkSessionE2ESuite`.
The fix consists of:
1. Move `postStarted()` into `ExecuteThreadRunner#executeInternal()`: Previously, `postStarted()` was called in `createExecuteHolderAndAttach` before `start()`, creating a window where an interrupt could race with the status transition. By moving `postStarted()` to right after the `notStarted -> started` CAS in `executeInternal()`, the status transition and the CAS are now sequenced: if interrupt wins the CAS (`notStarted -> interrupted`), `postStarted()` is never called.
2. Allow `Pending -> Canceled` and `Pending -> Failed` transitions: When interrupt wins the CAS before `postStarted()` is called, `ExecuteEventsManager._status` is still `Pending`. The `postCanceled()` call from `ErrorUtils.handleError` needs to transition from `Pending` to `Canceled`. Similarly, `postFailed()` needs to handle the case where `postStarted()` itself throws an exception (e.g., session state check failure) while `_status` is still `Pending`.
3. Remove plan validation from `postStarted()`: `postStarted()` previously threw `UnsupportedOperationException` for unknown `OpTypeCase` values (e.g., `OPTYPE_NOT_SET`). This was an implicit validation that doesn't belong in `postStarted()`, whose responsibility is status transition and listener event firing. The `case _` branch now falls back to `request.getPlan` instead of throwing, since the `plan` variable is only used for generating the `statement` text in the listener event. Actual plan validation is handled by `executeInternal()`.
4. Add early plan validation in `createExecuteHolderAndAttach`: Since `postStarted()` was moved into `executeInternal()` (change 1) and no longer validates the plan (change 3), invalid plans that previously failed synchronously in `postStarted()` would now fail asynchronously inside the execution thread. This means the existing `catch` block in `createExecuteHolderAndAttach`, which calls `removeExecuteHolder` to clean up the holder, would no longer be triggered for invalid plans. To preserve this behavior, an explicit `OpTypeCase` validation is added before `start()`, ensuring that invalid plans are still caught synchronously and the holder is properly removed from the `executions` map.
Why are the changes needed?
Bug fix.
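
To make the race concrete, here is a minimal, self-contained sketch of the ordering this PR relies on: `postStarted()` runs only after the `notStarted -> started` CAS succeeds, and `postCanceled()` accepts `Pending` as a source status. Every class and member name below (`InterruptRaceSketch`, `Events`, `Runner`, `StartedStatus`, and so on) is a hypothetical stand-in rather than the real `ExecuteThreadRunner` or `ExecuteEventsManager`. The sketch models only the `notStarted` case and a reduced set of statuses.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, simplified model; not the actual Spark Connect classes.
object InterruptRaceSketch {
  // Runner-side state, guarded by a CAS.
  sealed trait RunnerState
  case object NotStarted extends RunnerState
  case object Started extends RunnerState
  case object Interrupted extends RunnerState

  // Event-side status (reduced to the statuses discussed in this PR).
  sealed trait Status
  case object Pending extends Status
  case object StartedStatus extends Status
  case object Canceled extends Status
  case object Failed extends Status

  final class Events {
    private var _status: Status = Pending
    def status: Status = synchronized(_status)

    // Rejects transitions from statuses that are not explicitly allowed.
    private def transition(allowedFrom: Set[Status], to: Status): Unit = synchronized {
      if (!allowedFrom.contains(_status)) {
        throw new IllegalStateException(s"Cannot move from ${_status} to $to")
      }
      _status = to
    }

    def postStarted(): Unit = transition(Set[Status](Pending), StartedStatus)
    // Pending is an allowed source status here, so an early interrupt no longer throws.
    def postCanceled(): Unit = transition(Set[Status](Pending, StartedStatus), Canceled)
    def postFailed(): Unit = transition(Set[Status](Pending, StartedStatus), Failed)
  }

  final class Runner(events: Events) {
    private val state = new AtomicReference[RunnerState](NotStarted)

    // Execution side: postStarted() is reached only if this CAS wins.
    def execute(): Boolean =
      if (state.compareAndSet(NotStarted, Started)) {
        events.postStarted()
        true
      } else {
        false
      }

    // Interrupt side: if this CAS wins, execute() can never call postStarted().
    def interrupt(): Boolean =
      if (state.compareAndSet(NotStarted, Interrupted)) {
        events.postCanceled()
        true
      } else {
        false
      }
  }

  // Interrupt arrives first: returns (interruptWon, executeWon, finalStatus).
  def interruptFirst(): (Boolean, Boolean, Status) = {
    val events = new Events
    val runner = new Runner(events)
    val interrupted = runner.interrupt()
    val started = runner.execute()
    (interrupted, started, events.status)
  }

  // Execution starts first: returns (executeWon, interruptWon, finalStatus).
  def executeFirst(): (Boolean, Boolean, Status) = {
    val events = new Events
    val runner = new Runner(events)
    val started = runner.execute()
    val interrupted = runner.interrupt()
    (started, interrupted, events.status)
  }
}
```

In this model, calling `InterruptRaceSketch.interruptFirst()` ends in `Canceled` without an exception, because exactly one side wins the CAS and the `Pending -> Canceled` transition is permitted. Removing `Pending` from the allowed set in `postCanceled()` reproduces the `IllegalStateException` described above.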
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added new tests.
I also confirmed that `SparkSessionE2ESuite` mentioned above succeeded.
Was this patch authored or co-authored using generative AI tooling?
Kiro CLI / Opus 4.6