[SPARK-53339][CONNECT] Fix interrupt on pending operations by moving postStarted() and allowing Pending to Canceled/Failed transition#54774

Open
sarutak wants to merge 3 commits into apache:master from sarutak:SPARK-53339-2
Conversation

sarutak (Member) commented Mar 12, 2026

What changes were proposed in this pull request?

This PR aims to solve SPARK-53339 using a different approach than #52083.
The issue is that interrupting an operation in Pending state causes an IllegalStateException and leaves the operation in a broken state where subsequent interrupts never work.
The root cause is that in SparkConnectExecutionManager#createExecuteHolderAndAttach, there was a window between createExecuteHolder (which registers the operation) and postStarted() where the operation was registered but still in Pending state. If an interrupt arrived during this window:

  1. ExecuteThreadRunner#interrupt() transitioned state from notStarted to interrupted via CAS
  2. ErrorUtils.handleError was called with isInterrupted=true, which called postCanceled()
  3. postCanceled() threw IllegalStateException because Pending was not in its allowed source statuses
  4. All subsequent interrupts for the same operation failed silently because ExecuteThreadRunner.state was already in the terminal interrupted state
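The four steps above can be reduced to a minimal sketch (simplified stand-in names, not the actual Spark Connect classes): when the interrupt wins the CAS before postStarted() has run, the subsequent postCanceled() rejects the Pending source status, and because the runner state is already terminal, every later interrupt becomes a silent no-op.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical simplified enums standing in for ExecuteThreadRunner.state
// and ExecuteEventsManager._status.
object RunnerState extends Enumeration { val NotStarted, Started, Interrupted = Value }
object EventStatus extends Enumeration { val Pending, Started, Canceled = Value }

class BrokenRunner {
  val state = new AtomicReference(RunnerState.NotStarted)
  var status: EventStatus.Value = EventStatus.Pending

  // Old behavior: Pending is not an allowed source status for Canceled.
  def postCanceled(): Unit = {
    if (status != EventStatus.Started)
      throw new IllegalStateException(s"Cannot transition $status -> Canceled")
    status = EventStatus.Canceled
  }

  def interrupt(): Boolean =
    if (state.compareAndSet(RunnerState.NotStarted, RunnerState.Interrupted)) {
      postCanceled() // throws: postStarted() never ran, status is still Pending
      true
    } else false // terminal state already reached: silently a no-op

}

val r = new BrokenRunner
val firstFailed =
  try { r.interrupt(); false } catch { case _: IllegalStateException => true }
assert(firstFailed)                            // step 3: IllegalStateException
assert(r.state.get == RunnerState.Interrupted) // state is nevertheless terminal...
assert(!r.interrupt())                         // step 4: later interrupts no-op
println("race reproduced")
```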

This issue can be reproduced by inserting Thread.sleep(1000) into SparkConnectExecutionManager#createExecuteHolderAndAttach as follows:

     val executeHolder = createExecuteHolder(executeKey, request, sessionHolder)
     try {
+      Thread.sleep(1000)
       executeHolder.eventsManager.postStarted()
       executeHolder.start()
     } catch {

And then run the test "interrupt all - background queries, foreground interrupt" in SparkSessionE2ESuite:

$ build/sbt 'connect-client-jvm/testOnly org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt all - background queries, foreground interrupt"'

The fix consists of:

  1. Move postStarted() into ExecuteThreadRunner#executeInternal() — Previously, postStarted() was called in createExecuteHolderAndAttach before start(), creating a window where an interrupt could race with the status transition. By moving postStarted() to right after the notStarted -> started CAS in executeInternal(), the status transition and the CAS are now sequenced — if interrupt wins the CAS (notStarted -> interrupted), postStarted() is never called.

  2. Allow Pending -> Canceled and Pending -> Failed transitions — When interrupt wins the CAS before postStarted() is called, ExecuteEventsManager._status is still Pending. The postCanceled() call from ErrorUtils.handleError needs to transition from Pending to Canceled. Similarly, postFailed() needs to handle the case where postStarted() itself throws an exception (e.g., session state check failure) while _status is still Pending.

  3. Remove plan validation from postStarted() — postStarted() previously threw UnsupportedOperationException for unknown OpTypeCase values (e.g., OPTYPE_NOT_SET). This was an implicit validation that doesn't belong in postStarted(), whose responsibility is status transition and listener event firing. The case _ branch now falls back to request.getPlan instead of throwing, since the plan variable is only used for generating the statement text in the listener event. Actual plan validation is handled by executeInternal().

  4. Add early plan validation in createExecuteHolderAndAttach — Since postStarted() was moved into executeInternal() (change 1) and no longer validates the plan (change 3), invalid plans that previously failed synchronously in postStarted() would now fail asynchronously inside the execution thread. This means the existing catch block in createExecuteHolderAndAttach — which calls removeExecuteHolder to clean up the holder — would no longer be triggered for invalid plans. To preserve this behavior, an explicit OpTypeCase validation is added before start(), ensuring that invalid plans are still caught synchronously and the holder is properly removed from the executions map.
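Changes 1 and 2 can be sketched together (again with hypothetical simplified types, not the actual Spark Connect classes): postStarted() now runs only after winning the notStarted -> started CAS, and Pending is an allowed source status for Canceled, so an interrupt that wins the CAS cancels cleanly instead of throwing.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical simplified enums standing in for the runner state and
// the events-manager status.
object RState extends Enumeration { val NotStarted, Started, Interrupted = Value }
object EStatus extends Enumeration { val Pending, Started, Canceled = Value }

class FixedRunner {
  val state = new AtomicReference(RState.NotStarted)
  var status: EStatus.Value = EStatus.Pending

  // Change 2: Pending -> Canceled is now a legal transition.
  def postCanceled(): Unit = {
    require(status == EStatus.Pending || status == EStatus.Started,
      s"Cannot transition $status -> Canceled")
    status = EStatus.Canceled
  }

  // Change 1: postStarted() runs only after winning the CAS, so the status
  // transition is sequenced with the runner-state transition.
  def executeInternal(): Unit =
    if (state.compareAndSet(RState.NotStarted, RState.Started)) {
      status = EStatus.Started // postStarted()
      // ... execute the plan ...
    } // else: interrupt won the CAS first, so postStarted() is never called

  def interrupt(): Boolean =
    if (state.compareAndSet(RState.NotStarted, RState.Interrupted)) {
      postCanceled() // succeeds now: Pending -> Canceled is allowed
      true
    } else false
}

val r = new FixedRunner
assert(r.interrupt())                // interrupt wins the CAS, cancels cleanly
assert(r.status == EStatus.Canceled)
r.executeInternal()                  // loses the CAS; postStarted() is skipped
assert(r.status == EStatus.Canceled)
println("interrupt handled cleanly")
```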

Why are the changes needed?

Bug fix.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added new tests.
I also confirmed that the SparkSessionE2ESuite test mentioned above passes.

Was this patch authored or co-authored using generative AI tooling?

Kiro CLI / Opus 4.6
