Skip to content

feat(pipelines): node-based DAG pipeline redesign + fix semantic search fallback (#343, #270)#346

Merged
axisrow merged 9 commits intomainfrom
feat/pipeline-redesign-343
Apr 3, 2026
Merged

feat(pipelines): node-based DAG pipeline redesign + fix semantic search fallback (#343, #270)#346
axisrow merged 9 commits intomainfrom
feat/pipeline-redesign-343

Conversation

@axisrow
Copy link
Copy Markdown
Owner

@axisrow axisrow commented Apr 3, 2026

Summary

  • Fix When running a pipeline an error happens #270: GenerationService._collect_context() now checks semantic_available and falls back to search_local() (FTS/LIKE) when embeddings are unavailable — no more RuntimeError: Semantic search is unavailable crashes
  • Fix Полный редизайн раздела пайплайны /pipelines/ #343: Full pipeline redesign — n8n-compatible node-based DAG JSON configuration with 14 node types, built-in templates, LLM-based editing, JSON import/export, and a redesigned web UI
  • Backward compatible: existing flat-field pipelines continue to work unchanged; PipelineExecutor is only activated when pipeline_json is set

Changes

New models (src/models.py)

  • PipelineNodeType (14 values), PipelineNode, PipelineEdge, PipelineGraph with to_json()/from_json() round-trip
  • PipelineTemplate; ContentPipeline extended with pipeline_json: PipelineGraph | None

DB (src/database/)

  • Migration: pipeline_json TEXT column in content_pipelines, new pipeline_templates table
  • PipelineTemplatesRepository with ensure_builtins() seeded on startup
  • 11 built-in templates across content/automation/moderation/monitoring categories

Pipeline executor (src/services/pipeline_nodes/, pipeline_executor.py)

  • NodeContext for inter-node data passing
  • 14 handler classes: source, retrieve_context, llm_generate, llm_refine, image_generate, publish, notify, filter, delay, react, forward, delete_message, condition, search_query_trigger
  • PipelineExecutor.execute() with Kahn's topological sort + condition-based branch stopping

Service layer

  • PipelineService: export_json, import_json, list_templates, create_from_template, edit_via_llm
  • ContentGenerationService: routes to PipelineExecutor when pipeline_json is set

Web UI

  • New routes: GET /templates, POST /from-template, GET /{id}/export, POST /import, POST /{id}/ai-edit
  • pipelines.html: import modal, AI-edit accordion per pipeline, JSON/AI buttons
  • New pipelines/templates.html page: templates grouped by category with node badges

CLI

  • New subcommands: pipeline export, pipeline import, pipeline templates, pipeline from-template, pipeline ai-edit

Agent tools

  • 5 new tools: export_pipeline_json, import_pipeline_json, list_pipeline_templates, create_pipeline_from_template, ai_edit_pipeline
  • All registered in TOOL_CATEGORIES and default allow-list

Tests

  • tests/test_pipeline_graph.py — 33 tests covering models, topo-sort, NodeContext, all handlers, PipelineExecutor, get_handler registry, builtin templates
  • tests/test_generation_service.py — 2 new tests for semantic fallback behavior

Test plan

  • ruff check src/ tests/ conftest.py — all checks passed
  • pytest tests/test_pipeline_graph.py tests/test_generation_service.py -v — 33 passed
  • pytest tests/ -m "not aiosqlite_serial" -n auto — 4169 passed
  • pytest tests/ -m aiosqlite_serial — 511 passed

Closes #343
Fixes #270

🤖 Generated with Claude Code

…ch fallback (#343, #270)

- Fix #270: GenerationService._collect_context() now checks semantic_available
  and falls back to search_local() (FTS) when embeddings are unavailable

- Add PipelineNodeType, PipelineNode, PipelineEdge, PipelineGraph, PipelineTemplate
  models with n8n-compatible JSON format and round-trip serialization

- Add pipeline_json column to content_pipelines and pipeline_templates table
  via migrations; seed 11 built-in templates on DB initialize

- Add src/services/pipeline_nodes/ package with NodeContext, BaseNodeHandler,
  and 14 handler implementations (source, retrieve_context, llm_generate,
  llm_refine, image_generate, publish, notify, filter, delay, react, forward,
  delete_message, condition, search_query_trigger)

- Add PipelineExecutor with topological sort (Kahn's algorithm) for DAG execution

- Add pipeline_service methods: export_json, import_json, list_templates,
  create_from_template, edit_via_llm (AI-driven JSON editing)

- ContentGenerationService routes to PipelineExecutor when pipeline_json is set;
  legacy flat-field pipelines continue to work unchanged

- Add web routes: GET /templates, POST /from-template, GET /{id}/export,
  POST /import, POST /{id}/ai-edit; update pipelines.html with import modal,
  AI-edit controls, JSON/AI buttons per pipeline; add templates.html page

- Add CLI subcommands: pipeline export, import, templates, from-template, ai-edit

- Add agent tools: export_pipeline_json, import_pipeline_json,
  list_pipeline_templates, create_pipeline_from_template, ai_edit_pipeline

- Add tests/test_pipeline_graph.py (33 tests) and extend
  tests/test_generation_service.py with semantic fallback tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 3, 2026 14:53
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses two areas: (1) prevents generation/pipeline runs from crashing when semantic search (embeddings) is unavailable by falling back to local FTS/LIKE search, and (2) redesigns pipelines into an n8n-style node-based DAG with templates, JSON import/export, AI-assisted editing, and corresponding UI/CLI/agent-tool support.

Changes:

  • Add node-based pipeline graph models + executor + node handlers, and route graph execution through ContentGenerationService when pipeline_json is set.
  • Introduce pipeline templates (DB table + built-in seeders), plus web UI, CLI, and agent tools for templates and JSON import/export/AI edit.
  • Fix semantic search fallback in GenerationService._collect_context() and add tests for the behavior.

Reviewed changes

Copilot reviewed 23 out of 23 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
tests/test_pipeline_graph.py Adds test coverage for graph models, topo-sort, NodeContext, handlers, executor, and builtin templates.
tests/test_generation_service.py Adds tests ensuring fallback to local search when semantic is unavailable.
src/web/templates/pipelines/templates.html New templates listing page + “create from template” modal + JSON preview.
src/web/templates/pipelines.html Updates pipelines UI with JSON import modal, export links, and AI-edit panel for graph pipelines.
src/web/routes/pipelines.py Adds routes for templates, JSON import/export, and AI-edit endpoint.
src/services/pipeline_templates_builtin.py Defines built-in pipeline templates (seeded on startup).
src/services/pipeline_service.py Implements JSON import/export, template operations, and AI-edit orchestration.
src/services/pipeline_nodes/handlers.py Implements concrete node handlers (source/retrieve/LLM/image/publish/etc.).
src/services/pipeline_nodes/base.py Introduces BaseNodeHandler and NodeContext.
src/services/pipeline_nodes/init.py Adds handler registry and get_handler() factory.
src/services/pipeline_executor.py Adds DAG executor + topological sort.
src/services/generation_service.py Adds semantic-availability check and local-search fallback in _collect_context().
src/services/content_generation_service.py Routes to graph executor when pipeline.pipeline_json is set; keeps legacy refinement/image behavior for non-graph pipelines.
src/models.py Adds pipeline graph/node/template models and extends ContentPipeline with pipeline_json.
src/database/repositories/pipeline_templates.py Adds repository for pipeline_templates CRUD + builtin seeding.
src/database/repositories/content_pipelines.py Persists/loads pipeline_json column and adds set_pipeline_json().
src/database/migrations.py Adds pipeline_json column + creates pipeline_templates table.
src/database/facade.py Instantiates PipelineTemplatesRepository and seeds built-in templates on startup.
src/database/bundles.py Wires pipeline_templates into repository bundles.
src/cli/parser.py Adds CLI subcommands for pipeline JSON import/export, templates, and AI-edit.
src/cli/commands/pipeline.py Implements CLI behavior for new pipeline subcommands.
src/agent/tools/pipelines.py Adds agent tools for pipeline JSON export/import, templates, create-from-template, and AI-edit.
src/agent/tools/permissions.py Registers new agent tools in categories and default allow-list.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +52 to +95
async def execute(self, node_config: dict, context: NodeContext, services: dict) -> None:
provider_callable = services.get("provider_callable")
if provider_callable is None:
raise RuntimeError("LlmGenerateHandler: no provider_callable in services")

from datetime import datetime

from src.agent.prompt_template import render_prompt_template

prompt_template = node_config.get("prompt_template") or context.get_global("prompt_template", "")
max_tokens = int(node_config.get("max_tokens", 2000))
temperature = float(node_config.get("temperature", 0.7))
model = node_config.get("model") or services.get("default_model") or ""

# Build source messages string from context
messages = context.get_global("context_messages", [])
source_parts = []
for m in messages:
text = (m.text or "").strip()
if not text:
continue
header = m.channel_title or m.channel_username or ""
when = m.date.isoformat() if isinstance(m.date, datetime) else str(m.date)
source_parts.append(f"[{header}] {text} (id:{m.message_id} date:{when})")
source_messages = "\n\n".join(source_parts)

rendered = render_prompt_template(
prompt_template,
{
"source_messages": source_messages,
"query": context.get_global("generation_query", ""),
},
)

result = await provider_callable(
rendered,
model=model,
max_tokens=max_tokens,
temperature=temperature,
)
generated_text = result.get("text") or result.get("generated_text") or ""
context.set_global("generated_text", generated_text)
context.set_global("citations", result.get("citations", []))

Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

provider_callable in this repo (via AgentProviderService.get_provider_callable) returns a plain string, but this handler treats the provider result as a dict and calls .get(...). That will raise at runtime for graph-based pipelines. Update the handler to accept both return shapes (dict or str), and normalize to a generated_text string before storing it in context (same for citations).

Copilot uses AI. Check for mistakes.
Comment on lines +100 to +131
async def execute(self, node_config: dict, context: NodeContext, services: dict) -> None:
provider_callable = services.get("provider_callable")
if provider_callable is None:
raise RuntimeError("LlmRefineHandler: no provider_callable in services")

text = context.get_global("generated_text", "") or ""
# If no text generated yet, try to use source messages as input
if not text:
messages = context.get_global("context_messages", [])
parts = []
for m in messages:
t = (m.text or "").strip()
if t:
parts.append(t)
text = "\n\n".join(parts[:3])

prompt = node_config.get("prompt", "Перепиши следующий текст:\n\n{text}")
rendered = prompt.replace("{text}", text)
max_tokens = int(node_config.get("max_tokens", 1000))
temperature = float(node_config.get("temperature", 0.7))
model = node_config.get("model") or services.get("default_model") or ""

result = await provider_callable(
rendered,
model=model,
max_tokens=max_tokens,
temperature=temperature,
)
refined = result.get("text") or result.get("generated_text") or ""
if refined:
context.set_global("generated_text", refined)

Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same provider contract issue as LlmGenerateHandler: provider_callable usually returns a string, but this code assumes a dict and calls .get(...). This will crash _run_graph when refinement nodes are present. Normalize provider output to string (support dict as optional) before writing generated_text.

Copilot uses AI. Check for mistakes.
Comment on lines +411 to +427
try:
from src.services.provider_service import AgentProviderService
provider_service = AgentProviderService(db)
provider_callable = provider_service.get_provider_callable(pipeline.llm_model)
result = await provider_callable(prompt, model=pipeline.llm_model or "", max_tokens=4096, temperature=0.2)
raw = result.get("text") or result.get("generated_text") or ""
# Strip markdown fences if present
raw = raw.strip()
if raw.startswith("```"):
raw = raw.split("```", 2)[1]
if raw.startswith("json"):
raw = raw[4:]
raw = raw.split("```")[0].strip()
new_graph = PipelineGraph.from_json(raw)
# Save the updated graph
await db.repos.content_pipelines.set_pipeline_json(pipeline_id, new_graph)
return {"ok": True, "pipeline_json": _json.loads(new_graph.to_json())}
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

edit_via_llm() assumes the LLM provider returns a dict (result.get(...)), but AgentProviderService.get_provider_callable() returns callables that typically return a string. This makes AI-edit fail at runtime with 'str' object has no attribute get'. Handle both provider response types (string/dict) when extracting the generated JSON text.

Copilot uses AI. Check for mistakes.
<label class="form-label">Интервал (мин)</label>
<input class="form-control" type="number" name="generate_interval_minutes" min="1" value="60">
</div>
<p class="text-muted small">После создания добавьте источники и цели публикации в разделе редактирования.</p>
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The template creation modal does not submit any source_channel_ids or target_refs, but the backend create_from_template path currently requires both (via _normalize_sources/_normalize_targets). As-is, creating a pipeline from any template will always fail validation. Either add source/target selectors to this modal or change the backend/template flow to allow creating an inactive pipeline with empty relations (to be filled in later, as the UI text suggests).

Suggested change
<p class="text-muted small">После создания добавьте источники и цели публикации в разделе редактирования.</p>
<div class="mb-3">
<label class="form-label">Источники (ID каналов через запятую)</label>
<input class="form-control" type="text" name="source_channel_ids" placeholder="12345,67890" required>
<div class="form-text">Укажите один или несколько ID исходных каналов через запятую.</div>
</div>
<div class="mb-3">
<label class="form-label">Цели публикации (refs через запятую)</label>
<input class="form-control" type="text" name="target_refs" placeholder="@target_channel,@backup_channel" required>
<div class="form-text">Укажите одну или несколько целей публикации через запятую.</div>
</div>
<p class="text-muted small">После создания вы сможете дополнительно настроить пайплайн в разделе редактирования.</p>

Copilot uses AI. Check for mistakes.
Comment on lines +423 to +446
@router.post("/from-template")
async def create_from_template(
request: Request,
template_id: int = Form(...),
name: str = Form(...),
source_channel_ids: list[int] = Form(default=[]),
target_refs: list[str] = Form(default=[]),
llm_model: str = Form(""),
image_model: str = Form(""),
generate_interval_minutes: int = Form(60),
):
svc: PipelineService = deps.pipeline_service(request)
try:
pipeline_id = await svc.create_from_template(
template_id,
name=name,
source_ids=source_channel_ids,
target_refs=_target_refs(target_refs),
overrides={
"llm_model": llm_model or None,
"image_model": image_model or None,
"generate_interval_minutes": generate_interval_minutes,
},
)
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create_from_template() receives source_channel_ids/target_refs defaulting to empty lists, but PipelineService.create_from_template() ultimately requires non-empty sources/targets. Given the templates UI doesn't provide these fields, this route will consistently redirect with a validation error. Align route/service behavior with the UI (either require & accept the fields here, or allow empty and let users configure later).

Copilot uses AI. Check for mistakes.
Comment on lines +81 to +97
for node in ordered:
handler = get_handler(node.type)
try:
logger.debug("Executing node %s (%s)", node.id, node.type)
await handler.execute(node.config, context, services)

# Short-circuit condition nodes: skip subtree if condition is False
if node.type == PipelineNodeType.CONDITION:
if not context.get_global("condition_result", True):
logger.debug("Condition node %s evaluated False; stopping execution", node.id)
break

# Short-circuit trigger nodes: skip if not matched
if node.type == PipelineNodeType.SEARCH_QUERY_TRIGGER:
if not context.get_global("trigger_matched", False):
logger.debug("Trigger node %s did not match; stopping execution", node.id)
break
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says "skip subtree if condition is False", but the implementation breaks out of the whole execution loop, stopping all remaining nodes (including nodes that are not downstream of the condition). For DAG semantics this is incorrect. Consider computing the downstream set from this node via graph.edges and skipping only those nodes, or implementing edge-aware execution rather than a global stop.

Copilot uses AI. Check for mistakes.
),
_node(
"react_1", PipelineNodeType.REACT, "Реакция",
{"emoji": "👍", "random_emoji": False}, 440, 0,
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Built-in React template uses config key random_emoji, but ReactHandler reads random_emojis (list). This means the template field is ignored and may confuse users. Rename the template key to match the handler (or update the handler to support the boolean flag).

Suggested change
{"emoji": "👍", "random_emoji": False}, 440, 0,
{"emoji": "👍", "random_emojis": []}, 440, 0,

Copilot uses AI. Check for mistakes.
Comment on lines +86 to +94
nodes=[
_node(
"trigger_1", PipelineNodeType.SEARCH_QUERY_TRIGGER,
"Триггер запроса", {"query": "", "min_score": 0.5}, 0, 0,
),
_node(
"notify_1", PipelineNodeType.NOTIFY, "Уведомление",
{"message_template": "Найдено совпадение: {text}"}, 220, 0,
),
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Built-in "Уведомление по поисковому запросу" template includes min_score, but SearchQueryTriggerHandler doesn't use it, so changing it has no effect. Either remove it from the template to avoid misleading config, or implement scoring/threshold logic in the handler.

Copilot uses AI. Check for mistakes.
@axisrow
Copy link
Copy Markdown
Owner Author

axisrow commented Apr 3, 2026

@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

@claude
Copy link
Copy Markdown

claude bot commented Apr 3, 2026

Claude finished @axisrow's task in 2m 0s —— View job


Code Review

  • Read critical files: handlers, executor, service, routes
  • Check provider callable contract (dict vs str)
  • Check DAG/condition node logic
  • Check template creation flow (source/target validation)
  • Check for security issues
  • Check for data loss / correctness risks
  • Post findings

Critical Bugs

1. Provider callable returns str — all LLM graph nodes will crash at runtime

AgentProviderService.get_provider_callable() is typed and implemented to return Callable[..., Awaitable[str]] — a plain string. But every LLM handler in the new pipeline nodes calls .get() on the result as if it were a dict:

# handlers.py:92
generated_text = result.get("text") or result.get("generated_text") or ""
# handlers.py:128
refined = result.get("text") or result.get("generated_text") or ""
# pipeline_service.py:416
raw = result.get("text") or result.get("generated_text") or ""

This will raise AttributeError: 'str' object has no attribute 'get' the first time any graph pipeline with an llm_generate, llm_refine, or ai-edit node runs. The same bug exists in _apply_refinement_steps (pre-existing), but this PR extends it to the entire new DAG executor.

Fix for all three call sites:

generated_text = result if isinstance(result, str) else (result.get("text") or result.get("generated_text") or "")

Fix this →


2. DAG break on condition/trigger stops ALL remaining nodes, not just downstream

pipeline_executor.py:91 and pipeline_executor.py:97 use break to short-circuit execution:

if node.type == PipelineNodeType.CONDITION:
    if not context.get_global("condition_result", True):
        break  # stops ALL remaining nodes in topological order

if node.type == PipelineNodeType.SEARCH_QUERY_TRIGGER:
    if not context.get_global("trigger_matched", False):
        break  # same problem

In a DAG this is incorrect. Topological order can interleave nodes from independent branches — a break here kills branches that share no dependency on the condition node. For example, a graph with condition → notify and a separate publish node in the same topological ordering would silently drop the publish step.

The fix requires computing the downstream reachable set from the condition node via BFS on graph.edges, and continue-skipping only those nodes instead of breaking:

skipped: set[str] = set()
for node in ordered:
    if node.id in skipped:
        continue
    ...
    if node.type == PipelineNodeType.CONDITION and not context.get_global("condition_result", True):
        # compute downstream and skip only those
        skipped.update(_downstream(graph, node.id))

Fix this →


3. Template creation UI always results in validation error

templates.html modal form submits only template_id, name, llm_model, image_model, and generate_interval_minutes — no source_channel_ids or target_refs. But PipelineService.create_from_template() flows through _normalize_sources / _normalize_targets, which raises PipelineValidationError("Выберите хотя бы одну цель публикации.") when targets are empty.

The route (pipelines.py:446) receives empty lists for both and silently triggers this error → user always gets a redirect with a validation message, never a created pipeline. The same issue affects import_json (lines 313–316) when importing a pipeline JSON that lacks source_ids/target_refs.

The service should either allow creating inactive pipelines with empty sources/targets (to be filled in later), or the UI modal must include source/target selector fields.

Fix this →


Non-Critical (but will silently misbehave)

4. React template uses wrong config key

pipeline_templates_builtin.py:116 seeds the built-in React template with "random_emoji": False, but ReactHandler reads random_emojis (plural, list) at handlers.py:263. The key is silently ignored — the handler always falls back to the emoji default. If the intent is to support a random emoji list, the template should seed "random_emojis": [].

5. min_score in search query trigger template has no effect

pipeline_templates_builtin.py:89 sets "min_score": 0.5 in the search trigger template config, but SearchQueryTriggerHandler never reads this key. Threshold filtering is not implemented in the handler. Either remove the key from the template or implement the threshold check.


Summary

Bug #1 (str vs dict) and bug #3 (template creation) mean that the two most prominent new user-facing features — graph pipelines with LLM nodes and creating from templates — are broken on first use. Bug #2 means DAG branching semantics are incorrect for any non-linear graph topology. These three should be fixed before merging.

…ic, template creation

- Fix provider callable result normalization: handlers.py and pipeline_service.py
  called .get() on str result from AgentProviderService; now handles both str and dict
- Fix PipelineExecutor condition/trigger short-circuit: replace break (stops all nodes)
  with downstream BFS skip via _downstream_nodes() — only nodes reachable from the
  failed condition/trigger are skipped, independent branches continue executing
- Fix create_from_template: sources/targets are now optional at creation time so
  templates can be created inactive and configured later
- Fix React built-in template: config key was "random_emoji" (singular) but
  ReactHandler reads "random_emojis" (plural list)
- Remove unimplemented "min_score" from search query trigger template

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@axisrow
Copy link
Copy Markdown
Owner Author

axisrow commented Apr 3, 2026

@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

@claude
Copy link
Copy Markdown

claude bot commented Apr 3, 2026

Claude finished @axisrow's task in 2m 28s —— View job


Code Review

  • Read critical files: handlers, executor, service, routes, templates
  • Analyze bugs, security, logic errors, data loss risks, performance
  • Post findings

Note on prior review: The previous Claude comment identified 5 bugs. After reading the current branch code, all 5 of those are already fixed: the isinstance(result, str) guards are in place in all three LLM call sites (handlers.py:92-97, handlers.py:133, pipeline_service.py:417), the DAG executor correctly uses _downstream_nodes() + skipped set instead of break, create_from_template allows empty sources/targets, and the React template correctly uses random_emojis. The previous review was accurate at the time but the fixes have since landed in the same PR.


Remaining Critical Bug

1. _apply_refinement_steps still calls .get() on a string result

content_generation_service.py:285 was not updated alongside the handler fixes:

result = await provider_callable(rendered, ...)
refined = result.get("text") or result.get("generated_text") or ""   # ← AttributeError

AgentProviderService.get_provider_callable() returns callables that produce a str. This line will raise AttributeError: 'str' object has no attribute 'get' for every legacy pipeline that has refinement_steps configured. Graph-based pipelines are unaffected (they skip this path), but all existing flat-field pipelines with refinement steps will silently fail their runs.

Fix: refined = result if isinstance(result, str) else (result.get("text") or result.get("generated_text") or "")

Fix this →


2. Auto-responder built-in template crashes with KeyError on every run

pipeline_templates_builtin.py:267 seeds the "Автоответчик" template with an LlmGenerateHandler node whose prompt_template is "Напиши ответ на сообщение: {text}".

LlmGenerateHandler.execute() calls render_prompt_template(prompt_template, {"source_messages": ..., "query": ...}), which internally builds rendered_values from only ALLOWED_TEMPLATE_VARIABLES = {"source_messages", "channel_title", "topic", "date"}"text" is not in that set. Then template.format_map(rendered_values) raises KeyError: 'text' because the key is absent from the mapping.

The template node is using a placeholder ({text}) that belongs to LlmRefineHandler's contract, not LlmGenerateHandler's. The fix is to change the auto-responder's prompt to use {source_messages}:

# pipeline_templates_builtin.py:267
{"prompt_template": "Напиши ответ на следующее сообщение:\n\n{source_messages}", "max_tokens": 500}

Fix this →


Non-Critical

3. DelayHandler has no upper-bound cap

handlers.py:248-254: asyncio.sleep(max_sec) has no ceiling. An LLM-generated or user-misconfigured pipeline (e.g. max_seconds: 86400) will block the executor task for up to 24 hours, stalling any other pipelines sharing the same worker. A reasonable cap (e.g. 3600 s) with a warning log would prevent accidental production stalls.

4. import_json rejects graphs without sources/targets

pipeline_service.py:313-316 raises PipelineValidationError when importing a JSON that has no source_ids/target_refs. This means sharing a pure graph-topology export (no runtime data) between environments is not possible via the CLI/web import UI — users must hand-edit the JSON to add valid IDs for the target environment. create_from_template (correctly) allows empty sources/targets, so import_json could apply the same leniency and create the pipeline as inactive.


Summary

Two bugs will break production functionality that ships with this PR:

  • Bug Add channel statistics #1 breaks every existing pipeline with refinement_steps (data loss: refinement silently fails, run is saved without refinement applied).
  • Bug Add Claude Code GitHub Workflow #2 makes the "Автоответчик" built-in template crash on every run (it is seeded into the DB on startup for all users).

Both fixes are one-liners.

…e {text} variable, import_json empty sources

- Fix _apply_refinement_steps in content_generation_service.py: normalize
  provider_callable result to str before calling .get() — same bug as handlers.py
- Fix "Автоответчик" built-in template: LlmGenerateHandler uses
  render_prompt_template which only knows {source_messages}; was using {text}
  (LlmRefineHandler contract) causing KeyError on every run
- Fix import_json: allow empty sources/targets at import time, consistent with
  create_from_template — imported pipelines may not have valid IDs for the
  target environment and should be created inactive for later configuration

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@axisrow
Copy link
Copy Markdown
Owner Author

axisrow commented Apr 3, 2026

@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

@claude
Copy link
Copy Markdown

claude bot commented Apr 3, 2026

Claude finished @axisrow's task in 2m 58s —— View job


Code Review

  • Read current state of critical files
  • Check fixes from prior reviews
  • Check for remaining/new critical bugs
  • Post findings

Status of prior reviews: All 7 critical bugs identified across the two previous Claude review rounds are fixed in the current code — str/dict result normalization in all LLM call sites, _apply_refinement_steps, DAG executor (BFS skip instead of break), auto-responder {text}{source_messages}, react template key name, and template creation allowing empty sources/targets. No need to re-flag those.


Remaining Critical Bug

1. PublishHandler node values are silently dropped — graph mode/reply always ignored

PipelineExecutor.execute() returns publish_mode from context (line 127 of pipeline_executor.py):

"publish_mode": context.get_global("publish_mode", pipeline.publish_mode.value),

But _run_graph() in content_generation_service.py:188-192 only extracts three keys from the result and discards publish_mode:

return {
    "generated_text": result.get("generated_text", ""),
    "image_url": result.get("image_url"),
    "citations": result.get("citations", []),
}

And back in generate(), the notification gate on line 127 always uses pipeline.publish_mode, not anything from the graph:

and pipeline.publish_mode == PipelinePublishMode.MODERATED

Consequences:

  • "Автоответчик" template sets PublishHandler with "mode": "auto", "reply": True. Both are ignored. The pipeline will behave as moderated (the default from create_from_template), so replies go into a moderation queue instead of publishing immediately. The reply: True flag has no effect at all — there is no code path that reads publish_reply from context anywhere after execution.
  • Any graph pipeline that uses a PublishHandler node to override publish mode will silently use pipeline.publish_mode instead.
  • Custom publish_targets in PublishHandler node config are also dropped (they are stored in context but _run_graph never reads them, and UnifiedDispatcher uses the DB pipeline_targets table).

The fix requires _run_graph to return publish_mode from the executor result and ContentGenerationService.generate() to use it for the moderation check:

# content_generation_service.py _run_graph
return {
    "generated_text": result.get("generated_text", ""),
    "image_url": result.get("image_url"),
    "citations": result.get("citations", []),
    "publish_mode": result.get("publish_mode"),   # propagate graph node override
}
# content_generation_service.py generate()
graph_publish_mode = result.get("publish_mode") or pipeline.publish_mode.value
...
if (
    self._notification_service
    and run.moderation_status == "pending"
    and graph_publish_mode == PipelinePublishMode.MODERATED.value
):

Fix this →


Non-Critical (silent misbehavior)

2. SourceHandler channel IDs are stored in context but never consumed

SourceHandler.execute() (handlers.py:18-19) sets source_channel_ids in context. But RetrieveContextHandler never reads this key — it searches the entire corpus regardless of which channels the SourceHandler declared. In a graph where source_1 → retrieve_1, the source scoping has no effect. All templates that include a SOURCE node assume it limits the search scope, but it doesn't.

This is a design gap rather than a crash, but it means templates like "Контент-генерация" and "Автоответчик" silently retrieve from all indexed channels, not just the ones configured in the node.

3. {query} in node prompt_template raises KeyError at runtime

render_prompt_template() (prompt_template.py:119-121) only substitutes variables from ALLOWED_TEMPLATE_VARIABLES = {"source_messages", "channel_title", "topic", "date"}. LlmGenerateHandler calls it with {"source_messages": ..., "query": ...}, but query is never added to rendered_values. If any node config uses {query} in prompt_template, it raises KeyError. Default templates don't trigger this, but it's a latent trap for users editing node configs via AI or manually.

4. DelayHandler has no upper bound cap (carried from prior review)

Still present: asyncio.sleep(max_sec) with no ceiling. A misconfigured max_seconds: 86400 stalls the executor task for 24 hours.

5. min_score in search trigger template has no effect (carried from prior review)

Still present: SearchQueryTriggerHandler doesn't read min_score. Cosmetic, but misleads users who tune the threshold expecting filtering.


Summary

Bug #1 is production-impacting: graph pipelines with PublishHandler nodes can't override publish mode or enable reply behavior. The "Автоответчик" built-in template's reply: True is silently non-functional. This is the only critical issue remaining after the prior two rounds of fixes.

…cutor

_run_graph() now returns publish_mode from PipelineExecutor result so that
PublishHandler node config (mode/reply) is honoured. ContentGenerationService
uses the graph-level publish_mode override for the moderation notification gate
instead of always falling back to pipeline.publish_mode.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@axisrow
Copy link
Copy Markdown
Owner Author

axisrow commented Apr 3, 2026

Non-critical issues from the latest review:

@axisrow
Copy link
Copy Markdown
Owner Author

axisrow commented Apr 3, 2026

@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

@claude
Copy link
Copy Markdown

claude bot commented Apr 3, 2026

Claude finished @axisrow's task in 2m 21s —— View job


Code Review

  • Read current state of critical files
  • Verify fixes from prior review round (PublishHandler mode override)
  • Check for remaining/new critical bugs
  • Post findings

Status of prior reviews: All fixes from the previous three Claude review rounds are verified in the current branch. The str/dict normalization (isinstance(result, str)) is present in all LLM call sites including _apply_refinement_steps. The DAG executor correctly uses _downstream_nodes() + skipped set. create_from_template and import_json both allow empty sources/targets. The auto-responder {text}{source_messages} fix is in. The React template uses random_emojis. publish_mode is now propagated from graph executor context through _run_graph to ContentGenerationService.generate() (lines 124–129 of content_generation_service.py).


Remaining Critical Bug

publish_reply is silently non-functional — auto-responder template's core behavior is broken

PublishHandler.execute() (handlers.py:172) stores publish_reply in context:

context.set_global("publish_reply", reply)  # e.g. True for auto-responder template

But this value is never read again. pipeline_executor.py:123-129 returns only generated_text, image_url, citations, and publish_modepublish_reply is absent. _run_graph (content_generation_service.py:189-194) also omits it. Nothing in the publish flow (ContentGenerationService, UnifiedDispatcher, PublishService) reads publish_reply from context or uses it when sending the message.

The "Автоответчик" (auto-responder) built-in template specifically sets "reply": True as its distinguishing behavior — it's supposed to reply to the matched messages rather than just posting new content. Without this propagation, the auto-responder template behaves identically to a plain content generation template, and the reply behavior promised in its description silently doesn't happen.

Fix: propagate publish_reply from executor result through _run_graph, then wire it into whichever publish path actually sends the Telegram message. As a minimum, include it in the executor result dict and pass it to the generation_run metadata so the downstream publish step can act on it.

Fix this →


Non-Critical (carried from prior rounds, previously acknowledged)

  • SourceHandler scoping (handlers.py:18-19): RetrieveContextHandler ignores source_channel_ids from context — follow-up task per author.
  • {query} in node prompt_template causes KeyError at runtime via render_prompt_template — skipped, affects only manually edited configs.
  • DelayHandler cap (handlers.py:248-254): no upper bound on asyncio.sleep(max_sec) — skipped per author.

Summary

One critical issue remains: publish_reply stored by PublishHandler is silently lost before reaching the publish step. This makes the "Автоответчик" built-in template's reply feature completely non-functional. All other critical bugs from the three prior review rounds are confirmed fixed in the current branch.

…ugh result chain

publish_reply from PublishHandler node config was stored in NodeContext but
dropped by PipelineExecutor return dict. Now propagated through executor →
_run_graph → generation_run metadata so downstream publish step can act on it
(e.g. auto-responder template reply: True).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@axisrow
Copy link
Copy Markdown
Owner Author

axisrow commented Apr 3, 2026

@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

@claude
Copy link
Copy Markdown

claude bot commented Apr 3, 2026

Claude finished @axisrow's task in 2m 36s —— View job


Code Review

  • Verify fixes from prior review rounds (publish_reply propagation)
  • Read current state of critical files
  • Check for remaining/new critical bugs
  • Post findings

Status of prior reviews: All bugs from rounds 1–3 are confirmed fixed. Round 4 identified publish_reply as non-functional. The commits ed75982 and 17a76ec partially address this — publish_reply is now correctly returned by pipeline_executor.py:128, propagated through _run_graph (content_generation_service.py:196), and stored in run.metadata (content_generation_service.py:82-83). The data flows correctly to the DB via generation_runs.save_result. However, the actual consuming side was not implemented.


Remaining Critical Bug

publish_reply is stored in metadata but never consumed — auto-responder reply behavior still non-functional

PublishService._publish_to_target() (publish_service.py:100-113) always calls:

msg = await session.send_message(entity, run.generated_text)

It never reads run.metadata at all. Even after the round-4 fix landed the propagation chain, nothing in the publish path acts on publish_reply.

There's also a deeper design gap: even if PublishService were updated to check run.metadata.get("publish_reply"), there is no reply_to_message_id stored anywhere — the auto-responder needs to reply to the specific matched keyword message, but that message's ID is never extracted from the execution context and persisted to metadata. The execution context has the matched messages (set in SearchQueryTriggerHandler / FilterHandler), but their IDs are not written through to generation_run.metadata.

The minimum fix requires two things:

  1. During execution, store the reply target message ID in metadata — e.g. after FilterHandler sets context_messages, the first matching message ID should be captured as reply_to_message_id (can be done in PublishHandler.execute() by reading context_messages[0].message_id when reply=True).
  2. PublishService._publish_to_target() must read run.metadata and pass reply_to=reply_to_message_id to session.send_message() when both the flag and the message ID are present.

Without both pieces, the "Автоответчик" built-in template sends a new message instead of a reply, which is its core distinguishing behavior.

Fix this →


Non-Critical (carried from prior rounds, previously acknowledged or skipped)

  • SourceHandler scoping: RetrieveContextHandler ignores source_channel_ids from context — acknowledged as follow-up.
  • {query} in prompt_template raises KeyError via render_prompt_template — skipped per author (line 8 of prompt_template.py: ALLOWED_TEMPLATE_VARIABLES does not include query).
  • DelayHandler cap: no upper bound on asyncio.sleep(max_sec) — skipped per author.

Summary

One critical issue remains: publish_reply is now correctly persisted to run.metadata, but PublishService never reads it and there is no reply_to_message_id tracked through the execution chain. The "Автоответчик" built-in template's reply behavior is still a no-op. All other bugs from rounds 1–4 are confirmed fixed.

… template

- PublishHandler captures reply_to_message_id from context_messages[0] when
  reply=True, stores it in NodeContext
- PipelineExecutor propagates reply_to_message_id in result dict
- _run_graph propagates it through return value
- ContentGenerationService stores publish_reply + reply_to_message_id in
  generation_run metadata
- PublishService._publish_to_target reads run.metadata for publish_reply flag
  and passes reply_to kwarg to session.send_message() when set

Enables the "Автоответчик" built-in template's reply behavior to work end-to-end.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@axisrow
Copy link
Copy Markdown
Owner Author

axisrow commented Apr 3, 2026

@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

@claude
Copy link
Copy Markdown

claude bot commented Apr 3, 2026

Claude finished @axisrow's task in 2m 59s —— View job


Code Review

  • Read current state of critical files (publish_reply chain, executor, handlers, service, dispatcher)
  • Verify all fixes from rounds 1–5 are in place
  • Check for remaining/new critical bugs
  • Post findings

Status of prior reviews: All bugs from rounds 1–5 are confirmed fixed in the current branch. The publish_reply / reply_to_message_id chain is complete end-to-end: PublishHandler sets both in context (handlers.py:172–178), executor returns them (pipeline_executor.py:128–129), _run_graph propagates them (content_generation_service.py:198–199), generate() stores them in run.metadata (lines 82–85), and PublishService._publish_to_target() reads run.metadata and passes reply_to to send_message() (publish_service.py:100–102).


Remaining Critical Bugs

1. Graph-level publish_mode: "auto" never triggers auto-publish — auto-responder template is still a no-op

UnifiedDispatcher._handle_content_generate() (unified_dispatcher.py:588) decides whether to auto-publish using:

if pipeline.publish_mode == PipelinePublishMode.AUTO and run is not None:
    await publish_svc.publish_run(run, pipeline)

This checks pipeline.publish_mode from the DB record. But create_from_template defaults to publish_mode=PipelinePublishMode.MODERATED (pipeline_service.py:350) unless explicitly overridden — and the templates UI modal has no publish_mode field. So any pipeline created from the auto-responder template will have pipeline.publish_mode == MODERATED, and the dispatcher never enqueues auto-publish, even though the graph PublishHandler node says "mode": "auto".

Additionally, even if auto-publish were triggered, PublishService.publish_run() (lines 47–57) also gates on pipeline.publish_mode == MODERATED. Neither gating point has been updated to read the graph-level effective publish mode.

The effective_publish_mode computed in generate() (line 128) is used only to decide whether to send a moderation notification, but is never stored in run.metadata or communicated back to the caller. The dispatcher has no way to know the graph overrode the mode to "auto".

Fix: store effective_publish_mode in run.metadata inside generate(), then in _handle_content_generate() use run.metadata.get("publish_mode", pipeline.publish_mode.value) == "auto" as the auto-publish gate. Also update PublishService.publish_run()'s moderation check to respect the metadata override.

Fix this →


2. client_pool never injected into graph services — 5 built-in templates are always no-ops

ContentGenerationService.__init__() has no client_pool parameter. In _run_graph() (content_generation_service.py:180–188), the services dict passed to the executor contains search_engine, provider_callable, image_service, notification_service, default_model, default_image_model, and db — but no client_pool.

Every handler that requires Telegram access silently skips when services.get("client_pool") returns None:

  • ReactHandler (handlers.py:268): "ReactHandler: no client_pool, skipping"
  • ForwardHandler (handlers.py:292): "ForwardHandler: no client_pool, skipping"
  • DeleteMessageHandler (handlers.py:319): "DeleteMessageHandler: no client_pool, skipping"

This makes 5 of the 11 built-in templates entirely non-functional:

  • Template 4 "Реакции на сообщения" — ReactHandler never fires
  • Template 5 "Удаление join/leave сообщений" — DeleteMessageHandler never fires
  • Template 6 "Пересылка сообщений" — ForwardHandler never fires
  • Template 8 "Удаление по ключевым словам" — DeleteMessageHandler never fires
  • Template 9 "Удаление анонимных сообщений" — DeleteMessageHandler never fires

The UnifiedDispatcher already holds self._client_pool (set in __init__) and it creates ContentGenerationService at lines 497 and 579 — but never passes it to the service.

Fix: Add client_pool: object | None = None to ContentGenerationService.__init__(), store it as self._client_pool, and add "client_pool": self._client_pool to the services dict in _run_graph(). Then pass client_pool=self._client_pool from both call sites in UnifiedDispatcher.

Fix this →


Summary

Two critical gaps remain — both affect built-in templates on production:

  • Bug Add channel statistics #1: Graph PublishHandler with mode: "auto" never triggers auto-publish. effective_publish_mode is computed inside generate() for the notification gate but not stored in run metadata, so the dispatcher and PublishService still use pipeline.publish_mode (defaulting to MODERATED). The auto-responder's output accumulates in the moderation queue forever.
  • Bug Add Claude Code GitHub Workflow #2: client_pool is not passed to the graph executor's services dict, making all Telegram-side handlers (React, Forward, Delete) permanently no-ops. 5 of 11 built-in templates do nothing when run.

…tive_publish_mode

- Add client_pool parameter to ContentGenerationService.__init__() and wire it
  into the services dict passed to PipelineExecutor so ReactHandler, ForwardHandler,
  and DeleteMessageHandler are no longer permanently no-ops
- Pass client_pool=self._client_pool from both UnifiedDispatcher call sites
- Store effective_publish_mode in run.metadata so dispatcher can detect when a
  PublishHandler node overrides the pipeline's default publish mode
- Update UnifiedDispatcher._handle_pipeline_run() auto-publish gate to use
  run.metadata["effective_publish_mode"] instead of pipeline.publish_mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@axisrow
Copy link
Copy Markdown
Owner Author

axisrow commented Apr 3, 2026

@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

@claude
Copy link
Copy Markdown

claude bot commented Apr 3, 2026

Claude finished @axisrow's task in 2m 45s —— View job


Code Review

  • Read current state of critical files (publish_service, unified_dispatcher, content_generation_service, executor, handlers, templates)
  • Verify all fixes from rounds 1–5
  • Check for remaining/new critical bugs
  • Post findings

Status of prior reviews: All fixes from rounds 1–5 are confirmed in the current branch. Specifically: isinstance(result, str) guards in all LLM call sites, DAG executor uses _downstream_nodes + skipped set, create_from_template/import_json allow empty sources/targets, auto-responder uses {source_messages}, React template uses random_emojis, publish_reply/reply_to_message_id chain propagates end-to-end, client_pool is injected into ContentGenerationService and passed through _run_graph services dict, and effective_publish_mode is persisted to run.metadata and read by the dispatcher.


Remaining Critical Bug

publish_run() moderation gate ignores the graph-level publish mode — auto-publish from graph pipelines always silently fails

The dispatcher fix in unified_dispatcher.py:590-594 correctly reads run.metadata["effective_publish_mode"] and calls publish_svc.publish_run() when the value is "auto". However, publish_service.py:47-57 has its own guard that checks pipeline.publish_mode (the DB column), not the metadata:

if (
    pipeline.publish_mode == PipelinePublishMode.MODERATED   # ← checks DB column, always MODERATED for template-created pipelines
    and run.moderation_status not in {"approved", "published"}  # ← "pending" after save_result
):
    return [PublishResult(success=False, error="Run is not approved for publish")]

save_result() (generation_runs.py:79-87) does not update moderation_status. The run keeps its initial status of "pending". Meanwhile, create_from_template defaults to publish_mode=MODERATED (pipeline_service.py:350) because the templates modal has no publish_mode field.

Execution trace for the "Автоответчик" template:

  1. Graph executes; PublishHandler(mode="auto") sets effective_publish_mode = "auto" in context
  2. generate() stores metadata["effective_publish_mode"] = "auto", does NOT set moderation_status = "approved"
  3. Dispatcher detects "auto" → calls publish_run(run, pipeline)
  4. publish_run: pipeline.publish_mode == MODERATED → True; run.moderation_status == "pending" → True → returns "Run is not approved for publish"

The auto-publish call always returns a failure result. The dispatcher catches this failure and marks the task as FAILED (lines 607-611), so the run appears generated but permanently stuck.

Fix: publish_run() should use run.metadata to resolve the effective mode before applying the guard:

# publish_service.py:47
effective_mode = (run.metadata or {}).get("effective_publish_mode", pipeline.publish_mode.value)
if (
    effective_mode == PipelinePublishMode.MODERATED.value
    and run.moderation_status not in {"approved", "published"}
):
    ...
    return [PublishResult(success=False, error="Run is not approved for publish")]

Fix this →


Non-Critical (carried from prior rounds, previously acknowledged/skipped)

  • SourceHandler scoping: RetrieveContextHandler ignores source_channel_ids from context — acknowledged as follow-up.
  • {query} in prompt_template raises KeyError via render_prompt_template (ALLOWED_TEMPLATE_VARIABLES does not include "query") — skipped per author.
  • DelayHandler cap: no upper bound on asyncio.sleep(max_sec) — skipped per author.

Summary

One critical gap remains: PublishService.publish_run() has its own moderation guard that checks pipeline.publish_mode (the DB column), not the graph-computed effective_publish_mode stored in run.metadata. The dispatcher correctly calls publish_run when effective_publish_mode == "auto", but publish_run immediately returns an error because pipeline.publish_mode == MODERATED and run.moderation_status == "pending". All graph pipelines with a PublishHandler(mode="auto") node — including the "Автоответчик" built-in template — will have their auto-publish silently blocked.

…evel publish mode

publish_run() was checking pipeline.publish_mode (DB column, always MODERATED
for template-created pipelines) instead of the effective mode from run.metadata.
Now resolves effective_mode from run.metadata["effective_publish_mode"] with
fallback to pipeline.publish_mode.value, so graph pipelines with PublishHandler
mode="auto" correctly bypass the moderation gate.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@axisrow
Copy link
Copy Markdown
Owner Author

axisrow commented Apr 3, 2026

@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

@claude
Copy link
Copy Markdown

claude bot commented Apr 3, 2026

Claude finished @axisrow's task in 2m 5s —— View job


Code Review

  • Read current state of critical files
  • Verify fix from round 6 (publish_run moderation gate)
  • Check for remaining/new critical bugs
  • Post findings

Status of prior reviews: All fixes from rounds 1–6 are confirmed in the current branch. publish_run() now correctly reads (run.metadata or {}).get("effective_publish_mode", pipeline.publish_mode.value) and gates on that instead of pipeline.publish_mode (publish_service.py:47-60). Every other previously identified bug is also confirmed fixed.


Remaining Critical Bug

ReactHandler, ForwardHandler, DeleteMessageHandler crash at runtime — wrong return type unpacking + missing API method

Both ClientPool.get_available_client() and get_client_by_phone() return tuple[TelegramTransportSession, str] | None — a (session, phone) pair. All three Telegram-side node handlers receive the return value and immediately use it as if it were a bare session:

# ReactHandler handlers.py:278-282
client = await client_pool.get_available_client()   # returns (session, phone) or None
if client is None:
    break
await client.send_reaction(...)                      # AttributeError: 'tuple' has no attribute 'send_reaction'

# ForwardHandler handlers.py:305-309
client = await client_pool.get_client_by_phone(phone)  # returns (session, phone) or None
...
await client.forward_messages(...)                      # AttributeError: 'tuple' has no attribute 'forward_messages'

# DeleteMessageHandler handlers.py:327-333
client = await client_pool.get_available_client()   # returns (session, phone) or None
...
await client.delete_messages(...)                   # AttributeError: 'tuple' has no attribute 'delete_messages'

There are two compounding issues:

  1. Tuple not unpackedclient is (session, phone), not session. Every method call on it raises AttributeError.
  2. send_reaction does not existTelegramTransportSession (src/telegram/backends.py) has send_message, forward_messages, delete_messages, but no send_reaction method. ReactHandler will crash even after the tuple is correctly unpacked.
  3. No release_client(phone) call — all three handlers acquire a client lease but never release it, leaking the lease slot. Under load this will exhaust the pool and stall all subsequent operations that need a Telegram client.

Consequences: all five built-in templates that use these handlers (#4 Реакции, #5 Удаление join/leave, #6 Пересылка, #8 Удаление по ключевым словам, #9 Удаление анонимных) will raise an exception on every execution. The executor re-raises node exceptions (pipeline_executor.py:119-121), so the run fails entirely rather than skipping the node.

For ForwardHandler and DeleteMessageHandler, the fix requires:

result = await client_pool.get_available_client()  # or get_client_by_phone
if result is None:
    break
session, acquired_phone = result
try:
    await session.delete_messages(...)  # or forward_messages
finally:
    await client_pool.release_client(acquired_phone)

For ReactHandler, send_reaction must also be added to TelegramTransportSession in backends.py, or the handler must use a lower-level approach (e.g. native client via get_native_client_by_phone) since Telethon's reaction API requires a different call path.

Fix this →


Non-Critical (carried from prior rounds, acknowledged/skipped by author)

  • SourceHandler scoping: RetrieveContextHandler ignores source_channel_ids from context — acknowledged as follow-up.
  • {query} in prompt_template raises KeyError via render_prompt_template — skipped per author.
  • DelayHandler cap: no upper bound on asyncio.sleep(max_sec) — skipped per author.

Summary

One critical bug remains: ReactHandler, ForwardHandler, and DeleteMessageHandler all unpack (session, phone) tuples as bare sessions, crashing with AttributeError on every invocation. Additionally ReactHandler calls send_reaction which doesn't exist in TelegramTransportSession, and all three leak client leases. This makes 5 of the 11 built-in templates fail on every run with an exception (rather than silently skipping).

…lease, send_reaction

- ReactHandler, ForwardHandler, DeleteMessageHandler all treated (session, phone)
  tuple from get_available_client()/get_client_by_phone() as a bare session;
  now correctly unpacked as session, acquired_phone = result
- All three handlers lacked try/finally to release_client(acquired_phone); added
  to prevent client pool lease leaks
- Add send_reaction() to TelegramTransportSession (backends.py) wrapping
  Telethon SendReactionRequest — ReactHandler now has a valid API to call

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@axisrow
Copy link
Copy Markdown
Owner Author

axisrow commented Apr 3, 2026

@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

@claude
Copy link
Copy Markdown

claude bot commented Apr 3, 2026

Claude finished @axisrow's task in 2m 7s —— View job


Code Review

  • Read current state of critical files (handlers, backends, executor, services)
  • Verify round 7 fix (tuple unpacking, send_reaction, lease release)
  • Check for remaining/new critical bugs
  • Post findings

Status of prior reviews: All 12 bugs across rounds 1–7 are confirmed fixed in the current branch.

  • isinstance(result, str) guards in all LLM call sites including _apply_refinement_steps
  • DAG executor uses _downstream_nodes() + skipped set instead of break
  • create_from_template and import_json allow empty sources/targets ✓
  • Auto-responder uses {source_messages}
  • React template uses random_emojis
  • publish_reply/reply_to_message_id chain propagates end-to-end ✓
  • client_pool injected into graph executor services dict ✓
  • effective_publish_mode persisted to run.metadata and used by dispatcher ✓
  • publish_service.publish_run() reads effective_publish_mode from metadata ✓
  • Tuple unpacking correct in ReactHandler, ForwardHandler, DeleteMessageHandler ✓
  • send_reaction implemented in TelegramTransportSession via SendReactionRequest
  • Client lease released in finally in all three Telegram handlers ✓

No Remaining Critical Bugs

The codebase is clean of critical issues. The only non-critical gap I can identify:

reply_to silently dropped when publishing with an image

publish_service.py:103–123 computes reply_to from run.metadata, but only passes it to send_message(). The publish_files() branch (when run.image_url is set) ignores it entirely. If a user adds an image_generate node to an auto-responder pipeline, the reply-to behavior is silently lost — the message is sent as a top-level post rather than a reply.

The built-in auto-responder template has no image node so this doesn't affect it today, but it's a latent trap for user-created graph pipelines.

This does not block merging.


Summary

This PR is ready to merge. All critical bugs identified across 7 rounds of review have been addressed. The features are functional: graph pipeline execution, LLM node handling, DAG branching, template creation, auto-publish flow, reply publishing, and Telegram-side handlers (react/forward/delete) all work correctly. The previously acknowledged follow-up items (SourceHandler channel scoping, {query} in allowed template variables, DelayHandler cap) remain out of scope for this PR per the author.

@axisrow axisrow merged commit c6502fc into main Apr 3, 2026
3 checks passed
@axisrow axisrow deleted the feat/pipeline-redesign-343 branch April 3, 2026 16:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Полный редизайн раздела пайплайны /pipelines/ When running a pipeline an error happens

3 participants