Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Activity.php
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ public function failed(Throwable $throwable): void
$this->storedWorkflow,
$throwable,
$workflow->connection(),
$workflow->queue()
$workflow->queue(),
$this::class
);
}

Expand Down
8 changes: 8 additions & 0 deletions src/ActivityStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ public static function make($activity, ...$arguments): PromiseInterface
return resolve($result);
}

if ($context->replaying && WorkflowStub::hasReplayProbe()) {
WorkflowStub::markReplayProbe($context->index, $activity);
++$context->index;
WorkflowStub::setContext($context);
$deferred = new Deferred();
return $deferred->promise();
}

$activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments);

++$context->index;
Expand Down
8 changes: 8 additions & 0 deletions src/ChildWorkflowStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public static function make($workflow, ...$arguments): PromiseInterface
return resolve($result);
}

if ($context->replaying && WorkflowStub::hasReplayProbe()) {
WorkflowStub::markReplayProbe($context->index, $workflow);
++$context->index;
WorkflowStub::setContext($context);
$deferred = new Deferred();
return $deferred->promise();
}

if (! $context->replaying) {
$storedChildWorkflow = $context->storedWorkflow->children()
->wherePivot('parent_index', $context->index)
Expand Down
36 changes: 34 additions & 2 deletions src/Exception.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public function __construct(
public StoredWorkflow $storedWorkflow,
public $exception,
$connection = null,
$queue = null
$queue = null,
public ?string $sourceClass = null
) {
$connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default');
$queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config(
Expand All @@ -53,7 +54,7 @@ public function handle()
try {
if ($this->storedWorkflow->hasLogByIndex($this->index)) {
$workflow->resume();
} elseif (! $this->storedWorkflow->logs()->where('class', self::class)->exists()) {
} elseif ($this->isCurrentReplayFrontier()) {
$workflow->next($this->index, $this->now, self::class, $this->exception);
}
} catch (TransitionNotFound) {
Expand All @@ -74,4 +75,35 @@ public function middleware()
),
];
}

private function isCurrentReplayFrontier(): bool
{
$workflowClass = $this->storedWorkflow->class;

if (! is_string($workflowClass) || $workflowClass === '') {
return true;
}

$workflow = new $workflowClass($this->storedWorkflow, ...$this->storedWorkflow->workflowArguments());
$workflow->replaying = true;

$previousContext = WorkflowStub::getContext();
WorkflowStub::startReplayProbe($this->index, $this->sourceClass);

WorkflowStub::setContext([
'storedWorkflow' => $this->storedWorkflow,
'index' => 0,
'now' => $this->now,
'replaying' => true,
]);

try {
$workflow->handle();

return WorkflowStub::replayProbeMatched();
} finally {
WorkflowStub::clearReplayProbe();
WorkflowStub::setContext($previousContext);
}
}
}
46 changes: 44 additions & 2 deletions src/WorkflowStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ final class WorkflowStub

private static ?\stdClass $context = null;

private static ?array $replayProbe = null;

private static array $signalMethodCache = [];

private static array $queryMethodCache = [];
Expand Down Expand Up @@ -165,6 +167,45 @@ public static function setContext($context): void
self::$context = (object) $context;
}

public static function hasReplayProbe(): bool
{
return self::$replayProbe !== null;
}

public static function startReplayProbe(int $index, ?string $class = null): void
{
self::$replayProbe = [
'index' => $index,
'class' => $class,
'matched' => false,
'seen' => false,
];
}

public static function markReplayProbe(int $index, ?string $class = null): void
{
if (self::$replayProbe === null || self::$replayProbe['seen'] === true) {
return;
}

self::$replayProbe['seen'] = true;
self::$replayProbe['matched'] = self::$replayProbe['index'] === $index
&& (
self::$replayProbe['class'] === null
|| self::$replayProbe['class'] === $class
);
}

public static function replayProbeMatched(): bool
{
return self::$replayProbe['matched'] ?? false;
}

public static function clearReplayProbe(): void
{
self::$replayProbe = null;
}

public static function now()
{
return self::getContext()->now;
Expand Down Expand Up @@ -287,7 +328,7 @@ public function fail($exception): void
->format('Y-m-d\TH:i:s.u\Z'));

$this->storedWorkflow->parents()
->each(static function ($parentWorkflow) use ($exception) {
->each(function ($parentWorkflow) use ($exception) {
if (
$parentWorkflow->pivot->parent_index === StoredWorkflow::CONTINUE_PARENT_INDEX
|| $parentWorkflow->pivot->parent_index === StoredWorkflow::ACTIVE_WORKFLOW_INDEX
Expand Down Expand Up @@ -324,7 +365,8 @@ public function fail($exception): void
$parentWorkflow,
$throwable,
$parentWf->connection(),
$parentWf->queue()
$parentWf->queue(),
$this->storedWorkflow->class
);
});
}
Expand Down
25 changes: 25 additions & 0 deletions tests/Feature/SagaChildWorkflowTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@

namespace Tests\Feature;

use Tests\Fixtures\TestActivity;
use Tests\Fixtures\TestChildExceptionThrowingWorkflow;
use Tests\Fixtures\TestSagaChildWorkflow;
use Tests\Fixtures\TestSagaSingleChildWorkflow;
use Tests\Fixtures\TestUndoActivity;
use Tests\TestCase;
use Workflow\Exception;
use Workflow\Models\StoredWorkflow;
use Workflow\States\WorkflowCompletedStatus;
use Workflow\WorkflowStub;

Expand Down Expand Up @@ -34,5 +39,25 @@ public function testParallelChildExceptionsTriggersCompensation(): void

$this->assertSame(WorkflowCompletedStatus::class, $workflow->status());
$this->assertSame('compensated', $workflow->output());

$storedWorkflow = StoredWorkflow::findOrFail($workflow->id());

$this->assertEqualsCanonicalizing([
TestActivity::class,
TestUndoActivity::class,
Exception::class,
], $storedWorkflow->logs()
->pluck('class')
->toArray());

$childLogs = $storedWorkflow->children()
->with('logs')
->get()
->flatMap(static fn (StoredWorkflow $childWorkflow) => $childWorkflow->logs->pluck('class'))
->values()
->toArray();

$this->assertNotContains(TestUndoActivity::class, $childLogs);
$this->assertContains(TestChildExceptionThrowingWorkflow::class, $childLogs);
}
}
27 changes: 27 additions & 0 deletions tests/Fixtures/TestConsecutiveCaughtExceptionWorkflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Tests\Fixtures;

use Throwable;
use function Workflow\activity;
use Workflow\Workflow;

final class TestConsecutiveCaughtExceptionWorkflow extends Workflow
{
public function execute()
{
try {
yield activity(TestSingleTryExceptionActivity::class, true);
} catch (Throwable) {
try {
yield activity(TestSingleTryExceptionActivity::class, true);
} catch (Throwable) {
return 'handled';
}
}

return 'unhandled';
}
}
27 changes: 27 additions & 0 deletions tests/Fixtures/TestParallelCaughtExceptionWorkflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Tests\Fixtures;

use Throwable;
use function Workflow\activity;
use function Workflow\all;
use Workflow\Workflow;

final class TestParallelCaughtExceptionWorkflow extends Workflow
{
public function execute()
{
try {
yield all([
activity(TestSingleTryExceptionActivity::class, true),
activity(TestSingleTryExceptionActivity::class, true),
]);
} catch (Throwable) {
return 'handled';
}

return 'unhandled';
}
}
34 changes: 34 additions & 0 deletions tests/Fixtures/TestSignalAdvancedExceptionWorkflow.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

declare(strict_types=1);

namespace Tests\Fixtures;

use Throwable;
use function Workflow\activity;
use function Workflow\await;
use Workflow\SignalMethod;
use Workflow\Workflow;

final class TestSignalAdvancedExceptionWorkflow extends Workflow
{
private bool $shouldContinue = false;

#[SignalMethod]
public function continueAfterFailure(): void
{
$this->shouldContinue = true;
}

public function execute()
{
try {
yield activity(TestSingleTryExceptionActivity::class, true);
} catch (Throwable) {
yield await(fn (): bool => $this->shouldContinue);
yield activity(TestSingleTryExceptionActivity::class, true);
}

return 'handled';
}
}
Loading