From fae0c4db7e6a273ab32d1bcb4892ee0585f6ef84 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 07:30:00 +0000 Subject: [PATCH 01/15] Probe replay before persisting exception logs --- src/ActivityStub.php | 19 ++++ src/ChildWorkflowStub.php | 19 ++++ src/Exception.php | 69 ++++++++++++++- src/Traits/Awaits.php | 6 ++ src/Traits/SideEffects.php | 5 ++ src/Traits/Timers.php | 6 ++ src/Traits/Versions.php | 5 ++ src/Workflow.php | 18 +++- src/WorkflowStub.php | 38 +++++++- tests/Feature/ExceptionLoggingReplayTest.php | 88 +++++++++++++++++++ .../Fixtures/TestProbeBackToBackWorkflow.php | 28 ++++++ ...tProbeChildFailureCompensationActivity.php | 15 ++++ ...estProbeChildFailureParentStepActivity.php | 15 ++++ .../TestProbeChildFailureParentWorkflow.php | 35 ++++++++ .../TestProbeChildFailureWorkflow.php | 16 ++++ .../TestProbeParallelChildWorkflow.php | 27 ++++++ tests/Fixtures/TestProbeRetryActivity.php | 23 +++++ tests/Fixtures/TestProbeRetryWorkflow.php | 41 +++++++++ tests/Unit/ExceptionTest.php | 46 ++++++++-- 19 files changed, 508 insertions(+), 11 deletions(-) create mode 100644 tests/Feature/ExceptionLoggingReplayTest.php create mode 100644 tests/Fixtures/TestProbeBackToBackWorkflow.php create mode 100644 tests/Fixtures/TestProbeChildFailureCompensationActivity.php create mode 100644 tests/Fixtures/TestProbeChildFailureParentStepActivity.php create mode 100644 tests/Fixtures/TestProbeChildFailureParentWorkflow.php create mode 100644 tests/Fixtures/TestProbeChildFailureWorkflow.php create mode 100644 tests/Fixtures/TestProbeParallelChildWorkflow.php create mode 100644 tests/Fixtures/TestProbeRetryActivity.php create mode 100644 tests/Fixtures/TestProbeRetryWorkflow.php diff --git a/src/ActivityStub.php b/src/ActivityStub.php index ed8f2ca..7ef2caa 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -51,6 +51,18 @@ public static function make($activity, ...$arguments): PromiseInterface } if ($log) { + if ( + WorkflowStub::isProbing() + && WorkflowStub::probeIndex() === $context->index + && ( + WorkflowStub::probeClass() === null + || WorkflowStub::probeClass() === $activity + ) + && $log->class === Exception::class + ) { + WorkflowStub::markProbeMatched(); + } + ++$context->index; WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); @@ -74,6 +86,13 @@ public static function make($activity, ...$arguments): PromiseInterface return resolve($result); } + if (WorkflowStub::isProbing()) { + ++$context->index; + WorkflowStub::setContext($context); + $deferred = new Deferred(); + return $deferred->promise(); + } + $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); ++$context->index; diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 896237a..54f40d9 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -45,6 +45,18 @@ public static function make($workflow, ...$arguments): PromiseInterface } if ($log) { + if ( + WorkflowStub::isProbing() + && WorkflowStub::probeIndex() === $context->index + && ( + WorkflowStub::probeClass() === null + || WorkflowStub::probeClass() === $workflow + ) + && $log->class === Exception::class + ) { + WorkflowStub::markProbeMatched(); + } + ++$context->index; WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); @@ -67,6 +79,13 @@ public static function make($workflow, ...$arguments): PromiseInterface return resolve($result); } + if (WorkflowStub::isProbing()) { + ++$context->index; + WorkflowStub::setContext($context); + $deferred = new Deferred(); + return $deferred->promise(); + } + if (! $context->replaying) { $storedChildWorkflow = $context->storedWorkflow->children() ->wherePivot('parent_index', $context->index) diff --git a/src/Exception.php b/src/Exception.php index de177e5..74ca4be 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -10,9 +10,11 @@ use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; +use Throwable; use Workflow\Exceptions\TransitionNotFound; use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; +use Workflow\Serializers\Serializer; final class Exception implements ShouldBeEncrypted, ShouldQueue { @@ -35,7 +37,8 @@ public function __construct( public StoredWorkflow $storedWorkflow, public $exception, $connection = null, - $queue = null + $queue = null, + public ?string $sourceClass = null ) { $connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default'); $queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config( @@ -53,7 +56,7 @@ public function handle() try { if ($this->storedWorkflow->hasLogByIndex($this->index)) { $workflow->resume(); - } elseif (! $this->storedWorkflow->logs()->where('class', self::class)->exists()) { + } elseif ($this->shouldPersistAfterProbeReplay()) { $workflow->next($this->index, $this->now, self::class, $this->exception); } } catch (TransitionNotFound) { @@ -74,4 +77,66 @@ public function middleware() ), ]; } + + private function shouldPersistAfterProbeReplay(): bool + { + $workflowClass = $this->storedWorkflow->class; + + if (! is_string($workflowClass) || $workflowClass === '') { + return true; + } + + $previousContext = WorkflowStub::getContext(); + $connection = $this->storedWorkflow->getConnection(); + $shouldPersist = false; + + $connection->beginTransaction(); + + try { + $tentativeWorkflow = $this->createTentativeWorkflowState(); + $workflow = new $workflowClass($tentativeWorkflow, ...$tentativeWorkflow->workflowArguments()); + $workflow->replaying = true; + + WorkflowStub::setContext([ + 'storedWorkflow' => $tentativeWorkflow, + 'index' => 0, + 'now' => $this->now, + 'replaying' => true, + 'probing' => true, + 'probeIndex' => $this->index, + 'probeClass' => $this->sourceClass, + 'probeMatched' => false, + ]); + + try { + $workflow->handle(); + } catch (Throwable) { + // The replay path may still throw; we only care whether it matched this tentative log. + } + + $shouldPersist = WorkflowStub::probeMatched(); + } finally { + WorkflowStub::setContext($previousContext); + + if ($connection->transactionLevel() > 0) { + $connection->rollBack(); + } + } + + return $shouldPersist; + } + + private function createTentativeWorkflowState(): StoredWorkflow + { + $this->storedWorkflow->createLog([ + 'index' => $this->index, + 'now' => $this->now, + 'class' => self::class, + 'result' => Serializer::serialize($this->exception), + ]); + + $storedWorkflowClass = $this->storedWorkflow::class; + + return $storedWorkflowClass::query()->findOrFail($this->storedWorkflow->id); + } } diff --git a/src/Traits/Awaits.php b/src/Traits/Awaits.php index 3bd5ae1..7df13ad 100644 --- a/src/Traits/Awaits.php +++ b/src/Traits/Awaits.php @@ -22,6 +22,12 @@ public static function await($condition): PromiseInterface return resolve(Serializer::unserialize($log->result)); } + if (self::isProbing()) { + ++self::$context->index; + $deferred = new Deferred(); + return $deferred->promise(); + } + $result = $condition(); if ($result === true) { diff --git a/src/Traits/SideEffects.php b/src/Traits/SideEffects.php index e4a4db0..c271385 100644 --- a/src/Traits/SideEffects.php +++ b/src/Traits/SideEffects.php @@ -20,6 +20,11 @@ public static function sideEffect($callable): PromiseInterface return resolve(Serializer::unserialize($log->result)); } + if (self::isProbing()) { + ++self::$context->index; + return (new \React\Promise\Deferred())->promise(); + } + $result = $callable(); if (! self::$context->replaying) { diff --git a/src/Traits/Timers.php b/src/Traits/Timers.php index 538f950..f2fc6a3 100644 --- a/src/Traits/Timers.php +++ b/src/Traits/Timers.php @@ -41,6 +41,12 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa $when = self::$context->now->copy() ->addSeconds($seconds); + if (self::isProbing()) { + ++self::$context->index; + $deferred = new Deferred(); + return $deferred->promise(); + } + if (! self::$context->replaying) { $timer = self::$context->storedWorkflow->createTimer([ 'index' => self::$context->index, diff --git a/src/Traits/Versions.php b/src/Traits/Versions.php index d8fd011..3dc2307 100644 --- a/src/Traits/Versions.php +++ b/src/Traits/Versions.php @@ -33,6 +33,11 @@ public static function getVersion( return resolve($version); } + if (self::isProbing()) { + ++self::$context->index; + return (new \React\Promise\Deferred())->promise(); + } + $version = $maxSupported; if (! self::$context->replaying) { diff --git a/src/Workflow.php b/src/Workflow.php index 20b80b6..70e5957 100644 --- a/src/Workflow.php +++ b/src/Workflow.php @@ -209,7 +209,7 @@ public function handle(): void $this->now = $log ? $log->now : Carbon::now(); } - WorkflowStub::setContext([ + $this->setContext([ 'storedWorkflow' => $this->storedWorkflow, 'index' => $this->index, 'now' => $this->now, @@ -229,7 +229,7 @@ public function handle(): void $this->now = $log ? $log->now : Carbon::now(); - WorkflowStub::setContext([ + $this->setContext([ 'storedWorkflow' => $this->storedWorkflow, 'index' => $this->index, 'now' => $this->now, @@ -309,4 +309,18 @@ public function handle(): void } } } + + private function setContext(array $context): void + { + $existingContext = WorkflowStub::getContext(); + + if (property_exists($existingContext, 'probing') && $existingContext->probing) { + $context['probing'] = true; + $context['probeIndex'] = $existingContext->probeIndex ?? null; + $context['probeClass'] = $existingContext->probeClass ?? null; + $context['probeMatched'] = $existingContext->probeMatched ?? false; + } + + WorkflowStub::setContext($context); + } } diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index 5aabead..095e294 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -157,6 +157,10 @@ public static function fromStoredWorkflow(StoredWorkflow $storedWorkflow): stati public static function getContext(): \stdClass { + if (self::$context === null) { + self::$context = new \stdClass(); + } + return self::$context; } @@ -170,6 +174,35 @@ public static function now() return self::getContext()->now; } + public static function isProbing(): bool + { + return (bool) (self::getContext()->probing ?? false); + } + + public static function probeIndex(): ?int + { + return self::getContext()->probeIndex ?? null; + } + + public static function probeClass(): ?string + { + return self::getContext()->probeClass ?? null; + } + + public static function markProbeMatched(): void + { + if (! self::isProbing()) { + return; + } + + self::$context->probeMatched = true; + } + + public static function probeMatched(): bool + { + return (bool) (self::getContext()->probeMatched ?? false); + } + public function id() { return $this->storedWorkflow->id; @@ -287,7 +320,7 @@ public function fail($exception): void ->format('Y-m-d\TH:i:s.u\Z')); $this->storedWorkflow->parents() - ->each(static function ($parentWorkflow) use ($exception) { + ->each(function ($parentWorkflow) use ($exception) { if ( $parentWorkflow->pivot->parent_index === StoredWorkflow::CONTINUE_PARENT_INDEX || $parentWorkflow->pivot->parent_index === StoredWorkflow::ACTIVE_WORKFLOW_INDEX @@ -324,7 +357,8 @@ public function fail($exception): void $parentWorkflow, $throwable, $parentWf->connection(), - $parentWf->queue() + $parentWf->queue(), + $this->storedWorkflow->class ); }); } diff --git a/tests/Feature/ExceptionLoggingReplayTest.php b/tests/Feature/ExceptionLoggingReplayTest.php new file mode 100644 index 0000000..4c2dcc3 --- /dev/null +++ b/tests/Feature/ExceptionLoggingReplayTest.php @@ -0,0 +1,88 @@ +start(); + + sleep(1); + $workflow->requestRetry(); + + sleep(1); + $workflow->requestRetry(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('success', $workflow->output()); + $this->assertSame([ + Exception::class, + Signal::class, + Exception::class, + Signal::class, + TestProbeRetryActivity::class, + ], $classes); + } + + public function testBackToBackCaughtExceptionsEachPersist(): void + { + $workflow = WorkflowStub::make(TestProbeBackToBackWorkflow::class); + + $workflow->start(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('caught second: second failure', $workflow->output()); + $this->assertSame([Exception::class, Exception::class], $classes); + } + + public function testParallelChildFailuresStillDeduplicateToOneParentException(): void + { + $workflow = WorkflowStub::make(TestProbeChildFailureParentWorkflow::class); + + $workflow->start(); + + while ($workflow->running()); + + $classes = $workflow->logs() + ->pluck('class') + ->all(); + + $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); + $this->assertSame('caught: child failed: child-1', $workflow->output()); + $this->assertSame([ + TestProbeChildFailureParentStepActivity::class, + Exception::class, + TestProbeChildFailureCompensationActivity::class, + ], $classes); + $this->assertSame(1, $workflow->logs()->where('class', Exception::class)->count()); + } +} diff --git a/tests/Fixtures/TestProbeBackToBackWorkflow.php b/tests/Fixtures/TestProbeBackToBackWorkflow.php new file mode 100644 index 0000000..abc0fc6 --- /dev/null +++ b/tests/Fixtures/TestProbeBackToBackWorkflow.php @@ -0,0 +1,28 @@ +getMessage(); + } + + return 'unexpected-success'; + } +} diff --git a/tests/Fixtures/TestProbeChildFailureCompensationActivity.php b/tests/Fixtures/TestProbeChildFailureCompensationActivity.php new file mode 100644 index 0000000..d2ea956 --- /dev/null +++ b/tests/Fixtures/TestProbeChildFailureCompensationActivity.php @@ -0,0 +1,15 @@ +addCompensation(fn () => activity(TestProbeChildFailureCompensationActivity::class)); + + yield all([ + child(TestProbeChildFailureWorkflow::class, 'child-1'), + child(TestProbeChildFailureWorkflow::class, 'child-2'), + child(TestProbeChildFailureWorkflow::class, 'child-3'), + ]); + + return 'unexpected-success'; + } catch (Throwable $throwable) { + yield from $this->compensate(); + + return 'caught: ' . $throwable->getMessage(); + } + } +} diff --git a/tests/Fixtures/TestProbeChildFailureWorkflow.php b/tests/Fixtures/TestProbeChildFailureWorkflow.php new file mode 100644 index 0000000..ca36619 --- /dev/null +++ b/tests/Fixtures/TestProbeChildFailureWorkflow.php @@ -0,0 +1,16 @@ + throw new RuntimeException('first failure'), + 2 => throw new InvalidArgumentException('second failure'), + default => 'success', + }; + } +} diff --git a/tests/Fixtures/TestProbeRetryWorkflow.php b/tests/Fixtures/TestProbeRetryWorkflow.php new file mode 100644 index 0000000..05e0781 --- /dev/null +++ b/tests/Fixtures/TestProbeRetryWorkflow.php @@ -0,0 +1,41 @@ +inbox->receive('retry'); + } + + public function execute() + { + $attempt = 0; + + while (true) { + try { + ++$attempt; + + return yield activity(TestProbeRetryActivity::class, $attempt); + } catch (Throwable $throwable) { + if ($attempt >= 3) { + throw $throwable; + } + + yield await(fn (): bool => $this->inbox->hasUnread()); + + $this->inbox->nextUnread(); + } + } + } +} diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index 1f6cf3c..d01f7aa 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -4,6 +4,10 @@ namespace Tests\Unit; +use Tests\Fixtures\TestProbeBackToBackWorkflow; +use Tests\Fixtures\TestProbeChildFailureWorkflow; +use Tests\Fixtures\TestProbeParallelChildWorkflow; +use Tests\Fixtures\TestProbeRetryActivity; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\Exception; @@ -44,9 +48,9 @@ public function testExceptionWorkflowRunning(): void $this->assertSame(WorkflowRunningStatus::class, $workflow->status()); } - public function testSkipsWriteWhenSiblingExceptionLogExists(): void + public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void { - $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeParallelChildWorkflow::class)->id()); $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); $storedWorkflow->update([ 'arguments' => Serializer::serialize([]), @@ -61,19 +65,51 @@ public function testSkipsWriteWhenSiblingExceptionLogExists(): void 'class' => Exception::class, 'result' => Serializer::serialize([ 'class' => \Exception::class, - 'message' => 'first child failed', + 'message' => 'child failed: child-1', 'code' => 0, ]), ]); $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ 'class' => \Exception::class, - 'message' => 'second child failed', + 'message' => 'child failed: child-2', 'code' => 0, - ]); + ], sourceClass: TestProbeChildFailureWorkflow::class); $exception->handle(); $this->assertFalse($storedWorkflow->hasLogByIndex(1)); $this->assertSame(1, $storedWorkflow->logs()->count()); } + + public function testPersistsWriteWhenProbeReachesCandidateException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeBackToBackWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now() + ->toDateTimeString(), + 'class' => Exception::class, + 'result' => Serializer::serialize([ + 'class' => \RuntimeException::class, + 'message' => 'first failure', + 'code' => 0, + ]), + ]); + + $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => \InvalidArgumentException::class, + 'message' => 'second failure', + 'code' => 0, + ], sourceClass: TestProbeRetryActivity::class); + $exception->handle(); + + $this->assertTrue($storedWorkflow->fresh()->hasLogByIndex(1)); + } } From 5f61620518cd326df9e75bf352b78957ce9b7b28 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 12:59:29 +0000 Subject: [PATCH 02/15] Fix ECS issues in probe fixtures --- tests/Fixtures/TestProbeBackToBackWorkflow.php | 2 +- tests/Fixtures/TestProbeChildFailureParentWorkflow.php | 4 ++-- tests/Fixtures/TestProbeParallelChildWorkflow.php | 2 +- tests/Fixtures/TestProbeRetryWorkflow.php | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/Fixtures/TestProbeBackToBackWorkflow.php b/tests/Fixtures/TestProbeBackToBackWorkflow.php index abc0fc6..06396d6 100644 --- a/tests/Fixtures/TestProbeBackToBackWorkflow.php +++ b/tests/Fixtures/TestProbeBackToBackWorkflow.php @@ -5,8 +5,8 @@ namespace Tests\Fixtures; use Throwable; -use Workflow\Workflow; use function Workflow\activity; +use Workflow\Workflow; final class TestProbeBackToBackWorkflow extends Workflow { diff --git a/tests/Fixtures/TestProbeChildFailureParentWorkflow.php b/tests/Fixtures/TestProbeChildFailureParentWorkflow.php index 945749a..a2a8167 100644 --- a/tests/Fixtures/TestProbeChildFailureParentWorkflow.php +++ b/tests/Fixtures/TestProbeChildFailureParentWorkflow.php @@ -5,10 +5,10 @@ namespace Tests\Fixtures; use Throwable; -use Workflow\Workflow; use function Workflow\activity; use function Workflow\all; use function Workflow\child; +use Workflow\Workflow; final class TestProbeChildFailureParentWorkflow extends Workflow { @@ -17,7 +17,7 @@ public function execute() try { yield activity(TestProbeChildFailureParentStepActivity::class); - $this->addCompensation(fn () => activity(TestProbeChildFailureCompensationActivity::class)); + $this->addCompensation(static fn () => activity(TestProbeChildFailureCompensationActivity::class)); yield all([ child(TestProbeChildFailureWorkflow::class, 'child-1'), diff --git a/tests/Fixtures/TestProbeParallelChildWorkflow.php b/tests/Fixtures/TestProbeParallelChildWorkflow.php index 827958c..bc3ff58 100644 --- a/tests/Fixtures/TestProbeParallelChildWorkflow.php +++ b/tests/Fixtures/TestProbeParallelChildWorkflow.php @@ -5,9 +5,9 @@ namespace Tests\Fixtures; use Throwable; -use Workflow\Workflow; use function Workflow\all; use function Workflow\child; +use Workflow\Workflow; final class TestProbeParallelChildWorkflow extends Workflow { diff --git a/tests/Fixtures/TestProbeRetryWorkflow.php b/tests/Fixtures/TestProbeRetryWorkflow.php index 05e0781..ec80b0c 100644 --- a/tests/Fixtures/TestProbeRetryWorkflow.php +++ b/tests/Fixtures/TestProbeRetryWorkflow.php @@ -5,10 +5,10 @@ namespace Tests\Fixtures; use Throwable; -use Workflow\SignalMethod; -use Workflow\Workflow; use function Workflow\activity; use function Workflow\await; +use Workflow\SignalMethod; +use Workflow\Workflow; final class TestProbeRetryWorkflow extends Workflow { From bfb3a650869e5a0b4adcff2c04bce5fcdeb67cb6 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 13:04:53 +0000 Subject: [PATCH 03/15] Import Deferred in probe code --- src/Traits/SideEffects.php | 3 ++- src/Traits/Versions.php | 3 ++- tests/Unit/ExceptionTest.php | 15 +++++++++------ 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/Traits/SideEffects.php b/src/Traits/SideEffects.php index c271385..72dd5d7 100644 --- a/src/Traits/SideEffects.php +++ b/src/Traits/SideEffects.php @@ -5,6 +5,7 @@ namespace Workflow\Traits; use Illuminate\Database\QueryException; +use React\Promise\Deferred; use React\Promise\PromiseInterface; use function React\Promise\resolve; use Workflow\Serializers\Serializer; @@ -22,7 +23,7 @@ public static function sideEffect($callable): PromiseInterface if (self::isProbing()) { ++self::$context->index; - return (new \React\Promise\Deferred())->promise(); + return (new Deferred())->promise(); } $result = $callable(); diff --git a/src/Traits/Versions.php b/src/Traits/Versions.php index 3dc2307..7d841fa 100644 --- a/src/Traits/Versions.php +++ b/src/Traits/Versions.php @@ -5,6 +5,7 @@ namespace Workflow\Traits; use Illuminate\Database\QueryException; +use React\Promise\Deferred; use React\Promise\PromiseInterface; use function React\Promise\resolve; use Workflow\Exceptions\VersionNotSupportedException; @@ -35,7 +36,7 @@ public static function getVersion( if (self::isProbing()) { ++self::$context->index; - return (new \React\Promise\Deferred())->promise(); + return (new Deferred())->promise(); } $version = $maxSupported; diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index d01f7aa..326577f 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -4,6 +4,9 @@ namespace Tests\Unit; +use Exception as BaseException; +use InvalidArgumentException; +use RuntimeException; use Tests\Fixtures\TestProbeBackToBackWorkflow; use Tests\Fixtures\TestProbeChildFailureWorkflow; use Tests\Fixtures\TestProbeParallelChildWorkflow; @@ -21,7 +24,7 @@ final class ExceptionTest extends TestCase { public function testMiddleware(): void { - $exception = new Exception(0, now()->toDateTimeString(), new StoredWorkflow(), new \Exception( + $exception = new Exception(0, now()->toDateTimeString(), new StoredWorkflow(), new BaseException( 'Test exception' )); @@ -42,7 +45,7 @@ public function testExceptionWorkflowRunning(): void 'status' => WorkflowRunningStatus::$name, ]); - $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new \Exception('Test exception')); + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new BaseException('Test exception')); $exception->handle(); $this->assertSame(WorkflowRunningStatus::class, $workflow->status()); @@ -64,14 +67,14 @@ public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void ->toDateTimeString(), 'class' => Exception::class, 'result' => Serializer::serialize([ - 'class' => \Exception::class, + 'class' => BaseException::class, 'message' => 'child failed: child-1', 'code' => 0, ]), ]); $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ - 'class' => \Exception::class, + 'class' => BaseException::class, 'message' => 'child failed: child-2', 'code' => 0, ], sourceClass: TestProbeChildFailureWorkflow::class); @@ -97,14 +100,14 @@ public function testPersistsWriteWhenProbeReachesCandidateException(): void ->toDateTimeString(), 'class' => Exception::class, 'result' => Serializer::serialize([ - 'class' => \RuntimeException::class, + 'class' => RuntimeException::class, 'message' => 'first failure', 'code' => 0, ]), ]); $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ - 'class' => \InvalidArgumentException::class, + 'class' => InvalidArgumentException::class, 'message' => 'second failure', 'code' => 0, ], sourceClass: TestProbeRetryActivity::class); From b36bd34d1ec49a68952266c9bed92a91a5bce78a Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 13:14:16 +0000 Subject: [PATCH 04/15] Inline Deferred promise returns --- src/ActivityStub.php | 6 ++---- src/ChildWorkflowStub.php | 6 ++---- src/Traits/Awaits.php | 6 ++---- src/Traits/Timers.php | 9 +++------ 4 files changed, 9 insertions(+), 18 deletions(-) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 7ef2caa..b9cbe53 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -89,15 +89,13 @@ public static function make($activity, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 54f40d9..7ea7d90 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -82,8 +82,7 @@ public static function make($workflow, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } if (! $context->replaying) { @@ -113,7 +112,6 @@ public static function make($workflow, ...$arguments): PromiseInterface ++$context->index; WorkflowStub::setContext($context); - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } diff --git a/src/Traits/Awaits.php b/src/Traits/Awaits.php index 7df13ad..0ba8fae 100644 --- a/src/Traits/Awaits.php +++ b/src/Traits/Awaits.php @@ -24,8 +24,7 @@ public static function await($condition): PromiseInterface if (self::isProbing()) { ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } $result = $condition(); @@ -55,7 +54,6 @@ public static function await($condition): PromiseInterface } ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } diff --git a/src/Traits/Timers.php b/src/Traits/Timers.php index f2fc6a3..0be2981 100644 --- a/src/Traits/Timers.php +++ b/src/Traits/Timers.php @@ -43,8 +43,7 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa if (self::isProbing()) { ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } if (! self::$context->replaying) { @@ -54,8 +53,7 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa ]); } else { ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } @@ -101,7 +99,6 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa } ++self::$context->index; - $deferred = new Deferred(); - return $deferred->promise(); + return (new Deferred())->promise(); } } From 7393ea500bd7b3d2481bc562b92b3eed1ed5bff8 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 13:15:07 +0000 Subject: [PATCH 05/15] Remove redundant context resets --- src/ActivityStub.php | 3 --- src/ChildWorkflowStub.php | 3 --- 2 files changed, 6 deletions(-) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index b9cbe53..8256c4b 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -64,7 +64,6 @@ public static function make($activity, ...$arguments): PromiseInterface } ++$context->index; - WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); if ( is_array($result) && @@ -88,14 +87,12 @@ public static function make($activity, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; - WorkflowStub::setContext($context); return (new Deferred())->promise(); } $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); ++$context->index; - WorkflowStub::setContext($context); return (new Deferred())->promise(); } } diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 7ea7d90..58279d8 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -58,7 +58,6 @@ public static function make($workflow, ...$arguments): PromiseInterface } ++$context->index; - WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); if ( is_array($result) @@ -81,7 +80,6 @@ public static function make($workflow, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; - WorkflowStub::setContext($context); return (new Deferred())->promise(); } @@ -111,7 +109,6 @@ public static function make($workflow, ...$arguments): PromiseInterface } ++$context->index; - WorkflowStub::setContext($context); return (new Deferred())->promise(); } } From f3a51750c5977516682df1b6fcbba298c6071e9a Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 13:50:06 +0000 Subject: [PATCH 06/15] Restore parent context in workflow stubs --- src/ActivityStub.php | 3 +++ src/ChildWorkflowStub.php | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index 8256c4b..b9cbe53 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -64,6 +64,7 @@ public static function make($activity, ...$arguments): PromiseInterface } ++$context->index; + WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); if ( is_array($result) && @@ -87,12 +88,14 @@ public static function make($activity, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; + WorkflowStub::setContext($context); return (new Deferred())->promise(); } $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); ++$context->index; + WorkflowStub::setContext($context); return (new Deferred())->promise(); } } diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 58279d8..7ea7d90 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -58,6 +58,7 @@ public static function make($workflow, ...$arguments): PromiseInterface } ++$context->index; + WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); if ( is_array($result) @@ -80,6 +81,7 @@ public static function make($workflow, ...$arguments): PromiseInterface if (WorkflowStub::isProbing()) { ++$context->index; + WorkflowStub::setContext($context); return (new Deferred())->promise(); } @@ -109,6 +111,7 @@ public static function make($workflow, ...$arguments): PromiseInterface } ++$context->index; + WorkflowStub::setContext($context); return (new Deferred())->promise(); } } From 1ea7c85624853cd0f5a125512c7aa31187ea823f Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 14:08:24 +0000 Subject: [PATCH 07/15] Tag activity exceptions for probe replay --- src/Activity.php | 3 ++- tests/Unit/ExceptionTest.php | 44 ++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/src/Activity.php b/src/Activity.php index a26508a..44e116b 100644 --- a/src/Activity.php +++ b/src/Activity.php @@ -170,7 +170,8 @@ public function failed(Throwable $throwable): void $this->storedWorkflow, $throwable, $workflow->connection(), - $workflow->queue() + $workflow->queue(), + $this::class ); } diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index 326577f..abc334f 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -7,10 +7,13 @@ use Exception as BaseException; use InvalidArgumentException; use RuntimeException; +use Tests\Fixtures\TestActivity; use Tests\Fixtures\TestProbeBackToBackWorkflow; use Tests\Fixtures\TestProbeChildFailureWorkflow; use Tests\Fixtures\TestProbeParallelChildWorkflow; use Tests\Fixtures\TestProbeRetryActivity; +use Tests\Fixtures\TestSagaActivity; +use Tests\Fixtures\TestSagaParallelActivityWorkflow; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\Exception; @@ -115,4 +118,45 @@ public function testPersistsWriteWhenProbeReachesCandidateException(): void $this->assertTrue($storedWorkflow->fresh()->hasLogByIndex(1)); } + + public function testSkipsWriteWhenProbeReachesDifferentActivityClassAtSameIndex(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestSagaParallelActivityWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now() + ->toDateTimeString(), + 'class' => TestActivity::class, + 'result' => Serializer::serialize('step complete'), + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => now() + ->toDateTimeString(), + 'class' => Exception::class, + 'result' => Serializer::serialize([ + 'class' => RuntimeException::class, + 'message' => 'parallel failure', + 'code' => 0, + ]), + ]); + + $exception = new Exception(2, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => RuntimeException::class, + 'message' => 'another parallel failure', + 'code' => 0, + ], sourceClass: TestSagaActivity::class); + $exception->handle(); + + $this->assertFalse($storedWorkflow->fresh()->hasLogByIndex(2)); + } } From d8d4219d251e43eb6d5eade09b99ae818b9c0a45 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 14:57:02 +0000 Subject: [PATCH 08/15] Skip stale foreign exception logs during replay --- src/ActivityStub.php | 11 +++++++ src/ChildWorkflowStub.php | 11 +++++++ src/Exception.php | 44 +++++++++++++++++++++------- tests/Unit/ActivityStubTest.php | 39 ++++++++++++++++++++++++ tests/Unit/ChildWorkflowStubTest.php | 40 +++++++++++++++++++++++++ 5 files changed, 135 insertions(+), 10 deletions(-) diff --git a/src/ActivityStub.php b/src/ActivityStub.php index b9cbe53..56caaa6 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -66,6 +66,9 @@ public static function make($activity, ...$arguments): PromiseInterface ++$context->index; WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); + if ($log->class === Exception::class && self::isForeignExceptionResult($result, $activity)) { + return self::make($activity, ...$arguments); + } if ( is_array($result) && array_key_exists('class', $result) && @@ -98,4 +101,12 @@ public static function make($activity, ...$arguments): PromiseInterface WorkflowStub::setContext($context); return (new Deferred())->promise(); } + + private static function isForeignExceptionResult(mixed $result, string $activity): bool + { + return is_array($result) + && isset($result['sourceClass']) + && is_string($result['sourceClass']) + && $result['sourceClass'] !== $activity; + } } diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 7ea7d90..03a23ff 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -60,6 +60,9 @@ public static function make($workflow, ...$arguments): PromiseInterface ++$context->index; WorkflowStub::setContext($context); $result = Serializer::unserialize($log->result); + if ($log->class === Exception::class && self::isForeignExceptionResult($result, $workflow)) { + return self::make($workflow, ...$arguments); + } if ( is_array($result) && array_key_exists('class', $result) @@ -114,4 +117,12 @@ public static function make($workflow, ...$arguments): PromiseInterface WorkflowStub::setContext($context); return (new Deferred())->promise(); } + + private static function isForeignExceptionResult(mixed $result, string $workflow): bool + { + return is_array($result) + && isset($result['sourceClass']) + && is_string($result['sourceClass']) + && $result['sourceClass'] !== $workflow; + } } diff --git a/src/Exception.php b/src/Exception.php index 74ca4be..2c6ec0d 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -10,6 +10,7 @@ use Illuminate\Foundation\Bus\Dispatchable; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Queue\SerializesModels; +use Illuminate\Support\Facades\Cache; use Throwable; use Workflow\Exceptions\TransitionNotFound; use Workflow\Middleware\WithoutOverlappingMiddleware; @@ -51,18 +52,30 @@ public function __construct( public function handle() { - $workflow = $this->storedWorkflow->toWorkflow(); + $lock = Cache::lock('laravel-workflow-exception:' . $this->storedWorkflow->id, 15); + + if (! $lock->get()) { + $this->release(); + + return; + } try { - if ($this->storedWorkflow->hasLogByIndex($this->index)) { - $workflow->resume(); - } elseif ($this->shouldPersistAfterProbeReplay()) { - $workflow->next($this->index, $this->now, self::class, $this->exception); - } - } catch (TransitionNotFound) { - if ($workflow->running()) { - $this->release(); + $workflow = $this->storedWorkflow->toWorkflow(); + + try { + if ($this->storedWorkflow->hasLogByIndex($this->index)) { + $workflow->resume(); + } elseif ($this->shouldPersistAfterProbeReplay()) { + $workflow->next($this->index, $this->now, self::class, $this->exceptionPayload()); + } + } catch (TransitionNotFound) { + if ($workflow->running()) { + $this->release(); + } } + } finally { + $lock->release(); } } @@ -132,11 +145,22 @@ private function createTentativeWorkflowState(): StoredWorkflow 'index' => $this->index, 'now' => $this->now, 'class' => self::class, - 'result' => Serializer::serialize($this->exception), + 'result' => Serializer::serialize($this->exceptionPayload()), ]); $storedWorkflowClass = $this->storedWorkflow::class; return $storedWorkflowClass::query()->findOrFail($this->storedWorkflow->id); } + + private function exceptionPayload() + { + if (! is_array($this->exception) || $this->sourceClass === null) { + return $this->exception; + } + + return array_merge($this->exception, [ + 'sourceClass' => $this->sourceClass, + ]); + } } diff --git a/tests/Unit/ActivityStubTest.php b/tests/Unit/ActivityStubTest.php index af6cee9..6bd0090 100644 --- a/tests/Unit/ActivityStubTest.php +++ b/tests/Unit/ActivityStubTest.php @@ -7,9 +7,11 @@ use Exception; use RuntimeException; use Tests\Fixtures\TestActivity; +use Tests\Fixtures\TestOtherActivity; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\ActivityStub; +use Workflow\Exception as WorkflowException; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowPendingStatus; @@ -123,6 +125,43 @@ public function testLoadsStoredExceptionWithNonStandardConstructor(): void }); } + public function testSkipsStoredExceptionForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign', + 'code' => 0, + 'sourceClass' => TestOtherActivity::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => TestActivity::class, + 'result' => Serializer::serialize('test'), + ]); + + ActivityStub::make(TestActivity::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + public function testAll(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/ChildWorkflowStubTest.php b/tests/Unit/ChildWorkflowStubTest.php index f972356..ff7f9e0 100644 --- a/tests/Unit/ChildWorkflowStubTest.php +++ b/tests/Unit/ChildWorkflowStubTest.php @@ -4,11 +4,14 @@ namespace Tests\Unit; +use Exception; use Mockery; use Tests\Fixtures\TestChildWorkflow; +use Tests\Fixtures\TestExceptionWorkflow; use Tests\Fixtures\TestParentWorkflow; use Tests\TestCase; use Workflow\ChildWorkflowStub; +use Workflow\Exception as WorkflowException; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; use Workflow\States\WorkflowPendingStatus; @@ -91,6 +94,43 @@ public function testLoadsChildWorkflow(): void $this->assertNull($result); } + public function testSkipsStoredExceptionForDifferentSourceClass(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'foreign child', + 'code' => 0, + 'sourceClass' => TestExceptionWorkflow::class, + ]), + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => WorkflowStub::now(), + 'class' => TestChildWorkflow::class, + 'result' => Serializer::serialize('test'), + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertSame('test', $result); + $this->assertSame(2, WorkflowStub::getContext()->index); + } + public function testDoesNotResumeRunningStartedChildWorkflow(): void { $childWorkflow = Mockery::mock(); From 3456f71bdbff3914ad7ca0ba09ea7d753c0d7ab8 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 16:43:30 +0000 Subject: [PATCH 09/15] Add unit coverage for probe replay branches --- tests/Unit/ChildWorkflowStubTest.php | 71 ++++++++++++++++++ tests/Unit/ExceptionTest.php | 102 ++++++++++++++++++++++++++ tests/Unit/Traits/AwaitsTest.php | 30 ++++++++ tests/Unit/Traits/SideEffectsTest.php | 30 ++++++++ tests/Unit/Traits/TimersTest.php | 32 ++++++++ tests/Unit/Traits/VersionsTest.php | 24 ++++++ tests/Unit/WorkflowStubTest.php | 38 ++++++++++ 7 files changed, 327 insertions(+) diff --git a/tests/Unit/ChildWorkflowStubTest.php b/tests/Unit/ChildWorkflowStubTest.php index ff7f9e0..2ab1a9f 100644 --- a/tests/Unit/ChildWorkflowStubTest.php +++ b/tests/Unit/ChildWorkflowStubTest.php @@ -131,6 +131,77 @@ public function testSkipsStoredExceptionForDifferentSourceClass(): void $this->assertSame(2, WorkflowStub::getContext()->index); } + public function testMarksProbeMatchedForMatchingStoredException(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => WorkflowStub::now(), + 'class' => WorkflowException::class, + 'result' => Serializer::serialize([ + 'class' => Exception::class, + 'message' => 'matching child failure', + 'code' => 0, + ]), + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestChildWorkflow::class, + 'probeMatched' => false, + ]); + + try { + ChildWorkflowStub::make(TestChildWorkflow::class); + $this->fail('Expected child exception to be thrown.'); + } catch (Exception $exception) { + $this->assertSame('matching child failure', $exception->getMessage()); + } + + $this->assertTrue(WorkflowStub::probeMatched()); + } + + public function testReturnsUnresolvedPromiseWhenProbingWithoutStoredChildWorkflow(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestParentWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + 'probeIndex' => 0, + 'probeClass' => TestChildWorkflow::class, + 'probeMatched' => false, + ]); + + ChildWorkflowStub::make(TestChildWorkflow::class) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testDoesNotResumeRunningStartedChildWorkflow(): void { $childWorkflow = Mockery::mock(); diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index abc334f..a2fa329 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -5,7 +5,11 @@ namespace Tests\Unit; use Exception as BaseException; +use Illuminate\Contracts\Queue\Job as JobContract; +use Illuminate\Support\Facades\Cache; use InvalidArgumentException; +use Mockery; +use ReflectionMethod; use RuntimeException; use Tests\Fixtures\TestActivity; use Tests\Fixtures\TestProbeBackToBackWorkflow; @@ -54,6 +58,98 @@ public function testExceptionWorkflowRunning(): void $this->assertSame(WorkflowRunningStatus::class, $workflow->status()); } + public function testHandleResumesWorkflowWhenLogAlreadyExists(): void + { + $lock = Mockery::mock(); + $lock->shouldReceive('get') + ->once() + ->andReturn(true); + $lock->shouldReceive('release') + ->once(); + + Cache::shouldReceive('lock') + ->once() + ->with('laravel-workflow-exception:123', 15) + ->andReturn($lock); + + $workflow = Mockery::mock(); + $workflow->shouldReceive('resume') + ->once(); + + $storedWorkflow = Mockery::mock(StoredWorkflow::class) + ->makePartial(); + $storedWorkflow->id = 123; + $storedWorkflow->shouldReceive('effectiveConnection') + ->andReturn(null); + $storedWorkflow->shouldReceive('effectiveQueue') + ->andReturn(null); + $storedWorkflow->shouldReceive('toWorkflow') + ->once() + ->andReturn($workflow); + $storedWorkflow->shouldReceive('hasLogByIndex') + ->once() + ->with(0) + ->andReturn(true); + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, new BaseException('existing log')); + $exception->handle(); + + $this->assertSame(123, $storedWorkflow->id); + + Mockery::close(); + } + + public function testHandleReleasesWhenExceptionLockUnavailable(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + + $lock = Mockery::mock(); + $lock->shouldReceive('get') + ->once() + ->andReturn(false); + + Cache::shouldReceive('lock') + ->once() + ->with('laravel-workflow-exception:' . $storedWorkflow->id, 15) + ->andReturn($lock); + + $job = Mockery::mock(JobContract::class); + $job->shouldReceive('release') + ->once() + ->with(0); + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'locked', + 'code' => 0, + ]); + $exception->setJob($job); + $exception->handle(); + + $this->assertFalse($storedWorkflow->hasLogByIndex(0)); + + Mockery::close(); + } + + public function testProbeReplayShortCircuitsWhenWorkflowClassIsInvalid(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->class = ''; + + $exception = new Exception(0, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => BaseException::class, + 'message' => 'invalid workflow class', + 'code' => 0, + ]); + + $method = new ReflectionMethod(Exception::class, 'shouldPersistAfterProbeReplay'); + $method->setAccessible(true); + + $this->assertTrue($method->invoke($exception)); + } + public function testSkipsWriteWhenProbeDoesNotReachCandidateException(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestProbeParallelChildWorkflow::class)->id()); @@ -116,7 +212,13 @@ public function testPersistsWriteWhenProbeReachesCandidateException(): void ], sourceClass: TestProbeRetryActivity::class); $exception->handle(); + $log = $storedWorkflow->fresh() + ->logs() + ->firstWhere('index', 1); + + $this->assertNotNull($log); $this->assertTrue($storedWorkflow->fresh()->hasLogByIndex(1)); + $this->assertSame(TestProbeRetryActivity::class, Serializer::unserialize($log->result)['sourceClass']); } public function testSkipsWriteWhenProbeReachesDifferentActivityClassAtSameIndex(): void diff --git a/tests/Unit/Traits/AwaitsTest.php b/tests/Unit/Traits/AwaitsTest.php index 12b2a8b..1759143 100644 --- a/tests/Unit/Traits/AwaitsTest.php +++ b/tests/Unit/Traits/AwaitsTest.php @@ -102,6 +102,36 @@ public function testResolvesConflictingResult(): void $this->assertFalse(Serializer::unserialize($workflow->logs()->firstWhere('index', 0)->result)); } + public function testDefersWhenProbing(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $conditionEvaluated = false; + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::await(static function () use (&$conditionEvaluated): bool { + $conditionEvaluated = true; + + return true; + }) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertFalse($conditionEvaluated); + $this->assertNull($result); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/Traits/SideEffectsTest.php b/tests/Unit/Traits/SideEffectsTest.php index 9e12100..dfbe0a3 100644 --- a/tests/Unit/Traits/SideEffectsTest.php +++ b/tests/Unit/Traits/SideEffectsTest.php @@ -88,6 +88,36 @@ public function testResolvesConflictingResult(): void $this->assertSame('test', Serializer::unserialize($workflow->logs()->firstWhere('index', 0)->result)); } + public function testDefersWhenProbing(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $callableEvaluated = false; + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::sideEffect(static function () use (&$callableEvaluated): string { + $callableEvaluated = true; + + return 'test'; + }) + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertFalse($callableEvaluated); + $this->assertNull($result); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertSame(1, WorkflowStub::getContext()->index); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/Traits/TimersTest.php b/tests/Unit/Traits/TimersTest.php index 0e5ead9..cce3699 100644 --- a/tests/Unit/Traits/TimersTest.php +++ b/tests/Unit/Traits/TimersTest.php @@ -242,6 +242,38 @@ public function testTimerReturnsUnresolvedPromiseWhenReplayingAndNoTimer(): void ]); } + public function testTimerReturnsUnresolvedPromiseWhenProbingAndNoTimer(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowPendingStatus::$name, + ]); + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::timer('1 minute') + ->then(static function ($value) use (&$result) { + $result = $value; + }); + + $this->assertNull($result); + $this->assertSame(1, WorkflowStub::getContext()->index); + $this->assertSame(0, $workflow->logs()->count()); + $this->assertDatabaseMissing('workflow_timers', [ + 'stored_workflow_id' => $workflow->id(), + 'index' => 0, + ]); + } + public function testTimerCapsDelayForSqsDriver(): void { Bus::fake(); diff --git a/tests/Unit/Traits/VersionsTest.php b/tests/Unit/Traits/VersionsTest.php index 1fe909f..d5425b4 100644 --- a/tests/Unit/Traits/VersionsTest.php +++ b/tests/Unit/Traits/VersionsTest.php @@ -191,6 +191,30 @@ public function testResolvesConflictingResultThrowsWhenVersionNotSupported(): vo Mockery::close(); } + public function testReturnsUnresolvedPromiseWhenProbingWithoutStoredVersion(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $result = null; + + WorkflowStub::setContext([ + 'storedWorkflow' => $storedWorkflow, + 'index' => 0, + 'now' => now(), + 'replaying' => true, + 'probing' => true, + ]); + + WorkflowStub::getVersion('test-change', WorkflowStub::DEFAULT_VERSION, 1) + ->then(static function ($value) use (&$result): void { + $result = $value; + }); + + $this->assertNull($result); + $this->assertSame(1, WorkflowStub::getContext()->index); + $this->assertSame(0, $workflow->logs()->count()); + } + public function testThrowsQueryExceptionWhenNotDuplicateKey(): void { $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); diff --git a/tests/Unit/WorkflowStubTest.php b/tests/Unit/WorkflowStubTest.php index cf6b7e6..4188ddf 100644 --- a/tests/Unit/WorkflowStubTest.php +++ b/tests/Unit/WorkflowStubTest.php @@ -8,6 +8,8 @@ use Illuminate\Support\Carbon; use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Queue; +use ReflectionProperty; +use stdClass; use Tests\Fixtures\TestAwaitWorkflow; use Tests\Fixtures\TestBadConnectionWorkflow; use Tests\Fixtures\TestChatBotWorkflow; @@ -231,6 +233,42 @@ public function testConnection(): void $this->assertSame('default', WorkflowStub::queue()); } + public function testProbeHelpers(): void + { + $contextProperty = new ReflectionProperty(WorkflowStub::class, 'context'); + $contextProperty->setAccessible(true); + $previousContext = $contextProperty->getValue(); + $contextProperty->setValue(null); + + try { + $this->assertInstanceOf(stdClass::class, WorkflowStub::getContext()); + $this->assertFalse(WorkflowStub::isProbing()); + $this->assertNull(WorkflowStub::probeIndex()); + $this->assertNull(WorkflowStub::probeClass()); + $this->assertFalse(WorkflowStub::probeMatched()); + + WorkflowStub::markProbeMatched(); + + $this->assertFalse(WorkflowStub::probeMatched()); + + WorkflowStub::setContext([ + 'probing' => true, + 'probeIndex' => 7, + 'probeClass' => TestWorkflow::class, + 'probeMatched' => false, + ]); + + WorkflowStub::markProbeMatched(); + + $this->assertTrue(WorkflowStub::isProbing()); + $this->assertSame(7, WorkflowStub::probeIndex()); + $this->assertSame(TestWorkflow::class, WorkflowStub::probeClass()); + $this->assertTrue(WorkflowStub::probeMatched()); + } finally { + $contextProperty->setValue($previousContext); + } + } + public function testHandlesDuplicateLogInsertionProperly(): void { Queue::fake(); From d0ca14dc268d35fc744fc78a51a15c457f93f8c0 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 18:10:35 +0000 Subject: [PATCH 10/15] Stream feature worker output in CI --- .github/workflows/php.yml | 2 ++ tests/TestCase.php | 18 ++++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index 2934dfb..44d66c7 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -82,6 +82,7 @@ jobs: DB_CONNECTION: mysql DB_PORT: ${{ job.services.mysql.ports[3306] }} QUEUE_CONNECTION: redis + WORKFLOW_TEST_STREAM_WORKER_OUTPUT: 1 - name: Run test suite (PostgreSQL) run: vendor/bin/phpunit --testdox --debug --testsuite feature @@ -89,6 +90,7 @@ jobs: DB_CONNECTION: pgsql DB_PORT: ${{ job.services.postgres.ports[5432] }} QUEUE_CONNECTION: redis + WORKFLOW_TEST_STREAM_WORKER_OUTPUT: 1 - name: Upload laravel.log if tests fail if: failure() diff --git a/tests/TestCase.php b/tests/TestCase.php index de3b4bf..46e3167 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -43,8 +43,15 @@ public static function setUpBeforeClass(): void for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); - self::$workers[$i]->disableOutput(); - self::$workers[$i]->start(); + if (! self::shouldStreamWorkerOutput()) { + self::$workers[$i]->disableOutput(); + self::$workers[$i]->start(); + continue; + } + + self::$workers[$i]->start(static function (string $type, string $output) use ($i): void { + fwrite(STDERR, '[worker-' . $i . '][' . $type . '] ' . $output); + }); } } @@ -94,4 +101,11 @@ protected function getPackageProviders($app) { return [\Workflow\Providers\WorkflowServiceProvider::class]; } + + private static function shouldStreamWorkerOutput(): bool + { + $value = getenv('WORKFLOW_TEST_STREAM_WORKER_OUTPUT') ?: ($_ENV['WORKFLOW_TEST_STREAM_WORKER_OUTPUT'] ?? null); + + return in_array($value, ['1', 'true', 'yes', 'on'], true); + } } From 39e68393f24ad5505a334b9ae62ea3d4a7d05fa5 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 18:29:34 +0000 Subject: [PATCH 11/15] Capture CI worker output to files --- .github/workflows/php.yml | 8 +++++--- tests/TestCase.php | 19 +++++++++++++++---- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index 44d66c7..780f6a1 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -82,7 +82,7 @@ jobs: DB_CONNECTION: mysql DB_PORT: ${{ job.services.mysql.ports[3306] }} QUEUE_CONNECTION: redis - WORKFLOW_TEST_STREAM_WORKER_OUTPUT: 1 + WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT: 1 - name: Run test suite (PostgreSQL) run: vendor/bin/phpunit --testdox --debug --testsuite feature @@ -90,14 +90,16 @@ jobs: DB_CONNECTION: pgsql DB_PORT: ${{ job.services.postgres.ports[5432] }} QUEUE_CONNECTION: redis - WORKFLOW_TEST_STREAM_WORKER_OUTPUT: 1 + WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT: 1 - name: Upload laravel.log if tests fail if: failure() uses: actions/upload-artifact@v4 with: name: laravel-log - path: vendor/orchestra/testbench-core/laravel/storage/logs/laravel.log + path: | + vendor/orchestra/testbench-core/laravel/storage/logs/laravel.log + vendor/orchestra/testbench-core/laravel/storage/logs/workflow-test-worker-*.log - name: Code Coverage run: | diff --git a/tests/TestCase.php b/tests/TestCase.php index 46e3167..9393ce3 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -43,14 +43,16 @@ public static function setUpBeforeClass(): void for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); - if (! self::shouldStreamWorkerOutput()) { + if (! self::shouldCaptureWorkerOutput()) { self::$workers[$i]->disableOutput(); self::$workers[$i]->start(); continue; } + file_put_contents(self::workerLogPath($i), ''); + self::$workers[$i]->start(static function (string $type, string $output) use ($i): void { - fwrite(STDERR, '[worker-' . $i . '][' . $type . '] ' . $output); + file_put_contents(self::workerLogPath($i), $output, FILE_APPEND | LOCK_EX); }); } } @@ -102,10 +104,19 @@ protected function getPackageProviders($app) return [\Workflow\Providers\WorkflowServiceProvider::class]; } - private static function shouldStreamWorkerOutput(): bool + private static function shouldCaptureWorkerOutput(): bool { - $value = getenv('WORKFLOW_TEST_STREAM_WORKER_OUTPUT') ?: ($_ENV['WORKFLOW_TEST_STREAM_WORKER_OUTPUT'] ?? null); + $value = getenv( + 'WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT' + ) ?: ($_ENV['WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT'] ?? null); return in_array($value, ['1', 'true', 'yes', 'on'], true); } + + private static function workerLogPath(int $worker): string + { + return dirname( + __DIR__ + ) . '/vendor/orchestra/testbench-core/laravel/storage/logs/workflow-test-worker-' . $worker . '.log'; + } } From 0ad07ad4c0c0dc0600cda3c3e910b4f657288797 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 18:40:49 +0000 Subject: [PATCH 12/15] Revert CI worker capture instrumentation --- .github/workflows/php.yml | 6 +----- tests/TestCase.php | 29 ++--------------------------- 2 files changed, 3 insertions(+), 32 deletions(-) diff --git a/.github/workflows/php.yml b/.github/workflows/php.yml index 780f6a1..2934dfb 100644 --- a/.github/workflows/php.yml +++ b/.github/workflows/php.yml @@ -82,7 +82,6 @@ jobs: DB_CONNECTION: mysql DB_PORT: ${{ job.services.mysql.ports[3306] }} QUEUE_CONNECTION: redis - WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT: 1 - name: Run test suite (PostgreSQL) run: vendor/bin/phpunit --testdox --debug --testsuite feature @@ -90,16 +89,13 @@ jobs: DB_CONNECTION: pgsql DB_PORT: ${{ job.services.postgres.ports[5432] }} QUEUE_CONNECTION: redis - WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT: 1 - name: Upload laravel.log if tests fail if: failure() uses: actions/upload-artifact@v4 with: name: laravel-log - path: | - vendor/orchestra/testbench-core/laravel/storage/logs/laravel.log - vendor/orchestra/testbench-core/laravel/storage/logs/workflow-test-worker-*.log + path: vendor/orchestra/testbench-core/laravel/storage/logs/laravel.log - name: Code Coverage run: | diff --git a/tests/TestCase.php b/tests/TestCase.php index 9393ce3..de3b4bf 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -43,17 +43,8 @@ public static function setUpBeforeClass(): void for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); - if (! self::shouldCaptureWorkerOutput()) { - self::$workers[$i]->disableOutput(); - self::$workers[$i]->start(); - continue; - } - - file_put_contents(self::workerLogPath($i), ''); - - self::$workers[$i]->start(static function (string $type, string $output) use ($i): void { - file_put_contents(self::workerLogPath($i), $output, FILE_APPEND | LOCK_EX); - }); + self::$workers[$i]->disableOutput(); + self::$workers[$i]->start(); } } @@ -103,20 +94,4 @@ protected function getPackageProviders($app) { return [\Workflow\Providers\WorkflowServiceProvider::class]; } - - private static function shouldCaptureWorkerOutput(): bool - { - $value = getenv( - 'WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT' - ) ?: ($_ENV['WORKFLOW_TEST_CAPTURE_WORKER_OUTPUT'] ?? null); - - return in_array($value, ['1', 'true', 'yes', 'on'], true); - } - - private static function workerLogPath(int $worker): string - { - return dirname( - __DIR__ - ) . '/vendor/orchestra/testbench-core/laravel/storage/logs/workflow-test-worker-' . $worker . '.log'; - } } From ee0b47ac1a5bb711f657651dc25736b3f7acedd8 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 19:06:02 +0000 Subject: [PATCH 13/15] Use synthetic logs for probe replay --- src/Exception.php | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/src/Exception.php b/src/Exception.php index 2c6ec0d..d7ca59a 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -15,6 +15,7 @@ use Workflow\Exceptions\TransitionNotFound; use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; +use Workflow\Models\StoredWorkflowLog; use Workflow\Serializers\Serializer; final class Exception implements ShouldBeEncrypted, ShouldQueue @@ -100,11 +101,8 @@ private function shouldPersistAfterProbeReplay(): bool } $previousContext = WorkflowStub::getContext(); - $connection = $this->storedWorkflow->getConnection(); $shouldPersist = false; - $connection->beginTransaction(); - try { $tentativeWorkflow = $this->createTentativeWorkflowState(); $workflow = new $workflowClass($tentativeWorkflow, ...$tentativeWorkflow->workflowArguments()); @@ -130,10 +128,6 @@ private function shouldPersistAfterProbeReplay(): bool $shouldPersist = WorkflowStub::probeMatched(); } finally { WorkflowStub::setContext($previousContext); - - if ($connection->transactionLevel() > 0) { - $connection->rollBack(); - } } return $shouldPersist; @@ -141,16 +135,32 @@ private function shouldPersistAfterProbeReplay(): bool private function createTentativeWorkflowState(): StoredWorkflow { - $this->storedWorkflow->createLog([ - 'index' => $this->index, - 'now' => $this->now, - 'class' => self::class, - 'result' => Serializer::serialize($this->exceptionPayload()), - ]); - $storedWorkflowClass = $this->storedWorkflow::class; - return $storedWorkflowClass::query()->findOrFail($this->storedWorkflow->id); + /** @var StoredWorkflow $tentativeWorkflow */ + $tentativeWorkflow = $storedWorkflowClass::query() + ->findOrFail($this->storedWorkflow->id); + + $tentativeWorkflow->loadMissing(['logs', 'signals']); + + /** @var StoredWorkflowLog $tentativeLog */ + $tentativeLog = $tentativeWorkflow->logs() + ->make([ + 'index' => $this->index, + 'now' => $this->now, + 'class' => self::class, + 'result' => Serializer::serialize($this->exceptionPayload()), + ]); + + $tentativeWorkflow->setRelation( + 'logs', + $tentativeWorkflow->getRelation('logs') + ->push($tentativeLog) + ->sortBy(static fn ($log): string => sprintf('%020d:%020d', $log->index, $log->id ?? PHP_INT_MAX)) + ->values() + ); + + return $tentativeWorkflow; } private function exceptionPayload() From ef06874de4f09ad0c9c22a307f46e42e5837354c Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 21:47:36 +0000 Subject: [PATCH 14/15] Flush Redis after feature workers stop --- tests/TestCase.php | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/tests/TestCase.php b/tests/TestCase.php index de3b4bf..37f2819 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -29,17 +29,7 @@ public static function setUpBeforeClass(): void } } - $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); - $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); - if ($redisHost && class_exists(\Redis::class)) { - try { - $redis = new \Redis(); - $redis->connect($redisHost, (int) $redisPort); - $redis->flushDB(); - } catch (\Throwable $e) { - // Ignore if no redis - } - } + self::flushRedis(); for ($i = 0; $i < self::NUMBER_OF_WORKERS; $i++) { self::$workers[$i] = new Process(['php', __DIR__ . '/../vendor/bin/testbench', 'queue:work']); @@ -53,6 +43,10 @@ public static function tearDownAfterClass(): void foreach (self::$workers as $worker) { $worker->stop(); } + + self::$workers = []; + + self::flushRedis(); } protected function setUp(): void @@ -67,17 +61,7 @@ protected function setUp(): void Cache::flush(); - $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); - $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); - if ($redisHost && class_exists(\Redis::class)) { - try { - $redis = new \Redis(); - $redis->connect($redisHost, (int) $redisPort); - $redis->flushDB(); - } catch (\Throwable $e) { - // Ignore if no redis - } - } + self::flushRedis(); } protected function defineDatabaseMigrations() @@ -94,4 +78,19 @@ protected function getPackageProviders($app) { return [\Workflow\Providers\WorkflowServiceProvider::class]; } + + private static function flushRedis(): void + { + $redisHost = getenv('REDIS_HOST') ?: ($_ENV['REDIS_HOST'] ?? null); + $redisPort = getenv('REDIS_PORT') ?: ($_ENV['REDIS_PORT'] ?? 6379); + if ($redisHost && class_exists(\Redis::class)) { + try { + $redis = new \Redis(); + $redis->connect($redisHost, (int) $redisPort); + $redis->flushDB(); + } catch (\Throwable $e) { + // Ignore if no redis + } + } + } } From 1bea82fa55554bf05821758f4811aa30dacacd27 Mon Sep 17 00:00:00 2001 From: Durable Workflow Date: Fri, 3 Apr 2026 22:11:58 +0000 Subject: [PATCH 15/15] Remove overlap middleware from exception jobs --- src/Exception.php | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/Exception.php b/src/Exception.php index d7ca59a..fca28d0 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -13,7 +13,6 @@ use Illuminate\Support\Facades\Cache; use Throwable; use Workflow\Exceptions\TransitionNotFound; -use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; use Workflow\Models\StoredWorkflowLog; use Workflow\Serializers\Serializer; @@ -82,14 +81,7 @@ public function handle() public function middleware() { - return [ - new WithoutOverlappingMiddleware( - $this->storedWorkflow->id, - WithoutOverlappingMiddleware::ACTIVITY, - 0, - 15 - ), - ]; + return []; } private function shouldPersistAfterProbeReplay(): bool