Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
"scripts": {
"ecs": "vendor/bin/ecs check --fix",
"stan": "vendor/bin/phpstan analyse src tests",
"feature": "phpunit --testdox --testsuite feature",
"feature": [
"Composer\\Config::disableProcessTimeout",
"phpunit --testdox --testsuite feature"
],
"unit": "phpunit --testdox --testsuite unit",
"test": "phpunit --testdox",
"coverage": "XDEBUG_MODE=coverage phpunit --testdox --testsuite unit --coverage-clover coverage.xml",
Expand Down
7 changes: 7 additions & 0 deletions src/Providers/WorkflowServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

namespace Workflow\Providers;

use Illuminate\Queue\Events\Looping;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\ServiceProvider;
use Laravel\SerializableClosure\SerializableClosure;
use Workflow\Commands\ActivityMakeCommand;
use Workflow\Commands\WorkflowMakeCommand;
use Workflow\Watchdog;

final class WorkflowServiceProvider extends ServiceProvider
{
Expand All @@ -24,5 +27,9 @@ public function boot(): void
], 'migrations');

$this->commands([ActivityMakeCommand::class, WorkflowMakeCommand::class]);

Event::listen(Looping::class, static function (Looping $event): void {
Watchdog::wake($event->connectionName, $event->queue);
});
}
}
161 changes: 161 additions & 0 deletions src/Watchdog.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
<?php

declare(strict_types=1);

namespace Workflow;

use Illuminate\Bus\Queueable;
use Illuminate\Bus\UniqueLock;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\DB;
use Workflow\Models\StoredWorkflow;
use Workflow\States\WorkflowPendingStatus;

class Watchdog implements ShouldBeEncrypted, ShouldQueue
{
use InteractsWithQueue;
use Queueable;

public const DEFAULT_TIMEOUT = 300;

private const CACHE_KEY = 'workflow:watchdog';

private const LOOP_THROTTLE_KEY = 'workflow:watchdog:looping';

private const RECOVERY_LOCK_PREFIX = 'workflow:watchdog:recovering:';

public int $tries = 0;

public int $maxExceptions = 0;

public $timeout = 0;

public static function wake(string $connection, ?string $queue = null): void
{
$timeout = self::timeout();

$queue = self::normalizeQueue($queue);

DB::afterCommit(static function () use ($connection, $queue, $timeout): void {
if (Cache::has(self::CACHE_KEY)) {
return;
}

if (! Cache::add(self::LOOP_THROTTLE_KEY, true, 60)) {
return;
}

if (! self::hasRecoverablePendingWorkflows($timeout)) {
return;
}

if (! Cache::add(self::CACHE_KEY, true, $timeout)) {
return;
}

$watchdog = (new self())
->onConnection($connection);

if ($queue !== null) {
$watchdog->onQueue($queue);
}

try {
app(Dispatcher::class)->dispatch($watchdog);
} catch (\Throwable $exception) {
Cache::forget(self::CACHE_KEY);
Cache::forget(self::LOOP_THROTTLE_KEY);

throw $exception;
}
});
}

public function handle(): void
{
$timeout = self::timeout();

Cache::put(self::CACHE_KEY, true, $timeout);

$model = config('workflows.stored_workflow_model', StoredWorkflow::class);

$model::where('status', WorkflowPendingStatus::$name)
->where('updated_at', '<=', Carbon::now()->subSeconds($timeout))
->whereNotNull('arguments')
->each(static function (StoredWorkflow $storedWorkflow) use ($timeout): void {
self::recover($storedWorkflow, $timeout);
});

if ($this->job !== null) {
$this->release($timeout);
}
}

private static function recover(StoredWorkflow $storedWorkflow, int $timeout): bool
{
$claimTtl = self::bootstrapWindow($timeout);

return (bool) (Cache::lock(self::RECOVERY_LOCK_PREFIX . $storedWorkflow->id, $claimTtl)
->get(static function () use ($storedWorkflow): bool {
$storedWorkflow->refresh();

if ($storedWorkflow->status::class !== WorkflowPendingStatus::class) {
return false;
}

$workflowStub = $storedWorkflow->toWorkflow();
$workflowClass = $storedWorkflow->class;
$workflowJob = new $workflowClass($storedWorkflow, ...$storedWorkflow->workflowArguments());

$storedWorkflow->touch();

(new UniqueLock(Cache::driver()))->release($workflowJob);

$workflowStub->resume();

return true;
}) ?? false);
}

private static function timeout(): int
{
return self::DEFAULT_TIMEOUT;
}

private static function hasRecoverablePendingWorkflows(int $timeout): bool
{
$model = config('workflows.stored_workflow_model', StoredWorkflow::class);

return $model::where('status', WorkflowPendingStatus::$name)
->where('updated_at', '<=', Carbon::now()->subSeconds($timeout))
->whereNotNull('arguments')
->exists();
}

private static function bootstrapWindow(int $timeout): int
{
return max(1, min($timeout, 60));
}

private static function normalizeQueue(?string $queue): ?string
{
if ($queue === null) {
return null;
}

foreach (explode(',', $queue) as $candidate) {
$candidate = trim($candidate);

if ($candidate !== '') {
return $candidate;
}
}

return null;
}
}
10 changes: 7 additions & 3 deletions src/Workflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,14 @@ public function handle(): void
$this->storedWorkflow->status->transitionTo(WorkflowRunningStatus::class);
}
} catch (TransitionNotFound) {
if ($this->storedWorkflow->toWorkflow()->running()) {
$this->release();
$this->storedWorkflow->refresh();

if ($this->storedWorkflow->status::class !== WorkflowRunningStatus::class) {
if ($this->storedWorkflow->toWorkflow()->running()) {
$this->release();
}
return;
}
return;
}

$parentWorkflow = $this->storedWorkflow->parents()
Expand Down
41 changes: 41 additions & 0 deletions tests/Unit/Middleware/WithoutOverlappingMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,47 @@ public function testUnlockActivityAppliesTtlWhenExpiresAfterIsConfigured(): void
$this->assertTrue($result);
}

public function testUnlockActivityAppliesTtlWhenOtherActivitiesRemain(): void
{
$job = new \stdClass();
$job->key = 'test-activity-key';

$remainingKey = 'other-activity-key';

$lock = $this->mock(Lock::class, static function (MockInterface $mock) {
$mock->shouldReceive('get')
->once()
->andReturn(true);
$mock->shouldReceive('release')
->once();
});

$this->mock(Repository::class, static function (MockInterface $mock) use ($job, $lock, $remainingKey) {
$mock->shouldReceive('lock')
->once()
->andReturn($lock);
$mock->shouldReceive('get')
->with('laravel-workflow-overlap:1:activity', [])
->andReturn([$job->key, $remainingKey]);
$mock->shouldReceive('put')
->with('laravel-workflow-overlap:1:activity', [$remainingKey], 60)
->once();
$mock->shouldReceive('forget')
->with($job->key)
->once();
$mock->shouldReceive('has')
->with($remainingKey)
->once()
->andReturn(false);
});

$middleware = new WithoutOverlappingMiddleware(1, WithoutOverlappingMiddleware::ACTIVITY, 0, 60);

$result = $middleware->unlock($job);

$this->assertTrue($result);
}

public function testUnlockActivityRetriesOnLockFailure(): void
{
$job = new \stdClass();
Expand Down
92 changes: 92 additions & 0 deletions tests/Unit/Providers/WorkflowServiceProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,39 @@

namespace Tests\Unit;

use Illuminate\Queue\Events\Looping;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Queue;
use Tests\Fixtures\TestSimpleWorkflow;
use Tests\TestCase;
use Workflow\Models\StoredWorkflow;
use Workflow\Providers\WorkflowServiceProvider;
use Workflow\Serializers\Serializer;
use Workflow\States\WorkflowPendingStatus;
use Workflow\Watchdog;

final class WorkflowServiceProviderTest extends TestCase
{
protected function setUp(): void
{
parent::setUp();

Cache::forget('workflow:watchdog');
Cache::forget('workflow:watchdog:looping');

$this->app->register(WorkflowServiceProvider::class);
}

protected function tearDown(): void
{
Cache::forget('workflow:watchdog');
Cache::forget('workflow:watchdog:looping');

parent::tearDown();
}

public function testProviderLoads(): void
{
$this->assertTrue(
Expand Down Expand Up @@ -56,4 +77,75 @@ public function testCommandsAreRegistered(): void
);
}
}

public function testLoopingEventWakesWatchdog(): void
{
Queue::fake();
Cache::forget('workflow:watchdog');

StoredWorkflow::create([
'class' => TestSimpleWorkflow::class,
'arguments' => Serializer::serialize([]),
'status' => WorkflowPendingStatus::$name,
'updated_at' => now()
->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1),
]);

Event::dispatch(new Looping('redis', 'high,default'));

Queue::assertPushed(Watchdog::class, static function (Watchdog $watchdog): bool {
return $watchdog->connection === 'redis'
&& $watchdog->queue === 'high';
});
}

public function testLoopingEventThrottlesWake(): void
{
Queue::fake();
Cache::forget('workflow:watchdog');

StoredWorkflow::create([
'class' => TestSimpleWorkflow::class,
'arguments' => Serializer::serialize([]),
'status' => WorkflowPendingStatus::$name,
'updated_at' => now()
->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1),
]);

Event::dispatch(new Looping('redis', 'high,default'));
Event::dispatch(new Looping('redis', 'high,default'));
Event::dispatch(new Looping('redis', 'high,default'));

Queue::assertPushed(Watchdog::class, 1);
}

public function testLoopingEventSkipsWhenThrottleAlreadyHeld(): void
{
Queue::fake();
Cache::forget('workflow:watchdog');
Cache::put('workflow:watchdog:looping', true, 60);

StoredWorkflow::create([
'class' => TestSimpleWorkflow::class,
'arguments' => Serializer::serialize([]),
'status' => WorkflowPendingStatus::$name,
'updated_at' => now()
->subSeconds(Watchdog::DEFAULT_TIMEOUT + 1),
]);

Event::dispatch(new Looping('redis', 'high,default'));

Queue::assertNotPushed(Watchdog::class);
}

public function testLoopingEventSkipsWhenNoRecoverablePendingWorkflowsExist(): void
{
Queue::fake();
Cache::forget('workflow:watchdog');
Cache::forget('workflow:watchdog:looping');

Event::dispatch(new Looping('redis', 'high,default'));

Queue::assertNotPushed(Watchdog::class);
}
}
Loading
Loading