-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathhub.py
More file actions
2787 lines (2401 loc) · 121 KB
/
hub.py
File metadata and controls
2787 lines (2401 loc) · 121 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
print("HUB.PY IS EXECUTING!")
import logging
from flask import Flask, render_template, jsonify, request, redirect, url_for, session
import threading
from rich.logging import RichHandler
import os
from waitress import serve
import json # ADD json for pretty printing
import signal # ADD signal for process termination
import sys # For sys.executable in subprocess calls
import atexit # For cleanup on hub exit
import uuid # NEW: For unique approval request IDs
import secrets # For webhook secret tokens
import time # Ensure time is imported
import hashlib # For password hashing
from dotenv import load_dotenv, set_key # Import load_dotenv and set_key
import zipfile # NEW: For handling zip file uploads
import shutil # NEW: For directory operations (e.g., rmtree)
from werkzeug.utils import secure_filename # NEW: For securing uploaded filenames
from pathlib import Path # NEW: For path manipulation
import requests # IMPORTED: Add requests for HTTP communication
import subprocess as _subprocess # For update system git/pip commands
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
import pytz
# Configure logging for the hub
log_file_path = os.path.join(os.path.dirname(__file__), "agent_debug.log")
logging.basicConfig(
level="DEBUG", format="%(message)s", datefmt="[%X]", handlers=[RichHandler(), logging.FileHandler(log_file_path)]
)
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.jinja_env.cache = {}
app.secret_key = os.environ.get("FLASK_SECRET_KEY", secrets.token_hex(32))
# ─── Authentication System ───
AUTH_FILE = os.path.join(os.path.dirname(__file__), "auth.json")
def _load_auth():
    """Read auth.json from disk; return {} when missing or unreadable.

    An empty dict means no admin account exists yet (first run).
    """
    if not os.path.exists(AUTH_FILE):
        return {}
    try:
        with open(AUTH_FILE, "r") as fh:
            return json.load(fh)
    except Exception:
        # Corrupt/unreadable file is treated the same as "no account yet".
        return {}
def _save_auth(auth_data):
    """Write the auth config dict to auth.json, pretty-printed."""
    serialized = json.dumps(auth_data, indent=2)
    with open(AUTH_FILE, "w") as fh:
        fh.write(serialized)
def _hash_password(password):
return hashlib.sha256(password.encode()).hexdigest()
_auth_config = _load_auth()
def _is_first_run():
    """True if no admin account has been set up yet (no stored username)."""
    return not _auth_config.get("username")
# Routes that agents use (no auth required)
AGENT_API_PREFIXES = (
"/register_agent", "/heartbeat", "/submit_experience",
"/receive_user_message", "/webhook/", "/get_pending_messages",
"/api/agent_health", "/static/"
)
@app.before_request
def _check_auth():
    """Gate every request before routing.

    Agent-to-hub API routes and the setup/login/logout pages are always
    allowed. Before an admin account exists, everything else is forced to
    the setup page (401 JSON for /api/). Once set up and auth is enabled,
    a session flag is required; otherwise the request passes through.
    """
    path = request.path
    # Agent-to-hub API routes are always open (startswith accepts a tuple).
    if path.startswith(AGENT_API_PREFIXES):
        return None
    # Setup, login, logout, and favicon are always reachable.
    if path in ("/setup", "/login", "/logout", "/favicon.ico"):
        return None
    # First run — force setup before anything else.
    if _is_first_run():
        if path.startswith("/api/"):
            return jsonify({"status": "error", "message": "Initial setup required. Visit the dashboard to create your admin account."}), 401
        return redirect(url_for("setup_page"))
    # Auth disabled — let the request through.
    if not _auth_config.get("enabled", True):
        return None
    # Auth enabled — require an authenticated session.
    if session.get("authenticated"):
        return None
    if path.startswith("/api/") or request.is_json:
        return jsonify({"status": "error", "message": "Authentication required."}), 401
    return redirect(url_for("login_page"))
@app.route("/setup", methods=["GET", "POST"])
def setup_page():
"""First-run setup: create admin username and password."""
if not _is_first_run():
return redirect(url_for("dashboard"))
error = None
if request.method == "POST":
username = request.form.get("username", "").strip()
password = request.form.get("password", "").strip()
confirm = request.form.get("confirm_password", "").strip()
if not username:
error = "Username is required."
elif not password:
error = "Password is required."
elif len(password) < 4:
error = "Password must be at least 4 characters."
elif password != confirm:
error = "Passwords do not match."
else:
global _auth_config
_auth_config = {
"username": username,
"password_hash": _hash_password(password),
"enabled": True,
"created_at": time.time()
}
_save_auth(_auth_config)
session["authenticated"] = True
logger.info(f"[Auth] Admin account created: {username}")
_log_activity("admin_setup", f"Admin account '{username}' created")
return redirect(url_for("dashboard"))
return render_template("setup.html", error=error)
@app.route("/login", methods=["GET", "POST"])
def login_page():
if _is_first_run():
return redirect(url_for("setup_page"))
error = None
if request.method == "POST":
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
if (username == _auth_config.get("username", "") and
_hash_password(password) == _auth_config.get("password_hash", "")):
session["authenticated"] = True
_log_activity("login_success", f"User '{username}' logged in")
return redirect(url_for("dashboard"))
_log_activity("login_failed", f"Failed login attempt for user '{username}'")
error = "Invalid username or password."
return render_template("login.html", error=error)
@app.route("/logout")
def logout():
session.pop("authenticated", None)
return redirect(url_for("login_page"))
@app.route("/api/auth/status", methods=["GET"])
def api_auth_status():
return jsonify({
"status": "success",
"enabled": _auth_config.get("enabled", False),
"username": _auth_config.get("username", ""),
"setup_complete": not _is_first_run()
}), 200
@app.route("/api/auth/set_password", methods=["POST"])
def api_set_password():
global _auth_config
data = request.json or {}
new_password = data.get("password", "").strip()
new_username = data.get("username", "").strip()
enabled = data.get("enabled", _auth_config.get("enabled", True))
if enabled and not new_password and not _auth_config.get("password_hash"):
return jsonify({"status": "error", "message": "Password is required to enable authentication."}), 400
if new_password:
if len(new_password) < 4:
return jsonify({"status": "error", "message": "Password must be at least 4 characters."}), 400
_auth_config["password_hash"] = _hash_password(new_password)
if new_username:
_auth_config["username"] = new_username
_auth_config["enabled"] = bool(enabled)
_save_auth(_auth_config)
session["authenticated"] = True
_log_activity("auth_settings_changed", f"Auth settings updated (enabled={_auth_config['enabled']})")
return jsonify({"status": "success", "enabled": _auth_config["enabled"], "message": "Authentication settings updated."}), 200
hub_memory = {
"experiences": [],
"distilled_tips": [],
"active_agents": {},
"agent_message_queues": {},
"recent_experiences": [],
"MAX_RECENT_EXPERIENCES": 20,
"user_messages": [],
"pending_approvals": {}, # NEW: For handling approvals
"total_token_usage": { # NEW: To track aggregated token usage
"moonshot_ai": {"prompt_tokens": 0, "completion_tokens": 0},
"ollama": {"prompt_tokens": 0, "completion_tokens": 0},
"openrouter": {"prompt_tokens": 0, "completion_tokens": 0},
"voyage_ai": {"prompt_tokens": 0, "completion_tokens": 0},
"huggingface": {"prompt_tokens": 0, "completion_tokens": 0}
},
"per_agent_token_usage": {}, # {"AgentName": {"openrouter": {"prompt_tokens": 0, "completion_tokens": 0}, ...}}
"global_plugin_preferences": {}, # NEW: To store global enable/disable status for plugins
"global_auto_install_deps": True, # NEW: Global setting for auto-installing plugin dependencies
"discovered_plugins": {}, # NEW: To store details of all plugins found in the plugins directory
"scheduled_tasks": [], # Scheduled/cron tasks (persisted to scheduled_tasks.json)
"webhooks": {}, # {"<id>": {"id", "name", "agent_name", "secret", "created_at", "enabled", "last_triggered", "trigger_count"}}
"notifications": [], # [{"id", "type", "message", "timestamp", "read"}] — capped at 50
"agent_templates": [], # Persisted to agent_templates.json
"workflows": [], # Persisted to workflows.json
"workflow_runs": [], # Persisted to workflow_runs.json
"agent_groups": {}, # {"group_name": {"name", "agents": [], "created_at"}} — Persisted to agent_groups.json
"activity_log": [], # [{"id", "event", "details", "timestamp"}] — capped at 500, persisted
"agent_messages": [], # [{id, from_agent, to_agent, message, timestamp}] — agent-to-agent history, capped at 200
"swarm_runs": [] # [{run_id, task, lead_agent, status, agents, sub_results, final_result, created_at, completed_at}]
}
launched_agent_processes = {}
launched_agent_configs = {} # {agent_name: {"cmd": [...], "env": {...}}} — for auto-restart
auto_restart_enabled = True
shutdown_event = threading.Event()
waitress_server = None
def _cleanup_agent_processes():
    """Kill all launched agent processes on hub exit (Ctrl+C, SIGTERM, etc.).

    Agents were launched with os.setsid, so SIGTERM is sent to the whole
    process group via killpg. Already-dead processes are ignored.
    """
    for agent_name, pid in list(launched_agent_processes.items()):
        try:
            os.killpg(pid, signal.SIGTERM)
        except ProcessLookupError:
            pass  # Already dead
        except Exception as e:
            logger.error(f"Cleanup: Error killing agent '{agent_name}' (PID: {pid}): {e}")
        else:
            logger.info(f"Cleanup: Terminated agent '{agent_name}' (PID: {pid})")
    launched_agent_processes.clear()
atexit.register(_cleanup_agent_processes)
def _signal_handler(signum, frame):
    """Handle SIGINT/SIGTERM to ensure agent cleanup before exit.

    Kills all launched agent process groups, then exits with status 0.
    The frame argument is required by the signal API but unused.
    """
    logger.info(f"Received signal {signum}, cleaning up agent processes...")
    _cleanup_agent_processes()
    sys.exit(0)
signal.signal(signal.SIGINT, _signal_handler)
signal.signal(signal.SIGTERM, _signal_handler)
def _auto_restart_monitor():
    """Background thread: restart launched agent processes that have died.

    Every 30 seconds (until shutdown_event is set, and only while
    auto_restart_enabled is on), each tracked PID is probed with
    os.kill(pid, 0). A ProcessLookupError means the process is gone; it is
    relaunched from its saved config in launched_agent_configs, with stdout/
    stderr appended to per-agent log files and its own session (os.setsid)
    so cleanup can kill the whole group.
    """
    while not shutdown_event.is_set():
        shutdown_event.wait(30)  # Check every 30 seconds
        if shutdown_event.is_set() or not auto_restart_enabled:
            continue
        for agent_name, pid in list(launched_agent_processes.items()):
            try:
                os.kill(pid, 0)  # Signal 0: liveness probe, delivers nothing
            except ProcessLookupError:
                # Process is dead — try to restart
                config = launched_agent_configs.get(agent_name)
                if not config:
                    logger.warning(f"[AutoRestart] Agent '{agent_name}' (PID {pid}) died but no launch config saved — cannot restart.")
                    launched_agent_processes.pop(agent_name, None)
                    continue
                logger.warning(f"[AutoRestart] Agent '{agent_name}' (PID {pid}) died — restarting...")
                try:
                    cmd = config["cmd"]
                    env = config["env"]
                    stdout_path = os.path.join(os.path.dirname(__file__), f"agent_{agent_name}_stdout.log")
                    stderr_path = os.path.join(os.path.dirname(__file__), f"agent_{agent_name}_stderr.log")
                    with open(stdout_path, "a") as stdout_f, open(stderr_path, "a") as stderr_f:
                        with open(os.devnull, 'r') as devnull:
                            # BUGFIX: this module imports subprocess as _subprocess;
                            # the bare name `subprocess` raised NameError here, so
                            # no agent was ever actually restarted.
                            process = _subprocess.Popen(cmd, stdin=devnull, stdout=stdout_f, stderr=stderr_f, preexec_fn=os.setsid, env=env)
                    launched_agent_processes[agent_name] = process.pid
                    logger.info(f"[AutoRestart] Agent '{agent_name}' restarted (new PID: {process.pid})")
                    _add_notification("agent_restarted", f"Agent '{agent_name}' crashed and was auto-restarted")
                except Exception as e:
                    logger.error(f"[AutoRestart] Failed to restart agent '{agent_name}': {e}")
                    launched_agent_processes.pop(agent_name, None)
            except PermissionError:
                pass  # Process exists but we can't signal it — leave it
_auto_restart_thread = threading.Thread(target=_auto_restart_monitor, daemon=True)
_auto_restart_thread.start()
def _discover_all_plugins():
    """Scan the plugins directory and record each plugin's manifest data.

    Every subdirectory containing a manifest.json is registered in
    hub_memory["discovered_plugins"], keyed by the manifest's "name"
    (falling back to the directory name). Invalid manifests are logged
    and skipped; discovery continues for the remaining plugins.
    """
    logger.info("Starting plugin discovery...")
    plugins_dir = Path(__file__).parent / "plugins"
    if not plugins_dir.is_dir():
        logger.info(f"Plugins directory not found at {plugins_dir}. No plugins will be discovered.")
        return
    logger.info(f"Discovering all plugins in {plugins_dir}...")
    for plugin_path in plugins_dir.iterdir():
        if not plugin_path.is_dir():
            continue
        manifest_path = plugin_path / "manifest.json"
        if not manifest_path.is_file():
            continue
        try:
            with open(manifest_path, 'r') as fh:
                manifest = json.load(fh)
            plugin_name = manifest.get('name', plugin_path.name)
            hub_memory["discovered_plugins"][plugin_name] = {
                "name": plugin_name,
                "description": manifest.get("description", "No description provided."),
                "version": manifest.get("version", "N/A"),
                "author": manifest.get("author", "Unknown"),
                "path": str(plugin_path),
                "status": hub_memory["global_plugin_preferences"].get(plugin_name, "enabled")  # Reflect global preference
            }
            logger.info(f"Discovered plugin: {plugin_name}")
        except json.JSONDecodeError:
            logger.error(f"Invalid manifest.json in plugin '{plugin_path.name}'. Skipping discovery.")
        except Exception as e:
            logger.error(f"Error discovering plugin '{plugin_path.name}': {e}", exc_info=True)
    logger.info(f"Finished plugin discovery. Found {len(hub_memory['discovered_plugins'])} plugins.")
# Initial plugin discovery when the hub starts
_discover_all_plugins()
# ─── Scheduled Tasks System ───
_scheduler_timezone = os.environ.get("SCHEDULER_TIMEZONE", "UTC")
_scheduler = BackgroundScheduler(timezone=_scheduler_timezone)
_scheduler.start()
atexit.register(lambda: _scheduler.shutdown(wait=False))
SCHEDULED_TASKS_FILE = os.path.join(os.path.dirname(__file__), "scheduled_tasks.json")
def _execute_scheduled_task(task_id, agent_name, task_message):
    """APScheduler callback — drops a message into the agent's queue.

    Also raises a notification and stamps the matching task's last_run
    time before persisting the task list to disk.
    """
    logger.info(f"[Scheduler] Firing task '{task_id}' for agent '{agent_name}': {task_message[:80]}")
    queue = hub_memory["agent_message_queues"].setdefault(agent_name, [])
    queue.append({
        "sender": "scheduler",
        "message": task_message
    })
    _add_notification("scheduled_task_fired", f"Scheduled task fired for '{agent_name}': {task_message[:60]}")
    # Update last_run on the task record, if it still exists.
    task = next((t for t in hub_memory["scheduled_tasks"] if t["id"] == task_id), None)
    if task is not None:
        task["last_run"] = time.time()
    _save_scheduled_tasks()
def _save_scheduled_tasks():
    """Persist scheduled tasks to disk; log (never raise) on failure."""
    try:
        payload = json.dumps(hub_memory["scheduled_tasks"], indent=2)
        with open(SCHEDULED_TASKS_FILE, "w") as fh:
            fh.write(payload)
    except Exception as e:
        logger.error(f"[Scheduler] Error saving tasks: {e}")
def _register_task_with_scheduler(task):
    """Register a single task dict with APScheduler.

    "cron" tasks get a CronTrigger built from cron_hour/cron_minute/cron_days
    (defaulting to 9:00 every day) in the SCHEDULER_TIMEZONE env timezone;
    any other schedule_type gets an IntervalTrigger from interval_seconds
    (default 3600s). replace_existing=True makes re-registration of the same
    task id idempotent. Errors are logged, not raised.
    """
    try:
        if task["schedule_type"] == "cron":
            tz = os.environ.get("SCHEDULER_TIMEZONE", "UTC")
            trigger = CronTrigger(
                hour=int(task.get("cron_hour", 9)),
                minute=int(task.get("cron_minute", 0)),
                day_of_week=task.get("cron_days", "*"),
                timezone=tz
            )
        else:
            trigger = IntervalTrigger(seconds=int(task.get("interval_seconds", 3600)))
        _scheduler.add_job(
            _execute_scheduled_task,
            trigger=trigger,
            args=[task["id"], task["agent_name"], task["task_message"]],
            id=task["id"],
            name=f"{task['agent_name']}: {task['task_message'][:50]}",
            replace_existing=True
        )
    except Exception as e:
        logger.error(f"[Scheduler] Error registering task '{task['id']}': {e}")
def _load_scheduled_tasks():
    """Load tasks from disk and register the enabled ones with APScheduler."""
    if not os.path.exists(SCHEDULED_TASKS_FILE):
        return
    try:
        with open(SCHEDULED_TASKS_FILE, "r") as fh:
            hub_memory["scheduled_tasks"] = json.load(fh)
        for task in hub_memory["scheduled_tasks"]:
            if task.get("enabled", True):
                _register_task_with_scheduler(task)
        logger.info(f"[Scheduler] Loaded {len(hub_memory['scheduled_tasks'])} scheduled task(s) from disk.")
    except Exception as e:
        logger.error(f"[Scheduler] Error loading tasks: {e}")
_load_scheduled_tasks()
# ─── Notification System ───
MAX_NOTIFICATIONS = 50
NOTIFICATIONS_FILE = os.path.join(os.path.dirname(__file__), "notifications.json")
def _save_notifications():
    """Persist the notification feed to disk; log (never raise) on failure."""
    try:
        payload = json.dumps(hub_memory["notifications"], indent=2)
        with open(NOTIFICATIONS_FILE, "w") as fh:
            fh.write(payload)
    except Exception as e:
        logger.error(f"[Notifications] Error saving: {e}")
def _load_notifications():
    """Restore the notification feed from disk, if a saved file exists."""
    if not os.path.exists(NOTIFICATIONS_FILE):
        return
    try:
        with open(NOTIFICATIONS_FILE, "r") as fh:
            hub_memory["notifications"] = json.load(fh)
        logger.info(f"[Notifications] Loaded {len(hub_memory['notifications'])} notification(s) from disk.")
    except Exception as e:
        logger.error(f"[Notifications] Error loading: {e}")
_load_notifications()
def _add_notification(ntype, message):
    """Add a notification to the feed (capped at MAX_NOTIFICATIONS, persisted).

    The newest entry goes to the front; the feed is trimmed in place and the
    event is mirrored into the activity log.
    """
    feed = hub_memory["notifications"]
    feed.insert(0, {
        "id": str(uuid.uuid4())[:8],
        "type": ntype,
        "message": message,
        "timestamp": time.time(),
        "read": False
    })
    del feed[MAX_NOTIFICATIONS:]
    _save_notifications()
    # Also record in the activity log
    _log_activity(ntype, message)
# ─── Activity Log System ───
MAX_ACTIVITY_LOG = 500
ACTIVITY_LOG_FILE = os.path.join(os.path.dirname(__file__), "activity_log.json")
def _save_activity_log():
    """Persist the activity log to disk; log (never raise) on failure."""
    try:
        payload = json.dumps(hub_memory["activity_log"], indent=2)
        with open(ACTIVITY_LOG_FILE, "w") as fh:
            fh.write(payload)
    except Exception as e:
        logger.error(f"[ActivityLog] Error saving: {e}")
def _load_activity_log():
    """Restore the activity log from disk, if a saved file exists."""
    if not os.path.exists(ACTIVITY_LOG_FILE):
        return
    try:
        with open(ACTIVITY_LOG_FILE, "r") as fh:
            hub_memory["activity_log"] = json.load(fh)
        logger.info(f"[ActivityLog] Loaded {len(hub_memory['activity_log'])} entries from disk.")
    except Exception as e:
        logger.error(f"[ActivityLog] Error loading: {e}")
_load_activity_log()
def _log_activity(event, details=""):
    """Add an entry to the activity log (capped at MAX_ACTIVITY_LOG, persisted)."""
    log = hub_memory["activity_log"]
    log.insert(0, {
        "id": str(uuid.uuid4())[:8],
        "event": event,
        "details": details,
        "timestamp": time.time()
    })
    del log[MAX_ACTIVITY_LOG:]
    _save_activity_log()
# ─── Agent-to-Agent Message History ───
MAX_AGENT_MESSAGES = 200
AGENT_MESSAGES_FILE = os.path.join(os.path.dirname(__file__), "agent_messages.json")
def _save_agent_messages():
    """Persist the agent-to-agent message history; log (never raise) on failure."""
    try:
        payload = json.dumps(hub_memory["agent_messages"], indent=2)
        with open(AGENT_MESSAGES_FILE, "w") as fh:
            fh.write(payload)
    except Exception as e:
        logger.error(f"[AgentMessages] Error saving: {e}")
def _load_agent_messages():
    """Restore the agent-to-agent message history from disk, if present."""
    if not os.path.exists(AGENT_MESSAGES_FILE):
        return
    try:
        with open(AGENT_MESSAGES_FILE, "r") as fh:
            hub_memory["agent_messages"] = json.load(fh)
        logger.info(f"[AgentMessages] Loaded {len(hub_memory['agent_messages'])} message(s) from disk.")
    except Exception as e:
        logger.error(f"[AgentMessages] Error loading: {e}")
_load_agent_messages()
def _record_agent_message(from_agent, to_agent, message):
    """Record an agent-to-agent message in the history (capped, persisted)."""
    history = hub_memory["agent_messages"]
    history.insert(0, {
        "id": str(uuid.uuid4())[:8],
        "from_agent": from_agent,
        "to_agent": to_agent,
        "message": message,
        "timestamp": time.time()
    })
    del history[MAX_AGENT_MESSAGES:]
    _save_agent_messages()
# ─── Swarm System ───
SWARM_RUNS_FILE = os.path.join(os.path.dirname(__file__), "swarm_runs.json")
def _save_swarm_runs():
    """Persist swarm run records to disk; log (never raise) on failure."""
    try:
        payload = json.dumps(hub_memory["swarm_runs"], indent=2)
        with open(SWARM_RUNS_FILE, "w") as fh:
            fh.write(payload)
    except Exception as e:
        logger.error(f"[Swarm] Error saving: {e}")
def _load_swarm_runs():
    """Restore swarm run records from disk, if a saved file exists."""
    if not os.path.exists(SWARM_RUNS_FILE):
        return
    try:
        with open(SWARM_RUNS_FILE, "r") as fh:
            hub_memory["swarm_runs"] = json.load(fh)
        logger.info(f"[Swarm] Loaded {len(hub_memory['swarm_runs'])} swarm run(s) from disk.")
    except Exception as e:
        logger.error(f"[Swarm] Error loading: {e}")
_load_swarm_runs()
def _check_swarm_completion(agent_name, message):
    """Check if an agent's response contributes to an active swarm run.

    Two phases per active swarm:
    - while "running": a non-lead participant's message is recorded as a
      sub-result; once every non-lead agent has responded, an aggregation
      prompt is queued for the lead agent and the run becomes "aggregating";
    - while "aggregating": a message from the lead agent is taken as the
      final synthesized result and the run is marked "completed".
    Only the first matching swarm run is updated per call.
    """
    for run in hub_memory["swarm_runs"]:
        # BUGFIX: this filter previously skipped everything except "running",
        # which made the "aggregating" branch below unreachable — the lead
        # agent's final answer was never recorded and no swarm could ever
        # complete. Both active states must pass through.
        if run["status"] not in ("running", "aggregating"):
            continue
        # Check if this agent is a participant (not the lead) in this swarm
        if (run["status"] == "running"
                and agent_name in run.get("agents", [])
                and agent_name != run["lead_agent"]):
            # Record this sub-result
            run["sub_results"].append({
                "agent": agent_name,
                "result": message,
                "timestamp": time.time()
            })
            _save_swarm_runs()
            logger.info(f"[Swarm] Sub-result from '{agent_name}' for swarm '{run['run_id']}' ({len(run['sub_results'])}/{len(run['agents'])-1} results)")
            # Check if we have enough results (all non-lead agents responded)
            non_lead_agents = [a for a in run["agents"] if a != run["lead_agent"]]
            responded_agents = set(r["agent"] for r in run["sub_results"])
            if responded_agents >= set(non_lead_agents):
                # All sub-agents responded — send aggregation task to lead
                logger.info(f"[Swarm] All sub-results collected for swarm '{run['run_id']}'. Sending aggregation to lead '{run['lead_agent']}'.")
                results_summary = "\n".join([f"- {r['agent']}: {r['result'][:500]}" for r in run["sub_results"]])
                agg_message = f"[SWARM_AGGREGATE:{run['run_id']}] You are the lead agent. Your team has completed their sub-tasks. Synthesize these results into a final comprehensive answer.\n\nOriginal task: {run['task']}\n\nSub-agent results:\n{results_summary}\n\nProvide a synthesized final answer."
                if run["lead_agent"] not in hub_memory["agent_message_queues"]:
                    hub_memory["agent_message_queues"][run["lead_agent"]] = []
                hub_memory["agent_message_queues"][run["lead_agent"]].append({
                    "sender": "swarm",
                    "message": agg_message
                })
                run["status"] = "aggregating"
                _save_swarm_runs()
                _add_notification("swarm_aggregating", f"Swarm '{run['run_id']}': all sub-results collected, lead agent synthesizing")
            return  # Only match the first relevant swarm run
        # Check if the lead agent is sending the final aggregated result
        if agent_name == run["lead_agent"] and run["status"] == "aggregating":
            run["final_result"] = message
            run["status"] = "completed"
            run["completed_at"] = time.time()
            _save_swarm_runs()
            _add_notification("swarm_completed", f"Swarm '{run['run_id']}' completed: {message[:80]}")
            logger.info(f"[Swarm] Swarm '{run['run_id']}' completed with final result.")
            return
# ─── Webhook System ───
WEBHOOKS_FILE = os.path.join(os.path.dirname(__file__), "webhooks.json")
def _save_webhooks():
    """Persist the webhook registry to disk; log (never raise) on failure."""
    try:
        payload = json.dumps(hub_memory["webhooks"], indent=2)
        with open(WEBHOOKS_FILE, "w") as fh:
            fh.write(payload)
    except Exception as e:
        logger.error(f"[Webhooks] Error saving: {e}")
def _load_webhooks():
    """Restore the webhook registry from disk, if a saved file exists."""
    if not os.path.exists(WEBHOOKS_FILE):
        return
    try:
        with open(WEBHOOKS_FILE, "r") as fh:
            hub_memory["webhooks"] = json.load(fh)
        logger.info(f"[Webhooks] Loaded {len(hub_memory['webhooks'])} webhook(s) from disk.")
    except Exception as e:
        logger.error(f"[Webhooks] Error loading: {e}")
_load_webhooks()
# ─── Agent Groups System ───
AGENT_GROUPS_FILE = os.path.join(os.path.dirname(__file__), "agent_groups.json")
def _save_agent_groups():
    """Persist agent groups to disk; log (never raise) on failure."""
    try:
        payload = json.dumps(hub_memory["agent_groups"], indent=2)
        with open(AGENT_GROUPS_FILE, "w") as fh:
            fh.write(payload)
    except Exception as e:
        logger.error(f"[Groups] Error saving: {e}")
def _load_agent_groups():
    """Restore agent groups from disk, if a saved file exists."""
    if not os.path.exists(AGENT_GROUPS_FILE):
        return
    try:
        with open(AGENT_GROUPS_FILE, "r") as fh:
            hub_memory["agent_groups"] = json.load(fh)
        logger.info(f"[Groups] Loaded {len(hub_memory['agent_groups'])} group(s) from disk.")
    except Exception as e:
        logger.error(f"[Groups] Error loading: {e}")
_load_agent_groups()
# ─── Agent Templates System ───
AGENT_TEMPLATES_FILE = os.path.join(os.path.dirname(__file__), "agent_templates.json")
def _save_agent_templates():
    """Persist agent templates to disk; log (never raise) on failure."""
    try:
        payload = json.dumps(hub_memory["agent_templates"], indent=2)
        with open(AGENT_TEMPLATES_FILE, "w") as fh:
            fh.write(payload)
    except Exception as e:
        logger.error(f"[Templates] Error saving: {e}")
def _load_agent_templates():
    """Restore agent templates from disk, if a saved file exists."""
    if not os.path.exists(AGENT_TEMPLATES_FILE):
        return
    try:
        with open(AGENT_TEMPLATES_FILE, "r") as fh:
            hub_memory["agent_templates"] = json.load(fh)
        logger.info(f"[Templates] Loaded {len(hub_memory['agent_templates'])} template(s) from disk.")
    except Exception as e:
        logger.error(f"[Templates] Error loading: {e}")
_load_agent_templates()
# ─── Workflow Engine ───
WORKFLOWS_FILE = os.path.join(os.path.dirname(__file__), "workflows.json")
WORKFLOW_RUNS_FILE = os.path.join(os.path.dirname(__file__), "workflow_runs.json")
def _save_workflows():
    """Persist workflow definitions to disk; log (never raise) on failure."""
    try:
        payload = json.dumps(hub_memory["workflows"], indent=2)
        with open(WORKFLOWS_FILE, "w") as fh:
            fh.write(payload)
    except Exception as e:
        logger.error(f"[Workflows] Error saving: {e}")
def _load_workflows():
    """Restore workflow definitions from disk, if a saved file exists."""
    if not os.path.exists(WORKFLOWS_FILE):
        return
    try:
        with open(WORKFLOWS_FILE, "r") as fh:
            hub_memory["workflows"] = json.load(fh)
        logger.info(f"[Workflows] Loaded {len(hub_memory['workflows'])} workflow(s) from disk.")
    except Exception as e:
        logger.error(f"[Workflows] Error loading: {e}")
def _save_workflow_runs():
    """Persist workflow run records to disk; log (never raise) on failure."""
    try:
        payload = json.dumps(hub_memory["workflow_runs"], indent=2)
        with open(WORKFLOW_RUNS_FILE, "w") as fh:
            fh.write(payload)
    except Exception as e:
        logger.error(f"[Workflow Runs] Error saving: {e}")
def _load_workflow_runs():
    """Restore workflow run records from disk, if a saved file exists."""
    if not os.path.exists(WORKFLOW_RUNS_FILE):
        return
    try:
        with open(WORKFLOW_RUNS_FILE, "r") as fh:
            hub_memory["workflow_runs"] = json.load(fh)
        logger.info(f"[Workflow Runs] Loaded {len(hub_memory['workflow_runs'])} run(s) from disk.")
    except Exception as e:
        logger.error(f"[Workflow Runs] Error loading: {e}")
_load_workflows()
_load_workflow_runs()
import re as _re
def _resolve_template(template_str, trigger_input, step_results):
"""Replace {{trigger_input}} and {{stepX.output}} with actual values."""
result = template_str.replace("{{trigger_input}}", str(trigger_input or ""))
# Replace {{stepX.output}} patterns
def _step_replacer(match):
step_id = match.group(1)
if step_id in step_results and step_results[step_id].get("output"):
return str(step_results[step_id]["output"])
return match.group(0) # Leave placeholder if no output yet
result = _re.sub(r"\{\{(\w+)\.output\}\}", _step_replacer, result)
return result
def _advance_workflow(run_id):
    """Advance a workflow run to its next step.

    Looks up the run and its workflow definition, then acts on the current
    step's recorded status:
    - "completed": move to next_step (finishing the run if there is none),
      pausing first if the next step requires approval;
    - "failed": jump to the step's on_failure target if defined, otherwise
      mark the whole run failed.
    Missing run/workflow/step definitions fail the run with a notification.
    """
    run = None
    for r in hub_memory["workflow_runs"]:
        if r["run_id"] == run_id:
            run = r
            break
    # Only runs that are still in flight can be advanced.
    if not run or run["status"] not in ("running", "paused"):
        return
    # Find workflow definition
    workflow = None
    for w in hub_memory["workflows"]:
        if w["id"] == run["workflow_id"]:
            workflow = w
            break
    if not workflow:
        run["status"] = "failed"
        run["completed_at"] = time.time()
        _save_workflow_runs()
        _add_notification("workflow_failed", f"Workflow '{run['workflow_name']}' failed — workflow definition not found")
        return
    current_step_id = run["current_step_id"]
    # Find current step definition
    current_step_def = None
    for s in workflow["steps"]:
        if s["id"] == current_step_id:
            current_step_def = s
            break
    if not current_step_def:
        run["status"] = "failed"
        run["completed_at"] = time.time()
        _save_workflow_runs()
        _add_notification("workflow_failed", f"Workflow '{run['workflow_name']}' failed — step '{current_step_id}' not found")
        return
    step_result = run["step_results"].get(current_step_id, {})
    step_status = step_result.get("status", "pending")
    if step_status == "completed":
        next_step_id = current_step_def.get("next_step")
        if not next_step_id:
            # Workflow is done
            run["status"] = "completed"
            run["completed_at"] = time.time()
            _save_workflow_runs()
            _add_notification("workflow_completed", f"Workflow '{run['workflow_name']}' completed successfully")
            return
        # Find next step definition
        next_step_def = None
        for s in workflow["steps"]:
            if s["id"] == next_step_id:
                next_step_def = s
                break
        if not next_step_def:
            run["status"] = "failed"
            run["completed_at"] = time.time()
            _save_workflow_runs()
            _add_notification("workflow_failed", f"Workflow '{run['workflow_name']}' failed — next step '{next_step_id}' not found")
            return
        # Check if next step needs approval before running
        if next_step_def.get("wait_for_approval"):
            run["status"] = "paused"
            run["current_step_id"] = next_step_id
            # Pre-create the step record so the UI can show what is pending.
            run["step_results"][next_step_id] = {
                "status": "waiting_approval",
                "agent_name": next_step_def["agent_name"],
                "started_at": None,
                "completed_at": None,
                "output": None
            }
            _save_workflow_runs()
            _add_notification("workflow_paused", f"Workflow '{run['workflow_name']}' paused — awaiting approval for step '{next_step_def['name']}'")
            return
        # Execute next step
        _execute_workflow_step(run, workflow, next_step_def)
    elif step_status == "failed":
        on_failure = current_step_def.get("on_failure")
        if on_failure:
            # Jump to the on_failure step
            fail_step_def = None
            for s in workflow["steps"]:
                if s["id"] == on_failure:
                    fail_step_def = s
                    break
            if fail_step_def:
                _execute_workflow_step(run, workflow, fail_step_def)
                return
        # No recovery — workflow failed
        run["status"] = "failed"
        run["completed_at"] = time.time()
        _save_workflow_runs()
        _add_notification("workflow_failed", f"Workflow '{run['workflow_name']}' failed at step '{current_step_def['name']}'")
def _execute_workflow_step(run, workflow, step_def):
    """Dispatch a workflow step's task to its target agent.

    Marks the step as "running" in the run's step_results, resolves the
    step's task template against the trigger input and prior step outputs,
    and enqueues the resulting message on the target agent's message
    queue.  If the target agent is not registered as active, the whole
    run is marked "failed" instead and a notification is emitted.

    Args:
        run: Mutable workflow-run dict; status/current_step_id/step_results
            are updated in place and persisted via _save_workflow_runs().
        workflow: Workflow definition dict (not read here; kept for
            signature parity with the other workflow handlers).
        step_def: Step definition dict with "id", "agent_name", "name"
            and an optional "task_template".
    """
    agent_name = step_def["agent_name"]
    step_id = step_def["id"]
    # Fail fast if the target agent is not currently registered/active.
    if agent_name not in hub_memory["active_agents"]:
        now = time.time()  # single timestamp: started_at == completed_at for a step that never ran
        run["status"] = "failed"
        run["current_step_id"] = step_id
        run["step_results"][step_id] = {
            "status": "failed",
            "agent_name": agent_name,
            "started_at": now,
            "completed_at": now,
            "output": f"Agent '{agent_name}' is not running"
        }
        run["completed_at"] = now
        _save_workflow_runs()
        _add_notification("workflow_failed", f"Workflow '{run['workflow_name']}' failed — agent '{agent_name}' is not running")
        return
    # Resolve template variables (trigger input + outputs of earlier steps).
    task_message = _resolve_template(step_def.get("task_template", ""), run.get("trigger_input", ""), run.get("step_results", {}))
    # Update run state *before* dispatch so the completion callback
    # (_check_workflow_step_completion) can match this step.
    run["status"] = "running"
    run["current_step_id"] = step_id
    run["step_results"][step_id] = {
        "status": "running",
        "agent_name": agent_name,
        "started_at": time.time(),
        "completed_at": None,
        "output": None
    }
    _save_workflow_runs()
    # Send task to agent queue, creating the queue lazily if needed.
    hub_memory["agent_message_queues"].setdefault(agent_name, []).append({
        "sender": "workflow",
        "message": task_message
    })
    logger.info(f"[Workflow] Sent task to agent '{agent_name}' for step '{step_def['name']}' in workflow '{run['workflow_name']}'")
def _check_workflow_step_completion(agent_name, message):
    """Treat an agent's message as the output of its in-flight workflow step.

    Scans running workflow runs for one whose current step is "running"
    and assigned to this agent; if found, records the message as the
    step's output, notifies, and advances the workflow.  At most one run
    is completed per call.
    """
    for run in hub_memory["workflow_runs"]:
        if run["status"] != "running":
            continue
        step_id = run.get("current_step_id")
        if not step_id:
            continue
        result = run["step_results"].get(step_id, {})
        if result.get("status") != "running" or result.get("agent_name") != agent_name:
            continue
        # This agent's output completes this workflow step.
        result["status"] = "completed"
        result["completed_at"] = time.time()
        result["output"] = message
        _save_workflow_runs()
        # Resolve the human-readable step name from the workflow definition,
        # falling back to the raw step id when the definition is missing.
        step_name = step_id
        workflow_def = next((w for w in hub_memory["workflows"] if w["id"] == run["workflow_id"]), None)
        if workflow_def is not None:
            step_name = next((s["name"] for s in workflow_def["steps"] if s["id"] == step_id), step_id)
        _add_notification("workflow_step_completed", f"Workflow '{run['workflow_name']}': step '{step_name}' completed by agent '{agent_name}'")
        _advance_workflow(run["run_id"])
        return  # Only match first active run for this agent
def _check_workflow_step_failure(agent_name):
    """Mark the first running workflow step owned by a failed agent as failed.

    Finds the first running run whose current step is assigned to this
    agent, flags that step as failed, persists the runs, and lets
    _advance_workflow decide between on_failure recovery and aborting.
    """
    for run in hub_memory["workflow_runs"]:
        if run["status"] != "running":
            continue
        current_id = run.get("current_step_id")
        if not current_id:
            continue
        result = run["step_results"].get(current_id, {})
        if result.get("status") == "running" and result.get("agent_name") == agent_name:
            result["status"] = "failed"
            result["completed_at"] = time.time()
            _save_workflow_runs()
            _advance_workflow(run["run_id"])
            return
# -------------------- Routes --------------------
@app.route("/")
def index():
    """Root URL: send visitors straight to the dashboard view."""
    dashboard_url = url_for("dashboard")
    return redirect(dashboard_url)
@app.route("/dashboard")
def dashboard():
    """Render the main dashboard view from current hub state."""
    # Build display copies of the agent records so the template gets
    # human-readable heartbeat timestamps instead of raw epoch floats.
    display_agents = {
        name: {
            "url": details["url"],
            "last_heartbeat": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(details["last_heartbeat"])),
        }
        for name, details in hub_memory["active_agents"].items()
    }
    return render_template(
        "dashboard.html",
        active_agents=display_agents,  # Use display_agents for dashboard
        user_messages=hub_memory["user_messages"],
        distilled_tips=hub_memory["distilled_tips"],
        recent_experiences=hub_memory["recent_experiences"],
        pending_approvals=hub_memory["pending_approvals"],
        discovered_plugins=hub_memory["discovered_plugins"],
    )
@app.route("/register_agent", methods=["POST"])
def register_agent():
    """Register (or re-register) an agent with the hub.

    Expects a JSON body with "name" and "url".  Records the agent's URL
    plus heartbeat/registration timestamps and (re)initializes its
    message queue.

    Returns:
        200 with a success payload, or 400 when the body is missing,
        not JSON, or lacks name/url.
    """
    # silent=True: a missing or non-JSON body yields {} and the intended
    # 400 response, instead of Flask raising before we can respond.
    data = request.get_json(silent=True) or {}
    agent_name = data.get("name")
    agent_url = data.get("url")
    if agent_name and agent_url:
        now = time.time()
        hub_memory["active_agents"][agent_name] = {"url": agent_url, "last_heartbeat": now, "registered_at": now}
        hub_memory["agent_message_queues"][agent_name] = []  # Initialize message queue for new agent
        logger.info(f"Agent '{agent_name}' registered with URL: {agent_url}")
        return jsonify({"status": "success", "message": f"Agent {agent_name} registered."}), 200
    return jsonify({"status": "error", "message": "Invalid agent registration data."}), 400
@app.route("/api/heartbeat/<agent_name>", methods=["POST"])
def agent_heartbeat(agent_name):
    """Record a heartbeat (and optional capability data) from an agent.

    Updates the agent's last_heartbeat timestamp and, when present in
    the JSON body, stores the agent's plugin manifests (seeding global
    plugin preferences for newly seen plugins) and its Ollama model list.

    Returns:
        200 on success, 404 when the agent is not registered.
    """
    if agent_name not in hub_memory["active_agents"]:
        return jsonify({"status": "error", "message": f"Agent {agent_name} not found."}), 404
    # Tolerate an empty or non-JSON body: the heartbeat itself is still
    # valid even without plugin/model payloads (request.json would fail).
    data = request.get_json(silent=True) or {}
    agent_record = hub_memory["active_agents"][agent_name]
    agent_record["last_heartbeat"] = time.time()
    # Store plugin data from heartbeat.
    plugins_data = data.get("plugins")
    if plugins_data:
        agent_record["plugins"] = plugins_data
        # Initialize global plugin preferences for newly seen plugins
        # (manifest values are unused here, so iterate keys only).
        for plugin_name in plugins_data.get("manifests", {}):
            if plugin_name not in hub_memory["global_plugin_preferences"]:
                hub_memory["global_plugin_preferences"][plugin_name] = "enabled"  # Default to enabled
                logger.info(f"Discovered new plugin '{plugin_name}'. Defaulting to 'enabled'.")
    # Store Ollama models from heartbeat.
    ollama_models = data.get("ollama_models")
    if ollama_models:
        agent_record["ollama_models"] = ollama_models
        logger.debug(f"Received Ollama models for agent '{agent_name}': {ollama_models}")
    logger.debug(f"Received heartbeat from agent '{agent_name}'.")
    return jsonify({"status": "success", "message": f"Heartbeat received for {agent_name}."}), 200
@app.route("/get_active_agents", methods=["GET"])
def get_active_agents():
    """Return the registry of active agents as JSON."""
    # Optionally, clean up inactive agents here based on last_heartbeat
    agents = hub_memory["active_agents"]
    return jsonify(agents), 200
@app.route("/submit_experience", methods=["POST"])
def submit_experience():
experiences = request.json
for exp in experiences:
hub_memory["experiences"].append(exp)
hub_memory["recent_experiences"].append(exp)
if len(hub_memory["recent_experiences"]) > hub_memory["MAX_RECENT_EXPERIENCES"]:
hub_memory["recent_experiences"].pop(0) # Keep only the most recent experiences
# Aggregate token usage if present in the experience
if "token_usage" in exp and exp["token_usage"]:
token_data = exp["token_usage"]
agent_name = exp.get("agent_name", "unknown")
# Ensure per-agent bucket exists
if agent_name not in hub_memory["per_agent_token_usage"]:
hub_memory["per_agent_token_usage"][agent_name] = {}
def _add_tokens(provider, prompt_tokens, completion_tokens):
"""Add tokens to both global and per-agent tracking."""
# Global
if provider not in hub_memory["total_token_usage"]:
hub_memory["total_token_usage"][provider] = {"prompt_tokens": 0, "completion_tokens": 0}
hub_memory["total_token_usage"][provider]["prompt_tokens"] += prompt_tokens
hub_memory["total_token_usage"][provider]["completion_tokens"] += completion_tokens
# Per-agent
agent_bucket = hub_memory["per_agent_token_usage"][agent_name]
if provider not in agent_bucket:
agent_bucket[provider] = {"prompt_tokens": 0, "completion_tokens": 0}
agent_bucket[provider]["prompt_tokens"] += prompt_tokens
agent_bucket[provider]["completion_tokens"] += completion_tokens
# Handle flat format: {"provider": "ollama", "prompt_tokens": 10, ...}
if "provider" in token_data and token_data["provider"] != "none":
_add_tokens(token_data["provider"], token_data.get("prompt_tokens", 0), token_data.get("completion_tokens", 0))
logger.debug(f"Aggregated token usage for {token_data['provider']} (agent={agent_name})")
else:
# Handle nested format: {"ollama": {"prompt_tokens": 10, ...}, "openrouter": {...}}
for provider, usage in token_data.items():
if isinstance(usage, dict) and (usage.get("prompt_tokens", 0) > 0 or usage.get("completion_tokens", 0) > 0):
_add_tokens(provider, usage.get("prompt_tokens", 0), usage.get("completion_tokens", 0))
logger.debug(f"Aggregated token usage for {provider} (agent={agent_name})")
# Notify on errors
step_result = exp.get("step_result", {})
if step_result.get("status") == "failed":
exp_agent = exp.get("agent_name", "unknown")
_add_notification("task_error", f"Agent '{exp_agent}' encountered an error")