From bfa0e28cb138779c651343f93e4b4ade7036fb74 Mon Sep 17 00:00:00 2001 From: Carlos Ramirez Date: Sat, 7 Mar 2026 14:39:32 -0500 Subject: [PATCH 1/3] fix: cancel Temporal timer when workflow.sleep() task is cancelled --- temporalio/worker/_workflow_instance.py | 7 ++- tests/worker/test_workflow.py | 67 +++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 9c22c05ce..125f2a373 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -1739,10 +1739,13 @@ async def workflow_sleep( else None ) fut = self.create_future() - self._timer_impl( + timer_handle = self._timer_impl( duration, _TimerOptions(user_metadata=user_metadata), - lambda: fut.set_result(None), + lambda: fut.set_result(None) if not fut.done() else None, + ) + fut.add_done_callback( + lambda f: timer_handle.cancel() if f.cancelled() else None ) await fut diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index deedae964..e4f862732 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -3431,6 +3431,73 @@ async def test_workflow_cancel_signal_and_timer_fired_in_same_task( await result_task +@workflow.defn +class CancelWorkflowSleepTaskWorkflow: + """Like CancelSignalAndTimerFiredInSameTaskWorkflow but uses workflow.sleep.""" + + timer_task: asyncio.Task[None] # type: ignore[reportUninitializedInstanceVariable] + + @workflow.run + async def run(self) -> str: + # Start a long workflow.sleep wrapped in a task + self.timer_task = asyncio.create_task(workflow.sleep(60 * 60)) + try: + await self.timer_task + return "timer_completed" + except asyncio.CancelledError: + return "timer_cancelled" + + @workflow.signal + def cancel_timer(self) -> None: + self.timer_task.cancel() + + +async def test_workflow_sleep_task_cancellation( + client: Client, env: WorkflowEnvironment +): + """Cancelling a task wrapping workflow.sleep() should cancel the timer + and not raise InvalidStateError.""" + if not env.supports_time_skipping: + pytest.skip("Need to skip time to validate this test") + + async with await WorkflowEnvironment.start_time_skipping() as env: + # Use max_cached_workflows=0 and restart worker so the signal and + # timer fire arrive in the same activation (mirrors the existing + # CancelSignalAndTimerFiredInSameTaskWorkflow test). + async with new_worker( + client, + CancelWorkflowSleepTaskWorkflow, + max_cached_workflows=0, + ) as worker: + task_queue = worker.task_queue + handle = await client.start_workflow( + CancelWorkflowSleepTaskWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=task_queue, + ) + # Wait so the worker starts the timer + await env.sleep(30 * 60) + + # Listen to result in background so the auto-skipping works + result_task = asyncio.create_task(handle.result()) + + # Send signal to cancel the workflow.sleep task, then advance past timer + await handle.signal(CancelWorkflowSleepTaskWorkflow.cancel_timer) + await env.sleep(60 * 60) + + # Start worker again and wait for completion — previously this would + # log InvalidStateError when the timer callback fired on a cancelled future + async with new_worker( + env.client, + CancelWorkflowSleepTaskWorkflow, + task_queue=task_queue, + max_cached_workflows=0, + ): + result = await result_task + + assert result == "timer_cancelled" + + class MyCustomError(ApplicationError): def __init__(self, message: str) -> None: super().__init__(message, type="MyCustomError", non_retryable=True) From 08a640890e08c88812fb0fbdf2b791a2c60cfe8d Mon Sep 17 00:00:00 2001 From: Carlos Ramirez Date: Mon, 9 Mar 2026 12:14:11 -0400 Subject: [PATCH 2/3] chore: simplify unit test to check for TimerCanceled event in workflow history --- tests/worker/test_workflow.py | 72 ++++++++++++++++------------------- 1 file changed, 33 insertions(+), 39 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index e4f862732..c0377eeda 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -3435,67 +3435,61 @@ async def test_workflow_cancel_signal_and_timer_fired_in_same_task( class CancelWorkflowSleepTaskWorkflow: """Like CancelSignalAndTimerFiredInSameTaskWorkflow but uses workflow.sleep.""" + _ready = False timer_task: asyncio.Task[None] # type: ignore[reportUninitializedInstanceVariable] @workflow.run async def run(self) -> str: - # Start a long workflow.sleep wrapped in a task self.timer_task = asyncio.create_task(workflow.sleep(60 * 60)) + self._ready = True try: await self.timer_task return "timer_completed" except asyncio.CancelledError: return "timer_cancelled" + @workflow.query + def ready(self) -> bool: + return self._ready + @workflow.signal def cancel_timer(self) -> None: self.timer_task.cancel() async def test_workflow_sleep_task_cancellation( - client: Client, env: WorkflowEnvironment + client: Client, ): - """Cancelling a task wrapping workflow.sleep() should cancel the timer - and not raise InvalidStateError.""" - if not env.supports_time_skipping: - pytest.skip("Need to skip time to validate this test") - - async with await WorkflowEnvironment.start_time_skipping() as env: - # Use max_cached_workflows=0 and restart worker so the signal and - # timer fire arrive in the same activation (mirrors the existing - # CancelSignalAndTimerFiredInSameTaskWorkflow test). - async with new_worker( - client, - CancelWorkflowSleepTaskWorkflow, - max_cached_workflows=0, - ) as worker: - task_queue = worker.task_queue - handle = await client.start_workflow( - CancelWorkflowSleepTaskWorkflow.run, - id=f"workflow-{uuid.uuid4()}", - task_queue=task_queue, - ) - # Wait so the worker starts the timer - await env.sleep(30 * 60) + async with new_worker( + client, + CancelWorkflowSleepTaskWorkflow, + ) as worker: + handle = await client.start_workflow( + CancelWorkflowSleepTaskWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) - # Listen to result in background so the auto-skipping works - result_task = asyncio.create_task(handle.result()) + async def ready() -> bool: + return await handle.query(CancelWorkflowSleepTaskWorkflow.ready) - # Send signal to cancel the workflow.sleep task, then advance past timer + await assert_eq_eventually(True, ready) await handle.signal(CancelWorkflowSleepTaskWorkflow.cancel_timer) - await env.sleep(60 * 60) - - # Start worker again and wait for completion — previously this would - # log InvalidStateError when the timer callback fired on a cancelled future - async with new_worker( - env.client, - CancelWorkflowSleepTaskWorkflow, - task_queue=task_queue, - max_cached_workflows=0, - ): - result = await result_task + result = await handle.result() - assert result == "timer_cancelled" + assert result == "timer_cancelled" + # Verify the Temporal timer was actually cancelled on the server + resp = await client.workflow_service.get_workflow_execution_history( + GetWorkflowExecutionHistoryRequest( + namespace=client.namespace, + execution=WorkflowExecution(workflow_id=handle.id), + ) + ) + timer_canceled = any( + e.event_type == EventType.EVENT_TYPE_TIMER_CANCELED + for e in resp.history.events + ) + assert timer_canceled, "Expected TimerCanceled event in history" class MyCustomError(ApplicationError): From 705d140971833dd956f71859e8269dd634a2aa90 Mon Sep 17 00:00:00 2001 From: Carlos Ramirez Date: Mon, 9 Mar 2026 15:33:33 -0400 Subject: [PATCH 3/3] fix: linting --- tests/worker/test_workflow.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index c0377eeda..068716e3f 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -3486,8 +3486,7 @@ async def ready() -> bool: ) ) timer_canceled = any( - e.event_type == EventType.EVENT_TYPE_TIMER_CANCELED - for e in resp.history.events + e.event_type == EventType.EVENT_TYPE_TIMER_CANCELED for e in resp.history.events ) assert timer_canceled, "Expected TimerCanceled event in history"