From 7b8a9fcd6b8fd1eb46198149317aaa57cac9d5ed Mon Sep 17 00:00:00 2001 From: MichaelAnders Date: Mon, 26 Jan 2026 15:31:08 +0100 Subject: [PATCH] Revert "Add intelligent audit logging and oversized error logging systems (#20)" This reverts commit 7857c51f17e03a30a0144c5f4417a7fdf7ed6eef. --- scripts/audit-log-reader.js | 399 ------------------ scripts/compact-dictionary.js | 204 --------- scripts/test-deduplication.js | 448 -------------------- src/api/middleware/session.js | 4 - src/config/index.js | 384 +++-------------- src/logger/audit-logger.js | 609 --------------------------- src/logger/deduplicator.js | 547 ------------------------ src/logger/index.js | 106 +---- src/logger/oversized-error-stream.js | 288 ------------- src/orchestrator/index.js | 316 ++++---------- 10 files changed, 153 insertions(+), 3152 deletions(-) delete mode 100755 scripts/audit-log-reader.js delete mode 100755 scripts/compact-dictionary.js delete mode 100755 scripts/test-deduplication.js delete mode 100644 src/logger/audit-logger.js delete mode 100644 src/logger/deduplicator.js delete mode 100644 src/logger/oversized-error-stream.js diff --git a/scripts/audit-log-reader.js b/scripts/audit-log-reader.js deleted file mode 100755 index 48b1fbc..0000000 --- a/scripts/audit-log-reader.js +++ /dev/null @@ -1,399 +0,0 @@ -#!/usr/bin/env node - -/** - * LLM Audit Log Reader - * - * Utility to read and reconstruct audit log entries from deduplicated logs. - * Resolves hash references from the dictionary file and outputs full content. - * - * Usage: - * node scripts/audit-log-reader.js [options] - * - * Options: - * --log-file Path to audit log file (default: logs/llm-audit.log) - * --dict-file Path to dictionary file (default: logs/llm-audit-dictionary.jsonl) - * --full Output full restored entries (resolve all references) - * --filter Filter by field (e.g., type=llm_request, provider=anthropic) - * --correlation-id Filter by correlation ID - * --last Show only last N entries - * --stats Show deduplication statistics - * --verify Verify all references can be resolved - * --help Show this help message - * - * Examples: - * # Show all entries with full content - * node scripts/audit-log-reader.js --full - * - * # Show only requests - * node scripts/audit-log-reader.js --filter type=llm_request - * - * # Show last 5 entries - * node scripts/audit-log-reader.js --last 5 --full - * - * # Show deduplication statistics - * node scripts/audit-log-reader.js --stats - * - * # Verify all references resolve - * node scripts/audit-log-reader.js --verify - */ - -const fs = require("fs"); -const path = require("path"); -const readline = require("readline"); -const { ContentDeduplicator } = require("../src/logger/deduplicator"); - -// Default file paths -const DEFAULT_LOG_FILE = path.join(process.cwd(), "logs", "llm-audit.log"); -const DEFAULT_DICT_FILE = path.join(process.cwd(), "logs", "llm-audit-dictionary.jsonl"); - -/** - * Parse command line arguments - */ -function parseArgs() { - const args = process.argv.slice(2); - const options = { - logFile: DEFAULT_LOG_FILE, - dictFile: DEFAULT_DICT_FILE, - full: false, - filter: null, - correlationId: null, - last: null, - stats: false, - verify: false, - help: false, - }; - - for (let i = 0; i < args.length; i++) { - const arg = args[i]; - switch (arg) { - case "--log-file": - options.logFile = args[++i]; - break; - case "--dict-file": - options.dictFile = args[++i]; - break; - case "--full": - options.full = true; - break; - case "--filter": - const filterArg = args[++i]; - const [key, value] = filterArg.split("="); - options.filter = { key, value }; - break; - case "--correlation-id": - options.correlationId = args[++i]; - break; - case "--last": - options.last = Number.parseInt(args[++i], 10); - break; - case "--stats": - options.stats = true; - break; - case "--verify": - options.verify = true; - break; - case "--help": - options.help = true; - break; - default: - console.error(`Unknown option: ${arg}`); - process.exit(1); - } - } - - return options; -} - -/** - * Show help message - */ -function showHelp() { - const helpText = ` -LLM Audit Log Reader - -Usage: node scripts/audit-log-reader.js [options] - -Options: - --log-file Path to audit log file (default: logs/llm-audit.log) - --dict-file Path to dictionary file (default: logs/llm-audit-dictionary.jsonl) - --full Output full restored entries (resolve all references) - --filter Filter by field (e.g., type=llm_request, provider=anthropic) - --correlation-id Filter by correlation ID - --last Show only last N entries - --stats Show deduplication statistics - --verify Verify all references can be resolved - --help Show this help message - -Examples: - # Show all entries with full content - node scripts/audit-log-reader.js --full - - # Show only requests - node scripts/audit-log-reader.js --filter type=llm_request - - # Show last 5 entries - node scripts/audit-log-reader.js --last 5 --full - - # Show deduplication statistics - node scripts/audit-log-reader.js --stats - - # Verify all references resolve - node scripts/audit-log-reader.js --verify -`; - console.log(helpText); -} - -/** - * Read and process log entries - */ -async function readLogEntries(options) { - const { logFile, dictFile, full, filter, correlationId, last } = options; - - // Check if log file exists - if (!fs.existsSync(logFile)) { - console.error(`Log file not found: ${logFile}`); - process.exit(1); - } - - // Initialize deduplicator if needed - let deduplicator = null; - if (full && fs.existsSync(dictFile)) { - deduplicator = new ContentDeduplicator(dictFile); - } - - const entries = []; - - // Read log file line by line - const fileStream = fs.createReadStream(logFile); - const rl = readline.createInterface({ - input: fileStream, - crlfDelay: Infinity, - }); - - for await (const line of rl) { - if (!line.trim()) continue; - - try { - let entry = JSON.parse(line); - - // Apply filters - if (filter && entry[filter.key] !== filter.value) { - continue; - } - if (correlationId && entry.correlationId !== correlationId) { - continue; - } - - // Restore full content if requested - if (full && deduplicator) { - entry = deduplicator.restoreEntry(entry); - } - - entries.push(entry); - } catch (err) { - console.error("Malformed log entry:", err.message); - } - } - - // Apply last N filter - const output = last ? entries.slice(-last) : entries; - - // Output as JSONL - for (const entry of output) { - console.log(JSON.stringify(entry, null, 2)); - } - - return entries.length; -} - -/** - * Show deduplication statistics - */ -async function showStats(options) { - const { logFile, dictFile } = options; - - if (!fs.existsSync(logFile)) { - console.error(`Log file not found: ${logFile}`); - process.exit(1); - } - - if (!fs.existsSync(dictFile)) { - console.log("No dictionary file found. Deduplication may not be enabled."); - return; - } - - // Get file sizes - const logStats = fs.statSync(logFile); - const dictStats = fs.statSync(dictFile); - - console.log("\n=== LLM Audit Log Deduplication Statistics ===\n"); - console.log(`Log file: ${logFile}`); - console.log(` Size: ${formatBytes(logStats.size)}`); - console.log(` Lines: ${await countLines(logFile)}`); - console.log(); - console.log(`Dictionary file: ${dictFile}`); - console.log(` Size: ${formatBytes(dictStats.size)}`); - console.log(` Entries: ${await countLines(dictFile)}`); - console.log(); - console.log(`Total size: ${formatBytes(logStats.size + dictStats.size)}`); - console.log(); - - // Count reference occurrences in log - const refCount = await countReferences(logFile); - console.log(`Reference objects in log: ${refCount}`); - console.log(`Estimated space saved: ~${formatBytes(refCount * 2000)} (assuming ~2KB per deduplicated field)`); - console.log(); -} - -/** - * Verify all references can be resolved - */ -async function verifyReferences(options) { - const { logFile, dictFile } = options; - - if (!fs.existsSync(logFile)) { - console.error(`Log file not found: ${logFile}`); - process.exit(1); - } - - if (!fs.existsSync(dictFile)) { - console.log("No dictionary file found. Nothing to verify."); - return; - } - - const deduplicator = new ContentDeduplicator(dictFile); - const fileStream = fs.createReadStream(logFile); - const rl = readline.createInterface({ - input: fileStream, - crlfDelay: Infinity, - }); - - let totalRefs = 0; - let unresolvedRefs = 0; - const unresolvedHashes = new Set(); - - console.log("Verifying references...\n"); - - for await (const line of rl) { - if (!line.trim()) continue; - - try { - const entry = JSON.parse(line); - - // Check all fields for references - for (const [key, value] of Object.entries(entry)) { - if (typeof value === "object" && value !== null && value.$ref) { - totalRefs++; - const content = deduplicator.getContent(value.$ref); - if (content === null) { - unresolvedRefs++; - unresolvedHashes.add(value.$ref); - console.error(`✗ Unresolved reference: ${value.$ref} in field "${key}"`); - } - } - } - } catch (err) { - console.error("Malformed log entry:", err.message); - } - } - - console.log("\n=== Verification Results ===\n"); - console.log(`Total references: ${totalRefs}`); - console.log(`Unresolved references: ${unresolvedRefs}`); - console.log(`Unique unresolved hashes: ${unresolvedHashes.size}`); - - if (unresolvedRefs === 0) { - console.log("\n✓ All references resolved successfully!"); - } else { - console.log("\n✗ Some references could not be resolved. Dictionary may be incomplete."); - process.exit(1); - } -} - -/** - * Helper: Format bytes to human-readable string - */ -function formatBytes(bytes) { - if (bytes < 1024) return `${bytes} B`; - if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(2)} KB`; - return `${(bytes / (1024 * 1024)).toFixed(2)} MB`; -} - -/** - * Helper: Count lines in a file - */ -async function countLines(filePath) { - const fileStream = fs.createReadStream(filePath); - const rl = readline.createInterface({ - input: fileStream, - crlfDelay: Infinity, - }); - - let count = 0; - for await (const line of rl) { - if (line.trim()) count++; - } - return count; -} - -/** - * Helper: Count reference objects in log - */ -async function countReferences(filePath) { - const fileStream = fs.createReadStream(filePath); - const rl = readline.createInterface({ - input: fileStream, - crlfDelay: Infinity, - }); - - let count = 0; - for await (const line of rl) { - if (!line.trim()) continue; - try { - const entry = JSON.parse(line); - for (const value of Object.values(entry)) { - if (typeof value === "object" && value !== null && value.$ref) { - count++; - } - } - } catch { - // Skip malformed lines - } - } - return count; -} - -/** - * Main entry point - */ -async function main() { - const options = parseArgs(); - - if (options.help) { - showHelp(); - return; - } - - if (options.stats) { - await showStats(options); - } else if (options.verify) { - await verifyReferences(options); - } else { - const count = await readLogEntries(options); - console.error(`\n(Processed ${count} entries)`); - } -} - -// Run if called directly -if (require.main === module) { - main().catch((err) => { - console.error("Error:", err.message); - process.exit(1); - }); -} - -module.exports = { - readLogEntries, - showStats, - verifyReferences, -}; diff --git a/scripts/compact-dictionary.js b/scripts/compact-dictionary.js deleted file mode 100755 index d8805c2..0000000 --- a/scripts/compact-dictionary.js +++ /dev/null @@ -1,204 +0,0 @@ -#!/usr/bin/env node - -/** - * Compact LLM Audit Dictionary - * - * Removes redundant UPDATE entries from the dictionary file, keeping only: - * - One entry per hash with full content - * - Latest metadata (useCount, lastSeen) - * - * Usage: - * node scripts/compact-dictionary.js [options] - * - * Options: - * --dict-path Path to dictionary file (default: logs/llm-audit-dictionary.jsonl) - * --backup Create backup before compacting (default: true) - * --dry-run Show what would be done without making changes - * --help Show this help message - */ - -const fs = require('fs'); -const path = require('path'); -const readline = require('readline'); - -// Parse command line arguments -function parseArgs() { - const args = process.argv.slice(2); - const options = { - dictPath: 'logs/llm-audit-dictionary.jsonl', - backup: true, - dryRun: false, - }; - - for (let i = 0; i < args.length; i++) { - const arg = args[i]; - switch (arg) { - case '--dict-path': - options.dictPath = args[++i]; - break; - case '--backup': - options.backup = true; - break; - case '--no-backup': - options.backup = false; - break; - case '--dry-run': - options.dryRun = true; - break; - case '--help': - console.log(` -Compact LLM Audit Dictionary - -Removes redundant UPDATE entries from the dictionary file. - -Usage: - node scripts/compact-dictionary.js [options] - -Options: - --dict-path Path to dictionary file (default: logs/llm-audit-dictionary.jsonl) - --backup Create backup before compacting (default: true) - --no-backup Skip creating backup - --dry-run Show what would be done without making changes - --help Show this help message - -Example: - node scripts/compact-dictionary.js --dict-path logs/llm-audit-dictionary.jsonl --dry-run - `); - process.exit(0); - default: - if (arg.startsWith('--')) { - console.error(`Unknown option: ${arg}`); - process.exit(1); - } - } - } - - return options; -} - -// Read and compact dictionary -async function compactDictionary(dictPath) { - if (!fs.existsSync(dictPath)) { - throw new Error(`Dictionary file not found: ${dictPath}`); - } - - console.log(`Reading dictionary: ${dictPath}`); - - // Map: hash -> entry object - // For each hash, we'll keep the latest metadata merged with content - const entries = new Map(); - let totalLines = 0; - let malformedLines = 0; - - // Read all entries - const fileStream = fs.createReadStream(dictPath); - const rl = readline.createInterface({ - input: fileStream, - crlfDelay: Infinity, - }); - - for await (const line of rl) { - totalLines++; - if (!line.trim()) continue; - - try { - const entry = JSON.parse(line); - if (!entry.hash) { - malformedLines++; - continue; - } - - const hash = entry.hash; - - // Check if we already have an entry for this hash - if (entries.has(hash)) { - const existing = entries.get(hash); - - // Merge: keep content from entry that has it, use latest metadata - const merged = { - hash, - firstSeen: existing.firstSeen || entry.firstSeen, - useCount: entry.useCount || existing.useCount, - lastSeen: entry.lastSeen || existing.lastSeen, - content: existing.content || entry.content, - }; - - entries.set(hash, merged); - } else { - // First time seeing this hash - entries.set(hash, entry); - } - } catch (err) { - malformedLines++; - console.warn(`Skipping malformed line ${totalLines}: ${err.message}`); - } - } - - const compactedCount = entries.size; - const removedCount = totalLines - malformedLines - compactedCount; - - return { - entries, - stats: { - totalLines, - malformedLines, - uniqueHashes: compactedCount, - removedEntries: removedCount, - }, - }; -} - -// Write compacted dictionary -async function writeCompactedDictionary(dictPath, entries, backup = true) { - // Create backup if requested - if (backup) { - const backupPath = `${dictPath}.backup.${Date.now()}`; - console.log(`Creating backup: ${backupPath}`); - fs.copyFileSync(dictPath, backupPath); - } - - // Write compacted entries - console.log(`Writing compacted dictionary: ${dictPath}`); - const lines = Array.from(entries.values()).map((entry) => JSON.stringify(entry)); - fs.writeFileSync(dictPath, lines.join('\n') + '\n'); -} - -// Main -async function main() { - try { - const options = parseArgs(); - const dictPath = path.resolve(options.dictPath); - - console.log('=== LLM Audit Dictionary Compaction ===\n'); - - // Read and compact - const { entries, stats } = await compactDictionary(dictPath); - - // Report statistics - console.log('\nCompaction Statistics:'); - console.log(` Total lines in dictionary: ${stats.totalLines}`); - console.log(` Malformed lines skipped: ${stats.malformedLines}`); - console.log(` Unique content hashes: ${stats.uniqueHashes}`); - console.log(` Redundant entries removed: ${stats.removedEntries}`); - - const reductionPercent = - stats.totalLines > 0 - ? ((stats.removedEntries / stats.totalLines) * 100).toFixed(1) - : 0; - console.log(` Size reduction: ${reductionPercent}%\n`); - - if (options.dryRun) { - console.log('DRY RUN: No changes made to dictionary file.'); - console.log(`Would have written ${stats.uniqueHashes} entries.\n`); - } else { - // Write compacted dictionary - await writeCompactedDictionary(dictPath, entries, options.backup); - console.log('✓ Dictionary compaction complete!\n'); - } - } catch (err) { - console.error('Error:', err.message); - process.exit(1); - } -} - -main(); diff --git a/scripts/test-deduplication.js b/scripts/test-deduplication.js deleted file mode 100755 index c6d72bb..0000000 --- a/scripts/test-deduplication.js +++ /dev/null @@ -1,448 +0,0 @@ -#!/usr/bin/env node - -/** - * Test script for deduplication functionality - * Creates mock log entries and verifies deduplication works correctly - */ - -const fs = require("fs"); -const path = require("path"); -const { ContentDeduplicator } = require("../src/logger/deduplicator"); - -// Test configuration -const TEST_DICT_PATH = path.join(process.cwd(), "logs", "test-dictionary.jsonl"); -const TEST_LOG_PATH = path.join(process.cwd(), "logs", "test-audit.log"); - -// Clean up test files if they exist -function cleanup() { - if (fs.existsSync(TEST_DICT_PATH)) { - fs.unlinkSync(TEST_DICT_PATH); - } - if (fs.existsSync(TEST_LOG_PATH)) { - fs.unlinkSync(TEST_LOG_PATH); - } -} - -// Test 1: Basic deduplication -function testBasicDeduplication() { - console.log("\n=== Test 1: Basic Deduplication ==="); - - const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { - minSize: 50, // Lower threshold for testing - cacheSize: 10, - }); - - const content1 = "This is a test content that is longer than 50 characters and should be deduplicated."; - const content2 = "This is a test content that is longer than 50 characters and should be deduplicated."; - const content3 = "Short"; - - // First content should be stored - const ref1 = deduplicator.storeContent(content1); - console.log("✓ Stored content1:", ref1); - - // Second identical content should return same reference - const ref2 = deduplicator.storeContent(content2); - console.log("✓ Stored content2 (should be same hash):", ref2); - - // Verify same hash - if (ref1.$ref !== ref2.$ref) { - console.error("✗ FAIL: Different hashes for identical content!"); - return false; - } - console.log("✓ PASS: Identical content produces same hash"); - - // Short content should not be deduplicated (below threshold) - const shouldNotDedup = deduplicator.shouldDeduplicate(content3, 50); - if (shouldNotDedup) { - console.error("✗ FAIL: Short content should not be deduplicated!"); - return false; - } - console.log("✓ PASS: Short content not deduplicated"); - - return true; -} - -// Test 2: Content restoration -function testContentRestoration() { - console.log("\n=== Test 2: Content Restoration ==="); - - const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { - minSize: 50, - cacheSize: 10, - }); - - const originalContent = "This is original content that needs to be restored from the dictionary file."; - - // Store and get reference - const ref = deduplicator.storeContent(originalContent); - console.log("✓ Stored content with ref:", ref.$ref); - - // Retrieve content - const retrieved = deduplicator.getContent(ref.$ref); - console.log("✓ Retrieved content length:", retrieved?.length); - - // Verify content matches - if (retrieved !== originalContent) { - console.error("✗ FAIL: Retrieved content doesn't match original!"); - console.error("Expected:", originalContent); - console.error("Got:", retrieved); - return false; - } - console.log("✓ PASS: Content restored correctly"); - - return true; -} - -// Test 3: Entry deduplication and restoration -function testEntryProcessing() { - console.log("\n=== Test 3: Entry Deduplication and Restoration ==="); - - const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { - minSize: 50, - cacheSize: 10, - }); - - const systemPrompt = "You are a helpful AI assistant. This is a long system prompt that should be deduplicated."; - const userMessage = "This is a user message that is long enough to be deduplicated by the deduplication system."; - - const entry = { - type: "llm_request", - correlationId: "test-123", - systemPrompt: systemPrompt, - userMessages: userMessage, - model: "test-model", - }; - - // Deduplicate entry - const deduplicated = deduplicator.deduplicateEntry(entry, ["systemPrompt", "userMessages"]); - console.log("✓ Deduplicated entry:", JSON.stringify(deduplicated, null, 2)); - - // Verify fields are now references - if (typeof deduplicated.systemPrompt !== "object" || !deduplicated.systemPrompt.$ref) { - console.error("✗ FAIL: systemPrompt was not deduplicated!"); - return false; - } - if (typeof deduplicated.userMessages !== "object" || !deduplicated.userMessages.$ref) { - console.error("✗ FAIL: userMessages was not deduplicated!"); - return false; - } - console.log("✓ PASS: Fields converted to references"); - - // Restore entry - const restored = deduplicator.restoreEntry(deduplicated); - console.log("✓ Restored entry keys:", Object.keys(restored)); - - // Verify restoration - if (restored.systemPrompt !== systemPrompt) { - console.error("✗ FAIL: systemPrompt not restored correctly!"); - console.error("Expected:", systemPrompt); - console.error("Got:", restored.systemPrompt); - return false; - } - if (restored.userMessages !== userMessage) { - console.error("✗ FAIL: userMessages not restored correctly!"); - console.error("Expected:", userMessage); - console.error("Got:", restored.userMessages); - return false; - } - console.log("✓ PASS: Entry restored correctly"); - - return true; -} - -// Test 4: Dictionary persistence -function testDictionaryPersistence() { - console.log("\n=== Test 4: Dictionary Persistence ==="); - - // Create first deduplicator and store content - const deduplicator1 = new ContentDeduplicator(TEST_DICT_PATH, { - minSize: 50, - cacheSize: 10, - }); - - const content = "This is test content for persistence verification across deduplicator instances."; - const ref = deduplicator1.storeContent(content); - console.log("✓ Stored content with first deduplicator:", ref.$ref); - - // Wait for async write to complete - setTimeout(() => { - // Create second deduplicator (should load from dictionary) - const deduplicator2 = new ContentDeduplicator(TEST_DICT_PATH, { - minSize: 50, - cacheSize: 10, - }); - - // Try to retrieve with second deduplicator - const retrieved = deduplicator2.getContent(ref.$ref); - - if (retrieved !== content) { - console.error("✗ FAIL: Content not persisted to dictionary!"); - console.error("Expected:", content); - console.error("Got:", retrieved); - return false; - } - console.log("✓ PASS: Dictionary persisted and loaded correctly"); - - // Show dictionary stats - const stats = deduplicator2.getStats(); - console.log("\nDeduplication Stats:"); - console.log(` Cache size: ${stats.cacheSize}`); - console.log(` Unique blocks: ${stats.uniqueContentBlocks}`); - console.log(` Total references: ${stats.totalReferences}`); - - return true; - }, 100); -} - -// Test 5: Size calculation and verification -function testSizeCalculation() { - console.log("\n=== Test 5: Size Calculation ==="); - - const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { - minSize: 50, - cacheSize: 10, - }); - - const content = "This is a test content string that will be deduplicated and have its size calculated."; - const ref = deduplicator.storeContent(content); - - console.log("✓ Content length:", content.length); - console.log("✓ Reference size field:", ref.size); - - if (ref.size !== content.length) { - console.error("✗ FAIL: Size mismatch!"); - return false; - } - console.log("✓ PASS: Size calculated correctly"); - - // Calculate space saved - const refSize = JSON.stringify(ref).length; - const originalSize = content.length; - const saved = originalSize - refSize; - const savedPercent = ((saved / originalSize) * 100).toFixed(1); - - console.log(`\nSpace saved: ${saved} bytes (${savedPercent}%)`); - console.log(` Original: ${originalSize} bytes`); - console.log(` Reference: ${refSize} bytes`); - - return true; -} - -// Test 6: Content sanitization (empty User: entries removal) -function testContentSanitization() { - console.log("\n=== Test 6: Content Sanitization (Empty User: Removal) ==="); - - const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { - minSize: 50, - cacheSize: 10, - sanitize: true, // Enable sanitization - }); - - // Content with multiple empty "User:" entries - const dirtyContent = `Claude: I'll implement... - -User: - -Claude: Now I'll implement... - -User: - -User: - -User: - -Respond with the title for the conversation and nothing else.`; - - console.log("✓ Original content length:", dirtyContent.length); - console.log("✓ Empty 'User:' entries in original:", (dirtyContent.match(/User:\s*\n/g) || []).length); - - // Store the content (should be sanitized internally) - const ref = deduplicator.storeContent(dirtyContent); - console.log("✓ Stored content with ref:", ref.$ref); - - // Retrieve it back - const retrieved = deduplicator.getContent(ref.$ref); - console.log("✓ Retrieved content length:", retrieved?.length); - - // Count empty "User:" entries in retrieved content - // Pattern: "User:" followed by newline(s) and then "Claude:" or another "User:" or end - const emptyUserMatches = retrieved.match(/User:\s*\n+(?=(Claude:|User:|$))/g) || []; - console.log("✓ Empty 'User:' entries in retrieved:", emptyUserMatches.length); - - // Verify empty User: entries were removed - if (emptyUserMatches.length > 0) { - console.error("✗ FAIL: Empty User: entries not removed!"); - console.error("Retrieved content:", retrieved); - return false; - } - - // Verify content still contains Claude: entries - if (!retrieved.includes("Claude:")) { - console.error("✗ FAIL: Claude: entries were incorrectly removed!"); - return false; - } - - // Verify the last line is preserved - if (!retrieved.includes("Respond with the title")) { - console.error("✗ FAIL: Content was over-sanitized!"); - return false; - } - - console.log("✓ PASS: Empty User: entries removed, content preserved"); - - // Test with sanitization disabled - const dedupNoSanitize = new ContentDeduplicator(TEST_DICT_PATH, { - minSize: 50, - cacheSize: 10, - sanitize: false, // Disable sanitization - }); - - const refNoSanitize = dedupNoSanitize.storeContent(dirtyContent); - const retrievedNoSanitize = dedupNoSanitize.getContent(refNoSanitize.$ref); - - // Should have empty User: entries when sanitization is disabled - const emptyUserNoSanitize = retrievedNoSanitize.match(/User:\s*\n+(?=(Claude:|User:|$))/g) || []; - if (emptyUserNoSanitize.length === 0) { - console.error("✗ FAIL: Content was sanitized even with sanitize=false!"); - return false; - } - - console.log("✓ PASS: Sanitization can be disabled"); - - return true; -} - -// Test 7: Content sanitization preserves non-empty User: entries -function testSanitizationPreservesContent() { - console.log("\n=== Test 7: Sanitization Preserves Non-Empty User: Entries ==="); - - const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { - minSize: 50, - cacheSize: 10, - sanitize: true, - }); - - // Content with both empty and non-empty User: entries - const mixedContent = `Claude: I'll help you. - -User: Can you explain this? - -Claude: Sure, here's the explanation. - -User: - -User: Another question here. - -Claude: Here's the answer.`; - - console.log("✓ Original has both empty and non-empty User: entries"); - - const ref = deduplicator.storeContent(mixedContent); - const retrieved = deduplicator.getContent(ref.$ref); - - // Check that non-empty User: entries are preserved - if (!retrieved.includes("User: Can you explain this?")) { - console.error("✗ FAIL: Non-empty User: entry was removed!"); - return false; - } - - if (!retrieved.includes("User: Another question here.")) { - console.error("✗ FAIL: Non-empty User: entry was removed!"); - return false; - } - - // Check that empty User: entries are removed - const lines = retrieved.split('\n'); - let hasEmptyUser = false; - for (let i = 0; i < lines.length; i++) { - const line = lines[i].trim(); - if (line === 'User:' || line === 'User: ') { - const nextLine = i + 1 < lines.length ? lines[i + 1].trim() : ''; - if (nextLine === '' || nextLine === 'Claude:' || nextLine === 'User:') { - hasEmptyUser = true; - break; - } - } - } - - if (hasEmptyUser) { - console.error("✗ FAIL: Empty User: entries not removed from mixed content!"); - return false; - } - - console.log("✓ PASS: Non-empty User: entries preserved, empty ones removed"); - - return true; -} - -// Main test runner -async function runTests() { - console.log("=".repeat(60)); - console.log("LLM Audit Log Deduplication Test Suite"); - console.log("=".repeat(60)); - - // Clean up before tests - cleanup(); - - const tests = [ - testBasicDeduplication, - testContentRestoration, - testEntryProcessing, - testSizeCalculation, - testContentSanitization, - testSanitizationPreservesContent, - ]; - - let passed = 0; - let failed = 0; - - for (const test of tests) { - try { - const result = test(); - if (result) { - passed++; - } else { - failed++; - } - } catch (err) { - console.error(`✗ Test failed with error: ${err.message}`); - console.error(err.stack); - failed++; - } - } - - // Run async test separately - setTimeout(() => { - testDictionaryPersistence(); - - console.log("\n" + "=".repeat(60)); - console.log("Test Results"); - console.log("=".repeat(60)); - console.log(`Passed: ${passed}/${passed + failed}`); - console.log(`Failed: ${failed}/${passed + failed}`); - - if (failed === 0) { - console.log("\n✓ All tests passed!"); - console.log("\nDictionary file created at:", TEST_DICT_PATH); - console.log("You can inspect it with: cat", TEST_DICT_PATH); - } else { - console.log("\n✗ Some tests failed!"); - process.exit(1); - } - - // Clean up after tests - console.log("\nCleaning up test files..."); - cleanup(); - console.log("✓ Test files removed"); - }, 200); -} - -// Run tests -if (require.main === module) { - runTests().catch((err) => { - console.error("Test suite failed:", err); - process.exit(1); - }); -} - -module.exports = { runTests }; diff --git a/src/api/middleware/session.js b/src/api/middleware/session.js index f19c58a..84103ec 100644 --- a/src/api/middleware/session.js +++ b/src/api/middleware/session.js @@ -1,6 +1,5 @@ const crypto = require("crypto"); const { getOrCreateSession } = require("../../sessions/store"); -const logger = require("../../logger"); const PRIMARY_HEADER = "x-session-id"; const FALLBACK_HEADERS = [ @@ -42,9 +41,6 @@ function sessionMiddleware(req, res, next) { const sessionId = extractSessionId(req); req.sessionId = sessionId; - // Add sessionId to logger context for this request - req.log = logger.child({ sessionId }); - const session = getOrCreateSession(sessionId); req.session = session; return next(); diff --git a/src/config/index.js b/src/config/index.js index 8bf97af..dbe3df6 100644 --- a/src/config/index.js +++ b/src/config/index.js @@ -89,114 +89,6 @@ const ollamaTimeout = Number.parseInt(process.env.OLLAMA_TIMEOUT_MS ?? "120000", const ollamaEmbeddingsEndpoint = process.env.OLLAMA_EMBEDDINGS_ENDPOINT ?? `${ollamaEndpoint}/api/embeddings`; const ollamaEmbeddingsModel = process.env.OLLAMA_EMBEDDINGS_MODEL ?? "nomic-embed-text"; -// Ollama cluster configuration -function loadOllamaClusterConfig() { - const configPath = process.env.OLLAMA_CLUSTER_CONFIG - ?? path.join(process.cwd(), ".lynkr", "ollama-cluster.json"); - - try { - const fs = require("fs"); - if (!fs.existsSync(configPath)) { - return null; // No cluster config, use single-host mode - } - - const configFile = fs.readFileSync(configPath, "utf8"); - const config = JSON.parse(configFile); - - // Validate configuration - if (!config.enabled) { - return null; // Cluster disabled - } - - const errors = validateOllamaClusterConfig(config); - if (errors.length > 0) { - throw new Error(`Ollama cluster configuration errors:\n${errors.join("\n")}`); - } - - return config; - } catch (err) { - if (err.code === "ENOENT") { - return null; // File doesn't exist, use single-host mode - } - throw new Error(`Failed to load Ollama cluster config from ${configPath}: ${err.message}`); - } -} - -function validateOllamaClusterConfig(config) { - const errors = []; - - if (!config.hosts || !Array.isArray(config.hosts) || config.hosts.length === 0) { - errors.push("At least one Ollama host must be configured in 'hosts' array"); - return errors; // Return early if no hosts - } - - const seenIds = new Set(); - const seenEndpoints = new Set(); - - config.hosts.forEach((host, idx) => { - // Check required fields - if (!host.endpoint) { - errors.push(`Host ${idx}: 'endpoint' is required`); - } else { - // Check URL format - try { - new URL(host.endpoint); - } catch { - errors.push(`Host ${idx}: invalid endpoint URL "${host.endpoint}"`); - } - - // Check for duplicate endpoints - if (seenEndpoints.has(host.endpoint)) { - errors.push(`Host ${idx}: duplicate endpoint "${host.endpoint}"`); - } - seenEndpoints.add(host.endpoint); - } - - // Check ID - if (!host.id) { - errors.push(`Host ${idx}: 'id' is required`); - } else if (seenIds.has(host.id)) { - errors.push(`Host ${idx}: duplicate ID "${host.id}"`); - } else { - seenIds.add(host.id); - } - - // Validate ranges - if (host.weight !== undefined && host.weight < 1) { - errors.push(`Host ${host.id || idx}: weight must be >= 1 (got ${host.weight})`); - } - - if (host.maxConcurrent !== undefined && host.maxConcurrent < 1) { - errors.push(`Host ${host.id || idx}: maxConcurrent must be >= 1 (got ${host.maxConcurrent})`); - } - - if (host.timeout !== undefined && host.timeout < 1000) { - errors.push(`Host ${host.id || idx}: timeout must be >= 1000ms (got ${host.timeout})`); - } - }); - - // Validate load balancing strategy - const validStrategies = [ - "round-robin", - "weighted-round-robin", - "least-connections", - "response-time-weighted", - "model-based", - "random" - ]; - - if (config.loadBalancing && !validStrategies.includes(config.loadBalancing)) { - errors.push( - `Invalid loadBalancing strategy "${config.loadBalancing}". ` + - `Must be one of: ${validStrategies.join(", ")}` - ); - } - - return errors; -} - -const ollamaClusterConfig = loadOllamaClusterConfig(); - // OpenRouter configuration const openRouterApiKey = process.env.OPENROUTER_API_KEY ?? null; const openRouterModel = process.env.OPENROUTER_MODEL ?? "openai/gpt-4o-mini"; @@ -219,8 +111,8 @@ const openAIOrganization = process.env.OPENAI_ORGANIZATION?.trim() || null; const llamacppEndpoint = process.env.LLAMACPP_ENDPOINT?.trim() || "http://localhost:8080"; const llamacppModel = process.env.LLAMACPP_MODEL?.trim() || "default"; const llamacppTimeout = Number.parseInt(process.env.LLAMACPP_TIMEOUT_MS ?? "120000", 10); -const llamacppEmbeddingsEndpoint = process.env.LLAMACPP_EMBEDDINGS_ENDPOINT?.trim() || `${llamacppEndpoint}/embeddings`; const llamacppApiKey = process.env.LLAMACPP_API_KEY?.trim() || null; +const llamacppEmbeddingsEndpoint = process.env.LLAMACPP_EMBEDDINGS_ENDPOINT?.trim() || `${llamacppEndpoint}/embeddings`; // LM Studio configuration const lmstudioEndpoint = process.env.LMSTUDIO_ENDPOINT?.trim() || "http://localhost:1234"; @@ -242,6 +134,10 @@ const zaiModel = process.env.ZAI_MODEL?.trim() || "GLM-4.7"; const vertexApiKey = process.env.VERTEX_API_KEY?.trim() || process.env.GOOGLE_API_KEY?.trim() || null; const vertexModel = process.env.VERTEX_MODEL?.trim() || "gemini-2.0-flash"; +// Hot reload configuration +const hotReloadEnabled = process.env.HOT_RELOAD_ENABLED !== "false"; // default true +const hotReloadDebounceMs = Number.parseInt(process.env.HOT_RELOAD_DEBOUNCE_MS ?? "1000", 10); + // Hybrid routing configuration const preferOllama = process.env.PREFER_OLLAMA === "true"; const fallbackEnabled = process.env.FALLBACK_ENABLED !== "false"; // default true @@ -267,61 +163,14 @@ if (!SUPPORTED_MODEL_PROVIDERS.has(rawFallbackProvider)) { const fallbackProvider = rawFallbackProvider; -// Tool execution mode: server (default), client, passthrough, local, or synthetic +// Tool execution mode: server (default), client, or passthrough const toolExecutionMode = (process.env.TOOL_EXECUTION_MODE ?? "server").toLowerCase(); -if (!["server", "client", "passthrough", "local", "synthetic"].includes(toolExecutionMode)) { +if (!["server", "client", "passthrough"].includes(toolExecutionMode)) { throw new Error( - "TOOL_EXECUTION_MODE must be one of: server, client, passthrough, local, synthetic (default: server)" + "TOOL_EXECUTION_MODE must be one of: server, client, passthrough (default: server)" ); } -// Pattern B Configuration (Lynkr on localhost) -const deploymentMode = (process.env.DEPLOYMENT_MODE ?? "pattern-a").toLowerCase(); -const patternBEnabled = deploymentMode === "pattern-b" || toolExecutionMode === "local" || toolExecutionMode === "synthetic"; - -// Local tool execution settings (Pattern B) -const localToolsEnabled = toolExecutionMode === "local" || toolExecutionMode === "synthetic"; -const localToolsAllowedOperations = parseList( - process.env.LOCAL_TOOLS_ALLOWED_OPERATIONS ?? "readFile,writeFile,listDirectory,executeCommand,searchCode" -); -const localToolsRestrictedPaths = parseList( - process.env.LOCAL_TOOLS_RESTRICTED_PATHS ?? "/etc,/sys,/proc,/root,~/.ssh,~/.gnupg" -); -const localToolsMaxFileSize = Number.parseInt( - process.env.LOCAL_TOOLS_MAX_FILE_SIZE ?? "10485760", // 10MB default - 10 -); -const localToolsCommandTimeout = Number.parseInt( - process.env.LOCAL_TOOLS_COMMAND_TIMEOUT ?? "30000", // 30s default - 10 -); - -// Synthetic mode settings (Pattern B) -const syntheticModeEnabled = toolExecutionMode === "synthetic"; -const syntheticModePatterns = process.env.SYNTHETIC_MODE_PATTERNS - ? parseJson(process.env.SYNTHETIC_MODE_PATTERNS) - : { - readFile: ["read.*file", "show.*contents", "display.*file"], - writeFile: ["write.*to.*file", "save.*to.*file", "create.*file"], - listDirectory: ["list.*files", "show.*directory", "ls.*"], - executeCommand: ["run.*command", "execute.*"], - searchCode: ["search.*for", "find.*in.*files", "grep.*"] - }; - -// Remote Ollama configuration (Pattern B) -// When Lynkr runs on localhost but Ollama runs on GPU server -const remoteOllamaEnabled = patternBEnabled; -const remoteOllamaEndpoint = process.env.REMOTE_OLLAMA_ENDPOINT ?? ollamaEndpoint; -const remoteOllamaConnectionPooling = process.env.REMOTE_OLLAMA_CONNECTION_POOLING !== "false"; // default true -const remoteOllamaMaxConnections = Number.parseInt( - process.env.REMOTE_OLLAMA_MAX_CONNECTIONS ?? "10", - 10 -); -const remoteOllamaTimeout = Number.parseInt( - process.env.REMOTE_OLLAMA_TIMEOUT ?? "300000", // 5 minutes - 10 -); - // Memory system configuration (Titans-inspired long-term memory) const memoryEnabled = process.env.MEMORY_ENABLED !== "false"; // default true const memoryRetrievalLimit = Number.parseInt(process.env.MEMORY_RETRIEVAL_LIMIT ?? "5", 10); @@ -603,33 +452,6 @@ const agentsDefaultModel = process.env.AGENTS_DEFAULT_MODEL ?? "haiku"; const agentsMaxSteps = Number.parseInt(process.env.AGENTS_MAX_STEPS ?? "15", 10); const agentsTimeout = Number.parseInt(process.env.AGENTS_TIMEOUT ?? "120000", 10); -// LLM Audit logging configuration -const auditEnabled = process.env.LLM_AUDIT_ENABLED === "true"; // default false -const auditLogFile = process.env.LLM_AUDIT_LOG_FILE ?? path.join(process.cwd(), "logs", "llm-audit.log"); -const auditMaxContentLength = Number.parseInt(process.env.LLM_AUDIT_MAX_CONTENT_LENGTH ?? "5000", 10); // Legacy fallback -const auditMaxSystemLength = Number.parseInt(process.env.LLM_AUDIT_MAX_SYSTEM_LENGTH ?? "2000", 10); -const auditMaxUserLength = Number.parseInt(process.env.LLM_AUDIT_MAX_USER_LENGTH ?? "3000", 10); -const auditMaxResponseLength = Number.parseInt(process.env.LLM_AUDIT_MAX_RESPONSE_LENGTH ?? "3000", 10); -const auditMaxFiles = Number.parseInt(process.env.LLM_AUDIT_MAX_FILES ?? "30", 10); -const auditMaxSize = process.env.LLM_AUDIT_MAX_SIZE ?? "100M"; -const auditAnnotations = process.env.LLM_AUDIT_ANNOTATIONS !== "false"; // default true - -// LLM Audit deduplication configuration -const auditDeduplicationEnabled = process.env.LLM_AUDIT_DEDUP_ENABLED !== "false"; // default true -const auditDeduplicationDictPath = - process.env.LLM_AUDIT_DEDUP_DICT_PATH ?? path.join(process.cwd(), "logs", "llm-audit-dictionary.jsonl"); -const auditDeduplicationMinSize = Number.parseInt(process.env.LLM_AUDIT_DEDUP_MIN_SIZE ?? "500", 10); -const auditDeduplicationCacheSize = Number.parseInt(process.env.LLM_AUDIT_DEDUP_CACHE_SIZE ?? "100", 10); -const auditDeduplicationSanitize = process.env.LLM_AUDIT_DEDUP_SANITIZE !== "false"; // default true -const auditDeduplicationSessionCache = process.env.LLM_AUDIT_DEDUP_SESSION_CACHE !== "false"; // default true - -// Oversized Error Logging Configuration -const oversizedErrorLoggingEnabled = process.env.OVERSIZED_ERROR_LOGGING_ENABLED !== "false"; // default true -const oversizedErrorThreshold = Number.parseInt(process.env.OVERSIZED_ERROR_THRESHOLD ?? "200", 10); -const oversizedErrorLogDir = - process.env.OVERSIZED_ERROR_LOG_DIR ?? path.join(process.cwd(), "logs", "oversized-errors"); -const oversizedErrorMaxFiles = Number.parseInt(process.env.OVERSIZED_ERROR_MAX_FILES ?? "100", 10); - const config = { env: process.env.NODE_ENV ?? "development", port: Number.isNaN(port) ? 8080 : port, @@ -650,7 +472,6 @@ const config = { timeout: Number.isNaN(ollamaTimeout) ? 120000 : ollamaTimeout, embeddingsEndpoint: ollamaEmbeddingsEndpoint, embeddingsModel: ollamaEmbeddingsModel, - cluster: ollamaClusterConfig, // null if cluster not configured }, openrouter: { apiKey: openRouterApiKey, @@ -674,8 +495,8 @@ const config = { endpoint: llamacppEndpoint, model: llamacppModel, timeout: Number.isNaN(llamacppTimeout) ? 120000 : llamacppTimeout, - embeddingsEndpoint: llamacppEmbeddingsEndpoint, apiKey: llamacppApiKey, + embeddingsEndpoint: llamacppEmbeddingsEndpoint, }, lmstudio: { endpoint: lmstudioEndpoint, @@ -697,6 +518,10 @@ const config = { apiKey: vertexApiKey, model: vertexModel, }, + hotReload: { + enabled: hotReloadEnabled, + debounceMs: Number.isNaN(hotReloadDebounceMs) ? 1000 : hotReloadDebounceMs, + }, modelProvider: { type: modelProvider, defaultModel, @@ -708,105 +533,6 @@ const config = { fallbackProvider, }, toolExecutionMode, - patternB: { - enabled: patternBEnabled, - deploymentMode, - localTools: { - enabled: localToolsEnabled, - allowedOperations: localToolsAllowedOperations, - restrictedPaths: localToolsRestrictedPaths, - maxFileSize: localToolsMaxFileSize, - commandTimeout: localToolsCommandTimeout, - }, - syntheticMode: { - enabled: syntheticModeEnabled, - patterns: syntheticModePatterns, - }, - remoteOllama: { - enabled: remoteOllamaEnabled, - endpoint: remoteOllamaEndpoint, - connectionPooling: remoteOllamaConnectionPooling, - maxConnections: remoteOllamaMaxConnections, - timeout: remoteOllamaTimeout, - }, - }, - gpuDiscovery: { - enabled: process.env.GPU_DISCOVERY_ENABLED !== "false", // default true - probe_on_startup: process.env.GPU_DISCOVERY_PROBE_ON_STARTUP !== "false", // default true - probe_local: process.env.GPU_DISCOVERY_PROBE_LOCAL !== "false", // default true - health_check_interval_seconds: Number.parseInt( - process.env.GPU_DISCOVERY_HEALTH_CHECK_INTERVAL ?? "300", // 5 minutes - 10 - ), - nvidia_smi_timeout_ms: Number.parseInt( - process.env.GPU_DISCOVERY_NVIDIA_SMI_TIMEOUT ?? "5000", - 10 - ), - cache_path: process.env.GPU_DISCOVERY_CACHE_PATH || "/tmp/lynkr_gpu_inventory.json", - detection_method: (process.env.GPU_DISCOVERY_METHOD ?? "auto").toLowerCase(), // "auto" | "ollama-api" | "nvidia-smi" - ssh_config: { - enabled: process.env.GPU_DISCOVERY_SSH_ENABLED === "true", - user: process.env.GPU_DISCOVERY_SSH_USER || null, - key_path: process.env.GPU_DISCOVERY_SSH_KEY_PATH || "~/.ssh/id_rsa", - }, - }, - gpuOrchestration: { - enabled: process.env.GPU_ORCHESTRATION_ENABLED !== "false", // default true - auto_profile_new_models: process.env.GPU_ORCHESTRATION_AUTO_PROFILE !== "false", // default true - model_profiles_path: process.env.GPU_ORCHESTRATION_PROFILES_PATH || "/tmp/ollama_model_profiles.json", - }, - taskModels: { - // User can override via ~/.lynkr/config.json or environment - // Format: TASK_MODELS__PRIMARY, TASK_MODELS__FALLBACK - planning: { - primary: process.env.TASK_MODELS_PLANNING_PRIMARY || "qwen2.5:14b", - fallback: process.env.TASK_MODELS_PLANNING_FALLBACK || "llama3.1:8b", - }, - coding: { - primary: process.env.TASK_MODELS_CODING_PRIMARY || "qwen2.5-coder:32b", - fallback: process.env.TASK_MODELS_CODING_FALLBACK || "qwen2.5-coder:7b", - }, - debugging: { - primary: process.env.TASK_MODELS_DEBUGGING_PRIMARY || "deepseek-coder-v2:16b", - fallback: process.env.TASK_MODELS_DEBUGGING_FALLBACK || "llama3.1:8b", - }, - refactoring: { - primary: process.env.TASK_MODELS_REFACTORING_PRIMARY || "qwen2.5-coder:32b", - fallback: process.env.TASK_MODELS_REFACTORING_FALLBACK || "qwen2.5:14b", - }, - documentation: { - primary: process.env.TASK_MODELS_DOCUMENTATION_PRIMARY || "llama3.2:11b", - fallback: process.env.TASK_MODELS_DOCUMENTATION_FALLBACK || "llama3.1:8b", - }, - testing: { - primary: process.env.TASK_MODELS_TESTING_PRIMARY || "qwen2.5-coder:14b", - fallback: process.env.TASK_MODELS_TESTING_FALLBACK || "llama3.1:8b", - }, - review: { - primary: process.env.TASK_MODELS_REVIEW_PRIMARY || "qwen2.5:14b", - fallback: process.env.TASK_MODELS_REVIEW_FALLBACK || "llama3.1:8b", - }, - analysis: { - primary: process.env.TASK_MODELS_ANALYSIS_PRIMARY || "qwen2.5:14b", - fallback: process.env.TASK_MODELS_ANALYSIS_FALLBACK || "llama3.1:8b", - }, - general: { - primary: process.env.TASK_MODELS_GENERAL_PRIMARY || "llama3.1:8b", - fallback: process.env.TASK_MODELS_GENERAL_FALLBACK || "llama3.1:8b", - }, - }, - palMcp: { - enabled: process.env.PAL_MCP_ENABLED === "true", - serverPath: process.env.PAL_MCP_SERVER_PATH || path.join(__dirname, "../../external/pal-mcp-server"), - pythonPath: process.env.PAL_MCP_PYTHON_PATH || "python3", - autoStart: process.env.PAL_MCP_AUTO_START !== "false", // default true if enabled - // Orchestration settings (for avoiding expensive cloud fallback) - useForComplexRequests: process.env.PAL_MCP_USE_FOR_COMPLEX_REQUESTS !== "false", // default true if enabled - maxToolsBeforeOrchestration: Number.parseInt( - process.env.PAL_MCP_MAX_TOOLS_BEFORE_ORCHESTRATION ?? "3", - 10 - ), - }, server: { jsonLimit: process.env.REQUEST_JSON_LIMIT ?? "1gb", }, @@ -962,48 +688,48 @@ const config = { tokenBudget: smartToolSelectionTokenBudget, minimalMode: false, // HARDCODED - disabled }, - security: { - // Content filtering - contentFilterEnabled: process.env.SECURITY_CONTENT_FILTER_ENABLED !== "false", // default true - blockOnDetection: process.env.SECURITY_BLOCK_ON_DETECTION !== "false", // default true - - // Rate limiting - rateLimitEnabled: process.env.SECURITY_RATE_LIMIT_ENABLED !== "false", // default true - perIpLimit: Number.parseInt(process.env.SECURITY_PER_IP_LIMIT ?? "100", 10), // requests per minute - perEndpointLimit: Number.parseInt(process.env.SECURITY_PER_ENDPOINT_LIMIT ?? "1000", 10), // requests per minute - - // Audit logging - auditLogEnabled: process.env.SECURITY_AUDIT_LOG_ENABLED !== "false", // default true - auditLogDir: process.env.SECURITY_AUDIT_LOG_DIR ?? path.join(process.cwd(), "logs"), - }, - audit: { - enabled: auditEnabled, - logFile: auditLogFile, - maxContentLength: { - systemPrompt: Number.isNaN(auditMaxSystemLength) ? 2000 : auditMaxSystemLength, - userMessages: Number.isNaN(auditMaxUserLength) ? 3000 : auditMaxUserLength, - response: Number.isNaN(auditMaxResponseLength) ? 3000 : auditMaxResponseLength, - }, - annotations: auditAnnotations, - rotation: { - maxFiles: Number.isNaN(auditMaxFiles) ? 30 : auditMaxFiles, - maxSize: auditMaxSize, - }, - deduplication: { - enabled: auditDeduplicationEnabled, - dictionaryPath: auditDeduplicationDictPath, - minSize: Number.isNaN(auditDeduplicationMinSize) ? 500 : auditDeduplicationMinSize, - cacheSize: Number.isNaN(auditDeduplicationCacheSize) ? 100 : auditDeduplicationCacheSize, - sanitize: auditDeduplicationSanitize, - sessionCache: auditDeduplicationSessionCache, - }, - }, - oversizedErrorLogging: { - enabled: oversizedErrorLoggingEnabled, - threshold: oversizedErrorThreshold, - logDir: oversizedErrorLogDir, - maxFiles: oversizedErrorMaxFiles, - }, }; +/** + * Reload configuration from environment + * Called by hot reload watcher when .env changes + */ +function reloadConfig() { + // Re-parse .env file + dotenv.config({ override: true }); + + // Update mutable config values (those that can safely change at runtime) + // API keys and endpoints + config.databricks.apiKey = process.env.DATABRICKS_API_KEY; + config.azureAnthropic.apiKey = process.env.AZURE_ANTHROPIC_API_KEY ?? null; + config.ollama.model = process.env.OLLAMA_MODEL ?? "qwen2.5-coder:7b"; + config.openrouter.apiKey = process.env.OPENROUTER_API_KEY ?? null; + config.openrouter.model = process.env.OPENROUTER_MODEL ?? "openai/gpt-4o-mini"; + config.azureOpenAI.apiKey = process.env.AZURE_OPENAI_API_KEY?.trim() || null; + config.openai.apiKey = process.env.OPENAI_API_KEY?.trim() || null; + config.bedrock.apiKey = process.env.AWS_BEDROCK_API_KEY?.trim() || null; + config.zai.apiKey = process.env.ZAI_API_KEY?.trim() || null; + config.zai.model = process.env.ZAI_MODEL?.trim() || "GLM-4.7"; + config.vertex.apiKey = process.env.VERTEX_API_KEY?.trim() || process.env.GOOGLE_API_KEY?.trim() || null; + config.vertex.model = process.env.VERTEX_MODEL?.trim() || "gemini-2.0-flash"; + + // Model provider settings + const newProvider = (process.env.MODEL_PROVIDER ?? "databricks").toLowerCase(); + if (SUPPORTED_MODEL_PROVIDERS.has(newProvider)) { + config.modelProvider.type = newProvider; + } + config.modelProvider.preferOllama = process.env.PREFER_OLLAMA === "true"; + config.modelProvider.fallbackEnabled = process.env.FALLBACK_ENABLED !== "false"; + config.modelProvider.fallbackProvider = (process.env.FALLBACK_PROVIDER ?? "databricks").toLowerCase(); + + // Log level + config.logger.level = process.env.LOG_LEVEL ?? "info"; + + console.log("[CONFIG] Configuration reloaded from environment"); + return config; +} + +// Make config mutable for hot reload +config.reloadConfig = reloadConfig; + module.exports = config; diff --git a/src/logger/audit-logger.js b/src/logger/audit-logger.js deleted file mode 100644 index 56b2830..0000000 --- a/src/logger/audit-logger.js +++ /dev/null @@ -1,609 +0,0 @@ -const pino = require("pino"); -const path = require("path"); -const fs = require("fs"); -const { ContentDeduplicator } = require("./deduplicator"); - -/** - * LLM Audit Logger - * - * Dedicated logger for capturing LLM request/response audit trails. - * Logs to a separate file for easy parsing, searching, and compliance. - * - * Log Entry Types: - * - llm_request: User messages sent to LLM providers - * - llm_response: LLM responses received from providers - * - * Key Features: - * - Separate log file (llm-audit.log) for easy parsing - * - Correlation IDs to link requests with responses - * - Network destination tracking (IP, hostname, URL) - * - Content truncation to control log size - * - Async writes for minimal latency impact - * - Daily log rotation with configurable retention - */ - -/** - * Create audit logger instance - * @param {Object} config - Audit configuration - * @returns {Object} Pino logger instance - */ -function createAuditLogger(config) { - // Ensure log directory exists - const logDir = path.dirname(config.logFile); - if (!fs.existsSync(logDir)) { - fs.mkdirSync(logDir, { recursive: true }); - } - - // Create dedicated pino instance for audit logs - const auditLogger = pino( - { - level: "info", // Always log at info level for compliance - name: "llm-audit", - base: null, // Don't include pid/hostname to keep logs clean - timestamp: pino.stdTimeFunctions.isoTime, - formatters: { - level: (label) => { - return { level: label }; - }, - }, - }, - pino.destination({ - dest: config.logFile, - sync: false, // Async writes for performance - mkdir: true, - }) - ); - - return auditLogger; -} - -/** - * Truncate content if it exceeds max length - * @param {string|Array|Object} content - Content to truncate - * @param {number} maxLength - Maximum length (0 = no truncation) - * @returns {Object} { content, truncated, originalLength } - */ -function truncateContent(content, maxLength) { - if (maxLength === 0) { - return { content, truncated: false, originalLength: null }; - } - - // Handle different content types - let stringContent; - if (typeof content === "string") { - stringContent = content; - } else if (Array.isArray(content)) { - stringContent = JSON.stringify(content); - } else if (typeof content === "object" && content !== null) { - stringContent = JSON.stringify(content); - } else { - return { content, truncated: false, originalLength: null }; - } - - const originalLength = stringContent.length; - - if (originalLength <= maxLength) { - return { content, truncated: false, originalLength }; - } - - // Truncate and add indicator - const truncated = stringContent.substring(0, maxLength); - const indicator = `... [truncated, ${originalLength - maxLength} chars omitted]`; - - // Try to parse back to original type if it was JSON - if (typeof content !== "string") { - try { - return { - content: truncated + indicator, - truncated: true, - originalLength, - }; - } catch { - return { - content: truncated + indicator, - truncated: true, - originalLength, - }; - } - } - - return { - content: truncated + indicator, - truncated: true, - originalLength, - }; -} - -/** - * Hash and truncate content for audit logging - * Hashes the ORIGINAL content before truncation to preserve full content hash - * @param {string|Array|Object} content - Content to hash and truncate - * @param {number} maxLength - Maximum length for truncation (0 = no truncation) - * @param {ContentDeduplicator} deduplicator - Deduplicator instance for hashing - * @returns {Object} { hash, content, truncated, originalLength } - */ -function hashAndTruncate(content, maxLength, deduplicator) { - if (!content) { - return { hash: null, content: null, truncated: false, originalLength: null }; - } - - // Hash the ORIGINAL content before any truncation - const hash = deduplicator ? deduplicator.hashContent(content) : null; - - // Then truncate for display - const truncationResult = truncateContent(content, maxLength); - - return { - hash, - content: truncationResult.content, - truncated: truncationResult.truncated, - originalLength: truncationResult.originalLength, - }; -} - -/** - * Smart truncation for system reminder content - * Keeps first N characters and everything from the LAST tag onwards - * @param {string|Array|Object} content - Content to truncate - * @param {number} prefixLength - Length of prefix to keep (default: 50) - * @returns {Object} { content, truncated, originalLength, charsRemoved } - */ -function truncateSystemReminder(content, prefixLength = 50) { - // Handle different content types - let stringContent; - if (typeof content === "string") { - stringContent = content; - } else if (Array.isArray(content)) { - stringContent = JSON.stringify(content); - } else if (typeof content === "object" && content !== null) { - stringContent = JSON.stringify(content); - } else { - return { content, truncated: false, originalLength: null, charsRemoved: 0 }; - } - - const originalLength = stringContent.length; - - // Find the LAST occurrence of tag - const tagIndex = stringContent.lastIndexOf(""); - - // If tag not found, return unchanged - if (tagIndex === -1) { - return { content, truncated: false, originalLength, charsRemoved: 0 }; - } - - // If tag is within the prefix, don't truncate - if (tagIndex < prefixLength) { - return { content, truncated: false, originalLength, charsRemoved: 0 }; - } - - // Extract prefix and suffix - const prefix = stringContent.substring(0, prefixLength); - const suffix = stringContent.substring(tagIndex); - - // Calculate what would be removed - const charsRemoved = tagIndex - prefixLength; - - // If removal would be insignificant (< 100 chars), don't truncate - if (charsRemoved < 100) { - return { content, truncated: false, originalLength, charsRemoved: 0 }; - } - - // Build truncated content - const truncatedContent = prefix + "..." + suffix; - - return { - content: truncatedContent, - truncated: true, - originalLength, - charsRemoved, - }; -} - -/** - * Hash and apply smart truncation for system reminder content - * Hashes the ORIGINAL content before truncation - * @param {string|Array|Object} content - Content to hash and truncate - * @param {number} prefixLength - Length of prefix to keep (default: 50) - * @param {ContentDeduplicator} deduplicator - Deduplicator instance for hashing - * @returns {Object} { hash, content, truncated, originalLength, charsRemoved } - */ -function hashAndTruncateSystemReminder(content, prefixLength = 50, deduplicator) { - if (!content) { - return { hash: null, content: null, truncated: false, originalLength: null, charsRemoved: 0 }; - } - - // Hash the ORIGINAL content before any truncation - const hash = deduplicator ? deduplicator.hashContent(content) : null; - - // Then apply smart truncation - const truncationResult = truncateSystemReminder(content, prefixLength); - - return { - hash, - content: truncationResult.content, - truncated: truncationResult.truncated, - originalLength: truncationResult.originalLength, - charsRemoved: truncationResult.charsRemoved, - }; -} - -/** - * Extract hostname and port from URL - * @param {string} url - Full URL - * @returns {Object} { hostname, port } - */ -function parseDestinationUrl(url) { - try { - const parsed = new URL(url); - return { - hostname: parsed.hostname, - port: parsed.port || (parsed.protocol === "https:" ? 443 : 80), - protocol: parsed.protocol.replace(":", ""), - }; - } catch { - return { hostname: null, port: null, protocol: null }; - } -} - -/** - * Create audit logger wrapper with convenience methods - * @param {Object} config - Audit configuration from config.js - * @returns {Object} Audit logger interface - */ -function createAuditLoggerWrapper(config) { - if (!config.enabled) { - // Return no-op logger if disabled - return { - logLlmRequest: () => {}, - logLlmResponse: () => {}, - restoreLogEntry: (entry) => entry, - enabled: false, - }; - } - - const logger = createAuditLogger(config); - - // Support both legacy single value and new object format for maxContentLength - const maxContentLength = - typeof config.maxContentLength === 'object' - ? config.maxContentLength - : { - systemPrompt: config.maxContentLength || 5000, - userMessages: config.maxContentLength || 5000, - response: config.maxContentLength || 5000, - }; - - // Initialize deduplicator if enabled - const deduplicator = - config.deduplication?.enabled - ? new ContentDeduplicator(config.deduplication.dictionaryPath, { - minSize: config.deduplication.minSize, - cacheSize: config.deduplication.cacheSize, - sanitize: config.deduplication.sanitize, - sessionCache: config.deduplication.sessionCache, - }) - : null; - - /** - * Log hash annotation line for easy lookup - * @private - * @param {Object} hashes - Hash values to annotate - */ - function logHashAnnotation(hashes) { - if (!config.annotations) { - return; // Skip if annotations disabled - } - - const annotationEntry = { - _annotation: true, - lookup: "Use: node scripts/audit-log-reader.js --hash ", - }; - - // Add any provided hashes - if (hashes.systemPromptHash) { - annotationEntry.systemPromptHash = hashes.systemPromptHash; - } - if (hashes.userMessagesHash) { - annotationEntry.userMessagesHash = hashes.userMessagesHash; - } - if (hashes.userQueryHash) { - annotationEntry.userQueryHash = hashes.userQueryHash; - } - - logger.info(annotationEntry); - } - - return { - /** - * Log LLM request (user message sent to provider) - * @param {Object} context - Request context - */ - logLlmRequest(context) { - const { - correlationId, - sessionId, - provider, - model, - stream, - destinationUrl, - userMessages, - systemPrompt, - tools, - maxTokens, - } = context; - - const { hostname, port, protocol } = parseDestinationUrl(destinationUrl); - - // Hash BEFORE truncate - this ensures we track the original content - // Use specific max lengths for different content types - const hashedMessages = hashAndTruncate(userMessages, maxContentLength.userMessages, deduplicator); - const hashedSystem = systemPrompt - ? hashAndTruncate(systemPrompt, maxContentLength.systemPrompt, deduplicator) - : { hash: null, content: null, truncated: false }; - - // Deduplicate large content if enabled (using original content hash) - // Session-level deduplication: first time outputs truncated content, subsequent times output reference - let finalUserMessages = hashedMessages.content; - let finalSystemPrompt = hashedSystem.content; - - if (deduplicator) { - // Deduplicate userMessages if original content is large enough - if (userMessages && deduplicator.shouldDeduplicate(userMessages)) { - const isFirstTime = deduplicator.isFirstTimeInSession(hashedMessages.hash); - if (isFirstTime) { - // First time: output truncated content, but store in dictionary - deduplicator.storeContentWithHash(userMessages, hashedMessages.hash); - finalUserMessages = hashedMessages.content; // Use truncated content - } else { - // Subsequent times: output only reference - finalUserMessages = deduplicator.storeContentWithHash( - userMessages, - hashedMessages.hash - ); - } - } - - // Deduplicate systemPrompt if original content is large enough - if (systemPrompt && deduplicator.shouldDeduplicate(systemPrompt)) { - const isFirstTime = deduplicator.isFirstTimeInSession(hashedSystem.hash); - if (isFirstTime) { - // First time: output truncated content, but store in dictionary - deduplicator.storeContentWithHash(systemPrompt, hashedSystem.hash); - finalSystemPrompt = hashedSystem.content; // Use truncated content - } else { - // Subsequent times: output only reference - finalSystemPrompt = deduplicator.storeContentWithHash( - systemPrompt, - hashedSystem.hash - ); - } - } - } - - const logEntry = { - type: "llm_request", - correlationId, - sessionId, - provider, - model, - stream: stream || false, - destinationUrl, - destinationHostname: hostname, - destinationPort: port, - protocol, - userMessages: finalUserMessages, - systemPrompt: finalSystemPrompt, - tools: Array.isArray(tools) ? tools : null, - maxTokens: maxTokens || null, - contentTruncated: hashedMessages.truncated || hashedSystem.truncated, - msg: "LLM request initiated", - }; - - // Add original length indicators if truncated - if (hashedMessages.truncated) { - logEntry.userMessagesOriginalLength = hashedMessages.originalLength; - } - if (hashedSystem.truncated) { - logEntry.systemPromptOriginalLength = hashedSystem.originalLength; - } - - logger.info(logEntry); - - // Log hash annotation for easy lookup - logHashAnnotation({ - userMessagesHash: hashedMessages.hash, - systemPromptHash: hashedSystem.hash, - }); - }, - - /** - * Log LLM response (response received from provider) - * @param {Object} context - Response context - */ - logLlmResponse(context) { - const { - correlationId, - sessionId, - provider, - model, - stream, - destinationUrl, - destinationHostname, - destinationIp, - destinationIpFamily, - assistantMessage, - stopReason, - requestTokens, - responseTokens, - latencyMs, - status, - error, - streamingNote, - } = context; - - const { hostname, port, protocol } = parseDestinationUrl(destinationUrl); - - // Truncate response content if needed (but not for streaming) - let truncatedMessage = { content: null, truncated: false }; - if (assistantMessage && !stream) { - truncatedMessage = truncateContent(assistantMessage, maxContentLength.response); - } - - const logEntry = { - type: "llm_response", - correlationId, - sessionId, - provider, - model, - stream: stream || false, - destinationUrl, - destinationHostname: destinationHostname || hostname, - destinationPort: port, - destinationIp: destinationIp || null, - destinationIpFamily: destinationIpFamily || null, - protocol, - status: status || null, - latencyMs: latencyMs || null, - msg: error ? "LLM request failed" : "LLM response received", - }; - - // Add response content for non-streaming - if (!stream && assistantMessage) { - logEntry.assistantMessage = truncatedMessage.content; - logEntry.stopReason = stopReason || null; - logEntry.contentTruncated = truncatedMessage.truncated; - if (truncatedMessage.truncated) { - logEntry.assistantMessageOriginalLength = truncatedMessage.originalLength; - } - } - - // Add streaming note if applicable - if (stream && streamingNote) { - logEntry.streamingNote = streamingNote; - } - - // Add token usage - if (requestTokens || responseTokens) { - logEntry.usage = { - requestTokens: requestTokens || null, - responseTokens: responseTokens || null, - totalTokens: (requestTokens || 0) + (responseTokens || 0), - }; - } - - // Add error details if present - if (error) { - logEntry.error = typeof error === "string" ? error : error.message || "Unknown error"; - logEntry.errorStack = error.stack || null; - } - - logger.info(logEntry); - }, - - /** - * Log query-response pair with full content (NO truncation) - * This is logged AFTER the response for easy query/response correlation - * @param {Object} context - Query-response context - */ - logQueryResponsePair(context) { - const { - correlationId, - sessionId, - provider, - model, - requestTime, - responseTime, - userQuery, - assistantResponse, - stopReason, - latencyMs, - requestTokens, - responseTokens, - } = context; - - // Hash BEFORE truncate - apply smart truncation to userQuery - const hashedQuery = hashAndTruncateSystemReminder(userQuery, 50, deduplicator); - - // Deduplicate userQuery if original content is large enough - // Session-level deduplication: first time outputs truncated content, subsequent times output reference - let finalUserQuery = hashedQuery.content; - if (deduplicator && userQuery && deduplicator.shouldDeduplicate(userQuery)) { - const isFirstTime = deduplicator.isFirstTimeInSession(hashedQuery.hash); - if (isFirstTime) { - // First time: output truncated content, but store in dictionary - deduplicator.storeContentWithHash(userQuery, hashedQuery.hash); - finalUserQuery = hashedQuery.content; // Use truncated content - } else { - // Subsequent times: output only reference - finalUserQuery = deduplicator.storeContentWithHash(userQuery, hashedQuery.hash); - } - } - - const logEntry = { - type: "llm_query_response_pair", - correlationId, - sessionId, - provider, - model, - requestTime, - responseTime, - latencyMs: latencyMs || null, - userQuery: finalUserQuery, // Smart truncation + deduplication applied - assistantResponse, // Full response, NO truncation or deduplication (usually unique) - stopReason: stopReason || null, - msg: "Query-response pair (full content)", - }; - - // Add truncation metadata if truncation occurred - if (hashedQuery.truncated) { - logEntry.userQueryTruncated = true; - logEntry.userQueryOriginalLength = hashedQuery.originalLength; - logEntry.userQueryCharsRemoved = hashedQuery.charsRemoved; - } - - // Add token usage if available - if (requestTokens || responseTokens) { - logEntry.usage = { - requestTokens: requestTokens || null, - responseTokens: responseTokens || null, - totalTokens: (requestTokens || 0) + (responseTokens || 0), - }; - } - - logger.info(logEntry); - - // Log hash annotation for easy lookup - logHashAnnotation({ - userQueryHash: hashedQuery.hash, - }); - }, - - /** - * Restore full content from hash references in a log entry - * @param {Object} entry - Log entry with potential hash references - * @returns {Object} Entry with full content restored - */ - restoreLogEntry(entry) { - return deduplicator ? deduplicator.restoreEntry(entry) : entry; - }, - - /** - * Get deduplication statistics - * @returns {Object|null} Statistics or null if deduplication disabled - */ - getDeduplicationStats() { - return deduplicator ? deduplicator.getStats() : null; - }, - - enabled: true, - }; -} - -module.exports = { - createAuditLogger: createAuditLoggerWrapper, - truncateContent, - truncateSystemReminder, - hashAndTruncate, - hashAndTruncateSystemReminder, - parseDestinationUrl, -}; diff --git a/src/logger/deduplicator.js b/src/logger/deduplicator.js deleted file mode 100644 index 880ed80..0000000 --- a/src/logger/deduplicator.js +++ /dev/null @@ -1,547 +0,0 @@ -const crypto = require("crypto"); -const fs = require("fs"); -const path = require("path"); -const readline = require("readline"); - -/** - * Content Deduplicator for LLM Audit Logs - * - * Implements content-addressable storage using SHA-256 hashes to deduplicate - * repetitive content in audit logs. Large content blocks are stored once in a - * dictionary file and referenced by hash in the main log. - * - * Key Features: - * - Hash-based deduplication (SHA-256, first 16 chars) - * - LRU cache for hot content (avoids disk I/O) - * - Configurable minimum content size threshold - * - Async dictionary writes for minimal latency impact - * - Backward compatible (handles both references and inline content) - * - * Dictionary Format (JSONL): - * {"hash": "sha256:abc...", "content": "...", "firstSeen": "ISO timestamp", "useCount": 123} - * - * Reference Format: - * {"$ref": "sha256:abc...", "size": 1234} - */ - -class LRUCache { - constructor(maxSize = 100) { - this.maxSize = maxSize; - this.cache = new Map(); - } - - get(key) { - if (!this.cache.has(key)) { - return undefined; - } - // Move to end (most recently used) - const value = this.cache.get(key); - this.cache.delete(key); - this.cache.set(key, value); - return value; - } - - set(key, value) { - // Remove if exists (to update position) - if (this.cache.has(key)) { - this.cache.delete(key); - } - // Add to end - this.cache.set(key, value); - // Evict oldest if over size - if (this.cache.size > this.maxSize) { - const firstKey = this.cache.keys().next().value; - this.cache.delete(firstKey); - } - } - - has(key) { - return this.cache.has(key); - } -} - -class ContentDeduplicator { - constructor(dictionaryPath, options = {}) { - this.dictionaryPath = dictionaryPath; - this.minSize = options.minSize || 500; - this.cacheSize = options.cacheSize || 100; - this.sanitizeEnabled = options.sanitize !== false; // default true - this.sessionCacheEnabled = options.sessionCache !== false; // default true - - // LRU cache: hash -> content - this.contentCache = new LRUCache(this.cacheSize); - - // Track usage counts: hash -> count - this.usageCounts = new Map(); - - // Track last seen timestamps: hash -> ISO timestamp - this.lastSeenTimestamps = new Map(); - - // Session-level cache: hash -> boolean (tracks if hash has been output in full in this session) - // Cleared on server restart, not persisted to disk - this.sessionContentCache = new Map(); - - // Ensure dictionary directory exists - const dictDir = path.dirname(this.dictionaryPath); - if (!fs.existsSync(dictDir)) { - fs.mkdirSync(dictDir, { recursive: true }); - } - - // Load existing dictionary into cache - this._loadDictionary(); - } - - /** - * Load existing dictionary file into cache - * @private - */ - _loadDictionary() { - if (!fs.existsSync(this.dictionaryPath)) { - return; - } - - try { - const fileStream = fs.createReadStream(this.dictionaryPath); - const rl = readline.createInterface({ - input: fileStream, - crlfDelay: Infinity, - }); - - rl.on("line", (line) => { - try { - const entry = JSON.parse(line); - if (!entry.hash) return; - - // Handle full entry (has content) - if (entry.content !== undefined && entry.content !== null) { - this.contentCache.set(entry.hash, entry.content); - this.usageCounts.set(entry.hash, entry.useCount || 1); - this.lastSeenTimestamps.set(entry.hash, entry.lastSeen || entry.firstSeen); - } - // Handle update entry (content is null, only metadata) - else if (entry.firstSeen === null) { - // This is an update entry - update metadata only - if (entry.useCount !== undefined) { - this.usageCounts.set(entry.hash, entry.useCount); - } - if (entry.lastSeen) { - this.lastSeenTimestamps.set(entry.hash, entry.lastSeen); - } - } - } catch (err) { - // Skip malformed lines (silent - don't pollute logs) - } - }); - - // Suppress error events during load (file may not exist yet) - rl.on("error", () => { - // Silently ignore - dictionary will be created on first write - }); - - // Wait for file to be fully read (synchronously for initialization) - return new Promise((resolve) => { - rl.on("close", resolve); - }); - } catch (err) { - // Silently ignore load errors - dictionary will be created on first write - } - } - - /** - * Compute SHA-256 hash of content (first 16 chars for brevity) - * @param {string|object|array} content - Content to hash - * @returns {string} Hash in format "sha256:abc..." - */ - hashContent(content) { - const stringContent = typeof content === "string" ? content : JSON.stringify(content); - const hash = crypto.createHash("sha256").update(stringContent, "utf8").digest("hex"); - return `sha256:${hash.substring(0, 16)}`; - } - - /** - * Clean empty "User:" entries from content - * Removes wasteful empty "User:" entries that appear between Claude responses - * - * @private - * @param {string} content - Content to clean - * @returns {string} Cleaned content - */ - _sanitizeContent(content) { - // Only process string content that contains conversation patterns - if (typeof content !== 'string' || !content.includes('User:') || !content.includes('Claude:')) { - return content; - } - - // Split into lines for processing - const lines = content.split('\n'); - const cleaned = []; - let i = 0; - - while (i < lines.length) { - const line = lines[i]; - - // Check if this is an empty "User:" entry - // Pattern: line with just "User:" or "User: " followed by empty line(s) - if (line.trim() === 'User:' || line.trim() === 'User: ') { - // Look ahead to see if next line is empty or another "User:" or "Claude:" - const nextLine = i + 1 < lines.length ? lines[i + 1] : ''; - - // If followed by empty line or another marker, this is an empty User: entry - if (nextLine.trim() === '' || nextLine.trim() === 'User:' || nextLine.trim() === 'Claude:') { - // Skip this empty User: entry - i += 1; - // Also skip following empty lines - while (i < lines.length && lines[i].trim() === '') { - i += 1; - } - continue; - } - } - - // Keep this line - cleaned.push(line); - i++; - } - - return cleaned.join('\n'); - } - - /** - * Check if content should be deduplicated - * @param {string|object|array} content - Content to check - * @param {number} minSize - Minimum size threshold (default: from config) - * @returns {boolean} True if content should be deduplicated - */ - shouldDeduplicate(content, minSize = null) { - if (!content) return false; - - const threshold = minSize !== null ? minSize : this.minSize; - const stringContent = typeof content === "string" ? content : JSON.stringify(content); - - // Check size threshold - if (stringContent.length < threshold) { - return false; - } - - // Don't deduplicate already truncated content (contains truncation indicators) - if ( - typeof stringContent === "string" && - (stringContent.includes("[truncated,") || stringContent.includes("... [truncated")) - ) { - return false; - } - - return true; - } - - /** - * Store content in dictionary and return reference - * @param {string|object|array} content - Content to store - * @returns {object} Reference object: { $ref: "sha256:abc...", size: 1234 } - */ - storeContent(content) { - if (!content) { - return null; - } - - let stringContent = typeof content === "string" ? content : JSON.stringify(content); - - // Sanitize content before hashing (if enabled) - if (this.sanitizeEnabled) { - stringContent = this._sanitizeContent(stringContent); - } - - const hash = this.hashContent(stringContent); - const size = stringContent.length; - - // Update usage count - const currentCount = this.usageCounts.get(hash) || 0; - this.usageCounts.set(hash, currentCount + 1); - - // If already in cache, just return reference - if (this.contentCache.has(hash)) { - // Update lastSeen timestamp - const now = new Date().toISOString(); - this.lastSeenTimestamps.set(hash, now); - // Update dictionary entry asynchronously (increment useCount, update lastSeen) - this._updateDictionaryEntry(hash, currentCount + 1, now); - return { $ref: hash, size }; - } - - // Store in cache - this.contentCache.set(hash, stringContent); - - // Track lastSeen - const now = new Date().toISOString(); - this.lastSeenTimestamps.set(hash, now); - - // Append to dictionary file asynchronously - this._appendToDictionary(hash, stringContent, currentCount + 1, now); - - return { $ref: hash, size }; - } - - /** - * Store content with a pre-computed hash (for hash-before-truncate pattern) - * @param {string|object|array} content - Content to store (original, not truncated) - * @param {string} precomputedHash - Hash computed before truncation - * @returns {object} Reference object: { $ref: "sha256:abc...", size: 1234 } or full content if first time in session - */ - storeContentWithHash(content, precomputedHash) { - if (!content || !precomputedHash) { - return null; - } - - let stringContent = typeof content === "string" ? content : JSON.stringify(content); - - // Sanitize content (if enabled) - if (this.sanitizeEnabled) { - stringContent = this._sanitizeContent(stringContent); - } - - const hash = precomputedHash; // Use provided hash instead of recomputing - const size = stringContent.length; - - // Update usage count - const currentCount = this.usageCounts.get(hash) || 0; - this.usageCounts.set(hash, currentCount + 1); - - // Track lastSeen - const now = new Date().toISOString(); - this.lastSeenTimestamps.set(hash, now); - - // Session-level deduplication: First time in session outputs full content - const isFirstTimeInSession = this.sessionCacheEnabled && !this.isFirstTimeInSession(hash); - - // If already in cache (dictionary), update metadata - if (this.contentCache.has(hash)) { - // Update dictionary entry asynchronously (increment useCount, update lastSeen) - this._updateDictionaryEntry(hash, currentCount + 1, now); - - // If first time in session, mark it and return reference (will be expanded by caller if needed) - if (isFirstTimeInSession) { - this.markSeenInSession(hash); - } - - return { $ref: hash, size }; - } - - // Store in cache (first time ever) - this.contentCache.set(hash, stringContent); - - // Mark as seen in session - if (this.sessionCacheEnabled) { - this.markSeenInSession(hash); - } - - // Append to dictionary file asynchronously - this._appendToDictionary(hash, stringContent, currentCount + 1, now); - - return { $ref: hash, size }; - } - - /** - * Check if this hash has been output in full during the current session - * @param {string} hash - Content hash - * @returns {boolean} True if this is the first time seeing this hash in this session - */ - isFirstTimeInSession(hash) { - return !this.sessionContentCache.has(hash); - } - - /** - * Mark a hash as having been output in full during this session - * @param {string} hash - Content hash - */ - markSeenInSession(hash) { - this.sessionContentCache.set(hash, true); - } - - /** - * Clear session cache (useful for testing or manual cache reset) - */ - clearSessionCache() { - this.sessionContentCache.clear(); - } - - /** - * Append new entry to dictionary file - * @private - * @param {string} hash - Content hash - * @param {string} content - Actual content - * @param {number} useCount - Usage count - * @param {string} timestamp - ISO timestamp for firstSeen/lastSeen - */ - _appendToDictionary(hash, content, useCount, timestamp) { - const entry = { - hash, - firstSeen: timestamp, - useCount, - lastSeen: timestamp, - content, - }; - - // Async append (non-blocking) - fs.appendFile(this.dictionaryPath, JSON.stringify(entry) + "\n", (err) => { - if (err) { - console.error("Failed to append to dictionary:", err.message); - } - }); - } - - /** - * Update existing dictionary entry (append update line to dictionary) - * @private - * @param {string} hash - Content hash - * @param {number} newCount - New usage count - * @param {string} timestamp - ISO timestamp for lastSeen - */ - _updateDictionaryEntry(hash, newCount, timestamp) { - // Append an update entry with the same hash to track updated metadata - // The reader/compactor should use the LAST entry for a given hash - const updateEntry = { - hash, - firstSeen: null, // Null indicates this is an update, not a new entry - useCount: newCount, - lastSeen: timestamp, - content: null, - }; - - // Async append (non-blocking) - fs.appendFile(this.dictionaryPath, JSON.stringify(updateEntry) + "\n", (err) => { - if (err) { - console.error("Failed to append update to dictionary:", err.message); - } - }); - } - - /** - * Retrieve content by hash reference - * @param {string} hashRef - Hash reference (e.g., "sha256:abc...") - * @returns {string|null} Content or null if not found - */ - getContent(hashRef) { - // Check cache first - if (this.contentCache.has(hashRef)) { - return this.contentCache.get(hashRef); - } - - // If not in cache, read from dictionary (synchronously for now) - return this._readFromDictionary(hashRef); - } - - /** - * Read content from dictionary file by hash - * @private - * @param {string} hashRef - Hash reference - * @returns {string|null} Content or null if not found - */ - _readFromDictionary(hashRef) { - if (!fs.existsSync(this.dictionaryPath)) { - return null; - } - - try { - const fileContent = fs.readFileSync(this.dictionaryPath, "utf8"); - const lines = fileContent.split("\n"); - - for (const line of lines) { - if (!line.trim()) continue; - try { - const entry = JSON.parse(line); - if (entry.hash === hashRef) { - // Cache it for future use - this.contentCache.set(hashRef, entry.content); - return entry.content; - } - } catch { - // Skip malformed lines - } - } - } catch (err) { - console.error("Failed to read dictionary:", err.message); - } - - return null; - } - - /** - * Process log entry fields for deduplication - * @param {object} entry - Log entry object - * @param {string[]} fields - Fields to deduplicate (default: common large fields) - * @returns {object} Entry with deduplicated fields - */ - deduplicateEntry(entry, fields = ["userMessages", "systemPrompt", "userQuery"]) { - if (!entry || typeof entry !== "object") { - return entry; - } - - const deduplicated = { ...entry }; - - for (const field of fields) { - const content = entry[field]; - - // Skip if field doesn't exist or is already a reference - if (!content || (typeof content === "object" && content.$ref)) { - continue; - } - - // Check if should deduplicate - if (this.shouldDeduplicate(content)) { - const ref = this.storeContent(content); - if (ref) { - deduplicated[field] = ref; - } - } - } - - return deduplicated; - } - - /** - * Restore full content from hash references in a log entry - * @param {object} entry - Log entry with potential references - * @returns {object} Entry with full content restored - */ - restoreEntry(entry) { - if (!entry || typeof entry !== "object") { - return entry; - } - - const restored = { ...entry }; - - // Check all fields for references - for (const [key, value] of Object.entries(entry)) { - if (typeof value === "object" && value !== null && value.$ref) { - const content = this.getContent(value.$ref); - if (content !== null) { - // Try to parse back to original type - try { - restored[key] = JSON.parse(content); - } catch { - restored[key] = content; - } - } - } - } - - return restored; - } - - /** - * Get statistics about deduplication - * @returns {object} Statistics - */ - getStats() { - return { - cacheSize: this.contentCache.cache.size, - uniqueContentBlocks: this.usageCounts.size, - totalReferences: Array.from(this.usageCounts.values()).reduce((a, b) => a + b, 0), - }; - } -} - -module.exports = { - ContentDeduplicator, - LRUCache, -}; diff --git a/src/logger/index.js b/src/logger/index.js index 7f49d9f..cf29399 100644 --- a/src/logger/index.js +++ b/src/logger/index.js @@ -1,91 +1,27 @@ const pino = require("pino"); const config = require("../config"); -const { createOversizedErrorStream } = require("./oversized-error-stream"); -/** - * Application logger using Pino - * - * Standard Network Logging Fields: - * When logging network requests/responses, use these consistent field names: - * - * - destinationUrl: string - Full URL being requested (e.g., "https://api.example.com/v1/endpoint") - * - destinationHostname: string - Hostname only (e.g., "api.example.com") - * - destinationIp: string - Resolved IP address (logged by DNS logger at debug level) - * - ipFamily: number - IP version (4 or 6) - logged by DNS logger - * - protocol: string - Protocol used ("http" or "https") - * - status: number - HTTP status code - * - provider: string - Service/provider label (e.g., "OpenAI", "HTTP", "HTTPS") - * - duration: number - Request duration in milliseconds - * - * DNS Resolution Logging: - * DNS resolution is logged at debug level via the dns-logger module. - * To see DNS logs, set LOG_LEVEL=debug. DNS logs correlate with application - * logs via the destinationHostname field. - * - * Example DNS log: - * { - * "level": "debug", - * "provider": "HTTPS", - * "hostname": "api.openai.com", - * "resolvedIp": "104.18.23.45", - * "ipFamily": 4, - * "duration": 23, - * "msg": "DNS resolution completed" - * } - * - * Example API request log: - * { - * "level": "debug", - * "provider": "OpenAI", - * "status": 200, - * "destinationUrl": "https://api.openai.com/v1/chat/completions", - * "destinationHostname": "api.openai.com", - * "responseLength": 1523, - * "msg": "OpenAI API response" - * } - */ - -// Create array of streams for multistream setup -const streams = []; - -// Main console output stream -streams.push({ - level: config.logger.level, - stream: - config.env === "development" - ? pino.transport({ - target: "pino-pretty", - options: { - translateTime: "SYS:standard", - ignore: "pid,hostname", - colorize: true, - }, - }) - : process.stdout, +const logger = pino({ + level: config.logger.level, + name: "claude-backend", + base: { + env: config.env, + }, + redact: { + paths: ["req.headers.authorization", "req.headers.cookie"], + censor: "***redacted***", + }, + transport: + config.env === "development" + ? { + target: "pino-pretty", + options: { + translateTime: "SYS:standard", + ignore: "pid,hostname", + colorize: true, + }, + } + : undefined, }); -// Oversized error stream (if enabled) -if (config.oversizedErrorLogging?.enabled) { - streams.push({ - level: "warn", // Only capture WARN and ERROR - stream: createOversizedErrorStream(config.oversizedErrorLogging), - }); -} - -// Create logger with multistream -const logger = pino( - { - level: config.logger.level, - name: "claude-backend", - base: { - env: config.env, - }, - redact: { - paths: ["req.headers.authorization", "req.headers.cookie"], - censor: "***redacted***", - }, - }, - pino.multistream(streams), -); - module.exports = logger; diff --git a/src/logger/oversized-error-stream.js b/src/logger/oversized-error-stream.js deleted file mode 100644 index adbbd82..0000000 --- a/src/logger/oversized-error-stream.js +++ /dev/null @@ -1,288 +0,0 @@ -const fs = require("node:fs"); -const path = require("node:path"); -const { Writable } = require("node:stream"); - -/** - * Custom Pino stream that captures oversized error messages to separate log files. - * Errors from the same session are appended to a single file. - */ - -// Cache of session file handles to enable appending -const sessionFiles = new Map(); - -// Track first error timestamp per session for filename -const sessionTimestamps = new Map(); - -/** - * Creates a custom Pino stream that captures oversized errors - * @param {Object} config - Configuration object - * @param {number} config.threshold - Character threshold for capturing (default 200) - * @param {string} config.logDir - Directory for oversized error logs - * @param {number} config.maxFiles - Maximum number of log files to keep - * @returns {Writable} - Writable stream for Pino - */ -function createOversizedErrorStream(config) { - const { threshold = 200, logDir, maxFiles = 100 } = config; - - // Ensure log directory exists - ensureDirectoryExists(logDir); - - // Create writable stream - const stream = new Writable({ - objectMode: true, - write(chunk, encoding, callback) { - try { - // Parse the log entry (Pino sends JSON strings) - const logObject = typeof chunk === "string" ? JSON.parse(chunk) : chunk; - - // Check if this log should be captured - const { shouldCapture, oversizedFields } = shouldCaptureLog(logObject, threshold); - - if (shouldCapture) { - // Extract or generate session ID - const sessionId = extractSessionId(logObject); - - // Get or create session file - const { filepath, writeStream } = getSessionFile(sessionId, logDir, maxFiles); - - // Format the log entry - const logEntry = formatLogEntry(logObject, oversizedFields); - - // Write to file (JSONL format - one JSON object per line) - writeStream.write(`${JSON.stringify(logEntry)}\n`, (err) => { - if (err) { - console.error(`Failed to write oversized error to ${filepath}:`, err.message); - } - }); - } - - // Always call callback to continue stream processing - callback(); - } catch (err) { - // Don't crash on stream errors - log and continue - console.error("Oversized error stream processing failed:", err.message); - callback(); - } - }, - final(callback) { - // Close all open file handles when stream ends - for (const [sessionId, { writeStream }] of sessionFiles.entries()) { - writeStream.end(); - } - sessionFiles.clear(); - sessionTimestamps.clear(); - callback(); - }, - }); - - // Handle stream errors gracefully - stream.on("error", (err) => { - console.error("Oversized error stream error:", err.message); - }); - - return stream; -} - -/** - * Determines if a log entry should be captured based on size threshold - * @param {Object} logObject - Pino log object - * @param {number} threshold - Character threshold - * @returns {Object} - { shouldCapture: boolean, oversizedFields: string[] } - */ -function shouldCaptureLog(logObject, threshold) { - // Only capture WARN (40) and ERROR (50) level logs - if (logObject.level < 40) { - return { shouldCapture: false, oversizedFields: [] }; - } - - const oversizedFields = []; - - // Check all fields recursively - function checkField(value, fieldPath) { - if (typeof value === "string") { - if (value.length > threshold) { - oversizedFields.push(fieldPath); - } - } else if (typeof value === "object" && value !== null) { - // Check nested objects/arrays - for (const [key, val] of Object.entries(value)) { - checkField(val, fieldPath ? `${fieldPath}.${key}` : key); - } - } - } - - // Check all fields in log object - for (const [key, value] of Object.entries(logObject)) { - // Skip internal Pino fields - if (["level", "time", "pid", "hostname"].includes(key)) continue; - checkField(value, key); - } - - return { - shouldCapture: oversizedFields.length > 0, - oversizedFields, - }; -} - -/** - * Extracts session ID from log object with fallback strategies - * @param {Object} logObject - Pino log object - * @returns {string} - Session ID or fallback identifier - */ -function extractSessionId(logObject) { - // Try sessionId field first - if (logObject.sessionId) return logObject.sessionId; - - // Try correlationId - if (logObject.correlationId) return logObject.correlationId; - - // Try requestId - if (logObject.requestId) return logObject.requestId; - - // Fallback to unknown with timestamp - return `unknown-${Date.now()}`; -} - -/** - * Gets or creates a file handle for a session - * @param {string} sessionId - Session identifier - * @param {string} logDir - Log directory path - * @param {number} maxFiles - Maximum number of files to keep - * @returns {Object} - { filepath: string, writeStream: WriteStream } - */ -function getSessionFile(sessionId, logDir, maxFiles) { - // Check if we already have a file for this session - if (sessionFiles.has(sessionId)) { - return sessionFiles.get(sessionId); - } - - // Clean up old files if needed (before creating new one) - cleanupOldFiles(logDir, maxFiles); - - // Generate timestamp for first error in this session - const timestamp = new Date() - .toISOString() - .replace(/[-:]/g, "_") - .replace(/\.\d{3}Z$/, "") - .replace("T", "_"); - - sessionTimestamps.set(sessionId, timestamp); - - // Create filename: {sessionId}_{timestamp}.log - const filename = `${sessionId}_${timestamp}.log`; - const filepath = path.join(logDir, filename); - - // Create write stream in append mode - const writeStream = fs.createWriteStream(filepath, { - flags: "a", // append mode - encoding: "utf8", - }); - - // Handle write stream errors - writeStream.on("error", (err) => { - console.error(`Error writing to ${filepath}:`, err.message); - sessionFiles.delete(sessionId); - }); - - // Cache the file handle - const fileInfo = { filepath, writeStream }; - sessionFiles.set(sessionId, fileInfo); - - return fileInfo; -} - -/** - * Formats a log entry for file storage with metadata - * @param {Object} logObject - Original Pino log object - * @param {string[]} oversizedFields - List of fields that exceeded threshold - * @returns {Object} - Formatted log entry - */ -function formatLogEntry(logObject, oversizedFields) { - // Convert Pino timestamp (milliseconds since epoch) to ISO string - const timestamp = new Date(logObject.time).toISOString(); - - // Map Pino log level numbers to names - const levelNames = { - 10: "TRACE", - 20: "DEBUG", - 30: "INFO", - 40: "WARN", - 50: "ERROR", - 60: "FATAL", - }; - - return { - timestamp, - level: levelNames[logObject.level] || "UNKNOWN", - levelNumber: logObject.level, - name: logObject.name, - sessionId: extractSessionId(logObject), - oversizedFields, - ...logObject, // Include all original fields - // Remove redundant internal fields - time: undefined, - pid: undefined, - hostname: undefined, - }; -} - -/** - * Removes oldest log files if count exceeds maximum - * @param {string} logDir - Log directory path - * @param {number} maxFiles - Maximum number of files to keep - */ -function cleanupOldFiles(logDir, maxFiles) { - try { - // List all .log files in directory - const files = fs.readdirSync(logDir).filter((f) => f.endsWith(".log")); - - // If under limit, no cleanup needed - if (files.length < maxFiles) return; - - // Get file stats and sort by modification time (oldest first) - const fileStats = files - .map((filename) => { - const filepath = path.join(logDir, filename); - const stats = fs.statSync(filepath); - return { filename, filepath, mtime: stats.mtime }; - }) - .sort((a, b) => a.mtime - b.mtime); - - // Delete oldest files until we're under the limit - const filesToDelete = fileStats.length - maxFiles + 1; // +1 to make room for new file - for (let i = 0; i < filesToDelete; i++) { - const { filepath } = fileStats[i]; - try { - fs.unlinkSync(filepath); - } catch (err) { - console.error(`Failed to delete old oversized error log ${filepath}:`, err.message); - } - } - } catch (err) { - console.error("Failed to cleanup old oversized error logs:", err.message); - } -} - -/** - * Ensures a directory exists, creating it if necessary - * @param {string} dirPath - Directory path - */ -function ensureDirectoryExists(dirPath) { - try { - if (!fs.existsSync(dirPath)) { - fs.mkdirSync(dirPath, { recursive: true }); - } - } catch (err) { - console.error(`Failed to create oversized error log directory ${dirPath}:`, err.message); - throw err; - } -} - -module.exports = { - createOversizedErrorStream, - shouldCaptureLog, - extractSessionId, - getSessionFile, - formatLogEntry, - cleanupOldFiles, -}; diff --git a/src/orchestrator/index.js b/src/orchestrator/index.js index 25f867e..5444358 100644 --- a/src/orchestrator/index.js +++ b/src/orchestrator/index.js @@ -11,9 +11,6 @@ const systemPrompt = require("../prompts/system"); const historyCompression = require("../context/compression"); const tokenBudget = require("../context/budget"); const { classifyRequestType, selectToolsSmartly } = require("../tools/smart-selection"); -const { parseOllamaModel } = require("../clients/ollama-model-parser"); -const { createAuditLogger } = require("../logger/audit-logger"); -const { getResolvedIp, runWithDnsContext } = require("../clients/dns-logger"); const { getShuttingDown } = require("../api/health"); const DROP_KEYS = new Set([ @@ -981,6 +978,22 @@ function sanitizePayload(payload) { if (!Array.isArray(clean.tools) || clean.tools.length === 0) { delete clean.tools; } + } else if (providerType === "zai") { + // Z.AI (Zhipu) supports tools - keep them in Anthropic format + // They will be converted to OpenAI format in invokeZai + if (!Array.isArray(clean.tools) || clean.tools.length === 0) { + delete clean.tools; + } else { + // Ensure tools are in Anthropic format + clean.tools = ensureAnthropicToolFormat(clean.tools); + } + } else if (providerType === "vertex") { + // Vertex AI supports tools - keep them in Anthropic format + if (!Array.isArray(clean.tools) || clean.tools.length === 0) { + delete clean.tools; + } else { + clean.tools = ensureAnthropicToolFormat(clean.tools); + } } else if (Array.isArray(clean.tools)) { // Unknown provider - remove tools for safety delete clean.tools; @@ -1056,13 +1069,44 @@ function sanitizePayload(payload) { } } + // FIX: Prevent consecutive messages with the same role (causes llama.cpp 400 error) + if (Array.isArray(clean.messages) && clean.messages.length > 0) { + const deduplicated = []; + let lastRole = null; + + for (const msg of clean.messages) { + // Skip if this message has the same role as the previous one + if (msg.role === lastRole) { + logger.debug({ + skippedRole: msg.role, + contentPreview: typeof msg.content === 'string' + ? msg.content.substring(0, 50) + : JSON.stringify(msg.content).substring(0, 50) + }, 'Skipping duplicate consecutive message with same role'); + continue; + } + + deduplicated.push(msg); + lastRole = msg.role; + } + + if (deduplicated.length !== clean.messages.length) { + logger.info({ + originalCount: clean.messages.length, + deduplicatedCount: deduplicated.length, + removed: clean.messages.length - deduplicated.length + }, 'Removed consecutive duplicate roles from message sequence'); + } + + clean.messages = deduplicated; + } + return clean; } const DEFAULT_LOOP_OPTIONS = { maxSteps: config.policy.maxStepsPerTurn ?? 6, maxDurationMs: 120000, - maxToolCallsPerRequest: config.policy.maxToolCallsPerRequest ?? 20, // Prevent runaway tool calling }; function resolveLoopOptions(options = {}) { @@ -1074,43 +1118,13 @@ function resolveLoopOptions(options = {}) { Number.isInteger(options.maxDurationMs) && options.maxDurationMs > 0 ? options.maxDurationMs : DEFAULT_LOOP_OPTIONS.maxDurationMs; - const maxToolCallsPerRequest = - Number.isInteger(options.maxToolCallsPerRequest) && options.maxToolCallsPerRequest > 0 - ? options.maxToolCallsPerRequest - : DEFAULT_LOOP_OPTIONS.maxToolCallsPerRequest; return { ...DEFAULT_LOOP_OPTIONS, maxSteps, maxDurationMs, - maxToolCallsPerRequest, }; } -/** - * Create a signature for a tool call to detect identical repeated calls - * @param {Object} toolCall - The tool call object - * @returns {string} - A hash signature of the tool name and parameters - */ -function getToolCallSignature(toolCall) { - const crypto = require('crypto'); - const name = toolCall.function?.name ?? toolCall.name ?? 'unknown'; - const args = toolCall.function?.arguments ?? toolCall.input; - - // Parse arguments if they're a string - let argsObj = args; - if (typeof args === 'string') { - try { - argsObj = JSON.parse(args); - } catch (err) { - argsObj = args; // Use raw string if parse fails - } - } - - // Create a deterministic signature - const signature = `${name}:${JSON.stringify(argsObj)}`; - return crypto.createHash('sha256').update(signature).digest('hex').substring(0, 16); -} - function buildNonJsonResponse(databricksResponse) { return { status: databricksResponse.status, @@ -1156,8 +1170,6 @@ async function runAgentLoop({ let toolCallsExecuted = 0; let fallbackPerformed = false; const toolCallNames = new Map(); - const toolCallHistory = new Map(); // Track tool calls to detect loops: signature -> count - let loopWarningInjected = false; // Track if we've already warned about loops while (steps < settings.maxSteps) { if (Date.now() - start > settings.maxDurationMs) { @@ -1804,10 +1816,11 @@ async function runAgentLoop({ ), ); } else { + // OpenAI format: tool_call_id MUST match the id from assistant's tool_call toolMessage = { role: "tool", - tool_call_id: execution.id, - name: execution.name, + tool_call_id: call.id ?? execution.id, + name: call.function?.name ?? call.name ?? execution.name, content: execution.content, }; } @@ -1848,35 +1861,6 @@ async function runAgentLoop({ completedTasks: taskExecutions.length, sessionId: session?.id }, "Completed parallel Task execution"); - - // Check if we've exceeded the max tool calls limit after parallel execution - if (toolCallsExecuted > settings.maxToolCallsPerRequest) { - logger.error( - { - sessionId: session?.id ?? null, - toolCallsExecuted, - maxToolCallsPerRequest: settings.maxToolCallsPerRequest, - steps, - }, - "Maximum tool calls per request exceeded after parallel Task execution - terminating", - ); - - return { - response: { - status: 500, - body: { - error: { - type: "max_tool_calls_exceeded", - message: `Maximum tool calls per request exceeded. The model attempted to execute ${toolCallsExecuted} tool calls, but the limit is ${settings.maxToolCallsPerRequest}. This may indicate a complex task that requires breaking down into smaller steps.`, - }, - }, - terminationReason: "max_tool_calls_exceeded", - }, - steps, - durationMs: Date.now() - start, - terminationReason: "max_tool_calls_exceeded", - }; - } } catch (error) { logger.error({ error: error.message, @@ -1968,35 +1952,6 @@ async function runAgentLoop({ toolCallsExecuted += 1; - // Check if we've exceeded the max tool calls limit - if (toolCallsExecuted > settings.maxToolCallsPerRequest) { - logger.error( - { - sessionId: session?.id ?? null, - toolCallsExecuted, - maxToolCallsPerRequest: settings.maxToolCallsPerRequest, - steps, - }, - "Maximum tool calls per request exceeded - terminating", - ); - - return { - response: { - status: 500, - body: { - error: { - type: "max_tool_calls_exceeded", - message: `Maximum tool calls per request exceeded. The model attempted to execute ${toolCallsExecuted} tool calls, but the limit is ${settings.maxToolCallsPerRequest}. This may indicate a complex task that requires breaking down into smaller steps.`, - }, - }, - terminationReason: "max_tool_calls_exceeded", - }, - steps, - durationMs: Date.now() - start, - terminationReason: "max_tool_calls_exceeded", - }; - } - const execution = await executeToolCall(call, { session, requestMessages: cleanPayload.messages, @@ -2050,10 +2005,11 @@ async function runAgentLoop({ ); } else { + // OpenAI format: tool_call_id MUST match the id from assistant's tool_call toolMessage = { role: "tool", - tool_call_id: execution.id, - name: execution.name, + tool_call_id: call.id ?? execution.id, + name: call.function?.name ?? call.name ?? execution.name, content: execution.content, }; } @@ -2111,94 +2067,6 @@ async function runAgentLoop({ } } - // === TOOL CALL LOOP DETECTION === - // Track tool calls to detect infinite loops where the model calls the same tool - // repeatedly with identical parameters - for (const call of toolCalls) { - const signature = getToolCallSignature(call); - const count = (toolCallHistory.get(signature) || 0) + 1; - toolCallHistory.set(signature, count); - - const toolName = call.function?.name ?? call.name ?? 'unknown'; - - if (count === 3 && !loopWarningInjected) { - logger.warn( - { - sessionId: session?.id ?? null, - correlationId: options?.correlationId, - tool: toolName, - loopCount: count, - signature: signature, - action: 'warning_injected', - totalSteps: steps, - remainingSteps: settings.maxSteps - steps, - }, - "Tool call loop detected - same tool called 3 times with identical parameters", - ); - - // Inject warning message to model - loopWarningInjected = true; - const warningMessage = { - role: "user", - content: "⚠️ System Warning: You have called the same tool with identical parameters 3 times in this request. This may indicate an infinite loop. Please provide a final answer to the user instead of calling the same tool again, or explain why you need to continue retrying with the same parameters.", - }; - - cleanPayload.messages.push(warningMessage); - - if (session) { - appendTurnToSession(session, { - role: "user", - type: "system_warning", - status: 200, - content: warningMessage.content, - metadata: { - reason: "tool_call_loop_warning", - toolName, - loopCount: count, - }, - }); - } - } else if (count > 3) { - // Force termination after 3 identical calls - // Log FULL context for debugging why the loop occurred - logger.error( - { - sessionId: session?.id ?? null, - correlationId: options?.correlationId, - tool: toolName, - loopCount: count, - signature: signature, - action: 'request_terminated', - totalSteps: steps, - maxSteps: settings.maxSteps, - // FULL CONTEXT for debugging - myPrompt: cleanPayload.messages, // Full conversation sent to LLM - systemPrompt: cleanPayload.system, // Full system prompt - llmResponse: databricksResponse?.data || databricksResponse?.json, // Full LLM response that triggered loop - repeatedToolCalls: toolCalls, // The actual repeated tool calls - toolCallHistory: Array.from(toolCallHistory.entries()), // Full history of all tool calls in this request - }, - "Tool call loop limit exceeded - forcing termination (FULL CONTEXT CAPTURED)", - ); - - return { - response: { - status: 500, - body: { - error: { - type: "tool_call_loop_detected", - message: `Tool call loop detected: The model called the same tool ("${toolName}") with identical parameters ${count} times. This indicates an infinite loop and execution has been terminated. Please try rephrasing your request or provide different parameters.`, - }, - }, - terminationReason: "tool_call_loop", - }, - steps, - durationMs: Date.now() - start, - terminationReason: "tool_call_loop", - }; - } - } - continue; } @@ -2407,6 +2275,22 @@ async function runAgentLoop({ }, "=== CONVERTED ANTHROPIC RESPONSE (llama.cpp) ==="); anthropicPayload.content = policy.sanitiseContent(anthropicPayload.content); + } else if (actualProvider === "zai") { + // Z.AI responses are already converted to Anthropic format in invokeZai + logger.info({ + hasJson: !!databricksResponse.json, + jsonContent: JSON.stringify(databricksResponse.json?.content)?.substring(0, 200), + }, "=== ZAI ORCHESTRATOR DEBUG ==="); + anthropicPayload = databricksResponse.json; + if (Array.isArray(anthropicPayload?.content)) { + anthropicPayload.content = policy.sanitiseContent(anthropicPayload.content); + } + } else if (actualProvider === "vertex") { + // Vertex AI responses are already in Anthropic format + anthropicPayload = databricksResponse.json; + if (Array.isArray(anthropicPayload?.content)) { + anthropicPayload.content = policy.sanitiseContent(anthropicPayload.content); + } } else { anthropicPayload = toAnthropicResponse( databricksResponse.json, @@ -2643,35 +2527,6 @@ async function runAgentLoop({ toolCallsExecuted += 1; - // Check if we've exceeded the max tool calls limit - if (toolCallsExecuted > settings.maxToolCallsPerRequest) { - logger.error( - { - sessionId: session?.id ?? null, - toolCallsExecuted, - maxToolCallsPerRequest: settings.maxToolCallsPerRequest, - steps, - }, - "Maximum tool calls per request exceeded during fallback - terminating", - ); - - return { - response: { - status: 500, - body: { - error: { - type: "max_tool_calls_exceeded", - message: `Maximum tool calls per request exceeded. The model attempted to execute ${toolCallsExecuted} tool calls, but the limit is ${settings.maxToolCallsPerRequest}. This may indicate a complex task that requires breaking down into smaller steps.`, - }, - }, - terminationReason: "max_tool_calls_exceeded", - }, - steps, - durationMs: Date.now() - start, - terminationReason: "max_tool_calls_exceeded", - }; - } - if (execution.ok) { fallbackPerformed = true; attemptSucceeded = true; @@ -2729,18 +2584,13 @@ async function runAgentLoop({ }); } - const finalDurationMs = Date.now() - start; logger.info( { sessionId: session?.id ?? null, steps, - toolCallsExecuted, - uniqueToolSignatures: toolCallHistory.size, - toolCallLoopWarnings: loopWarningInjected ? 1 : 0, - durationMs: finalDurationMs, - avgDurationPerStep: steps > 0 ? Math.round(finalDurationMs / steps) : 0, + durationMs: Date.now() - start, }, - "Agent loop completed successfully", + "Agent loop completed", ); return { response: { @@ -2749,7 +2599,7 @@ async function runAgentLoop({ terminationReason: "completion", }, steps, - durationMs: finalDurationMs, + durationMs: Date.now() - start, terminationReason: "completion", }; } @@ -2768,17 +2618,11 @@ async function runAgentLoop({ }, metadata: { termination: "max_steps" }, }); - const finalDurationMs = Date.now() - start; logger.warn( { sessionId: session?.id ?? null, steps, - toolCallsExecuted, - uniqueToolSignatures: toolCallHistory.size, - durationMs: finalDurationMs, - maxSteps: settings.maxSteps, - maxDurationMs: settings.maxDurationMs, - maxToolCallsPerRequest: settings.maxToolCallsPerRequest, + durationMs: Date.now() - start, }, "Agent loop exceeded limits", ); @@ -2792,18 +2636,12 @@ async function runAgentLoop({ limits: { maxSteps: settings.maxSteps, maxDurationMs: settings.maxDurationMs, - maxToolCallsPerRequest: settings.maxToolCallsPerRequest, - }, - metrics: { - steps, - toolCallsExecuted, - durationMs: finalDurationMs, }, }, terminationReason: "max_steps", }, steps, - durationMs: finalDurationMs, + durationMs: Date.now() - start, terminationReason: "max_steps", }; } @@ -2830,8 +2668,8 @@ async function processMessage({ payload, headers, session, options = {} }) { let cacheKey = null; let cachedResponse = null; if (promptCache.isEnabled()) { - const cacheSeedPayload = JSON.parse(JSON.stringify(cleanPayload)); - const { key, entry } = promptCache.lookup(cacheSeedPayload); + // cleanPayload is already a deep clone from sanitizePayload, no need to clone again + const { key, entry } = promptCache.lookup(cleanPayload); cacheKey = key; if (entry?.value) { try {