diff --git a/src/Activity.php b/src/Activity.php index a26508a..44e116b 100644 --- a/src/Activity.php +++ b/src/Activity.php @@ -170,7 +170,8 @@ public function failed(Throwable $throwable): void $this->storedWorkflow, $throwable, $workflow->connection(), - $workflow->queue() + $workflow->queue(), + $this::class ); } diff --git a/src/ActivityStub.php b/src/ActivityStub.php index ed8f2ca..56caaa6 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -51,9 +51,24 @@ public static function make($activity, ...$arguments): PromiseInterface } if ($log) { + if ( + WorkflowStub::isProbing() + && WorkflowStub::probeIndex() === $context->index + && ( + WorkflowStub::probeClass() === null + || WorkflowStub::probeClass() === $activity + ) + && $log->class === Exception::class + ) { + WorkflowStub::markProbeMatched(); + } + ++$context->index; WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); + if ($log->class === Exception::class && self::isForeignExceptionResult($result, $activity)) { + return self::make($activity, ...$arguments); + } if ( is_array($result) && array_key_exists('class', $result) && @@ -74,11 +89,24 @@ public static function make($activity, ...$arguments): PromiseInterface return resolve($result); } + if (WorkflowStub::isProbing()) { + ++$context->index; + WorkflowStub::setContext($context); + return (new Deferred())->promise(); + } + $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); + } + + private static function isForeignExceptionResult(mixed $result, string $activity): bool + { + return is_array($result) + && isset($result['sourceClass']) + && is_string($result['sourceClass']) + && $result['sourceClass'] !== $activity; } } diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 896237a..03a23ff 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -45,9 +45,24 @@ public static function make($workflow, ...$arguments): PromiseInterface } if ($log) { + if ( + WorkflowStub::isProbing() + && WorkflowStub::probeIndex() === $context->index + && ( + WorkflowStub::probeClass() === null + || WorkflowStub::probeClass() === $workflow + ) + && $log->class === Exception::class + ) { + WorkflowStub::markProbeMatched(); + } + ++$context->index; WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); + if ($log->class === Exception::class && self::isForeignExceptionResult($result, $workflow)) { + return self::make($workflow, ...$arguments); + } if ( is_array($result) && array_key_exists('class', $result) @@ -67,6 +82,12 @@ public static function make($workflow, ...$arguments): PromiseInterface return resolve($result); } + if (WorkflowStub::isProbing()) { + ++$context->index; + WorkflowStub::setContext($context); + return (new Deferred())->promise(); + } + if (! $context->replaying) { $storedChildWorkflow = $context->storedWorkflow->children() ->wherePivot('parent_index', $context->index) @@ -94,7 +115,14 @@ public static function make($workflow, ...$arguments): PromiseInterface ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); + } + + private static function isForeignExceptionResult(mixed $result, string $workflow): bool + { + return is_array($result) + && isset($result['sourceClass']) + && is_string($result['sourceClass']) + && $result['sourceClass'] !== $workflow; } } diff --git a/src/Exception.php b/src/Exception.php index de177e5..fca28d0 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -10,9 +10,12 @@ use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; +use Illuminate\Support\Facades\Cache; +use Throwable; use Workflow\Exceptions\TransitionNotFound; -use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; +use Workflow\Models\StoredWorkflowLog; +use Workflow\Serializers\Serializer; final class Exception implements ShouldBeEncrypted, ShouldQueue { @@ -35,7 +38,8 @@ public function __construct( public StoredWorkflow $storedWorkflow, public $exception, $connection = null, - $queue = null + $queue = null, + public ?string $sourceClass = null ) { $connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default'); $queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config( @@ -48,30 +52,117 @@ public function __construct( public function handle() { - $workflow = $this->storedWorkflow->toWorkflow(); + $lock = Cache::lock('laravel-workflow-exception:' . $this->storedWorkflow->id, 15); + + if (! $lock->get()) { + $this->release(); + + return; + } try { - if ($this->storedWorkflow->hasLogByIndex($this->index)) { - $workflow->resume(); - } elseif (! $this->storedWorkflow->logs()->where('class', self::class)->exists()) { - $workflow->next($this->index, $this->now, self::class, $this->exception); - } - } catch (TransitionNotFound) { - if ($workflow->running()) { - $this->release(); + $workflow = $this->storedWorkflow->toWorkflow(); + + try { + if ($this->storedWorkflow->hasLogByIndex($this->index)) { + $workflow->resume(); + } elseif ($this->shouldPersistAfterProbeReplay()) { + $workflow->next($this->index, $this->now, self::class, $this->exceptionPayload()); + } + } catch (TransitionNotFound) { + if ($workflow->running()) { + $this->release(); + } } + } finally { + $lock->release(); } } public function middleware() { - return [ - new WithoutOverlappingMiddleware( - $this->storedWorkflow->id, - WithoutOverlappingMiddleware::ACTIVITY, - 0, - 15 - ), - ]; + return []; + } + + private function shouldPersistAfterProbeReplay(): bool + { + $workflowClass = $this->storedWorkflow->class; + + if (! is_string($workflowClass) || $workflowClass === '') { + return true; + } + + $previousContext = WorkflowStub::getContext(); + $shouldPersist = false; + + try { + $tentativeWorkflow = $this->createTentativeWorkflowState(); + $workflow = new $workflowClass($tentativeWorkflow, ...$tentativeWorkflow->workflowArguments()); + $workflow->replaying = true; + + WorkflowStub::setContext([ + 'storedWorkflow' => $tentativeWorkflow, + 'index' => 0, + 'now' => $this->now, + 'replaying' => true, + 'probing' => true, + 'probeIndex' => $this->index, + 'probeClass' => $this->sourceClass, + 'probeMatched' => false, + ]); + + try { + $workflow->handle(); + } catch (Throwable) { + // The replay path may still throw; we only care whether it matched this tentative log. + } + + $shouldPersist = WorkflowStub::probeMatched(); + } finally { + WorkflowStub::setContext($previousContext); + } + + return $shouldPersist; + } + + private function createTentativeWorkflowState(): StoredWorkflow + { + $storedWorkflowClass = $this->storedWorkflow::class; + + /** @var StoredWorkflow $tentativeWorkflow */ + $tentativeWorkflow = $storedWorkflowClass::query() + ->findOrFail($this->storedWorkflow->id); + + $tentativeWorkflow->loadMissing(['logs', 'signals']); + + /** @var StoredWorkflowLog $tentativeLog */ + $tentativeLog = $tentativeWorkflow->logs() + ->make([ + 'index' => $this->index, + 'now' => $this->now, + 'class' => self::class, + 'result' => Serializer::serialize($this->exceptionPayload()), + ]); + + $tentativeWorkflow->setRelation( + 'logs', + $tentativeWorkflow->getRelation('logs') + ->push($tentativeLog) + ->sortBy(static fn ($log): string => sprintf('%020d:%020d', $log->index, $log->id ?? PHP_INT_MAX)) + ->values() + ); + + return $tentativeWorkflow; + } + + private function exceptionPayload() + { + if (! is_array($this->exception) || $this->sourceClass === null) { + return $this->exception; + } + + return array_merge($this->exception, [ + 'sourceClass' => $this->sourceClass, + ]); } } diff --git a/src/Traits/Awaits.php b/src/Traits/Awaits.php index 3bd5ae1..0ba8fae 100644 --- a/src/Traits/Awaits.php +++ b/src/Traits/Awaits.php @@ -22,6 +22,11 @@ public static function await($condition): PromiseInterface return resolve(Serializer::unserialize($log->result)); } + if (self::isProbing()) { + ++self::$context->index; + return (new Deferred())->promise(); + } + $result = $condition(); if ($result === true) { @@ -49,7 +54,6 @@ public static function await($condition): PromiseInterface } ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } diff --git a/src/Traits/SideEffects.php b/src/Traits/SideEffects.php index e4a4db0..72dd5d7 100644 --- a/src/Traits/SideEffects.php +++ b/src/Traits/SideEffects.php @@ -5,6 +5,7 @@ namespace Workflow\Traits; use Illuminate\Database\QueryException; +use React\Promise\Deferred; use React\Promise\PromiseInterface; use function React\Promise\resolve; use Workflow\Serializers\Serializer; @@ -20,6 +21,11 @@ public static function sideEffect($callable): PromiseInterface return resolve(Serializer::unserialize($log->result)); } + if (self::isProbing()) { + ++self::$context->index; + return (new Deferred())->promise(); + } + $result = $callable(); if (! self::$context->replaying) { diff --git a/src/Traits/Timers.php b/src/Traits/Timers.php index 538f950..0be2981 100644 --- a/src/Traits/Timers.php +++ b/src/Traits/Timers.php @@ -41,6 +41,11 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa $when = self::$context->now->copy() ->addSeconds($seconds); + if (self::isProbing()) { + ++self::$context->index; + return (new Deferred())->promise(); + } + if (! self::$context->replaying) { $timer = self::$context->storedWorkflow->createTimer([ 'index' => self::$context->index, @@ -48,8 +53,7 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa ]); } else { ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } @@ -95,7 +99,6 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa } ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } diff --git a/src/Traits/Versions.php b/src/Traits/Versions.php index d8fd011..7d841fa 100644 --- a/src/Traits/Versions.php +++ b/src/Traits/Versions.php @@ -5,6 +5,7 @@ namespace Workflow\Traits; use Illuminate\Database\QueryException; +use React\Promise\Deferred; use React\Promise\PromiseInterface; use function React\Promise\resolve; use Workflow\Exceptions\VersionNotSupportedException; @@ -33,6 +34,11 @@ public static function getVersion( return resolve($version); } + if (self::isProbing()) { + ++self::$context->index; + return (new Deferred())->promise(); + } + $version = $maxSupported; if (! self::$context->replaying) { diff --git a/src/Workflow.php b/src/Workflow.php index 20b80b6..70e5957 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -209,7 +209,7 @@ public function handle(): void $this->now = $log ? $log->now : Carbon::now(); } - WorkflowStub::setContext([ + $this->setContext([ 'storedWorkflow' => $this->storedWorkflow, 'index' => $this->index, 'now' => $this->now, @@ -229,7 +229,7 @@ public function handle(): void $this->now = $log ? $log->now : Carbon::now(); - WorkflowStub::setContext([ + $this->setContext([ 'storedWorkflow' => $this->storedWorkflow, 'index' => $this->index, 'now' => $this->now, @@ -309,4 +309,18 @@ public function handle(): void } } } + + private function setContext(array $context): void + { + $existingContext = WorkflowStub::getContext(); + + if (property_exists($existingContext, 'probing') && $existingContext->probing) { + $context['probing'] = true; + $context['probeIndex'] = $existingContext->probeIndex ?? null; + $context['probeClass'] = $existingContext->probeClass ?? null; + $context['probeMatched'] = $existingContext->probeMatched ?? false; + } + + WorkflowStub::setContext($context); + } } diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index 5aabead..095e294 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -157,6 +157,10 @@ public static function fromStoredWorkflow(StoredWorkflow $storedWorkflow): stati public static function getContext(): \stdClass { + if (self::$context === null) { + self::$context = new \stdClass(); + } + return self::$context; } @@ -170,6 +174,35 @@ public static function now() return self::getContext()->now; } + public static function isProbing(): bool + { + return (bool) (self::getContext()->probing ?? false); + } + + public static function probeIndex(): ?int + { + return self::getContext()->probeIndex ?? null; + } + + public static function probeClass(): ?string + { + return self::getContext()->probeClass ?? null; + } + + public static function markProbeMatched(): void + { + if (! self::isProbing()) { + return; + } + + self::$context->probeMatched = true; + } + + public static function probeMatched(): bool + { + return (bool) (self::getContext()->probeMatched ?? false); + } + public function id() { return $this->storedWorkflow->id; @@ -287,7 +320,7 @@ public function fail($exception): void ->format('Y-m-d\TH:i:s.u\Z')); $this->storedWorkflow->parents() - ->each(static function ($parentWorkflow) use ($exception) { + ->each(function ($parentWorkflow) use ($exception) { if ( $parentWorkflow->pivot->parent_index === StoredWorkflow::CONTINUE_PARENT_INDEX || $parentWorkflow->pivot->parent_index === StoredWorkflow::ACTIVE_WORKFLOW_INDEX @@ -324,7 +357,8 @@ public function fail($exception): void $parentWorkflow, $throwable, $parentWf->connection(), - $parentWf->queue() + $parentWf->queue(), + $this->storedWorkflow->class ); }); } diff --git a/tests/Feature/ExceptionLoggingReplayTest.php b/tests/Feature/ExceptionLoggingReplayTest.php new file mode 100644 index 0000000..4c2dcc3 --- /dev/null +++ b/tests/Feature/ExceptionLoggingReplayTest.php @@ -0,0 +1,88 @@ +start(); + + sleep(1); + $workflow->requestRetry(); + + sleep(1); + $workflow->requestRetry(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('success', $workflow->output()); + $this->assertSame([ + Exception::class, + Signal::class, + Exception::class, + Signal::class, + TestProbeRetryActivity::class, + ], $classes); + } + + public function testBackToBackCaughtExceptionsEachPersist(): void + { + $workflow = WorkflowStub::make(TestProbeBackToBackWorkflow::class); + + $workflow->start(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('caught second: second failure', $workflow->output()); + $this->assertSame([Exception::class, Exception::class], $classes); + } + + public function testParallelChildFailuresStillDeduplicateToOneParentException(): void + { + $workflow = WorkflowStub::make(TestProbeChildFailureParentWorkflow::class); + + $workflow->start(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('caught: child failed: child-1', $workflow->output()); + $this->assertSame([ + TestProbeChildFailureParentStepActivity::class, + Exception::class, + TestProbeChildFailureCompensationActivity::class, + ], $classes); + $this->assertSame(1, $workflow->logs()->where('class', Exception::class)->count()); + } +} diff --git a/tests/Fixtures/TestProbeBackToBackWorkflow.php b/tests/Fixtures/TestProbeBackToBackWorkflow.php new file mode 100644 index 0000000..06396d6 --- /dev/null +++ b/tests/Fixtures/TestProbeBackToBackWorkflow.php @@ -0,0 +1,28 @@ +getMessage(); + } + + return 'unexpected-success'; + } +} diff --git a/tests/Fixtures/TestProbeChildFailureCompensationActivity.php b/tests/Fixtures/TestProbeChildFailureCompensationActivity.php new file mode 100644 index 0000000..d2ea956 --- /dev/null +++ b/tests/Fixtures/TestProbeChildFailureCompensationActivity.php @@ -0,0 +1,15 @@ +addCompensation(static fn () => activity(TestProbeChildFailureCompensationActivity::class)); + + yield all([ + child(TestProbeChildFailureWorkflow::class, 'child-1'), + child(TestProbeChildFailureWorkflow::class, 'child-2'), + child(TestProbeChildFailureWorkflow::class, 'child-3'), + ]); + + return 'unexpected-success'; + } catch (Throwable $throwable) { + yield from $this->compensate(); + + return 'caught: ' . $throwable->getMessage(); + } + } +} diff --git a/tests/Fixtures/TestProbeChildFailureWorkflow.php b/tests/Fixtures/TestProbeChildFailureWorkflow.php new file mode 100644 index 0000000..ca36619 --- /dev/null +++ b/tests/Fixtures/TestProbeChildFailureWorkflow.php @@ -0,0 +1,16 @@ + throw new RuntimeException('first failure'), + 2 => throw new InvalidArgumentException('second failure'), + default => 'success', + }; + } +} diff --git a/tests/Fixtures/TestProbeRetryWorkflow.php b/tests/Fixtures/TestProbeRetryWorkflow.php new file mode 100644 index 0000000..ec80b0c --- /dev/null +++ b/tests/Fixtures/TestProbeRetryWorkflow.php @@ -0,0 +1,41 @@ +inbox->receive('retry'); + } + + public function execute() + { + $attempt = 0; + + while (true) { + try { + ++$attempt; + + return yield activity(TestProbeRetryActivity::class, $attempt); + } catch (Throwable $throwable) { + if ($attempt >= 3) { + throw $throwable; + } + + yield await(fn (): bool => $this->inbox->hasUnread()); + + $this->inbox->nextUnread(); + } + } + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index de3b4bf..37f2819 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -29,17 +29,7 @@ public static function setUpBeforeClass(): void } } - $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); - $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); - if ($redisHost && class_exists(\Redis::class)) { - try { - $redis = new \Redis(); - $redis->connect($redisHost, (int) $redisPort); - $redis->flushDB(); - } catch (\Throwable $e) { - // Ignore if no redis - } - } + self::flushRedis(); for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); @@ -53,6 +43,10 @@ public static function tearDownAfterClass(): void foreach (self::$workers as $worker) { $worker->stop(); } + + self::$workers = []; + + self::flushRedis(); } protected function setUp(): void @@ -67,17 +61,7 @@ protected function setUp(): void Cache::flush(); - $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); - $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); - if ($redisHost && class_exists(\Redis::class)) { - try { - $redis = new \Redis(); - $redis->connect($redisHost, (int) $redisPort); - $redis->flushDB(); - } catch (\Throwable $e) { - // Ignore if no redis - } - } + self::flushRedis(); } protected function defineDatabaseMigrations() @@ -94,4 +78,19 @@ protected function getPackageProviders($app) { return [\Workflow\Providers\WorkflowServiceProvider::class]; } + + private static function flushRedis(): void + { + $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); + $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); + if ($redisHost && class_exists(\Redis::class)) { + try { + $redis = new \Redis(); + $redis->connect($redisHost, (int) $redisPort); + $redis->flushDB(); + } catch (\Throwable $e) { + // Ignore if no redis + } + } + } } diff --git a/tests/Unit/ActivityStubTest.php b/tests/Unit/ActivityStubTest.php index af6cee9..6bd0090 100644 --- a/tests/Unit/ActivityStubTest.php +++ b/tests/Unit/ActivityStubTest.php @@ -7,9 +7,11 @@ use Exception; use RuntimeException; use Tests\Fixtures\TestActivity; +use Tests\Fixtures\TestOtherActivity; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\ActivityStub; +use Workflow\Exception as WorkflowException; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowPendingStatus; @@ -123,6 +125,43 @@ public function testLoadsStoredExceptionWithNonStandardConstructor(): void }); } + public function testSkipsStoredExceptionForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign', + 'code' => 0, + 'sourceClass' => TestOtherActivity::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => TestActivity::class, + 'result' => Serializer::serialize('test'), + ]); + + ActivityStub::make(TestActivity::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + public function testAll(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/ChildWorkflowStubTest.php b/tests/Unit/ChildWorkflowStubTest.php index f972356..2ab1a9f 100644 --- a/tests/Unit/ChildWorkflowStubTest.php +++ b/tests/Unit/ChildWorkflowStubTest.php @@ -4,11 +4,14 @@ namespace Tests\Unit; +use Exception; use Mockery; use Tests\Fixtures\TestChildWorkflow; +use Tests\Fixtures\TestExceptionWorkflow; use Tests\Fixtures\TestParentWorkflow; use Tests\TestCase; use Workflow\ChildWorkflowStub; +use Workflow\Exception as WorkflowException; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowPendingStatus; @@ -91,6 +94,114 @@ public function testLoadsChildWorkflow(): void $this->assertNull($result); } + public function testSkipsStoredExceptionForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign child', + 'code' => 0, + 'sourceClass' => TestExceptionWorkflow::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => TestChildWorkflow::class, + 'result' => Serializer::serialize('test'), + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + + public function testMarksProbeMatchedForMatchingStoredException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'matching child failure', + 'code' => 0, + ]), + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestChildWorkflow::class, + 'probeMatched' => false, + ]); + + try { + ChildWorkflowStub::make(TestChildWorkflow::class); + $this->fail('Expected child exception to be thrown.'); + } catch (Exception $exception) { + $this->assertSame('matching child failure', $exception->getMessage()); + } + + $this->assertTrue(WorkflowStub::probeMatched()); + } + + public function testReturnsUnresolvedPromiseWhenProbingWithoutStoredChildWorkflow(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestChildWorkflow::class, + 'probeMatched' => false, + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testDoesNotResumeRunningStartedChildWorkflow(): void { $childWorkflow = Mockery::mock(); diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index 1f6cf3c..a2fa329 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -4,6 +4,20 @@ namespace Tests\Unit; +use Exception as BaseException; +use Illuminate\Contracts\Queue\Job as JobContract; +use Illuminate\Support\Facades\Cache; +use InvalidArgumentException; +use Mockery; +use ReflectionMethod; +use RuntimeException; +use Tests\Fixtures\TestActivity; +use Tests\Fixtures\TestProbeBackToBackWorkflow; +use Tests\Fixtures\TestProbeChildFailureWorkflow; +use Tests\Fixtures\TestProbeParallelChildWorkflow; +use Tests\Fixtures\TestProbeRetryActivity; +use Tests\Fixtures\TestSagaActivity; +use Tests\Fixtures\TestSagaParallelActivityWorkflow; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\Exception; @@ -17,7 +31,7 @@ final class ExceptionTest extends TestCase { public function testMiddleware(): void { - $exception = new Exception(0, now()->toDateTimeString(), new StoredWorkflow(), new \Exception( + $exception = new Exception(0, now()->toDateTimeString(), new StoredWorkflow(), new BaseException( 'Test exception' )); @@ -38,16 +52,108 @@ public function testExceptionWorkflowRunning(): void 'status' => WorkflowRunningStatus::$name, ]); - $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new \Exception('Test exception')); + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new BaseException('Test exception')); $exception->handle(); $this->assertSame(WorkflowRunningStatus::class, $workflow->status()); } - public function testSkipsWriteWhenSiblingExceptionLogExists(): void + public function testHandleResumesWorkflowWhenLogAlreadyExists(): void + { + $lock = Mockery::mock(); + $lock->shouldReceive('get') + ->once() + ->andReturn(true); + $lock->shouldReceive('release') + ->once(); + + Cache::shouldReceive('lock') + ->once() + ->with('laravel-workflow-exception:123', 15) + ->andReturn($lock); + + $workflow = Mockery::mock(); + $workflow->shouldReceive('resume') + ->once(); + + $storedWorkflow = Mockery::mock(StoredWorkflow::class) + ->makePartial(); + $storedWorkflow->id = 123; + $storedWorkflow->shouldReceive('effectiveConnection') + ->andReturn(null); + $storedWorkflow->shouldReceive('effectiveQueue') + ->andReturn(null); + $storedWorkflow->shouldReceive('toWorkflow') + ->once() + ->andReturn($workflow); + $storedWorkflow->shouldReceive('hasLogByIndex') + ->once() + ->with(0) + ->andReturn(true); + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new BaseException('existing log')); + $exception->handle(); + + $this->assertSame(123, $storedWorkflow->id); + + Mockery::close(); + } + + public function testHandleReleasesWhenExceptionLockUnavailable(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + + $lock = Mockery::mock(); + $lock->shouldReceive('get') + ->once() + ->andReturn(false); + + Cache::shouldReceive('lock') + ->once() + ->with('laravel-workflow-exception:' . $storedWorkflow->id, 15) + ->andReturn($lock); + + $job = Mockery::mock(JobContract::class); + $job->shouldReceive('release') + ->once() + ->with(0); + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'locked', + 'code' => 0, + ]); + $exception->setJob($job); + $exception->handle(); + + $this->assertFalse($storedWorkflow->hasLogByIndex(0)); + + Mockery::close(); + } + + public function testProbeReplayShortCircuitsWhenWorkflowClassIsInvalid(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = ''; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'invalid workflow class', + 'code' => 0, + ]); + + $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method->setAccessible(true); + + $this->assertTrue($method->invoke($exception)); + } + + public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeParallelChildWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); $storedWorkflow->update([ 'arguments' => Serializer::serialize([]), 'status' => WorkflowRunningStatus::$name, @@ -60,20 +166,99 @@ public function testSkipsWriteWhenSiblingExceptionLogExists(): void ->toDateTimeString(), 'class' => Exception::class, 'result' => Serializer::serialize([ - 'class' => \Exception::class, - 'message' => 'first child failed', + 'class' => BaseException::class, + 'message' => 'child failed: child-1', 'code' => 0, ]), ]); $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ - 'class' => \Exception::class, - 'message' => 'second child failed', + 'class' => BaseException::class, + 'message' => 'child failed: child-2', 'code' => 0, - ]); + ], sourceClass: TestProbeChildFailureWorkflow::class); $exception->handle(); $this->assertFalse($storedWorkflow->hasLogByIndex(1)); $this->assertSame(1, $storedWorkflow->logs()->count()); } + + public function testPersistsWriteWhenProbeReachesCandidateException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeBackToBackWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now() + ->toDateTimeString(), + 'class' => Exception::class, + 'result' => Serializer::serialize([ + 'class' => RuntimeException::class, + 'message' => 'first failure', + 'code' => 0, + ]), + ]); + + $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => InvalidArgumentException::class, + 'message' => 'second failure', + 'code' => 0, + ], sourceClass: TestProbeRetryActivity::class); + $exception->handle(); + + $log = $storedWorkflow->fresh() + ->logs() + ->firstWhere('index', 1); + + $this->assertNotNull($log); + $this->assertTrue($storedWorkflow->fresh()->hasLogByIndex(1)); + $this->assertSame(TestProbeRetryActivity::class, Serializer::unserialize($log->result)['sourceClass']); + } + + public function testSkipsWriteWhenProbeReachesDifferentActivityClassAtSameIndex(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestSagaParallelActivityWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now() + ->toDateTimeString(), + 'class' => TestActivity::class, + 'result' => Serializer::serialize('step complete'), + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => now() + ->toDateTimeString(), + 'class' => Exception::class, + 'result' => Serializer::serialize([ + 'class' => RuntimeException::class, + 'message' => 'parallel failure', + 'code' => 0, + ]), + ]); + + $exception = new Exception(2, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => RuntimeException::class, + 'message' => 'another parallel failure', + 'code' => 0, + ], sourceClass: TestSagaActivity::class); + $exception->handle(); + + $this->assertFalse($storedWorkflow->fresh()->hasLogByIndex(2)); + } } diff --git a/tests/Unit/Traits/AwaitsTest.php b/tests/Unit/Traits/AwaitsTest.php index 12b2a8b..1759143 100644 --- a/tests/Unit/Traits/AwaitsTest.php +++ b/tests/Unit/Traits/AwaitsTest.php @@ -102,6 +102,36 @@ public function testResolvesConflictingResult(): void $this->assertFalse(Serializer::unserialize($workflow->logs()->firstWhere('index', 0)->result)); } + public function testDefersWhenProbing(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $conditionEvaluated = false; + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::await(static function () use (&$conditionEvaluated): bool { + $conditionEvaluated = true; + + return true; + }) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertFalse($conditionEvaluated); + $this->assertNull($result); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/Traits/SideEffectsTest.php b/tests/Unit/Traits/SideEffectsTest.php index 9e12100..dfbe0a3 100644 --- a/tests/Unit/Traits/SideEffectsTest.php +++ b/tests/Unit/Traits/SideEffectsTest.php @@ -88,6 +88,36 @@ public function testResolvesConflictingResult(): void $this->assertSame('test', Serializer::unserialize($workflow->logs()->firstWhere('index', 0)->result)); } + public function testDefersWhenProbing(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $callableEvaluated = false; + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::sideEffect(static function () use (&$callableEvaluated): string { + $callableEvaluated = true; + + return 'test'; + }) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertFalse($callableEvaluated); + $this->assertNull($result); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/Traits/TimersTest.php b/tests/Unit/Traits/TimersTest.php index 0e5ead9..cce3699 100644 --- a/tests/Unit/Traits/TimersTest.php +++ b/tests/Unit/Traits/TimersTest.php @@ -242,6 +242,38 @@ public function testTimerReturnsUnresolvedPromiseWhenReplayingAndNoTimer(): void ]); } + public function testTimerReturnsUnresolvedPromiseWhenProbingAndNoTimer(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::timer('1 minute') + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertSame(1, WorkflowStub::getContext()->index); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertDatabaseMissing('workflow_timers', [ + 'stored_workflow_id' => $workflow->id(), + 'index' => 0, + ]); + } + public function testTimerCapsDelayForSqsDriver(): void { Bus::fake(); diff --git a/tests/Unit/Traits/VersionsTest.php b/tests/Unit/Traits/VersionsTest.php index 1fe909f..d5425b4 100644 --- a/tests/Unit/Traits/VersionsTest.php +++ b/tests/Unit/Traits/VersionsTest.php @@ -191,6 +191,30 @@ public function testResolvesConflictingResultThrowsWhenVersionNotSupported(): vo Mockery::close(); } + public function testReturnsUnresolvedPromiseWhenProbingWithoutStoredVersion(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::getVersion('test-change', WorkflowStub::DEFAULT_VERSION, 1) + ->then(static function ($value) use (&$result): void { + $result = $value; + }); + + $this->assertNull($result); + $this->assertSame(1, WorkflowStub::getContext()->index); + $this->assertSame(0, $workflow->logs()->count()); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/WorkflowStubTest.php b/tests/Unit/WorkflowStubTest.php index cf6b7e6..4188ddf 100644 --- a/tests/Unit/WorkflowStubTest.php +++ b/tests/Unit/WorkflowStubTest.php @@ -8,6 +8,8 @@ use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Queue; +use ReflectionProperty; +use stdClass; use Tests\Fixtures\TestAwaitWorkflow; use Tests\Fixtures\TestBadConnectionWorkflow; use Tests\Fixtures\TestChatBotWorkflow; @@ -231,6 +233,42 @@ public function testConnection(): void $this->assertSame('default', WorkflowStub::queue()); } + public function testProbeHelpers(): void + { + $contextProperty = new ReflectionProperty(WorkflowStub::class, 'context'); + $contextProperty->setAccessible(true); + $previousContext = $contextProperty->getValue(); + $contextProperty->setValue(null); + + try { + $this->assertInstanceOf(stdClass::class, WorkflowStub::getContext()); + $this->assertFalse(WorkflowStub::isProbing()); + $this->assertNull(WorkflowStub::probeIndex()); + $this->assertNull(WorkflowStub::probeClass()); + $this->assertFalse(WorkflowStub::probeMatched()); + + WorkflowStub::markProbeMatched(); + + $this->assertFalse(WorkflowStub::probeMatched()); + + WorkflowStub::setContext([ + 'probing' => true, + 'probeIndex' => 7, + 'probeClass' => TestWorkflow::class, + 'probeMatched' => false, + ]); + + WorkflowStub::markProbeMatched(); + + $this->assertTrue(WorkflowStub::isProbing()); + $this->assertSame(7, WorkflowStub::probeIndex()); + $this->assertSame(TestWorkflow::class, WorkflowStub::probeClass()); + $this->assertTrue(WorkflowStub::probeMatched()); + } finally { + $contextProperty->setValue($previousContext); + } + } + public function testHandlesDuplicateLogInsertionProperly(): void { Queue::fake();