Stateful, graph-based workflow engine for Laravel.
Build multi-step agent pipelines, human-in-the-loop processes, and parallel fan-out/fan-in tasks — all backed by your database and queue.
Inspired by LangGraph
- Installation
- Core Concepts
- Building a Workflow
- Running a Workflow
- State
- Human-in-the-Loop
- Node Contracts
- Built-in Nodes
- Prism Integration
- Laravel AI Integration
- Sub-graph Workflows
- Recursion Limit
- Events
- Configuration
- Testing
composer require cainy/laragraph

Publish and run the migration:
php artisan vendor:publish --tag="laragraph-migrations"
php artisan migrate

Publish the config file:
php artisan vendor:publish --tag="laragraph-config"

LaraGraph models a workflow as a directed graph of nodes connected by edges. Each run of that graph is a WorkflowRun — a database record that tracks the current state, status, and active node pointers.
| Term | Meaning |
|---|---|
| Node | A unit of work. Receives the current state, returns a mutation. |
| Edge | A directed connection between two nodes, optionally conditional. |
| State | A plain PHP array that accumulates mutations as nodes execute. |
| Pointer | Tracks which nodes are currently in-flight for a run. |
| WorkflowRun | The persisted record for a single execution of a workflow. |
Execution is fully queue-driven. Each node runs as an independent ExecuteNode job, so parallel branches execute concurrently across your worker pool.
Workflows are classes that extend Workflow and define their graph in a definition() method:
use Cainy\Laragraph\Builder\Workflow;
class MyPipeline extends Workflow
{
public function definition(): void
{
$this->addNode('fetch', FetchNode::class)
->addNode('transform', TransformNode::class)
->addNode('store', StoreNode::class)
->transition(Workflow::START, 'fetch')
->transition('fetch', 'transform')
->transition('transform', 'store')
->transition('store', Workflow::END);
}
}

You can also call compile() directly on a Workflow instance if you prefer building inline, but the class-based approach is recommended since workflows are stored by class name.
A node is any class implementing Cainy\Laragraph\Contracts\Node:
use Cainy\Laragraph\Contracts\Node;
use Cainy\Laragraph\Engine\NodeExecutionContext;
class SummarizeNode implements Node
{
public function handle(NodeExecutionContext $context, array $state): array
{
$text = implode("\n", $state['paragraphs'] ?? []);
return ['summary' => substr($text, 0, 200)];
}
}

handle() receives a typed NodeExecutionContext and the current full state. It returns an array of mutations — only the keys you want to change.
$context->runId // int — ID of the WorkflowRun
$context->workflowKey // string — class name of the workflow
$context->nodeName // string — name of this node in the graph
$context->attempt // int — current queue attempt (1-based)
$context->maxAttempts // int — maximum attempts configured
$context->createdAt // DateTimeImmutable
$context->isolatedPayload // ?array — payload injected by a Send (see Dynamic Fan-out)
$context->parentRunId // ?int — set when this run was dispatched as a child workflow
$context->parentNodeName // ?string — the sub-graph node name on the parent
$context->routing // array — read-only engine routing snapshot (counters, interrupt marker, etc.)
// Helpers:
$context->isSendExecution() // bool — true when dispatched via a Send
$context->payload('key', $default) // mixed — read a value from the isolated payload
$context->parentMetadata() // ?array — lazy-loads the parent run's metadata (null at top-level)

User state vs engine state: $state only contains keys your nodes write. Engine
bookkeeping (spawn counters, interrupt markers, gate reasons, child-run ids,
error summaries) lives on a separate workflow_runs.routing column and is
never merged into $state. Read it via $context->routing when you need it.
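In plain-PHP terms, the payload() helper is just a keyed read with a fallback. A sketch of the documented behaviour, not the engine's code:

```php
// Sketch: $context->payload('key', $default) reads from the isolated payload.
$isolatedPayload = ['query' => 'laravel workflow engines']; // e.g. injected by a Send

$payload = fn (string $key, mixed $default = null): mixed
    => $isolatedPayload[$key] ?? $default;

assert($payload('query') === 'laravel workflow engines');
assert($payload('missing', 'fallback') === 'fallback');
```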
$this->transition(Workflow::START, 'fetch')
->transition('fetch', 'transform')
->transition('transform', Workflow::END);

Workflow::START and Workflow::END are reserved entry and exit pseudo-nodes.
Nodes can be registered as class strings (resolved via the container) or as pre-built instances.
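Both registration forms can mix freely inside a definition(); a sketch (the node classes are illustrative):

```php
$this->addNode('fetch', FetchNode::class)           // class string, resolved via the container
    ->addNode('merge', new BarrierNode())           // pre-built instance
    ->addNode('sub', app(ResearchSubgraph::class)); // container-resolved instance
```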
Pass a Closure as the third argument to ->transition():
->transition('classify', 'approve', fn(array $state) => $state['score'] > 50)
->transition('classify', 'reject', fn(array $state) => $state['score'] <= 50)

A branch edge uses a resolver to return one or more target node names dynamically at runtime:
->branch('router', function(array $state): string {
return $state['approved'] ? 'publish' : 'revise';
}, targets: ['publish', 'revise'])

The targets array is optional but recommended — it enables graph visualization without executing the resolver.
To execute multiple nodes in parallel from a single node, add multiple transitions from the same source:
$this->addNode('split', SplitNode::class)
->addNode('branch-a', BranchANode::class)
->addNode('branch-b', BranchBNode::class)
->addNode('merge', MergeNode::class)
->transition(Workflow::START, 'split')
->transition('split', 'branch-a')
->transition('split', 'branch-b')
->transition('branch-a', 'merge')
->transition('branch-b', 'merge')
->transition('merge', Workflow::END);

branch-a and branch-b run as independent queue jobs. Use a BarrierNode as the merge node to wait for all branches before continuing.
To fan out over a dynamic list, return Send objects from a branch edge resolver:
use Cainy\Laragraph\Routing\Send;
->branch('planner', function(array $state): array {
return array_map(
fn(string $query) => new Send('worker', ['query' => $query]),
$state['queries']
);
}, targets: ['worker'])

Each Send dispatches an independent ExecuteNode job. The target node receives the payload via $context->isolatedPayload or the helper methods:
public function handle(NodeExecutionContext $context, array $state): array
{
$query = $context->payload('query');
// ...
}

The same fan-out is available via the SendNode prebuilt (see Built-in Nodes).
Three canonical shapes cover almost every real workflow:
1. One-shot fan-out → barrier. Single worker node per item, then a
BarrierNode. Use for independent work with no per-item downstream steps.
->branch('split', fn($s) => array_map(fn($id) => new Send('worker', ['id' => $id]), $s['ids']), ['worker'])
->transition('worker', 'barrier')
->transition('barrier', 'aggregate')

2. Per-item pipeline → child workflow → barrier. When each item needs a
multi-step pipeline (enrich → qualify → classify → draft), make the pipeline
its own Workflow and dispatch it as a sub-graph via Send::toWorkflow().
The payload becomes the child's initial state (parent state is not
leaked), and each Send spawns its own isolated child run.
class LeadPipeline extends Workflow
{
public function definition(): void
{
$this->addNode('enrich', EnrichNode::class)
->addNode('qualify', QualifyNode::class)
->addNode('classify', ClassifyNode::class)
->transition(Workflow::START, 'enrich')
->transition('enrich', 'qualify')
->transition('qualify', 'classify')
->transition('classify', Workflow::END);
}
}
class FanoutPipeline extends Workflow
{
public function definition(): void
{
$this->addNode('per_lead', app(LeadPipeline::class))
->addNode('barrier', new BarrierNode())
->branch(Workflow::START, fn($s) => collect($s['lead_ids'])->map(
fn($id) => Send::toWorkflow('per_lead', ['lead_id' => $id])
)->all(), ['per_lead'])
->transition('per_lead', 'barrier')
->transition('barrier', Workflow::END);
}
}

Why a sub-workflow rather than chaining sibling nodes in the parent? Send
payloads are per-job and don't propagate through subsequent static edges
(enrich → qualify would see payload = null). Inside a child workflow the
payload becomes state, so enrich writes to state and qualify sees it on
the next hop — no payload-threading needed.
3. Parent supervises long-running children. Same shape as (2), but override
shouldCascadeFailure() to return false on the child workflow when you want a
map-reduce style aggregate where one child's failure shouldn't fail the whole
run. See Sub-graph Workflows below.
use Cainy\Laragraph\Facades\Laragraph;
$run = Laragraph::run(MyPipeline::class, initialState: [
'input' => 'Hello, world!',
]);
echo $run->id; // WorkflowRun ID
echo $run->status; // RunStatus::Running

Pass an optional metadata array as the third argument to attach correlation data that travels with the run without being visible to nodes:
$run = Laragraph::run(MyPipeline::class,
initialState: ['input' => 'Hello'],
metadata: ['trace_id' => $traceId, 'user_id' => $userId],
);
$run->metadata; // ['trace_id' => ..., 'user_id' => ...]

The run is created synchronously. Node jobs are dispatched to your queue immediately after.
// Pause a running workflow
Laragraph::pause($run->id);
// Resume a paused workflow, optionally merging additional state
Laragraph::resume($run->id, ['approved' => true]);
// Abort a workflow (sets status to Failed, clears all pointers)
Laragraph::abort($run->id);

Override any of these methods on your Workflow subclass to react to run lifecycle events. Hook exceptions are swallowed and never affect engine state.
class MyPipeline extends Workflow
{
public function definition(): void { /* ... */ }
public function onStarting(WorkflowRun $run): void
{
Log::info("Run {$run->id} starting");
}
public function onCompleted(WorkflowRun $run): void
{
Cache::forget("pipeline:{$run->metadata['trace_id']}");
}
public function onFailed(WorkflowRun $run, Throwable $exception): void
{
report($exception);
}
}

State is a plain PHP array that persists in the workflow_runs.state column. Every node receives the full current state and returns a mutation — a partial array of keys to update.
The reducer determines how mutations are merged into the existing state.
LaraGraph ships with three reducers:
| Class | Behaviour |
|---|---|
| SmartReducer (default) | List arrays are appended. Scalars and associative arrays are overwritten. |
| MergeReducer | Deep recursive merge for all keys. |
| OverwriteReducer | Shallow array_merge — always overwrites. |
SmartReducer is the right default for most agent workflows: message histories accumulate naturally, while scalar values like status or score simply overwrite.
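As an illustration, the described merge semantics look roughly like this in plain PHP (a sketch, not the package's actual SmartReducer):

```php
// Sketch of SmartReducer semantics: list arrays append, everything else overwrites.
function smartReduce(array $state, array $mutation): array
{
    foreach ($mutation as $key => $value) {
        $existing = $state[$key] ?? null;

        if (is_array($existing) && is_array($value)
            && array_is_list($existing) && array_is_list($value)) {
            $state[$key] = array_merge($existing, $value); // append list items
        } else {
            $state[$key] = $value; // overwrite scalars and associative arrays
        }
    }

    return $state;
}

$state = smartReduce(
    ['messages' => [['role' => 'user', 'content' => 'hi']], 'score' => 10],
    ['messages' => [['role' => 'assistant', 'content' => 'hello']], 'score' => 42],
);
// $state['messages'] now holds both entries; $state['score'] is 42.
```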
Implement StateReducerInterface and bind it in your service provider, or attach it to a specific workflow:
// Globally
$this->app->bind(StateReducerInterface::class, MyReducer::class);
// Per workflow
$this->withReducer(MyReducer::class)

LaraGraph has first-class support for pausing workflows and waiting for human input.
Pause the run before a node executes. On resume, the node runs normally.
$this->addNode('review', ReviewNode::class)
->interruptBefore('review');Pause the run after a node executes but before its outgoing edges are evaluated.
$this->addNode('drafter', DrafterNode::class)
->addNode('publish', PublishNode::class)
->transition(Workflow::START, 'drafter')
->transition('drafter', 'publish')
->transition('publish', Workflow::END)
->interruptAfter('drafter');

Call Laragraph::resume() with any additional state to merge before the run continues:
Laragraph::resume($run->id, [
'meta' => ['approved' => true],
]);

Any node can pause the run at runtime by throwing NodePausedException:
use Cainy\Laragraph\Exceptions\NodePausedException;
class ConfidenceCheckNode implements Node
{
public function handle(NodeExecutionContext $context, array $state): array
{
if ($state['confidence'] < 0.7) {
throw new NodePausedException($context->nodeName);
}
return ['status' => 'confident'];
}
}

You can also pass state mutations to persist before pausing, and surface a
human-readable gate reason that rides on the engine's routing column (not
$state):
throw new NodePausedException(
nodeName: $context->nodeName,
stateMutation: ['draft_attempt' => ($state['draft_attempt'] ?? 0) + 1],
gateReason: 'Score too low — human review required',
);

The HumanInterventionRequired event fires with (runId, nodeName, gateReason).
Gate reason is also readable on the paused run via $run->routing['gate_reason'].
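A listener might notify a reviewer when that event fires. A sketch, assuming the event class lives under the package's Events namespace and exposes public properties matching the payload above:

```php
use Cainy\Laragraph\Events\HumanInterventionRequired;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Log;

// e.g. in a service provider's boot() method
Event::listen(HumanInterventionRequired::class, function ($event) {
    // Property names assumed from the payload: runId, nodeName, gateReason.
    Log::warning("Run {$event->runId} paused at {$event->nodeName}: {$event->gateReason}");
});
```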
Nodes can implement optional contracts to declare capabilities to the engine.
Give a node a stable identifier used in edge routing and graph visualization:
use Cainy\Laragraph\Contracts\HasName;
class ResearchAgentNode implements Node, HasName
{
public function name(): string
{
return 'research-agent';
}
}

Emit metadata alongside each node execution — useful for tracking token usage, model names, cost centers, or tenant IDs. Tags are automatically persisted to the workflow_node_executions table and broadcast on the NodeCompleted event:
use Cainy\Laragraph\Contracts\HasTags;
class LLMNode implements Node, HasTags
{
private string $model = '';
private int $tokens = 0;
public function handle(NodeExecutionContext $context, array $state): array
{
// ... call LLM, populate $this->model and $this->tokens ...
return ['response' => $result];
}
public function tags(): array
{
return [
'model' => $this->model,
'tokens' => $this->tokens,
'cost_usd' => $this->tokens * 0.000003,
];
}
}

The engine calls tags() after handle() returns, so the node can accumulate values during execution and expose them at the end.
// All executions for a run
$run->nodeExecutions;
// Total cost for a run
$run->nodeExecutions->sum(fn($e) => $e->tags['cost_usd'] ?? 0);
// Per-node cost breakdown
$run->nodeExecutions
->groupBy('node_name')
->map(fn($execs) => $execs->sum(fn($e) => $e->tags['cost_usd'] ?? 0));
// Failed executions only
$run->nodeExecutions->filter(fn($e) => $e->failed());

NodeExecution columns: run_id, node_name, attempt, tags (JSON),
executed_at, error_class, error_message, error_trace, failed_at.
When a node fails after exhausting retries, the engine writes exactly one row
with the error columns populated — no polluting state.error or parsing
Laravel logs.
Define per-node retry behaviour with exponential backoff and optional jitter:
use Cainy\Laragraph\Contracts\HasRetryPolicy;
use Cainy\Laragraph\Engine\RetryPolicy;
class FlakyAPINode implements Node, HasRetryPolicy
{
public function retryPolicy(): RetryPolicy
{
return new RetryPolicy(
initialInterval: 1.0,
backoffFactor: 2.0,
maxInterval: 30.0,
maxAttempts: 5,
jitter: true,
);
}
}

Restrict retries to specific exception types:
new RetryPolicy(
maxAttempts: 3,
retryOn: [RateLimitException::class, TimeoutException::class],
)
// Or with a Closure for full control:
new RetryPolicy(
maxAttempts: 3,
retryOn: fn(Throwable $e) => $e->getCode() === 429,
)

Route a node's job to a specific queue or connection:
use Cainy\Laragraph\Contracts\HasQueue;
class HeavyLLMNode implements Node, HasQueue
{
public function queue(): string
{
return 'llm';
}
public function connection(): ?string
{
return null; // use default connection
}
}

Attach Laravel job middleware to a node's execution job:
use Cainy\Laragraph\Contracts\HasMiddleware;
use Illuminate\Queue\Middleware\RateLimited;
class AnthropicNode implements Node, HasMiddleware
{
public function middleware(): array
{
return [new RateLimited('anthropic')];
}
}

Declare that a node should loop — driving tool execution cycles, polling, or any other repeated sub-task. The compiler automatically injects the loop edges at compile time.
use Cainy\Laragraph\Contracts\HasLoop;
class PollingNode implements Node, HasLoop
{
public function loopNode(string $nodeName): Node
{
return new CheckStatusNode();
}
public function loopCondition(): \Closure
{
return fn(array $state) => $state['status'] !== 'done';
}
}

When compiled, the engine injects a {name}.__loop__ node and guards existing exit edges with the negated condition. Use Workflow::toolNode('name') to reference the synthetic loop node in interrupt points:
->interruptBefore(Workflow::toolNode('agent'))

Mark a node as a fan-in barrier. The engine tracks how many workers were dispatched into this node and how many have committed their results. It serialises concurrent arrivals under a database lock, and only the final arrival — the one that sees all predecessors complete — runs handle(). All earlier arrivals skip cleanly.
use Cainy\Laragraph\Contracts\IsFanInBarrier;
class MyBarrierNode implements Node, IsFanInBarrier
{
public function handle(NodeExecutionContext $context, array $state): array
{
// Only called once — after every predecessor has committed.
return ['merged' => true];
}
}

BarrierNode implements IsFanInBarrier out of the box. Implement it on any custom node that acts as a convergence point for parallel branches.
Pauses the workflow unconditionally until manually resumed. Use as a static approval gate.
use Cainy\Laragraph\Nodes\GateNode;
$this->addNode('approve', new GateNode(reason: 'Manager approval required'))
->transition('draft', 'approve')
->transition('approve', 'publish');

When the gate triggers, a HumanInterventionRequired event fires with the
reason. The reason is also stored on $run->routing['gate_reason'] (engine
state — not merged into $state). Resume via Laragraph::resume($runId).
GateNode is a one-shot pause: its handle() unconditionally throws. For human-in-the-loop checkpoints where you still want the node's side effects to run first, use interruptAfter() on a regular node instead — see Human-in-the-Loop.
Fan-out node — dispatches a Send for each item in a state list, sending each to the same target node with an isolated payload.
use Cainy\Laragraph\Nodes\SendNode;
$this->addNode('fanout', new SendNode(
sourceKey: 'queries',
targetNode: 'worker',
payloadKey: 'query',
))
->addNode('worker', WorkerNode::class)
->transition(Workflow::START, 'fanout')
->transition('fanout', 'worker');

Inside WorkerNode, access the payload via $context->payload('query').
Fan-in barrier — waits for all parallel workers to complete before allowing the downstream edge to fire. Zero configuration required.
use Cainy\Laragraph\Nodes\BarrierNode;
->addNode('barrier', new BarrierNode())
->transition('worker', 'barrier')
->transition('barrier', 'aggregator')

The engine automatically tracks how many workers were dispatched into the barrier and how many have committed their results. Early arrivals skip cleanly (removing their pointer to maintain equilibrium). Only the final arrival — when all predecessors are fully complete — runs handle() and evaluates the downstream edges. The node body itself is a no-op; all logic lives in the engine.
Works with both transition fan-out and Send-based fan-out, including multiple sequential barriers in the same workflow.
Makes an HTTP request and stores the response in state. The URL supports {state.key} interpolation.
use Cainy\Laragraph\Nodes\HttpNode;
->addNode('fetch', new HttpNode(
url: 'https://api.example.com/items/{state.item_id}',
method: 'GET',
headers: ['Authorization' => 'Bearer token'],
responseKey: 'api_response',
))

The response is stored as ['status' => 200, 'body' => [...], 'ok' => true] under responseKey.
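The {state.key} interpolation can be pictured in a few lines of plain PHP (a sketch of the documented behaviour, not the package's implementation):

```php
// Sketch: substitute {state.key} placeholders with values from state.
function interpolate(string $template, array $state): string
{
    return preg_replace_callback(
        '/\{state\.([A-Za-z0-9_]+)\}/',
        fn (array $m): string => (string) ($state[$m[1]] ?? $m[0]), // unknown keys stay as-is
        $template,
    );
}

echo interpolate('https://api.example.com/items/{state.item_id}', ['item_id' => 42]);
// prints https://api.example.com/items/42
```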
For POST/PUT/PATCH requests, set bodyKey to a state key whose value will be sent as the request body:
new HttpNode(url: '...', method: 'POST', bodyKey: 'payload', responseKey: 'result')

Pauses execution for a given number of seconds, then continues.
use Cainy\Laragraph\Nodes\DelayNode;
->addNode('wait', new DelayNode(seconds: 300))

On first execution the node stores a resume-after timestamp, dispatches a delayed queue job, and pauses. The job automatically calls Laragraph::resume() when the delay elapses — no scheduled command or polling required.
Reads from or writes to the Laravel cache. The cache key supports {state.key} interpolation.
use Cainy\Laragraph\Nodes\CacheNode;
->addNode('load', new CacheNode(operation: 'get', cacheKey: 'report:{state.user_id}', stateKey: 'cached_report'))
->addNode('store', new CacheNode(operation: 'put', cacheKey: 'report:{state.user_id}', stateKey: 'report', ttl: 3600))
->addNode('bust', new CacheNode(operation: 'forget', cacheKey: 'report:{state.user_id}', stateKey: 'report'))

Dispatches a Laravel event with values from state as constructor arguments.
use Cainy\Laragraph\Nodes\NotifyNode;
->addNode('notify', new NotifyNode(
eventClass: ReportReady::class,
dataKeys: ['user_id', 'report_url'],
))

LaraGraph ships with first-class support for Prism via the Cainy\Laragraph\Integrations\Prism namespace.
composer require prism-php/prism

A general-purpose LLM node. Supports text generation and structured output —
no tool loop injected by default. Use PrismToolNode (below) when you
want tool-calling with automatic re-entry.
use Cainy\Laragraph\Integrations\Prism\PrismNode;
use Prism\Prism\Enums\Provider;
$this->addNode('assistant', new PrismNode(
provider: Provider::Anthropic,
model: 'claude-sonnet-4-6',
systemPrompt: 'You are a helpful assistant.',
maxTokens: 1024,
));

By default the assistant's response is appended to state['messages'].
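Once a run completes, you can read the accumulated conversation back off the run's state. A sketch, where MyPipeline stands for any workflow containing the assistant node and the exact message shape depends on your Prism version:

```php
$run = Laragraph::run(MyPipeline::class, initialState: [
    'messages' => [['role' => 'user', 'content' => 'Summarize this article...']],
]);

// Later, once the run has completed:
$messages = $run->fresh()->state['messages'] ?? [];
```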
Subclass when you need dynamic behaviour or structured output:
use Prism\Prism\Contracts\Schema;
use Prism\Prism\Schema\ObjectSchema;
use Prism\Prism\Schema\StringSchema;
class ClassifierNode extends PrismNode
{
protected function systemPrompt(array $state): string
{
return 'Classify the input into category + confidence.';
}
protected function prompt(array $state): string
{
return "Input: {$state['text']}";
}
protected function schema(): ?Schema
{
return new ObjectSchema(
name: 'classification',
description: 'Result of the classification',
properties: [
new StringSchema('category', 'category'),
new StringSchema('confidence', 'confidence 0..1'),
],
requiredFields: ['category', 'confidence'],
);
}
public function outputKey(): string
{
return 'classification'; // default: 'output'
}
}

When schema() returns non-null, the node calls Prism's structured-output path
and writes the result to state[outputKey()] instead of state['messages'].
Available overrides: provider(), model(), maxTokens(), temperature(),
topP(), systemPrompt($state), prompt($state), messages($state),
tools(), schema(), messagesKey(), outputKey(), applyProviderOptions().
Extends PrismNode and implements HasLoop — the compiler injects a
synthetic .__loop__ tool-executor so the node re-enters itself after each
tool call completes (classic ReAct loop).
use Cainy\Laragraph\Integrations\Prism\PrismToolNode;
use Prism\Prism\Tool;
class WeatherAgent extends PrismToolNode
{
public function tools(): array
{
return [
(new Tool)
->as('get_weather')
->for('Get weather for a city')
->withStringParameter('city', 'City name')
->using(fn(string $city): string => "Sunny, 22°C in {$city}"),
];
}
}
$this->addNode('agent', new WeatherAgent(
provider: Provider::Anthropic,
model: 'claude-sonnet-4-6',
));

The injected graph:
START → agent ──(tool_calls present)──→ agent.__loop__ → agent
──(no tool_calls)──────→ END
To interrupt before tool execution runs:
->interruptBefore(Workflow::toolNode('agent'))

Abstract base for nodes that execute tool calls from state['messages']
without the automatic loop. Use when you want explicit edges around tool
execution (for conditional routing, logging, approval gates, etc.).
use Cainy\Laragraph\Integrations\Prism\ToolNode;
class WeatherToolNode extends ToolNode
{
protected function toolMap(): array
{
return [
'get_weather' => fn(array $args): string =>
"Sunny, 22°C in " . ($args['city'] ?? 'unknown'),
];
}
}

Wire the edges yourself:
$this->addNode('agent', MyAgentNode::class)
->addNode('tools', WeatherToolNode::class)
->transition(Workflow::START, 'agent')
->transition('agent', 'tools', fn($s) => ! empty(end($s['messages'])['tool_calls'] ?? []))
->transition('agent', Workflow::END, fn($s) => empty(end($s['messages'])['tool_calls'] ?? []))
->transition('tools', 'agent');

LaraGraph integrates with Laravel AI via the AsGraphNode trait.
composer require laravel/ai

Add AsGraphNode to a standard Laravel AI agent to make it a Laragraph node:
use Cainy\Laragraph\Contracts\Node;
use Cainy\Laragraph\Integrations\LaravelAi\AsGraphNode;
use Laravel\Ai\Contracts\Agent;
use Laravel\Ai\Promptable;
class ResearchAgent implements Agent, Node
{
use AsGraphNode, Promptable;
public function instructions(): string
{
return 'You are a research assistant.';
}
protected function getAgentPrompt(): string
{
return 'Research: ' . ($this->state['topic'] ?? 'general');
}
}

If your agent implements HasStructuredOutput, the trait maps structured response keys directly to state mutation keys:
use Laravel\Ai\Contracts\HasStructuredOutput;
use Illuminate\Contracts\JsonSchema\JsonSchema;
class ClassifierAgent implements Agent, Node, HasStructuredOutput
{
use AsGraphNode, Promptable;
public function instructions(): string
{
return 'Classify the input into a category and confidence score.';
}
public function schema(JsonSchema $schema): array
{
return [
'category' => $schema->string()->required(),
'confidence' => $schema->number()->min(0)->max(1)->required(),
];
}
}

After execution, state['category'] and state['confidence'] are set directly.
Laravel AI agents implementing HasTools are automatically detected by the compiler — tool loop injection works exactly as with PrismNode:
use Laravel\Ai\Contracts\HasTools;
class WeatherAgent implements Agent, Node, HasTools
{
use AsGraphNode, Promptable;
public function tools(): array { return [new GetWeather]; }
}

Any Workflow subclass implements Node and can be embedded inside another workflow. The sub-graph is identified by its class name — no snapshot serialization required.
class ResearchSubgraph extends Workflow
{
public function definition(): void
{
$this->addNode('search', SearchNode::class)
->addNode('extract', ExtractNode::class)
->transition(Workflow::START, 'search')
->transition('search', 'extract')
->transition('extract', Workflow::END);
}
}
class ParentPipeline extends Workflow
{
public function definition(): void
{
$this->addNode('research', ResearchSubgraph::class)
->addNode('write', WriteNode::class)
->transition(Workflow::START, 'research')
->transition('research', 'write')
->transition('write', Workflow::END);
}
}

When the engine executes a sub-graph node:
- A child WorkflowRun is created and linked via parent_run_id / parent_node_name.
- The child workflow starts normally — its nodes run as independent queue jobs.
- The parent run pauses at the sub-graph node.
- When the child completes, the engine resumes the parent automatically.
- The parent node returns the state delta from the child's final state as a mutation.
$run->parent; // ?WorkflowRun
$run->children; // Collection<WorkflowRun>

Sub-graphs behave differently depending on how they're reached:
| Reached via | Child initial state | Delta merged back |
|---|---|---|
| Embedded (transition('sub', …)) | Parent's full state | Child's state diff vs parent's state |
| Send::toWorkflow('sub', $payload) | The Send payload only | Child's full final state |
Use Send::toWorkflow() for per-item pipelines where parent state should not
leak into the child. Use plain embedding when the child should see and operate
on parent state (e.g. a reusable "research this topic" sub-graph).
By default, when a child workflow fails, the failure cascades to the parent:
the parent is marked Failed, its pointers are cleared, and WorkflowFailed
fires with the child's exception.
Opt out on a per-child-workflow basis by overriding shouldCascadeFailure():
class ToleratingChildWorkflow extends Workflow
{
public function shouldCascadeFailure(): bool
{
return false; // parent stays Paused; caller can decide what to do
}
public function definition(): void { /* ... */ }
}

Useful for map-reduce patterns where individual child failures should be aggregated rather than fatal.
Inside a child node, use the context accessors:
public function handle(NodeExecutionContext $context, array $state): array
{
$profileId = $context->parentMetadata()['profile_id'] ?? null;
// ...
}

$context->parentRunId, $context->parentNodeName, and $context->parentMetadata()
are all null at the top level.
The engine tracks total node executions per run and throws RecursionLimitExceeded if the limit is hit.
The default limit is config('laragraph.recursion_limit', 100). Override it per workflow:
class MyPipeline extends Workflow
{
public function definition(): void
{
$this->withRecursionLimit(500);
// ...
}
}
}For legitimate fan-out workflows (e.g. 50 items, each running a 5-step
pipeline), raise this to items × steps + headroom. The exception message
includes a hint pointing at ->withRecursionLimit() when the limit is hit.
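For example, sizing the limit for a 50-item fan-out might look like:

```php
// 50 items × 5-step child pipeline = 250 node executions before
// fan-out, barrier, and aggregation overhead; round up for headroom.
$this->withRecursionLimit(50 * 5 + 50); // 300
```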
LaraGraph fires events throughout the workflow lifecycle. All events implement ShouldBroadcast and are broadcast on the workflow channel when broadcasting is enabled.
| Event | Payload |
|---|---|
| WorkflowStarted | runId, workflowKey |
| NodeExecuting | runId, nodeName |
| NodeCompleted | runId, nodeName, mutation, tags |
| NodeFailed | runId, nodeName, exception |
| WorkflowCompleted | runId, workflowKey |
| WorkflowFailed | runId, exception, workflowKey |
| WorkflowResumed | runId, workflowKey |
| HumanInterventionRequired | runId, nodeName, reason |
Enable broadcasting in your .env:
LARAGRAPH_BROADCASTING_ENABLED=true
LARAGRAPH_CHANNEL_TYPE=private # public | private | presence
LARAGRAPH_CHANNEL_PREFIX=workflow.Each run broadcasts on channel {prefix}{runId} (e.g. workflow.42). Authorize the channel in routes/channels.php as needed.
// config/laragraph.php
return [
// Queue name for ExecuteNode jobs (overridden per-node via HasQueue)
'queue' => env('LARAGRAPH_QUEUE', 'default'),
// Queue connection (null = default connection)
'connection' => env('LARAGRAPH_QUEUE_CONNECTION'),
// Hold jobs until the wrapping transaction commits (enable if you call
// Laragraph::run() inside your own DB transactions)
'after_commit' => env('LARAGRAPH_AFTER_COMMIT', false),
// Default max attempts per node (overridden per-node via HasRetryPolicy)
'max_node_attempts' => 3,
// Default node timeout in seconds
'node_timeout' => 60,
// Maximum node executions per run before RecursionLimitExceeded is thrown
'recursion_limit' => 100,
// Prune completed/failed runs older than this many days
'prunable_after_days' => 30,
// Default retry backoff settings (overridden per-node via HasRetryPolicy)
'retry' => [
'initial_interval' => 0.5,
'backoff_factor' => 2.0,
'max_interval' => 128.0,
'jitter' => true,
],
'broadcasting' => [
'enabled' => env('LARAGRAPH_BROADCASTING_ENABLED', false),
'channel_type' => env('LARAGRAPH_CHANNEL_TYPE', 'private'),
'channel_prefix' => env('LARAGRAPH_CHANNEL_PREFIX', 'workflow.'),
],
];

composer test

LaraGraph works with the sync queue driver in tests — set QUEUE_CONNECTION=sync in your phpunit.xml so runs execute synchronously, making assertions straightforward:
use Cainy\Laragraph\Facades\Laragraph;
use Cainy\Laragraph\Enums\RunStatus;
it('completes the pipeline', function () {
$run = Laragraph::run(MyPipeline::class, ['input' => 'hello']);
expect($run->fresh())
->status->toBe(RunStatus::Completed)
->state->toHaveKey('output');
});For unit-testing individual nodes, use the makeContext() test helper:
use function Cainy\Laragraph\Tests\makeContext;
it('returns a summary mutation', function () {
$node = new SummarizeNode();
$mutation = $node->handle(
makeContext(nodeName: 'summarize'),
['text' => 'Long article...'],
);
expect($mutation)->toHaveKey('summary');
});

The MIT License (MIT). Please see License File for more information.