From 692631f9a09d518f0c90cab1a798854d4bccaa9e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 1 Apr 2026 22:45:28 +0000 Subject: [PATCH 01/19] Initial plan From ec20c67cbcb04fbb561c2f900a406bb81522ce8f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 1 Apr 2026 23:13:49 +0000 Subject: [PATCH 02/19] Implement watchdog-based recovery for stranded workflows - Add Watchdog job that scans for stale pending workflows and recovers them - Fix Workflow re-release loop: redelivered jobs now proceed when status is Running - WorkflowStub::start() kicks watchdog as fast path - WorkflowServiceProvider registers Queue Looping listener for worker liveness - Add watchdog_timeout config option (default 300s) Agent-Logs-Url: https://github.com/durable-workflow/workflow/sessions/4624e0c2-61c6-402b-bc64-bbfbf59a9d7c Co-authored-by: rmcdaniel <1130888+rmcdaniel@users.noreply.github.com> --- src/Providers/WorkflowServiceProvider.php | 12 ++ src/Watchdog.php | 57 +++++++ src/Workflow.php | 10 +- src/WorkflowStub.php | 4 + src/config/workflows.php | 2 + tests/Unit/WatchdogTest.php | 182 ++++++++++++++++++++++ 6 files changed, 265 insertions(+), 2 deletions(-) create mode 100644 src/Watchdog.php create mode 100644 tests/Unit/WatchdogTest.php diff --git a/src/Providers/WorkflowServiceProvider.php b/src/Providers/WorkflowServiceProvider.php index 433f774a..32766b16 100644 --- a/src/Providers/WorkflowServiceProvider.php +++ b/src/Providers/WorkflowServiceProvider.php @@ -4,10 +4,13 @@ namespace Workflow\Providers; +use Illuminate\Queue\Events\Looping; +use Illuminate\Support\Facades\Event; use Illuminate\Support\ServiceProvider; use Laravel\SerializableClosure\SerializableClosure; use Workflow\Commands\ActivityMakeCommand; use Workflow\Commands\WorkflowMakeCommand; +use Workflow\Watchdog; final class WorkflowServiceProvider extends ServiceProvider { @@ -24,5 +27,14 @@ public function boot(): void ], 'migrations'); $this->commands([ActivityMakeCommand::class, WorkflowMakeCommand::class]); + + Event::listen(Looping::class, static function (): void { + static $lastKick = 0; + $now = time(); + if ($now - $lastKick >= 60) { + $lastKick = $now; + Watchdog::kick(); + } + }); } } diff --git a/src/Watchdog.php b/src/Watchdog.php new file mode 100644 index 00000000..613e92f0 --- /dev/null +++ b/src/Watchdog.php @@ -0,0 +1,57 @@ +delay($timeout); + } + } + + public function handle(): void + { + $timeout = (int) config('workflows.watchdog_timeout', 300); + + Cache::put('workflow:watchdog', true, $timeout); + + $model = config('workflows.stored_workflow_model', StoredWorkflow::class); + + $model::where('status', WorkflowPendingStatus::$name) + ->where('updated_at', '<=', now()->subSeconds($timeout)) + ->whereNotNull('arguments') + ->each(static function (StoredWorkflow $storedWorkflow): void { + Cache::lock('laravel_unique_job:' . $storedWorkflow->class . $storedWorkflow->id) + ->forceRelease(); + + $storedWorkflow->class::dispatch($storedWorkflow, ...$storedWorkflow->workflowArguments()); + }); + + $this->release($timeout); + } +} diff --git a/src/Workflow.php b/src/Workflow.php index 20b80b6e..861e6137 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -177,10 +177,16 @@ public function handle(): void $this->storedWorkflow->status->transitionTo(WorkflowRunningStatus::class); } } catch (TransitionNotFound) { - if ($this->storedWorkflow->toWorkflow()->running()) { + $this->storedWorkflow->refresh(); + + if ($this->storedWorkflow->status::class === WorkflowRunningStatus::class) { + // Redelivered after worker crash – proceed with replay. + } elseif ($this->storedWorkflow->toWorkflow()->running()) { $this->release(); + return; + } else { + return; } - return; } $parentWorkflow = $this->storedWorkflow->parents() diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index 5aabead1..4d6d5c68 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -246,6 +246,10 @@ public function start(...$arguments): void $this->storedWorkflow->arguments = Serializer::serialize($metadata->toArray()); $this->dispatch(); + + if (! static::faked()) { + Watchdog::kick(); + } } public function startAsChild(StoredWorkflow $parentWorkflow, int $index, $now, ...$arguments): void diff --git a/src/config/workflows.php b/src/config/workflows.php index 26fc15cb..e21bce92 100644 --- a/src/config/workflows.php +++ b/src/config/workflows.php @@ -21,6 +21,8 @@ 'prune_age' => '1 month', + 'watchdog_timeout' => 300, + 'webhooks_route' => env('WORKFLOW_WEBHOOKS_ROUTE', 'webhooks'), 'webhook_auth' => [ diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php new file mode 100644 index 00000000..01cd01b7 --- /dev/null +++ b/tests/Unit/WatchdogTest.php @@ -0,0 +1,182 @@ + TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now()->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertPushed(TestSimpleWorkflow::class, static function ($job) use ($storedWorkflow) { + return $job->storedWorkflow->id === $storedWorkflow->id; + }); + } + + public function testHandleIgnoresRecentPendingWorkflows(): void + { + Queue::fake(); + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertNotPushed(TestSimpleWorkflow::class); + } + + public function testHandleIgnoresPendingWithoutArguments(): void + { + Queue::fake(); + + $timeout = (int) config('workflows.watchdog_timeout', 300); + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now()->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertNotPushed(TestSimpleWorkflow::class); + } + + public function testHandleClearsUniqueLockBeforeRedispatch(): void + { + Queue::fake(); + + $timeout = (int) config('workflows.watchdog_timeout', 300); + + $storedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now()->subSeconds($timeout + 1), + ]); + + $lockKey = 'laravel_unique_job:' . TestSimpleWorkflow::class . $storedWorkflow->id; + Cache::lock($lockKey)->get(); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + $this->assertTrue(Cache::lock($lockKey)->get()); + } + + public function testHandleRefreshesWatchdogMarker(): void + { + Queue::fake(); + + Cache::forget('workflow:watchdog'); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + $this->assertTrue(Cache::has('workflow:watchdog')); + } + + public function testKickDispatchesWhenMarkerAbsent(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + Watchdog::kick(); + + Queue::assertPushed(Watchdog::class); + $this->assertTrue(Cache::has('workflow:watchdog')); + } + + public function testKickSkipsWhenMarkerPresent(): void + { + Queue::fake(); + Cache::put('workflow:watchdog', true, 300); + + Watchdog::kick(); + + Queue::assertNotPushed(Watchdog::class); + } + + public function testKickIsIdempotent(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + Watchdog::kick(); + Watchdog::kick(); + Watchdog::kick(); + + Queue::assertPushed(Watchdog::class, 1); + } + + public function testHandleToleratesAlreadyRecoveredWorkflow(): void + { + Queue::fake(); + + $timeout = (int) config('workflows.watchdog_timeout', 300); + + $storedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now()->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + $storedWorkflow->refresh(); + $this->assertSame(WorkflowPendingStatus::$name, $storedWorkflow->status::class === WorkflowPendingStatus::class ? 'pending' : (string) $storedWorkflow->status); + + Queue::assertPushed(TestSimpleWorkflow::class, 1); + } + + public function testWatchdogTimeoutConfig(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + config(['workflows.watchdog_timeout' => 60]); + + $storedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now()->subSeconds(61), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertPushed(TestSimpleWorkflow::class); + } +} From 404857af19661d328e5edeeb80b9960367bb1bc7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 2 Apr 2026 02:17:48 +0000 Subject: [PATCH 03/19] Fix watchdog recovery: prevent duplicates, fix namespace collision, reorder kick - Watchdog now refreshes workflow from DB before recovery (prevents duplicates) - Watchdog touches updated_at before re-dispatch (prevents re-scan) - Uses Cache::lock()->forceRelease() matching Laravel's UniqueLock pattern - Uses Carbon::now() instead of now() to avoid Workflow\now() namespace collision - Kick watchdog before workflow dispatch in start() (covers producer crash) - Add redelivery test, Looping listener tests, comprehensive watchdog tests Agent-Logs-Url: https://github.com/durable-workflow/workflow/sessions/3276cfde-9a62-4b33-abd7-2d821e4c8203 Co-authored-by: rmcdaniel <1130888+rmcdaniel@users.noreply.github.com> --- src/Watchdog.php | 11 ++- src/Workflow.php | 11 +-- src/WorkflowStub.php | 4 +- .../Providers/WorkflowServiceProviderTest.php | 27 ++++++ tests/Unit/WatchdogTest.php | 96 ++++++++++++++----- tests/Unit/WorkflowTest.php | 26 +++++ 6 files changed, 144 insertions(+), 31 deletions(-) diff --git a/src/Watchdog.php b/src/Watchdog.php index 613e92f0..24698762 100644 --- a/src/Watchdog.php +++ b/src/Watchdog.php @@ -9,6 +9,7 @@ use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; +use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Cache; use Workflow\Models\StoredWorkflow; use Workflow\States\WorkflowPendingStatus; @@ -43,9 +44,17 @@ public function handle(): void $model = config('workflows.stored_workflow_model', StoredWorkflow::class); $model::where('status', WorkflowPendingStatus::$name) - ->where('updated_at', '<=', now()->subSeconds($timeout)) + ->where('updated_at', '<=', Carbon::now()->subSeconds($timeout)) ->whereNotNull('arguments') ->each(static function (StoredWorkflow $storedWorkflow): void { + $storedWorkflow->refresh(); + + if ($storedWorkflow->status::class !== WorkflowPendingStatus::class) { + return; + } + + $storedWorkflow->touch(); + Cache::lock('laravel_unique_job:' . $storedWorkflow->class . $storedWorkflow->id) ->forceRelease(); diff --git a/src/Workflow.php b/src/Workflow.php index 861e6137..d1b42c78 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -179,14 +179,13 @@ public function handle(): void } catch (TransitionNotFound) { $this->storedWorkflow->refresh(); - if ($this->storedWorkflow->status::class === WorkflowRunningStatus::class) { - // Redelivered after worker crash – proceed with replay. - } elseif ($this->storedWorkflow->toWorkflow()->running()) { - $this->release(); - return; - } else { + if ($this->storedWorkflow->status::class !== WorkflowRunningStatus::class) { + if ($this->storedWorkflow->toWorkflow()->running()) { + $this->release(); + } return; } + // Redelivered after worker crash – proceed with replay. } $parentWorkflow = $this->storedWorkflow->parents() diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index 4d6d5c68..e7f7387f 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -245,11 +245,11 @@ public function start(...$arguments): void $this->storedWorkflow->arguments = Serializer::serialize($metadata->toArray()); - $this->dispatch(); - if (! static::faked()) { Watchdog::kick(); } + + $this->dispatch(); } public function startAsChild(StoredWorkflow $parentWorkflow, int $index, $now, ...$arguments): void diff --git a/tests/Unit/Providers/WorkflowServiceProviderTest.php b/tests/Unit/Providers/WorkflowServiceProviderTest.php index 7148c48a..14d828d1 100644 --- a/tests/Unit/Providers/WorkflowServiceProviderTest.php +++ b/tests/Unit/Providers/WorkflowServiceProviderTest.php @@ -4,9 +4,14 @@ namespace Tests\Unit; +use Illuminate\Queue\Events\Looping; use Illuminate\Support\Facades\Artisan; +use Illuminate\Support\Facades\Cache; +use Illuminate\Support\Facades\Event; +use Illuminate\Support\Facades\Queue; use Tests\TestCase; use Workflow\Providers\WorkflowServiceProvider; +use Workflow\Watchdog; final class WorkflowServiceProviderTest extends TestCase { @@ -56,4 +61,26 @@ public function testCommandsAreRegistered(): void ); } } + + public function testLoopingEventKicksWatchdog(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + Event::dispatch(new Looping('sync', 'default')); + + Queue::assertPushed(Watchdog::class, 1); + } + + public function testLoopingEventThrottlesKick(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + Event::dispatch(new Looping('sync', 'default')); + Event::dispatch(new Looping('sync', 'default')); + Event::dispatch(new Looping('sync', 'default')); + + Queue::assertPushed(Watchdog::class, 1); + } } diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index 01cd01b7..99a17343 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -4,19 +4,20 @@ namespace Tests\Unit; -use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Queue; use Tests\Fixtures\TestSimpleWorkflow; use Tests\TestCase; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; +use Workflow\States\WorkflowCompletedStatus; use Workflow\States\WorkflowPendingStatus; +use Workflow\States\WorkflowRunningStatus; use Workflow\Watchdog; final class WatchdogTest extends TestCase { - public function testHandleRecoversStalePendingWorkflows(): void + public function testHandleRecoversStalePendingWorkflow(): void { Queue::fake(); @@ -26,15 +27,17 @@ public function testHandleRecoversStalePendingWorkflows(): void 'class' => TestSimpleWorkflow::class, 'arguments' => Serializer::serialize([]), 'status' => WorkflowPendingStatus::$name, - 'updated_at' => now()->subSeconds($timeout + 1), + 'updated_at' => now() + ->subSeconds($timeout + 1), ]); $watchdog = new Watchdog(); $watchdog->handle(); - Queue::assertPushed(TestSimpleWorkflow::class, static function ($job) use ($storedWorkflow) { - return $job->storedWorkflow->id === $storedWorkflow->id; - }); + $storedWorkflow->refresh(); + $this->assertSame(WorkflowPendingStatus::class, $storedWorkflow->status::class); + + Queue::assertPushed(TestSimpleWorkflow::class, 1); } public function testHandleIgnoresRecentPendingWorkflows(): void @@ -62,7 +65,8 @@ public function testHandleIgnoresPendingWithoutArguments(): void StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, 'status' => WorkflowPendingStatus::$name, - 'updated_at' => now()->subSeconds($timeout + 1), + 'updated_at' => now() + ->subSeconds($timeout + 1), ]); $watchdog = new Watchdog(); @@ -71,7 +75,7 @@ public function testHandleIgnoresPendingWithoutArguments(): void Queue::assertNotPushed(TestSimpleWorkflow::class); } - public function testHandleClearsUniqueLockBeforeRedispatch(): void + public function testHandleSkipsAlreadyRecoveredWorkflow(): void { Queue::fake(); @@ -81,16 +85,38 @@ public function testHandleClearsUniqueLockBeforeRedispatch(): void 'class' => TestSimpleWorkflow::class, 'arguments' => Serializer::serialize([]), 'status' => WorkflowPendingStatus::$name, - 'updated_at' => now()->subSeconds($timeout + 1), + 'updated_at' => now() + ->subSeconds($timeout + 1), ]); - $lockKey = 'laravel_unique_job:' . TestSimpleWorkflow::class . $storedWorkflow->id; - Cache::lock($lockKey)->get(); + $storedWorkflow->update([ + 'status' => WorkflowRunningStatus::$name, + ]); $watchdog = new Watchdog(); $watchdog->handle(); - $this->assertTrue(Cache::lock($lockKey)->get()); + Queue::assertNotPushed(TestSimpleWorkflow::class); + } + + public function testHandleSkipsAlreadyCompletedWorkflow(): void + { + Queue::fake(); + + $timeout = (int) config('workflows.watchdog_timeout', 300); + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowCompletedStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertNotPushed(TestSimpleWorkflow::class); } public function testHandleRefreshesWatchdogMarker(): void @@ -138,7 +164,30 @@ public function testKickIsIdempotent(): void Queue::assertPushed(Watchdog::class, 1); } - public function testHandleToleratesAlreadyRecoveredWorkflow(): void + public function testWatchdogTimeoutConfig(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + config([ + 'workflows.watchdog_timeout' => 60, + ]); + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds(61), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertPushed(TestSimpleWorkflow::class); + } + + public function testHandleTouchesWorkflowBeforeRedispatch(): void { Queue::fake(); @@ -148,35 +197,38 @@ public function testHandleToleratesAlreadyRecoveredWorkflow(): void 'class' => TestSimpleWorkflow::class, 'arguments' => Serializer::serialize([]), 'status' => WorkflowPendingStatus::$name, - 'updated_at' => now()->subSeconds($timeout + 1), + 'updated_at' => now() + ->subSeconds($timeout + 1), ]); $watchdog = new Watchdog(); $watchdog->handle(); $storedWorkflow->refresh(); - $this->assertSame(WorkflowPendingStatus::$name, $storedWorkflow->status::class === WorkflowPendingStatus::class ? 'pending' : (string) $storedWorkflow->status); - - Queue::assertPushed(TestSimpleWorkflow::class, 1); + $this->assertTrue($storedWorkflow->updated_at->greaterThan(now()->subSeconds(5))); } - public function testWatchdogTimeoutConfig(): void + public function testHandleClearsUniqueLockBeforeRedispatch(): void { Queue::fake(); - Cache::forget('workflow:watchdog'); - config(['workflows.watchdog_timeout' => 60]); + $timeout = (int) config('workflows.watchdog_timeout', 300); $storedWorkflow = StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, 'arguments' => Serializer::serialize([]), 'status' => WorkflowPendingStatus::$name, - 'updated_at' => now()->subSeconds(61), + 'updated_at' => now() + ->subSeconds($timeout + 1), ]); + $lockKey = 'laravel_unique_job:' . TestSimpleWorkflow::class . $storedWorkflow->id; + Cache::lock($lockKey)->get(); + $watchdog = new Watchdog(); $watchdog->handle(); - Queue::assertPushed(TestSimpleWorkflow::class); + $this->assertTrue(Cache::lock($lockKey)->get()); + Queue::assertPushed(TestSimpleWorkflow::class, 1); } } diff --git a/tests/Unit/WorkflowTest.php b/tests/Unit/WorkflowTest.php index 6314ddcc..9cc80c8b 100644 --- a/tests/Unit/WorkflowTest.php +++ b/tests/Unit/WorkflowTest.php @@ -27,6 +27,8 @@ use Workflow\States\WorkflowContinuedStatus; use Workflow\States\WorkflowFailedStatus; use Workflow\States\WorkflowPendingStatus; +use Workflow\States\WorkflowRunningStatus; +use Workflow\States\WorkflowWaitingStatus; use Workflow\Workflow; use Workflow\WorkflowStub; @@ -470,4 +472,28 @@ public function testContinueAsNewCarriesWorkflowOptions(): void $this->assertSame('sync', $continuedWorkflow->workflowOptions()->connection); $this->assertSame('default', $continuedWorkflow->workflowOptions()->queue); } + + public function testRedeliveredJobResumesFromRunningState(): void + { + $stub = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($stub->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now(), + 'class' => TestOtherActivity::class, + 'result' => Serializer::serialize('other'), + ]); + + $workflow = new TestWorkflow($storedWorkflow); + $workflow->cancel(); + $workflow->handle(); + + $this->assertSame(WorkflowWaitingStatus::class, $stub->status()); + } } From 813747f4125f2aa0e7bd5a54278b7f41c53dac6f Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 02:52:24 +0000 Subject: [PATCH 04/19] Harden watchdog recovery bootstrap --- src/Providers/WorkflowServiceProvider.php | 9 +- src/Watchdog.php | 116 +++++++++++++++--- src/WorkflowStub.php | 2 +- .../Providers/WorkflowServiceProviderTest.php | 33 ++++- tests/Unit/WatchdogTest.php | 57 ++++++++- 5 files changed, 182 insertions(+), 35 deletions(-) diff --git a/src/Providers/WorkflowServiceProvider.php b/src/Providers/WorkflowServiceProvider.php index 32766b16..b0542704 100644 --- a/src/Providers/WorkflowServiceProvider.php +++ b/src/Providers/WorkflowServiceProvider.php @@ -28,13 +28,8 @@ public function boot(): void $this->commands([ActivityMakeCommand::class, WorkflowMakeCommand::class]); - Event::listen(Looping::class, static function (): void { - static $lastKick = 0; - $now = time(); - if ($now - $lastKick >= 60) { - $lastKick = $now; - Watchdog::kick(); - } + Event::listen(Looping::class, static function (Looping $event): void { + Watchdog::kickFromWorkerLoop($event->connectionName, $event->queue); }); } } diff --git a/src/Watchdog.php b/src/Watchdog.php index 24698762..5f041c9c 100644 --- a/src/Watchdog.php +++ b/src/Watchdog.php @@ -20,47 +20,129 @@ class Watchdog implements ShouldBeEncrypted, ShouldQueue use InteractsWithQueue; use Queueable; + private const CACHE_KEY = 'workflow:watchdog'; + + private const LOOP_THROTTLE_KEY = 'workflow:watchdog:looping'; + + private const RECOVERY_LOCK_PREFIX = 'workflow:watchdog:recovering:'; + public int $tries = 0; public int $maxExceptions = 0; public $timeout = 0; - public static function kick(): void + public static function kick(?string $connection = null, ?string $queue = null): void { - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = self::timeout(); + + if (Cache::add(self::CACHE_KEY, true, self::bootstrapWindow($timeout))) { + $dispatch = static::dispatch() + ->afterCommit() + ->delay($timeout); + + if ($connection !== null) { + $dispatch->onConnection($connection); + } + + $queue = self::normalizeQueue($queue); - if (Cache::add('workflow:watchdog', true, $timeout)) { - static::dispatch()->delay($timeout); + if ($queue !== null) { + $dispatch->onQueue($queue); + } } } + public static function kickFromWorkerLoop(string $connection, ?string $queue): void + { + if (Cache::has(self::CACHE_KEY)) { + return; + } + + if (! Cache::add(self::LOOP_THROTTLE_KEY, true, 60)) { + return; + } + + if (! self::hasRecoverablePendingWorkflows(self::timeout())) { + return; + } + + static::kick($connection, $queue); + } + public function handle(): void { - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = self::timeout(); - Cache::put('workflow:watchdog', true, $timeout); + Cache::put(self::CACHE_KEY, true, $timeout); $model = config('workflows.stored_workflow_model', StoredWorkflow::class); $model::where('status', WorkflowPendingStatus::$name) ->where('updated_at', '<=', Carbon::now()->subSeconds($timeout)) ->whereNotNull('arguments') - ->each(static function (StoredWorkflow $storedWorkflow): void { - $storedWorkflow->refresh(); + ->each(static fn (StoredWorkflow $storedWorkflow): bool => self::recover($storedWorkflow, $timeout)); - if ($storedWorkflow->status::class !== WorkflowPendingStatus::class) { - return; - } + $this->release($timeout); + } - $storedWorkflow->touch(); + private static function recover(StoredWorkflow $storedWorkflow, int $timeout): bool + { + $storedWorkflow->refresh(); - Cache::lock('laravel_unique_job:' . $storedWorkflow->class . $storedWorkflow->id) - ->forceRelease(); + if ($storedWorkflow->status::class !== WorkflowPendingStatus::class) { + return false; + } - $storedWorkflow->class::dispatch($storedWorkflow, ...$storedWorkflow->workflowArguments()); - }); + if (! Cache::lock(self::RECOVERY_LOCK_PREFIX . $storedWorkflow->id, $timeout)->get()) { + return false; + } - $this->release($timeout); + $storedWorkflow->touch(); + + Cache::lock('laravel_unique_job:' . $storedWorkflow->class . $storedWorkflow->id) + ->forceRelease(); + + $storedWorkflow->toWorkflow() + ->resume(); + + return true; + } + + private static function timeout(): int + { + return (int) config('workflows.watchdog_timeout', 300); + } + + private static function hasRecoverablePendingWorkflows(int $timeout): bool + { + $model = config('workflows.stored_workflow_model', StoredWorkflow::class); + + return $model::where('status', WorkflowPendingStatus::$name) + ->where('updated_at', '<=', Carbon::now()->subSeconds($timeout)) + ->whereNotNull('arguments') + ->exists(); + } + + private static function bootstrapWindow(int $timeout): int + { + return max(1, min($timeout, 60)); + } + + private static function normalizeQueue(?string $queue): ?string + { + if ($queue === null) { + return null; + } + + foreach (explode(',', $queue) as $candidate) { + $candidate = trim($candidate); + + if ($candidate !== '') { + return $candidate; + } + } + + return null; } } diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index e7f7387f..d8929bc2 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -246,7 +246,7 @@ public function start(...$arguments): void $this->storedWorkflow->arguments = Serializer::serialize($metadata->toArray()); if (! static::faked()) { - Watchdog::kick(); + Watchdog::kick($this->storedWorkflow->effectiveConnection(), $this->storedWorkflow->effectiveQueue()); } $this->dispatch(); diff --git a/tests/Unit/Providers/WorkflowServiceProviderTest.php b/tests/Unit/Providers/WorkflowServiceProviderTest.php index 14d828d1..837b863a 100644 --- a/tests/Unit/Providers/WorkflowServiceProviderTest.php +++ b/tests/Unit/Providers/WorkflowServiceProviderTest.php @@ -9,8 +9,12 @@ use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Event; use Illuminate\Support\Facades\Queue; +use Tests\Fixtures\TestSimpleWorkflow; use Tests\TestCase; +use Workflow\Models\StoredWorkflow; use Workflow\Providers\WorkflowServiceProvider; +use Workflow\Serializers\Serializer; +use Workflow\States\WorkflowPendingStatus; use Workflow\Watchdog; final class WorkflowServiceProviderTest extends TestCase @@ -67,9 +71,20 @@ public function testLoopingEventKicksWatchdog(): void Queue::fake(); Cache::forget('workflow:watchdog'); - Event::dispatch(new Looping('sync', 'default')); + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds((int) config('workflows.watchdog_timeout', 300) + 1), + ]); - Queue::assertPushed(Watchdog::class, 1); + Event::dispatch(new Looping('redis', 'high,default')); + + Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { + return $watchdog->connection === 'redis' + && $watchdog->queue === 'high'; + }); } public function testLoopingEventThrottlesKick(): void @@ -77,9 +92,17 @@ public function testLoopingEventThrottlesKick(): void Queue::fake(); Cache::forget('workflow:watchdog'); - Event::dispatch(new Looping('sync', 'default')); - Event::dispatch(new Looping('sync', 'default')); - Event::dispatch(new Looping('sync', 'default')); + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds((int) config('workflows.watchdog_timeout', 300) + 1), + ]); + + Event::dispatch(new Looping('redis', 'high,default')); + Event::dispatch(new Looping('redis', 'high,default')); + Event::dispatch(new Looping('redis', 'high,default')); Queue::assertPushed(Watchdog::class, 1); } diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index 99a17343..b2dcd49d 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -37,7 +37,10 @@ public function testHandleRecoversStalePendingWorkflow(): void $storedWorkflow->refresh(); $this->assertSame(WorkflowPendingStatus::class, $storedWorkflow->status::class); - Queue::assertPushed(TestSimpleWorkflow::class, 1); + Queue::assertPushed(TestSimpleWorkflow::class, static function (TestSimpleWorkflow $workflow): bool { + return $workflow->connection === 'redis' + && $workflow->queue === 'default'; + }); } public function testHandleIgnoresRecentPendingWorkflows(): void @@ -142,6 +145,19 @@ public function testKickDispatchesWhenMarkerAbsent(): void $this->assertTrue(Cache::has('workflow:watchdog')); } + public function testKickUsesRequestedConnectionAndQueue(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + Watchdog::kick('redis', 'high,default'); + + Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { + return $watchdog->connection === 'redis' + && $watchdog->queue === 'high'; + }); + } + public function testKickSkipsWhenMarkerPresent(): void { Queue::fake(); @@ -208,7 +224,7 @@ public function testHandleTouchesWorkflowBeforeRedispatch(): void $this->assertTrue($storedWorkflow->updated_at->greaterThan(now()->subSeconds(5))); } - public function testHandleClearsUniqueLockBeforeRedispatch(): void + public function testHandleRecoversPendingWorkflowOnStoredConnectionAndQueue(): void { Queue::fake(); @@ -216,7 +232,13 @@ public function testHandleClearsUniqueLockBeforeRedispatch(): void $storedWorkflow = StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, - 'arguments' => Serializer::serialize([]), + 'arguments' => Serializer::serialize([ + 'arguments' => [], + 'options' => [ + 'connection' => 'sync', + 'queue' => 'high', + ], + ]), 'status' => WorkflowPendingStatus::$name, 'updated_at' => now() ->subSeconds($timeout + 1), @@ -228,7 +250,32 @@ public function testHandleClearsUniqueLockBeforeRedispatch(): void $watchdog = new Watchdog(); $watchdog->handle(); - $this->assertTrue(Cache::lock($lockKey)->get()); - Queue::assertPushed(TestSimpleWorkflow::class, 1); + Queue::assertPushed(TestSimpleWorkflow::class, static function (TestSimpleWorkflow $workflow): bool { + return $workflow->connection === 'sync' + && $workflow->queue === 'high'; + }); + } + + public function testHandleSkipsWorkflowAlreadyClaimedForRecovery(): void + { + Queue::fake(); + + $timeout = (int) config('workflows.watchdog_timeout', 300); + + $storedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + Cache::lock('workflow:watchdog:recovering:' . $storedWorkflow->id, $timeout) + ->get(); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertNotPushed(TestSimpleWorkflow::class); } } From 7d7550327d7230277c897ca29bb7e6a59d8f5798 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 04:00:27 +0000 Subject: [PATCH 05/19] Cover watchdog loop early exits --- .../Providers/WorkflowServiceProviderTest.php | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/Unit/Providers/WorkflowServiceProviderTest.php b/tests/Unit/Providers/WorkflowServiceProviderTest.php index 837b863a..7c023d33 100644 --- a/tests/Unit/Providers/WorkflowServiceProviderTest.php +++ b/tests/Unit/Providers/WorkflowServiceProviderTest.php @@ -106,4 +106,34 @@ public function testLoopingEventThrottlesKick(): void Queue::assertPushed(Watchdog::class, 1); } + + public function testLoopingEventSkipsWhenThrottleAlreadyHeld(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + Cache::put('workflow:watchdog:looping', true, 60); + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds((int) config('workflows.watchdog_timeout', 300) + 1), + ]); + + Event::dispatch(new Looping('redis', 'high,default')); + + Queue::assertNotPushed(Watchdog::class); + } + + public function testLoopingEventSkipsWhenNoRecoverablePendingWorkflowsExist(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); + + Event::dispatch(new Looping('redis', 'high,default')); + + Queue::assertNotPushed(Watchdog::class); + } } From ce0766354d78b86f59f9430d45ef618a92cc704a Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 04:20:07 +0000 Subject: [PATCH 06/19] Cover remaining watchdog branches --- tests/Unit/WatchdogTest.php | 48 +++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index b2dcd49d..4f988962 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -158,6 +158,19 @@ public function testKickUsesRequestedConnectionAndQueue(): void }); } + public function testKickLeavesQueueUnsetWhenQueueStringHasNoUsableQueue(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + Watchdog::kick('redis', ' , '); + + Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { + return $watchdog->connection === 'redis' + && $watchdog->queue === null; + }); + } + public function testKickSkipsWhenMarkerPresent(): void { Queue::fake(); @@ -278,4 +291,39 @@ public function testHandleSkipsWorkflowAlreadyClaimedForRecovery(): void Queue::assertNotPushed(TestSimpleWorkflow::class); } + + public function testHandleSkipsWorkflowThatStopsBeingPendingAfterRefresh(): void + { + Queue::fake(); + + $timeout = (int) config('workflows.watchdog_timeout', 300); + $modelClass = new class() extends StoredWorkflow { + public function refresh(): static + { + $this->status = WorkflowRunningStatus::$name; + + return $this; + } + }; + $modelClassName = get_class($modelClass); + + config([ + 'workflows.stored_workflow_model' => $modelClassName, + ]); + + $storedWorkflow = $modelClassName::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + $storedWorkflow->refresh(); + + Queue::assertNotPushed(TestSimpleWorkflow::class); + } } From 567f21764659c43027908bda9ecd9c824a416872 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 04:55:36 +0000 Subject: [PATCH 07/19] Harden watchdog recovery loop --- src/Watchdog.php | 30 +++++++++------- tests/Unit/WatchdogTest.php | 70 +++++++++++++++++++++++++++++++++++-- 2 files changed, 86 insertions(+), 14 deletions(-) diff --git a/src/Watchdog.php b/src/Watchdog.php index 5f041c9c..d70ef8aa 100644 --- a/src/Watchdog.php +++ b/src/Watchdog.php @@ -5,6 +5,7 @@ namespace Workflow; use Illuminate\Bus\Queueable; +use Illuminate\Bus\UniqueLock; use Illuminate\Contracts\Queue\ShouldBeEncrypted; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; @@ -36,7 +37,7 @@ public static function kick(?string $connection = null, ?string $queue = null): { $timeout = self::timeout(); - if (Cache::add(self::CACHE_KEY, true, self::bootstrapWindow($timeout))) { + if (Cache::add(self::CACHE_KEY, true, $timeout)) { $dispatch = static::dispatch() ->afterCommit() ->delay($timeout); @@ -81,9 +82,13 @@ public function handle(): void $model::where('status', WorkflowPendingStatus::$name) ->where('updated_at', '<=', Carbon::now()->subSeconds($timeout)) ->whereNotNull('arguments') - ->each(static fn (StoredWorkflow $storedWorkflow): bool => self::recover($storedWorkflow, $timeout)); + ->each(static function (StoredWorkflow $storedWorkflow) use ($timeout): void { + self::recover($storedWorkflow, $timeout); + }); - $this->release($timeout); + if ($this->job !== null) { + $this->release($timeout); + } } private static function recover(StoredWorkflow $storedWorkflow, int $timeout): bool @@ -94,19 +99,20 @@ private static function recover(StoredWorkflow $storedWorkflow, int $timeout): b return false; } - if (! Cache::lock(self::RECOVERY_LOCK_PREFIX . $storedWorkflow->id, $timeout)->get()) { - return false; - } + $claimTtl = self::bootstrapWindow($timeout); + $workflowStub = $storedWorkflow->toWorkflow(); + $workflowJob = new $storedWorkflow->class($storedWorkflow, ...$storedWorkflow->workflowArguments()); - $storedWorkflow->touch(); + return (bool) (Cache::lock(self::RECOVERY_LOCK_PREFIX . $storedWorkflow->id, $claimTtl) + ->get(static function () use ($storedWorkflow, $workflowJob, $workflowStub): bool { + $storedWorkflow->touch(); - Cache::lock('laravel_unique_job:' . $storedWorkflow->class . $storedWorkflow->id) - ->forceRelease(); + (new UniqueLock(Cache::driver()))->release($workflowJob); - $storedWorkflow->toWorkflow() - ->resume(); + $workflowStub->resume(); - return true; + return true; + }) ?? false); } private static function timeout(): int diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index 4f988962..70045b45 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -4,6 +4,7 @@ namespace Tests\Unit; +use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Queue; use Tests\Fixtures\TestSimpleWorkflow; @@ -257,8 +258,7 @@ public function testHandleRecoversPendingWorkflowOnStoredConnectionAndQueue(): v ->subSeconds($timeout + 1), ]); - $lockKey = 'laravel_unique_job:' . TestSimpleWorkflow::class . $storedWorkflow->id; - Cache::lock($lockKey)->get(); + Cache::lock('laravel_unique_job:' . TestSimpleWorkflow::class . $storedWorkflow->id)->get(); $watchdog = new Watchdog(); $watchdog->handle(); @@ -269,6 +269,37 @@ public function testHandleRecoversPendingWorkflowOnStoredConnectionAndQueue(): v }); } + public function testHandleContinuesScanningAfterSkippedWorkflow(): void + { + Queue::fake(); + + $timeout = (int) config('workflows.watchdog_timeout', 300); + + $skippedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + Cache::lock('workflow:watchdog:recovering:' . $skippedWorkflow->id, $timeout) + ->get(); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertPushed(TestSimpleWorkflow::class, 1); + } + public function testHandleSkipsWorkflowAlreadyClaimedForRecovery(): void { Queue::fake(); @@ -292,6 +323,26 @@ public function testHandleSkipsWorkflowAlreadyClaimedForRecovery(): void Queue::assertNotPushed(TestSimpleWorkflow::class); } + public function testHandleReleasesRecoveryClaimAfterRecoveringWorkflow(): void + { + Queue::fake(); + + $timeout = (int) config('workflows.watchdog_timeout', 300); + + $storedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + $this->assertTrue(Cache::lock('workflow:watchdog:recovering:' . $storedWorkflow->id, 1)->get()); + } + public function testHandleSkipsWorkflowThatStopsBeingPendingAfterRefresh(): void { Queue::fake(); @@ -326,4 +377,19 @@ public function refresh(): static Queue::assertNotPushed(TestSimpleWorkflow::class); } + + public function testHandleReleasesCurrentJobWhenRunningOnQueue(): void + { + Queue::fake(); + + $timeout = (int) config('workflows.watchdog_timeout', 300); + $job = $this->createMock(JobContract::class); + $job->expects($this->once()) + ->method('release') + ->with($timeout); + + $watchdog = new Watchdog(); + $watchdog->setJob($job); + $watchdog->handle(); + } } From a661f6e023b5acfaf634bf7af5bc5465ca8b5b41 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 05:28:36 +0000 Subject: [PATCH 08/19] Simplify watchdog wake-up flow --- src/Providers/WorkflowServiceProvider.php | 2 +- src/Watchdog.php | 38 +++++------- src/WorkflowStub.php | 4 -- .../Providers/WorkflowServiceProviderTest.php | 4 +- tests/Unit/WatchdogTest.php | 59 +++++++++++++++---- 5 files changed, 66 insertions(+), 41 deletions(-) diff --git a/src/Providers/WorkflowServiceProvider.php b/src/Providers/WorkflowServiceProvider.php index b0542704..2a050857 100644 --- a/src/Providers/WorkflowServiceProvider.php +++ b/src/Providers/WorkflowServiceProvider.php @@ -29,7 +29,7 @@ public function boot(): void $this->commands([ActivityMakeCommand::class, WorkflowMakeCommand::class]); Event::listen(Looping::class, static function (Looping $event): void { - Watchdog::kickFromWorkerLoop($event->connectionName, $event->queue); + Watchdog::wake($event->connectionName, $event->queue); }); } } diff --git a/src/Watchdog.php b/src/Watchdog.php index d70ef8aa..ae12c3a9 100644 --- a/src/Watchdog.php +++ b/src/Watchdog.php @@ -33,29 +33,10 @@ class Watchdog implements ShouldBeEncrypted, ShouldQueue public $timeout = 0; - public static function kick(?string $connection = null, ?string $queue = null): void + public static function wake(string $connection, ?string $queue = null): void { $timeout = self::timeout(); - if (Cache::add(self::CACHE_KEY, true, $timeout)) { - $dispatch = static::dispatch() - ->afterCommit() - ->delay($timeout); - - if ($connection !== null) { - $dispatch->onConnection($connection); - } - - $queue = self::normalizeQueue($queue); - - if ($queue !== null) { - $dispatch->onQueue($queue); - } - } - } - - public static function kickFromWorkerLoop(string $connection, ?string $queue): void - { if (Cache::has(self::CACHE_KEY)) { return; } @@ -64,11 +45,24 @@ public static function kickFromWorkerLoop(string $connection, ?string $queue): v return; } - if (! self::hasRecoverablePendingWorkflows(self::timeout())) { + if (! self::hasRecoverablePendingWorkflows($timeout)) { + return; + } + + if (! Cache::add(self::CACHE_KEY, true, $timeout)) { return; } - static::kick($connection, $queue); + $dispatch = static::dispatch() + ->afterCommit() + ->delay($timeout) + ->onConnection($connection); + + $queue = self::normalizeQueue($queue); + + if ($queue !== null) { + $dispatch->onQueue($queue); + } } public function handle(): void diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index d8929bc2..5aabead1 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -245,10 +245,6 @@ public function start(...$arguments): void $this->storedWorkflow->arguments = Serializer::serialize($metadata->toArray()); - if (! static::faked()) { - Watchdog::kick($this->storedWorkflow->effectiveConnection(), $this->storedWorkflow->effectiveQueue()); - } - $this->dispatch(); } diff --git a/tests/Unit/Providers/WorkflowServiceProviderTest.php b/tests/Unit/Providers/WorkflowServiceProviderTest.php index 7c023d33..b713bfa3 100644 --- a/tests/Unit/Providers/WorkflowServiceProviderTest.php +++ b/tests/Unit/Providers/WorkflowServiceProviderTest.php @@ -66,7 +66,7 @@ public function testCommandsAreRegistered(): void } } - public function testLoopingEventKicksWatchdog(): void + public function testLoopingEventWakesWatchdog(): void { Queue::fake(); Cache::forget('workflow:watchdog'); @@ -87,7 +87,7 @@ public function testLoopingEventKicksWatchdog(): void }); } - public function testLoopingEventThrottlesKick(): void + public function testLoopingEventThrottlesWake(): void { Queue::fake(); Cache::forget('workflow:watchdog'); diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index 70045b45..c190b72f 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -135,23 +135,27 @@ public function testHandleRefreshesWatchdogMarker(): void $this->assertTrue(Cache::has('workflow:watchdog')); } - public function testKickDispatchesWhenMarkerAbsent(): void + public function testWakeDispatchesWhenPendingWorkflowNeedsRecovery(): void { Queue::fake(); Cache::forget('workflow:watchdog'); - Watchdog::kick(); + $this->createStalePendingWorkflow(); + + Watchdog::wake('redis'); Queue::assertPushed(Watchdog::class); $this->assertTrue(Cache::has('workflow:watchdog')); } - public function testKickUsesRequestedConnectionAndQueue(): void + public function testWakeUsesRequestedConnectionAndQueue(): void { Queue::fake(); Cache::forget('workflow:watchdog'); - Watchdog::kick('redis', 'high,default'); + $this->createStalePendingWorkflow(); + + Watchdog::wake('redis', 'high,default'); Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { return $watchdog->connection === 'redis' @@ -159,12 +163,14 @@ public function testKickUsesRequestedConnectionAndQueue(): void }); } - public function testKickLeavesQueueUnsetWhenQueueStringHasNoUsableQueue(): void + public function testWakeLeavesQueueUnsetWhenQueueStringHasNoUsableQueue(): void { Queue::fake(); Cache::forget('workflow:watchdog'); - Watchdog::kick('redis', ' , '); + $this->createStalePendingWorkflow(); + + Watchdog::wake('redis', ' , '); Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { return $watchdog->connection === 'redis' @@ -172,24 +178,40 @@ public function testKickLeavesQueueUnsetWhenQueueStringHasNoUsableQueue(): void }); } - public function testKickSkipsWhenMarkerPresent(): void + public function testWakeSkipsWhenMarkerPresent(): void { Queue::fake(); Cache::put('workflow:watchdog', true, 300); - Watchdog::kick(); + $this->createStalePendingWorkflow(); + + Watchdog::wake('redis'); + + Queue::assertNotPushed(Watchdog::class); + } + + public function testWakeSkipsWhenNoRecoverablePendingWorkflowsExist(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); + + Watchdog::wake('redis'); Queue::assertNotPushed(Watchdog::class); } - public function testKickIsIdempotent(): void + public function testWakeIsIdempotent(): void { Queue::fake(); Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); - Watchdog::kick(); - Watchdog::kick(); - Watchdog::kick(); + $this->createStalePendingWorkflow(); + + Watchdog::wake('redis'); + Watchdog::wake('redis'); + Watchdog::wake('redis'); Queue::assertPushed(Watchdog::class, 1); } @@ -392,4 +414,17 @@ public function testHandleReleasesCurrentJobWhenRunningOnQueue(): void $watchdog->setJob($job); $watchdog->handle(); } + + private function createStalePendingWorkflow(array $attributes = []): StoredWorkflow + { + $timeout = (int) config('workflows.watchdog_timeout', 300); + + return StoredWorkflow::create(array_merge([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ], $attributes)); + } } From 41d262e3d9024233f9650ec2d289a2dbb24e4d22 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 06:22:39 +0000 Subject: [PATCH 09/19] Simplify watchdog defaults and test flow --- composer.json | 5 +- src/Watchdog.php | 4 +- src/Workflow.php | 1 - src/config/workflows.php | 2 - .../Providers/WorkflowServiceProviderTest.php | 18 +++++- tests/Unit/WatchdogTest.php | 63 +++++++++---------- 6 files changed, 50 insertions(+), 43 deletions(-) diff --git a/composer.json b/composer.json index 5af7e245..a67f5a68 100644 --- a/composer.json +++ b/composer.json @@ -22,7 +22,10 @@ "scripts": { "ecs": "vendor/bin/ecs check --fix", "stan": "vendor/bin/phpstan analyse src tests", - "feature": "phpunit --testdox --testsuite feature", + "feature": [ + "Composer\\Config::disableProcessTimeout", + "phpunit --testdox --testsuite feature" + ], "unit": "phpunit --testdox --testsuite unit", "test": "phpunit --testdox", "coverage": "XDEBUG_MODE=coverage phpunit --testdox --testsuite unit --coverage-clover coverage.xml", diff --git a/src/Watchdog.php b/src/Watchdog.php index ae12c3a9..9ff40100 100644 --- a/src/Watchdog.php +++ b/src/Watchdog.php @@ -21,6 +21,8 @@ class Watchdog implements ShouldBeEncrypted, ShouldQueue use InteractsWithQueue; use Queueable; + public const DEFAULT_TIMEOUT = 300; + private const CACHE_KEY = 'workflow:watchdog'; private const LOOP_THROTTLE_KEY = 'workflow:watchdog:looping'; @@ -111,7 +113,7 @@ private static function recover(StoredWorkflow $storedWorkflow, int $timeout): b private static function timeout(): int { - return (int) config('workflows.watchdog_timeout', 300); + return self::DEFAULT_TIMEOUT; } private static function hasRecoverablePendingWorkflows(int $timeout): bool diff --git a/src/Workflow.php b/src/Workflow.php index d1b42c78..6f71ff4d 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -185,7 +185,6 @@ public function handle(): void } return; } - // Redelivered after worker crash – proceed with replay. } $parentWorkflow = $this->storedWorkflow->parents() diff --git a/src/config/workflows.php b/src/config/workflows.php index e21bce92..26fc15cb 100644 --- a/src/config/workflows.php +++ b/src/config/workflows.php @@ -21,8 +21,6 @@ 'prune_age' => '1 month', - 'watchdog_timeout' => 300, - 'webhooks_route' => env('WORKFLOW_WEBHOOKS_ROUTE', 'webhooks'), 'webhook_auth' => [ diff --git a/tests/Unit/Providers/WorkflowServiceProviderTest.php b/tests/Unit/Providers/WorkflowServiceProviderTest.php index b713bfa3..69d871cd 100644 --- a/tests/Unit/Providers/WorkflowServiceProviderTest.php +++ b/tests/Unit/Providers/WorkflowServiceProviderTest.php @@ -22,9 +22,21 @@ final class WorkflowServiceProviderTest extends TestCase protected function setUp(): void { parent::setUp(); + + Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); + $this->app->register(WorkflowServiceProvider::class); } + protected function tearDown(): void + { + Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); + + parent::tearDown(); + } + public function testProviderLoads(): void { $this->assertTrue( @@ -76,7 +88,7 @@ public function testLoopingEventWakesWatchdog(): void 'arguments' => Serializer::serialize([]), 'status' => WorkflowPendingStatus::$name, 'updated_at' => now() - ->subSeconds((int) config('workflows.watchdog_timeout', 300) + 1), + ->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1), ]); Event::dispatch(new Looping('redis', 'high,default')); @@ -97,7 +109,7 @@ public function testLoopingEventThrottlesWake(): void 'arguments' => Serializer::serialize([]), 'status' => WorkflowPendingStatus::$name, 'updated_at' => now() - ->subSeconds((int) config('workflows.watchdog_timeout', 300) + 1), + ->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1), ]); Event::dispatch(new Looping('redis', 'high,default')); @@ -118,7 +130,7 @@ public function testLoopingEventSkipsWhenThrottleAlreadyHeld(): void 'arguments' => Serializer::serialize([]), 'status' => WorkflowPendingStatus::$name, 'updated_at' => now() - ->subSeconds((int) config('workflows.watchdog_timeout', 300) + 1), + ->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1), ]); Event::dispatch(new Looping('redis', 'high,default')); diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index c190b72f..ea7b340b 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -18,11 +18,27 @@ final class WatchdogTest extends TestCase { + protected function setUp(): void + { + parent::setUp(); + + Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); + } + + protected function tearDown(): void + { + Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); + + parent::tearDown(); + } + public function testHandleRecoversStalePendingWorkflow(): void { Queue::fake(); - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; $storedWorkflow = StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, @@ -64,7 +80,7 @@ public function testHandleIgnoresPendingWithoutArguments(): void { Queue::fake(); - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, @@ -83,7 +99,7 @@ public function testHandleSkipsAlreadyRecoveredWorkflow(): void { Queue::fake(); - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; $storedWorkflow = StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, @@ -107,7 +123,7 @@ public function testHandleSkipsAlreadyCompletedWorkflow(): void { Queue::fake(); - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, @@ -216,34 +232,11 @@ public function testWakeIsIdempotent(): void Queue::assertPushed(Watchdog::class, 1); } - public function testWatchdogTimeoutConfig(): void - { - Queue::fake(); - Cache::forget('workflow:watchdog'); - - config([ - 'workflows.watchdog_timeout' => 60, - ]); - - StoredWorkflow::create([ - 'class' => TestSimpleWorkflow::class, - 'arguments' => Serializer::serialize([]), - 'status' => WorkflowPendingStatus::$name, - 'updated_at' => now() - ->subSeconds(61), - ]); - - $watchdog = new Watchdog(); - $watchdog->handle(); - - Queue::assertPushed(TestSimpleWorkflow::class); - } - public function testHandleTouchesWorkflowBeforeRedispatch(): void { Queue::fake(); - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; $storedWorkflow = StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, @@ -264,7 +257,7 @@ public function testHandleRecoversPendingWorkflowOnStoredConnectionAndQueue(): v { Queue::fake(); - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; $storedWorkflow = StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, @@ -295,7 +288,7 @@ public function testHandleContinuesScanningAfterSkippedWorkflow(): void { Queue::fake(); - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; $skippedWorkflow = StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, @@ -326,7 +319,7 @@ public function testHandleSkipsWorkflowAlreadyClaimedForRecovery(): void { Queue::fake(); - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; $storedWorkflow = StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, @@ -349,7 +342,7 @@ public function testHandleReleasesRecoveryClaimAfterRecoveringWorkflow(): void { Queue::fake(); - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; $storedWorkflow = StoredWorkflow::create([ 'class' => TestSimpleWorkflow::class, @@ -369,7 +362,7 @@ public function testHandleSkipsWorkflowThatStopsBeingPendingAfterRefresh(): void { Queue::fake(); - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; $modelClass = new class() extends StoredWorkflow { public function refresh(): static { @@ -404,7 +397,7 @@ public function testHandleReleasesCurrentJobWhenRunningOnQueue(): void { Queue::fake(); - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; $job = $this->createMock(JobContract::class); $job->expects($this->once()) ->method('release') @@ -417,7 +410,7 @@ public function testHandleReleasesCurrentJobWhenRunningOnQueue(): void private function createStalePendingWorkflow(array $attributes = []): StoredWorkflow { - $timeout = (int) config('workflows.watchdog_timeout', 300); + $timeout = Watchdog::DEFAULT_TIMEOUT; return StoredWorkflow::create(array_merge([ 'class' => TestSimpleWorkflow::class, From 0ecb8d18a0c2c734db51c61fad268dccb7fd786c Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 13:10:29 +0000 Subject: [PATCH 10/19] Cover watchdog wake race path --- tests/Unit/WatchdogTest.php | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index ea7b340b..8437bc9e 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -217,6 +217,38 @@ public function testWakeSkipsWhenNoRecoverablePendingWorkflowsExist(): void Queue::assertNotPushed(Watchdog::class); } + public function testWakeSkipsWhenAnotherWorkerClaimsMarkerFirst(): void + { + Queue::fake(); + + $this->createStalePendingWorkflow(); + + $modelClass = new class() extends StoredWorkflow { + public function newQuery() + { + Cache::put('workflow:watchdog', true, Watchdog::DEFAULT_TIMEOUT); + + return parent::newQuery(); + } + }; + $modelClassName = get_class($modelClass); + $originalModel = config('workflows.stored_workflow_model'); + + config([ + 'workflows.stored_workflow_model' => $modelClassName, + ]); + + try { + Watchdog::wake('redis'); + } finally { + config([ + 'workflows.stored_workflow_model' => $originalModel, + ]); + } + + Queue::assertNotPushed(Watchdog::class); + } + public function testWakeIsIdempotent(): void { Queue::fake(); From aa62d1d61b1cc4c872dc21962395168ce8cab30f Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 18:52:16 +0000 Subject: [PATCH 11/19] Harden watchdog wake flow --- src/ChildWorkflow.php | 2 +- src/Exception.php | 2 +- src/Watchdog.php | 34 +++++---- .../WithoutOverlappingMiddlewareTest.php | 41 +++++++++++ tests/Unit/WatchdogTest.php | 72 ++++++++++++++++++- 5 files changed, 135 insertions(+), 16 deletions(-) diff --git a/src/ChildWorkflow.php b/src/ChildWorkflow.php index 631b3c5a..00378af8 100644 --- a/src/ChildWorkflow.php +++ b/src/ChildWorkflow.php @@ -73,7 +73,7 @@ public function handle() public function middleware() { return [ - new WithoutOverlappingMiddleware($this->parentWorkflow->id, WithoutOverlappingMiddleware::ACTIVITY, 0, 15), + new WithoutOverlappingMiddleware($this->parentWorkflow->id, WithoutOverlappingMiddleware::WORKFLOW, 0, 15), ]; } } diff --git a/src/Exception.php b/src/Exception.php index de177e51..26709296 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -68,7 +68,7 @@ public function middleware() return [ new WithoutOverlappingMiddleware( $this->storedWorkflow->id, - WithoutOverlappingMiddleware::ACTIVITY, + WithoutOverlappingMiddleware::WORKFLOW, 0, 15 ), diff --git a/src/Watchdog.php b/src/Watchdog.php index 9ff40100..cc941f50 100644 --- a/src/Watchdog.php +++ b/src/Watchdog.php @@ -6,18 +6,18 @@ use Illuminate\Bus\Queueable; use Illuminate\Bus\UniqueLock; +use Illuminate\Contracts\Bus\Dispatcher; use Illuminate\Contracts\Queue\ShouldBeEncrypted; use Illuminate\Contracts\Queue\ShouldQueue; -use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Cache; +use Illuminate\Support\Facades\DB; use Workflow\Models\StoredWorkflow; use Workflow\States\WorkflowPendingStatus; class Watchdog implements ShouldBeEncrypted, ShouldQueue { - use Dispatchable; use InteractsWithQueue; use Queueable; @@ -51,20 +51,28 @@ public static function wake(string $connection, ?string $queue = null): void return; } - if (! Cache::add(self::CACHE_KEY, true, $timeout)) { - return; - } + $queue = self::normalizeQueue($queue); - $dispatch = static::dispatch() - ->afterCommit() - ->delay($timeout) - ->onConnection($connection); + DB::afterCommit(static function () use ($connection, $queue, $timeout): void { + if (! Cache::add(self::CACHE_KEY, true, $timeout)) { + return; + } - $queue = self::normalizeQueue($queue); + $watchdog = (new self()) + ->onConnection($connection); - if ($queue !== null) { - $dispatch->onQueue($queue); - } + if ($queue !== null) { + $watchdog->onQueue($queue); + } + + try { + app(Dispatcher::class)->dispatch($watchdog); + } catch (\Throwable $exception) { + Cache::forget(self::CACHE_KEY); + + throw $exception; + } + }); } public function handle(): void diff --git a/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php b/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php index 11d19c82..d7a1eef4 100644 --- a/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php +++ b/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php @@ -325,6 +325,47 @@ public function testUnlockActivityAppliesTtlWhenExpiresAfterIsConfigured(): void $this->assertTrue($result); } + public function testUnlockActivityAppliesTtlWhenOtherActivitiesRemain(): void + { + $job = new \stdClass(); + $job->key = 'test-activity-key'; + + $remainingKey = 'other-activity-key'; + + $lock = $this->mock(Lock::class, static function (MockInterface $mock) { + $mock->shouldReceive('get') + ->once() + ->andReturn(true); + $mock->shouldReceive('release') + ->once(); + }); + + $this->mock(Repository::class, static function (MockInterface $mock) use ($job, $lock, $remainingKey) { + $mock->shouldReceive('lock') + ->once() + ->andReturn($lock); + $mock->shouldReceive('get') + ->with('laravel-workflow-overlap:1:activity', []) + ->andReturn([$job->key, $remainingKey]); + $mock->shouldReceive('put') + ->with('laravel-workflow-overlap:1:activity', [$remainingKey], 60) + ->once(); + $mock->shouldReceive('forget') + ->with($job->key) + ->once(); + $mock->shouldReceive('has') + ->with($remainingKey) + ->once() + ->andReturn(false); + }); + + $middleware = new WithoutOverlappingMiddleware(1, WithoutOverlappingMiddleware::ACTIVITY, 0, 60); + + $result = $middleware->unlock($job); + + $this->assertTrue($result); + } + public function testUnlockActivityRetriesOnLockFailure(): void { $job = new \stdClass(); diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index 8437bc9e..ba91ba1c 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -4,9 +4,12 @@ namespace Tests\Unit; +use Illuminate\Contracts\Bus\Dispatcher; use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Support\Facades\Cache; +use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Queue; +use RuntimeException; use Tests\Fixtures\TestSimpleWorkflow; use Tests\TestCase; use Workflow\Models\StoredWorkflow; @@ -160,7 +163,10 @@ public function testWakeDispatchesWhenPendingWorkflowNeedsRecovery(): void Watchdog::wake('redis'); - Queue::assertPushed(Watchdog::class); + Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { + return $watchdog->connection === 'redis' + && $watchdog->delay === null; + }); $this->assertTrue(Cache::has('workflow:watchdog')); } @@ -249,6 +255,70 @@ public function newQuery() Queue::assertNotPushed(Watchdog::class); } + public function testWakeWaitsForCommitBeforeDispatching(): void + { + Queue::fake(); + + $this->createStalePendingWorkflow(); + + DB::transaction(function (): void { + Watchdog::wake('redis'); + + Queue::assertNotPushed(Watchdog::class); + $this->assertFalse(Cache::has('workflow:watchdog')); + }); + + Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { + return $watchdog->connection === 'redis' + && $watchdog->delay === null; + }); + $this->assertTrue(Cache::has('workflow:watchdog')); + } + + public function testWakeDoesNotSetMarkerOrDispatchOnRollback(): void + { + Queue::fake(); + + $this->createStalePendingWorkflow(); + + try { + DB::transaction(function (): void { + Watchdog::wake('redis'); + + Queue::assertNotPushed(Watchdog::class); + $this->assertFalse(Cache::has('workflow:watchdog')); + + throw new RuntimeException('rollback'); + }); + } catch (RuntimeException $exception) { + $this->assertSame('rollback', $exception->getMessage()); + } + + Queue::assertNotPushed(Watchdog::class); + $this->assertFalse(Cache::has('workflow:watchdog')); + } + + public function testWakeClearsMarkerWhenDispatchFails(): void + { + $this->createStalePendingWorkflow(); + + $dispatcher = $this->createMock(Dispatcher::class); + $dispatcher->expects($this->once()) + ->method('dispatch') + ->willThrowException(new RuntimeException('dispatch failed')); + + $this->app->instance(Dispatcher::class, $dispatcher); + + try { + Watchdog::wake('redis'); + $this->fail('Expected dispatch failure to be rethrown.'); + } catch (RuntimeException $exception) { + $this->assertSame('dispatch failed', $exception->getMessage()); + } + + $this->assertFalse(Cache::has('workflow:watchdog')); + } + public function testWakeIsIdempotent(): void { Queue::fake(); From d9e6a48943c0509d9ed84cd0e441b25967dfa79a Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 19:29:16 +0000 Subject: [PATCH 12/19] Scope callback overlap locks --- src/ChildWorkflow.php | 7 ++++++- src/Exception.php | 2 +- tests/Unit/ChildWorkflowTest.php | 23 +++++++++++++++++++++++ tests/Unit/ExceptionTest.php | 7 ++++++- tests/Unit/WorkflowTest.php | 22 ++++++++++++++++++++++ 5 files changed, 58 insertions(+), 3 deletions(-) diff --git a/src/ChildWorkflow.php b/src/ChildWorkflow.php index 00378af8..438e1d72 100644 --- a/src/ChildWorkflow.php +++ b/src/ChildWorkflow.php @@ -73,7 +73,12 @@ public function handle() public function middleware() { return [ - new WithoutOverlappingMiddleware($this->parentWorkflow->id, WithoutOverlappingMiddleware::WORKFLOW, 0, 15), + new WithoutOverlappingMiddleware( + $this->parentWorkflow->id . ':callbacks', + WithoutOverlappingMiddleware::WORKFLOW, + 0, + 15 + ), ]; } } diff --git a/src/Exception.php b/src/Exception.php index 26709296..cb3874b4 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -67,7 +67,7 @@ public function middleware() { return [ new WithoutOverlappingMiddleware( - $this->storedWorkflow->id, + $this->storedWorkflow->id . ':callbacks', WithoutOverlappingMiddleware::WORKFLOW, 0, 15 diff --git a/tests/Unit/ChildWorkflowTest.php b/tests/Unit/ChildWorkflowTest.php index 49da05bf..005a1484 100644 --- a/tests/Unit/ChildWorkflowTest.php +++ b/tests/Unit/ChildWorkflowTest.php @@ -8,6 +8,7 @@ use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\ChildWorkflow; +use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowRunningStatus; @@ -15,6 +16,28 @@ final class ChildWorkflowTest extends TestCase { + public function testMiddlewareUsesDedicatedCallbackLock(): void + { + $parent = WorkflowStub::make(TestWorkflow::class); + $storedParent = StoredWorkflow::findOrFail($parent->id()); + + $storedChild = StoredWorkflow::create([ + 'class' => TestChildWorkflow::class, + 'arguments' => Serializer::serialize([]), + ]); + + $job = new ChildWorkflow(0, now()->toDateTimeString(), $storedChild, true, $storedParent); + + $middleware = collect($job->middleware()) + ->values(); + + $this->assertCount(1, $middleware); + $this->assertSame(WithoutOverlappingMiddleware::class, get_class($middleware[0])); + $this->assertSame(WithoutOverlappingMiddleware::WORKFLOW, $middleware[0]->type); + $this->assertSame($storedParent->id . ':callbacks', $middleware[0]->key); + $this->assertSame(15, $middleware[0]->expiresAfter); + } + public function testHandleReleasesWhenParentWorkflowIsRunning(): void { $parent = WorkflowStub::make(TestWorkflow::class); diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index 1f6cf3ca..3be31314 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -17,7 +17,10 @@ final class ExceptionTest extends TestCase { public function testMiddleware(): void { - $exception = new Exception(0, now()->toDateTimeString(), new StoredWorkflow(), new \Exception( + $storedWorkflow = new StoredWorkflow(); + $storedWorkflow->id = 123; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new \Exception( 'Test exception' )); @@ -26,6 +29,8 @@ public function testMiddleware(): void $this->assertCount(1, $middleware); $this->assertSame(WithoutOverlappingMiddleware::class, get_class($middleware[0])); + $this->assertSame(WithoutOverlappingMiddleware::WORKFLOW, $middleware[0]->type); + $this->assertSame('123:callbacks', $middleware[0]->key); $this->assertSame(15, $middleware[0]->expiresAfter); } diff --git a/tests/Unit/WorkflowTest.php b/tests/Unit/WorkflowTest.php index 9cc80c8b..96b2e247 100644 --- a/tests/Unit/WorkflowTest.php +++ b/tests/Unit/WorkflowTest.php @@ -5,6 +5,7 @@ namespace Tests\Unit; use BadMethodCallException; +use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Event; use Mockery; @@ -496,4 +497,25 @@ public function testRedeliveredJobResumesFromRunningState(): void $this->assertSame(WorkflowWaitingStatus::class, $stub->status()); } + + public function testWaitingWorkflowRedeliveryReleasesForRetry(): void + { + $stub = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($stub->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowWaitingStatus::$name, + ]); + + $job = Mockery::mock(JobContract::class); + $job->shouldReceive('release') + ->once() + ->with(0); + + $workflow = new TestWorkflow($storedWorkflow); + $workflow->setJob($job); + $workflow->handle(); + + $this->assertSame(WorkflowWaitingStatus::class, $stub->status()); + } } From 82772e1077397d5585c101291c14525292b5ba5b Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 19:54:46 +0000 Subject: [PATCH 13/19] Keep callback jobs activity-scoped --- src/ChildWorkflow.php | 7 +------ src/Exception.php | 4 ++-- tests/Unit/ChildWorkflowTest.php | 23 ----------------------- tests/Unit/ExceptionTest.php | 7 +------ 4 files changed, 4 insertions(+), 37 deletions(-) diff --git a/src/ChildWorkflow.php b/src/ChildWorkflow.php index 438e1d72..631b3c5a 100644 --- a/src/ChildWorkflow.php +++ b/src/ChildWorkflow.php @@ -73,12 +73,7 @@ public function handle() public function middleware() { return [ - new WithoutOverlappingMiddleware( - $this->parentWorkflow->id . ':callbacks', - WithoutOverlappingMiddleware::WORKFLOW, - 0, - 15 - ), + new WithoutOverlappingMiddleware($this->parentWorkflow->id, WithoutOverlappingMiddleware::ACTIVITY, 0, 15), ]; } } diff --git a/src/Exception.php b/src/Exception.php index cb3874b4..de177e51 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -67,8 +67,8 @@ public function middleware() { return [ new WithoutOverlappingMiddleware( - $this->storedWorkflow->id . ':callbacks', - WithoutOverlappingMiddleware::WORKFLOW, + $this->storedWorkflow->id, + WithoutOverlappingMiddleware::ACTIVITY, 0, 15 ), diff --git a/tests/Unit/ChildWorkflowTest.php b/tests/Unit/ChildWorkflowTest.php index 005a1484..49da05bf 100644 --- a/tests/Unit/ChildWorkflowTest.php +++ b/tests/Unit/ChildWorkflowTest.php @@ -8,7 +8,6 @@ use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\ChildWorkflow; -use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowRunningStatus; @@ -16,28 +15,6 @@ final class ChildWorkflowTest extends TestCase { - public function testMiddlewareUsesDedicatedCallbackLock(): void - { - $parent = WorkflowStub::make(TestWorkflow::class); - $storedParent = StoredWorkflow::findOrFail($parent->id()); - - $storedChild = StoredWorkflow::create([ - 'class' => TestChildWorkflow::class, - 'arguments' => Serializer::serialize([]), - ]); - - $job = new ChildWorkflow(0, now()->toDateTimeString(), $storedChild, true, $storedParent); - - $middleware = collect($job->middleware()) - ->values(); - - $this->assertCount(1, $middleware); - $this->assertSame(WithoutOverlappingMiddleware::class, get_class($middleware[0])); - $this->assertSame(WithoutOverlappingMiddleware::WORKFLOW, $middleware[0]->type); - $this->assertSame($storedParent->id . ':callbacks', $middleware[0]->key); - $this->assertSame(15, $middleware[0]->expiresAfter); - } - public function testHandleReleasesWhenParentWorkflowIsRunning(): void { $parent = WorkflowStub::make(TestWorkflow::class); diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index 3be31314..1f6cf3ca 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -17,10 +17,7 @@ final class ExceptionTest extends TestCase { public function testMiddleware(): void { - $storedWorkflow = new StoredWorkflow(); - $storedWorkflow->id = 123; - - $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new \Exception( + $exception = new Exception(0, now()->toDateTimeString(), new StoredWorkflow(), new \Exception( 'Test exception' )); @@ -29,8 +26,6 @@ public function testMiddleware(): void $this->assertCount(1, $middleware); $this->assertSame(WithoutOverlappingMiddleware::class, get_class($middleware[0])); - $this->assertSame(WithoutOverlappingMiddleware::WORKFLOW, $middleware[0]->type); - $this->assertSame('123:callbacks', $middleware[0]->key); $this->assertSame(15, $middleware[0]->expiresAfter); } From 844da8e0c037d8ea0653113c8e149bff3cf4e317 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 22:13:12 +0000 Subject: [PATCH 14/19] Move watchdog pending check under lock --- src/Watchdog.php | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Watchdog.php b/src/Watchdog.php index cc941f50..1296c355 100644 --- a/src/Watchdog.php +++ b/src/Watchdog.php @@ -97,18 +97,18 @@ public function handle(): void private static function recover(StoredWorkflow $storedWorkflow, int $timeout): bool { - $storedWorkflow->refresh(); - - if ($storedWorkflow->status::class !== WorkflowPendingStatus::class) { - return false; - } - $claimTtl = self::bootstrapWindow($timeout); $workflowStub = $storedWorkflow->toWorkflow(); $workflowJob = new $storedWorkflow->class($storedWorkflow, ...$storedWorkflow->workflowArguments()); return (bool) (Cache::lock(self::RECOVERY_LOCK_PREFIX . $storedWorkflow->id, $claimTtl) ->get(static function () use ($storedWorkflow, $workflowJob, $workflowStub): bool { + $storedWorkflow->refresh(); + + if ($storedWorkflow->status::class !== WorkflowPendingStatus::class) { + return false; + } + $storedWorkflow->touch(); (new UniqueLock(Cache::driver()))->release($workflowJob); From d5907235046686f13b0af9d16cc39951bafefd9f Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 22:40:24 +0000 Subject: [PATCH 15/19] Clear watchdog wake throttle on dispatch failure --- src/Watchdog.php | 1 + tests/Unit/WatchdogTest.php | 1 + 2 files changed, 2 insertions(+) diff --git a/src/Watchdog.php b/src/Watchdog.php index 1296c355..bac23b44 100644 --- a/src/Watchdog.php +++ b/src/Watchdog.php @@ -69,6 +69,7 @@ public static function wake(string $connection, ?string $queue = null): void app(Dispatcher::class)->dispatch($watchdog); } catch (\Throwable $exception) { Cache::forget(self::CACHE_KEY); + Cache::forget(self::LOOP_THROTTLE_KEY); throw $exception; } diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index ba91ba1c..11b352b3 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -317,6 +317,7 @@ public function testWakeClearsMarkerWhenDispatchFails(): void } $this->assertFalse(Cache::has('workflow:watchdog')); + $this->assertFalse(Cache::has('workflow:watchdog:looping')); } public function testWakeIsIdempotent(): void From 0b2f526806d1dbc7b0ddbc6df35689431238988a Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Thu, 2 Apr 2026 23:07:36 +0000 Subject: [PATCH 16/19] Address remaining watchdog review threads --- src/Watchdog.php | 7 ++++--- tests/Unit/WatchdogTest.php | 35 +++++++++++++++++++++-------------- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/src/Watchdog.php b/src/Watchdog.php index bac23b44..103315d4 100644 --- a/src/Watchdog.php +++ b/src/Watchdog.php @@ -99,17 +99,18 @@ public function handle(): void private static function recover(StoredWorkflow $storedWorkflow, int $timeout): bool { $claimTtl = self::bootstrapWindow($timeout); - $workflowStub = $storedWorkflow->toWorkflow(); - $workflowJob = new $storedWorkflow->class($storedWorkflow, ...$storedWorkflow->workflowArguments()); return (bool) (Cache::lock(self::RECOVERY_LOCK_PREFIX . $storedWorkflow->id, $claimTtl) - ->get(static function () use ($storedWorkflow, $workflowJob, $workflowStub): bool { + ->get(static function () use ($storedWorkflow): bool { $storedWorkflow->refresh(); if ($storedWorkflow->status::class !== WorkflowPendingStatus::class) { return false; } + $workflowStub = $storedWorkflow->toWorkflow(); + $workflowJob = new $storedWorkflow->class($storedWorkflow, ...$storedWorkflow->workflowArguments()); + $storedWorkflow->touch(); (new UniqueLock(Cache::driver()))->release($workflowJob); diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index 11b352b3..974f6faa 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -475,25 +475,32 @@ public function refresh(): static } }; $modelClassName = get_class($modelClass); + $originalStoredWorkflowModel = config('workflows.stored_workflow_model'); - config([ - 'workflows.stored_workflow_model' => $modelClassName, - ]); + try { + config([ + 'workflows.stored_workflow_model' => $modelClassName, + ]); - $storedWorkflow = $modelClassName::create([ - 'class' => TestSimpleWorkflow::class, - 'arguments' => Serializer::serialize([]), - 'status' => WorkflowPendingStatus::$name, - 'updated_at' => now() - ->subSeconds($timeout + 1), - ]); + $storedWorkflow = $modelClassName::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); - $watchdog = new Watchdog(); - $watchdog->handle(); + $watchdog = new Watchdog(); + $watchdog->handle(); - $storedWorkflow->refresh(); + $storedWorkflow->refresh(); - Queue::assertNotPushed(TestSimpleWorkflow::class); + Queue::assertNotPushed(TestSimpleWorkflow::class); + } finally { + config([ + 'workflows.stored_workflow_model' => $originalStoredWorkflowModel, + ]); + } } public function testHandleReleasesCurrentJobWhenRunningOnQueue(): void From 053b1a360c45dd1d07d2c63fb45ac1774c288f31 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 00:52:03 +0000 Subject: [PATCH 17/19] Fix watchdog unique lock test key --- tests/Unit/WatchdogTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index 974f6faa..c0f559c0 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -376,7 +376,7 @@ public function testHandleRecoversPendingWorkflowOnStoredConnectionAndQueue(): v ->subSeconds($timeout + 1), ]); - Cache::lock('laravel_unique_job:' . TestSimpleWorkflow::class . $storedWorkflow->id)->get(); + Cache::lock('laravel_unique_job:' . TestSimpleWorkflow::class . ':' . $storedWorkflow->id)->get(); $watchdog = new Watchdog(); $watchdog->handle(); From db4f639ceb479504c32241d2536087a97bdacf01 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 01:46:43 +0000 Subject: [PATCH 18/19] Avoid newQuery override in watchdog test --- tests/Unit/WatchdogTest.php | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index c0f559c0..be758f63 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -230,11 +230,14 @@ public function testWakeSkipsWhenAnotherWorkerClaimsMarkerFirst(): void $this->createStalePendingWorkflow(); $modelClass = new class() extends StoredWorkflow { - public function newQuery() + protected static function booted(): void { - Cache::put('workflow:watchdog', true, Watchdog::DEFAULT_TIMEOUT); - - return parent::newQuery(); + static::addGlobalScope( + 'mark-watchdog-present', + static function (\Illuminate\Database\Eloquent\Builder $builder): void { + Cache::put('workflow:watchdog', true, Watchdog::DEFAULT_TIMEOUT); + } + ); } }; $modelClassName = get_class($modelClass); From d93516230e3bc5fe7f546b05baf1c488aeae6481 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 03:38:24 +0000 Subject: [PATCH 19/19] Harden watchdog after-commit wake path --- src/Watchdog.php | 25 +++++++++++++------------ tests/Unit/WatchdogTest.php | 1 + 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/Watchdog.php b/src/Watchdog.php index 103315d4..92758e08 100644 --- a/src/Watchdog.php +++ b/src/Watchdog.php @@ -39,21 +39,21 @@ public static function wake(string $connection, ?string $queue = null): void { $timeout = self::timeout(); - if (Cache::has(self::CACHE_KEY)) { - return; - } + $queue = self::normalizeQueue($queue); - if (! Cache::add(self::LOOP_THROTTLE_KEY, true, 60)) { - return; - } + DB::afterCommit(static function () use ($connection, $queue, $timeout): void { + if (Cache::has(self::CACHE_KEY)) { + return; + } - if (! self::hasRecoverablePendingWorkflows($timeout)) { - return; - } + if (! Cache::add(self::LOOP_THROTTLE_KEY, true, 60)) { + return; + } - $queue = self::normalizeQueue($queue); + if (! self::hasRecoverablePendingWorkflows($timeout)) { + return; + } - DB::afterCommit(static function () use ($connection, $queue, $timeout): void { if (! Cache::add(self::CACHE_KEY, true, $timeout)) { return; } @@ -109,7 +109,8 @@ private static function recover(StoredWorkflow $storedWorkflow, int $timeout): b } $workflowStub = $storedWorkflow->toWorkflow(); - $workflowJob = new $storedWorkflow->class($storedWorkflow, ...$storedWorkflow->workflowArguments()); + $workflowClass = $storedWorkflow->class; + $workflowJob = new $workflowClass($storedWorkflow, ...$storedWorkflow->workflowArguments()); $storedWorkflow->touch(); diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php index be758f63..44161b2e 100644 --- a/tests/Unit/WatchdogTest.php +++ b/tests/Unit/WatchdogTest.php @@ -299,6 +299,7 @@ public function testWakeDoesNotSetMarkerOrDispatchOnRollback(): void Queue::assertNotPushed(Watchdog::class); $this->assertFalse(Cache::has('workflow:watchdog')); + $this->assertFalse(Cache::has('workflow:watchdog:looping')); } public function testWakeClearsMarkerWhenDispatchFails(): void