Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 6 additions & 59 deletions apps/api/src/storage/adapters/memory.adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import type {
} from '@betterdb/shared';
import { PROPOSAL_DEFAULT_EXPIRY_MS, variantPayloadSchemaFor } from '@betterdb/shared';
import { WebhookMemoryRepository } from './repositories/webhook.memory.repository';
import { SlowLogMemoryRepository } from './repositories/slowlog.memory.repository';

const NULL_SUB_DISCRIMINATOR = '__betterdb_null__';

Expand All @@ -73,7 +74,6 @@ export class MemoryAdapter implements StoragePort {
private clientSnapshots: StoredClientSnapshot[] = [];
private anomalyEvents: StoredAnomalyEvent[] = [];
private correlatedGroups: StoredCorrelatedGroup[] = [];
private slowLogEntries: StoredSlowLogEntry[] = [];
private commandLogEntries: StoredCommandLogEntry[] = [];
private latencySnapshots: StoredLatencySnapshot[] = [];
private latencyHistograms: StoredLatencyHistogram[] = [];
Expand All @@ -83,6 +83,7 @@ export class MemoryAdapter implements StoragePort {
private settings: AppSettings | null = null;
private readonly MAX_DELIVERIES_PER_WEBHOOK = 1000;
private readonly webhookRepo = new WebhookMemoryRepository(this.MAX_DELIVERIES_PER_WEBHOOK);
private readonly slowlogRepo = new SlowLogMemoryRepository();
private idCounter = 1;
private ready: boolean = false;

Expand Down Expand Up @@ -838,73 +839,19 @@ export class MemoryAdapter implements StoragePort {

// Slow Log Methods
async saveSlowLogEntries(entries: StoredSlowLogEntry[], connectionId: string): Promise<number> {
let savedCount = 0;
for (const entry of entries) {
// Check for duplicates based on unique constraint (including connectionId)
const exists = this.slowLogEntries.some(
(e) =>
e.id === entry.id &&
e.sourceHost === entry.sourceHost &&
e.sourcePort === entry.sourcePort &&
e.connectionId === connectionId,
);
if (!exists) {
this.slowLogEntries.push({ ...entry, connectionId });
savedCount++;
}
}
return savedCount;
return this.slowlogRepo.saveSlowLogEntries(entries, connectionId);
}

async getSlowLogEntries(options: SlowLogQueryOptions = {}): Promise<StoredSlowLogEntry[]> {
let filtered = [...this.slowLogEntries];

if (options.connectionId) {
filtered = filtered.filter((e) => e.connectionId === options.connectionId);
}
if (options.startTime) {
filtered = filtered.filter((e) => e.timestamp >= options.startTime!);
}
if (options.endTime) {
filtered = filtered.filter((e) => e.timestamp <= options.endTime!);
}
if (options.command) {
const cmd = options.command.toLowerCase();
// command is an array, check if the first element (command name) matches
filtered = filtered.filter((e) => e.command[0]?.toLowerCase().includes(cmd));
}
if (options.clientName) {
const name = options.clientName.toLowerCase();
filtered = filtered.filter((e) => e.clientName.toLowerCase().includes(name));
}
if (options.minDuration) {
filtered = filtered.filter((e) => e.duration >= options.minDuration!);
}

return filtered
.sort((a, b) => b.timestamp - a.timestamp)
.slice(options.offset ?? 0, (options.offset ?? 0) + (options.limit ?? 100));
return this.slowlogRepo.getSlowLogEntries(options);
}

async getLatestSlowLogId(connectionId?: string): Promise<number | null> {
let entries = this.slowLogEntries;
if (connectionId) {
entries = entries.filter((e) => e.connectionId === connectionId);
}
if (entries.length === 0) return null;
return Math.max(...entries.map((e) => e.id));
return this.slowlogRepo.getLatestSlowLogId(connectionId);
}

async pruneOldSlowLogEntries(cutoffTimestamp: number, connectionId?: string): Promise<number> {
const before = this.slowLogEntries.length;
if (connectionId) {
this.slowLogEntries = this.slowLogEntries.filter(
(e) => e.capturedAt >= cutoffTimestamp || e.connectionId !== connectionId,
);
} else {
this.slowLogEntries = this.slowLogEntries.filter((e) => e.capturedAt >= cutoffTimestamp);
}
return before - this.slowLogEntries.length;
return this.slowlogRepo.pruneOldSlowLogEntries(cutoffTimestamp, connectionId);
}

// Command Log Methods
Expand Down
120 changes: 9 additions & 111 deletions apps/api/src/storage/adapters/postgres.adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ import {
} from '@betterdb/shared';
import { PostgresDialect, RowMappers } from './base-sql.adapter';
import { WebhookPostgresRepository } from './repositories/webhook.postgres.repository';
import { SlowLogPostgresRepository } from './repositories/slowlog.postgres.repository';

// Domain-specific repositories (webhooks extracted). Remaining domains to extract:
// ACL, anomaly, slowlog, commandlog, latency, memory, hotkeys, settings,
// Domain-specific repositories (webhooks, slowlog extracted). Remaining domains to extract:
// ACL, anomaly, commandlog, latency, memory, hotkeys, settings,
// agent-tokens, metric-forecasts, client-snapshots, key-patterns, vector-index-snapshots.

export interface PostgresAdapterConfig {
Expand Down Expand Up @@ -110,6 +111,7 @@ export class PostgresAdapter implements StoragePort {
private ready: boolean = false;
private readonly mappers = new RowMappers(PostgresDialect);
private webhookRepo!: WebhookPostgresRepository;
private slowlogRepo!: SlowLogPostgresRepository;

constructor(private config: PostgresAdapterConfig) {}

Expand Down Expand Up @@ -251,6 +253,7 @@ export class PostgresAdapter implements StoragePort {
}

this.webhookRepo = new WebhookPostgresRepository(this.pool, this.mappers);
this.slowlogRepo = new SlowLogPostgresRepository(this.pool, this.mappers);

// Test connection (will have correct search_path if schema is set)
const testClient = await this.pool.connect();
Expand Down Expand Up @@ -2526,127 +2529,22 @@ export class PostgresAdapter implements StoragePort {
// Slow Log Methods
async saveSlowLogEntries(entries: StoredSlowLogEntry[], connectionId: string): Promise<number> {
if (!this.pool || entries.length === 0) return 0;

const values: any[] = [];
const placeholders: string[] = [];
let paramIndex = 1;

for (const entry of entries) {
placeholders.push(`(
$${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++},
$${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}, $${paramIndex++}
)`);
values.push(
entry.id,
entry.timestamp,
entry.duration,
entry.command, // PostgreSQL will accept string[] for TEXT[]
entry.clientAddress || '',
entry.clientName || '',
entry.capturedAt,
entry.sourceHost,
entry.sourcePort,
connectionId,
);
}

const query = `
INSERT INTO slow_log_entries (
slowlog_id, timestamp, duration, command,
client_address, client_name, captured_at, source_host, source_port, connection_id
) VALUES ${placeholders.join(', ')}
ON CONFLICT (slowlog_id, source_host, source_port, connection_id) DO NOTHING
`;

const result = await this.pool.query(query, values);
return result.rowCount ?? 0;
return this.slowlogRepo.saveSlowLogEntries(entries, connectionId);
}

async getSlowLogEntries(options: SlowLogQueryOptions = {}): Promise<StoredSlowLogEntry[]> {
if (!this.pool) throw new Error('Database not initialized');

const conditions: string[] = [];
const params: any[] = [];
let paramIndex = 1;

if (options.connectionId) {
conditions.push(`connection_id = $${paramIndex++}`);
params.push(options.connectionId);
}
if (options.startTime) {
conditions.push(`timestamp >= $${paramIndex++}`);
params.push(options.startTime);
}
if (options.endTime) {
conditions.push(`timestamp <= $${paramIndex++}`);
params.push(options.endTime);
}
if (options.command) {
// Search in the first element of command array (the command name)
conditions.push(`command[1] ILIKE $${paramIndex++}`);
params.push(`%${options.command}%`);
}
if (options.clientName) {
conditions.push(`client_name ILIKE $${paramIndex++}`);
params.push(`%${options.clientName}%`);
}
if (options.minDuration) {
conditions.push(`duration >= $${paramIndex++}`);
params.push(options.minDuration);
}

const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
const limit = options.limit ?? 100;
const offset = options.offset ?? 0;

const result = await this.pool.query(
`SELECT
slowlog_id, timestamp, duration, command,
client_address, client_name, captured_at, source_host, source_port, connection_id
FROM slow_log_entries
${whereClause}
ORDER BY timestamp DESC
LIMIT $${paramIndex++} OFFSET $${paramIndex++}`,
[...params, limit, offset],
);

return result.rows.map((row) => this.mappers.mapSlowLogEntryRow(row));
return this.slowlogRepo.getSlowLogEntries(options);
}

async getLatestSlowLogId(connectionId?: string): Promise<number | null> {
if (!this.pool) throw new Error('Database not initialized');

if (connectionId) {
const result = await this.pool.query(
'SELECT MAX(slowlog_id) as max_id FROM slow_log_entries WHERE connection_id = $1',
[connectionId],
);
const maxId = result.rows[0]?.max_id;
return maxId !== null && maxId !== undefined ? Number(maxId) : null;
}

const result = await this.pool.query('SELECT MAX(slowlog_id) as max_id FROM slow_log_entries');

const maxId = result.rows[0]?.max_id;
return maxId !== null && maxId !== undefined ? Number(maxId) : null;
return this.slowlogRepo.getLatestSlowLogId(connectionId);
}

async pruneOldSlowLogEntries(cutoffTimestamp: number, connectionId?: string): Promise<number> {
if (!this.pool) throw new Error('Database not initialized');

if (connectionId) {
const result = await this.pool.query(
'DELETE FROM slow_log_entries WHERE captured_at < $1 AND connection_id = $2',
[cutoffTimestamp, connectionId],
);
return result.rowCount ?? 0;
}

const result = await this.pool.query('DELETE FROM slow_log_entries WHERE captured_at < $1', [
cutoffTimestamp,
]);

return result.rowCount ?? 0;
return this.slowlogRepo.pruneOldSlowLogEntries(cutoffTimestamp, connectionId);
}

// Command Log Methods (Valkey-specific)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import {
SlowLogQueryOptions,
StoredSlowLogEntry,
} from '../../../common/interfaces/storage-port.interface';

export class SlowLogMemoryRepository {
private slowLogEntries: StoredSlowLogEntry[] = [];

async saveSlowLogEntries(entries: StoredSlowLogEntry[], connectionId: string): Promise<number> {
let savedCount = 0;
for (const entry of entries) {
// Check for duplicates based on unique constraint (including connectionId)
const exists = this.slowLogEntries.some(
(e) =>
e.id === entry.id &&
e.sourceHost === entry.sourceHost &&
e.sourcePort === entry.sourcePort &&
e.connectionId === connectionId,
);
if (!exists) {
this.slowLogEntries.push({ ...entry, connectionId });
savedCount++;
}
}
return savedCount;
}

async getSlowLogEntries(options: SlowLogQueryOptions = {}): Promise<StoredSlowLogEntry[]> {
let filtered = [...this.slowLogEntries];

if (options.connectionId) {
filtered = filtered.filter((e) => e.connectionId === options.connectionId);
}
if (options.startTime) {
filtered = filtered.filter((e) => e.timestamp >= options.startTime!);
}
if (options.endTime) {
filtered = filtered.filter((e) => e.timestamp <= options.endTime!);
}
if (options.command) {
const cmd = options.command.toLowerCase();
// command is an array, check if the first element (command name) matches
filtered = filtered.filter((e) => e.command[0]?.toLowerCase().includes(cmd));
}
if (options.clientName) {
const name = options.clientName.toLowerCase();
filtered = filtered.filter((e) => e.clientName.toLowerCase().includes(name));
}
if (options.minDuration) {
filtered = filtered.filter((e) => e.duration >= options.minDuration!);
}

return filtered
.sort((a, b) => b.timestamp - a.timestamp)
.slice(options.offset ?? 0, (options.offset ?? 0) + (options.limit ?? 100));
}

async getLatestSlowLogId(connectionId?: string): Promise<number | null> {
let entries = this.slowLogEntries;
if (connectionId) {
entries = entries.filter((e) => e.connectionId === connectionId);
}
if (entries.length === 0) return null;
return Math.max(...entries.map((e) => e.id));
}

async pruneOldSlowLogEntries(cutoffTimestamp: number, connectionId?: string): Promise<number> {
const before = this.slowLogEntries.length;
if (connectionId) {
this.slowLogEntries = this.slowLogEntries.filter(
(e) => e.capturedAt >= cutoffTimestamp || e.connectionId !== connectionId,
);
} else {
this.slowLogEntries = this.slowLogEntries.filter((e) => e.capturedAt >= cutoffTimestamp);
}
return before - this.slowLogEntries.length;
}
}
Loading
Loading