diff --git a/apps/api/src/storage/adapters/memory.adapter.ts b/apps/api/src/storage/adapters/memory.adapter.ts index bb028f37..0ec8a707 100644 --- a/apps/api/src/storage/adapters/memory.adapter.ts +++ b/apps/api/src/storage/adapters/memory.adapter.ts @@ -50,6 +50,7 @@ import type { } from '@betterdb/shared'; import { PROPOSAL_DEFAULT_EXPIRY_MS, variantPayloadSchemaFor } from '@betterdb/shared'; import { WebhookMemoryRepository } from './repositories/webhook.memory.repository'; +import { SlowLogMemoryRepository } from './repositories/slowlog.memory.repository'; const NULL_SUB_DISCRIMINATOR = '__betterdb_null__'; @@ -73,7 +74,6 @@ export class MemoryAdapter implements StoragePort { private clientSnapshots: StoredClientSnapshot[] = []; private anomalyEvents: StoredAnomalyEvent[] = []; private correlatedGroups: StoredCorrelatedGroup[] = []; - private slowLogEntries: StoredSlowLogEntry[] = []; private commandLogEntries: StoredCommandLogEntry[] = []; private latencySnapshots: StoredLatencySnapshot[] = []; private latencyHistograms: StoredLatencyHistogram[] = []; @@ -83,6 +83,7 @@ export class MemoryAdapter implements StoragePort { private settings: AppSettings | null = null; private readonly MAX_DELIVERIES_PER_WEBHOOK = 1000; private readonly webhookRepo = new WebhookMemoryRepository(this.MAX_DELIVERIES_PER_WEBHOOK); + private readonly slowlogRepo = new SlowLogMemoryRepository(); private idCounter = 1; private ready: boolean = false; @@ -838,73 +839,19 @@ export class MemoryAdapter implements StoragePort { // Slow Log Methods async saveSlowLogEntries(entries: StoredSlowLogEntry[], connectionId: string): Promise { - let savedCount = 0; - for (const entry of entries) { - // Check for duplicates based on unique constraint (including connectionId) - const exists = this.slowLogEntries.some( - (e) => - e.id === entry.id && - e.sourceHost === entry.sourceHost && - e.sourcePort === entry.sourcePort && - e.connectionId === connectionId, - ); - if (!exists) { - this.slowLogEntries.push({ ...entry, connectionId }); - savedCount++; - } - } - return savedCount; + return this.slowlogRepo.saveSlowLogEntries(entries, connectionId); } async getSlowLogEntries(options: SlowLogQueryOptions = {}): Promise { - let filtered = [...this.slowLogEntries]; - - if (options.connectionId) { - filtered = filtered.filter((e) => e.connectionId === options.connectionId); - } - if (options.startTime) { - filtered = filtered.filter((e) => e.timestamp >= options.startTime!); - } - if (options.endTime) { - filtered = filtered.filter((e) => e.timestamp <= options.endTime!); - } - if (options.command) { - const cmd = options.command.toLowerCase(); - // command is an array, check if the first element (command name) matches - filtered = filtered.filter((e) => e.command[0]?.toLowerCase().includes(cmd)); - } - if (options.clientName) { - const name = options.clientName.toLowerCase(); - filtered = filtered.filter((e) => e.clientName.toLowerCase().includes(name)); - } - if (options.minDuration) { - filtered = filtered.filter((e) => e.duration >= options.minDuration!); - } - - return filtered - .sort((a, b) => b.timestamp - a.timestamp) - .slice(options.offset ?? 0, (options.offset ?? 0) + (options.limit ?? 100)); + return this.slowlogRepo.getSlowLogEntries(options); } async getLatestSlowLogId(connectionId?: string): Promise { - let entries = this.slowLogEntries; - if (connectionId) { - entries = entries.filter((e) => e.connectionId === connectionId); - } - if (entries.length === 0) return null; - return Math.max(...entries.map((e) => e.id)); + return this.slowlogRepo.getLatestSlowLogId(connectionId); } async pruneOldSlowLogEntries(cutoffTimestamp: number, connectionId?: string): Promise { - const before = this.slowLogEntries.length; - if (connectionId) { - this.slowLogEntries = this.slowLogEntries.filter( - (e) => e.capturedAt >= cutoffTimestamp || e.connectionId !== connectionId, - ); - } else { - this.slowLogEntries = this.slowLogEntries.filter((e) => e.capturedAt >= cutoffTimestamp); - } - return before - this.slowLogEntries.length; + return this.slowlogRepo.pruneOldSlowLogEntries(cutoffTimestamp, connectionId); } // Command Log Methods diff --git a/apps/api/src/storage/adapters/postgres.adapter.ts b/apps/api/src/storage/adapters/postgres.adapter.ts index 5fb31c7d..8f29147b 100644 --- a/apps/api/src/storage/adapters/postgres.adapter.ts +++ b/apps/api/src/storage/adapters/postgres.adapter.ts @@ -64,9 +64,10 @@ import { } from '@betterdb/shared'; import { PostgresDialect, RowMappers } from './base-sql.adapter'; import { WebhookPostgresRepository } from './repositories/webhook.postgres.repository'; +import { SlowLogPostgresRepository } from './repositories/slowlog.postgres.repository'; -// Domain-specific repositories (webhooks extracted). Remaining domains to extract: -// ACL, anomaly, slowlog, commandlog, latency, memory, hotkeys, settings, +// Domain-specific repositories (webhooks, slowlog extracted). Remaining domains to extract: +// ACL, anomaly, commandlog, latency, memory, hotkeys, settings, // agent-tokens, metric-forecasts, client-snapshots, key-patterns, vector-index-snapshots. export interface PostgresAdapterConfig { @@ -110,6 +111,7 @@ export class PostgresAdapter implements StoragePort { private ready: boolean = false; private readonly mappers = new RowMappers(PostgresDialect); private webhookRepo!: WebhookPostgresRepository; + private slowlogRepo!: SlowLogPostgresRepository; constructor(private config: PostgresAdapterConfig) {} @@ -251,6 +253,7 @@ export class PostgresAdapter implements StoragePort { } this.webhookRepo = new WebhookPostgresRepository(this.pool, this.mappers); + this.slowlogRepo = new SlowLogPostgresRepository(this.pool, this.mappers); // Test connection (will have correct search_path if schema is set) const testClient = await this.pool.connect(); @@ -2526,127 +2529,22 @@ export class PostgresAdapter implements StoragePort { // Slow Log Methods async saveSlowLogEntries(entries: StoredSlowLogEntry[], connectionId: string): Promise { if (!this.pool || entries.length === 0) return 0; - - const values: any[] = []; - const placeholders: string[] = []; - let paramIndex = 1; - - for (const entry of entries) { - placeholders.push(`( - $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, - $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++} - )`); - values.push( - entry.id, - entry.timestamp, - entry.duration, - entry.command, // PostgreSQL will accept string[] for TEXT[] - entry.clientAddress || '', - entry.clientName || '', - entry.capturedAt, - entry.sourceHost, - entry.sourcePort, - connectionId, - ); - } - - const query = ` - INSERT INTO slow_log_entries ( - slowlog_id, timestamp, duration, command, - client_address, client_name, captured_at, source_host, source_port, connection_id - ) VALUES ${placeholders.join(', ')} - ON CONFLICT (slowlog_id, source_host, source_port, connection_id) DO NOTHING - `; - - const result = await this.pool.query(query, values); - return result.rowCount ?? 0; + return this.slowlogRepo.saveSlowLogEntries(entries, connectionId); } async getSlowLogEntries(options: SlowLogQueryOptions = {}): Promise { if (!this.pool) throw new Error('Database not initialized'); - - const conditions: string[] = []; - const params: any[] = []; - let paramIndex = 1; - - if (options.connectionId) { - conditions.push(`connection_id = $${paramIndex++}`); - params.push(options.connectionId); - } - if (options.startTime) { - conditions.push(`timestamp >= $${paramIndex++}`); - params.push(options.startTime); - } - if (options.endTime) { - conditions.push(`timestamp <= $${paramIndex++}`); - params.push(options.endTime); - } - if (options.command) { - // Search in the first element of command array (the command name) - conditions.push(`command[1] ILIKE $${paramIndex++}`); - params.push(`%${options.command}%`); - } - if (options.clientName) { - conditions.push(`client_name ILIKE $${paramIndex++}`); - params.push(`%${options.clientName}%`); - } - if (options.minDuration) { - conditions.push(`duration >= $${paramIndex++}`); - params.push(options.minDuration); - } - - const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; - const limit = options.limit ?? 100; - const offset = options.offset ?? 0; - - const result = await this.pool.query( - `SELECT - slowlog_id, timestamp, duration, command, - client_address, client_name, captured_at, source_host, source_port, connection_id - FROM slow_log_entries - ${whereClause} - ORDER BY timestamp DESC - LIMIT $${paramIndex++} OFFSET $${paramIndex++}`, - [...params, limit, offset], - ); - - return result.rows.map((row) => this.mappers.mapSlowLogEntryRow(row)); + return this.slowlogRepo.getSlowLogEntries(options); } async getLatestSlowLogId(connectionId?: string): Promise { if (!this.pool) throw new Error('Database not initialized'); - - if (connectionId) { - const result = await this.pool.query( - 'SELECT MAX(slowlog_id) as max_id FROM slow_log_entries WHERE connection_id = $1', - [connectionId], - ); - const maxId = result.rows[0]?.max_id; - return maxId !== null && maxId !== undefined ? Number(maxId) : null; - } - - const result = await this.pool.query('SELECT MAX(slowlog_id) as max_id FROM slow_log_entries'); - - const maxId = result.rows[0]?.max_id; - return maxId !== null && maxId !== undefined ? Number(maxId) : null; + return this.slowlogRepo.getLatestSlowLogId(connectionId); } async pruneOldSlowLogEntries(cutoffTimestamp: number, connectionId?: string): Promise { if (!this.pool) throw new Error('Database not initialized'); - - if (connectionId) { - const result = await this.pool.query( - 'DELETE FROM slow_log_entries WHERE captured_at < $1 AND connection_id = $2', - [cutoffTimestamp, connectionId], - ); - return result.rowCount ?? 0; - } - - const result = await this.pool.query('DELETE FROM slow_log_entries WHERE captured_at < $1', [ - cutoffTimestamp, - ]); - - return result.rowCount ?? 0; + return this.slowlogRepo.pruneOldSlowLogEntries(cutoffTimestamp, connectionId); } // Command Log Methods (Valkey-specific) diff --git a/apps/api/src/storage/adapters/repositories/slowlog.memory.repository.ts b/apps/api/src/storage/adapters/repositories/slowlog.memory.repository.ts new file mode 100644 index 00000000..f9a38fe4 --- /dev/null +++ b/apps/api/src/storage/adapters/repositories/slowlog.memory.repository.ts @@ -0,0 +1,78 @@ +import { + SlowLogQueryOptions, + StoredSlowLogEntry, +} from '../../../common/interfaces/storage-port.interface'; + +export class SlowLogMemoryRepository { + private slowLogEntries: StoredSlowLogEntry[] = []; + + async saveSlowLogEntries(entries: StoredSlowLogEntry[], connectionId: string): Promise { + let savedCount = 0; + for (const entry of entries) { + // Check for duplicates based on unique constraint (including connectionId) + const exists = this.slowLogEntries.some( + (e) => + e.id === entry.id && + e.sourceHost === entry.sourceHost && + e.sourcePort === entry.sourcePort && + e.connectionId === connectionId, + ); + if (!exists) { + this.slowLogEntries.push({ ...entry, connectionId }); + savedCount++; + } + } + return savedCount; + } + + async getSlowLogEntries(options: SlowLogQueryOptions = {}): Promise { + let filtered = [...this.slowLogEntries]; + + if (options.connectionId) { + filtered = filtered.filter((e) => e.connectionId === options.connectionId); + } + if (options.startTime) { + filtered = filtered.filter((e) => e.timestamp >= options.startTime!); + } + if (options.endTime) { + filtered = filtered.filter((e) => e.timestamp <= options.endTime!); + } + if (options.command) { + const cmd = options.command.toLowerCase(); + // command is an array, check if the first element (command name) matches + filtered = filtered.filter((e) => e.command[0]?.toLowerCase().includes(cmd)); + } + if (options.clientName) { + const name = options.clientName.toLowerCase(); + filtered = filtered.filter((e) => e.clientName.toLowerCase().includes(name)); + } + if (options.minDuration) { + filtered = filtered.filter((e) => e.duration >= options.minDuration!); + } + + return filtered + .sort((a, b) => b.timestamp - a.timestamp) + .slice(options.offset ?? 0, (options.offset ?? 0) + (options.limit ?? 100)); + } + + async getLatestSlowLogId(connectionId?: string): Promise { + let entries = this.slowLogEntries; + if (connectionId) { + entries = entries.filter((e) => e.connectionId === connectionId); + } + if (entries.length === 0) return null; + return Math.max(...entries.map((e) => e.id)); + } + + async pruneOldSlowLogEntries(cutoffTimestamp: number, connectionId?: string): Promise { + const before = this.slowLogEntries.length; + if (connectionId) { + this.slowLogEntries = this.slowLogEntries.filter( + (e) => e.capturedAt >= cutoffTimestamp || e.connectionId !== connectionId, + ); + } else { + this.slowLogEntries = this.slowLogEntries.filter((e) => e.capturedAt >= cutoffTimestamp); + } + return before - this.slowLogEntries.length; + } +} diff --git a/apps/api/src/storage/adapters/repositories/slowlog.postgres.repository.ts b/apps/api/src/storage/adapters/repositories/slowlog.postgres.repository.ts new file mode 100644 index 00000000..62ab6b3e --- /dev/null +++ b/apps/api/src/storage/adapters/repositories/slowlog.postgres.repository.ts @@ -0,0 +1,132 @@ +import { Pool } from 'pg'; +import { + SlowLogQueryOptions, + StoredSlowLogEntry, +} from '../../../common/interfaces/storage-port.interface'; +import { RowMappers } from '../base-sql.adapter'; + +export class SlowLogPostgresRepository { + constructor( + private readonly pool: Pool, + private readonly mappers: RowMappers, + ) {} + + async saveSlowLogEntries(entries: StoredSlowLogEntry[], connectionId: string): Promise { + if (entries.length === 0) return 0; + + const values: any[] = []; + const placeholders: string[] = []; + let paramIndex = 1; + + for (const entry of entries) { + placeholders.push(`( + $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, + $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++} + )`); + values.push( + entry.id, + entry.timestamp, + entry.duration, + entry.command, // PostgreSQL will accept string[] for TEXT[] + entry.clientAddress || '', + entry.clientName || '', + entry.capturedAt, + entry.sourceHost, + entry.sourcePort, + connectionId, + ); + } + + const query = ` + INSERT INTO slow_log_entries ( + slowlog_id, timestamp, duration, command, + client_address, client_name, captured_at, source_host, source_port, connection_id + ) VALUES ${placeholders.join(', ')} + ON CONFLICT (slowlog_id, source_host, source_port, connection_id) DO NOTHING + `; + + const result = await this.pool.query(query, values); + return result.rowCount ?? 0; + } + + async getSlowLogEntries(options: SlowLogQueryOptions = {}): Promise { + const conditions: string[] = []; + const params: any[] = []; + let paramIndex = 1; + + if (options.connectionId) { + conditions.push(`connection_id = $${paramIndex++}`); + params.push(options.connectionId); + } + if (options.startTime) { + conditions.push(`timestamp >= $${paramIndex++}`); + params.push(options.startTime); + } + if (options.endTime) { + conditions.push(`timestamp <= $${paramIndex++}`); + params.push(options.endTime); + } + if (options.command) { + // Search in the first element of command array (the command name) + conditions.push(`command[1] ILIKE $${paramIndex++}`); + params.push(`%${options.command}%`); + } + if (options.clientName) { + conditions.push(`client_name ILIKE $${paramIndex++}`); + params.push(`%${options.clientName}%`); + } + if (options.minDuration) { + conditions.push(`duration >= $${paramIndex++}`); + params.push(options.minDuration); + } + + const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; + const limit = options.limit ?? 100; + const offset = options.offset ?? 0; + + const result = await this.pool.query( + `SELECT + slowlog_id, timestamp, duration, command, + client_address, client_name, captured_at, source_host, source_port, connection_id + FROM slow_log_entries + ${whereClause} + ORDER BY timestamp DESC + LIMIT $${paramIndex++} OFFSET $${paramIndex++}`, + [...params, limit, offset], + ); + + return result.rows.map((row) => this.mappers.mapSlowLogEntryRow(row)); + } + + async getLatestSlowLogId(connectionId?: string): Promise { + if (connectionId) { + const result = await this.pool.query( + 'SELECT MAX(slowlog_id) as max_id FROM slow_log_entries WHERE connection_id = $1', + [connectionId], + ); + const maxId = result.rows[0]?.max_id; + return maxId !== null && maxId !== undefined ? Number(maxId) : null; + } + + const result = await this.pool.query('SELECT MAX(slowlog_id) as max_id FROM slow_log_entries'); + + const maxId = result.rows[0]?.max_id; + return maxId !== null && maxId !== undefined ? Number(maxId) : null; + } + + async pruneOldSlowLogEntries(cutoffTimestamp: number, connectionId?: string): Promise { + if (connectionId) { + const result = await this.pool.query( + 'DELETE FROM slow_log_entries WHERE captured_at < $1 AND connection_id = $2', + [cutoffTimestamp, connectionId], + ); + return result.rowCount ?? 0; + } + + const result = await this.pool.query('DELETE FROM slow_log_entries WHERE captured_at < $1', [ + cutoffTimestamp, + ]); + + return result.rowCount ?? 0; + } +} diff --git a/apps/api/src/storage/adapters/repositories/slowlog.sqlite.repository.ts b/apps/api/src/storage/adapters/repositories/slowlog.sqlite.repository.ts new file mode 100644 index 00000000..86b8d028 --- /dev/null +++ b/apps/api/src/storage/adapters/repositories/slowlog.sqlite.repository.ts @@ -0,0 +1,121 @@ +import Database from 'better-sqlite3'; +import { + SlowLogQueryOptions, + StoredSlowLogEntry, +} from '../../../common/interfaces/storage-port.interface'; +import { RowMappers } from '../base-sql.adapter'; + +export class SlowLogSqliteRepository { + constructor( + private readonly db: Database.Database, + private readonly mappers: RowMappers, + ) {} + + async saveSlowLogEntries(entries: StoredSlowLogEntry[], connectionId: string): Promise { + if (entries.length === 0) return 0; + + const stmt = this.db.prepare(` + INSERT OR IGNORE INTO slow_log_entries ( + slowlog_id, timestamp, duration, command, + client_address, client_name, captured_at, source_host, source_port, connection_id + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `); + + let count = 0; + const transaction = this.db.transaction((connId: string) => { + for (const entry of entries) { + const result = stmt.run( + entry.id, + entry.timestamp, + entry.duration, + JSON.stringify(entry.command), // Store as JSON string + entry.clientAddress || '', + entry.clientName || '', + entry.capturedAt, + entry.sourceHost, + entry.sourcePort, + connId, + ); + count += result.changes; + } + }); + transaction(connectionId); + + return count; + } + + async getSlowLogEntries(options: SlowLogQueryOptions = {}): Promise { + const conditions: string[] = []; + const params: any[] = []; + + if (options.connectionId) { + conditions.push('connection_id = ?'); + params.push(options.connectionId); + } + if (options.startTime) { + conditions.push('timestamp >= ?'); + params.push(options.startTime); + } + if (options.endTime) { + conditions.push('timestamp <= ?'); + params.push(options.endTime); + } + if (options.command) { + conditions.push('command LIKE ?'); + params.push(`%${options.command}%`); + } + if (options.clientName) { + conditions.push('client_name LIKE ?'); + params.push(`%${options.clientName}%`); + } + if (options.minDuration) { + conditions.push('duration >= ?'); + params.push(options.minDuration); + } + + const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; + const limit = options.limit ?? 100; + const offset = options.offset ?? 0; + + const rows = this.db + .prepare( + `SELECT slowlog_id, timestamp, duration, command, + client_address, client_name, captured_at, source_host, source_port, connection_id + FROM slow_log_entries + ${whereClause} + ORDER BY timestamp DESC + LIMIT ? OFFSET ?`, + ) + .all(...params, limit, offset) as any[]; + + return rows.map((row) => this.mappers.mapSlowLogEntryRow(row)); + } + + async getLatestSlowLogId(connectionId?: string): Promise { + if (connectionId) { + const row = this.db + .prepare('SELECT MAX(slowlog_id) as max_id FROM slow_log_entries WHERE connection_id = ?') + .get(connectionId) as any; + return row?.max_id ?? null; + } + + const row = this.db + .prepare('SELECT MAX(slowlog_id) as max_id FROM slow_log_entries') + .get() as any; + return row?.max_id ?? null; + } + + async pruneOldSlowLogEntries(cutoffTimestamp: number, connectionId?: string): Promise { + if (connectionId) { + const result = this.db + .prepare('DELETE FROM slow_log_entries WHERE captured_at < ? AND connection_id = ?') + .run(cutoffTimestamp, connectionId); + return result.changes; + } + + const result = this.db + .prepare('DELETE FROM slow_log_entries WHERE captured_at < ?') + .run(cutoffTimestamp); + return result.changes; + } +} diff --git a/apps/api/src/storage/adapters/sqlite.adapter.ts b/apps/api/src/storage/adapters/sqlite.adapter.ts index 636083dd..ad5a31b8 100644 --- a/apps/api/src/storage/adapters/sqlite.adapter.ts +++ b/apps/api/src/storage/adapters/sqlite.adapter.ts @@ -68,6 +68,7 @@ import { } from '@betterdb/shared'; import { SqliteDialect, RowMappers } from './base-sql.adapter'; import { WebhookSqliteRepository } from './repositories/webhook.sqlite.repository'; +import { SlowLogSqliteRepository } from './repositories/slowlog.sqlite.repository'; export interface SqliteAdapterConfig { filepath: string; @@ -116,6 +117,7 @@ export class SqliteAdapter implements StoragePort { private ready: boolean = false; private readonly mappers = new RowMappers(SqliteDialect); private webhookRepo!: WebhookSqliteRepository; + private slowlogRepo!: SlowLogSqliteRepository; constructor(private config: SqliteAdapterConfig) {} @@ -137,6 +139,7 @@ export class SqliteAdapter implements StoragePort { // Run migrations for existing databases this.runMigrations(); this.webhookRepo = new WebhookSqliteRepository(this.db, this.mappers); + this.slowlogRepo = new SlowLogSqliteRepository(this.db, this.mappers); this.ready = true; } catch (error) { this.ready = false; @@ -2255,116 +2258,22 @@ export class SqliteAdapter implements StoragePort { // Slow Log Methods async saveSlowLogEntries(entries: StoredSlowLogEntry[], connectionId: string): Promise { if (!this.db || entries.length === 0) return 0; - - const stmt = this.db.prepare(` - INSERT OR IGNORE INTO slow_log_entries ( - slowlog_id, timestamp, duration, command, - client_address, client_name, captured_at, source_host, source_port, connection_id - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `); - - let count = 0; - const transaction = this.db.transaction((connId: string) => { - for (const entry of entries) { - const result = stmt.run( - entry.id, - entry.timestamp, - entry.duration, - JSON.stringify(entry.command), // Store as JSON string - entry.clientAddress || '', - entry.clientName || '', - entry.capturedAt, - entry.sourceHost, - entry.sourcePort, - connId, - ); - count += result.changes; - } - }); - transaction(connectionId); - - return count; + return this.slowlogRepo.saveSlowLogEntries(entries, connectionId); } async getSlowLogEntries(options: SlowLogQueryOptions = {}): Promise { if (!this.db) throw new Error('Database not initialized'); - - const conditions: string[] = []; - const params: any[] = []; - - if (options.connectionId) { - conditions.push('connection_id = ?'); - params.push(options.connectionId); - } - if (options.startTime) { - conditions.push('timestamp >= ?'); - params.push(options.startTime); - } - if (options.endTime) { - conditions.push('timestamp <= ?'); - params.push(options.endTime); - } - if (options.command) { - conditions.push('command LIKE ?'); - params.push(`%${options.command}%`); - } - if (options.clientName) { - conditions.push('client_name LIKE ?'); - params.push(`%${options.clientName}%`); - } - if (options.minDuration) { - conditions.push('duration >= ?'); - params.push(options.minDuration); - } - - const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; - const limit = options.limit ?? 100; - const offset = options.offset ?? 0; - - const rows = this.db - .prepare( - `SELECT slowlog_id, timestamp, duration, command, - client_address, client_name, captured_at, source_host, source_port, connection_id - FROM slow_log_entries - ${whereClause} - ORDER BY timestamp DESC - LIMIT ? OFFSET ?`, - ) - .all(...params, limit, offset) as any[]; - - return rows.map((row) => this.mappers.mapSlowLogEntryRow(row)); + return this.slowlogRepo.getSlowLogEntries(options); } async getLatestSlowLogId(connectionId?: string): Promise { if (!this.db) throw new Error('Database not initialized'); - - if (connectionId) { - const row = this.db - .prepare('SELECT MAX(slowlog_id) as max_id FROM slow_log_entries WHERE connection_id = ?') - .get(connectionId) as any; - return row?.max_id ?? null; - } - - const row = this.db - .prepare('SELECT MAX(slowlog_id) as max_id FROM slow_log_entries') - .get() as any; - return row?.max_id ?? null; + return this.slowlogRepo.getLatestSlowLogId(connectionId); } async pruneOldSlowLogEntries(cutoffTimestamp: number, connectionId?: string): Promise { if (!this.db) throw new Error('Database not initialized'); - - if (connectionId) { - const result = this.db - .prepare('DELETE FROM slow_log_entries WHERE captured_at < ? AND connection_id = ?') - .run(cutoffTimestamp, connectionId); - return result.changes; - } - - const result = this.db - .prepare('DELETE FROM slow_log_entries WHERE captured_at < ?') - .run(cutoffTimestamp); - return result.changes; + return this.slowlogRepo.pruneOldSlowLogEntries(cutoffTimestamp, connectionId); } // Command Log Methods