diff --git a/scripts/audit-log-reader.js b/scripts/audit-log-reader.js new file mode 100755 index 0000000..48b1fbc --- /dev/null +++ b/scripts/audit-log-reader.js @@ -0,0 +1,399 @@ +#!/usr/bin/env node + +/** + * LLM Audit Log Reader + * + * Utility to read and reconstruct audit log entries from deduplicated logs. + * Resolves hash references from the dictionary file and outputs full content. + * + * Usage: + * node scripts/audit-log-reader.js [options] + * + * Options: + * --log-file Path to audit log file (default: logs/llm-audit.log) + * --dict-file Path to dictionary file (default: logs/llm-audit-dictionary.jsonl) + * --full Output full restored entries (resolve all references) + * --filter Filter by field (e.g., type=llm_request, provider=anthropic) + * --correlation-id Filter by correlation ID + * --last Show only last N entries + * --stats Show deduplication statistics + * --verify Verify all references can be resolved + * --help Show this help message + * + * Examples: + * # Show all entries with full content + * node scripts/audit-log-reader.js --full + * + * # Show only requests + * node scripts/audit-log-reader.js --filter type=llm_request + * + * # Show last 5 entries + * node scripts/audit-log-reader.js --last 5 --full + * + * # Show deduplication statistics + * node scripts/audit-log-reader.js --stats + * + * # Verify all references resolve + * node scripts/audit-log-reader.js --verify + */ + +const fs = require("fs"); +const path = require("path"); +const readline = require("readline"); +const { ContentDeduplicator } = require("../src/logger/deduplicator"); + +// Default file paths +const DEFAULT_LOG_FILE = path.join(process.cwd(), "logs", "llm-audit.log"); +const DEFAULT_DICT_FILE = path.join(process.cwd(), "logs", "llm-audit-dictionary.jsonl"); + +/** + * Parse command line arguments + */ +function parseArgs() { + const args = process.argv.slice(2); + const options = { + logFile: DEFAULT_LOG_FILE, + dictFile: DEFAULT_DICT_FILE, + full: false, + filter: null, + correlationId: null, + last: null, + stats: false, + verify: false, + help: false, + }; + + for (let i = 0; i < args.length; i++) { + const arg = args[i]; + switch (arg) { + case "--log-file": + options.logFile = args[++i]; + break; + case "--dict-file": + options.dictFile = args[++i]; + break; + case "--full": + options.full = true; + break; + case "--filter": + const filterArg = args[++i]; + const [key, value] = filterArg.split("="); + options.filter = { key, value }; + break; + case "--correlation-id": + options.correlationId = args[++i]; + break; + case "--last": + options.last = Number.parseInt(args[++i], 10); + break; + case "--stats": + options.stats = true; + break; + case "--verify": + options.verify = true; + break; + case "--help": + options.help = true; + break; + default: + console.error(`Unknown option: ${arg}`); + process.exit(1); + } + } + + return options; +} + +/** + * Show help message + */ +function showHelp() { + const helpText = ` +LLM Audit Log Reader + +Usage: node scripts/audit-log-reader.js [options] + +Options: + --log-file Path to audit log file (default: logs/llm-audit.log) + --dict-file Path to dictionary file (default: logs/llm-audit-dictionary.jsonl) + --full Output full restored entries (resolve all references) + --filter Filter by field (e.g., type=llm_request, provider=anthropic) + --correlation-id Filter by correlation ID + --last Show only last N entries + --stats Show deduplication statistics + --verify Verify all references can be resolved + --help Show this help message + 
+Examples: + # Show all entries with full content + node scripts/audit-log-reader.js --full + + # Show only requests + node scripts/audit-log-reader.js --filter type=llm_request + + # Show last 5 entries + node scripts/audit-log-reader.js --last 5 --full + + # Show deduplication statistics + node scripts/audit-log-reader.js --stats + + # Verify all references resolve + node scripts/audit-log-reader.js --verify +`; + console.log(helpText); +} + +/** + * Read and process log entries + */ +async function readLogEntries(options) { + const { logFile, dictFile, full, filter, correlationId, last } = options; + + // Check if log file exists + if (!fs.existsSync(logFile)) { + console.error(`Log file not found: ${logFile}`); + process.exit(1); + } + + // Initialize deduplicator if needed + let deduplicator = null; + if (full && fs.existsSync(dictFile)) { + deduplicator = new ContentDeduplicator(dictFile); + } + + const entries = []; + + // Read log file line by line + const fileStream = fs.createReadStream(logFile); + const rl = readline.createInterface({ + input: fileStream, + crlfDelay: Infinity, + }); + + for await (const line of rl) { + if (!line.trim()) continue; + + try { + let entry = JSON.parse(line); + + // Apply filters + if (filter && entry[filter.key] !== filter.value) { + continue; + } + if (correlationId && entry.correlationId !== correlationId) { + continue; + } + + // Restore full content if requested + if (full && deduplicator) { + entry = deduplicator.restoreEntry(entry); + } + + entries.push(entry); + } catch (err) { + console.error("Malformed log entry:", err.message); + } + } + + // Apply last N filter + const output = last ? entries.slice(-last) : entries; + + // Output as JSONL + for (const entry of output) { + console.log(JSON.stringify(entry, null, 2)); + } + + return entries.length; +} + +/** + * Show deduplication statistics + */ +async function showStats(options) { + const { logFile, dictFile } = options; + + if (!fs.existsSync(logFile)) { + console.error(`Log file not found: ${logFile}`); + process.exit(1); + } + + if (!fs.existsSync(dictFile)) { + console.log("No dictionary file found. Deduplication may not be enabled."); + return; + } + + // Get file sizes + const logStats = fs.statSync(logFile); + const dictStats = fs.statSync(dictFile); + + console.log("\n=== LLM Audit Log Deduplication Statistics ===\n"); + console.log(`Log file: ${logFile}`); + console.log(` Size: ${formatBytes(logStats.size)}`); + console.log(` Lines: ${await countLines(logFile)}`); + console.log(); + console.log(`Dictionary file: ${dictFile}`); + console.log(` Size: ${formatBytes(dictStats.size)}`); + console.log(` Entries: ${await countLines(dictFile)}`); + console.log(); + console.log(`Total size: ${formatBytes(logStats.size + dictStats.size)}`); + console.log(); + + // Count reference occurrences in log + const refCount = await countReferences(logFile); + console.log(`Reference objects in log: ${refCount}`); + console.log(`Estimated space saved: ~${formatBytes(refCount * 2000)} (assuming ~2KB per deduplicated field)`); + console.log(); +} + +/** + * Verify all references can be resolved + */ +async function verifyReferences(options) { + const { logFile, dictFile } = options; + + if (!fs.existsSync(logFile)) { + console.error(`Log file not found: ${logFile}`); + process.exit(1); + } + + if (!fs.existsSync(dictFile)) { + console.log("No dictionary file found. 
Nothing to verify."); + return; + } + + const deduplicator = new ContentDeduplicator(dictFile); + const fileStream = fs.createReadStream(logFile); + const rl = readline.createInterface({ + input: fileStream, + crlfDelay: Infinity, + }); + + let totalRefs = 0; + let unresolvedRefs = 0; + const unresolvedHashes = new Set(); + + console.log("Verifying references...\n"); + + for await (const line of rl) { + if (!line.trim()) continue; + + try { + const entry = JSON.parse(line); + + // Check all fields for references + for (const [key, value] of Object.entries(entry)) { + if (typeof value === "object" && value !== null && value.$ref) { + totalRefs++; + const content = deduplicator.getContent(value.$ref); + if (content === null) { + unresolvedRefs++; + unresolvedHashes.add(value.$ref); + console.error(`✗ Unresolved reference: ${value.$ref} in field "${key}"`); + } + } + } + } catch (err) { + console.error("Malformed log entry:", err.message); + } + } + + console.log("\n=== Verification Results ===\n"); + console.log(`Total references: ${totalRefs}`); + console.log(`Unresolved references: ${unresolvedRefs}`); + console.log(`Unique unresolved hashes: ${unresolvedHashes.size}`); + + if (unresolvedRefs === 0) { + console.log("\n✓ All references resolved successfully!"); + } else { + console.log("\n✗ Some references could not be resolved. Dictionary may be incomplete."); + process.exit(1); + } +} + +/** + * Helper: Format bytes to human-readable string + */ +function formatBytes(bytes) { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(2)} KB`; + return `${(bytes / (1024 * 1024)).toFixed(2)} MB`; +} + +/** + * Helper: Count lines in a file + */ +async function countLines(filePath) { + const fileStream = fs.createReadStream(filePath); + const rl = readline.createInterface({ + input: fileStream, + crlfDelay: Infinity, + }); + + let count = 0; + for await (const line of rl) { + if (line.trim()) count++; + } + return count; +} + +/** + * Helper: Count reference objects in log + */ +async function countReferences(filePath) { + const fileStream = fs.createReadStream(filePath); + const rl = readline.createInterface({ + input: fileStream, + crlfDelay: Infinity, + }); + + let count = 0; + for await (const line of rl) { + if (!line.trim()) continue; + try { + const entry = JSON.parse(line); + for (const value of Object.values(entry)) { + if (typeof value === "object" && value !== null && value.$ref) { + count++; + } + } + } catch { + // Skip malformed lines + } + } + return count; +} + +/** + * Main entry point + */ +async function main() { + const options = parseArgs(); + + if (options.help) { + showHelp(); + return; + } + + if (options.stats) { + await showStats(options); + } else if (options.verify) { + await verifyReferences(options); + } else { + const count = await readLogEntries(options); + console.error(`\n(Processed ${count} entries)`); + } +} + +// Run if called directly +if (require.main === module) { + main().catch((err) => { + console.error("Error:", err.message); + process.exit(1); + }); +} + +module.exports = { + readLogEntries, + showStats, + verifyReferences, +}; diff --git a/scripts/compact-dictionary.js b/scripts/compact-dictionary.js new file mode 100755 index 0000000..d8805c2 --- /dev/null +++ b/scripts/compact-dictionary.js @@ -0,0 +1,204 @@ +#!/usr/bin/env node + +/** + * Compact LLM Audit Dictionary + * + * Removes redundant UPDATE entries from the dictionary file, keeping only: + * - One entry per hash with full content + * - Latest 
metadata (useCount, lastSeen) + * + * Usage: + * node scripts/compact-dictionary.js [options] + * + * Options: + * --dict-path Path to dictionary file (default: logs/llm-audit-dictionary.jsonl) + * --backup Create backup before compacting (default: true) + * --dry-run Show what would be done without making changes + * --help Show this help message + */ + +const fs = require('fs'); +const path = require('path'); +const readline = require('readline'); + +// Parse command line arguments +function parseArgs() { + const args = process.argv.slice(2); + const options = { + dictPath: 'logs/llm-audit-dictionary.jsonl', + backup: true, + dryRun: false, + }; + + for (let i = 0; i < args.length; i++) { + const arg = args[i]; + switch (arg) { + case '--dict-path': + options.dictPath = args[++i]; + break; + case '--backup': + options.backup = true; + break; + case '--no-backup': + options.backup = false; + break; + case '--dry-run': + options.dryRun = true; + break; + case '--help': + console.log(` +Compact LLM Audit Dictionary + +Removes redundant UPDATE entries from the dictionary file. + +Usage: + node scripts/compact-dictionary.js [options] + +Options: + --dict-path Path to dictionary file (default: logs/llm-audit-dictionary.jsonl) + --backup Create backup before compacting (default: true) + --no-backup Skip creating backup + --dry-run Show what would be done without making changes + --help Show this help message + +Example: + node scripts/compact-dictionary.js --dict-path logs/llm-audit-dictionary.jsonl --dry-run + `); + process.exit(0); + default: + if (arg.startsWith('--')) { + console.error(`Unknown option: ${arg}`); + process.exit(1); + } + } + } + + return options; +} + +// Read and compact dictionary +async function compactDictionary(dictPath) { + if (!fs.existsSync(dictPath)) { + throw new Error(`Dictionary file not found: ${dictPath}`); + } + + console.log(`Reading dictionary: ${dictPath}`); + + // Map: hash -> entry object + // For each hash, we'll keep the latest metadata merged with content + const entries = new Map(); + let totalLines = 0; + let malformedLines = 0; + + // Read all entries + const fileStream = fs.createReadStream(dictPath); + const rl = readline.createInterface({ + input: fileStream, + crlfDelay: Infinity, + }); + + for await (const line of rl) { + totalLines++; + if (!line.trim()) continue; + + try { + const entry = JSON.parse(line); + if (!entry.hash) { + malformedLines++; + continue; + } + + const hash = entry.hash; + + // Check if we already have an entry for this hash + if (entries.has(hash)) { + const existing = entries.get(hash); + + // Merge: keep content from entry that has it, use latest metadata + const merged = { + hash, + firstSeen: existing.firstSeen || entry.firstSeen, + useCount: entry.useCount || existing.useCount, + lastSeen: entry.lastSeen || existing.lastSeen, + content: existing.content || entry.content, + }; + + entries.set(hash, merged); + } else { + // First time seeing this hash + entries.set(hash, entry); + } + } catch (err) { + malformedLines++; + console.warn(`Skipping malformed line ${totalLines}: ${err.message}`); + } + } + + const compactedCount = entries.size; + const removedCount = totalLines - malformedLines - compactedCount; + + return { + entries, + stats: { + totalLines, + malformedLines, + uniqueHashes: compactedCount, + removedEntries: removedCount, + }, + }; +} + +// Write compacted dictionary +async function writeCompactedDictionary(dictPath, entries, backup = true) { + // Create backup if requested + if (backup) { + const 
backupPath = `${dictPath}.backup.${Date.now()}`; + console.log(`Creating backup: ${backupPath}`); + fs.copyFileSync(dictPath, backupPath); + } + + // Write compacted entries + console.log(`Writing compacted dictionary: ${dictPath}`); + const lines = Array.from(entries.values()).map((entry) => JSON.stringify(entry)); + fs.writeFileSync(dictPath, lines.join('\n') + '\n'); +} + +// Main +async function main() { + try { + const options = parseArgs(); + const dictPath = path.resolve(options.dictPath); + + console.log('=== LLM Audit Dictionary Compaction ===\n'); + + // Read and compact + const { entries, stats } = await compactDictionary(dictPath); + + // Report statistics + console.log('\nCompaction Statistics:'); + console.log(` Total lines in dictionary: ${stats.totalLines}`); + console.log(` Malformed lines skipped: ${stats.malformedLines}`); + console.log(` Unique content hashes: ${stats.uniqueHashes}`); + console.log(` Redundant entries removed: ${stats.removedEntries}`); + + const reductionPercent = + stats.totalLines > 0 + ? ((stats.removedEntries / stats.totalLines) * 100).toFixed(1) + : 0; + console.log(` Size reduction: ${reductionPercent}%\n`); + + if (options.dryRun) { + console.log('DRY RUN: No changes made to dictionary file.'); + console.log(`Would have written ${stats.uniqueHashes} entries.\n`); + } else { + // Write compacted dictionary + await writeCompactedDictionary(dictPath, entries, options.backup); + console.log('✓ Dictionary compaction complete!\n'); + } + } catch (err) { + console.error('Error:', err.message); + process.exit(1); + } +} + +main(); diff --git a/scripts/test-deduplication.js b/scripts/test-deduplication.js new file mode 100755 index 0000000..c6d72bb --- /dev/null +++ b/scripts/test-deduplication.js @@ -0,0 +1,448 @@ +#!/usr/bin/env node + +/** + * Test script for deduplication functionality + * Creates mock log entries and verifies deduplication works correctly + */ + +const fs = require("fs"); +const path = require("path"); +const { ContentDeduplicator } = require("../src/logger/deduplicator"); + +// Test configuration +const TEST_DICT_PATH = path.join(process.cwd(), "logs", "test-dictionary.jsonl"); +const TEST_LOG_PATH = path.join(process.cwd(), "logs", "test-audit.log"); + +// Clean up test files if they exist +function cleanup() { + if (fs.existsSync(TEST_DICT_PATH)) { + fs.unlinkSync(TEST_DICT_PATH); + } + if (fs.existsSync(TEST_LOG_PATH)) { + fs.unlinkSync(TEST_LOG_PATH); + } +} + +// Test 1: Basic deduplication +function testBasicDeduplication() { + console.log("\n=== Test 1: Basic Deduplication ==="); + + const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { + minSize: 50, // Lower threshold for testing + cacheSize: 10, + }); + + const content1 = "This is a test content that is longer than 50 characters and should be deduplicated."; + const content2 = "This is a test content that is longer than 50 characters and should be deduplicated."; + const content3 = "Short"; + + // First content should be stored + const ref1 = deduplicator.storeContent(content1); + console.log("✓ Stored content1:", ref1); + + // Second identical content should return same reference + const ref2 = deduplicator.storeContent(content2); + console.log("✓ Stored content2 (should be same hash):", ref2); + + // Verify same hash + if (ref1.$ref !== ref2.$ref) { + console.error("✗ FAIL: Different hashes for identical content!"); + return false; + } + console.log("✓ PASS: Identical content produces same hash"); + + // Short content should not be deduplicated (below 
threshold) + const shouldNotDedup = deduplicator.shouldDeduplicate(content3, 50); + if (shouldNotDedup) { + console.error("✗ FAIL: Short content should not be deduplicated!"); + return false; + } + console.log("✓ PASS: Short content not deduplicated"); + + return true; +} + +// Test 2: Content restoration +function testContentRestoration() { + console.log("\n=== Test 2: Content Restoration ==="); + + const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { + minSize: 50, + cacheSize: 10, + }); + + const originalContent = "This is original content that needs to be restored from the dictionary file."; + + // Store and get reference + const ref = deduplicator.storeContent(originalContent); + console.log("✓ Stored content with ref:", ref.$ref); + + // Retrieve content + const retrieved = deduplicator.getContent(ref.$ref); + console.log("✓ Retrieved content length:", retrieved?.length); + + // Verify content matches + if (retrieved !== originalContent) { + console.error("✗ FAIL: Retrieved content doesn't match original!"); + console.error("Expected:", originalContent); + console.error("Got:", retrieved); + return false; + } + console.log("✓ PASS: Content restored correctly"); + + return true; +} + +// Test 3: Entry deduplication and restoration +function testEntryProcessing() { + console.log("\n=== Test 3: Entry Deduplication and Restoration ==="); + + const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { + minSize: 50, + cacheSize: 10, + }); + + const systemPrompt = "You are a helpful AI assistant. This is a long system prompt that should be deduplicated."; + const userMessage = "This is a user message that is long enough to be deduplicated by the deduplication system."; + + const entry = { + type: "llm_request", + correlationId: "test-123", + systemPrompt: systemPrompt, + userMessages: userMessage, + model: "test-model", + }; + + // Deduplicate entry + const deduplicated = deduplicator.deduplicateEntry(entry, ["systemPrompt", "userMessages"]); + console.log("✓ Deduplicated entry:", JSON.stringify(deduplicated, null, 2)); + + // Verify fields are now references + if (typeof deduplicated.systemPrompt !== "object" || !deduplicated.systemPrompt.$ref) { + console.error("✗ FAIL: systemPrompt was not deduplicated!"); + return false; + } + if (typeof deduplicated.userMessages !== "object" || !deduplicated.userMessages.$ref) { + console.error("✗ FAIL: userMessages was not deduplicated!"); + return false; + } + console.log("✓ PASS: Fields converted to references"); + + // Restore entry + const restored = deduplicator.restoreEntry(deduplicated); + console.log("✓ Restored entry keys:", Object.keys(restored)); + + // Verify restoration + if (restored.systemPrompt !== systemPrompt) { + console.error("✗ FAIL: systemPrompt not restored correctly!"); + console.error("Expected:", systemPrompt); + console.error("Got:", restored.systemPrompt); + return false; + } + if (restored.userMessages !== userMessage) { + console.error("✗ FAIL: userMessages not restored correctly!"); + console.error("Expected:", userMessage); + console.error("Got:", restored.userMessages); + return false; + } + console.log("✓ PASS: Entry restored correctly"); + + return true; +} + +// Test 4: Dictionary persistence +function testDictionaryPersistence() { + console.log("\n=== Test 4: Dictionary Persistence ==="); + + // Create first deduplicator and store content + const deduplicator1 = new ContentDeduplicator(TEST_DICT_PATH, { + minSize: 50, + cacheSize: 10, + }); + + const content = "This is test content for persistence 
verification across deduplicator instances."; + const ref = deduplicator1.storeContent(content); + console.log("✓ Stored content with first deduplicator:", ref.$ref); + + // Wait for async write to complete + setTimeout(() => { + // Create second deduplicator (should load from dictionary) + const deduplicator2 = new ContentDeduplicator(TEST_DICT_PATH, { + minSize: 50, + cacheSize: 10, + }); + + // Try to retrieve with second deduplicator + const retrieved = deduplicator2.getContent(ref.$ref); + + if (retrieved !== content) { + console.error("✗ FAIL: Content not persisted to dictionary!"); + console.error("Expected:", content); + console.error("Got:", retrieved); + return false; + } + console.log("✓ PASS: Dictionary persisted and loaded correctly"); + + // Show dictionary stats + const stats = deduplicator2.getStats(); + console.log("\nDeduplication Stats:"); + console.log(` Cache size: ${stats.cacheSize}`); + console.log(` Unique blocks: ${stats.uniqueContentBlocks}`); + console.log(` Total references: ${stats.totalReferences}`); + + return true; + }, 100); +} + +// Test 5: Size calculation and verification +function testSizeCalculation() { + console.log("\n=== Test 5: Size Calculation ==="); + + const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { + minSize: 50, + cacheSize: 10, + }); + + const content = "This is a test content string that will be deduplicated and have its size calculated."; + const ref = deduplicator.storeContent(content); + + console.log("✓ Content length:", content.length); + console.log("✓ Reference size field:", ref.size); + + if (ref.size !== content.length) { + console.error("✗ FAIL: Size mismatch!"); + return false; + } + console.log("✓ PASS: Size calculated correctly"); + + // Calculate space saved + const refSize = JSON.stringify(ref).length; + const originalSize = content.length; + const saved = originalSize - refSize; + const savedPercent = ((saved / originalSize) * 100).toFixed(1); + + console.log(`\nSpace saved: ${saved} bytes (${savedPercent}%)`); + console.log(` Original: ${originalSize} bytes`); + console.log(` Reference: ${refSize} bytes`); + + return true; +} + +// Test 6: Content sanitization (empty User: entries removal) +function testContentSanitization() { + console.log("\n=== Test 6: Content Sanitization (Empty User: Removal) ==="); + + const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { + minSize: 50, + cacheSize: 10, + sanitize: true, // Enable sanitization + }); + + // Content with multiple empty "User:" entries + const dirtyContent = `Claude: I'll implement... + +User: + +Claude: Now I'll implement... 
+ +User: + +User: + +User: + +Respond with the title for the conversation and nothing else.`; + + console.log("✓ Original content length:", dirtyContent.length); + console.log("✓ Empty 'User:' entries in original:", (dirtyContent.match(/User:\s*\n/g) || []).length); + + // Store the content (should be sanitized internally) + const ref = deduplicator.storeContent(dirtyContent); + console.log("✓ Stored content with ref:", ref.$ref); + + // Retrieve it back + const retrieved = deduplicator.getContent(ref.$ref); + console.log("✓ Retrieved content length:", retrieved?.length); + + // Count empty "User:" entries in retrieved content + // Pattern: "User:" followed by newline(s) and then "Claude:" or another "User:" or end + const emptyUserMatches = retrieved.match(/User:\s*\n+(?=(Claude:|User:|$))/g) || []; + console.log("✓ Empty 'User:' entries in retrieved:", emptyUserMatches.length); + + // Verify empty User: entries were removed + if (emptyUserMatches.length > 0) { + console.error("✗ FAIL: Empty User: entries not removed!"); + console.error("Retrieved content:", retrieved); + return false; + } + + // Verify content still contains Claude: entries + if (!retrieved.includes("Claude:")) { + console.error("✗ FAIL: Claude: entries were incorrectly removed!"); + return false; + } + + // Verify the last line is preserved + if (!retrieved.includes("Respond with the title")) { + console.error("✗ FAIL: Content was over-sanitized!"); + return false; + } + + console.log("✓ PASS: Empty User: entries removed, content preserved"); + + // Test with sanitization disabled + const dedupNoSanitize = new ContentDeduplicator(TEST_DICT_PATH, { + minSize: 50, + cacheSize: 10, + sanitize: false, // Disable sanitization + }); + + const refNoSanitize = dedupNoSanitize.storeContent(dirtyContent); + const retrievedNoSanitize = dedupNoSanitize.getContent(refNoSanitize.$ref); + + // Should have empty User: entries when sanitization is disabled + const emptyUserNoSanitize = retrievedNoSanitize.match(/User:\s*\n+(?=(Claude:|User:|$))/g) || []; + if (emptyUserNoSanitize.length === 0) { + console.error("✗ FAIL: Content was sanitized even with sanitize=false!"); + return false; + } + + console.log("✓ PASS: Sanitization can be disabled"); + + return true; +} + +// Test 7: Content sanitization preserves non-empty User: entries +function testSanitizationPreservesContent() { + console.log("\n=== Test 7: Sanitization Preserves Non-Empty User: Entries ==="); + + const deduplicator = new ContentDeduplicator(TEST_DICT_PATH, { + minSize: 50, + cacheSize: 10, + sanitize: true, + }); + + // Content with both empty and non-empty User: entries + const mixedContent = `Claude: I'll help you. + +User: Can you explain this? + +Claude: Sure, here's the explanation. + +User: + +User: Another question here. 
+ +Claude: Here's the answer.`; + + console.log("✓ Original has both empty and non-empty User: entries"); + + const ref = deduplicator.storeContent(mixedContent); + const retrieved = deduplicator.getContent(ref.$ref); + + // Check that non-empty User: entries are preserved + if (!retrieved.includes("User: Can you explain this?")) { + console.error("✗ FAIL: Non-empty User: entry was removed!"); + return false; + } + + if (!retrieved.includes("User: Another question here.")) { + console.error("✗ FAIL: Non-empty User: entry was removed!"); + return false; + } + + // Check that empty User: entries are removed + const lines = retrieved.split('\n'); + let hasEmptyUser = false; + for (let i = 0; i < lines.length; i++) { + const line = lines[i].trim(); + if (line === 'User:' || line === 'User: ') { + const nextLine = i + 1 < lines.length ? lines[i + 1].trim() : ''; + if (nextLine === '' || nextLine === 'Claude:' || nextLine === 'User:') { + hasEmptyUser = true; + break; + } + } + } + + if (hasEmptyUser) { + console.error("✗ FAIL: Empty User: entries not removed from mixed content!"); + return false; + } + + console.log("✓ PASS: Non-empty User: entries preserved, empty ones removed"); + + return true; +} + +// Main test runner +async function runTests() { + console.log("=".repeat(60)); + console.log("LLM Audit Log Deduplication Test Suite"); + console.log("=".repeat(60)); + + // Clean up before tests + cleanup(); + + const tests = [ + testBasicDeduplication, + testContentRestoration, + testEntryProcessing, + testSizeCalculation, + testContentSanitization, + testSanitizationPreservesContent, + ]; + + let passed = 0; + let failed = 0; + + for (const test of tests) { + try { + const result = test(); + if (result) { + passed++; + } else { + failed++; + } + } catch (err) { + console.error(`✗ Test failed with error: ${err.message}`); + console.error(err.stack); + failed++; + } + } + + // Run async test separately + setTimeout(() => { + testDictionaryPersistence(); + + console.log("\n" + "=".repeat(60)); + console.log("Test Results"); + console.log("=".repeat(60)); + console.log(`Passed: ${passed}/${passed + failed}`); + console.log(`Failed: ${failed}/${passed + failed}`); + + if (failed === 0) { + console.log("\n✓ All tests passed!"); + console.log("\nDictionary file created at:", TEST_DICT_PATH); + console.log("You can inspect it with: cat", TEST_DICT_PATH); + } else { + console.log("\n✗ Some tests failed!"); + process.exit(1); + } + + // Clean up after tests + console.log("\nCleaning up test files..."); + cleanup(); + console.log("✓ Test files removed"); + }, 200); +} + +// Run tests +if (require.main === module) { + runTests().catch((err) => { + console.error("Test suite failed:", err); + process.exit(1); + }); +} + +module.exports = { runTests }; diff --git a/src/api/middleware/session.js b/src/api/middleware/session.js index 84103ec..f19c58a 100644 --- a/src/api/middleware/session.js +++ b/src/api/middleware/session.js @@ -1,5 +1,6 @@ const crypto = require("crypto"); const { getOrCreateSession } = require("../../sessions/store"); +const logger = require("../../logger"); const PRIMARY_HEADER = "x-session-id"; const FALLBACK_HEADERS = [ @@ -41,6 +42,9 @@ function sessionMiddleware(req, res, next) { const sessionId = extractSessionId(req); req.sessionId = sessionId; + // Add sessionId to logger context for this request + req.log = logger.child({ sessionId }); + const session = getOrCreateSession(sessionId); req.session = session; return next(); diff --git a/src/config/index.js b/src/config/index.js 
index dbe3df6..8bf97af 100644 --- a/src/config/index.js +++ b/src/config/index.js @@ -89,6 +89,114 @@ const ollamaTimeout = Number.parseInt(process.env.OLLAMA_TIMEOUT_MS ?? "120000", const ollamaEmbeddingsEndpoint = process.env.OLLAMA_EMBEDDINGS_ENDPOINT ?? `${ollamaEndpoint}/api/embeddings`; const ollamaEmbeddingsModel = process.env.OLLAMA_EMBEDDINGS_MODEL ?? "nomic-embed-text"; +// Ollama cluster configuration +function loadOllamaClusterConfig() { + const configPath = process.env.OLLAMA_CLUSTER_CONFIG + ?? path.join(process.cwd(), ".lynkr", "ollama-cluster.json"); + + try { + const fs = require("fs"); + if (!fs.existsSync(configPath)) { + return null; // No cluster config, use single-host mode + } + + const configFile = fs.readFileSync(configPath, "utf8"); + const config = JSON.parse(configFile); + + // Validate configuration + if (!config.enabled) { + return null; // Cluster disabled + } + + const errors = validateOllamaClusterConfig(config); + if (errors.length > 0) { + throw new Error(`Ollama cluster configuration errors:\n${errors.join("\n")}`); + } + + return config; + } catch (err) { + if (err.code === "ENOENT") { + return null; // File doesn't exist, use single-host mode + } + throw new Error(`Failed to load Ollama cluster config from ${configPath}: ${err.message}`); + } +} + +function validateOllamaClusterConfig(config) { + const errors = []; + + if (!config.hosts || !Array.isArray(config.hosts) || config.hosts.length === 0) { + errors.push("At least one Ollama host must be configured in 'hosts' array"); + return errors; // Return early if no hosts + } + + const seenIds = new Set(); + const seenEndpoints = new Set(); + + config.hosts.forEach((host, idx) => { + // Check required fields + if (!host.endpoint) { + errors.push(`Host ${idx}: 'endpoint' is required`); + } else { + // Check URL format + try { + new URL(host.endpoint); + } catch { + errors.push(`Host ${idx}: invalid endpoint URL "${host.endpoint}"`); + } + + // Check for duplicate endpoints + if (seenEndpoints.has(host.endpoint)) { + errors.push(`Host ${idx}: duplicate endpoint "${host.endpoint}"`); + } + seenEndpoints.add(host.endpoint); + } + + // Check ID + if (!host.id) { + errors.push(`Host ${idx}: 'id' is required`); + } else if (seenIds.has(host.id)) { + errors.push(`Host ${idx}: duplicate ID "${host.id}"`); + } else { + seenIds.add(host.id); + } + + // Validate ranges + if (host.weight !== undefined && host.weight < 1) { + errors.push(`Host ${host.id || idx}: weight must be >= 1 (got ${host.weight})`); + } + + if (host.maxConcurrent !== undefined && host.maxConcurrent < 1) { + errors.push(`Host ${host.id || idx}: maxConcurrent must be >= 1 (got ${host.maxConcurrent})`); + } + + if (host.timeout !== undefined && host.timeout < 1000) { + errors.push(`Host ${host.id || idx}: timeout must be >= 1000ms (got ${host.timeout})`); + } + }); + + // Validate load balancing strategy + const validStrategies = [ + "round-robin", + "weighted-round-robin", + "least-connections", + "response-time-weighted", + "model-based", + "random" + ]; + + if (config.loadBalancing && !validStrategies.includes(config.loadBalancing)) { + errors.push( + `Invalid loadBalancing strategy "${config.loadBalancing}". ` + + `Must be one of: ${validStrategies.join(", ")}` + ); + } + + return errors; +} + +const ollamaClusterConfig = loadOllamaClusterConfig(); + // OpenRouter configuration const openRouterApiKey = process.env.OPENROUTER_API_KEY ?? null; const openRouterModel = process.env.OPENROUTER_MODEL ?? 
"openai/gpt-4o-mini"; @@ -111,8 +219,8 @@ const openAIOrganization = process.env.OPENAI_ORGANIZATION?.trim() || null; const llamacppEndpoint = process.env.LLAMACPP_ENDPOINT?.trim() || "http://localhost:8080"; const llamacppModel = process.env.LLAMACPP_MODEL?.trim() || "default"; const llamacppTimeout = Number.parseInt(process.env.LLAMACPP_TIMEOUT_MS ?? "120000", 10); -const llamacppApiKey = process.env.LLAMACPP_API_KEY?.trim() || null; const llamacppEmbeddingsEndpoint = process.env.LLAMACPP_EMBEDDINGS_ENDPOINT?.trim() || `${llamacppEndpoint}/embeddings`; +const llamacppApiKey = process.env.LLAMACPP_API_KEY?.trim() || null; // LM Studio configuration const lmstudioEndpoint = process.env.LMSTUDIO_ENDPOINT?.trim() || "http://localhost:1234"; @@ -134,10 +242,6 @@ const zaiModel = process.env.ZAI_MODEL?.trim() || "GLM-4.7"; const vertexApiKey = process.env.VERTEX_API_KEY?.trim() || process.env.GOOGLE_API_KEY?.trim() || null; const vertexModel = process.env.VERTEX_MODEL?.trim() || "gemini-2.0-flash"; -// Hot reload configuration -const hotReloadEnabled = process.env.HOT_RELOAD_ENABLED !== "false"; // default true -const hotReloadDebounceMs = Number.parseInt(process.env.HOT_RELOAD_DEBOUNCE_MS ?? "1000", 10); - // Hybrid routing configuration const preferOllama = process.env.PREFER_OLLAMA === "true"; const fallbackEnabled = process.env.FALLBACK_ENABLED !== "false"; // default true @@ -163,14 +267,61 @@ if (!SUPPORTED_MODEL_PROVIDERS.has(rawFallbackProvider)) { const fallbackProvider = rawFallbackProvider; -// Tool execution mode: server (default), client, or passthrough +// Tool execution mode: server (default), client, passthrough, local, or synthetic const toolExecutionMode = (process.env.TOOL_EXECUTION_MODE ?? "server").toLowerCase(); -if (!["server", "client", "passthrough"].includes(toolExecutionMode)) { +if (!["server", "client", "passthrough", "local", "synthetic"].includes(toolExecutionMode)) { throw new Error( - "TOOL_EXECUTION_MODE must be one of: server, client, passthrough (default: server)" + "TOOL_EXECUTION_MODE must be one of: server, client, passthrough, local, synthetic (default: server)" ); } +// Pattern B Configuration (Lynkr on localhost) +const deploymentMode = (process.env.DEPLOYMENT_MODE ?? "pattern-a").toLowerCase(); +const patternBEnabled = deploymentMode === "pattern-b" || toolExecutionMode === "local" || toolExecutionMode === "synthetic"; + +// Local tool execution settings (Pattern B) +const localToolsEnabled = toolExecutionMode === "local" || toolExecutionMode === "synthetic"; +const localToolsAllowedOperations = parseList( + process.env.LOCAL_TOOLS_ALLOWED_OPERATIONS ?? "readFile,writeFile,listDirectory,executeCommand,searchCode" +); +const localToolsRestrictedPaths = parseList( + process.env.LOCAL_TOOLS_RESTRICTED_PATHS ?? "/etc,/sys,/proc,/root,~/.ssh,~/.gnupg" +); +const localToolsMaxFileSize = Number.parseInt( + process.env.LOCAL_TOOLS_MAX_FILE_SIZE ?? "10485760", // 10MB default + 10 +); +const localToolsCommandTimeout = Number.parseInt( + process.env.LOCAL_TOOLS_COMMAND_TIMEOUT ?? "30000", // 30s default + 10 +); + +// Synthetic mode settings (Pattern B) +const syntheticModeEnabled = toolExecutionMode === "synthetic"; +const syntheticModePatterns = process.env.SYNTHETIC_MODE_PATTERNS + ? 
parseJson(process.env.SYNTHETIC_MODE_PATTERNS) + : { + readFile: ["read.*file", "show.*contents", "display.*file"], + writeFile: ["write.*to.*file", "save.*to.*file", "create.*file"], + listDirectory: ["list.*files", "show.*directory", "ls.*"], + executeCommand: ["run.*command", "execute.*"], + searchCode: ["search.*for", "find.*in.*files", "grep.*"] + }; + +// Remote Ollama configuration (Pattern B) +// When Lynkr runs on localhost but Ollama runs on GPU server +const remoteOllamaEnabled = patternBEnabled; +const remoteOllamaEndpoint = process.env.REMOTE_OLLAMA_ENDPOINT ?? ollamaEndpoint; +const remoteOllamaConnectionPooling = process.env.REMOTE_OLLAMA_CONNECTION_POOLING !== "false"; // default true +const remoteOllamaMaxConnections = Number.parseInt( + process.env.REMOTE_OLLAMA_MAX_CONNECTIONS ?? "10", + 10 +); +const remoteOllamaTimeout = Number.parseInt( + process.env.REMOTE_OLLAMA_TIMEOUT ?? "300000", // 5 minutes + 10 +); + // Memory system configuration (Titans-inspired long-term memory) const memoryEnabled = process.env.MEMORY_ENABLED !== "false"; // default true const memoryRetrievalLimit = Number.parseInt(process.env.MEMORY_RETRIEVAL_LIMIT ?? "5", 10); @@ -452,6 +603,33 @@ const agentsDefaultModel = process.env.AGENTS_DEFAULT_MODEL ?? "haiku"; const agentsMaxSteps = Number.parseInt(process.env.AGENTS_MAX_STEPS ?? "15", 10); const agentsTimeout = Number.parseInt(process.env.AGENTS_TIMEOUT ?? "120000", 10); +// LLM Audit logging configuration +const auditEnabled = process.env.LLM_AUDIT_ENABLED === "true"; // default false +const auditLogFile = process.env.LLM_AUDIT_LOG_FILE ?? path.join(process.cwd(), "logs", "llm-audit.log"); +const auditMaxContentLength = Number.parseInt(process.env.LLM_AUDIT_MAX_CONTENT_LENGTH ?? "5000", 10); // Legacy fallback +const auditMaxSystemLength = Number.parseInt(process.env.LLM_AUDIT_MAX_SYSTEM_LENGTH ?? "2000", 10); +const auditMaxUserLength = Number.parseInt(process.env.LLM_AUDIT_MAX_USER_LENGTH ?? "3000", 10); +const auditMaxResponseLength = Number.parseInt(process.env.LLM_AUDIT_MAX_RESPONSE_LENGTH ?? "3000", 10); +const auditMaxFiles = Number.parseInt(process.env.LLM_AUDIT_MAX_FILES ?? "30", 10); +const auditMaxSize = process.env.LLM_AUDIT_MAX_SIZE ?? "100M"; +const auditAnnotations = process.env.LLM_AUDIT_ANNOTATIONS !== "false"; // default true + +// LLM Audit deduplication configuration +const auditDeduplicationEnabled = process.env.LLM_AUDIT_DEDUP_ENABLED !== "false"; // default true +const auditDeduplicationDictPath = + process.env.LLM_AUDIT_DEDUP_DICT_PATH ?? path.join(process.cwd(), "logs", "llm-audit-dictionary.jsonl"); +const auditDeduplicationMinSize = Number.parseInt(process.env.LLM_AUDIT_DEDUP_MIN_SIZE ?? "500", 10); +const auditDeduplicationCacheSize = Number.parseInt(process.env.LLM_AUDIT_DEDUP_CACHE_SIZE ?? "100", 10); +const auditDeduplicationSanitize = process.env.LLM_AUDIT_DEDUP_SANITIZE !== "false"; // default true +const auditDeduplicationSessionCache = process.env.LLM_AUDIT_DEDUP_SESSION_CACHE !== "false"; // default true + +// Oversized Error Logging Configuration +const oversizedErrorLoggingEnabled = process.env.OVERSIZED_ERROR_LOGGING_ENABLED !== "false"; // default true +const oversizedErrorThreshold = Number.parseInt(process.env.OVERSIZED_ERROR_THRESHOLD ?? "200", 10); +const oversizedErrorLogDir = + process.env.OVERSIZED_ERROR_LOG_DIR ?? path.join(process.cwd(), "logs", "oversized-errors"); +const oversizedErrorMaxFiles = Number.parseInt(process.env.OVERSIZED_ERROR_MAX_FILES ?? 
"100", 10); + const config = { env: process.env.NODE_ENV ?? "development", port: Number.isNaN(port) ? 8080 : port, @@ -472,6 +650,7 @@ const config = { timeout: Number.isNaN(ollamaTimeout) ? 120000 : ollamaTimeout, embeddingsEndpoint: ollamaEmbeddingsEndpoint, embeddingsModel: ollamaEmbeddingsModel, + cluster: ollamaClusterConfig, // null if cluster not configured }, openrouter: { apiKey: openRouterApiKey, @@ -495,8 +674,8 @@ const config = { endpoint: llamacppEndpoint, model: llamacppModel, timeout: Number.isNaN(llamacppTimeout) ? 120000 : llamacppTimeout, - apiKey: llamacppApiKey, embeddingsEndpoint: llamacppEmbeddingsEndpoint, + apiKey: llamacppApiKey, }, lmstudio: { endpoint: lmstudioEndpoint, @@ -518,10 +697,6 @@ const config = { apiKey: vertexApiKey, model: vertexModel, }, - hotReload: { - enabled: hotReloadEnabled, - debounceMs: Number.isNaN(hotReloadDebounceMs) ? 1000 : hotReloadDebounceMs, - }, modelProvider: { type: modelProvider, defaultModel, @@ -533,6 +708,105 @@ const config = { fallbackProvider, }, toolExecutionMode, + patternB: { + enabled: patternBEnabled, + deploymentMode, + localTools: { + enabled: localToolsEnabled, + allowedOperations: localToolsAllowedOperations, + restrictedPaths: localToolsRestrictedPaths, + maxFileSize: localToolsMaxFileSize, + commandTimeout: localToolsCommandTimeout, + }, + syntheticMode: { + enabled: syntheticModeEnabled, + patterns: syntheticModePatterns, + }, + remoteOllama: { + enabled: remoteOllamaEnabled, + endpoint: remoteOllamaEndpoint, + connectionPooling: remoteOllamaConnectionPooling, + maxConnections: remoteOllamaMaxConnections, + timeout: remoteOllamaTimeout, + }, + }, + gpuDiscovery: { + enabled: process.env.GPU_DISCOVERY_ENABLED !== "false", // default true + probe_on_startup: process.env.GPU_DISCOVERY_PROBE_ON_STARTUP !== "false", // default true + probe_local: process.env.GPU_DISCOVERY_PROBE_LOCAL !== "false", // default true + health_check_interval_seconds: Number.parseInt( + process.env.GPU_DISCOVERY_HEALTH_CHECK_INTERVAL ?? "300", // 5 minutes + 10 + ), + nvidia_smi_timeout_ms: Number.parseInt( + process.env.GPU_DISCOVERY_NVIDIA_SMI_TIMEOUT ?? "5000", + 10 + ), + cache_path: process.env.GPU_DISCOVERY_CACHE_PATH || "/tmp/lynkr_gpu_inventory.json", + detection_method: (process.env.GPU_DISCOVERY_METHOD ?? 
"auto").toLowerCase(), // "auto" | "ollama-api" | "nvidia-smi" + ssh_config: { + enabled: process.env.GPU_DISCOVERY_SSH_ENABLED === "true", + user: process.env.GPU_DISCOVERY_SSH_USER || null, + key_path: process.env.GPU_DISCOVERY_SSH_KEY_PATH || "~/.ssh/id_rsa", + }, + }, + gpuOrchestration: { + enabled: process.env.GPU_ORCHESTRATION_ENABLED !== "false", // default true + auto_profile_new_models: process.env.GPU_ORCHESTRATION_AUTO_PROFILE !== "false", // default true + model_profiles_path: process.env.GPU_ORCHESTRATION_PROFILES_PATH || "/tmp/ollama_model_profiles.json", + }, + taskModels: { + // User can override via ~/.lynkr/config.json or environment + // Format: TASK_MODELS__PRIMARY, TASK_MODELS__FALLBACK + planning: { + primary: process.env.TASK_MODELS_PLANNING_PRIMARY || "qwen2.5:14b", + fallback: process.env.TASK_MODELS_PLANNING_FALLBACK || "llama3.1:8b", + }, + coding: { + primary: process.env.TASK_MODELS_CODING_PRIMARY || "qwen2.5-coder:32b", + fallback: process.env.TASK_MODELS_CODING_FALLBACK || "qwen2.5-coder:7b", + }, + debugging: { + primary: process.env.TASK_MODELS_DEBUGGING_PRIMARY || "deepseek-coder-v2:16b", + fallback: process.env.TASK_MODELS_DEBUGGING_FALLBACK || "llama3.1:8b", + }, + refactoring: { + primary: process.env.TASK_MODELS_REFACTORING_PRIMARY || "qwen2.5-coder:32b", + fallback: process.env.TASK_MODELS_REFACTORING_FALLBACK || "qwen2.5:14b", + }, + documentation: { + primary: process.env.TASK_MODELS_DOCUMENTATION_PRIMARY || "llama3.2:11b", + fallback: process.env.TASK_MODELS_DOCUMENTATION_FALLBACK || "llama3.1:8b", + }, + testing: { + primary: process.env.TASK_MODELS_TESTING_PRIMARY || "qwen2.5-coder:14b", + fallback: process.env.TASK_MODELS_TESTING_FALLBACK || "llama3.1:8b", + }, + review: { + primary: process.env.TASK_MODELS_REVIEW_PRIMARY || "qwen2.5:14b", + fallback: process.env.TASK_MODELS_REVIEW_FALLBACK || "llama3.1:8b", + }, + analysis: { + primary: process.env.TASK_MODELS_ANALYSIS_PRIMARY || "qwen2.5:14b", + fallback: process.env.TASK_MODELS_ANALYSIS_FALLBACK || "llama3.1:8b", + }, + general: { + primary: process.env.TASK_MODELS_GENERAL_PRIMARY || "llama3.1:8b", + fallback: process.env.TASK_MODELS_GENERAL_FALLBACK || "llama3.1:8b", + }, + }, + palMcp: { + enabled: process.env.PAL_MCP_ENABLED === "true", + serverPath: process.env.PAL_MCP_SERVER_PATH || path.join(__dirname, "../../external/pal-mcp-server"), + pythonPath: process.env.PAL_MCP_PYTHON_PATH || "python3", + autoStart: process.env.PAL_MCP_AUTO_START !== "false", // default true if enabled + // Orchestration settings (for avoiding expensive cloud fallback) + useForComplexRequests: process.env.PAL_MCP_USE_FOR_COMPLEX_REQUESTS !== "false", // default true if enabled + maxToolsBeforeOrchestration: Number.parseInt( + process.env.PAL_MCP_MAX_TOOLS_BEFORE_ORCHESTRATION ?? "3", + 10 + ), + }, server: { jsonLimit: process.env.REQUEST_JSON_LIMIT ?? "1gb", }, @@ -688,48 +962,48 @@ const config = { tokenBudget: smartToolSelectionTokenBudget, minimalMode: false, // HARDCODED - disabled }, + security: { + // Content filtering + contentFilterEnabled: process.env.SECURITY_CONTENT_FILTER_ENABLED !== "false", // default true + blockOnDetection: process.env.SECURITY_BLOCK_ON_DETECTION !== "false", // default true + + // Rate limiting + rateLimitEnabled: process.env.SECURITY_RATE_LIMIT_ENABLED !== "false", // default true + perIpLimit: Number.parseInt(process.env.SECURITY_PER_IP_LIMIT ?? "100", 10), // requests per minute + perEndpointLimit: Number.parseInt(process.env.SECURITY_PER_ENDPOINT_LIMIT ?? 
"1000", 10), // requests per minute + + // Audit logging + auditLogEnabled: process.env.SECURITY_AUDIT_LOG_ENABLED !== "false", // default true + auditLogDir: process.env.SECURITY_AUDIT_LOG_DIR ?? path.join(process.cwd(), "logs"), + }, + audit: { + enabled: auditEnabled, + logFile: auditLogFile, + maxContentLength: { + systemPrompt: Number.isNaN(auditMaxSystemLength) ? 2000 : auditMaxSystemLength, + userMessages: Number.isNaN(auditMaxUserLength) ? 3000 : auditMaxUserLength, + response: Number.isNaN(auditMaxResponseLength) ? 3000 : auditMaxResponseLength, + }, + annotations: auditAnnotations, + rotation: { + maxFiles: Number.isNaN(auditMaxFiles) ? 30 : auditMaxFiles, + maxSize: auditMaxSize, + }, + deduplication: { + enabled: auditDeduplicationEnabled, + dictionaryPath: auditDeduplicationDictPath, + minSize: Number.isNaN(auditDeduplicationMinSize) ? 500 : auditDeduplicationMinSize, + cacheSize: Number.isNaN(auditDeduplicationCacheSize) ? 100 : auditDeduplicationCacheSize, + sanitize: auditDeduplicationSanitize, + sessionCache: auditDeduplicationSessionCache, + }, + }, + oversizedErrorLogging: { + enabled: oversizedErrorLoggingEnabled, + threshold: oversizedErrorThreshold, + logDir: oversizedErrorLogDir, + maxFiles: oversizedErrorMaxFiles, + }, }; -/** - * Reload configuration from environment - * Called by hot reload watcher when .env changes - */ -function reloadConfig() { - // Re-parse .env file - dotenv.config({ override: true }); - - // Update mutable config values (those that can safely change at runtime) - // API keys and endpoints - config.databricks.apiKey = process.env.DATABRICKS_API_KEY; - config.azureAnthropic.apiKey = process.env.AZURE_ANTHROPIC_API_KEY ?? null; - config.ollama.model = process.env.OLLAMA_MODEL ?? "qwen2.5-coder:7b"; - config.openrouter.apiKey = process.env.OPENROUTER_API_KEY ?? null; - config.openrouter.model = process.env.OPENROUTER_MODEL ?? "openai/gpt-4o-mini"; - config.azureOpenAI.apiKey = process.env.AZURE_OPENAI_API_KEY?.trim() || null; - config.openai.apiKey = process.env.OPENAI_API_KEY?.trim() || null; - config.bedrock.apiKey = process.env.AWS_BEDROCK_API_KEY?.trim() || null; - config.zai.apiKey = process.env.ZAI_API_KEY?.trim() || null; - config.zai.model = process.env.ZAI_MODEL?.trim() || "GLM-4.7"; - config.vertex.apiKey = process.env.VERTEX_API_KEY?.trim() || process.env.GOOGLE_API_KEY?.trim() || null; - config.vertex.model = process.env.VERTEX_MODEL?.trim() || "gemini-2.0-flash"; - - // Model provider settings - const newProvider = (process.env.MODEL_PROVIDER ?? "databricks").toLowerCase(); - if (SUPPORTED_MODEL_PROVIDERS.has(newProvider)) { - config.modelProvider.type = newProvider; - } - config.modelProvider.preferOllama = process.env.PREFER_OLLAMA === "true"; - config.modelProvider.fallbackEnabled = process.env.FALLBACK_ENABLED !== "false"; - config.modelProvider.fallbackProvider = (process.env.FALLBACK_PROVIDER ?? "databricks").toLowerCase(); - - // Log level - config.logger.level = process.env.LOG_LEVEL ?? 
"info"; - - console.log("[CONFIG] Configuration reloaded from environment"); - return config; -} - -// Make config mutable for hot reload -config.reloadConfig = reloadConfig; - module.exports = config; diff --git a/src/logger/audit-logger.js b/src/logger/audit-logger.js new file mode 100644 index 0000000..56b2830 --- /dev/null +++ b/src/logger/audit-logger.js @@ -0,0 +1,609 @@ +const pino = require("pino"); +const path = require("path"); +const fs = require("fs"); +const { ContentDeduplicator } = require("./deduplicator"); + +/** + * LLM Audit Logger + * + * Dedicated logger for capturing LLM request/response audit trails. + * Logs to a separate file for easy parsing, searching, and compliance. + * + * Log Entry Types: + * - llm_request: User messages sent to LLM providers + * - llm_response: LLM responses received from providers + * + * Key Features: + * - Separate log file (llm-audit.log) for easy parsing + * - Correlation IDs to link requests with responses + * - Network destination tracking (IP, hostname, URL) + * - Content truncation to control log size + * - Async writes for minimal latency impact + * - Daily log rotation with configurable retention + */ + +/** + * Create audit logger instance + * @param {Object} config - Audit configuration + * @returns {Object} Pino logger instance + */ +function createAuditLogger(config) { + // Ensure log directory exists + const logDir = path.dirname(config.logFile); + if (!fs.existsSync(logDir)) { + fs.mkdirSync(logDir, { recursive: true }); + } + + // Create dedicated pino instance for audit logs + const auditLogger = pino( + { + level: "info", // Always log at info level for compliance + name: "llm-audit", + base: null, // Don't include pid/hostname to keep logs clean + timestamp: pino.stdTimeFunctions.isoTime, + formatters: { + level: (label) => { + return { level: label }; + }, + }, + }, + pino.destination({ + dest: config.logFile, + sync: false, // Async writes for performance + mkdir: true, + }) + ); + + return auditLogger; +} + +/** + * Truncate content if it exceeds max length + * @param {string|Array|Object} content - Content to truncate + * @param {number} maxLength - Maximum length (0 = no truncation) + * @returns {Object} { content, truncated, originalLength } + */ +function truncateContent(content, maxLength) { + if (maxLength === 0) { + return { content, truncated: false, originalLength: null }; + } + + // Handle different content types + let stringContent; + if (typeof content === "string") { + stringContent = content; + } else if (Array.isArray(content)) { + stringContent = JSON.stringify(content); + } else if (typeof content === "object" && content !== null) { + stringContent = JSON.stringify(content); + } else { + return { content, truncated: false, originalLength: null }; + } + + const originalLength = stringContent.length; + + if (originalLength <= maxLength) { + return { content, truncated: false, originalLength }; + } + + // Truncate and add indicator + const truncated = stringContent.substring(0, maxLength); + const indicator = `... 
[truncated, ${originalLength - maxLength} chars omitted]`; + + // Try to parse back to original type if it was JSON + if (typeof content !== "string") { + try { + return { + content: truncated + indicator, + truncated: true, + originalLength, + }; + } catch { + return { + content: truncated + indicator, + truncated: true, + originalLength, + }; + } + } + + return { + content: truncated + indicator, + truncated: true, + originalLength, + }; +} + +/** + * Hash and truncate content for audit logging + * Hashes the ORIGINAL content before truncation to preserve full content hash + * @param {string|Array|Object} content - Content to hash and truncate + * @param {number} maxLength - Maximum length for truncation (0 = no truncation) + * @param {ContentDeduplicator} deduplicator - Deduplicator instance for hashing + * @returns {Object} { hash, content, truncated, originalLength } + */ +function hashAndTruncate(content, maxLength, deduplicator) { + if (!content) { + return { hash: null, content: null, truncated: false, originalLength: null }; + } + + // Hash the ORIGINAL content before any truncation + const hash = deduplicator ? deduplicator.hashContent(content) : null; + + // Then truncate for display + const truncationResult = truncateContent(content, maxLength); + + return { + hash, + content: truncationResult.content, + truncated: truncationResult.truncated, + originalLength: truncationResult.originalLength, + }; +} + +/** + * Smart truncation for system reminder content + * Keeps first N characters and everything from the LAST <system-reminder> tag onwards + * @param {string|Array|Object} content - Content to truncate + * @param {number} prefixLength - Length of prefix to keep (default: 50) + * @returns {Object} { content, truncated, originalLength, charsRemoved } + */ +function truncateSystemReminder(content, prefixLength = 50) { + // Handle different content types + let stringContent; + if (typeof content === "string") { + stringContent = content; + } else if (Array.isArray(content)) { + stringContent = JSON.stringify(content); + } else if (typeof content === "object" && content !== null) { + stringContent = JSON.stringify(content); + } else { + return { content, truncated: false, originalLength: null, charsRemoved: 0 }; + } + + const originalLength = stringContent.length; + + // Find the LAST occurrence of the <system-reminder> tag + const tagIndex = stringContent.lastIndexOf("<system-reminder>"); + + // If tag not found, return unchanged + if (tagIndex === -1) { + return { content, truncated: false, originalLength, charsRemoved: 0 }; + } + + // If tag is within the prefix, don't truncate + if (tagIndex < prefixLength) { + return { content, truncated: false, originalLength, charsRemoved: 0 }; + } + + // Extract prefix and suffix + const prefix = stringContent.substring(0, prefixLength); + const suffix = stringContent.substring(tagIndex); + + // Calculate what would be removed + const charsRemoved = tagIndex - prefixLength; + + // If removal would be insignificant (< 100 chars), don't truncate + if (charsRemoved < 100) { + return { content, truncated: false, originalLength, charsRemoved: 0 }; + } + + // Build truncated content + const truncatedContent = prefix + "..."
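+  // Example (hypothetical sizes): a 4000-char block whose last <system-reminder> starts at offset 3200 keeps its first 50 chars, an ellipsis, and its final 800 chars, with charsRemoved = 3150.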
+ suffix; + + return { + content: truncatedContent, + truncated: true, + originalLength, + charsRemoved, + }; +} + +/** + * Hash and apply smart truncation for system reminder content + * Hashes the ORIGINAL content before truncation + * @param {string|Array|Object} content - Content to hash and truncate + * @param {number} prefixLength - Length of prefix to keep (default: 50) + * @param {ContentDeduplicator} deduplicator - Deduplicator instance for hashing + * @returns {Object} { hash, content, truncated, originalLength, charsRemoved } + */ +function hashAndTruncateSystemReminder(content, prefixLength = 50, deduplicator) { + if (!content) { + return { hash: null, content: null, truncated: false, originalLength: null, charsRemoved: 0 }; + } + + // Hash the ORIGINAL content before any truncation + const hash = deduplicator ? deduplicator.hashContent(content) : null; + + // Then apply smart truncation + const truncationResult = truncateSystemReminder(content, prefixLength); + + return { + hash, + content: truncationResult.content, + truncated: truncationResult.truncated, + originalLength: truncationResult.originalLength, + charsRemoved: truncationResult.charsRemoved, + }; +} + +/** + * Extract hostname and port from URL + * @param {string} url - Full URL + * @returns {Object} { hostname, port } + */ +function parseDestinationUrl(url) { + try { + const parsed = new URL(url); + return { + hostname: parsed.hostname, + port: parsed.port || (parsed.protocol === "https:" ? 443 : 80), + protocol: parsed.protocol.replace(":", ""), + }; + } catch { + return { hostname: null, port: null, protocol: null }; + } +} + +/** + * Create audit logger wrapper with convenience methods + * @param {Object} config - Audit configuration from config.js + * @returns {Object} Audit logger interface + */ +function createAuditLoggerWrapper(config) { + if (!config.enabled) { + // Return no-op logger if disabled + return { + logLlmRequest: () => {}, + logLlmResponse: () => {}, + restoreLogEntry: (entry) => entry, + enabled: false, + }; + } + + const logger = createAuditLogger(config); + + // Support both legacy single value and new object format for maxContentLength + const maxContentLength = + typeof config.maxContentLength === 'object' + ? config.maxContentLength + : { + systemPrompt: config.maxContentLength || 5000, + userMessages: config.maxContentLength || 5000, + response: config.maxContentLength || 5000, + }; + + // Initialize deduplicator if enabled + const deduplicator = + config.deduplication?.enabled + ? 
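+      // With deduplication enabled, large bodies are stored once in the dictionary file and later occurrences are logged as reference objects (e.g. { $ref, size }); see logLlmRequest below and scripts/audit-log-reader.js.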
new ContentDeduplicator(config.deduplication.dictionaryPath, { + minSize: config.deduplication.minSize, + cacheSize: config.deduplication.cacheSize, + sanitize: config.deduplication.sanitize, + sessionCache: config.deduplication.sessionCache, + }) + : null; + + /** + * Log hash annotation line for easy lookup + * @private + * @param {Object} hashes - Hash values to annotate + */ + function logHashAnnotation(hashes) { + if (!config.annotations) { + return; // Skip if annotations disabled + } + + const annotationEntry = { + _annotation: true, + lookup: "Use: node scripts/audit-log-reader.js --hash ", + }; + + // Add any provided hashes + if (hashes.systemPromptHash) { + annotationEntry.systemPromptHash = hashes.systemPromptHash; + } + if (hashes.userMessagesHash) { + annotationEntry.userMessagesHash = hashes.userMessagesHash; + } + if (hashes.userQueryHash) { + annotationEntry.userQueryHash = hashes.userQueryHash; + } + + logger.info(annotationEntry); + } + + return { + /** + * Log LLM request (user message sent to provider) + * @param {Object} context - Request context + */ + logLlmRequest(context) { + const { + correlationId, + sessionId, + provider, + model, + stream, + destinationUrl, + userMessages, + systemPrompt, + tools, + maxTokens, + } = context; + + const { hostname, port, protocol } = parseDestinationUrl(destinationUrl); + + // Hash BEFORE truncate - this ensures we track the original content + // Use specific max lengths for different content types + const hashedMessages = hashAndTruncate(userMessages, maxContentLength.userMessages, deduplicator); + const hashedSystem = systemPrompt + ? hashAndTruncate(systemPrompt, maxContentLength.systemPrompt, deduplicator) + : { hash: null, content: null, truncated: false }; + + // Deduplicate large content if enabled (using original content hash) + // Session-level deduplication: first time outputs truncated content, subsequent times output reference + let finalUserMessages = hashedMessages.content; + let finalSystemPrompt = hashedSystem.content; + + if (deduplicator) { + // Deduplicate userMessages if original content is large enough + if (userMessages && deduplicator.shouldDeduplicate(userMessages)) { + const isFirstTime = deduplicator.isFirstTimeInSession(hashedMessages.hash); + if (isFirstTime) { + // First time: output truncated content, but store in dictionary + deduplicator.storeContentWithHash(userMessages, hashedMessages.hash); + finalUserMessages = hashedMessages.content; // Use truncated content + } else { + // Subsequent times: output only reference + finalUserMessages = deduplicator.storeContentWithHash( + userMessages, + hashedMessages.hash + ); + } + } + + // Deduplicate systemPrompt if original content is large enough + if (systemPrompt && deduplicator.shouldDeduplicate(systemPrompt)) { + const isFirstTime = deduplicator.isFirstTimeInSession(hashedSystem.hash); + if (isFirstTime) { + // First time: output truncated content, but store in dictionary + deduplicator.storeContentWithHash(systemPrompt, hashedSystem.hash); + finalSystemPrompt = hashedSystem.content; // Use truncated content + } else { + // Subsequent times: output only reference + finalSystemPrompt = deduplicator.storeContentWithHash( + systemPrompt, + hashedSystem.hash + ); + } + } + } + + const logEntry = { + type: "llm_request", + correlationId, + sessionId, + provider, + model, + stream: stream || false, + destinationUrl, + destinationHostname: hostname, + destinationPort: port, + protocol, + userMessages: finalUserMessages, + systemPrompt: finalSystemPrompt, + 
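+        // NOTE: when session-level deduplication has already seen the hash for one of
+        // these fields, the value above is a { $ref: "sha256:...", size } reference into
+        // the dictionary file rather than the truncated text.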
tools: Array.isArray(tools) ? tools : null, + maxTokens: maxTokens || null, + contentTruncated: hashedMessages.truncated || hashedSystem.truncated, + msg: "LLM request initiated", + }; + + // Add original length indicators if truncated + if (hashedMessages.truncated) { + logEntry.userMessagesOriginalLength = hashedMessages.originalLength; + } + if (hashedSystem.truncated) { + logEntry.systemPromptOriginalLength = hashedSystem.originalLength; + } + + logger.info(logEntry); + + // Log hash annotation for easy lookup + logHashAnnotation({ + userMessagesHash: hashedMessages.hash, + systemPromptHash: hashedSystem.hash, + }); + }, + + /** + * Log LLM response (response received from provider) + * @param {Object} context - Response context + */ + logLlmResponse(context) { + const { + correlationId, + sessionId, + provider, + model, + stream, + destinationUrl, + destinationHostname, + destinationIp, + destinationIpFamily, + assistantMessage, + stopReason, + requestTokens, + responseTokens, + latencyMs, + status, + error, + streamingNote, + } = context; + + const { hostname, port, protocol } = parseDestinationUrl(destinationUrl); + + // Truncate response content if needed (but not for streaming) + let truncatedMessage = { content: null, truncated: false }; + if (assistantMessage && !stream) { + truncatedMessage = truncateContent(assistantMessage, maxContentLength.response); + } + + const logEntry = { + type: "llm_response", + correlationId, + sessionId, + provider, + model, + stream: stream || false, + destinationUrl, + destinationHostname: destinationHostname || hostname, + destinationPort: port, + destinationIp: destinationIp || null, + destinationIpFamily: destinationIpFamily || null, + protocol, + status: status || null, + latencyMs: latencyMs || null, + msg: error ? "LLM request failed" : "LLM response received", + }; + + // Add response content for non-streaming + if (!stream && assistantMessage) { + logEntry.assistantMessage = truncatedMessage.content; + logEntry.stopReason = stopReason || null; + logEntry.contentTruncated = truncatedMessage.truncated; + if (truncatedMessage.truncated) { + logEntry.assistantMessageOriginalLength = truncatedMessage.originalLength; + } + } + + // Add streaming note if applicable + if (stream && streamingNote) { + logEntry.streamingNote = streamingNote; + } + + // Add token usage + if (requestTokens || responseTokens) { + logEntry.usage = { + requestTokens: requestTokens || null, + responseTokens: responseTokens || null, + totalTokens: (requestTokens || 0) + (responseTokens || 0), + }; + } + + // Add error details if present + if (error) { + logEntry.error = typeof error === "string" ? 
error : error.message || "Unknown error"; + logEntry.errorStack = error.stack || null; + } + + logger.info(logEntry); + }, + + /** + * Log query-response pair with full content (NO truncation) + * This is logged AFTER the response for easy query/response correlation + * @param {Object} context - Query-response context + */ + logQueryResponsePair(context) { + const { + correlationId, + sessionId, + provider, + model, + requestTime, + responseTime, + userQuery, + assistantResponse, + stopReason, + latencyMs, + requestTokens, + responseTokens, + } = context; + + // Hash BEFORE truncate - apply smart truncation to userQuery + const hashedQuery = hashAndTruncateSystemReminder(userQuery, 50, deduplicator); + + // Deduplicate userQuery if original content is large enough + // Session-level deduplication: first time outputs truncated content, subsequent times output reference + let finalUserQuery = hashedQuery.content; + if (deduplicator && userQuery && deduplicator.shouldDeduplicate(userQuery)) { + const isFirstTime = deduplicator.isFirstTimeInSession(hashedQuery.hash); + if (isFirstTime) { + // First time: output truncated content, but store in dictionary + deduplicator.storeContentWithHash(userQuery, hashedQuery.hash); + finalUserQuery = hashedQuery.content; // Use truncated content + } else { + // Subsequent times: output only reference + finalUserQuery = deduplicator.storeContentWithHash(userQuery, hashedQuery.hash); + } + } + + const logEntry = { + type: "llm_query_response_pair", + correlationId, + sessionId, + provider, + model, + requestTime, + responseTime, + latencyMs: latencyMs || null, + userQuery: finalUserQuery, // Smart truncation + deduplication applied + assistantResponse, // Full response, NO truncation or deduplication (usually unique) + stopReason: stopReason || null, + msg: "Query-response pair (full content)", + }; + + // Add truncation metadata if truncation occurred + if (hashedQuery.truncated) { + logEntry.userQueryTruncated = true; + logEntry.userQueryOriginalLength = hashedQuery.originalLength; + logEntry.userQueryCharsRemoved = hashedQuery.charsRemoved; + } + + // Add token usage if available + if (requestTokens || responseTokens) { + logEntry.usage = { + requestTokens: requestTokens || null, + responseTokens: responseTokens || null, + totalTokens: (requestTokens || 0) + (responseTokens || 0), + }; + } + + logger.info(logEntry); + + // Log hash annotation for easy lookup + logHashAnnotation({ + userQueryHash: hashedQuery.hash, + }); + }, + + /** + * Restore full content from hash references in a log entry + * @param {Object} entry - Log entry with potential hash references + * @returns {Object} Entry with full content restored + */ + restoreLogEntry(entry) { + return deduplicator ? deduplicator.restoreEntry(entry) : entry; + }, + + /** + * Get deduplication statistics + * @returns {Object|null} Statistics or null if deduplication disabled + */ + getDeduplicationStats() { + return deduplicator ? 
deduplicator.getStats() : null; + }, + + enabled: true, + }; +} + +module.exports = { + createAuditLogger: createAuditLoggerWrapper, + truncateContent, + truncateSystemReminder, + hashAndTruncate, + hashAndTruncateSystemReminder, + parseDestinationUrl, +}; diff --git a/src/logger/deduplicator.js b/src/logger/deduplicator.js new file mode 100644 index 0000000..880ed80 --- /dev/null +++ b/src/logger/deduplicator.js @@ -0,0 +1,547 @@ +const crypto = require("crypto"); +const fs = require("fs"); +const path = require("path"); +const readline = require("readline"); + +/** + * Content Deduplicator for LLM Audit Logs + * + * Implements content-addressable storage using SHA-256 hashes to deduplicate + * repetitive content in audit logs. Large content blocks are stored once in a + * dictionary file and referenced by hash in the main log. + * + * Key Features: + * - Hash-based deduplication (SHA-256, first 16 chars) + * - LRU cache for hot content (avoids disk I/O) + * - Configurable minimum content size threshold + * - Async dictionary writes for minimal latency impact + * - Backward compatible (handles both references and inline content) + * + * Dictionary Format (JSONL): + * {"hash": "sha256:abc...", "content": "...", "firstSeen": "ISO timestamp", "useCount": 123} + * + * Reference Format: + * {"$ref": "sha256:abc...", "size": 1234} + */ + +class LRUCache { + constructor(maxSize = 100) { + this.maxSize = maxSize; + this.cache = new Map(); + } + + get(key) { + if (!this.cache.has(key)) { + return undefined; + } + // Move to end (most recently used) + const value = this.cache.get(key); + this.cache.delete(key); + this.cache.set(key, value); + return value; + } + + set(key, value) { + // Remove if exists (to update position) + if (this.cache.has(key)) { + this.cache.delete(key); + } + // Add to end + this.cache.set(key, value); + // Evict oldest if over size + if (this.cache.size > this.maxSize) { + const firstKey = this.cache.keys().next().value; + this.cache.delete(firstKey); + } + } + + has(key) { + return this.cache.has(key); + } +} + +class ContentDeduplicator { + constructor(dictionaryPath, options = {}) { + this.dictionaryPath = dictionaryPath; + this.minSize = options.minSize || 500; + this.cacheSize = options.cacheSize || 100; + this.sanitizeEnabled = options.sanitize !== false; // default true + this.sessionCacheEnabled = options.sessionCache !== false; // default true + + // LRU cache: hash -> content + this.contentCache = new LRUCache(this.cacheSize); + + // Track usage counts: hash -> count + this.usageCounts = new Map(); + + // Track last seen timestamps: hash -> ISO timestamp + this.lastSeenTimestamps = new Map(); + + // Session-level cache: hash -> boolean (tracks if hash has been output in full in this session) + // Cleared on server restart, not persisted to disk + this.sessionContentCache = new Map(); + + // Ensure dictionary directory exists + const dictDir = path.dirname(this.dictionaryPath); + if (!fs.existsSync(dictDir)) { + fs.mkdirSync(dictDir, { recursive: true }); + } + + // Load existing dictionary into cache + this._loadDictionary(); + } + + /** + * Load existing dictionary file into cache + * @private + */ + _loadDictionary() { + if (!fs.existsSync(this.dictionaryPath)) { + return; + } + + try { + const fileStream = fs.createReadStream(this.dictionaryPath); + const rl = readline.createInterface({ + input: fileStream, + crlfDelay: Infinity, + }); + + rl.on("line", (line) => { + try { + const entry = JSON.parse(line); + if (!entry.hash) return; + + // Handle full entry 
(has content) + if (entry.content !== undefined && entry.content !== null) { + this.contentCache.set(entry.hash, entry.content); + this.usageCounts.set(entry.hash, entry.useCount || 1); + this.lastSeenTimestamps.set(entry.hash, entry.lastSeen || entry.firstSeen); + } + // Handle update entry (content is null, only metadata) + else if (entry.firstSeen === null) { + // This is an update entry - update metadata only + if (entry.useCount !== undefined) { + this.usageCounts.set(entry.hash, entry.useCount); + } + if (entry.lastSeen) { + this.lastSeenTimestamps.set(entry.hash, entry.lastSeen); + } + } + } catch (err) { + // Skip malformed lines (silent - don't pollute logs) + } + }); + + // Suppress error events during load (file may not exist yet) + rl.on("error", () => { + // Silently ignore - dictionary will be created on first write + }); + + // Wait for file to be fully read (synchronously for initialization) + return new Promise((resolve) => { + rl.on("close", resolve); + }); + } catch (err) { + // Silently ignore load errors - dictionary will be created on first write + } + } + + /** + * Compute SHA-256 hash of content (first 16 chars for brevity) + * @param {string|object|array} content - Content to hash + * @returns {string} Hash in format "sha256:abc..." + */ + hashContent(content) { + const stringContent = typeof content === "string" ? content : JSON.stringify(content); + const hash = crypto.createHash("sha256").update(stringContent, "utf8").digest("hex"); + return `sha256:${hash.substring(0, 16)}`; + } + + /** + * Clean empty "User:" entries from content + * Removes wasteful empty "User:" entries that appear between Claude responses + * + * @private + * @param {string} content - Content to clean + * @returns {string} Cleaned content + */ + _sanitizeContent(content) { + // Only process string content that contains conversation patterns + if (typeof content !== 'string' || !content.includes('User:') || !content.includes('Claude:')) { + return content; + } + + // Split into lines for processing + const lines = content.split('\n'); + const cleaned = []; + let i = 0; + + while (i < lines.length) { + const line = lines[i]; + + // Check if this is an empty "User:" entry + // Pattern: line with just "User:" or "User: " followed by empty line(s) + if (line.trim() === 'User:' || line.trim() === 'User: ') { + // Look ahead to see if next line is empty or another "User:" or "Claude:" + const nextLine = i + 1 < lines.length ? lines[i + 1] : ''; + + // If followed by empty line or another marker, this is an empty User: entry + if (nextLine.trim() === '' || nextLine.trim() === 'User:' || nextLine.trim() === 'Claude:') { + // Skip this empty User: entry + i += 1; + // Also skip following empty lines + while (i < lines.length && lines[i].trim() === '') { + i += 1; + } + continue; + } + } + + // Keep this line + cleaned.push(line); + i++; + } + + return cleaned.join('\n'); + } + + /** + * Check if content should be deduplicated + * @param {string|object|array} content - Content to check + * @param {number} minSize - Minimum size threshold (default: from config) + * @returns {boolean} True if content should be deduplicated + */ + shouldDeduplicate(content, minSize = null) { + if (!content) return false; + + const threshold = minSize !== null ? minSize : this.minSize; + const stringContent = typeof content === "string" ? 
content : JSON.stringify(content); + + // Check size threshold + if (stringContent.length < threshold) { + return false; + } + + // Don't deduplicate already truncated content (contains truncation indicators) + if ( + typeof stringContent === "string" && + (stringContent.includes("[truncated,") || stringContent.includes("... [truncated")) + ) { + return false; + } + + return true; + } + + /** + * Store content in dictionary and return reference + * @param {string|object|array} content - Content to store + * @returns {object} Reference object: { $ref: "sha256:abc...", size: 1234 } + */ + storeContent(content) { + if (!content) { + return null; + } + + let stringContent = typeof content === "string" ? content : JSON.stringify(content); + + // Sanitize content before hashing (if enabled) + if (this.sanitizeEnabled) { + stringContent = this._sanitizeContent(stringContent); + } + + const hash = this.hashContent(stringContent); + const size = stringContent.length; + + // Update usage count + const currentCount = this.usageCounts.get(hash) || 0; + this.usageCounts.set(hash, currentCount + 1); + + // If already in cache, just return reference + if (this.contentCache.has(hash)) { + // Update lastSeen timestamp + const now = new Date().toISOString(); + this.lastSeenTimestamps.set(hash, now); + // Update dictionary entry asynchronously (increment useCount, update lastSeen) + this._updateDictionaryEntry(hash, currentCount + 1, now); + return { $ref: hash, size }; + } + + // Store in cache + this.contentCache.set(hash, stringContent); + + // Track lastSeen + const now = new Date().toISOString(); + this.lastSeenTimestamps.set(hash, now); + + // Append to dictionary file asynchronously + this._appendToDictionary(hash, stringContent, currentCount + 1, now); + + return { $ref: hash, size }; + } + + /** + * Store content with a pre-computed hash (for hash-before-truncate pattern) + * @param {string|object|array} content - Content to store (original, not truncated) + * @param {string} precomputedHash - Hash computed before truncation + * @returns {object} Reference object: { $ref: "sha256:abc...", size: 1234 } or full content if first time in session + */ + storeContentWithHash(content, precomputedHash) { + if (!content || !precomputedHash) { + return null; + } + + let stringContent = typeof content === "string" ? 
content : JSON.stringify(content);
+
+    // Sanitize content (if enabled)
+    if (this.sanitizeEnabled) {
+      stringContent = this._sanitizeContent(stringContent);
+    }
+
+    const hash = precomputedHash; // Use provided hash instead of recomputing
+    const size = stringContent.length;
+
+    // Update usage count
+    const currentCount = this.usageCounts.get(hash) || 0;
+    this.usageCounts.set(hash, currentCount + 1);
+
+    // Track lastSeen
+    const now = new Date().toISOString();
+    this.lastSeenTimestamps.set(hash, now);
+
+    // Session-level deduplication: the first time a hash is seen in a session the
+    // caller outputs the (truncated) content; later occurrences output a reference
+    const isFirstTimeInSession = this.sessionCacheEnabled && this.isFirstTimeInSession(hash);
+
+    // If already in cache (dictionary), update metadata
+    if (this.contentCache.has(hash)) {
+      // Update dictionary entry asynchronously (increment useCount, update lastSeen)
+      this._updateDictionaryEntry(hash, currentCount + 1, now);
+
+      // If first time in session, mark it so subsequent calls emit references only
+      if (isFirstTimeInSession) {
+        this.markSeenInSession(hash);
+      }
+
+      return { $ref: hash, size };
+    }
+
+    // Store in cache (first time ever)
+    this.contentCache.set(hash, stringContent);
+
+    // Mark as seen in session
+    if (this.sessionCacheEnabled) {
+      this.markSeenInSession(hash);
+    }
+
+    // Append to dictionary file asynchronously
+    this._appendToDictionary(hash, stringContent, currentCount + 1, now);
+
+    return { $ref: hash, size };
+  }
+
+  /**
+   * Check if this hash has been output in full during the current session
+   * @param {string} hash - Content hash
+   * @returns {boolean} True if this is the first time seeing this hash in this session
+   */
+  isFirstTimeInSession(hash) {
+    return !this.sessionContentCache.has(hash);
+  }
+
+  /**
+   * Mark a hash as having been output in full during this session
+   * @param {string} hash - Content hash
+   */
+  markSeenInSession(hash) {
+    this.sessionContentCache.set(hash, true);
+  }
+
+  /**
+   * Clear session cache (useful for testing or manual cache reset)
+   */
+  clearSessionCache() {
+    this.sessionContentCache.clear();
+  }
+
+  /**
+   * Append new entry to dictionary file
+   * @private
+   * @param {string} hash - Content hash
+   * @param {string} content - Actual content
+   * @param {number} useCount - Usage count
+   * @param {string} timestamp - ISO timestamp for firstSeen/lastSeen
+   */
+  _appendToDictionary(hash, content, useCount, timestamp) {
+    const entry = {
+      hash,
+      firstSeen: timestamp,
+      useCount,
+      lastSeen: timestamp,
+      content,
+    };
+
+    // Async append (non-blocking)
+    fs.appendFile(this.dictionaryPath, JSON.stringify(entry) + "\n", (err) => {
+      if (err) {
+        console.error("Failed to append to dictionary:", err.message);
+      }
+    });
+  }
+
+  /**
+   * Update existing dictionary entry (append update line to dictionary)
+   * @private
+   * @param {string} hash - Content hash
+   * @param {number} newCount - New usage count
+   * @param {string} timestamp - ISO timestamp for lastSeen
+   */
+  _updateDictionaryEntry(hash, newCount, timestamp) {
+    // Append an update entry with the same hash to track updated metadata
+    // The reader/compactor should use the LAST entry for a given hash
+    const updateEntry = {
+      hash,
+      firstSeen: null, // Null indicates this is an update, not a new entry
+      useCount: newCount,
+      lastSeen: timestamp,
+      content: null,
+    };
+
+    // Async append (non-blocking)
+    fs.appendFile(this.dictionaryPath, JSON.stringify(updateEntry) + "\n", (err) => {
+      if (err) {
+        console.error("Failed to append update to dictionary:", err.message);
+      }
+    });
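+
+    // Example update line as appended to the dictionary JSONL (illustrative values):
+    // {"hash":"sha256:3f2a9c1d8e7b6a54","firstSeen":null,"useCount":7,"lastSeen":"2025-01-01T12:00:00.000Z","content":null}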
+ } + + /** + * Retrieve content by hash reference + * @param {string} hashRef - Hash reference (e.g., "sha256:abc...") + * @returns {string|null} Content or null if not found + */ + getContent(hashRef) { + // Check cache first + if (this.contentCache.has(hashRef)) { + return this.contentCache.get(hashRef); + } + + // If not in cache, read from dictionary (synchronously for now) + return this._readFromDictionary(hashRef); + } + + /** + * Read content from dictionary file by hash + * @private + * @param {string} hashRef - Hash reference + * @returns {string|null} Content or null if not found + */ + _readFromDictionary(hashRef) { + if (!fs.existsSync(this.dictionaryPath)) { + return null; + } + + try { + const fileContent = fs.readFileSync(this.dictionaryPath, "utf8"); + const lines = fileContent.split("\n"); + + for (const line of lines) { + if (!line.trim()) continue; + try { + const entry = JSON.parse(line); + if (entry.hash === hashRef) { + // Cache it for future use + this.contentCache.set(hashRef, entry.content); + return entry.content; + } + } catch { + // Skip malformed lines + } + } + } catch (err) { + console.error("Failed to read dictionary:", err.message); + } + + return null; + } + + /** + * Process log entry fields for deduplication + * @param {object} entry - Log entry object + * @param {string[]} fields - Fields to deduplicate (default: common large fields) + * @returns {object} Entry with deduplicated fields + */ + deduplicateEntry(entry, fields = ["userMessages", "systemPrompt", "userQuery"]) { + if (!entry || typeof entry !== "object") { + return entry; + } + + const deduplicated = { ...entry }; + + for (const field of fields) { + const content = entry[field]; + + // Skip if field doesn't exist or is already a reference + if (!content || (typeof content === "object" && content.$ref)) { + continue; + } + + // Check if should deduplicate + if (this.shouldDeduplicate(content)) { + const ref = this.storeContent(content); + if (ref) { + deduplicated[field] = ref; + } + } + } + + return deduplicated; + } + + /** + * Restore full content from hash references in a log entry + * @param {object} entry - Log entry with potential references + * @returns {object} Entry with full content restored + */ + restoreEntry(entry) { + if (!entry || typeof entry !== "object") { + return entry; + } + + const restored = { ...entry }; + + // Check all fields for references + for (const [key, value] of Object.entries(entry)) { + if (typeof value === "object" && value !== null && value.$ref) { + const content = this.getContent(value.$ref); + if (content !== null) { + // Try to parse back to original type + try { + restored[key] = JSON.parse(content); + } catch { + restored[key] = content; + } + } + } + } + + return restored; + } + + /** + * Get statistics about deduplication + * @returns {object} Statistics + */ + getStats() { + return { + cacheSize: this.contentCache.cache.size, + uniqueContentBlocks: this.usageCounts.size, + totalReferences: Array.from(this.usageCounts.values()).reduce((a, b) => a + b, 0), + }; + } +} + +module.exports = { + ContentDeduplicator, + LRUCache, +}; diff --git a/src/logger/index.js b/src/logger/index.js index cf29399..7f49d9f 100644 --- a/src/logger/index.js +++ b/src/logger/index.js @@ -1,27 +1,91 @@ const pino = require("pino"); const config = require("../config"); +const { createOversizedErrorStream } = require("./oversized-error-stream"); -const logger = pino({ - level: config.logger.level, - name: "claude-backend", - base: { - env: config.env, - }, - redact: { - 
paths: ["req.headers.authorization", "req.headers.cookie"], - censor: "***redacted***", - }, - transport: - config.env === "development" - ? { - target: "pino-pretty", - options: { - translateTime: "SYS:standard", - ignore: "pid,hostname", - colorize: true, - }, - } - : undefined, +/** + * Application logger using Pino + * + * Standard Network Logging Fields: + * When logging network requests/responses, use these consistent field names: + * + * - destinationUrl: string - Full URL being requested (e.g., "https://api.example.com/v1/endpoint") + * - destinationHostname: string - Hostname only (e.g., "api.example.com") + * - destinationIp: string - Resolved IP address (logged by DNS logger at debug level) + * - ipFamily: number - IP version (4 or 6) - logged by DNS logger + * - protocol: string - Protocol used ("http" or "https") + * - status: number - HTTP status code + * - provider: string - Service/provider label (e.g., "OpenAI", "HTTP", "HTTPS") + * - duration: number - Request duration in milliseconds + * + * DNS Resolution Logging: + * DNS resolution is logged at debug level via the dns-logger module. + * To see DNS logs, set LOG_LEVEL=debug. DNS logs correlate with application + * logs via the destinationHostname field. + * + * Example DNS log: + * { + * "level": "debug", + * "provider": "HTTPS", + * "hostname": "api.openai.com", + * "resolvedIp": "104.18.23.45", + * "ipFamily": 4, + * "duration": 23, + * "msg": "DNS resolution completed" + * } + * + * Example API request log: + * { + * "level": "debug", + * "provider": "OpenAI", + * "status": 200, + * "destinationUrl": "https://api.openai.com/v1/chat/completions", + * "destinationHostname": "api.openai.com", + * "responseLength": 1523, + * "msg": "OpenAI API response" + * } + */ + +// Create array of streams for multistream setup +const streams = []; + +// Main console output stream +streams.push({ + level: config.logger.level, + stream: + config.env === "development" + ? pino.transport({ + target: "pino-pretty", + options: { + translateTime: "SYS:standard", + ignore: "pid,hostname", + colorize: true, + }, + }) + : process.stdout, }); +// Oversized error stream (if enabled) +if (config.oversizedErrorLogging?.enabled) { + streams.push({ + level: "warn", // Only capture WARN and ERROR + stream: createOversizedErrorStream(config.oversizedErrorLogging), + }); +} + +// Create logger with multistream +const logger = pino( + { + level: config.logger.level, + name: "claude-backend", + base: { + env: config.env, + }, + redact: { + paths: ["req.headers.authorization", "req.headers.cookie"], + censor: "***redacted***", + }, + }, + pino.multistream(streams), +); + module.exports = logger; diff --git a/src/logger/oversized-error-stream.js b/src/logger/oversized-error-stream.js new file mode 100644 index 0000000..adbbd82 --- /dev/null +++ b/src/logger/oversized-error-stream.js @@ -0,0 +1,288 @@ +const fs = require("node:fs"); +const path = require("node:path"); +const { Writable } = require("node:stream"); + +/** + * Custom Pino stream that captures oversized error messages to separate log files. + * Errors from the same session are appended to a single file. 
+ */ + +// Cache of session file handles to enable appending +const sessionFiles = new Map(); + +// Track first error timestamp per session for filename +const sessionTimestamps = new Map(); + +/** + * Creates a custom Pino stream that captures oversized errors + * @param {Object} config - Configuration object + * @param {number} config.threshold - Character threshold for capturing (default 200) + * @param {string} config.logDir - Directory for oversized error logs + * @param {number} config.maxFiles - Maximum number of log files to keep + * @returns {Writable} - Writable stream for Pino + */ +function createOversizedErrorStream(config) { + const { threshold = 200, logDir, maxFiles = 100 } = config; + + // Ensure log directory exists + ensureDirectoryExists(logDir); + + // Create writable stream + const stream = new Writable({ + objectMode: true, + write(chunk, encoding, callback) { + try { + // Parse the log entry (Pino sends JSON strings) + const logObject = typeof chunk === "string" ? JSON.parse(chunk) : chunk; + + // Check if this log should be captured + const { shouldCapture, oversizedFields } = shouldCaptureLog(logObject, threshold); + + if (shouldCapture) { + // Extract or generate session ID + const sessionId = extractSessionId(logObject); + + // Get or create session file + const { filepath, writeStream } = getSessionFile(sessionId, logDir, maxFiles); + + // Format the log entry + const logEntry = formatLogEntry(logObject, oversizedFields); + + // Write to file (JSONL format - one JSON object per line) + writeStream.write(`${JSON.stringify(logEntry)}\n`, (err) => { + if (err) { + console.error(`Failed to write oversized error to ${filepath}:`, err.message); + } + }); + } + + // Always call callback to continue stream processing + callback(); + } catch (err) { + // Don't crash on stream errors - log and continue + console.error("Oversized error stream processing failed:", err.message); + callback(); + } + }, + final(callback) { + // Close all open file handles when stream ends + for (const [sessionId, { writeStream }] of sessionFiles.entries()) { + writeStream.end(); + } + sessionFiles.clear(); + sessionTimestamps.clear(); + callback(); + }, + }); + + // Handle stream errors gracefully + stream.on("error", (err) => { + console.error("Oversized error stream error:", err.message); + }); + + return stream; +} + +/** + * Determines if a log entry should be captured based on size threshold + * @param {Object} logObject - Pino log object + * @param {number} threshold - Character threshold + * @returns {Object} - { shouldCapture: boolean, oversizedFields: string[] } + */ +function shouldCaptureLog(logObject, threshold) { + // Only capture WARN (40) and ERROR (50) level logs + if (logObject.level < 40) { + return { shouldCapture: false, oversizedFields: [] }; + } + + const oversizedFields = []; + + // Check all fields recursively + function checkField(value, fieldPath) { + if (typeof value === "string") { + if (value.length > threshold) { + oversizedFields.push(fieldPath); + } + } else if (typeof value === "object" && value !== null) { + // Check nested objects/arrays + for (const [key, val] of Object.entries(value)) { + checkField(val, fieldPath ? 
`${fieldPath}.${key}` : key);
+      }
+    }
+  }
+
+  // Check all fields in log object
+  for (const [key, value] of Object.entries(logObject)) {
+    // Skip internal Pino fields
+    if (["level", "time", "pid", "hostname"].includes(key)) continue;
+    checkField(value, key);
+  }
+
+  return {
+    shouldCapture: oversizedFields.length > 0,
+    oversizedFields,
+  };
+}
+
+/**
+ * Extracts session ID from log object with fallback strategies
+ * @param {Object} logObject - Pino log object
+ * @returns {string} - Session ID or fallback identifier
+ */
+function extractSessionId(logObject) {
+  // Try sessionId field first
+  if (logObject.sessionId) return logObject.sessionId;
+
+  // Try correlationId
+  if (logObject.correlationId) return logObject.correlationId;
+
+  // Try requestId
+  if (logObject.requestId) return logObject.requestId;
+
+  // Fallback to unknown with timestamp
+  return `unknown-${Date.now()}`;
+}
+
+/**
+ * Gets or creates a file handle for a session
+ * @param {string} sessionId - Session identifier
+ * @param {string} logDir - Log directory path
+ * @param {number} maxFiles - Maximum number of files to keep
+ * @returns {Object} - { filepath: string, writeStream: WriteStream }
+ */
+function getSessionFile(sessionId, logDir, maxFiles) {
+  // Check if we already have a file for this session
+  if (sessionFiles.has(sessionId)) {
+    return sessionFiles.get(sessionId);
+  }
+
+  // Clean up old files if needed (before creating new one)
+  cleanupOldFiles(logDir, maxFiles);
+
+  // Generate timestamp for first error in this session
+  const timestamp = new Date()
+    .toISOString()
+    .replace(/[-:]/g, "_")
+    .replace(/\.\d{3}Z$/, "")
+    .replace("T", "_");
+
+  sessionTimestamps.set(sessionId, timestamp);
+
+  // Create filename: {sessionId}_{timestamp}.log
+  const filename = `${sessionId}_${timestamp}.log`;
+  const filepath = path.join(logDir, filename);
+
+  // Create write stream in append mode
+  const writeStream = fs.createWriteStream(filepath, {
+    flags: "a", // append mode
+    encoding: "utf8",
+  });
+
+  // Handle write stream errors
+  writeStream.on("error", (err) => {
+    console.error(`Error writing to ${filepath}:`, err.message);
+    sessionFiles.delete(sessionId);
+  });
+
+  // Cache the file handle
+  const fileInfo = { filepath, writeStream };
+  sessionFiles.set(sessionId, fileInfo);
+
+  return fileInfo;
+}
+
+/**
+ * Formats a log entry for file storage with metadata
+ * @param {Object} logObject - Original Pino log object
+ * @param {string[]} oversizedFields - List of fields that exceeded threshold
+ * @returns {Object} - Formatted log entry
+ */
+function formatLogEntry(logObject, oversizedFields) {
+  // Convert Pino timestamp (milliseconds since epoch) to ISO string
+  const timestamp = new Date(logObject.time).toISOString();
+
+  // Map Pino log level numbers to names
+  const levelNames = {
+    10: "TRACE",
+    20: "DEBUG",
+    30: "INFO",
+    40: "WARN",
+    50: "ERROR",
+    60: "FATAL",
+  };
+
+  return {
+    ...logObject, // Include all original fields first so overrides below win
+    timestamp,
+    level: levelNames[logObject.level] || "UNKNOWN",
+    levelNumber: logObject.level,
+    name: logObject.name,
+    sessionId: extractSessionId(logObject),
+    oversizedFields,
+    // Remove redundant internal fields
+    time: undefined,
+    pid: undefined,
+    hostname: undefined,
+  };
+}
+
+/**
+ * Removes oldest log files if count exceeds maximum
+ * @param {string} logDir - Log directory path
+ * @param {number} maxFiles - Maximum number of files to keep
+ */
+function cleanupOldFiles(logDir, maxFiles) {
+  try {
+    // List all .log files in directory
+    const files =
fs.readdirSync(logDir).filter((f) => f.endsWith(".log")); + + // If under limit, no cleanup needed + if (files.length < maxFiles) return; + + // Get file stats and sort by modification time (oldest first) + const fileStats = files + .map((filename) => { + const filepath = path.join(logDir, filename); + const stats = fs.statSync(filepath); + return { filename, filepath, mtime: stats.mtime }; + }) + .sort((a, b) => a.mtime - b.mtime); + + // Delete oldest files until we're under the limit + const filesToDelete = fileStats.length - maxFiles + 1; // +1 to make room for new file + for (let i = 0; i < filesToDelete; i++) { + const { filepath } = fileStats[i]; + try { + fs.unlinkSync(filepath); + } catch (err) { + console.error(`Failed to delete old oversized error log ${filepath}:`, err.message); + } + } + } catch (err) { + console.error("Failed to cleanup old oversized error logs:", err.message); + } +} + +/** + * Ensures a directory exists, creating it if necessary + * @param {string} dirPath - Directory path + */ +function ensureDirectoryExists(dirPath) { + try { + if (!fs.existsSync(dirPath)) { + fs.mkdirSync(dirPath, { recursive: true }); + } + } catch (err) { + console.error(`Failed to create oversized error log directory ${dirPath}:`, err.message); + throw err; + } +} + +module.exports = { + createOversizedErrorStream, + shouldCaptureLog, + extractSessionId, + getSessionFile, + formatLogEntry, + cleanupOldFiles, +}; diff --git a/src/orchestrator/index.js b/src/orchestrator/index.js index 5444358..25f867e 100644 --- a/src/orchestrator/index.js +++ b/src/orchestrator/index.js @@ -11,6 +11,9 @@ const systemPrompt = require("../prompts/system"); const historyCompression = require("../context/compression"); const tokenBudget = require("../context/budget"); const { classifyRequestType, selectToolsSmartly } = require("../tools/smart-selection"); +const { parseOllamaModel } = require("../clients/ollama-model-parser"); +const { createAuditLogger } = require("../logger/audit-logger"); +const { getResolvedIp, runWithDnsContext } = require("../clients/dns-logger"); const { getShuttingDown } = require("../api/health"); const DROP_KEYS = new Set([ @@ -978,22 +981,6 @@ function sanitizePayload(payload) { if (!Array.isArray(clean.tools) || clean.tools.length === 0) { delete clean.tools; } - } else if (providerType === "zai") { - // Z.AI (Zhipu) supports tools - keep them in Anthropic format - // They will be converted to OpenAI format in invokeZai - if (!Array.isArray(clean.tools) || clean.tools.length === 0) { - delete clean.tools; - } else { - // Ensure tools are in Anthropic format - clean.tools = ensureAnthropicToolFormat(clean.tools); - } - } else if (providerType === "vertex") { - // Vertex AI supports tools - keep them in Anthropic format - if (!Array.isArray(clean.tools) || clean.tools.length === 0) { - delete clean.tools; - } else { - clean.tools = ensureAnthropicToolFormat(clean.tools); - } } else if (Array.isArray(clean.tools)) { // Unknown provider - remove tools for safety delete clean.tools; @@ -1069,44 +1056,13 @@ function sanitizePayload(payload) { } } - // FIX: Prevent consecutive messages with the same role (causes llama.cpp 400 error) - if (Array.isArray(clean.messages) && clean.messages.length > 0) { - const deduplicated = []; - let lastRole = null; - - for (const msg of clean.messages) { - // Skip if this message has the same role as the previous one - if (msg.role === lastRole) { - logger.debug({ - skippedRole: msg.role, - contentPreview: typeof msg.content === 'string' - ? 
msg.content.substring(0, 50) - : JSON.stringify(msg.content).substring(0, 50) - }, 'Skipping duplicate consecutive message with same role'); - continue; - } - - deduplicated.push(msg); - lastRole = msg.role; - } - - if (deduplicated.length !== clean.messages.length) { - logger.info({ - originalCount: clean.messages.length, - deduplicatedCount: deduplicated.length, - removed: clean.messages.length - deduplicated.length - }, 'Removed consecutive duplicate roles from message sequence'); - } - - clean.messages = deduplicated; - } - return clean; } const DEFAULT_LOOP_OPTIONS = { maxSteps: config.policy.maxStepsPerTurn ?? 6, maxDurationMs: 120000, + maxToolCallsPerRequest: config.policy.maxToolCallsPerRequest ?? 20, // Prevent runaway tool calling }; function resolveLoopOptions(options = {}) { @@ -1118,13 +1074,43 @@ function resolveLoopOptions(options = {}) { Number.isInteger(options.maxDurationMs) && options.maxDurationMs > 0 ? options.maxDurationMs : DEFAULT_LOOP_OPTIONS.maxDurationMs; + const maxToolCallsPerRequest = + Number.isInteger(options.maxToolCallsPerRequest) && options.maxToolCallsPerRequest > 0 + ? options.maxToolCallsPerRequest + : DEFAULT_LOOP_OPTIONS.maxToolCallsPerRequest; return { ...DEFAULT_LOOP_OPTIONS, maxSteps, maxDurationMs, + maxToolCallsPerRequest, }; } +/** + * Create a signature for a tool call to detect identical repeated calls + * @param {Object} toolCall - The tool call object + * @returns {string} - A hash signature of the tool name and parameters + */ +function getToolCallSignature(toolCall) { + const crypto = require('crypto'); + const name = toolCall.function?.name ?? toolCall.name ?? 'unknown'; + const args = toolCall.function?.arguments ?? toolCall.input; + + // Parse arguments if they're a string + let argsObj = args; + if (typeof args === 'string') { + try { + argsObj = JSON.parse(args); + } catch (err) { + argsObj = args; // Use raw string if parse fails + } + } + + // Create a deterministic signature + const signature = `${name}:${JSON.stringify(argsObj)}`; + return crypto.createHash('sha256').update(signature).digest('hex').substring(0, 16); +} + function buildNonJsonResponse(databricksResponse) { return { status: databricksResponse.status, @@ -1170,6 +1156,8 @@ async function runAgentLoop({ let toolCallsExecuted = 0; let fallbackPerformed = false; const toolCallNames = new Map(); + const toolCallHistory = new Map(); // Track tool calls to detect loops: signature -> count + let loopWarningInjected = false; // Track if we've already warned about loops while (steps < settings.maxSteps) { if (Date.now() - start > settings.maxDurationMs) { @@ -1816,11 +1804,10 @@ async function runAgentLoop({ ), ); } else { - // OpenAI format: tool_call_id MUST match the id from assistant's tool_call toolMessage = { role: "tool", - tool_call_id: call.id ?? execution.id, - name: call.function?.name ?? call.name ?? execution.name, + tool_call_id: execution.id, + name: execution.name, content: execution.content, }; } @@ -1861,6 +1848,35 @@ async function runAgentLoop({ completedTasks: taskExecutions.length, sessionId: session?.id }, "Completed parallel Task execution"); + + // Check if we've exceeded the max tool calls limit after parallel execution + if (toolCallsExecuted > settings.maxToolCallsPerRequest) { + logger.error( + { + sessionId: session?.id ?? 
null, + toolCallsExecuted, + maxToolCallsPerRequest: settings.maxToolCallsPerRequest, + steps, + }, + "Maximum tool calls per request exceeded after parallel Task execution - terminating", + ); + + return { + response: { + status: 500, + body: { + error: { + type: "max_tool_calls_exceeded", + message: `Maximum tool calls per request exceeded. The model attempted to execute ${toolCallsExecuted} tool calls, but the limit is ${settings.maxToolCallsPerRequest}. This may indicate a complex task that requires breaking down into smaller steps.`, + }, + }, + terminationReason: "max_tool_calls_exceeded", + }, + steps, + durationMs: Date.now() - start, + terminationReason: "max_tool_calls_exceeded", + }; + } } catch (error) { logger.error({ error: error.message, @@ -1952,6 +1968,35 @@ async function runAgentLoop({ toolCallsExecuted += 1; + // Check if we've exceeded the max tool calls limit + if (toolCallsExecuted > settings.maxToolCallsPerRequest) { + logger.error( + { + sessionId: session?.id ?? null, + toolCallsExecuted, + maxToolCallsPerRequest: settings.maxToolCallsPerRequest, + steps, + }, + "Maximum tool calls per request exceeded - terminating", + ); + + return { + response: { + status: 500, + body: { + error: { + type: "max_tool_calls_exceeded", + message: `Maximum tool calls per request exceeded. The model attempted to execute ${toolCallsExecuted} tool calls, but the limit is ${settings.maxToolCallsPerRequest}. This may indicate a complex task that requires breaking down into smaller steps.`, + }, + }, + terminationReason: "max_tool_calls_exceeded", + }, + steps, + durationMs: Date.now() - start, + terminationReason: "max_tool_calls_exceeded", + }; + } + const execution = await executeToolCall(call, { session, requestMessages: cleanPayload.messages, @@ -2005,11 +2050,10 @@ async function runAgentLoop({ ); } else { - // OpenAI format: tool_call_id MUST match the id from assistant's tool_call toolMessage = { role: "tool", - tool_call_id: call.id ?? execution.id, - name: call.function?.name ?? call.name ?? execution.name, + tool_call_id: execution.id, + name: execution.name, content: execution.content, }; } @@ -2067,6 +2111,94 @@ async function runAgentLoop({ } } + // === TOOL CALL LOOP DETECTION === + // Track tool calls to detect infinite loops where the model calls the same tool + // repeatedly with identical parameters + for (const call of toolCalls) { + const signature = getToolCallSignature(call); + const count = (toolCallHistory.get(signature) || 0) + 1; + toolCallHistory.set(signature, count); + + const toolName = call.function?.name ?? call.name ?? 'unknown'; + + if (count === 3 && !loopWarningInjected) { + logger.warn( + { + sessionId: session?.id ?? null, + correlationId: options?.correlationId, + tool: toolName, + loopCount: count, + signature: signature, + action: 'warning_injected', + totalSteps: steps, + remainingSteps: settings.maxSteps - steps, + }, + "Tool call loop detected - same tool called 3 times with identical parameters", + ); + + // Inject warning message to model + loopWarningInjected = true; + const warningMessage = { + role: "user", + content: "⚠️ System Warning: You have called the same tool with identical parameters 3 times in this request. This may indicate an infinite loop. 
Please provide a final answer to the user instead of calling the same tool again, or explain why you need to continue retrying with the same parameters.", + }; + + cleanPayload.messages.push(warningMessage); + + if (session) { + appendTurnToSession(session, { + role: "user", + type: "system_warning", + status: 200, + content: warningMessage.content, + metadata: { + reason: "tool_call_loop_warning", + toolName, + loopCount: count, + }, + }); + } + } else if (count > 3) { + // Force termination after 3 identical calls + // Log FULL context for debugging why the loop occurred + logger.error( + { + sessionId: session?.id ?? null, + correlationId: options?.correlationId, + tool: toolName, + loopCount: count, + signature: signature, + action: 'request_terminated', + totalSteps: steps, + maxSteps: settings.maxSteps, + // FULL CONTEXT for debugging + myPrompt: cleanPayload.messages, // Full conversation sent to LLM + systemPrompt: cleanPayload.system, // Full system prompt + llmResponse: databricksResponse?.data || databricksResponse?.json, // Full LLM response that triggered loop + repeatedToolCalls: toolCalls, // The actual repeated tool calls + toolCallHistory: Array.from(toolCallHistory.entries()), // Full history of all tool calls in this request + }, + "Tool call loop limit exceeded - forcing termination (FULL CONTEXT CAPTURED)", + ); + + return { + response: { + status: 500, + body: { + error: { + type: "tool_call_loop_detected", + message: `Tool call loop detected: The model called the same tool ("${toolName}") with identical parameters ${count} times. This indicates an infinite loop and execution has been terminated. Please try rephrasing your request or provide different parameters.`, + }, + }, + terminationReason: "tool_call_loop", + }, + steps, + durationMs: Date.now() - start, + terminationReason: "tool_call_loop", + }; + } + } + continue; } @@ -2275,22 +2407,6 @@ async function runAgentLoop({ }, "=== CONVERTED ANTHROPIC RESPONSE (llama.cpp) ==="); anthropicPayload.content = policy.sanitiseContent(anthropicPayload.content); - } else if (actualProvider === "zai") { - // Z.AI responses are already converted to Anthropic format in invokeZai - logger.info({ - hasJson: !!databricksResponse.json, - jsonContent: JSON.stringify(databricksResponse.json?.content)?.substring(0, 200), - }, "=== ZAI ORCHESTRATOR DEBUG ==="); - anthropicPayload = databricksResponse.json; - if (Array.isArray(anthropicPayload?.content)) { - anthropicPayload.content = policy.sanitiseContent(anthropicPayload.content); - } - } else if (actualProvider === "vertex") { - // Vertex AI responses are already in Anthropic format - anthropicPayload = databricksResponse.json; - if (Array.isArray(anthropicPayload?.content)) { - anthropicPayload.content = policy.sanitiseContent(anthropicPayload.content); - } } else { anthropicPayload = toAnthropicResponse( databricksResponse.json, @@ -2527,6 +2643,35 @@ async function runAgentLoop({ toolCallsExecuted += 1; + // Check if we've exceeded the max tool calls limit + if (toolCallsExecuted > settings.maxToolCallsPerRequest) { + logger.error( + { + sessionId: session?.id ?? null, + toolCallsExecuted, + maxToolCallsPerRequest: settings.maxToolCallsPerRequest, + steps, + }, + "Maximum tool calls per request exceeded during fallback - terminating", + ); + + return { + response: { + status: 500, + body: { + error: { + type: "max_tool_calls_exceeded", + message: `Maximum tool calls per request exceeded. 
The model attempted to execute ${toolCallsExecuted} tool calls, but the limit is ${settings.maxToolCallsPerRequest}. This may indicate a complex task that requires breaking down into smaller steps.`, + }, + }, + terminationReason: "max_tool_calls_exceeded", + }, + steps, + durationMs: Date.now() - start, + terminationReason: "max_tool_calls_exceeded", + }; + } + if (execution.ok) { fallbackPerformed = true; attemptSucceeded = true; @@ -2584,13 +2729,18 @@ async function runAgentLoop({ }); } + const finalDurationMs = Date.now() - start; logger.info( { sessionId: session?.id ?? null, steps, - durationMs: Date.now() - start, + toolCallsExecuted, + uniqueToolSignatures: toolCallHistory.size, + toolCallLoopWarnings: loopWarningInjected ? 1 : 0, + durationMs: finalDurationMs, + avgDurationPerStep: steps > 0 ? Math.round(finalDurationMs / steps) : 0, }, - "Agent loop completed", + "Agent loop completed successfully", ); return { response: { @@ -2599,7 +2749,7 @@ async function runAgentLoop({ terminationReason: "completion", }, steps, - durationMs: Date.now() - start, + durationMs: finalDurationMs, terminationReason: "completion", }; } @@ -2618,11 +2768,17 @@ async function runAgentLoop({ }, metadata: { termination: "max_steps" }, }); + const finalDurationMs = Date.now() - start; logger.warn( { sessionId: session?.id ?? null, steps, - durationMs: Date.now() - start, + toolCallsExecuted, + uniqueToolSignatures: toolCallHistory.size, + durationMs: finalDurationMs, + maxSteps: settings.maxSteps, + maxDurationMs: settings.maxDurationMs, + maxToolCallsPerRequest: settings.maxToolCallsPerRequest, }, "Agent loop exceeded limits", ); @@ -2636,12 +2792,18 @@ async function runAgentLoop({ limits: { maxSteps: settings.maxSteps, maxDurationMs: settings.maxDurationMs, + maxToolCallsPerRequest: settings.maxToolCallsPerRequest, + }, + metrics: { + steps, + toolCallsExecuted, + durationMs: finalDurationMs, }, }, terminationReason: "max_steps", }, steps, - durationMs: Date.now() - start, + durationMs: finalDurationMs, terminationReason: "max_steps", }; } @@ -2668,8 +2830,8 @@ async function processMessage({ payload, headers, session, options = {} }) { let cacheKey = null; let cachedResponse = null; if (promptCache.isEnabled()) { - // cleanPayload is already a deep clone from sanitizePayload, no need to clone again - const { key, entry } = promptCache.lookup(cleanPayload); + const cacheSeedPayload = JSON.parse(JSON.stringify(cleanPayload)); + const { key, entry } = promptCache.lookup(cacheSeedPayload); cacheKey = key; if (entry?.value) { try {