Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Activity.php
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ public function failed(Throwable $throwable): void
$this->storedWorkflow,
$throwable,
$workflow->connection(),
$workflow->queue()
$workflow->queue(),
$this::class
);
}

Expand Down
32 changes: 30 additions & 2 deletions src/ActivityStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,24 @@ public static function make($activity, ...$arguments): PromiseInterface
}

if ($log) {
if (
WorkflowStub::isProbing()
&& WorkflowStub::probeIndex() === $context->index
&& (
WorkflowStub::probeClass() === null
|| WorkflowStub::probeClass() === $activity
)
&& $log->class === Exception::class
) {
WorkflowStub::markProbeMatched();
}

++$context->index;
WorkflowStub::setContext($context);
$result = Serializer::unserialize($log->result);
if ($log->class === Exception::class && self::isForeignExceptionResult($result, $activity)) {
return self::make($activity, ...$arguments);
}
if (
is_array($result) &&
array_key_exists('class', $result) &&
Expand All @@ -74,11 +89,24 @@ public static function make($activity, ...$arguments): PromiseInterface
return resolve($result);
}

if (WorkflowStub::isProbing()) {
++$context->index;
WorkflowStub::setContext($context);
return (new Deferred())->promise();
}

$activity::dispatch($context->index, $context->now, $context->storedWorkflow, ...$arguments);

++$context->index;
WorkflowStub::setContext($context);
$deferred = new Deferred();
return $deferred->promise();
return (new Deferred())->promise();
}

private static function isForeignExceptionResult(mixed $result, string $activity): bool
{
return is_array($result)
&& isset($result['sourceClass'])
&& is_string($result['sourceClass'])
&& $result['sourceClass'] !== $activity;
}
}
32 changes: 30 additions & 2 deletions src/ChildWorkflowStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,24 @@ public static function make($workflow, ...$arguments): PromiseInterface
}

if ($log) {
if (
WorkflowStub::isProbing()
&& WorkflowStub::probeIndex() === $context->index
&& (
WorkflowStub::probeClass() === null
|| WorkflowStub::probeClass() === $workflow
)
&& $log->class === Exception::class
) {
WorkflowStub::markProbeMatched();
}

++$context->index;
WorkflowStub::setContext($context);
$result = Serializer::unserialize($log->result);
if ($log->class === Exception::class && self::isForeignExceptionResult($result, $workflow)) {
return self::make($workflow, ...$arguments);
}
if (
is_array($result)
&& array_key_exists('class', $result)
Expand All @@ -67,6 +82,12 @@ public static function make($workflow, ...$arguments): PromiseInterface
return resolve($result);
}

if (WorkflowStub::isProbing()) {
++$context->index;
WorkflowStub::setContext($context);
return (new Deferred())->promise();
}

if (! $context->replaying) {
$storedChildWorkflow = $context->storedWorkflow->children()
->wherePivot('parent_index', $context->index)
Expand Down Expand Up @@ -94,7 +115,14 @@ public static function make($workflow, ...$arguments): PromiseInterface

++$context->index;
WorkflowStub::setContext($context);
$deferred = new Deferred();
return $deferred->promise();
return (new Deferred())->promise();
}

private static function isForeignExceptionResult(mixed $result, string $workflow): bool
{
return is_array($result)
&& isset($result['sourceClass'])
&& is_string($result['sourceClass'])
&& $result['sourceClass'] !== $workflow;
}
}
129 changes: 110 additions & 19 deletions src/Exception.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Cache;
use Throwable;
use Workflow\Exceptions\TransitionNotFound;
use Workflow\Middleware\WithoutOverlappingMiddleware;
use Workflow\Models\StoredWorkflow;
use Workflow\Models\StoredWorkflowLog;
use Workflow\Serializers\Serializer;

final class Exception implements ShouldBeEncrypted, ShouldQueue
{
Expand All @@ -35,7 +38,8 @@ public function __construct(
public StoredWorkflow $storedWorkflow,
public $exception,
$connection = null,
$queue = null
$queue = null,
public ?string $sourceClass = null
) {
$connection = $connection ?? $this->storedWorkflow->effectiveConnection() ?? config('queue.default');
$queue = $queue ?? $this->storedWorkflow->effectiveQueue() ?? config(
Expand All @@ -48,30 +52,117 @@ public function __construct(

public function handle()
{
$workflow = $this->storedWorkflow->toWorkflow();
$lock = Cache::lock('laravel-workflow-exception:' . $this->storedWorkflow->id, 15);

if (! $lock->get()) {
$this->release();

return;
}

try {
if ($this->storedWorkflow->hasLogByIndex($this->index)) {
$workflow->resume();
} elseif (! $this->storedWorkflow->logs()->where('class', self::class)->exists()) {
$workflow->next($this->index, $this->now, self::class, $this->exception);
}
} catch (TransitionNotFound) {
if ($workflow->running()) {
$this->release();
$workflow = $this->storedWorkflow->toWorkflow();

try {
if ($this->storedWorkflow->hasLogByIndex($this->index)) {
$workflow->resume();
} elseif ($this->shouldPersistAfterProbeReplay()) {
$workflow->next($this->index, $this->now, self::class, $this->exceptionPayload());
}
} catch (TransitionNotFound) {
if ($workflow->running()) {
$this->release();
}
}
} finally {
$lock->release();
}
}

public function middleware()
{
return [
new WithoutOverlappingMiddleware(
$this->storedWorkflow->id,
WithoutOverlappingMiddleware::ACTIVITY,
0,
15
),
];
return [];
}

private function shouldPersistAfterProbeReplay(): bool
{
$workflowClass = $this->storedWorkflow->class;

if (! is_string($workflowClass) || $workflowClass === '') {
return true;
}

$previousContext = WorkflowStub::getContext();
$shouldPersist = false;

try {
$tentativeWorkflow = $this->createTentativeWorkflowState();
$workflow = new $workflowClass($tentativeWorkflow, ...$tentativeWorkflow->workflowArguments());
$workflow->replaying = true;

WorkflowStub::setContext([
'storedWorkflow' => $tentativeWorkflow,
'index' => 0,
'now' => $this->now,
'replaying' => true,
'probing' => true,
'probeIndex' => $this->index,
'probeClass' => $this->sourceClass,
'probeMatched' => false,
]);

try {
$workflow->handle();
} catch (Throwable) {
// The replay path may still throw; we only care whether it matched this tentative log.
}

$shouldPersist = WorkflowStub::probeMatched();
} finally {
WorkflowStub::setContext($previousContext);
}

return $shouldPersist;
}

private function createTentativeWorkflowState(): StoredWorkflow
{
$storedWorkflowClass = $this->storedWorkflow::class;

/** @var StoredWorkflow $tentativeWorkflow */
$tentativeWorkflow = $storedWorkflowClass::query()
->findOrFail($this->storedWorkflow->id);

$tentativeWorkflow->loadMissing(['logs', 'signals']);

/** @var StoredWorkflowLog $tentativeLog */
$tentativeLog = $tentativeWorkflow->logs()
->make([
'index' => $this->index,
'now' => $this->now,
'class' => self::class,
'result' => Serializer::serialize($this->exceptionPayload()),
]);

$tentativeWorkflow->setRelation(
'logs',
$tentativeWorkflow->getRelation('logs')
->push($tentativeLog)
->sortBy(static fn ($log): string => sprintf('%020d:%020d', $log->index, $log->id ?? PHP_INT_MAX))
->values()
);

return $tentativeWorkflow;
}

private function exceptionPayload()
{
if (! is_array($this->exception) || $this->sourceClass === null) {
return $this->exception;
}

return array_merge($this->exception, [
'sourceClass' => $this->sourceClass,
]);
}
}
8 changes: 6 additions & 2 deletions src/Traits/Awaits.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ public static function await($condition): PromiseInterface
return resolve(Serializer::unserialize($log->result));
}

if (self::isProbing()) {
++self::$context->index;
return (new Deferred())->promise();
}

$result = $condition();

if ($result === true) {
Expand Down Expand Up @@ -49,7 +54,6 @@ public static function await($condition): PromiseInterface
}

++self::$context->index;
$deferred = new Deferred();
return $deferred->promise();
return (new Deferred())->promise();
}
}
6 changes: 6 additions & 0 deletions src/Traits/SideEffects.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Workflow\Traits;

use Illuminate\Database\QueryException;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use function React\Promise\resolve;
use Workflow\Serializers\Serializer;
Expand All @@ -20,6 +21,11 @@ public static function sideEffect($callable): PromiseInterface
return resolve(Serializer::unserialize($log->result));
}

if (self::isProbing()) {
++self::$context->index;
return (new Deferred())->promise();
}

$result = $callable();

if (! self::$context->replaying) {
Expand Down
11 changes: 7 additions & 4 deletions src/Traits/Timers.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa
$when = self::$context->now->copy()
->addSeconds($seconds);

if (self::isProbing()) {
++self::$context->index;
return (new Deferred())->promise();
}

if (! self::$context->replaying) {
$timer = self::$context->storedWorkflow->createTimer([
'index' => self::$context->index,
'stop_at' => $when,
]);
} else {
++self::$context->index;
$deferred = new Deferred();
return $deferred->promise();
return (new Deferred())->promise();
}
}

Expand Down Expand Up @@ -95,7 +99,6 @@ public static function timer(int|string|CarbonInterval $seconds): PromiseInterfa
}

++self::$context->index;
$deferred = new Deferred();
return $deferred->promise();
return (new Deferred())->promise();
}
}
6 changes: 6 additions & 0 deletions src/Traits/Versions.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Workflow\Traits;

use Illuminate\Database\QueryException;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use function React\Promise\resolve;
use Workflow\Exceptions\VersionNotSupportedException;
Expand Down Expand Up @@ -33,6 +34,11 @@ public static function getVersion(
return resolve($version);
}

if (self::isProbing()) {
++self::$context->index;
return (new Deferred())->promise();
}

$version = $maxSupported;

if (! self::$context->replaying) {
Expand Down
Loading
Loading