diff --git a/src/Activity.php b/src/Activity.php index a26508a..44e116b 100644 --- a/src/Activity.php +++ b/src/Activity.php @@ -170,7 +170,8 @@ public function failed(Throwable $throwable): void $this->storedWorkflow, $throwable, $workflow->connection(), - $workflow->queue() + $workflow->queue(), + $this::class ); } diff --git a/src/ActivityStub.php b/src/ActivityStub.php index ed8f2ca..a692ed9 100644 --- a/src/ActivityStub.php +++ b/src/ActivityStub.php @@ -74,6 +74,14 @@ public static function make($activity, ...$arguments): PromiseInterface return resolve($result); } + if ($context->replaying && WorkflowStub::hasReplayProbe()) { + WorkflowStub::markReplayProbe($context->index, $activity); + ++$context->index; + WorkflowStub::setContext($context); + $deferred = new Deferred(); + return $deferred->promise(); + } + $activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments); ++$context->index; diff --git a/src/ChildWorkflowStub.php b/src/ChildWorkflowStub.php index 896237a..419c345 100644 --- a/src/ChildWorkflowStub.php +++ b/src/ChildWorkflowStub.php @@ -67,6 +67,14 @@ public static function make($workflow, ...$arguments): PromiseInterface return resolve($result); } + if ($context->replaying && WorkflowStub::hasReplayProbe()) { + WorkflowStub::markReplayProbe($context->index, $workflow); + ++$context->index; + WorkflowStub::setContext($context); + $deferred = new Deferred(); + return $deferred->promise(); + } + if (! $context->replaying) { $storedChildWorkflow = $context->storedWorkflow->children() ->wherePivot('parent_index', $context->index) diff --git a/src/Exception.php b/src/Exception.php index de177e5..3c44d07 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -35,7 +35,8 @@ public function __construct( public StoredWorkflow $storedWorkflow, public $exception, $connection = null, - $queue = null + $queue = null, + public ?string $sourceClass = null ) { $connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default'); $queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config( @@ -53,7 +54,7 @@ public function handle() try { if ($this->storedWorkflow->hasLogByIndex($this->index)) { $workflow->resume(); - } elseif (! $this->storedWorkflow->logs()->where('class', self::class)->exists()) { + } elseif ($this->isCurrentReplayFrontier()) { $workflow->next($this->index, $this->now, self::class, $this->exception); } } catch (TransitionNotFound) { @@ -74,4 +75,35 @@ public function middleware() ), ]; } + + private function isCurrentReplayFrontier(): bool + { + $workflowClass = $this->storedWorkflow->class; + + if (! is_string($workflowClass) || $workflowClass === '') { + return true; + } + + $workflow = new $workflowClass($this->storedWorkflow, ...$this->storedWorkflow->workflowArguments()); + $workflow->replaying = true; + + $previousContext = WorkflowStub::getContext(); + WorkflowStub::startReplayProbe($this->index, $this->sourceClass); + + WorkflowStub::setContext([ + 'storedWorkflow' => $this->storedWorkflow, + 'index' => 0, + 'now' => $this->now, + 'replaying' => true, + ]); + + try { + $workflow->handle(); + + return WorkflowStub::replayProbeMatched(); + } finally { + WorkflowStub::clearReplayProbe(); + WorkflowStub::setContext($previousContext); + } + } } diff --git a/src/WorkflowStub.php b/src/WorkflowStub.php index 5aabead..caf0731 100644 --- a/src/WorkflowStub.php +++ b/src/WorkflowStub.php @@ -41,6 +41,8 @@ final class WorkflowStub private static ?\stdClass $context = null; + private static ?array $replayProbe = null; + private static array $signalMethodCache = []; private static array $queryMethodCache = []; @@ -165,6 +167,45 @@ public static function setContext($context): void self::$context = (object) $context; } + public static function hasReplayProbe(): bool + { + return self::$replayProbe !== null; + } + + public static function startReplayProbe(int $index, ?string $class = null): void + { + self::$replayProbe = [ + 'index' => $index, + 'class' => $class, + 'matched' => false, + 'seen' => false, + ]; + } + + public static function markReplayProbe(int $index, ?string $class = null): void + { + if (self::$replayProbe === null || self::$replayProbe['seen'] === true) { + return; + } + + self::$replayProbe['seen'] = true; + self::$replayProbe['matched'] = self::$replayProbe['index'] === $index + && ( + self::$replayProbe['class'] === null + || self::$replayProbe['class'] === $class + ); + } + + public static function replayProbeMatched(): bool + { + return self::$replayProbe['matched'] ?? false; + } + + public static function clearReplayProbe(): void + { + self::$replayProbe = null; + } + public static function now() { return self::getContext()->now; @@ -287,7 +328,7 @@ public function fail($exception): void ->format('Y-m-d\TH:i:s.u\Z')); $this->storedWorkflow->parents() - ->each(static function ($parentWorkflow) use ($exception) { + ->each(function ($parentWorkflow) use ($exception) { if ( $parentWorkflow->pivot->parent_index === StoredWorkflow::CONTINUE_PARENT_INDEX || $parentWorkflow->pivot->parent_index === StoredWorkflow::ACTIVE_WORKFLOW_INDEX @@ -324,7 +365,8 @@ public function fail($exception): void $parentWorkflow, $throwable, $parentWf->connection(), - $parentWf->queue() + $parentWf->queue(), + $this->storedWorkflow->class ); }); } diff --git a/tests/Feature/SagaChildWorkflowTest.php b/tests/Feature/SagaChildWorkflowTest.php index b99255c..84ac540 100644 --- a/tests/Feature/SagaChildWorkflowTest.php +++ b/tests/Feature/SagaChildWorkflowTest.php @@ -4,9 +4,14 @@ namespace Tests\Feature; +use Tests\Fixtures\TestActivity; +use Tests\Fixtures\TestChildExceptionThrowingWorkflow; use Tests\Fixtures\TestSagaChildWorkflow; use Tests\Fixtures\TestSagaSingleChildWorkflow; +use Tests\Fixtures\TestUndoActivity; use Tests\TestCase; +use Workflow\Exception; +use Workflow\Models\StoredWorkflow; use Workflow\States\WorkflowCompletedStatus; use Workflow\WorkflowStub; @@ -34,5 +39,25 @@ public function testParallelChildExceptionsTriggersCompensation(): void $this->assertSame(WorkflowCompletedStatus::class, $workflow->status()); $this->assertSame('compensated', $workflow->output()); + + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + + $this->assertEqualsCanonicalizing([ + TestActivity::class, + TestUndoActivity::class, + Exception::class, + ], $storedWorkflow->logs() + ->pluck('class') + ->toArray()); + + $childLogs = $storedWorkflow->children() + ->with('logs') + ->get() + ->flatMap(static fn (StoredWorkflow $childWorkflow) => $childWorkflow->logs->pluck('class')) + ->values() + ->toArray(); + + $this->assertNotContains(TestUndoActivity::class, $childLogs); + $this->assertContains(TestChildExceptionThrowingWorkflow::class, $childLogs); } } diff --git a/tests/Fixtures/TestConsecutiveCaughtExceptionWorkflow.php b/tests/Fixtures/TestConsecutiveCaughtExceptionWorkflow.php new file mode 100644 index 0000000..8332876 --- /dev/null +++ b/tests/Fixtures/TestConsecutiveCaughtExceptionWorkflow.php @@ -0,0 +1,27 @@ +shouldContinue = true; + } + + public function execute() + { + try { + yield activity(TestSingleTryExceptionActivity::class, true); + } catch (Throwable) { + yield await(fn (): bool => $this->shouldContinue); + yield activity(TestSingleTryExceptionActivity::class, true); + } + + return 'handled'; + } +} diff --git a/tests/Unit/ExceptionTest.php b/tests/Unit/ExceptionTest.php index 1f6cf3c..0ec8b6e 100644 --- a/tests/Unit/ExceptionTest.php +++ b/tests/Unit/ExceptionTest.php @@ -4,12 +4,19 @@ namespace Tests\Unit; +use Tests\Fixtures\TestActivity; +use Tests\Fixtures\TestChildExceptionThrowingWorkflow; +use Tests\Fixtures\TestConsecutiveCaughtExceptionWorkflow; +use Tests\Fixtures\TestSagaChildWorkflow; +use Tests\Fixtures\TestSignalAdvancedExceptionWorkflow; +use Tests\Fixtures\TestSingleTryExceptionActivity; use Tests\Fixtures\TestWorkflow; use Tests\TestCase; use Workflow\Exception; use Workflow\Middleware\WithoutOverlappingMiddleware; use Workflow\Models\StoredWorkflow; use Workflow\Serializers\Serializer; +use Workflow\Signal; use Workflow\States\WorkflowRunningStatus; use Workflow\WorkflowStub; @@ -46,7 +53,7 @@ public function testExceptionWorkflowRunning(): void public function testSkipsWriteWhenSiblingExceptionLogExists(): void { - $workflow = WorkflowStub::load(WorkflowStub::make(TestWorkflow::class)->id()); + $workflow = WorkflowStub::load(WorkflowStub::make(TestSagaChildWorkflow::class)->id()); $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); $storedWorkflow->update([ 'arguments' => Serializer::serialize([]), @@ -56,24 +63,110 @@ public function testSkipsWriteWhenSiblingExceptionLogExists(): void $storedWorkflow->logs() ->create([ 'index' => 0, + 'now' => now() + ->toDateTimeString(), + 'class' => TestActivity::class, + 'result' => Serializer::serialize('activity'), + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 1, 'now' => now() ->toDateTimeString(), 'class' => Exception::class, 'result' => Serializer::serialize([ 'class' => \Exception::class, - 'message' => 'first child failed', + 'message' => 'first parallel child failed', 'code' => 0, ]), ]); - $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ + $exception = new Exception(2, now()->toDateTimeString(), $storedWorkflow, [ 'class' => \Exception::class, 'message' => 'second child failed', 'code' => 0, + ], sourceClass: TestChildExceptionThrowingWorkflow::class); + $exception->handle(); + + $this->assertFalse($storedWorkflow->hasLogByIndex(2)); + $this->assertSame(2, $storedWorkflow->logs()->count()); + } + + public function testWritesConsecutiveCaughtExceptionWithoutIntermediateLog(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestConsecutiveCaughtExceptionWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now() + ->toDateTimeString(), + 'class' => Exception::class, + 'result' => Serializer::serialize([ + 'class' => \RuntimeException::class, + 'message' => 'first failure', + 'code' => 0, + ]), + ]); + + $exception = new Exception(1, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => \InvalidArgumentException::class, + 'message' => 'second failure', + 'code' => 0, + ], sourceClass: TestSingleTryExceptionActivity::class); + $exception->handle(); + + $this->assertTrue($storedWorkflow->hasLogByIndex(1)); + $this->assertSame(2, $storedWorkflow->logs()->count()); + $this->assertSame(Exception::class, $storedWorkflow->findLogByIndex(1)?->class); + } + + public function testWritesLaterExceptionAfterWorkflowAdvances(): void + { + $workflow = WorkflowStub::load(WorkflowStub::make(TestSignalAdvancedExceptionWorkflow::class)->id()); + $storedWorkflow = StoredWorkflow::findOrFail($workflow->id()); + $storedWorkflow->update([ + 'arguments' => Serializer::serialize([]), + 'status' => WorkflowRunningStatus::$name, ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 0, + 'now' => now() + ->toDateTimeString(), + 'class' => Exception::class, + 'result' => Serializer::serialize([ + 'class' => \RuntimeException::class, + 'message' => 'first failure', + 'code' => 0, + ]), + ]); + + $storedWorkflow->logs() + ->create([ + 'index' => 1, + 'now' => now() + ->toDateTimeString(), + 'class' => Signal::class, + 'result' => Serializer::serialize(null), + ]); + + $exception = new Exception(2, now()->toDateTimeString(), $storedWorkflow, [ + 'class' => \InvalidArgumentException::class, + 'message' => 'second failure', + 'code' => 0, + ], sourceClass: TestSingleTryExceptionActivity::class); $exception->handle(); - $this->assertFalse($storedWorkflow->hasLogByIndex(1)); - $this->assertSame(1, $storedWorkflow->logs()->count()); + $this->assertTrue($storedWorkflow->hasLogByIndex(2)); + $this->assertSame(3, $storedWorkflow->logs()->count()); + $this->assertSame(Exception::class, $storedWorkflow->findLogByIndex(2)?->class); } }