From 025e497d2ae2356c4c2f59b10ceffcd42c008ace Mon Sep 17 00:00:00 2001 From: bux Date: Mon, 4 May 2026 21:48:20 +0000 Subject: [PATCH] agent: agency-report + SQLite suggestion DB + dedupe --- agent/agency-report | 195 +++++++++++++++++++++++++++++++++ agent/agency_db.py | 245 ++++++++++++++++++++++++++++++++++++++++++ agent/bootstrap.sh | 12 ++- agent/telegram_bot.py | 13 +++ 4 files changed, 462 insertions(+), 3 deletions(-) create mode 100755 agent/agency-report create mode 100644 agent/agency_db.py diff --git a/agent/agency-report b/agent/agency-report new file mode 100755 index 0000000..461a735 --- /dev/null +++ b/agent/agency-report @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +"""agency-report — record + post an Agency suggestion to Telegram. + +Always: + 1. records the suggestion in /var/lib/bux/agency.db + 2. posts the body to TG with inline-keyboard buttons (default 4) + 3. wires the message_id back into the row so a button tap can record + the user's decision against the right suggestion. + +Default buttons: ✅ Yes, do it · ❌ No · ✏️ Just do it differently · 🔄 Regenerate + +Custom buttons: pass --button (repeatable) to override the set. + +Dedupe: pass --source + --skip-if-exists to suppress if the same +source already has a row that is not 'pending'. Returns 0 silently when +suppressed (status of the prior row is printed to stderr). + +Usage: + agency-report --title "Hassan HIPAA send" \\ + --description "Saved Gmail draft 19df…1d sitting unsent." \\ + --importance high \\ + --source gmail-draft-19df00477868154d \\ + --prompt "Open Gmail draft 19df…, send it, post to #wall-king-magnus." \\ + --skip-if-exists + + agency-report --title "Pick a draft" \\ + --description "Three drafts ready for the Hassan thread." \\ + --button "Send draft A" --button "Send draft B" --button "Send draft C" +""" +from __future__ import annotations + +import argparse +import json +import os +import sys +from pathlib import Path + +REPO_AGENT = Path(__file__).resolve().parent +sys.path.insert(0, str(REPO_AGENT)) + +import agency_db # noqa: E402 + +import urllib.request # noqa: E402 + +DEFAULT_BUTTONS = [ + "✅ Yes, do it", + "❌ No", + "✏️ Just do it differently", + "🔄 Regenerate", +] + + +def _read_kv(path: Path) -> dict: + out: dict[str, str] = {} + try: + for line in path.read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + k, v = line.split("=", 1) + out[k.strip()] = v.strip().strip('"').strip("'") + except FileNotFoundError: + pass + return out + + +def bot_token() -> str: + tok = os.environ.get("TG_BOT_TOKEN") + if tok: + return tok + tok = _read_kv(Path("/etc/bux/tg.env")).get("TG_BOT_TOKEN") + if not tok: + sys.exit("agency-report: TG_BOT_TOKEN missing (env or /etc/bux/tg.env)") + return tok + + +def chat_id() -> int: + raw = Path("/etc/bux/tg-allowed.txt").read_text().splitlines() + for line in raw: + line = line.strip() + if line: + return int(line) + sys.exit("agency-report: no bound chat (run /start in TG first)") + + +def send_with_buttons( + *, token: str, chat: int, thread: int, text: str, buttons: list[str] +) -> int: + keyboard = [ + [{"text": label, "callback_data": f"agcy:{thread}:{i}"}] + for i, label in enumerate(buttons) + ] + body: dict = { + "chat_id": chat, + "text": text, + "reply_markup": {"inline_keyboard": keyboard}, + } + if thread > 0: + body["message_thread_id"] = thread + req = urllib.request.Request( + f"https://api.telegram.org/bot{token}/sendMessage", + data=json.dumps(body).encode("utf-8"), + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=15) as r: + resp = json.loads(r.read()) + if not resp.get("ok"): + sys.exit(f"agency-report: sendMessage failed: {resp}") + return int(resp["result"]["message_id"]) + + +def main() -> int: + p = argparse.ArgumentParser(description="Record + post an Agency suggestion to Telegram.") + p.add_argument("--title", required=True, help="Short scannable headline.") + p.add_argument("--description", required=True, help="One-paragraph body.") + p.add_argument( + "--importance", + choices=("high", "med", "low"), + default="med", + help="Priority bucket for triage. Default: med.", + ) + p.add_argument( + "--source", + help="Stable slug for dedupe (e.g. slack-c-minerva, gmail-thread-19df, gh-pr-78).", + ) + p.add_argument( + "--prompt", + help="Exact action that runs if user taps yes — agent will see this in the lane.", + ) + p.add_argument( + "--button", + action="append", + default=None, + help="Custom button label. Repeatable. If omitted, uses the default 4-button set.", + ) + p.add_argument( + "--thread-id", + type=int, + default=int(os.environ.get("TG_THREAD_ID", "0")), + help="TG forum thread to post into. Defaults to $TG_THREAD_ID or 0 (general).", + ) + p.add_argument( + "--skip-if-exists", + action="store_true", + help="If a suggestion with this --source already exists and isn't pending, " + "skip posting (exit 0).", + ) + args = p.parse_args() + + buttons = args.button or DEFAULT_BUTTONS + + db = agency_db.conn() + + if args.skip_if_exists and args.source: + prior = agency_db.exists(db, args.source) + if prior and prior.get("status") != "pending": + print( + f"agency-report: source={args.source!r} already exists " + f"(id={prior['id']}, status={prior['status']}). Skipping.", + file=sys.stderr, + ) + return 0 + + sugg_id = agency_db.insert( + db, + title=args.title, + description=args.description, + importance=args.importance, + source=args.source, + prompt=args.prompt, + buttons=buttons, + chat_id=chat_id(), + thread_id=args.thread_id, + ) + + body_lines = [f"🎫 #{sugg_id} · {args.title}", "", args.description] + if args.prompt: + body_lines += ["", "If yes I'll run:", "```", args.prompt, "```"] + body = "\n".join(body_lines) + + msg_id = send_with_buttons( + token=bot_token(), + chat=chat_id(), + thread=args.thread_id, + text=body, + buttons=buttons, + ) + agency_db.update_message(db, sugg_id, msg_id) + print(sugg_id) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/agent/agency_db.py b/agent/agency_db.py new file mode 100644 index 0000000..0ead3db --- /dev/null +++ b/agent/agency_db.py @@ -0,0 +1,245 @@ +"""Agency DB — persistent SQLite store for every suggestion the Agency loop +posts to Telegram, plus the user's decision (yes/no/different/regenerate/…) +and any worker topic where the resulting work runs. + +Why: Magnus wants every Agency suggestion deduped, tracked, and persistent. +If he never responded to a topic, future agency runs should suppress it. +The schema is generalizable — `buttons_json` stores whichever label set +was offered, `decision` records the literal label tapped, so the same +table works for the default 4 buttons and for ad-hoc custom sets like +"Send draft A / Send draft B / Send draft C". + +Stored at /var/lib/bux/agency.db (created on first use, owned by `bux`). +This is a small, self-contained module — no migrations framework, no ORM, +no abstraction layer. Just a few helpers. + +Public surface: + conn() -> sqlite3.Connection (init + return) + init_schema(conn) + insert(...) -> int # suggestion id + update_message(suggestion_id, message_id) + record_decision(chat_id, message_id, decision, decision_at) + set_worker_topic(suggestion_id, worker_topic_id) + set_status(suggestion_id, status, completed_at=None) + exists(source) -> dict | None # latest row for a given source + search(query, limit=10) -> [row...] # fuzzy LIKE-search by title/desc +""" +from __future__ import annotations + +import json +import os +import sqlite3 +import time +from pathlib import Path +from typing import Any + +DB_PATH = Path(os.environ.get("BUX_AGENCY_DB", "/var/lib/bux/agency.db")) + + +def conn() -> sqlite3.Connection: + DB_PATH.parent.mkdir(parents=True, exist_ok=True) + db = sqlite3.connect(str(DB_PATH)) + db.row_factory = sqlite3.Row + db.execute("PRAGMA foreign_keys = ON") + db.execute("PRAGMA journal_mode = WAL") + init_schema(db) + return db + + +def init_schema(db: sqlite3.Connection) -> None: + db.executescript( + """ + CREATE TABLE IF NOT EXISTS suggestions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + title TEXT NOT NULL, + description TEXT NOT NULL, + importance TEXT CHECK (importance IN ('high','med','low')) DEFAULT 'med', + source TEXT, -- e.g. slack-c-minerva, gmail-thread-19df, gh-pr-78 + prompt TEXT, -- the action that would run if user says yes + buttons_json TEXT, -- JSON list of the labels shown + tg_chat_id INTEGER, + tg_thread_id INTEGER, + tg_message_id INTEGER, + status TEXT CHECK (status IN + ('pending','accepted','dismissed','differently', + 'regenerated','expired','completed','failed')) + DEFAULT 'pending', + decision TEXT, -- the literal label tapped + decision_at INTEGER, + worker_topic_id INTEGER, -- TG topic where the resulting agent runs + worker_started_at INTEGER, + worker_completed_at INTEGER, + created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)), + updated_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s','now') AS INTEGER)) + ); + CREATE INDEX IF NOT EXISTS idx_sugg_status ON suggestions(status); + CREATE INDEX IF NOT EXISTS idx_sugg_source ON suggestions(source); + CREATE INDEX IF NOT EXISTS idx_sugg_created ON suggestions(created_at); + CREATE INDEX IF NOT EXISTS idx_sugg_msg ON suggestions(tg_chat_id, tg_message_id); + CREATE INDEX IF NOT EXISTS idx_sugg_worker_topic ON suggestions(worker_topic_id); + """ + ) + db.commit() + + +def _now() -> int: + return int(time.time()) + + +def insert( + db: sqlite3.Connection, + *, + title: str, + description: str, + importance: str = "med", + source: str | None = None, + prompt: str | None = None, + buttons: list[str] | None = None, + chat_id: int | None = None, + thread_id: int | None = None, +) -> int: + cur = db.execute( + """ + INSERT INTO suggestions ( + title, description, importance, source, prompt, buttons_json, + tg_chat_id, tg_thread_id + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + title, + description, + importance, + source, + prompt, + json.dumps(buttons) if buttons is not None else None, + chat_id, + thread_id, + ), + ) + db.commit() + return int(cur.lastrowid) + + +def update_message(db: sqlite3.Connection, suggestion_id: int, message_id: int) -> None: + db.execute( + "UPDATE suggestions SET tg_message_id = ?, updated_at = ? WHERE id = ?", + (message_id, _now(), suggestion_id), + ) + db.commit() + + +def find_by_message( + db: sqlite3.Connection, chat_id: int, message_id: int +) -> sqlite3.Row | None: + cur = db.execute( + "SELECT * FROM suggestions WHERE tg_chat_id = ? AND tg_message_id = ? LIMIT 1", + (chat_id, message_id), + ) + return cur.fetchone() + + +def record_decision( + db: sqlite3.Connection, + chat_id: int, + message_id: int, + decision: str, +) -> int | None: + """Idempotent: locate the row by (chat_id, message_id), set the decision + + derive a status from the label. Returns the suggestion id, or None if + no row matched (out-of-band button or message not stored).""" + row = find_by_message(db, chat_id, message_id) + if row is None: + return None + low = decision.lower() + if any(w in low for w in ("yes", "do it", "ship", "send", "merge", "approve")): + status = "accepted" + elif any(w in low for w in ("regen", "redo", "rethink")): + status = "regenerated" + elif any(w in low for w in ("different", "differently")): + status = "differently" + elif "no" in low or "skip" in low or "don't" in low or "ignore" in low: + status = "dismissed" + else: + status = "accepted" # custom labels like "Send draft A" → treat as accept + db.execute( + """ + UPDATE suggestions + SET decision = ?, decision_at = ?, status = ?, updated_at = ? + WHERE id = ? + """, + (decision, _now(), status, _now(), row["id"]), + ) + db.commit() + return int(row["id"]) + + +def set_worker_topic( + db: sqlite3.Connection, suggestion_id: int, worker_topic_id: int +) -> None: + db.execute( + """ + UPDATE suggestions + SET worker_topic_id = ?, worker_started_at = COALESCE(worker_started_at, ?), updated_at = ? + WHERE id = ? + """, + (worker_topic_id, _now(), _now(), suggestion_id), + ) + db.commit() + + +def set_status( + db: sqlite3.Connection, + suggestion_id: int, + status: str, + completed_at: int | None = None, +) -> None: + db.execute( + """ + UPDATE suggestions + SET status = ?, worker_completed_at = COALESCE(?, worker_completed_at), updated_at = ? + WHERE id = ? + """, + (status, completed_at, _now(), suggestion_id), + ) + db.commit() + + +def exists(db: sqlite3.Connection, source: str) -> dict[str, Any] | None: + cur = db.execute( + "SELECT * FROM suggestions WHERE source = ? ORDER BY id DESC LIMIT 1", + (source,), + ) + row = cur.fetchone() + return dict(row) if row else None + + +def search( + db: sqlite3.Connection, query: str, limit: int = 10 +) -> list[dict[str, Any]]: + """Fuzzy LIKE-search across title + description. Lower-cases both.""" + q = f"%{query.lower()}%" + cur = db.execute( + """ + SELECT * FROM suggestions + WHERE LOWER(title) LIKE ? OR LOWER(description) LIKE ? + ORDER BY created_at DESC + LIMIT ? + """, + (q, q, limit), + ) + return [dict(r) for r in cur.fetchall()] + + +def list_recent( + db: sqlite3.Connection, status: str | None = None, limit: int = 20 +) -> list[dict[str, Any]]: + if status: + cur = db.execute( + "SELECT * FROM suggestions WHERE status = ? ORDER BY id DESC LIMIT ?", + (status, limit), + ) + else: + cur = db.execute( + "SELECT * FROM suggestions ORDER BY id DESC LIMIT ?", (limit,) + ) + return [dict(r) for r in cur.fetchall()] diff --git a/agent/bootstrap.sh b/agent/bootstrap.sh index 16ef211..fa69370 100755 --- a/agent/bootstrap.sh +++ b/agent/bootstrap.sh @@ -91,9 +91,15 @@ fi # agent/ after a box has already been provisioned never get linked into # /usr/local/bin without a re-bootstrap. Re-assert here on every update so # the symlinks track agent/ as new helpers ship. Idempotent (ln -sfn). -ln -sfn "$REPO_DIR/agent/tg-send" /usr/local/bin/tg-send -ln -sfn "$REPO_DIR/agent/tg-buttons" /usr/local/bin/tg-buttons -ln -sfn "$REPO_DIR/agent/bux-restart" /usr/local/bin/bux-restart +ln -sfn "$REPO_DIR/agent/tg-send" /usr/local/bin/tg-send +ln -sfn "$REPO_DIR/agent/tg-buttons" /usr/local/bin/tg-buttons +ln -sfn "$REPO_DIR/agent/agency-report" /usr/local/bin/agency-report +ln -sfn "$REPO_DIR/agent/bux-restart" /usr/local/bin/bux-restart + +# Agency DB lives at /var/lib/bux/agency.db (created by agency_db on +# first use). Make sure the directory is writable by `bux` so any +# agency-report invocation can init the schema without sudo. +install -d -o bux -g bux -m 0755 /var/lib/bux # --- Cloud Composio MCP server (cloud-side proxy) ------------------------- # Why MCP at all: cloud holds the platform's Composio API key plus every diff --git a/agent/telegram_bot.py b/agent/telegram_bot.py index c6ad904..863ba8f 100644 --- a/agent/telegram_bot.py +++ b/agent/telegram_bot.py @@ -5773,6 +5773,19 @@ def _handle_agency_callback(self, cb: dict, data: str) -> None: ) except Exception: LOG.exception("agency keyboard mark failed") + # Record the decision against the suggestion row in agency.db so + # future Agency runs can dedupe + suppress repeat suggestions + # Magnus already responded to. Best-effort: a missing row (button + # posted out-of-band, e.g. the legacy tg-buttons helper) just + # no-ops. + try: + import agency_db + db = agency_db.conn() + agency_db.record_decision( + db, chat_id, msg.get("message_id"), label + ) + except Exception: + LOG.exception("agency_db record_decision failed") who = sender.get("username") or sender.get("first_name") or sender.get("id") # Dispatch the choice into the lane as a synthesized user # message. The agent resumes the lane session (UUID kept), sees