|
1 | 1 | """Continue-as-new with caching using the LangGraph Functional API with Temporal. |
2 | 2 |
|
3 | | -Same pattern as the Graph API version, but using @task and @entrypoint decorators. |
| 3 | +Demonstrates Temporal's continue-as-new with LangGraph's task result caching |
| 4 | +to avoid re-executing completed @task functions across workflow boundaries. |
4 | 5 | """ |
5 | 6 |
|
6 | 7 | from dataclasses import dataclass |
7 | 8 | from datetime import timedelta |
8 | 9 | from typing import Any |
9 | 10 |
|
10 | | -from langgraph.func import entrypoint as lg_entrypoint |
11 | | -from langgraph.func import task |
| 11 | +from langgraph.func import entrypoint, task |
12 | 12 | from temporalio import workflow |
13 | | -from temporalio.contrib.langgraph import cache, entrypoint |
| 13 | +from temporalio.contrib.langgraph import cache |
| 14 | +from temporalio.contrib.langgraph import entrypoint as temporal_entrypoint |
14 | 15 |
|
15 | 16 |
|
16 | 17 | @task |
17 | | -def extract(data: int) -> int: |
18 | | - """Stage 1: Extract -- simulate data extraction by doubling the input.""" |
| 18 | +def double(data: int) -> int: |
| 19 | + """Stage 1: double the input.""" |
19 | 20 | return data * 2 |
20 | 21 |
|
21 | 22 |
|
22 | 23 | @task |
23 | | -def transform(data: int) -> int: |
24 | | - """Stage 2: Transform -- simulate transformation by adding 50.""" |
| 24 | +def add_50(data: int) -> int: |
| 25 | + """Stage 2: add 50.""" |
25 | 26 | return data + 50 |
26 | 27 |
|
27 | 28 |
|
28 | 29 | @task |
29 | | -def load(data: int) -> int: |
30 | | - """Stage 3: Load -- simulate loading by tripling the result.""" |
| 30 | +def triple(data: int) -> int: |
| 31 | + """Stage 3: triple the result.""" |
31 | 32 | return data * 3 |
32 | 33 |
|
33 | 34 |
|
34 | | -@lg_entrypoint() |
| 35 | +@entrypoint() |
35 | 36 | async def pipeline_entrypoint(data: int) -> dict: |
36 | | - """Run the 3-stage pipeline: extract -> transform -> load.""" |
37 | | - extracted = await extract(data) |
38 | | - transformed = await transform(extracted) |
39 | | - loaded = await load(transformed) |
40 | | - return {"result": loaded} |
| 37 | + """Run the 3-stage pipeline: double -> add_50 -> triple.""" |
| 38 | + doubled = await double(data) |
| 39 | + plus_50 = await add_50(doubled) |
| 40 | + tripled = await triple(plus_50) |
| 41 | + return {"result": tripled} |
41 | 42 |
|
42 | 43 |
|
43 | | -all_tasks: list[Any] = [extract, transform, load] |
| 44 | +all_tasks: list[Any] = [double, add_50, triple] |
44 | 45 |
|
45 | 46 | activity_options = { |
46 | 47 | t.func.__name__: { |
@@ -68,7 +69,7 @@ class PipelineFunctionalWorkflow: |
68 | 69 |
|
69 | 70 | @workflow.run |
70 | 71 | async def run(self, input_data: PipelineInput) -> dict[str, Any]: |
71 | | - app = entrypoint("pipeline", cache=input_data.cache) |
| 72 | + app = temporal_entrypoint("pipeline", cache=input_data.cache) |
72 | 73 | result = await app.ainvoke(input_data.data) |
73 | 74 |
|
74 | 75 | if input_data.phase < 3: |
|
0 commit comments