Conversation
…ch fallback (#343, #270)

- Fix #270: GenerationService._collect_context() now checks semantic_available and falls back to search_local() (FTS) when embeddings are unavailable
- Add PipelineNodeType, PipelineNode, PipelineEdge, PipelineGraph, PipelineTemplate models with n8n-compatible JSON format and round-trip serialization
- Add pipeline_json column to content_pipelines and create pipeline_templates table via migrations; seed 11 built-in templates on DB initialize
- Add src/services/pipeline_nodes/ package with NodeContext, BaseNodeHandler, and 14 handler implementations (source, retrieve_context, llm_generate, llm_refine, image_generate, publish, notify, filter, delay, react, forward, delete_message, condition, search_query_trigger)
- Add PipelineExecutor with topological sort (Kahn's algorithm) for DAG execution
- Add pipeline_service methods: export_json, import_json, list_templates, create_from_template, edit_via_llm (AI-driven JSON editing)
- ContentGenerationService routes to PipelineExecutor when pipeline_json is set; legacy flat-field pipelines continue to work unchanged
- Add web routes: GET /templates, POST /from-template, GET /{id}/export, POST /import, POST /{id}/ai-edit; update pipelines.html with import modal, AI-edit controls, and JSON/AI buttons per pipeline; add templates.html page
- Add CLI subcommands: pipeline export, import, templates, from-template, ai-edit
- Add agent tools: export_pipeline_json, import_pipeline_json, list_pipeline_templates, create_pipeline_from_template, ai_edit_pipeline
- Add tests/test_pipeline_graph.py (33 tests) and extend tests/test_generation_service.py with semantic fallback tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
This PR addresses two areas: (1) it prevents generation/pipeline runs from crashing when semantic search (embeddings) is unavailable by falling back to local FTS/LIKE search, and (2) it redesigns pipelines into an n8n-style node-based DAG with templates, JSON import/export, AI-assisted editing, and corresponding UI/CLI/agent-tool support.
Changes:
- Add node-based pipeline graph models + executor + node handlers, and route graph execution through ContentGenerationService when pipeline_json is set.
- Introduce pipeline templates (DB table + built-in seeders), plus web UI, CLI, and agent tools for templates and JSON import/export/AI edit.
- Fix semantic search fallback in GenerationService._collect_context() and add tests for the behavior.
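The fallback shape described above can be sketched as follows. This is a minimal illustration, not the repository's code: only `semantic_available` and `search_local()` come from the PR description, while `SearchBackend`, `search_semantic()`, and `collect_context()` are hypothetical stand-ins.

```python
# Hedged sketch of the availability-check-then-fallback pattern; names other
# than semantic_available and search_local() are illustrative placeholders.
from dataclasses import dataclass


@dataclass
class SearchBackend:
    semantic_available: bool

    def search_semantic(self, query: str) -> list[str]:
        if not self.semantic_available:
            # This is the crash the PR fixes at the call site.
            raise RuntimeError("Semantic search is unavailable")
        return [f"semantic:{query}"]

    def search_local(self, query: str) -> list[str]:
        # FTS/LIKE search that needs no embeddings.
        return [f"fts:{query}"]


def collect_context(backend: SearchBackend, query: str) -> list[str]:
    # Check availability first instead of catching the RuntimeError.
    if backend.semantic_available:
        return backend.search_semantic(query)
    return backend.search_local(query)
```

The key point is that the caller tests `semantic_available` up front, so the `RuntimeError` path is never reached when embeddings are missing.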
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| tests/test_pipeline_graph.py | Adds test coverage for graph models, topo-sort, NodeContext, handlers, executor, and builtin templates. |
| tests/test_generation_service.py | Adds tests ensuring fallback to local search when semantic is unavailable. |
| src/web/templates/pipelines/templates.html | New templates listing page + “create from template” modal + JSON preview. |
| src/web/templates/pipelines.html | Updates pipelines UI with JSON import modal, export links, and AI-edit panel for graph pipelines. |
| src/web/routes/pipelines.py | Adds routes for templates, JSON import/export, and AI-edit endpoint. |
| src/services/pipeline_templates_builtin.py | Defines built-in pipeline templates (seeded on startup). |
| src/services/pipeline_service.py | Implements JSON import/export, template operations, and AI-edit orchestration. |
| src/services/pipeline_nodes/handlers.py | Implements concrete node handlers (source/retrieve/LLM/image/publish/etc.). |
| src/services/pipeline_nodes/base.py | Introduces BaseNodeHandler and NodeContext. |
| src/services/pipeline_nodes/__init__.py | Adds handler registry and get_handler() factory. |
| src/services/pipeline_executor.py | Adds DAG executor + topological sort. |
| src/services/generation_service.py | Adds semantic-availability check and local-search fallback in _collect_context(). |
| src/services/content_generation_service.py | Routes to graph executor when pipeline.pipeline_json is set; keeps legacy refinement/image behavior for non-graph pipelines. |
| src/models.py | Adds pipeline graph/node/template models and extends ContentPipeline with pipeline_json. |
| src/database/repositories/pipeline_templates.py | Adds repository for pipeline_templates CRUD + builtin seeding. |
| src/database/repositories/content_pipelines.py | Persists/loads pipeline_json column and adds set_pipeline_json(). |
| src/database/migrations.py | Adds pipeline_json column + creates pipeline_templates table. |
| src/database/facade.py | Instantiates PipelineTemplatesRepository and seeds built-in templates on startup. |
| src/database/bundles.py | Wires pipeline_templates into repository bundles. |
| src/cli/parser.py | Adds CLI subcommands for pipeline JSON import/export, templates, and AI-edit. |
| src/cli/commands/pipeline.py | Implements CLI behavior for new pipeline subcommands. |
| src/agent/tools/pipelines.py | Adds agent tools for pipeline JSON export/import, templates, create-from-template, and AI-edit. |
| src/agent/tools/permissions.py | Registers new agent tools in categories and default allow-list. |
```python
async def execute(self, node_config: dict, context: NodeContext, services: dict) -> None:
    provider_callable = services.get("provider_callable")
    if provider_callable is None:
        raise RuntimeError("LlmGenerateHandler: no provider_callable in services")

    from datetime import datetime

    from src.agent.prompt_template import render_prompt_template

    prompt_template = node_config.get("prompt_template") or context.get_global("prompt_template", "")
    max_tokens = int(node_config.get("max_tokens", 2000))
    temperature = float(node_config.get("temperature", 0.7))
    model = node_config.get("model") or services.get("default_model") or ""

    # Build source messages string from context
    messages = context.get_global("context_messages", [])
    source_parts = []
    for m in messages:
        text = (m.text or "").strip()
        if not text:
            continue
        header = m.channel_title or m.channel_username or ""
        when = m.date.isoformat() if isinstance(m.date, datetime) else str(m.date)
        source_parts.append(f"[{header}] {text} (id:{m.message_id} date:{when})")
    source_messages = "\n\n".join(source_parts)

    rendered = render_prompt_template(
        prompt_template,
        {
            "source_messages": source_messages,
            "query": context.get_global("generation_query", ""),
        },
    )

    result = await provider_callable(
        rendered,
        model=model,
        max_tokens=max_tokens,
        temperature=temperature,
    )
    generated_text = result.get("text") or result.get("generated_text") or ""
    context.set_global("generated_text", generated_text)
    context.set_global("citations", result.get("citations", []))
```
provider_callable in this repo (via AgentProviderService.get_provider_callable) returns a plain string, but this handler treats the provider result as a dict and calls .get(...). That will raise at runtime for graph-based pipelines. Update the handler to accept both return shapes (dict or str), and normalize to a generated_text string before storing it in context (same for citations).
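One way to apply the suggested normalization is a small helper shared by both handlers. This is an illustrative sketch, not the PR's code: the dict keys (`"text"`, `"generated_text"`, `"citations"`) mirror the handler excerpt above, but the function name is ours.

```python
# Accept either a plain string or a dict-shaped provider result and
# normalize to (generated_text, citations). Hypothetical helper name.
def normalize_provider_result(result) -> tuple[str, list]:
    if isinstance(result, str):
        return result, []
    if isinstance(result, dict):
        text = result.get("text") or result.get("generated_text") or ""
        return text, result.get("citations", [])
    # Defensive fallback for unexpected shapes (None, objects, etc.).
    return str(result or ""), []
```

The handlers would then call this once on the awaited result instead of assuming `.get(...)` is available.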
```python
async def execute(self, node_config: dict, context: NodeContext, services: dict) -> None:
    provider_callable = services.get("provider_callable")
    if provider_callable is None:
        raise RuntimeError("LlmRefineHandler: no provider_callable in services")

    text = context.get_global("generated_text", "") or ""
    # If no text generated yet, try to use source messages as input
    if not text:
        messages = context.get_global("context_messages", [])
        parts = []
        for m in messages:
            t = (m.text or "").strip()
            if t:
                parts.append(t)
        text = "\n\n".join(parts[:3])

    prompt = node_config.get("prompt", "Перепиши следующий текст:\n\n{text}")
    rendered = prompt.replace("{text}", text)
    max_tokens = int(node_config.get("max_tokens", 1000))
    temperature = float(node_config.get("temperature", 0.7))
    model = node_config.get("model") or services.get("default_model") or ""

    result = await provider_callable(
        rendered,
        model=model,
        max_tokens=max_tokens,
        temperature=temperature,
    )
    refined = result.get("text") or result.get("generated_text") or ""
    if refined:
        context.set_global("generated_text", refined)
```
Same provider contract issue as LlmGenerateHandler: provider_callable usually returns a string, but this code assumes a dict and calls .get(...). This will crash _run_graph when refinement nodes are present. Normalize provider output to string (support dict as optional) before writing generated_text.
````python
try:
    from src.services.provider_service import AgentProviderService
    provider_service = AgentProviderService(db)
    provider_callable = provider_service.get_provider_callable(pipeline.llm_model)
    result = await provider_callable(prompt, model=pipeline.llm_model or "", max_tokens=4096, temperature=0.2)
    raw = result.get("text") or result.get("generated_text") or ""
    # Strip markdown fences if present
    raw = raw.strip()
    if raw.startswith("```"):
        raw = raw.split("```", 2)[1]
        if raw.startswith("json"):
            raw = raw[4:]
        raw = raw.split("```")[0].strip()
    new_graph = PipelineGraph.from_json(raw)
    # Save the updated graph
    await db.repos.content_pipelines.set_pipeline_json(pipeline_id, new_graph)
    return {"ok": True, "pipeline_json": _json.loads(new_graph.to_json())}
````
edit_via_llm() assumes the LLM provider returns a dict (result.get(...)), but AgentProviderService.get_provider_callable() returns callables that typically return a string. This makes AI-edit fail at runtime with `'str' object has no attribute 'get'`. Handle both provider response types (string/dict) when extracting the generated JSON text.
```html
<label class="form-label">Интервал (мин)</label>
<input class="form-control" type="number" name="generate_interval_minutes" min="1" value="60">
</div>
<p class="text-muted small">После создания добавьте источники и цели публикации в разделе редактирования.</p>
```
The template creation modal does not submit any source_channel_ids or target_refs, but the backend create_from_template path currently requires both (via _normalize_sources/_normalize_targets). As-is, creating a pipeline from any template will always fail validation. Either add source/target selectors to this modal or change the backend/template flow to allow creating an inactive pipeline with empty relations (to be filled in later, as the UI text suggests).
Suggested change:

```diff
-<p class="text-muted small">После создания добавьте источники и цели публикации в разделе редактирования.</p>
+<div class="mb-3">
+  <label class="form-label">Источники (ID каналов через запятую)</label>
+  <input class="form-control" type="text" name="source_channel_ids" placeholder="12345,67890" required>
+  <div class="form-text">Укажите один или несколько ID исходных каналов через запятую.</div>
+</div>
+<div class="mb-3">
+  <label class="form-label">Цели публикации (refs через запятую)</label>
+  <input class="form-control" type="text" name="target_refs" placeholder="@target_channel,@backup_channel" required>
+  <div class="form-text">Укажите одну или несколько целей публикации через запятую.</div>
+</div>
+<p class="text-muted small">После создания вы сможете дополнительно настроить пайплайн в разделе редактирования.</p>
```
```python
@router.post("/from-template")
async def create_from_template(
    request: Request,
    template_id: int = Form(...),
    name: str = Form(...),
    source_channel_ids: list[int] = Form(default=[]),
    target_refs: list[str] = Form(default=[]),
    llm_model: str = Form(""),
    image_model: str = Form(""),
    generate_interval_minutes: int = Form(60),
):
    svc: PipelineService = deps.pipeline_service(request)
    try:
        pipeline_id = await svc.create_from_template(
            template_id,
            name=name,
            source_ids=source_channel_ids,
            target_refs=_target_refs(target_refs),
            overrides={
                "llm_model": llm_model or None,
                "image_model": image_model or None,
                "generate_interval_minutes": generate_interval_minutes,
            },
        )
```
create_from_template() receives source_channel_ids/target_refs defaulting to empty lists, but PipelineService.create_from_template() ultimately requires non-empty sources/targets. Given the templates UI doesn't provide these fields, this route will consistently redirect with a validation error. Align route/service behavior with the UI (either require & accept the fields here, or allow empty and let users configure later).
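One wrinkle if the route is aligned with the suggested comma-separated text inputs: FastAPI's `list[int] = Form(...)` expects the field repeated per value, so a single `"12345,67890"` string would need to be split server-side. The helper below is a hypothetical sketch of that parsing step, not code from the PR.

```python
# Parse a comma-separated ID string (as in the placeholder "12345,67890")
# into a list of ints, skipping empty segments. Illustrative helper only.
def parse_id_csv(raw: str) -> list[int]:
    return [int(part) for part in raw.split(",") if part.strip()]
```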
src/services/pipeline_executor.py (Outdated)
```python
for node in ordered:
    handler = get_handler(node.type)
    try:
        logger.debug("Executing node %s (%s)", node.id, node.type)
        await handler.execute(node.config, context, services)

        # Short-circuit condition nodes: skip subtree if condition is False
        if node.type == PipelineNodeType.CONDITION:
            if not context.get_global("condition_result", True):
                logger.debug("Condition node %s evaluated False; stopping execution", node.id)
                break

        # Short-circuit trigger nodes: skip if not matched
        if node.type == PipelineNodeType.SEARCH_QUERY_TRIGGER:
            if not context.get_global("trigger_matched", False):
                logger.debug("Trigger node %s did not match; stopping execution", node.id)
                break
```
The comment says "skip subtree if condition is False", but the implementation breaks out of the whole execution loop, stopping all remaining nodes (including nodes that are not downstream of the condition). For DAG semantics this is incorrect. Consider computing the downstream set from this node via graph.edges and skipping only those nodes, or implementing edge-aware execution rather than a global stop.
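The downstream-set computation suggested above can be sketched as a BFS over the graph's edges. This is an illustration of the idea, not the repository's implementation; the `(source, target)` edge shape is an assumption based on the PipelineEdge model named in the PR.

```python
# Collect every node reachable from a failed condition/trigger node so the
# executor can skip only that subtree instead of break-ing out entirely.
from collections import deque


def downstream_nodes(edges: list[tuple[str, str]], start: str) -> set[str]:
    """BFS over directed edges; returns all node ids reachable from start."""
    adjacency: dict[str, list[str]] = {}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)
    seen: set[str] = set()
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in adjacency.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen
```

In the execution loop, a failed condition would add `downstream_nodes(edges, node.id)` to a skip set consulted before each `handler.execute(...)`, leaving independent branches untouched.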
```python
),
_node(
    "react_1", PipelineNodeType.REACT, "Реакция",
    {"emoji": "👍", "random_emoji": False}, 440, 0,
```
Built-in React template uses config key random_emoji, but ReactHandler reads random_emojis (list). This means the template field is ignored and may confuse users. Rename the template key to match the handler (or update the handler to support the boolean flag).
Suggested change:

```diff
-    {"emoji": "👍", "random_emoji": False}, 440, 0,
+    {"emoji": "👍", "random_emojis": []}, 440, 0,
```
```python
nodes=[
    _node(
        "trigger_1", PipelineNodeType.SEARCH_QUERY_TRIGGER,
        "Триггер запроса", {"query": "", "min_score": 0.5}, 0, 0,
    ),
    _node(
        "notify_1", PipelineNodeType.NOTIFY, "Уведомление",
        {"message_template": "Найдено совпадение: {text}"}, 220, 0,
    ),
```
Built-in "Уведомление по поисковому запросу" template includes min_score, but SearchQueryTriggerHandler doesn't use it, so changing it has no effect. Either remove it from the template to avoid misleading config, or implement scoring/threshold logic in the handler.
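If the second option (implementing the threshold) were chosen, the gating logic could be as small as the sketch below. Purely illustrative: the actual SearchQueryTriggerHandler ignores `min_score`, and `trigger_matches` is a hypothetical name.

```python
# Gate trigger_matched on a relevance score against the node's min_score.
# Default of 0.0 keeps backward compatibility: any match passes.
def trigger_matches(score: float, node_config: dict) -> bool:
    min_score = float(node_config.get("min_score", 0.0))
    return score >= min_score
```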
@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

Claude finished @axisrow's task in 2m 0s

Code Review

Critical Bugs

1. Provider callable returns
…ic, template creation

- Fix provider callable result normalization: handlers.py and pipeline_service.py called .get() on str result from AgentProviderService; now handles both str and dict
- Fix PipelineExecutor condition/trigger short-circuit: replace break (stops all nodes) with downstream BFS skip via _downstream_nodes() — only nodes reachable from the failed condition/trigger are skipped, independent branches continue executing
- Fix create_from_template: sources/targets are now optional at creation time so templates can be created inactive and configured later
- Fix React built-in template: config key was "random_emoji" (singular) but ReactHandler reads "random_emojis" (plural list)
- Remove unimplemented "min_score" from search query trigger template

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

Claude finished @axisrow's task in 2m 28s

Code Review

Remaining Critical Bug

1.
…e {text} variable, import_json empty sources
- Fix _apply_refinement_steps in content_generation_service.py: normalize
provider_callable result to str before calling .get() — same bug as handlers.py
- Fix "Автоответчик" built-in template: LlmGenerateHandler uses
render_prompt_template which only knows {source_messages}; was using {text}
(LlmRefineHandler contract) causing KeyError on every run
- Fix import_json: allow empty sources/targets at import time, consistent with
create_from_template — imported pipelines may not have valid IDs for the
target environment and should be created inactive for later configuration
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

Claude finished @axisrow's task in 2m 58s

Code Review

Remaining Critical Bug

1.
…cutor

_run_graph() now returns publish_mode from PipelineExecutor result so that PublishHandler node config (mode/reply) is honoured. ContentGenerationService uses the graph-level publish_mode override for the moderation notification gate instead of always falling back to pipeline.publish_mode.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Non-critical issues from the latest review:

@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.
Claude finished @axisrow's task in 2m 21s

Code Review

Remaining Critical Bug
…ugh result chain

publish_reply from PublishHandler node config was stored in NodeContext but dropped by PipelineExecutor return dict. Now propagated through executor → _run_graph → generation_run metadata so the downstream publish step can act on it (e.g. auto-responder template reply: True).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

Claude finished @axisrow's task in 2m 36s

Code Review

Remaining Critical Bug
… template

- PublishHandler captures reply_to_message_id from context_messages[0] when reply=True, stores it in NodeContext
- PipelineExecutor propagates reply_to_message_id in result dict
- _run_graph propagates it through return value
- ContentGenerationService stores publish_reply + reply_to_message_id in generation_run metadata
- PublishService._publish_to_target reads run.metadata for publish_reply flag and passes reply_to kwarg to session.send_message() when set

Enables the "Автоответчик" built-in template's reply behavior to work end-to-end.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

Claude finished @axisrow's task in 2m 59s

Code Review

Remaining Critical Bugs

1. Graph-level
…tive_publish_mode

- Add client_pool parameter to ContentGenerationService.__init__() and wire it into the services dict passed to PipelineExecutor so ReactHandler, ForwardHandler, and DeleteMessageHandler are no longer permanently no-ops
- Pass client_pool=self._client_pool from both UnifiedDispatcher call sites
- Store effective_publish_mode in run.metadata so the dispatcher can detect when a PublishHandler node overrides the pipeline's default publish mode
- Update UnifiedDispatcher._handle_pipeline_run() auto-publish gate to use run.metadata["effective_publish_mode"] instead of pipeline.publish_mode

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

Claude finished @axisrow's task in 2m 45s

Code Review

Remaining Critical Bug
…evel publish mode

publish_run() was checking pipeline.publish_mode (DB column, always MODERATED for template-created pipelines) instead of the effective mode from run.metadata. Now resolves effective_mode from run.metadata["effective_publish_mode"] with fallback to pipeline.publish_mode.value, so graph pipelines with PublishHandler mode="auto" correctly bypass the moderation gate.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
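The resolution order the commit describes (run metadata first, stored column as fallback) reduces to a one-liner. A minimal sketch, with simplified string types standing in for the real enum/metadata objects:

```python
# Prefer the graph-level override recorded in run metadata; fall back to the
# pipeline's stored publish_mode. Empty/missing override means "no override".
def resolve_publish_mode(run_metadata: dict, pipeline_mode: str) -> str:
    return run_metadata.get("effective_publish_mode") or pipeline_mode
```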
@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

Claude finished @axisrow's task in 2m 5s

Code Review

Remaining Critical Bug
…lease, send_reaction

- ReactHandler, ForwardHandler, DeleteMessageHandler all treated the (session, phone) tuple from get_available_client()/get_client_by_phone() as a bare session; now correctly unpacked as session, acquired_phone = result
- All three handlers lacked try/finally to release_client(acquired_phone); added to prevent client pool lease leaks
- Add send_reaction() to TelegramTransportSession (backends.py) wrapping Telethon SendReactionRequest — ReactHandler now has a valid API to call

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
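The unpack-then-release pattern from this commit can be sketched as below. The pool method names (`get_available_client`, `release_client`) come from the commit message; `FakePool` and `react_with_lease` are illustrative stand-ins, not the repository's classes.

```python
import asyncio


class FakePool:
    """Toy stand-in for the client pool, tracking releases."""

    def __init__(self):
        self.released: list[str] = []

    async def get_available_client(self):
        return ("session-object", "+100000")  # (session, phone) tuple

    async def release_client(self, phone: str):
        self.released.append(phone)


async def react_with_lease(pool: FakePool) -> str:
    result = await pool.get_available_client()
    session, acquired_phone = result  # unpack; don't treat as a bare session
    try:
        return f"reacted via {acquired_phone}"
    finally:
        # Always release, even if the action raises, to avoid lease leaks.
        await pool.release_client(acquired_phone)
```

The try/finally guarantees the lease is returned on both the success and exception paths, which is exactly the leak the commit closes.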
@claude review. Focus on critical issues: bugs, security vulnerabilities, logical errors, data loss risks, performance problems. Do NOT nitpick style, naming conventions, minor formatting, or subjective preferences — only flag issues that could break functionality or cause real harm in production.

Claude finished @axisrow's task in 2m 7s

Code Review

No Remaining Critical Bugs

The codebase is clean of critical issues. The only non-critical gap I can identify:
Summary
- GenerationService._collect_context() now checks semantic_available and falls back to search_local() (FTS/LIKE) when embeddings are unavailable — no more "RuntimeError: Semantic search is unavailable" crashes
- PipelineExecutor is only activated when pipeline_json is set

Changes

New models (src/models.py)

- PipelineNodeType (14 values), PipelineNode, PipelineEdge, PipelineGraph with to_json()/from_json() round-trip
- PipelineTemplate; ContentPipeline extended with pipeline_json: PipelineGraph | None

DB (src/database/)

- pipeline_json TEXT column in content_pipelines, new pipeline_templates table
- PipelineTemplatesRepository with ensure_builtins() seeded on startup

Pipeline executor (src/services/pipeline_nodes/, pipeline_executor.py)

- NodeContext for inter-node data passing
- PipelineExecutor.execute() with Kahn's topological sort + condition-based branch stopping

Service layer

- PipelineService: export_json, import_json, list_templates, create_from_template, edit_via_llm
- ContentGenerationService: routes to PipelineExecutor when pipeline_json is set

Web UI

- GET /templates, POST /from-template, GET /{id}/export, POST /import, POST /{id}/ai-edit
- pipelines.html: import modal, AI-edit accordion per pipeline, JSON/AI buttons
- pipelines/templates.html page: templates grouped by category with node badges

CLI

- pipeline export, pipeline import, pipeline templates, pipeline from-template, pipeline ai-edit

Agent tools

- export_pipeline_json, import_pipeline_json, list_pipeline_templates, create_pipeline_from_template, ai_edit_pipeline
- Registered in TOOL_CATEGORIES and default allow-list

Tests

- tests/test_pipeline_graph.py — 33 tests covering models, topo-sort, NodeContext, all handlers, PipelineExecutor, get_handler registry, builtin templates
- tests/test_generation_service.py — 2 new tests for semantic fallback behavior

Test plan

- ruff check src/ tests/ conftest.py — all checks passed
- pytest tests/test_pipeline_graph.py tests/test_generation_service.py -v — 33 passed
- pytest tests/ -m "not aiosqlite_serial" -n auto — 4169 passed
- pytest tests/ -m aiosqlite_serial — 511 passed

Closes #343
Fixes #270
🤖 Generated with Claude Code