From 91e45776af3476bb4b89a3292b5aa3e9c2d14e64 Mon Sep 17 00:00:00 2001 From: Aaron Bockelie Date: Mon, 1 Jun 2026 17:40:03 -0500 Subject: [PATCH 1/2] chore(adr-102): purge superseded admin CLIs + dead reset path (P6b) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the legacy backup/restore admin CLIs that the ADR-102 API+worker spine supersedes, plus the dead database-reset path: - Delete api/admin/{backup,restore,stitch}.py — superseded by the streaming /admin/backup route + restore_worker; zero automated callers (only standalone python -m api.admin.* entry points with stale src.admin.* docstrings). - Delete api/lib/restitching.py (legacy ConceptMatcher) — its only importers were stitch.py + restore.py. The live integration-mode engine is api/app/lib/concept_matcher.py (the canonical two-tier port). - Delete api/admin/reset.py + admin_service.reset_database + the ResetRequest/ResetResponse/SchemaValidation models. Reset was one of the earliest worker functions and is dangerous in the current architecture; the API reset path was already retired (route removed) and the only importer of ResetManager was the dead reset_database method. Database reset is now an operator-level concern, not an API/worker operation. - Drop the dead skipped test for the removed /admin/reset endpoint. - Fix stale doc/comment references to the removed files. Keep api/admin/{prune,check_integrity}.py — adjacent operator utilities not superseded by the backup/restore spine (they use age_ops + integrity, both live). No legacy backup/restore compatibility is retained from before this cycle. ~2032 lines removed. All ADR-102 + admin/security test slices green. --- api/admin/backup.py | 366 ------------------ api/admin/reset.py | 551 ---------------------------- api/admin/restore.py | 398 -------------------- api/admin/stitch.py | 296 --------------- api/app/lib/concept_matcher.py | 7 +- api/app/models/admin.py | 30 +- api/app/routes/admin.py | 6 +- api/app/services/admin_service.py | 47 --- api/lib/restitching.py | 336 ----------------- tests/api/test_endpoint_security.py | 7 - 10 files changed, 12 insertions(+), 2032 deletions(-) delete mode 100755 api/admin/backup.py delete mode 100644 api/admin/reset.py delete mode 100755 api/admin/restore.py delete mode 100755 api/admin/stitch.py delete mode 100644 api/lib/restitching.py diff --git a/api/admin/backup.py b/api/admin/backup.py deleted file mode 100755 index fc9c2bb83..000000000 --- a/api/admin/backup.py +++ /dev/null @@ -1,366 +0,0 @@ -#!/usr/bin/env python3 -""" -Backup CLI - Interactive backup tool for knowledge graph data - -Provides menu-driven interface to backup entire database or specific ontologies. -Exports data to portable JSON format with full embeddings preserved. - -Usage: - python -m src.admin.backup - python -m src.admin.backup --auto-full - python -m src.admin.backup --ontology "My Ontology" --output custom_backup.json -""" - -import argparse -import json -import sys -from pathlib import Path -from datetime import datetime -from typing import Optional, List - -# Add project root to path -sys.path.insert(0, str(Path(__file__).parent.parent.parent)) - -from api.lib.console import Console, Colors -from api.lib.config import Config -from api.lib.age_ops import AGEConnection, AGEQueries -from api.lib.serialization import DataExporter, KgBackupV2Reader - - -def _external_concepts(backup_data) -> set: - """Cross-ontology concept references in the backup (kg-backup/2 model). - - Replaces the v1-shape BackupAssessment external-dependency scan with the shared - reader method. - """ - return KgBackupV2Reader(backup_data).external_concept_ids() - - -class BackupCLI: - """Interactive backup CLI""" - - def __init__(self, backup_dir: str = "backups"): - self.backup_dir = Path(backup_dir) - self.backup_dir.mkdir(exist_ok=True) - self.conn = AGEConnection() - - def run_interactive(self): - """Run interactive backup menu""" - Console.section("Knowledge Graph System - Backup") - - # Test connection - if not self.conn.test_connection(): - Console.error("✗ Cannot connect to Apache AGE database") - Console.warning(f" Check connection: {Config.postgres_host()}:{Config.postgres_port()}") - Console.warning(" Start database with: docker-compose up -d") - sys.exit(1) - - Console.success("✓ Connected to Apache AGE") - - # Get ontology list - client = self.conn.get_client() - ontologies = AGEQueries.get_ontology_list(client) - - if not ontologies: - Console.error("✗ No ontologies found in database") - sys.exit(1) - - Console.info(f"Found {len(ontologies)} ontologies\n") - - # Show menu - self._show_menu(ontologies) - - def _show_menu(self, ontologies: List[dict]): - """Display backup options menu""" - Console.bold("Backup Options:") - print(" 1) Full database backup (all ontologies)") - print(" 2) Specific ontology backup") - print("") - - choice = input("Select option [1-2]: ").strip() - - if choice == "1": - self._backup_full(ontologies) - elif choice == "2": - self._backup_ontology(ontologies) - else: - Console.error("Invalid option") - sys.exit(1) - - def _backup_full(self, ontologies: List[dict]): - """Full database backup""" - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - backup_file = self.backup_dir / f"full_backup_{timestamp}.json" - - Console.section("Full Database Backup") - - # Show what will be backed up - Console.warning("The following will be backed up:") - for ont in ontologies: - print(f" • {ont['ontology']} ({ont['file_count']} files, {ont['concept_count']} concepts)") - - # Show statistics - client = self.conn.get_client() - stats = AGEQueries.get_database_stats(client) - - Console.warning("\nDatabase totals:") - Console.key_value(" Concepts", str(stats['nodes'].get('concepts', 0)), Colors.BOLD, Colors.OKGREEN) - Console.key_value(" Sources", str(stats['nodes'].get('sources', 0)), Colors.BOLD, Colors.OKGREEN) - Console.key_value(" Instances", str(stats['nodes'].get('instances', 0)), Colors.BOLD, Colors.OKGREEN) - Console.key_value(" Relationships", str(stats['relationships'].get('total', 0)), Colors.BOLD, Colors.OKGREEN) - - Console.warning(f"\nBackup destination: {backup_file}") - print("") - - if not Console.confirm("Press Enter to start backup (Ctrl+C to cancel)"): - Console.warning("Backup cancelled") - sys.exit(0) - - # Export - Console.info("\nExporting database...") - client = self.conn.get_client() - backup_data = DataExporter.export_kg_backup_v2(client) - - # Write to file - Console.info(f"Writing to {backup_file}...") - with open(backup_file, 'w') as f: - json.dump(backup_data, f, indent=2) - - # Show summary - file_size = backup_file.stat().st_size - size_mb = file_size / (1024 * 1024) - - counts = KgBackupV2Reader(backup_data).counts() - Console.section("Backup Complete") - Console.success(f"✓ Full backup created: {backup_file}") - Console.info(f" Size: {size_mb:.2f} MB") - Console.info(f" Concepts: {counts['concepts']}") - Console.info(f" Sources: {counts['sources']}") - Console.info(f" Instances: {counts['instances']}") - Console.info(f" Relationships: {counts['relationships']}") - - self._show_tips() - - def _backup_ontology(self, ontologies: List[dict]): - """Ontology-specific backup""" - Console.section("Select Ontologies to Backup") - - # Show ontologies - for ont in ontologies: - print(f" • {ont['ontology']} ({ont['file_count']} files, {ont['concept_count']} concepts)") - - Console.warning("\nEnter ontology names to backup (comma-separated):") - Console.warning("Or type 'all' to backup all ontologies separately") - ontology_input = input("> ").strip() - - if ontology_input.lower() == "all": - self._backup_all_ontologies_separately(ontologies) - else: - self._backup_specific_ontologies(ontology_input, ontologies) - - def _backup_all_ontologies_separately(self, ontologies: List[dict]): - """Backup each ontology to separate files""" - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - - Console.warning(f"\nWill create separate backups for {len(ontologies)} ontologies") - if not Console.confirm("Press Enter to continue (Ctrl+C to cancel)"): - Console.warning("Backup cancelled") - sys.exit(0) - - backup_files = [] - for i, ont_info in enumerate(ontologies, 1): - ontology = ont_info['ontology'] - safe_name = ontology.replace(' ', '_').replace('/', '_').lower() - backup_file = self.backup_dir / f"ontology_{safe_name}_{timestamp}.json" - - Console.info(f"\n[{i}/{len(ontologies)}] Backing up: {ontology}") - - client = self.conn.get_client() - backup_data = DataExporter.export_kg_backup_v2(client, ontology=ontology) - - with open(backup_file, 'w') as f: - json.dump(backup_data, f, indent=2) - - file_size = backup_file.stat().st_size - size_mb = file_size / (1024 * 1024) - Console.success(f"✓ Backed up to: {backup_file} ({size_mb:.2f} MB)") - backup_files.append(backup_file) - - # Summary - Console.section("Backup Complete") - Console.success(f"✓ Created {len(backup_files)} ontology backups") - for f in backup_files: - print(f" • {f.name}") - - self._show_tips() - - def _backup_specific_ontologies(self, ontology_input: str, ontologies: List[dict]): - """Backup specific ontologies""" - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - ontology_names = [name.strip() for name in ontology_input.split(',')] - - # Validate - available_names = {ont['ontology'] for ont in ontologies} - invalid = [name for name in ontology_names if name not in available_names] - if invalid: - Console.error(f"✗ Ontologies not found: {', '.join(invalid)}") - sys.exit(1) - - backup_files = [] - for i, ontology in enumerate(ontology_names, 1): - safe_name = ontology.replace(' ', '_').replace('/', '_').lower() - backup_file = self.backup_dir / f"ontology_{safe_name}_{timestamp}.json" - - # Show info - ont_info = next(ont for ont in ontologies if ont['ontology'] == ontology) - Console.info(f"\n[{i}/{len(ontology_names)}] Backing up: {ontology}") - Console.info(f" Files: {ont_info['file_count']}") - Console.info(f" Concepts: {ont_info['concept_count']}") - Console.info(f" Instances: {ont_info['instance_count']}") - Console.info(f" Destination: {backup_file}") - - # Export - client = self.conn.get_client() - backup_data = DataExporter.export_kg_backup_v2(client, ontology=ontology) - - # Assess external dependencies (cross-ontology concept references) - external = _external_concepts(backup_data) - - # Write - with open(backup_file, 'w') as f: - json.dump(backup_data, f, indent=2) - - file_size = backup_file.stat().st_size - size_mb = file_size / (1024 * 1024) - Console.success(f"✓ Backed up ({size_mb:.2f} MB)") - - # Show warning if external dependencies exist - if external: - Console.warning(f" ⚠ {len(external)} references to external concepts") - - backup_files.append(backup_file) - - # Summary - Console.section("Backup Complete") - Console.success(f"✓ Created {len(backup_files)} backups") - - # Check if any backup has external deps - has_external_deps = False - for f in backup_files: - with open(f, 'r') as bf: - backup_data = json.load(bf) - if _external_concepts(backup_data): - has_external_deps = True - print(f" • {f.name} {Colors.WARNING}(has external dependencies){Colors.ENDC}") - else: - print(f" • {f.name}") - - self._show_tips(show_stitch_warning=has_external_deps) - - def _show_tips(self, show_stitch_warning: bool = False): - """Show helpful tips""" - Console.warning("\n💡 Tips:") - print(" • Backup files are portable - share or move them as needed") - print(" • Use restore.py to restore ontologies into this or another database") - print(" • Ontology backups can be selectively restored without affecting other data") - print(" • Backups include full embeddings (1536-dim vectors) and all text") - - if show_stitch_warning: - Console.info("\nℹ External Dependencies Detected:") - print(" This backup has cross-ontology relationships (external concept references)") - print(" After restoring, you can optionally reconnect them using the semantic stitcher:") - print(f" {Colors.OKCYAN}python -m src.admin.stitch --backup {Colors.ENDC}") - print(" Or leave them isolated if you prefer strict ontology boundaries") - - def backup_non_interactive(self, ontology: Optional[str] = None, output: Optional[str] = None): - """Non-interactive backup for automation""" - if not self.conn.test_connection(): - Console.error("✗ Cannot connect to Apache AGE database") - sys.exit(1) - - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - client = self.conn.get_client() - - if ontology: - # Ontology backup - if output: - backup_file = Path(output) - else: - safe_name = ontology.replace(' ', '_').replace('/', '_').lower() - backup_file = self.backup_dir / f"ontology_{safe_name}_{timestamp}.json" - - Console.info(f"Backing up ontology: {ontology}") - backup_data = DataExporter.export_kg_backup_v2(client, ontology=ontology) - - else: - # Full backup - if output: - backup_file = Path(output) - else: - backup_file = self.backup_dir / f"full_backup_{timestamp}.json" - - Console.info("Backing up full database") - backup_data = DataExporter.export_kg_backup_v2(client) - - # Write - with open(backup_file, 'w') as f: - json.dump(backup_data, f, indent=2) - - file_size = backup_file.stat().st_size - size_mb = file_size / (1024 * 1024) - Console.success(f"✓ Backup complete: {backup_file} ({size_mb:.2f} MB)") - - def close(self): - """Cleanup""" - self.conn.close() - - -def main(): - parser = argparse.ArgumentParser( - description="Backup knowledge graph data to portable JSON format", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Interactive menu - python -m src.admin.backup - - # Non-interactive full backup - python -m src.admin.backup --auto-full - - # Backup specific ontology - python -m src.admin.backup --ontology "My Ontology" - - # Custom output file - python -m src.admin.backup --ontology "My Ontology" --output my_backup.json - """ - ) - - parser.add_argument('--auto-full', action='store_true', - help='Automatically backup full database (non-interactive)') - parser.add_argument('--ontology', type=str, - help='Backup specific ontology (non-interactive)') - parser.add_argument('--output', type=str, - help='Custom output file path') - parser.add_argument('--backup-dir', type=str, default='backups', - help='Backup directory (default: backups/)') - - args = parser.parse_args() - - cli = BackupCLI(backup_dir=args.backup_dir) - - try: - if args.auto_full or args.ontology: - # Non-interactive mode - cli.backup_non_interactive( - ontology=args.ontology, - output=args.output - ) - else: - # Interactive mode - cli.run_interactive() - finally: - cli.close() - - -if __name__ == '__main__': - main() diff --git a/api/admin/reset.py b/api/admin/reset.py deleted file mode 100644 index 663ddd833..000000000 --- a/api/admin/reset.py +++ /dev/null @@ -1,551 +0,0 @@ -#!/usr/bin/env python3 -""" -Reset CLI - Database reset tool for knowledge graph system - -Provides nuclear reset option to completely wipe and reinitialize the database. -Includes schema verification and optional cleanup of logs and checkpoints. - -Usage: - python -m src.admin.reset - python -m src.admin.reset --auto-confirm - python -m src.admin.reset --no-logs --no-checkpoints -""" - -import argparse -import subprocess -import sys -import time -import os -import glob -from pathlib import Path -from typing import Dict, Any - -# Add project root to path -sys.path.insert(0, str(Path(__file__).parent.parent.parent)) - -from api.lib.console import Console, Colors -from api.lib.config import Config -from api.lib.age_ops import AGEConnection - - -class ResetManager: - """Database reset manager""" - - def __init__(self, project_root: Path = None): - self.project_root = project_root or Path(__file__).parent.parent.parent - self.postgres_user = Config.postgres_user() - self.postgres_db = Config.postgres_db() - self.container_name = "knowledge-graph-postgres" - - def run_interactive(self): - """Run interactive reset with confirmation""" - Console.section("Knowledge Graph System - Database Reset") - - Console.error("⚠️ WARNING: This will delete ALL graph data!") - Console.warning("\nThis operation will:") - print(" • Stop the PostgreSQL container") - print(" • Delete all database volumes (complete data loss)") - print(" • Reinitialize with fresh schema") - print(" • Optionally clear logs and checkpoints") - print("") - - if not Console.confirm("Are you sure you want to continue? Type 'yes' to confirm: ", exact="yes"): - Console.warning("Reset cancelled") - sys.exit(0) - - # Ask about clearing logs/checkpoints - clear_logs = Console.confirm("\nClear ingestion log files? [Y/n]: ") - clear_checkpoints = Console.confirm("Clear checkpoint files? [Y/n]: ") - - # Execute reset - result = self.reset( - clear_logs=clear_logs, - clear_checkpoints=clear_checkpoints, - verbose=True - ) - - # Show results - Console.section("Reset Complete") - - if result["success"]: - Console.success("✅ Database reset successfully") - - validation = result["validation"] - Console.info("\nSchema Validation:") - Console.key_value(" AGE Graph", "✓" if validation["graph_exists"] else "✗", - Colors.BOLD, Colors.OKGREEN if validation["graph_exists"] else Colors.FAIL) - Console.key_value(" PG Schemas", f"{validation['schema_count']}/3 (kg_api, kg_auth, kg_logs)", - Colors.BOLD, Colors.OKGREEN if validation["schema_count"] == 3 else Colors.WARNING) - Console.key_value(" Core Tables", f"{validation['table_count']}/5 (users, api_keys, roles, jobs, sessions)", - Colors.BOLD, Colors.OKGREEN if validation["table_count"] == 5 else Colors.WARNING) - Console.key_value(" Node count", str(validation["node_count"]), - Colors.BOLD, Colors.OKGREEN if validation["node_count"] == 0 else Colors.WARNING) - Console.key_value(" Graph test", "✓" if validation["schema_test_passed"] else "✗", - Colors.BOLD, Colors.OKGREEN if validation["schema_test_passed"] else Colors.FAIL) - - Console.warning("\n💡 Database is now empty and ready for fresh data") - Console.info(" Use 'kg ingest' to add documents") - - else: - Console.error("✗ Reset failed") - Console.error(f" {result['error']}") - sys.exit(1) - - def reset( - self, - clear_logs: bool = True, - clear_checkpoints: bool = True, - verbose: bool = False - ) -> Dict[str, Any]: - """ - Execute database reset - - Args: - clear_logs: Clear ingestion log files - clear_checkpoints: Clear checkpoint files - verbose: Show detailed progress messages - - Returns: - Dict with success status, validation results, and any errors - """ - - try: - # Step 1: Stop and remove PostgreSQL container with volumes - if verbose: - Console.info("\n🛑 Stopping PostgreSQL container...") - - result = subprocess.run( - ["docker-compose", "down", "-v"], - cwd=self.project_root, - capture_output=True, - text=True - ) - - if result.returncode != 0: - return {"success": False, "error": "Failed to stop PostgreSQL container"} - - # Step 2: Remove data volumes explicitly - if verbose: - Console.info("🗑️ Removing database volumes...") - - volumes = [ - "knowledge-graph-system_postgres_data", - "knowledge-graph-system_postgres_import", - ] - - for volume in volumes: - subprocess.run( - ["docker", "volume", "rm", volume], - capture_output=True, - text=True - ) - # Ignore errors if volume doesn't exist - - # Step 3: Start fresh PostgreSQL container - if verbose: - Console.info("🚀 Starting fresh PostgreSQL container...") - - result = subprocess.run( - ["docker-compose", "up", "-d"], - cwd=self.project_root, - capture_output=True, - text=True - ) - - if result.returncode != 0: - return { - "success": False, - "error": f"Failed to start PostgreSQL container: {result.stderr}" - } - - # Step 4: Wait for PostgreSQL initialization to complete - # Note: docker-entrypoint-initdb.d automatically runs schema files - # (01_init_age.sql and 02_multi_schema.sql) on fresh volumes - if verbose: - Console.info("⏳ Waiting for PostgreSQL initialization to complete...") - - max_attempts = 45 # Increased from 30 to allow time for schema initialization - for attempt in range(max_attempts): - time.sleep(2) - - # Check if PostgreSQL is accepting connections - result = subprocess.run( - [ - "docker", "exec", self.container_name, - "psql", "-U", self.postgres_user, "-d", self.postgres_db, - "-c", "SELECT 1" - ], - capture_output=True, - text=True - ) - - if result.returncode == 0: - # PostgreSQL is up, now check if AGE graph is initialized - graph_check = subprocess.run( - [ - "docker", "exec", self.container_name, - "psql", "-U", self.postgres_user, "-d", self.postgres_db, - "-t", "-c", "SELECT name FROM ag_catalog.ag_graph WHERE name = 'knowledge_graph'" - ], - capture_output=True, - text=True - ) - - if "knowledge_graph" in graph_check.stdout: - if verbose: - Console.success(f"✓ PostgreSQL initialization complete (took {(attempt + 1) * 2}s)") - break - - if verbose and attempt % 5 == 0: - print(".", end="", flush=True) - - if attempt == max_attempts - 1: - return { - "success": False, - "error": "PostgreSQL initialization failed to complete within timeout" - } - - # Step 5: Wait for schema initialization to fully complete - # Check that schema_migrations table exists (created by baseline migration) - if verbose: - Console.info("⏳ Waiting for schema initialization to complete...") - - max_schema_wait = 30 # 30 attempts = 60 seconds max - schema_ready = False - for attempt in range(max_schema_wait): - time.sleep(2) - - # Check if schema_migrations table exists - result = subprocess.run( - [ - "docker", "exec", self.container_name, - "psql", "-U", self.postgres_user, "-d", self.postgres_db, - "-t", "-A", "-c", - "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'schema_migrations')" - ], - capture_output=True, - text=True - ) - - if result.returncode == 0 and "t" in result.stdout.lower(): - schema_ready = True - if verbose: - Console.success(f"✓ Schema initialized (took {(attempt + 1) * 2}s)") - break - - if verbose and attempt % 3 == 0: - print(".", end="", flush=True) - - if not schema_ready: - return { - "success": False, - "error": "Schema initialization timeout: schema_migrations table not created" - } - - # Step 6: Apply database migrations (ADR-040) - if verbose: - Console.info("📦 Applying database migrations...") - - migration_result = self._apply_migrations(verbose=verbose) - if not migration_result["success"]: - return { - "success": False, - "error": f"Failed to apply migrations: {migration_result['error']}" - } - - if verbose: - Console.success(f"✓ Applied {migration_result['applied_count']} migration(s)") - - # Step 7: Graph already empty after volume removal - # Note: We do NOT need to manually clear the graph here because: - # - docker-compose down -v removed all volumes (full cold restart) - # - Database was recreated from scratch with fresh volumes - # - Migrations just ran on a brand new empty database - # - Graph is already empty except for system vocabulary from migrations - # - # The old code here tried to clear user nodes with invalid openCypher syntax: - # WHERE n:Concept OR n:Source (invalid - can't use :Label in WHERE) - # - # If we ever need "soft reset" (clear data but keep DB running), use: - # MATCH (n) - # WHERE 'Concept' IN labels(n) OR 'Source' IN labels(n) ... - # DETACH DELETE n - - # Step 8: Clear log files - if clear_logs: - if verbose: - Console.info("🧹 Clearing log files...") - - log_dir = self.project_root / "logs" - if log_dir.exists(): - log_count = 0 - for log_file in glob.glob(str(log_dir / "ingest_*.log")): - try: - Path(log_file).unlink() - log_count += 1 - except Exception: - pass # Ignore errors - - if verbose and log_count > 0: - Console.success(f"✓ Cleared {log_count} log file(s)") - - # Step 9: Clear checkpoint files - if clear_checkpoints: - if verbose: - Console.info("🧹 Clearing checkpoint files...") - - checkpoint_dir = self.project_root / ".checkpoints" - if checkpoint_dir.exists(): - checkpoint_count = 0 - for checkpoint_file in glob.glob(str(checkpoint_dir / "*.json")): - try: - Path(checkpoint_file).unlink() - checkpoint_count += 1 - except Exception: - pass # Ignore errors - - if verbose and checkpoint_count > 0: - Console.success(f"✓ Cleared {checkpoint_count} checkpoint file(s)") - - # Step 10: Verify schema - if verbose: - Console.info("✅ Verifying schema...") - - validation = self._verify_schema() - - return { - "success": True, - "validation": validation, - "error": None - } - - except Exception as e: - return { - "success": False, - "error": str(e), - "validation": None - } - - def _apply_migrations(self, verbose: bool = False) -> Dict[str, Any]: - """ - Apply database migrations from schema/migrations/ directory (ADR-040). - - This is a Python-based migrator that works identically to migrate-db.sh: - - Idempotent: checks schema_migrations table - - Ordered: applies migrations in numeric order (001, 002, 003...) - - Atomic: stops on first failure - - Returns: - Dict with success status, applied_count, and any errors - """ - migrations_dir = self.project_root / "schema" / "migrations" - - if not migrations_dir.exists(): - return {"success": False, "error": "Migrations directory not found", "applied_count": 0} - - # Get currently applied migrations from database - result = subprocess.run( - [ - "docker", "exec", self.container_name, - "psql", "-U", self.postgres_user, "-d", self.postgres_db, - "-t", "-A", "-c", - "SELECT version FROM public.schema_migrations ORDER BY version" - ], - capture_output=True, - text=True - ) - - applied_versions = set() - if result.returncode == 0 and result.stdout.strip(): - applied_versions = set(result.stdout.strip().split('\n')) - - # Find pending migrations - pending_migrations = [] - for migration_file in sorted(migrations_dir.glob("*.sql")): - # Extract version from filename (001_baseline.sql → 001) - filename = migration_file.name - if filename == "README.md": - continue - - version = filename.split('_')[0] - - # Skip if already applied - if version in applied_versions: - continue - - pending_migrations.append((version, migration_file)) - - if not pending_migrations: - return {"success": True, "applied_count": 0} - - # Apply each pending migration - applied_count = 0 - for version, migration_file in pending_migrations: - if verbose: - print(f" → Applying migration {version}...", end=" ", flush=True) - - # Apply migration - with open(migration_file, 'r') as f: - result = subprocess.run( - [ - "docker", "exec", "-i", self.container_name, - "psql", "-U", self.postgres_user, "-d", self.postgres_db - ], - stdin=f, - capture_output=True, - text=True - ) - - if result.returncode != 0 or "ERROR" in result.stderr: - error_msg = result.stderr if result.stderr else result.stdout - return { - "success": False, - "error": f"Migration {version} failed: {error_msg}", - "applied_count": applied_count - } - - applied_count += 1 - if verbose: - print("✓") - - return {"success": True, "applied_count": applied_count} - - def _verify_schema(self) -> Dict[str, Any]: - """Verify schema was created correctly after reset (PostgreSQL + AGE)""" - - # Check that knowledge_graph exists in AGE - result = subprocess.run( - [ - "docker", "exec", self.container_name, - "psql", "-U", self.postgres_user, "-d", self.postgres_db, - "-t", "-c", "SELECT name FROM ag_catalog.ag_graph WHERE name = 'knowledge_graph'" - ], - capture_output=True, - text=True - ) - graph_exists = "knowledge_graph" in result.stdout.strip() - - # Check that schemas exist (PostgreSQL multi-schema architecture per ADR-024) - result = subprocess.run( - [ - "docker", "exec", self.container_name, - "psql", "-U", self.postgres_user, "-d", self.postgres_db, - "-t", "-c", - "SELECT COUNT(*) FROM information_schema.schemata WHERE schema_name IN ('kg_api', 'kg_auth', 'kg_logs')" - ], - capture_output=True, - text=True - ) - schema_count = int(result.stdout.strip() or "0") - - # Check that critical tables exist in their schemas - result = subprocess.run( - [ - "docker", "exec", self.container_name, - "psql", "-U", self.postgres_user, "-d", self.postgres_db, - "-t", "-c", - """SELECT COUNT(*) FROM information_schema.tables - WHERE (table_schema = 'kg_auth' AND table_name IN ('users', 'api_keys', 'roles')) - OR (table_schema = 'kg_api' AND table_name IN ('ingestion_jobs', 'sessions'))""" - ], - capture_output=True, - text=True - ) - table_count = int(result.stdout.strip() or "0") - - # Check graph node count (should be 0 after reset) - node_count = 0 - try: - conn = AGEConnection() - client = conn.get_client() - results = client._execute_cypher("MATCH (n) RETURN count(n) as node_count", fetch_one=True) - node_count = int(list(results.values())[0]) if results else 0 - conn.close() - except Exception: - node_count = 0 - - # Schema test: Try creating and deleting a test concept (verifies AGE graph works) - schema_test_passed = False - try: - conn = AGEConnection() - client = conn.get_client() - # Use params to properly convert lists to JSON - client._execute_cypher( - "CREATE (c:Concept {concept_id: $cid, label: $label, embedding: $emb, search_terms: $terms})", - params={ - "cid": "test_schema", - "label": "Test", - "emb": [0.1], - "terms": [] - } - ) - client._execute_cypher( - "MATCH (c:Concept {concept_id: $cid}) DELETE c", - params={"cid": "test_schema"} - ) - schema_test_passed = True - conn.close() - except Exception: - schema_test_passed = False - - return { - "graph_exists": graph_exists, - "schema_count": schema_count, # PostgreSQL schemas (kg_api, kg_auth, kg_logs) - "table_count": table_count, # Critical tables in those schemas - "node_count": node_count, - "schema_test_passed": schema_test_passed, - } - - -def main(): - parser = argparse.ArgumentParser( - description="Reset knowledge graph database (DESTRUCTIVE)", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Interactive mode (with confirmation) - python -m src.admin.reset - - # Auto-confirm (for scripts) - python -m src.admin.reset --auto-confirm - - # Skip clearing logs and checkpoints - python -m src.admin.reset --auto-confirm --no-logs --no-checkpoints - """ - ) - - parser.add_argument('--auto-confirm', action='store_true', - help='Skip confirmation prompt (DANGEROUS)') - parser.add_argument('--no-logs', action='store_true', - help='Do not clear log files') - parser.add_argument('--no-checkpoints', action='store_true', - help='Do not clear checkpoint files') - - args = parser.parse_args() - - manager = ResetManager() - - if args.auto_confirm: - # Non-interactive mode - Console.section("Knowledge Graph System - Database Reset") - Console.warning("⚠️ Auto-confirm mode - proceeding without confirmation") - - result = manager.reset( - clear_logs=not args.no_logs, - clear_checkpoints=not args.no_checkpoints, - verbose=True - ) - - if result["success"]: - Console.success("\n✅ Reset complete") - sys.exit(0) - else: - Console.error(f"\n✗ Reset failed: {result['error']}") - sys.exit(1) - else: - # Interactive mode with confirmation - manager.run_interactive() - - -if __name__ == '__main__': - main() diff --git a/api/admin/restore.py b/api/admin/restore.py deleted file mode 100755 index 1f5c99346..000000000 --- a/api/admin/restore.py +++ /dev/null @@ -1,398 +0,0 @@ -#!/usr/bin/env python3 -""" -Restore CLI - Interactive restore tool for knowledge graph data - -Provides menu-driven interface to restore backups into database. -Supports full or selective restore with conflict resolution options. - -Usage: - python -m src.admin.restore - python -m src.admin.restore --file backups/full_backup_20251006.json - python -m src.admin.restore --file backups/ontology_watts_20251006.json --overwrite -""" - -import argparse -import json -import sys -from pathlib import Path -from typing import Optional, List, Dict, Any - -# Add project root to path -sys.path.insert(0, str(Path(__file__).parent.parent.parent)) - -from api.lib.console import Console, Colors -from api.lib.config import Config -from api.lib.age_ops import AGEConnection, AGEQueries -from api.lib.serialization import DataImporter, KgBackupV2Reader -from api.lib.integrity import DatabaseIntegrity -from api.lib.restitching import ConceptMatcher - - -class RestoreCLI: - """Interactive restore CLI""" - - def __init__(self, backup_dir: str = "backups"): - self.backup_dir = Path(backup_dir) - self.conn = AGEConnection() - - def run_interactive(self): - """Run interactive restore menu""" - Console.section("Knowledge Graph System - Restore") - - # Test connection - if not self.conn.test_connection(): - Console.error("✗ Cannot connect to Apache AGE database") - Console.warning(f" Check connection: {Config.postgres_host()}:{Config.postgres_port()}") - Console.warning(" Start database with: docker-compose up -d") - sys.exit(1) - - Console.success("✓ Connected to Apache AGE") - - # Find backup files - if not self.backup_dir.exists(): - Console.error(f"✗ Backup directory not found: {self.backup_dir}") - sys.exit(1) - - backup_files = sorted(self.backup_dir.glob("*.json"), key=lambda f: f.stat().st_mtime, reverse=True) - - if not backup_files: - Console.error(f"✗ No backup files found in {self.backup_dir}") - sys.exit(1) - - Console.info(f"Found {len(backup_files)} backup files\n") - - # Show menu - self._show_menu(backup_files) - - def _show_menu(self, backup_files: List[Path]): - """Display restore options menu""" - Console.bold("Available Backups:") - for i, backup_file in enumerate(backup_files[:10], 1): # Show latest 10 - file_size = backup_file.stat().st_size / (1024 * 1024) - print(f" {i}. {backup_file.name} ({file_size:.2f} MB)") - - if len(backup_files) > 10: - Console.warning(f"\n ... and {len(backup_files) - 10} more") - - print("") - choice = input("Select backup to restore [1-10] or path to file: ").strip() - - # Parse choice - if choice.isdigit() and 1 <= int(choice) <= min(10, len(backup_files)): - backup_file = backup_files[int(choice) - 1] - else: - backup_file = Path(choice) - if not backup_file.exists(): - Console.error(f"✗ File not found: {backup_file}") - sys.exit(1) - - self._restore_backup(backup_file) - - def _restore_backup(self, backup_file: Path, overwrite: bool = False): - """Restore a backup file""" - Console.section(f"Restoring: {backup_file.name}") - - # Load and validate backup - Console.info("Loading backup file...") - try: - with open(backup_file, 'r') as f: - backup_data = json.load(f) - - DataImporter.validate_backup(backup_data) - except Exception as e: - Console.error(f"✗ Invalid backup file: {e}") - sys.exit(1) - - # Assess external dependencies before restore (single kg-backup/2 model). - # Format validity was already gated by validate_backup above. - Console.info("\nAnalyzing backup...") - external_concepts = KgBackupV2Reader(backup_data).external_concept_ids() - if external_concepts: - Console.warning(f" ⚠ {len(external_concepts)} cross-ontology concept references") - - # Handle external dependencies - MUST choose stitch or prune - restitch_action = None - if external_concepts: - ext_count = len(external_concepts) - - # Check if target database is empty (nothing to stitch to) - client = self.conn.get_client() - result = client._execute_cypher("MATCH (c:Concept) RETURN count(c) as count", fetch_one=True) - existing_concepts = int(str(result.get("count", 0))) if result else 0 - - if existing_concepts == 0: - # Clean database - stitching is impossible, auto-prune - Console.info(f"\n✓ Target database is empty ({ext_count} external references detected)") - Console.info(" No existing concepts to stitch to - will auto-prune to keep ontology isolated") - restitch_action = "prune" - else: - # Database has concepts - offer stitching options - Console.warning(f"\n⚠ This backup has {ext_count} external concept dependencies") - Console.warning(" ALL external references MUST be handled to maintain graph integrity") - Console.warning(" Choose how to handle them:") - print("") - print(" 1) Auto-prune after restore (keep isolated)") - print(" 2) Stitch later (defer - WARNING: graph will be broken!)") - print(" 3) Cancel restore") - print("") - choice = input("Select option [1-3]: ").strip() - - if choice == "1": - restitch_action = "prune" - Console.info(f"\n✓ Will prune {ext_count} dangling relationships after restore") - elif choice == "2": - restitch_action = "defer" - Console.error("\n⚠ DANGER: Graph will have dangling refs until you fix it!") - Console.warning(" You MUST run ONE of these immediately after restore:") - Console.info(f" python -m src.admin.stitch --backup {backup_file}") - Console.info(f" python -m src.admin.prune") - Console.warning("\n Stitcher will handle matched refs AND auto-prune unmatched") - if not Console.confirm("\nI understand the graph will be broken - proceed?"): - Console.warning("Restore cancelled - wise choice!") - sys.exit(0) - else: - Console.warning("Restore cancelled") - sys.exit(0) - - # Check for conflicts (single kg-backup/2 model: scope is named in the header; - # one ontology entry == a scoped backup). - _ontologies = [ - o.get("name") for o in backup_data.get("header", {}).get("ontologies", []) - if o.get("name") - ] - # Scope for downstream integrity ops: one ontology entry == a scoped backup; - # otherwise graph-wide. (kg-backup/2 has no top-level "ontology" field — - # passing None here would silently widen a scoped prune to the whole graph.) - scope = _ontologies[0] if len(_ontologies) == 1 else None - if len(_ontologies) == 1: - ontology_name = _ontologies[0] - client = self.conn.get_client() - existing = AGEQueries.get_ontology_info(client, ontology_name) - - if existing: - Console.warning(f"\n⚠ Ontology '{ontology_name}' already exists in database") - Console.info(f" Current concepts: {existing['statistics']['concept_count']}") - - print("\nRestore options:") - print(" 1) Skip existing nodes (add only new data)") - print(" 2) Overwrite existing nodes (update with backup data)") - print(" 3) Cancel") - print("") - - conflict_choice = input("Select option [1-3]: ").strip() - - if conflict_choice == "1": - overwrite = False - elif conflict_choice == "2": - overwrite = True - Console.warning("⚠ This will overwrite existing data!") - if not Console.confirm("Are you sure?"): - Console.warning("Restore cancelled") - sys.exit(0) - else: - Console.warning("Restore cancelled") - sys.exit(0) - - # Confirm - print("") - if not Console.confirm("Proceed with restore?"): - Console.warning("Restore cancelled") - sys.exit(0) - - # Restore - Console.info("\nRestoring data...") - try: - client = self.conn.get_client() - import_stats = DataImporter.import_backup( - client, - backup_data, - overwrite_existing=overwrite - ) - - # Summary - Console.section("Restore Complete") - Console.success("✓ Data restored successfully") - Console.info(f" Concepts: {import_stats['concepts_created']}") - Console.info(f" Sources: {import_stats['sources_created']}") - Console.info(f" Instances: {import_stats['instances_created']}") - Console.info(f" Relationships: {import_stats['relationships_created']}") - - # Handle dangling relationships based on user choice - if restitch_action == "prune": - Console.section("Pruning Dangling Relationships") - Console.info("Removing relationships to external concepts...") - - client = self.conn.get_client() - prune_result = DatabaseIntegrity.prune_dangling_relationships( - client, - ontology=scope, - dry_run=False - ) - - if prune_result["total_pruned"] > 0: - Console.success(f"✓ Pruned {prune_result['total_pruned']} dangling relationships") - Console.info(" Graph is now clean and isolated") - else: - Console.success("✓ No dangling relationships found") - - elif restitch_action == "defer": - Console.warning("\n⚠ Graph has dangling relationships - integrity compromised") - Console.warning(" Run stitcher or pruner immediately!") - - # Validate integrity after restore - Console.info("\nValidating database integrity...") - client = self.conn.get_client() - integrity = DatabaseIntegrity.check_integrity( - client, - ontology=scope - ) - - if integrity["issues"] or integrity["warnings"]: - DatabaseIntegrity.print_integrity_report(integrity) - - # Offer repair - if integrity["issues"]: - Console.warning("\n⚠ Integrity issues detected after restore") - if Console.confirm("Attempt automatic repair?"): - Console.info("Repairing orphaned concepts...") - repairs = DatabaseIntegrity.repair_orphaned_concepts( - client, - ontology=scope - ) - Console.success(f"✓ Repaired {repairs} orphaned concepts") - - # Re-check - Console.info("Re-validating...") - integrity = DatabaseIntegrity.check_integrity( - client, - ontology=scope - ) - if not integrity["issues"]: - Console.success("✓ All issues resolved") - else: - Console.warning("⚠ Some issues remain - manual intervention may be needed") - else: - Console.success("✓ No integrity issues detected") - - # Show tips based on action taken - if restitch_action == "defer": - # URGENT: graph is broken - self._show_tips(backup_file=backup_file, urgent=True) - elif external_concepts: - # Pruned - graph is clean - self._show_tips(backup_file=None, urgent=False) - else: - # No external deps - self._show_tips() - - except Exception as e: - Console.error(f"✗ Restore failed: {e}") - import traceback - traceback.print_exc() - sys.exit(1) - - def _show_tips(self, backup_file: Optional[Path] = None, urgent: bool = False): - """Show helpful tips""" - if urgent: - Console.error("\n⚠ URGENT: Graph integrity compromised!") - Console.error(" Dangling relationships will break traversal queries") - Console.warning("\nRun ONE of these immediately:") - print(f" {Colors.FAIL}python -m src.admin.stitch --backup {backup_file}{Colors.ENDC}") - print(f" {Colors.FAIL}python -m src.admin.prune{Colors.ENDC}") - Console.warning("\nUntil you do, the graph is in an inconsistent state!") - else: - Console.warning("\n💡 Next steps:") - print(" • Verify data: python cli.py database stats") - print(" • Query concepts: python cli.py search \"your query\"") - print(" • View in browser: http://localhost:7474") - - if backup_file: - Console.info("\nℹ Optional: Reconnect external relationships using semantic stitcher") - print(f" {Colors.OKCYAN}python -m src.admin.stitch --backup {backup_file}{Colors.ENDC}") - print(" (Or leave isolated if you prefer strict ontology boundaries)") - - def restore_non_interactive(self, backup_file: str, overwrite: bool = False): - """Non-interactive restore for automation""" - if not self.conn.test_connection(): - Console.error("✗ Cannot connect to Apache AGE database") - sys.exit(1) - - backup_path = Path(backup_file) - if not backup_path.exists(): - Console.error(f"✗ Backup file not found: {backup_file}") - sys.exit(1) - - Console.info(f"Restoring from: {backup_path.name}") - - # Load backup - with open(backup_path, 'r') as f: - backup_data = json.load(f) - - DataImporter.validate_backup(backup_data) - - # Restore - client = self.conn.get_client() - import_stats = DataImporter.import_backup( - client, - backup_data, - overwrite_existing=overwrite - ) - - Console.success("✓ Restore complete") - Console.info(f" Concepts: {import_stats['concepts_created']}") - Console.info(f" Sources: {import_stats['sources_created']}") - Console.info(f" Instances: {import_stats['instances_created']}") - Console.info(f" Relationships: {import_stats['relationships_created']}") - - def close(self): - """Cleanup""" - self.conn.close() - - -def main(): - parser = argparse.ArgumentParser( - description="Restore knowledge graph data from backup files", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Interactive menu - python -m src.admin.restore - - # Restore specific file (skip existing nodes) - python -m src.admin.restore --file backups/ontology_watts_20251006.json - - # Restore and overwrite existing nodes - python -m src.admin.restore --file backups/full_backup_20251006.json --overwrite - - # Custom backup directory - python -m src.admin.restore --backup-dir /path/to/backups - """ - ) - - parser.add_argument('--file', type=str, - help='Backup file to restore (non-interactive)') - parser.add_argument('--overwrite', action='store_true', - help='Overwrite existing nodes (default: skip duplicates)') - parser.add_argument('--backup-dir', type=str, default='backups', - help='Backup directory (default: backups/)') - - args = parser.parse_args() - - cli = RestoreCLI(backup_dir=args.backup_dir) - - try: - if args.file: - # Non-interactive mode - cli.restore_non_interactive( - backup_file=args.file, - overwrite=args.overwrite - ) - else: - # Interactive mode - cli.run_interactive() - finally: - cli.close() - - -if __name__ == '__main__': - main() diff --git a/api/admin/stitch.py b/api/admin/stitch.py deleted file mode 100755 index df4a91a33..000000000 --- a/api/admin/stitch.py +++ /dev/null @@ -1,296 +0,0 @@ -#!/usr/bin/env python3 -""" -Semantic Stitcher - Reconnect dangling relationships after partial restore - -When you restore a partial ontology backup, external concept references become -dangling (point to non-existent concepts). This tool finds similar concepts in -the target database and reconnects the relationships using vector similarity. - -Workflow: -1. Restore ontology normally (dangling refs created) -2. Run stitcher to analyze and reconnect -3. Review proposed stitches before applying -4. Optionally adjust similarity threshold and re-run - -Usage: - python -m src.admin.stitch --backup backups/ontology_a.json - python -m src.admin.stitch --backup backups/ontology_a.json --threshold 0.90 - python -m src.admin.stitch --backup backups/ontology_a.json --auto-apply -""" - -import argparse -import json -import sys -from pathlib import Path - -# Add project root to path -sys.path.insert(0, str(Path(__file__).parent.parent.parent)) - -from api.lib.console import Console, Colors -from api.lib.config import Config -from api.lib.age_ops import AGEConnection -from api.lib.restitching import ConceptMatcher -from api.lib.serialization import DataImporter, KgBackupV2Reader -from api.lib.integrity import DatabaseIntegrity - - -def _find_external_concepts(reader: KgBackupV2Reader): - """Find concept references with no matching concept record in the backup. - - Reads the single backup model (kg-backup/2) via the reader (relationship types - are de-interned). Produces the same structure the legacy - ConceptMatcher.create_restitch_plan consumes, so the DB-side re-stitch engine is - unchanged. Full migration of the engine to api/app/lib/concept_matcher.py is - ADR-102 P4 (integration mode). - """ - internal = {c["concept_id"] for c in reader.concepts()} - external = {} - for rel in reader.relationships(): - for endpoint, direction in ((rel.get("from"), "outgoing"), (rel.get("to"), "incoming")): - if endpoint and endpoint not in internal: - external.setdefault(endpoint, { - "concept_id": endpoint, - "referencing_relationships": [], - "label": None, - "embedding": None, - })["referencing_relationships"].append({ - "from": rel.get("from"), "to": rel.get("to"), - "type": rel.get("type"), "direction": direction, - }) - return list(external.values()) - - -def main(): - parser = argparse.ArgumentParser( - description="Semantic stitcher - Reconnect dangling relationships using vector similarity", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Workflow: - 1. Restore partial ontology backup (creates dangling references) - 2. Run stitcher to analyze external dependencies - 3. Review proposed re-stitching (concept matches) - 4. Apply stitches to reconnect relationships - -Examples: - # Analyze external dependencies and propose stitches - python -m src.admin.stitch --backup backups/ontology_a.json - - # Higher threshold for stricter matching (>95% similarity) - python -m src.admin.stitch --backup backups/ontology_a.json --threshold 0.95 - - # Lower threshold for more permissive matching (>80% similarity) - python -m src.admin.stitch --backup backups/ontology_a.json --threshold 0.80 - - # Auto-apply without confirmation - python -m src.admin.stitch --backup backups/ontology_a.json --auto-apply - - # Dry-run: show proposed stitches but don't apply - python -m src.admin.stitch --backup backups/ontology_a.json --dry-run - - # Create placeholders for unmatched concepts - python -m src.admin.stitch --backup backups/ontology_a.json --create-placeholders - -Note: - This tool analyzes the BACKUP FILE to identify external dependencies, then - searches the CURRENT DATABASE for similar concepts. It does NOT modify the - backup file - only the live database. - """ - ) - - parser.add_argument('--backup', '-b', type=str, required=True, - help='Backup file to analyze for external dependencies') - parser.add_argument('--threshold', '-t', type=float, default=0.85, - help='Similarity threshold for matching (0.0-1.0, default: 0.85)') - parser.add_argument('--auto-apply', action='store_true', - help='Auto-apply stitches without confirmation') - parser.add_argument('--dry-run', action='store_true', - help='Show proposed stitches but do not apply') - parser.add_argument('--create-placeholders', action='store_true', - help='Create placeholder concepts for unmatched references') - - args = parser.parse_args() - - # Validate threshold - if not 0.0 <= args.threshold <= 1.0: - Console.error("Threshold must be between 0.0 and 1.0") - sys.exit(1) - - Console.section("Semantic Stitcher") - - # Load backup file - backup_path = Path(args.backup) - if not backup_path.exists(): - Console.error(f"✗ Backup file not found: {backup_path}") - sys.exit(1) - - Console.info(f"Loading backup: {backup_path.name}") - with open(backup_path, 'r') as f: - backup_data = json.load(f) - - # Validate backup - try: - DataImporter.validate_backup(backup_data) - except Exception as e: - Console.error(f"✗ Invalid backup file: {e}") - sys.exit(1) - - # Connect to database - conn = AGEConnection() - if not conn.test_connection(): - Console.error("✗ Cannot connect to Apache AGE database") - Console.warning(f" Check connection: {Config.postgres_host()}:{Config.postgres_port()}") - sys.exit(1) - - Console.success("✓ Connected to target database") - reader = KgBackupV2Reader(backup_data) - _ontologies = [o.get("name") for o in reader.header.get("ontologies", []) if o.get("name")] - # Scope for integrity ops: one ontology entry == scoped; else graph-wide - # (passing None to a scoped prune would silently widen it to the whole graph). - scope = _ontologies[0] if len(_ontologies) == 1 else None - Console.key_value(" Backup format", reader.format_version) - if len(_ontologies) == 1: - Console.key_value(" Ontology", _ontologies[0]) - Console.key_value(" Similarity threshold", f"{args.threshold:.0%}") - - # Create matcher - matcher = ConceptMatcher(conn, similarity_threshold=args.threshold) - - # Find external concepts (read from the single kg-backup/2 model) - Console.info("\nAnalyzing backup for external concept references...") - external_concepts = _find_external_concepts(reader) - - if not external_concepts: - Console.success("\n✓ No external concept references found") - Console.info(" This backup has no cross-ontology dependencies") - conn.close() - sys.exit(0) - - Console.warning(f"\nFound {len(external_concepts)} external concept references") - - # Create re-stitching plan - Console.info("Searching target database for similar concepts...") - client = conn.get_client() - restitch_plan = matcher.create_restitch_plan(external_concepts, client) - - # Print plan - matcher.print_restitch_plan(restitch_plan) - - # Check if any matches found - if not restitch_plan["matched"] and not restitch_plan["unmatched"]: - Console.info("\nNo stitching needed") - conn.close() - sys.exit(0) - - # Dry-run mode - if args.dry_run: - Console.warning("\n[DRY-RUN MODE] No changes will be applied") - conn.close() - sys.exit(0) - - # Offer choices - if restitch_plan["matched"]: - if args.auto_apply: - Console.warning("\n[AUTO-APPLY MODE] Applying stitches automatically...") - apply = True - prune = False - else: - Console.warning("\n⚠ What would you like to do with external references?") - print(" 1) Re-stitch to similar concepts (semantic merging)") - print(" 2) Prune dangling relationships (keep isolated)") - print(" 3) Leave as-is (accept dangling references)") - print("") - choice = input("Select option [1-3]: ").strip() - - if choice == "1": - apply = True - prune = False - elif choice == "2": - apply = False - prune = True - else: - apply = False - prune = False - - if apply: - Console.section("Applying Re-stitching") - client = conn.get_client() - stats = matcher.execute_restitch( - restitch_plan, - client, - create_placeholders=args.create_placeholders - ) - - Console.success(f"\n✓ Re-stitched {stats['restitched']} relationships") - - if stats["placeholders"] > 0: - Console.info(f" Created {stats['placeholders']} placeholder concepts") - - # MUST prune unmatched references - 100% edge handling - if restitch_plan["unmatched"]: - Console.warning(f"\n⚠ {len(restitch_plan['unmatched'])} external concepts could not be matched") - Console.info(" Pruning relationships to unmatched concepts...") - - ontology = scope - client = conn.get_client() - prune_result = DatabaseIntegrity.prune_dangling_relationships( - client, - ontology=ontology, - dry_run=False - ) - - if prune_result["total_pruned"] > 0: - Console.success(f"✓ Pruned {prune_result['total_pruned']} unmatched relationships") - Console.info(" All external references handled - graph is clean") - - # Show next steps - Console.warning("\n💡 Next steps:") - print(" • Verify stitches: python cli.py ontology info \"Ontology Name\"") - print(" • Check integrity: python -m src.admin.check_integrity") - print(" • Query concepts: python cli.py search \"your query\"") - elif prune: - # Prune dangling relationships - Console.section("Pruning Dangling Relationships") - Console.info("Removing relationships to external concepts...") - - ontology = scope - client = conn.get_client() - prune_result = DatabaseIntegrity.prune_dangling_relationships( - client, - ontology=ontology, - dry_run=False - ) - - DatabaseIntegrity.print_pruning_report(prune_result, dry_run=False) - - Console.success("\n✓ All external references handled - graph is clean") - Console.info(" Ontology is now isolated - no cross-ontology relationships") - else: - Console.warning("\nNo action taken - dangling relationships remain") - Console.info(" You can:") - print(" • Re-run with different --threshold to adjust matches") - print(" • Use: python -m src.admin.prune to remove danglers") - print(" • Leave as-is if queries don't traverse these relationships") - else: - Console.warning("\nNo matches found - all external references would remain dangling") - - if args.create_placeholders: - Console.warning(f"Creating {len(restitch_plan['unmatched'])} placeholder concepts...") - client = conn.get_client() - stats = matcher.execute_restitch( - restitch_plan, - client, - create_placeholders=True - ) - Console.success(f"✓ Created {stats['placeholders']} placeholder concepts") - Console.warning(" ⚠ Placeholders need manual review and connection") - else: - Console.info("\n Options:") - print(" 1. Lower --threshold to find more permissive matches") - print(" 2. Use --create-placeholders to create stub concepts") - print(" 3. Accept dangling references (queries will skip them)") - - conn.close() - - -if __name__ == '__main__': - main() diff --git a/api/app/lib/concept_matcher.py b/api/app/lib/concept_matcher.py index 8558b3181..eb1d47b4d 100644 --- a/api/app/lib/concept_matcher.py +++ b/api/app/lib/concept_matcher.py @@ -1,9 +1,10 @@ """ Concept matching engine for restore integration mode (ADR-102 §2). -Succeeds the legacy ``api/lib/restitching.py`` ``ConceptMatcher`` as the -``api/app`` integration-mode engine. NOTE: this is not a line-for-line port — -the legacy matcher used a single 0.85 threshold; this module implements the +Succeeds the legacy ``api/lib/restitching.py`` ``ConceptMatcher`` (removed in +ADR-102 P6) as the ``api/app`` integration-mode engine. NOTE: this was not a +line-for-line port — the legacy matcher used a single 0.85 threshold; this +module implements the project's **canonical two-tier policy** (``api/app/lib/ingestion.py:432-461``), which is what ADR-102 §2 specifies for integration mode. It is the *matching engine only*: given an external concept (carrying an embedding, and optionally a diff --git a/api/app/models/admin.py b/api/app/models/admin.py index d884817b7..d8c063741 100644 --- a/api/app/models/admin.py +++ b/api/app/models/admin.py @@ -1,7 +1,7 @@ """ Admin API Models -Models for backup, restore, status, and reset operations. +Models for backup, status, and admin-job operations. """ from pydantic import BaseModel, Field @@ -87,31 +87,9 @@ class ListBackupsResponse(BaseModel): # (mode, epoch), not request/response models. See routes/admin.py + restore_worker. -# ========== Reset Models ========== - -class ResetRequest(BaseModel): - """Request to reset database (requires authentication)""" - username: str = Field(..., description="Username for authentication") - password: str = Field(..., description="Password for authentication") - confirm: bool = Field(..., description="Must be true to confirm destructive operation") - clear_logs: bool = Field(True, description="Clear log files") - clear_checkpoints: bool = Field(True, description="Clear checkpoint files") - - -class SchemaValidation(BaseModel): - """Schema validation results""" - constraints_count: int - vector_index_exists: bool - node_count: int - schema_test_passed: bool - - -class ResetResponse(BaseModel): - """Reset operation response""" - success: bool - schema_validation: SchemaValidation - message: str - warnings: List[str] = [] +# Reset models (ResetRequest/ResetResponse/SchemaValidation) removed in ADR-102 P6: +# the API reset path was retired (too dangerous) and the api/admin/reset.py CLI it +# delegated to was deleted. Database reset is an operator-level concern now. # ========== Admin Job Models ========== diff --git a/api/app/routes/admin.py b/api/app/routes/admin.py index dab0b3026..40f2573e1 100644 --- a/api/app/routes/admin.py +++ b/api/app/routes/admin.py @@ -28,8 +28,10 @@ SystemStatusResponse, BackupRequest, ListBackupsResponse, - # ResetRequest, ResetResponse removed - reset moved to initialize-platform.sh option 0 - # BackupResponse / RestoreRequest / RestoreResponse removed in ADR-102 P6 (dead) + # ResetRequest/ResetResponse/SchemaValidation and BackupResponse/RestoreRequest/ + # RestoreResponse removed in ADR-102 P6 (dead). The API reset path was retired + # (too dangerous — reset was one of the earliest worker functions); database reset + # is an operator-level concern now, not an API/worker operation. ) from ..dependencies.auth import CurrentUser, require_permission from ..services.admin_service import AdminService diff --git a/api/app/services/admin_service.py b/api/app/services/admin_service.py index f915ea184..bc9f0418e 100644 --- a/api/app/services/admin_service.py +++ b/api/app/services/admin_service.py @@ -25,8 +25,6 @@ PythonEnvironment, ConfigurationStatus, ListBackupsResponse, - ResetResponse, - SchemaValidation, ) @@ -104,51 +102,6 @@ async def list_backups(self) -> ListBackupsResponse: count=len(backups), ) - async def reset_database( - self, - clear_logs: bool = True, - clear_checkpoints: bool = True - ) -> ResetResponse: - """Reset database (nuclear option) - Delegates to reset module""" - - # Import reset module - from ...admin.reset import ResetManager - from ..services.job_queue import get_job_queue - - # Execute reset in thread pool (reset module uses subprocess) - loop = asyncio.get_event_loop() - manager = ResetManager(project_root=self.project_root) - - result = await loop.run_in_executor( - None, - manager.reset, - clear_logs, - clear_checkpoints, - False # verbose=False for API calls - ) - - if not result["success"]: - raise RuntimeError(result["error"]) - - # Note: Jobs table is already cleared by database reset (docker-compose down -v) - # No need to clear again - attempting to do so would fail with stale connection - logger.info("✓ Jobs database cleared by database reset") - - # Convert validation results to API response model - validation = result["validation"] - schema_validation = SchemaValidation( - constraints_count=validation["schema_count"], # PostgreSQL schemas (kg_api, kg_auth, kg_logs) - vector_index_exists=validation["graph_exists"], # AGE graph exists - node_count=validation["node_count"], - schema_test_passed=validation["schema_test_passed"], - ) - - return ResetResponse( - success=True, - schema_validation=schema_validation, - message="Database reset successfully", - ) - # ========== Helper Methods ========== async def _check_docker_running(self) -> bool: diff --git a/api/lib/restitching.py b/api/lib/restitching.py deleted file mode 100644 index d9c994d7c..000000000 --- a/api/lib/restitching.py +++ /dev/null @@ -1,336 +0,0 @@ -""" -Semantic Re-stitching - Intelligently reconnect dangling relationships - -When restoring partial backups, external concept references can be reconnected -to similar concepts in the target database using vector similarity matching. - -This uses the same concept matching algorithm as ingestion: -1. Detect external concept references (not in backup) -2. Check if similar concepts exist in target database -3. Offer to reconnect relationships to matched concepts -4. Optionally create placeholder concepts for unmatched references -""" - -from typing import Dict, Any, List, Tuple, Optional -import json -import numpy as np - -from .console import Console, Colors -from .age_ops import AGEConnection -from ..api.lib.age_client import AGEClient - - -class ConceptMatcher: - """Match external concepts to similar concepts in target database""" - - def __init__(self, conn: AGEConnection, similarity_threshold: float = 0.85): - """ - Initialize concept matcher - - Args: - conn: AGE connection - similarity_threshold: Minimum similarity for matching (default: 0.85) - """ - self.conn = conn - self.threshold = similarity_threshold - - def find_external_concepts(self, backup_data: Dict[str, Any]) -> List[Dict[str, Any]]: - """ - Find all external concept references in backup - - DEAD (ADR-102 P6): reads the removed v1 ``data["data"]`` shape. stitch.py now - derives external concepts from KgBackupV2Reader (``_find_external_concepts``) - and feeds them to ``create_restitch_plan``; this method has no live caller. - Do not wire to new code. - - Args: - backup_data: Parsed backup JSON - - Returns: - List of external concept references with metadata - """ - data = backup_data.get("data", {}) - internal_concept_ids = {c["concept_id"] for c in data.get("concepts", [])} - - external_refs = {} - - # Scan relationships for external concepts - for rel in data.get("relationships", []): - from_id = rel.get("from") - to_id = rel.get("to") - - # Check 'from' concept - if from_id not in internal_concept_ids: - if from_id not in external_refs: - external_refs[from_id] = { - "concept_id": from_id, - "referencing_relationships": [], - "label": None, # Will try to infer - "embedding": None - } - external_refs[from_id]["referencing_relationships"].append({ - "from": from_id, - "to": to_id, - "type": rel["type"], - "direction": "outgoing" - }) - - # Check 'to' concept - if to_id not in internal_concept_ids: - if to_id not in external_refs: - external_refs[to_id] = { - "concept_id": to_id, - "referencing_relationships": [], - "label": None, - "embedding": None - } - external_refs[to_id]["referencing_relationships"].append({ - "from": from_id, - "to": to_id, - "type": rel["type"], - "direction": "incoming" - }) - - return list(external_refs.values()) - - def match_concept_in_database( - self, - external_concept: Dict[str, Any], - client: AGEClient - ) -> Optional[Dict[str, Any]]: - """ - Find similar concept in target database using vector similarity - - Args: - external_concept: External concept metadata - client: AGE client - - Returns: - Matched concept with similarity score, or None if no match - """ - # If we have an embedding for the external concept, use it - if external_concept.get("embedding"): - embedding = external_concept["embedding"] - elif external_concept.get("label"): - # Generate embedding from label - embedding = self.conn.generate_embedding(external_concept["label"]) - else: - # Can't match without label or embedding - return None - - # Fetch all concepts with embeddings - query = """ - MATCH (c:Concept) - WHERE c.embedding IS NOT NULL - RETURN c.concept_id as concept_id, - c.label as label, - c.embedding as embedding - """ - results = client._execute_cypher(query) - - if not results: - return None - - # Compute cosine similarity in Python - embedding_vec = np.array(embedding) - best_match = None - best_score = self.threshold - - for record in results: - concept_id = str(record.get("concept_id", "")).strip('"') - label = str(record.get("label", "")).strip('"') - embedding_str = str(record.get("embedding", "[]")) - - try: - candidate_embedding = json.loads(embedding_str) - candidate_vec = np.array(candidate_embedding) - - # Cosine similarity - similarity = float(np.dot(embedding_vec, candidate_vec) / - (np.linalg.norm(embedding_vec) * np.linalg.norm(candidate_vec))) - - if similarity > best_score: - best_score = similarity - best_match = { - "concept_id": concept_id, - "label": label, - "similarity": similarity - } - except (json.JSONDecodeError, ValueError): - continue - - return best_match - - def create_restitch_plan( - self, - external_concepts: List[Dict[str, Any]], - client: AGEClient - ) -> Dict[str, Any]: - """ - Create re-stitching plan for external concepts - - Args: - external_concepts: List of external concept references - client: AGE client - - Returns: - Re-stitching plan with matches and recommendations - """ - plan = { - "total_external": len(external_concepts), - "matched": [], - "unmatched": [], - "statistics": { - "high_confidence": 0, # > 0.95 - "medium_confidence": 0, # 0.85-0.95 - "no_match": 0 - } - } - - Console.info(f"Analyzing {len(external_concepts)} external concept references...") - - for ext_concept in external_concepts: - match = self.match_concept_in_database(ext_concept, client) - - if match: - similarity = match["similarity"] - restitch_info = { - "external_id": ext_concept["concept_id"], - "target_id": match["concept_id"], - "target_label": match["label"], - "similarity": similarity, - "relationship_count": len(ext_concept["referencing_relationships"]), - "relationships": ext_concept["referencing_relationships"] - } - - plan["matched"].append(restitch_info) - - if similarity > 0.95: - plan["statistics"]["high_confidence"] += 1 - else: - plan["statistics"]["medium_confidence"] += 1 - else: - plan["unmatched"].append({ - "external_id": ext_concept["concept_id"], - "relationship_count": len(ext_concept["referencing_relationships"]), - "relationships": ext_concept["referencing_relationships"] - }) - plan["statistics"]["no_match"] += 1 - - return plan - - def print_restitch_plan(self, plan: Dict[str, Any]): - """Print re-stitching plan to console""" - Console.section("Semantic Re-stitching Plan") - - Console.info(f"Found {plan['total_external']} external concept references") - - # Statistics - stats = plan["statistics"] - Console.info("\nMatching Results:") - Console.key_value(" High confidence (>95%)", str(stats["high_confidence"]), - Colors.BOLD, Colors.OKGREEN) - Console.key_value(" Medium confidence (85-95%)", str(stats["medium_confidence"]), - Colors.BOLD, Colors.WARNING) - Console.key_value(" No match", str(stats["no_match"]), - Colors.BOLD, Colors.FAIL) - - # Show matches - if plan["matched"]: - Console.warning("\nProposed Re-stitching:") - for i, match in enumerate(plan["matched"][:5], 1): - color = Colors.OKGREEN if match["similarity"] > 0.95 else Colors.WARNING - print(f"\n {i}. External: {match['external_id']}") - print(f" → Connect to: {color}{match['target_label']}{Colors.ENDC}") - print(f" Similarity: {color}{match['similarity']:.1%}{Colors.ENDC}") - print(f" Affects {match['relationship_count']} relationship(s)") - - if len(plan["matched"]) > 5: - Console.info(f"\n ... and {len(plan['matched']) - 5} more matches") - - # Show unmatched - if plan["unmatched"]: - Console.warning(f"\nUnmatched ({len(plan['unmatched'])}):") - for i, unmatch in enumerate(plan["unmatched"][:5], 1): - print(f" {i}. {unmatch['external_id']} ({unmatch['relationship_count']} relationships)") - - if len(plan["unmatched"]) > 5: - Console.info(f" ... and {len(plan['unmatched']) - 5} more") - - Console.warning("\n Options for unmatched concepts:") - print(" 1. Leave relationships dangling (skip)") - print(" 2. Create placeholder concepts") - print(" 3. Manual review required") - - def execute_restitch( - self, - plan: Dict[str, Any], - client: AGEClient, - create_placeholders: bool = False - ) -> Dict[str, int]: - """ - Execute re-stitching plan - - Args: - plan: Re-stitching plan from create_restitch_plan() - client: AGE client - create_placeholders: If True, create placeholder concepts for unmatched - - Returns: - Statistics: restitched_count, placeholder_count - """ - stats = { - "restitched": 0, - "placeholders": 0, - "skipped": 0 - } - - # Re-stitch matched concepts - Console.info("\nRe-stitching matched concepts...") - for match in plan["matched"]: - for rel in match["relationships"]: - # Determine which concept to update - if rel["direction"] == "incoming": - # External concept is the 'to' side - # Update relationship: (from)-[type]->(external) → (from)-[type]->(target) - query = """ - MATCH (from_concept:Concept {concept_id: $from_id}) - MATCH (target_concept:Concept {concept_id: $target_id}) - MERGE (from_concept)-[r:""" + rel["type"] + """]->(target_concept) - RETURN count(r) as created - """ - client._execute_cypher(query, params={"from_id": rel["from"], "target_id": match["target_id"]}) - else: - # External concept is the 'from' side - # Update relationship: (external)-[type]->(to) → (target)-[type]->(to) - query = """ - MATCH (to_concept:Concept {concept_id: $to_id}) - MATCH (target_concept:Concept {concept_id: $target_id}) - MERGE (target_concept)-[r:""" + rel["type"] + """]->(to_concept) - RETURN count(r) as created - """ - client._execute_cypher(query, params={"to_id": rel["to"], "target_id": match["target_id"]}) - - stats["restitched"] += len(match["relationships"]) - Console.progress(stats["restitched"], - sum(len(m["relationships"]) for m in plan["matched"]), - "Re-stitching") - - # Handle unmatched - if create_placeholders and plan["unmatched"]: - Console.info("\nCreating placeholder concepts for unmatched references...") - for unmatch in plan["unmatched"]: - # Create placeholder concept (no embedding, marked as placeholder) - query = """ - MERGE (c:Concept {concept_id: $concept_id}) - ON CREATE SET c.label = 'Placeholder: ' + $concept_id, - c.placeholder = true, - c.search_terms = [] - RETURN c - """ - client._execute_cypher(query, params={"concept_id": unmatch["external_id"]}) - stats["placeholders"] += 1 - else: - stats["skipped"] = len(plan["unmatched"]) - - return stats diff --git a/tests/api/test_endpoint_security.py b/tests/api/test_endpoint_security.py index 6099154c2..e20dbbb57 100644 --- a/tests/api/test_endpoint_security.py +++ b/tests/api/test_endpoint_security.py @@ -263,13 +263,6 @@ def test_admin_status_succeeds_with_admin_token(api_client, mock_oauth_validatio assert response.status_code == 200 -@pytest.mark.api -@pytest.mark.security -def test_admin_reset_requires_admin_role(api_client, mock_oauth_validation, auth_headers_user): - """POST /admin/reset was removed — skip.""" - pytest.skip("POST /admin/reset endpoint removed") - - @pytest.mark.api @pytest.mark.security def test_admin_extraction_config_requires_admin(api_client, mock_oauth_validation, auth_headers_user): From 436180c9aaf1b6f2ffc32fb85c72d383864ff8a6 Mon Sep 17 00:00:00 2001 From: Aaron Bockelie Date: Mon, 1 Jun 2026 17:42:56 -0500 Subject: [PATCH 2/2] docs: update require_role example off the retired /admin/reset endpoint The Phase-2 require_role docstring used /admin/reset / reset_database as its illustrative example; that endpoint was retired in this PR. Point it at the live /admin/backup route instead. Cosmetic (placeholder decorator, never wired). --- api/app/middleware/auth.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/app/middleware/auth.py b/api/app/middleware/auth.py index 2f32c7cba..c4881839d 100644 --- a/api/app/middleware/auth.py +++ b/api/app/middleware/auth.py @@ -129,9 +129,9 @@ def require_role(role: str): Decorator for role-based access control (Phase 2). Usage: - @router.post("/admin/reset") + @router.post("/admin/backup") @require_role("admin") - async def reset_database(): + async def create_backup(): ... """ def decorator(func):