diff --git a/composer.json b/composer.json index 5af7e245..a67f5a68 100644 --- a/composer.json +++ b/composer.json @@ -22,7 +22,10 @@ "scripts": { "ecs": "vendor/bin/ecs check --fix", "stan": "vendor/bin/phpstan analyse src tests", - "feature": "phpunit --testdox --testsuite feature", + "feature": [ + "Composer\\Config::disableProcessTimeout", + "phpunit --testdox --testsuite feature" + ], "unit": "phpunit --testdox --testsuite unit", "test": "phpunit --testdox", "coverage": "XDEBUG_MODE=coverage phpunit --testdox --testsuite unit --coverage-clover coverage.xml", diff --git a/src/Providers/WorkflowServiceProvider.php b/src/Providers/WorkflowServiceProvider.php index 433f774a..2a050857 100644 --- a/src/Providers/WorkflowServiceProvider.php +++ b/src/Providers/WorkflowServiceProvider.php @@ -4,10 +4,13 @@ namespace Workflow\Providers; +use Illuminate\Queue\Events\Looping; +use Illuminate\Support\Facades\Event; use Illuminate\Support\ServiceProvider; use Laravel\SerializableClosure\SerializableClosure; use Workflow\Commands\ActivityMakeCommand; use Workflow\Commands\WorkflowMakeCommand; +use Workflow\Watchdog; final class WorkflowServiceProvider extends ServiceProvider { @@ -24,5 +27,9 @@ public function boot(): void ], 'migrations'); $this->commands([ActivityMakeCommand::class, WorkflowMakeCommand::class]); + + Event::listen(Looping::class, static function (Looping $event): void { + Watchdog::wake($event->connectionName, $event->queue); + }); } } diff --git a/src/Watchdog.php b/src/Watchdog.php new file mode 100644 index 00000000..92758e08 --- /dev/null +++ b/src/Watchdog.php @@ -0,0 +1,161 @@ +onConnection($connection); + + if ($queue !== null) { + $watchdog->onQueue($queue); + } + + try { + app(Dispatcher::class)->dispatch($watchdog); + } catch (\Throwable $exception) { + Cache::forget(self::CACHE_KEY); + Cache::forget(self::LOOP_THROTTLE_KEY); + + throw $exception; + } + }); + } + + public function handle(): void + { + $timeout = self::timeout(); + + Cache::put(self::CACHE_KEY, true, $timeout); + + $model = config('workflows.stored_workflow_model', StoredWorkflow::class); + + $model::where('status', WorkflowPendingStatus::$name) + ->where('updated_at', '<=', Carbon::now()->subSeconds($timeout)) + ->whereNotNull('arguments') + ->each(static function (StoredWorkflow $storedWorkflow) use ($timeout): void { + self::recover($storedWorkflow, $timeout); + }); + + if ($this->job !== null) { + $this->release($timeout); + } + } + + private static function recover(StoredWorkflow $storedWorkflow, int $timeout): bool + { + $claimTtl = self::bootstrapWindow($timeout); + + return (bool) (Cache::lock(self::RECOVERY_LOCK_PREFIX . $storedWorkflow->id, $claimTtl) + ->get(static function () use ($storedWorkflow): bool { + $storedWorkflow->refresh(); + + if ($storedWorkflow->status::class !== WorkflowPendingStatus::class) { + return false; + } + + $workflowStub = $storedWorkflow->toWorkflow(); + $workflowClass = $storedWorkflow->class; + $workflowJob = new $workflowClass($storedWorkflow, ...$storedWorkflow->workflowArguments()); + + $storedWorkflow->touch(); + + (new UniqueLock(Cache::driver()))->release($workflowJob); + + $workflowStub->resume(); + + return true; + }) ?? false); + } + + private static function timeout(): int + { + return self::DEFAULT_TIMEOUT; + } + + private static function hasRecoverablePendingWorkflows(int $timeout): bool + { + $model = config('workflows.stored_workflow_model', StoredWorkflow::class); + + return $model::where('status', WorkflowPendingStatus::$name) + ->where('updated_at', '<=', Carbon::now()->subSeconds($timeout)) + ->whereNotNull('arguments') + ->exists(); + } + + private static function bootstrapWindow(int $timeout): int + { + return max(1, min($timeout, 60)); + } + + private static function normalizeQueue(?string $queue): ?string + { + if ($queue === null) { + return null; + } + + foreach (explode(',', $queue) as $candidate) { + $candidate = trim($candidate); + + if ($candidate !== '') { + return $candidate; + } + } + + return null; + } +} diff --git a/src/Workflow.php b/src/Workflow.php index 20b80b6e..6f71ff4d 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -177,10 +177,14 @@ public function handle(): void $this->storedWorkflow->status->transitionTo(WorkflowRunningStatus::class); } } catch (TransitionNotFound) { - if ($this->storedWorkflow->toWorkflow()->running()) { - $this->release(); + $this->storedWorkflow->refresh(); + + if ($this->storedWorkflow->status::class !== WorkflowRunningStatus::class) { + if ($this->storedWorkflow->toWorkflow()->running()) { + $this->release(); + } + return; } - return; } $parentWorkflow = $this->storedWorkflow->parents() diff --git a/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php b/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php index 11d19c82..d7a1eef4 100644 --- a/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php +++ b/tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php @@ -325,6 +325,47 @@ public function testUnlockActivityAppliesTtlWhenExpiresAfterIsConfigured(): void $this->assertTrue($result); } + public function testUnlockActivityAppliesTtlWhenOtherActivitiesRemain(): void + { + $job = new \stdClass(); + $job->key = 'test-activity-key'; + + $remainingKey = 'other-activity-key'; + + $lock = $this->mock(Lock::class, static function (MockInterface $mock) { + $mock->shouldReceive('get') + ->once() + ->andReturn(true); + $mock->shouldReceive('release') + ->once(); + }); + + $this->mock(Repository::class, static function (MockInterface $mock) use ($job, $lock, $remainingKey) { + $mock->shouldReceive('lock') + ->once() + ->andReturn($lock); + $mock->shouldReceive('get') + ->with('laravel-workflow-overlap:1:activity', []) + ->andReturn([$job->key, $remainingKey]); + $mock->shouldReceive('put') + ->with('laravel-workflow-overlap:1:activity', [$remainingKey], 60) + ->once(); + $mock->shouldReceive('forget') + ->with($job->key) + ->once(); + $mock->shouldReceive('has') + ->with($remainingKey) + ->once() + ->andReturn(false); + }); + + $middleware = new WithoutOverlappingMiddleware(1, WithoutOverlappingMiddleware::ACTIVITY, 0, 60); + + $result = $middleware->unlock($job); + + $this->assertTrue($result); + } + public function testUnlockActivityRetriesOnLockFailure(): void { $job = new \stdClass(); diff --git a/tests/Unit/Providers/WorkflowServiceProviderTest.php b/tests/Unit/Providers/WorkflowServiceProviderTest.php index 7148c48a..69d871cd 100644 --- a/tests/Unit/Providers/WorkflowServiceProviderTest.php +++ b/tests/Unit/Providers/WorkflowServiceProviderTest.php @@ -4,18 +4,39 @@ namespace Tests\Unit; +use Illuminate\Queue\Events\Looping; use Illuminate\Support\Facades\Artisan; +use Illuminate\Support\Facades\Cache; +use Illuminate\Support\Facades\Event; +use Illuminate\Support\Facades\Queue; +use Tests\Fixtures\TestSimpleWorkflow; use Tests\TestCase; +use Workflow\Models\StoredWorkflow; use Workflow\Providers\WorkflowServiceProvider; +use Workflow\Serializers\Serializer; +use Workflow\States\WorkflowPendingStatus; +use Workflow\Watchdog; final class WorkflowServiceProviderTest extends TestCase { protected function setUp(): void { parent::setUp(); + + Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); + $this->app->register(WorkflowServiceProvider::class); } + protected function tearDown(): void + { + Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); + + parent::tearDown(); + } + public function testProviderLoads(): void { $this->assertTrue( @@ -56,4 +77,75 @@ public function testCommandsAreRegistered(): void ); } } + + public function testLoopingEventWakesWatchdog(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1), + ]); + + Event::dispatch(new Looping('redis', 'high,default')); + + Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { + return $watchdog->connection === 'redis' + && $watchdog->queue === 'high'; + }); + } + + public function testLoopingEventThrottlesWake(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1), + ]); + + Event::dispatch(new Looping('redis', 'high,default')); + Event::dispatch(new Looping('redis', 'high,default')); + Event::dispatch(new Looping('redis', 'high,default')); + + Queue::assertPushed(Watchdog::class, 1); + } + + public function testLoopingEventSkipsWhenThrottleAlreadyHeld(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + Cache::put('workflow:watchdog:looping', true, 60); + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1), + ]); + + Event::dispatch(new Looping('redis', 'high,default')); + + Queue::assertNotPushed(Watchdog::class); + } + + public function testLoopingEventSkipsWhenNoRecoverablePendingWorkflowsExist(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); + + Event::dispatch(new Looping('redis', 'high,default')); + + Queue::assertNotPushed(Watchdog::class); + } } diff --git a/tests/Unit/WatchdogTest.php b/tests/Unit/WatchdogTest.php new file mode 100644 index 00000000..44161b2e --- /dev/null +++ b/tests/Unit/WatchdogTest.php @@ -0,0 +1,537 @@ + TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + $storedWorkflow->refresh(); + $this->assertSame(WorkflowPendingStatus::class, $storedWorkflow->status::class); + + Queue::assertPushed(TestSimpleWorkflow::class, static function (TestSimpleWorkflow $workflow): bool { + return $workflow->connection === 'redis' + && $workflow->queue === 'default'; + }); + } + + public function testHandleIgnoresRecentPendingWorkflows(): void + { + Queue::fake(); + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertNotPushed(TestSimpleWorkflow::class); + } + + public function testHandleIgnoresPendingWithoutArguments(): void + { + Queue::fake(); + + $timeout = Watchdog::DEFAULT_TIMEOUT; + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertNotPushed(TestSimpleWorkflow::class); + } + + public function testHandleSkipsAlreadyRecoveredWorkflow(): void + { + Queue::fake(); + + $timeout = Watchdog::DEFAULT_TIMEOUT; + + $storedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + $storedWorkflow->update([ + 'status' => WorkflowRunningStatus::$name, + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertNotPushed(TestSimpleWorkflow::class); + } + + public function testHandleSkipsAlreadyCompletedWorkflow(): void + { + Queue::fake(); + + $timeout = Watchdog::DEFAULT_TIMEOUT; + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowCompletedStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertNotPushed(TestSimpleWorkflow::class); + } + + public function testHandleRefreshesWatchdogMarker(): void + { + Queue::fake(); + + Cache::forget('workflow:watchdog'); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + $this->assertTrue(Cache::has('workflow:watchdog')); + } + + public function testWakeDispatchesWhenPendingWorkflowNeedsRecovery(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + $this->createStalePendingWorkflow(); + + Watchdog::wake('redis'); + + Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { + return $watchdog->connection === 'redis' + && $watchdog->delay === null; + }); + $this->assertTrue(Cache::has('workflow:watchdog')); + } + + public function testWakeUsesRequestedConnectionAndQueue(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + $this->createStalePendingWorkflow(); + + Watchdog::wake('redis', 'high,default'); + + Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { + return $watchdog->connection === 'redis' + && $watchdog->queue === 'high'; + }); + } + + public function testWakeLeavesQueueUnsetWhenQueueStringHasNoUsableQueue(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + + $this->createStalePendingWorkflow(); + + Watchdog::wake('redis', ' , '); + + Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { + return $watchdog->connection === 'redis' + && $watchdog->queue === null; + }); + } + + public function testWakeSkipsWhenMarkerPresent(): void + { + Queue::fake(); + Cache::put('workflow:watchdog', true, 300); + + $this->createStalePendingWorkflow(); + + Watchdog::wake('redis'); + + Queue::assertNotPushed(Watchdog::class); + } + + public function testWakeSkipsWhenNoRecoverablePendingWorkflowsExist(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); + + Watchdog::wake('redis'); + + Queue::assertNotPushed(Watchdog::class); + } + + public function testWakeSkipsWhenAnotherWorkerClaimsMarkerFirst(): void + { + Queue::fake(); + + $this->createStalePendingWorkflow(); + + $modelClass = new class() extends StoredWorkflow { + protected static function booted(): void + { + static::addGlobalScope( + 'mark-watchdog-present', + static function (\Illuminate\Database\Eloquent\Builder $builder): void { + Cache::put('workflow:watchdog', true, Watchdog::DEFAULT_TIMEOUT); + } + ); + } + }; + $modelClassName = get_class($modelClass); + $originalModel = config('workflows.stored_workflow_model'); + + config([ + 'workflows.stored_workflow_model' => $modelClassName, + ]); + + try { + Watchdog::wake('redis'); + } finally { + config([ + 'workflows.stored_workflow_model' => $originalModel, + ]); + } + + Queue::assertNotPushed(Watchdog::class); + } + + public function testWakeWaitsForCommitBeforeDispatching(): void + { + Queue::fake(); + + $this->createStalePendingWorkflow(); + + DB::transaction(function (): void { + Watchdog::wake('redis'); + + Queue::assertNotPushed(Watchdog::class); + $this->assertFalse(Cache::has('workflow:watchdog')); + }); + + Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool { + return $watchdog->connection === 'redis' + && $watchdog->delay === null; + }); + $this->assertTrue(Cache::has('workflow:watchdog')); + } + + public function testWakeDoesNotSetMarkerOrDispatchOnRollback(): void + { + Queue::fake(); + + $this->createStalePendingWorkflow(); + + try { + DB::transaction(function (): void { + Watchdog::wake('redis'); + + Queue::assertNotPushed(Watchdog::class); + $this->assertFalse(Cache::has('workflow:watchdog')); + + throw new RuntimeException('rollback'); + }); + } catch (RuntimeException $exception) { + $this->assertSame('rollback', $exception->getMessage()); + } + + Queue::assertNotPushed(Watchdog::class); + $this->assertFalse(Cache::has('workflow:watchdog')); + $this->assertFalse(Cache::has('workflow:watchdog:looping')); + } + + public function testWakeClearsMarkerWhenDispatchFails(): void + { + $this->createStalePendingWorkflow(); + + $dispatcher = $this->createMock(Dispatcher::class); + $dispatcher->expects($this->once()) + ->method('dispatch') + ->willThrowException(new RuntimeException('dispatch failed')); + + $this->app->instance(Dispatcher::class, $dispatcher); + + try { + Watchdog::wake('redis'); + $this->fail('Expected dispatch failure to be rethrown.'); + } catch (RuntimeException $exception) { + $this->assertSame('dispatch failed', $exception->getMessage()); + } + + $this->assertFalse(Cache::has('workflow:watchdog')); + $this->assertFalse(Cache::has('workflow:watchdog:looping')); + } + + public function testWakeIsIdempotent(): void + { + Queue::fake(); + Cache::forget('workflow:watchdog'); + Cache::forget('workflow:watchdog:looping'); + + $this->createStalePendingWorkflow(); + + Watchdog::wake('redis'); + Watchdog::wake('redis'); + Watchdog::wake('redis'); + + Queue::assertPushed(Watchdog::class, 1); + } + + public function testHandleTouchesWorkflowBeforeRedispatch(): void + { + Queue::fake(); + + $timeout = Watchdog::DEFAULT_TIMEOUT; + + $storedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + $storedWorkflow->refresh(); + $this->assertTrue($storedWorkflow->updated_at->greaterThan(now()->subSeconds(5))); + } + + public function testHandleRecoversPendingWorkflowOnStoredConnectionAndQueue(): void + { + Queue::fake(); + + $timeout = Watchdog::DEFAULT_TIMEOUT; + + $storedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([ + 'arguments' => [], + 'options' => [ + 'connection' => 'sync', + 'queue' => 'high', + ], + ]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + Cache::lock('laravel_unique_job:' . TestSimpleWorkflow::class . ':' . $storedWorkflow->id)->get(); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertPushed(TestSimpleWorkflow::class, static function (TestSimpleWorkflow $workflow): bool { + return $workflow->connection === 'sync' + && $workflow->queue === 'high'; + }); + } + + public function testHandleContinuesScanningAfterSkippedWorkflow(): void + { + Queue::fake(); + + $timeout = Watchdog::DEFAULT_TIMEOUT; + + $skippedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + Cache::lock('workflow:watchdog:recovering:' . $skippedWorkflow->id, $timeout) + ->get(); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertPushed(TestSimpleWorkflow::class, 1); + } + + public function testHandleSkipsWorkflowAlreadyClaimedForRecovery(): void + { + Queue::fake(); + + $timeout = Watchdog::DEFAULT_TIMEOUT; + + $storedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + Cache::lock('workflow:watchdog:recovering:' . $storedWorkflow->id, $timeout) + ->get(); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + Queue::assertNotPushed(TestSimpleWorkflow::class); + } + + public function testHandleReleasesRecoveryClaimAfterRecoveringWorkflow(): void + { + Queue::fake(); + + $timeout = Watchdog::DEFAULT_TIMEOUT; + + $storedWorkflow = StoredWorkflow::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + $this->assertTrue(Cache::lock('workflow:watchdog:recovering:' . $storedWorkflow->id, 1)->get()); + } + + public function testHandleSkipsWorkflowThatStopsBeingPendingAfterRefresh(): void + { + Queue::fake(); + + $timeout = Watchdog::DEFAULT_TIMEOUT; + $modelClass = new class() extends StoredWorkflow { + public function refresh(): static + { + $this->status = WorkflowRunningStatus::$name; + + return $this; + } + }; + $modelClassName = get_class($modelClass); + $originalStoredWorkflowModel = config('workflows.stored_workflow_model'); + + try { + config([ + 'workflows.stored_workflow_model' => $modelClassName, + ]); + + $storedWorkflow = $modelClassName::create([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ]); + + $watchdog = new Watchdog(); + $watchdog->handle(); + + $storedWorkflow->refresh(); + + Queue::assertNotPushed(TestSimpleWorkflow::class); + } finally { + config([ + 'workflows.stored_workflow_model' => $originalStoredWorkflowModel, + ]); + } + } + + public function testHandleReleasesCurrentJobWhenRunningOnQueue(): void + { + Queue::fake(); + + $timeout = Watchdog::DEFAULT_TIMEOUT; + $job = $this->createMock(JobContract::class); + $job->expects($this->once()) + ->method('release') + ->with($timeout); + + $watchdog = new Watchdog(); + $watchdog->setJob($job); + $watchdog->handle(); + } + + private function createStalePendingWorkflow(array $attributes = []): StoredWorkflow + { + $timeout = Watchdog::DEFAULT_TIMEOUT; + + return StoredWorkflow::create(array_merge([ + 'class' => TestSimpleWorkflow::class, + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + 'updated_at' => now() + ->subSeconds($timeout + 1), + ], $attributes)); + } +} diff --git a/tests/Unit/WorkflowTest.php b/tests/Unit/WorkflowTest.php index 6314ddcc..96b2e247 100644 --- a/tests/Unit/WorkflowTest.php +++ b/tests/Unit/WorkflowTest.php @@ -5,6 +5,7 @@ namespace Tests\Unit; use BadMethodCallException; +use Illuminate\Contracts\Queue\Job as JobContract; use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Event; use Mockery; @@ -27,6 +28,8 @@ use Workflow\States\WorkflowContinuedStatus; use Workflow\States\WorkflowFailedStatus; use Workflow\States\WorkflowPendingStatus; +use Workflow\States\WorkflowRunningStatus; +use Workflow\States\WorkflowWaitingStatus; use Workflow\Workflow; use Workflow\WorkflowStub; @@ -470,4 +473,49 @@ public function testContinueAsNewCarriesWorkflowOptions(): void $this->assertSame('sync', $continuedWorkflow->workflowOptions()->connection); $this->assertSame('default', $continuedWorkflow->workflowOptions()->queue); } + + public function testRedeliveredJobResumesFromRunningState(): void + { + $stub = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($stub->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now(), + 'class' => TestOtherActivity::class, + 'result' => Serializer::serialize('other'), + ]); + + $workflow = new TestWorkflow($storedWorkflow); + $workflow->cancel(); + $workflow->handle(); + + $this->assertSame(WorkflowWaitingStatus::class, $stub->status()); + } + + public function testWaitingWorkflowRedeliveryReleasesForRetry(): void + { + $stub = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($stub->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowWaitingStatus::$name, + ]); + + $job = Mockery::mock(JobContract::class); + $job->shouldReceive('release') + ->once() + ->with(0); + + $workflow = new TestWorkflow($storedWorkflow); + $workflow->setJob($job); + $workflow->handle(); + + $this->assertSame(WorkflowWaitingStatus::class, $stub->status()); + } }