diff --git a/src/__tests__/fix-4534-persist-now-enotdir.test.ts b/src/__tests__/fix-4534-persist-now-enotdir.test.ts new file mode 100644 index 00000000..02127b93 --- /dev/null +++ b/src/__tests__/fix-4534-persist-now-enotdir.test.ts @@ -0,0 +1,78 @@ +/** + * Regression test for PR #4549 local-storage refactor: ensure persistNow + * swallows write errors (ENOTDIR) and records them in persistError without + * allowing unhandled rejections to escape the test runner. + */ +import fs from 'node:fs'; +import path from 'node:path'; +import os from 'node:os'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; + +import { createFileAcpLocalStorageProfile } from '../services/acp/local-storage/index.js'; + +function makeSession(id: string) { + const now = Date.now(); + return { + id, + conversationId: `conv-${id}`, + transcriptId: `transcript-${id}`, + tenantId: 'test-tenant', + ownerKeyId: 'test-key', + runnerType: 'acp' as const, + status: 'initializing' as const, + createdAt: now, + updatedAt: now, + metadata: {}, + }; +} + +describe('PR #4549: persistNow ENOTDIR does not cause unhandled rejection', () => { + let tmpDir: string; + let filePath: string; + + async function blockStorageDirectory() { + const storageDir = path.dirname(filePath); + await fs.promises.rm(storageDir, { recursive: true, force: true }); + await fs.promises.writeFile(storageDir, 'not-a-directory'); + } + + async function restoreStorageDirectory() { + const storageDir = path.dirname(filePath); + await fs.promises.rm(storageDir, { force: true }); + await fs.promises.mkdir(storageDir, { recursive: true }); + } + + beforeEach(async () => { + tmpDir = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'aegis-4534-')); + filePath = path.join(tmpDir, 'acp-local-storage.json'); + }); + + afterEach(async () => { + await fs.promises.chmod(tmpDir, 0o755).catch(() => {}); + await fs.promises.rm(tmpDir, { recursive: true, force: true }).catch(() => {}); + }); + + it('records persist error but 
does not let it escape as unhandled rejection', async () => { + const profile = createFileAcpLocalStorageProfile({ filePath, persistDebounceMs: 0 }); + await profile.start(); + + // Replace the storage directory with a file to force ENOTDIR on all platforms. + await blockStorageDirectory(); + + // Trigger persist via a mutation — should fail silently + await profile.sessionStore.create(makeSession('fail-1')); + expect(profile.getPersistError()).not.toBeNull(); + + // Restore directory and ensure subsequent writes succeed + await restoreStorageDirectory(); + await profile.sessionStore.create(makeSession('recover-1')); + expect(profile.getPersistError()).toBeNull(); + + // Verify file contains data + const content = await fs.promises.readFile(filePath, 'utf8'); + const parsed = JSON.parse(content); + expect(parsed.sessions).toHaveLength(2); + + await profile.stop(); + }); +}); diff --git a/src/services/acp/local-storage.ts b/src/services/acp/local-storage.ts index 0df43743..42bec948 100644 --- a/src/services/acp/local-storage.ts +++ b/src/services/acp/local-storage.ts @@ -1,1192 +1,34 @@ -import { mkdir, readFile, writeFile, rename, unlink } from 'node:fs/promises'; -import path from 'node:path'; - -import { logger } from '../../logger.js'; -import type { ServiceHealth } from '../../container.js'; -import { AcpDurableIdentityError, AcpValidationError } from './errors.js'; -import { validateAcpControlActionInput } from './session-service.js'; -import { - normalizeAcpActionMetadata, - type AcpActionMetadata, - type AcpActionQueue, - type AcpActionRecord, - type AcpCancelActionOptions, - type AcpCompleteActionOptions, - type AcpEnqueueActionOptions, - type AcpFailActionOptions, - type AcpLeaseActionOptions, -} from './action-queue.js'; -import type { - AcpAppendEventInput, - AcpEventPayload, - AcpEventRecord, - AcpEventStore, - AcpListEventsInput, -} from './event-store.js'; -import type { - AcpControlActionInput, - AcpListSessionsInput, - AcpSessionRecord, - 
AcpSessionScope, - AcpSessionStore, -} from './types.js'; -import type { - AcpCompleteInterventionInput, - AcpPauseInterventionRecord, - AcpPauseInterventionStore, - AcpPauseSessionInput, - AcpResumeSessionInput, - AcpStartInterventionInput, -} from './pause-intervention.js'; - -export interface AcpLocalStorageProfile { - sessionStore: AcpSessionStore; - eventStore: AcpEventStore; - actionQueue: AcpActionQueue; - /** Issue #4247: Pause intervention store for ACP sessions. */ - readonly pauseInterventionStore: AcpPauseInterventionStore; - start(): Promise; - stop(signal?: AbortSignal): Promise; - health(): Promise; - getPersistError(): Error | null; -} - -/** Issue #4032: Configuration for file-backed ACP local storage. */ -export interface FileAcpLocalStorageProfileConfig { - filePath: string; - /** Issue #4032: Maximum events retained per session (default: 1000). Older events are pruned on append. */ - maxEventsPerSession?: number; - /** Issue #4032: Debounce interval in ms for coalescing rapid mutations into a single disk write (default: 3000). */ - persistDebounceMs?: number; -} - -/** Issue #4032: Terminal session statuses whose events can be pruned. */ -const PRUNABLE_SESSION_STATUSES: ReadonlySet = new Set([ - 'closed', - 'completed', - 'failed', -]); - -interface LocalState { - sessions: AcpSessionRecord[]; - events: AcpEventRecord[]; - actions: AcpActionRecord[]; - actionOrder: Map; - nextActionOrder: number; - pauseInterventions: AcpPauseInterventionRecord[]; - /** Issue #4032: Incremental event seq tracking — avoids O(n) scan on append. */ - lastEventSeqBySession: Map; -} - -type MutationHook = () => Promise; - -const noopMutationHook: MutationHook = async () => {}; -const DEFAULT_LIST_LIMIT = 100; -const MAX_LIST_LIMIT = 1_000; -/** Issue #4032: Default max events per session before pruning kicks in. */ -const DEFAULT_MAX_EVENTS_PER_SESSION = 1_000; -/** Issue #4032: Default debounce interval for persist (ms). 
*/ -const DEFAULT_PERSIST_DEBOUNCE_MS = 3_000; - -export function createMemoryAcpLocalStorageProfile(): AcpLocalStorageProfile { - return new MemoryAcpLocalStorageProfile(); -} - -export function createFileAcpLocalStorageProfile( - config: FileAcpLocalStorageProfileConfig -): AcpLocalStorageProfile { - return new FileAcpLocalStorageProfile(config); -} - -export class MemoryAcpLocalStorageProfile implements AcpLocalStorageProfile { - private readonly state: LocalState; - readonly sessionStore: AcpSessionStore; - readonly eventStore: AcpEventStore; - readonly actionQueue: AcpActionQueue; - readonly pauseInterventionStore: MemoryAcpPauseInterventionStore; - - constructor(state: LocalState = createEmptyState(), onMutation = noopMutationHook) { - this.state = state; - this.sessionStore = new MemoryAcpSessionStore(this.state, onMutation); - this.eventStore = new MemoryAcpEventStore(this.state, onMutation); - this.actionQueue = new MemoryAcpActionQueue(this.state, onMutation); - this.pauseInterventionStore = new MemoryAcpPauseInterventionStore(this.state, onMutation); - } - - async start(): Promise {} - - async stop(_signal?: AbortSignal): Promise {} - - async health(): Promise { - return { healthy: true, details: 'memory ACP local storage profile ok' }; - } - - getPersistError(): Error | null { - return null; - } -} - /** - * File-backed ACP local storage profile. + * Backward-compatibility barrel. * - * Issue #4032: Hardened against OOM via: - * - Event compaction: max events per session, prunable terminal sessions - * - Debounced persistence: coalesces rapid mutations into single disk writes - * - Incremental event seq tracking: O(1) instead of O(n) per append - * - Lightweight serialization: skips structuredClone on persist path + * `local-storage.ts` historically bundled the ACP local-storage profile and + * its four stores in a single 1192-line file. 
The contents have been split + * into focused modules under `local-storage/` for the gate:arch 500-line + * ceiling, with this file preserved as a re-export surface so existing + * imports like `from './local-storage.js'` continue to resolve unchanged. */ -export class FileAcpLocalStorageProfile implements AcpLocalStorageProfile { - private state = createEmptyState(); - private started = false; - private writeChain: Promise = Promise.resolve(); - private persistError: Error | null = null; - /** Issue #4032: Dirty flag — set when state changes, cleared on persist. */ - private dirty = false; - /** Issue #4032: Debounce timer for persist. */ - private persistTimer: ReturnType | null = null; - /** Issue #4032: Resolvers for pending persist promises. */ - private pendingPersistResolvers: Array<() => void> = []; - private readonly maxEventsPerSession: number; - private readonly persistDebounceMs: number; - private readonly memorySessionStore: MemoryAcpSessionStore; - private readonly memoryEventStore: MemoryAcpEventStore; - private readonly memoryActionQueue: MemoryAcpActionQueue; - private readonly memoryPauseInterventionStore: MemoryAcpPauseInterventionStore; - - readonly sessionStore: AcpSessionStore; - readonly eventStore: AcpEventStore; - readonly actionQueue: AcpActionQueue; - readonly pauseInterventionStore: MemoryAcpPauseInterventionStore; - - constructor(private readonly config: FileAcpLocalStorageProfileConfig) { - this.maxEventsPerSession = config.maxEventsPerSession ?? DEFAULT_MAX_EVENTS_PER_SESSION; - this.persistDebounceMs = config.persistDebounceMs ?? 
DEFAULT_PERSIST_DEBOUNCE_MS; - const schedulePersist = (): Promise => this.schedulePersist(); - this.memorySessionStore = new MemoryAcpSessionStore(this.state, schedulePersist); - this.memoryEventStore = new MemoryAcpEventStore(this.state, schedulePersist, this.maxEventsPerSession); - this.memoryActionQueue = new MemoryAcpActionQueue(this.state, schedulePersist); - this.memoryPauseInterventionStore = new MemoryAcpPauseInterventionStore(this.state, schedulePersist); - this.sessionStore = this.memorySessionStore; - this.eventStore = this.memoryEventStore; - this.actionQueue = this.memoryActionQueue; - this.pauseInterventionStore = this.memoryPauseInterventionStore; - } - - async start(): Promise { - if (this.started) return; - await mkdir(path.dirname(this.config.filePath), { recursive: true }); - this.state = await loadState(this.config.filePath); - // Issue #4032: Prune events for terminal sessions on startup. - this.pruneCompletedSessionEvents(); - this.memorySessionStore.replaceState(this.state); - this.memoryEventStore.replaceState(this.state); - this.memoryActionQueue.replaceState(this.state); - this.memoryPauseInterventionStore.replaceState(this.state); - this.started = true; - this.dirty = true; - // Issue #4032: Initial persist after load (which may have pruned events). - await this.flush(); - } - - async stop(_signal?: AbortSignal): Promise { - if (!this.started) return; - // Issue #4032: Flush any pending dirty state before shutdown. 
- if (this.persistTimer !== null) { - clearTimeout(this.persistTimer); - this.persistTimer = null; - } - await this.flush().catch(() => {}); - // Best-effort final write chain — swallow errors so shutdown completes - await this.writeChain.catch(() => {}); - this.started = false; - } - - async health(): Promise { - if (!this.started) { - return { healthy: false, details: 'file ACP local storage profile not started' }; - } - return { healthy: true, details: 'file ACP local storage profile ok' }; - } - - /** - * Issue #4032: Flush dirty state to disk immediately. - * Called on shutdown and after initial load. - */ - private async flush(): Promise { - if (!this.dirty) return; - this.dirty = false; - try { - await this.persist(); - } finally { - // Resolve any callers that awaited onMutation() so they don't hang. - const resolvers = this.pendingPersistResolvers.splice(0); - for (const r of resolvers) { - try { r(); } catch {} - } - } - } - - /** - * Issue #4032: Schedule a debounced persist. - * Coalesces rapid mutations into a single disk write. - */ - private schedulePersist(): Promise { - this.dirty = true; - return new Promise((resolve) => { - // Track resolver so callers can be notified when persist completes. - this.pendingPersistResolvers.push(resolve); - if (this.persistTimer !== null) return; - this.persistTimer = setTimeout(() => { - this.persistTimer = null; - // When flush finishes, we'll resolve all pending resolvers there. 
- void this.flush().catch(() => {}); - }, this.persistDebounceMs); - }); - } - - private async persist(): Promise { - if (!this.started) { - throw new Error('FileAcpLocalStorageProfile: persist() called before start()'); - } - // Issue #3045: atomic write to prevent truncation on SIGTERM/OOM kill - // Issue #3366: error recovery — a failed write must not poison subsequent writes - // #3363: restrict file permissions to owner-only - // Issue #4032: Use lightweight serialization — no structuredClone needed since - // we're about to JSON.stringify anyway. - const content = `${JSON.stringify(serializeStateLightweight(this.state), null, 2)}\n`; - const tmpFile = `${this.config.filePath}.tmp.${process.pid}`; - - const prevChain = this.writeChain; - this.writeChain = prevChain - .then( - // Previous write succeeded — do this write - () => writeFile(tmpFile, content, { mode: 0o600 }).then(() => rename(tmpFile, this.config.filePath)), - // Previous write failed — still attempt this write - () => writeFile(tmpFile, content, { mode: 0o600 }).then(() => rename(tmpFile, this.config.filePath)), - ) - .then(() => { - this.persistError = null; - }) - .catch((err: Error) => { - this.persistError = err; - logger.error({ - component: 'acp-local-storage', - operation: 'persist', - errorCode: 'PERSIST_FAILED', - attributes: { error: err.message, filePath: this.config.filePath }, - }); - // Clean up stale tmp file if it exists - unlink(tmpFile).catch(() => {}); - // Reset chain so next persist() is not chained to a rejected promise - this.writeChain = Promise.resolve(); - }); - await this.writeChain; - } - - /** - * Returns the last persist error, or null if all writes succeeded. - * Useful for diagnostics and health checks. - */ - getPersistError(): Error | null { - return this.persistError; - } - - /** - * Issue #4032: Remove events for sessions that have reached a terminal status. - * Called on startup and can be called periodically. 
- */ - private pruneCompletedSessionEvents(): void { - const prunableSessionIds = new Set(); - for (const session of this.state.sessions) { - if (PRUNABLE_SESSION_STATUSES.has(session.status)) { - prunableSessionIds.add(session.id); - } - } - if (prunableSessionIds.size === 0) return; - - const before = this.state.events.length; - this.state.events = this.state.events.filter( - event => !prunableSessionIds.has(event.sessionId) - ); - const pruned = before - this.state.events.length; - - // Issue #4032: Clean up seq tracking for pruned sessions. - for (const sessionId of prunableSessionIds) { - this.state.lastEventSeqBySession.delete(sessionId); - } - - if (pruned > 0) { - logger.info({ - component: 'acp-local-storage', - operation: 'pruneCompletedSessionEvents', - attributes: { prunedEventCount: pruned, prunedSessionCount: prunableSessionIds.size }, - }); - } - } -} - -export class MemoryAcpSessionStore implements AcpSessionStore { - constructor( - private state: LocalState = createEmptyState(), - private readonly onMutation: MutationHook = noopMutationHook - ) {} - - async create(record: AcpSessionRecord): Promise { - const existing = this.state.sessions.find( - session => - session.id === record.id && - session.tenantId === record.tenantId && - session.ownerKeyId === record.ownerKeyId - ); - if (existing !== undefined) { - throw new AcpDurableIdentityError(`ACP session already exists: ${record.id}`); - } - this.state.sessions.push(cloneSession(record)); - await this.onMutation(); - } - - async get(id: string, scope: AcpSessionScope): Promise { - validateScope(scope); - const record = this.state.sessions.find( - session => - session.id === id && - session.tenantId === scope.tenantId && - session.ownerKeyId === scope.ownerKeyId - ); - return record === undefined ? 
null : cloneSession(record); - } - - async update(record: AcpSessionRecord, scope: AcpSessionScope): Promise { - validateScope(scope); - if (record.tenantId !== scope.tenantId || record.ownerKeyId !== scope.ownerKeyId) { - throw new Error('MemoryAcpSessionStore: record scope does not match requested scope'); - } - const index = this.state.sessions.findIndex( - session => - session.id === record.id && - session.tenantId === scope.tenantId && - session.ownerKeyId === scope.ownerKeyId - ); - const previous = this.state.sessions[index]; - if (previous === undefined) return null; - const persisted: AcpSessionRecord = { - ...previous, - acpAgentSessionId: record.acpAgentSessionId, - claudeSessionId: record.claudeSessionId, - currentBackendRunId: record.currentBackendRunId, - status: record.status, - updatedAt: record.updatedAt, - closedAt: record.closedAt, - failedAt: record.failedAt, - backendMetadata: record.backendMetadata === undefined ? undefined : { ...record.backendMetadata }, - validationWarnings: record.validationWarnings, - }; - this.state.sessions[index] = cloneSession(persisted); - await this.onMutation(); - return cloneSession(persisted); - } - - async list(input: AcpListSessionsInput): Promise { - validateScope(input); - const limit = resolveSessionListLimit(input.limit); - return this.state.sessions - .filter( - session => - session.tenantId === input.tenantId && - session.ownerKeyId === input.ownerKeyId && - (input.statuses === undefined || input.statuses.includes(session.status)) && - (input.updatedAfter === undefined || session.updatedAt > input.updatedAfter) - ) - .sort((a, b) => b.updatedAt - a.updatedAt) - .slice(0, limit) - .map(cloneSession); - } - - replaceState(state: LocalState): void { - this.state = state; - } -} - -/** - * Issue #4032: Event store with configurable per-session event limit and - * incremental seq tracking. 
- */ -export class MemoryAcpEventStore implements AcpEventStore { - private readonly maxEventsPerSession: number; - - constructor( - private state: LocalState = createEmptyState(), - private readonly onMutation: MutationHook = noopMutationHook, - maxEventsPerSession: number = DEFAULT_MAX_EVENTS_PER_SESSION, - ) { - this.maxEventsPerSession = maxEventsPerSession; - } - - async append(input: AcpAppendEventInput): Promise { - const validated = validateAppendInput(input); - const scopeConflict = this.state.events.find( - event => - event.sessionId === validated.sessionId && - (event.tenantId !== validated.tenantId || event.ownerKeyId !== validated.ownerKeyId) - ); - if (scopeConflict !== undefined) { - throw new Error('MemoryAcpEventStore: session scope does not match existing event stream'); - } - // Issue #4032: O(1) seq lookup instead of O(n) scan. - const eventSeq = this.nextEventSeqIncremental(validated.sessionId); - const record: AcpEventRecord = { - ...validated, - eventSeq, - eventId: `${validated.tenantId}:${validated.ownerKeyId}:${validated.sessionId}:${eventSeq}`, - occurredAt: new Date(validated.occurredAt.getTime()), - ingestedAt: new Date(), - payload: clonePayload(validated.payload), - }; - this.state.events.push(cloneEvent(record)); - // Issue #4032: Update seq tracking after append. - this.state.lastEventSeqBySession.set(validated.sessionId, eventSeq); - // Issue #4032: Prune oldest events for this session if limit exceeded. 
- this.pruneSessionEvents(validated.sessionId); - await this.onMutation(); - return cloneEvent(record); - } - - async list(input: AcpListEventsInput): Promise { - validateListInput(input); - const afterEventSeq = resolveAfterEventSeq(input.afterEventSeq); - const limit = resolveLimit(input.limit); - return this.state.events - .filter( - event => - event.sessionId === input.sessionId && - event.tenantId === input.tenantId && - event.ownerKeyId === input.ownerKeyId && - event.eventSeq > afterEventSeq - ) - .sort((left, right) => left.eventSeq - right.eventSeq) - .slice(0, limit) - .map(cloneEvent); - } - - replaceState(state: LocalState): void { - this.state = state; - } - - /** - * Issue #4032: O(1) event seq lookup using incremental tracking map. - */ - private nextEventSeqIncremental(sessionId: string): number { - const lastSeq = this.state.lastEventSeqBySession.get(sessionId); - if (lastSeq !== undefined) return lastSeq + 1; - // First event for this session — scan once to seed the map (only happens once per session). - return Math.max(0, ...this.state.events.filter(event => event.sessionId === sessionId).map(event => event.eventSeq)) + 1; - } - - /** - * Issue #4032: Prune oldest events for a session when the count exceeds maxEventsPerSession. - */ - private pruneSessionEvents(sessionId: string): void { - const sessionEvents = this.state.events.filter(e => e.sessionId === sessionId); - if (sessionEvents.length <= this.maxEventsPerSession) return; - - const pruneCount = sessionEvents.length - this.maxEventsPerSession; - // Sort by eventSeq ascending to identify oldest. 
- sessionEvents.sort((a, b) => a.eventSeq - b.eventSeq); - const prunableSeqs = new Set( - sessionEvents.slice(0, pruneCount).map(e => e.eventSeq) - ); - - const before = this.state.events.length; - this.state.events = this.state.events.filter( - e => !(e.sessionId === sessionId && prunableSeqs.has(e.eventSeq)) - ); - - if (this.state.events.length < before) { - logger.info({ - component: 'acp-local-storage', - operation: 'pruneSessionEvents', - attributes: { sessionId, prunedCount: before - this.state.events.length, remainingForSession: this.maxEventsPerSession }, - }); - } - } -} - -export class MemoryAcpActionQueue implements AcpActionQueue { - constructor( - private state: LocalState = createEmptyState(), - private readonly onMutation: MutationHook = noopMutationHook - ) {} - - async enqueue( - input: AcpControlActionInput, - options: AcpEnqueueActionOptions = {} - ): Promise { - validateAcpControlActionInput(input); - const metadata = normalizeAcpActionMetadata(input.metadata) ?? {}; - const availableAt = options.availableAt ?? 
new Date(); - assertValidDate(availableAt, 'action availableAt'); - - if (input.idempotencyKey !== undefined) { - const existing = this.findByIdempotencyKey(input, input.idempotencyKey); - if (existing !== null) return existing; - } - if (this.state.actions.some(action => action.actionId === input.actionId)) { - throw new AcpDurableIdentityError(`ACP action id already exists: ${input.actionId}`); - } - - const record: AcpActionRecord = { - actionId: input.actionId, - sessionId: input.sessionId, - tenantId: input.tenantId, - ownerKeyId: input.ownerKeyId, - actionType: input.type, - idempotencyKey: input.idempotencyKey, - status: 'queued', - createdAt: new Date(), - availableAt: new Date(availableAt.getTime()), - attemptCount: 0, - approvalId: input.approvalId, - controlRequestId: input.controlRequestId, - metadata, - }; - this.state.actions.push(cloneAction(record)); - this.state.actionOrder.set(record.actionId, this.state.nextActionOrder); - this.state.nextActionOrder += 1; - await this.onMutation(); - return cloneAction(record); - } - - async leaseNext( - scope: AcpSessionScope, - options: AcpLeaseActionOptions - ): Promise { - validateScope(scope); - const now = options.now ?? 
new Date(); - assertValidDate(now, 'action lease now'); - assertValidDate(options.leaseUntil, 'action leaseUntil'); - if (options.leaseUntil.getTime() <= now.getTime()) { - throw new AcpValidationError('ACP action leaseUntil must be after now'); - } - const candidate = this.state.actions - .filter( - action => - action.tenantId === scope.tenantId && - action.ownerKeyId === scope.ownerKeyId && - action.status === 'queued' && - action.availableAt.getTime() <= now.getTime() - ) - .sort((left, right) => compareLeaseOrder(this.state, left, right))[0]; - if (candidate === undefined) return null; - candidate.status = 'leased'; - candidate.leasedUntil = new Date(options.leaseUntil.getTime()); - candidate.attemptCount += 1; - await this.onMutation(); - return cloneAction(candidate); - } - - async complete( - actionId: string, - scope: AcpSessionScope, - options: AcpCompleteActionOptions = {} - ): Promise { - const resultMetadata = normalizeAcpActionMetadata( - options.resultMetadata, - 'action result metadata' - ); - return this.finishLeased(actionId, scope, options.now, 'completed', resultMetadata); - } - - async fail( - actionId: string, - scope: AcpSessionScope, - options: AcpFailActionOptions = {} - ): Promise { - const errorMetadata = normalizeAcpActionMetadata(options.errorMetadata, 'action error metadata'); - return this.finishLeased(actionId, scope, options.now, 'failed', errorMetadata); - } - - async cancel( - actionId: string, - scope: AcpSessionScope, - options: AcpCancelActionOptions = {} - ): Promise { - validateActionIdAndScope(actionId, scope); - const now = options.now ?? 
new Date(); - assertValidDate(now, 'action cancellation time'); - const action = this.findAction(actionId, scope); - if (action === null || (action.status !== 'queued' && action.status !== 'leased')) return null; - action.status = 'cancelled'; - action.errorMetadata = normalizeAcpActionMetadata(options.errorMetadata, 'action error metadata'); - action.cancelledAt = new Date(now.getTime()); - action.leasedUntil = undefined; - await this.onMutation(); - return cloneAction(action); - } - - replaceState(state: LocalState): void { - this.state = state; - } - - async sweepOrphanedActions(now: Date = new Date()): Promise { - const recovered: AcpActionRecord[] = []; - for (const action of this.state.actions) { - if ( - action.status === 'leased' && - action.leasedUntil !== undefined && - action.leasedUntil.getTime() <= now.getTime() - ) { - action.status = 'failed'; - action.failedAt = new Date(now.getTime()); - action.leasedUntil = undefined; - action.errorMetadata = { - sweeper: true, - reason: 'lease_expired', - recoveredAt: now.toISOString(), - }; - recovered.push(cloneAction(action)); - } - } - if (recovered.length > 0) { - await this.onMutation(); - } - return recovered; - } - - private findByIdempotencyKey( - input: AcpControlActionInput, - idempotencyKey: string - ): AcpActionRecord | null { - const action = this.state.actions.find( - record => - record.tenantId === input.tenantId && - record.ownerKeyId === input.ownerKeyId && - record.sessionId === input.sessionId && - record.idempotencyKey === idempotencyKey - ); - return action === undefined ? null : cloneAction(action); - } - - private async finishLeased( - actionId: string, - scope: AcpSessionScope, - nowValue: Date | undefined, - status: 'completed' | 'failed', - metadata: AcpActionMetadata | undefined - ): Promise { - validateActionIdAndScope(actionId, scope); - const now = nowValue ?? new Date(); - assertValidDate(now, status === 'completed' ? 
'action completion time' : 'action failure time'); - const action = this.findAction(actionId, scope); - if (action === null || action.status !== 'leased') return null; - action.status = status; - action.leasedUntil = undefined; - if (status === 'completed') { - action.resultMetadata = metadata; - action.completedAt = new Date(now.getTime()); - } else { - action.errorMetadata = metadata; - action.failedAt = new Date(now.getTime()); - } - await this.onMutation(); - return cloneAction(action); - } - - private findAction(actionId: string, scope: AcpSessionScope): AcpActionRecord | null { - return ( - this.state.actions.find( - action => - action.actionId === actionId && - action.tenantId === scope.tenantId && - action.ownerKeyId === scope.ownerKeyId - ) ?? null - ); - } -} - -export class MemoryAcpPauseInterventionStore implements AcpPauseInterventionStore { - constructor( - private state: LocalState = createEmptyState(), - private readonly onMutation: MutationHook = noopMutationHook, - ) {} - - async pause(input: AcpPauseSessionInput): Promise { - validateScope(input); - const existing = this.state.pauseInterventions.find( - (p) => - p.sessionId === input.sessionId && - p.tenantId === input.tenantId && - p.ownerKeyId === input.ownerKeyId && - (p.status === 'paused' || p.status === 'intervening'), - ); - if (existing !== undefined) { - throw new AcpDurableIdentityError( - `ACP pause already active for session: ${input.sessionId}`, - ); - } - const record: AcpPauseInterventionRecord = { - pauseId: input.pauseId, - sessionId: input.sessionId, - tenantId: input.tenantId, - ownerKeyId: input.ownerKeyId, - status: 'paused', - idempotencyKey: input.idempotencyKey, - reason: input.reason, - requestedBy: input.requestedBy, - requestedAt: input.requestedAt ?? new Date(), - metadata: input.metadata === undefined ? 
undefined : { ...input.metadata }, - updatedAt: new Date(), - }; - this.state.pauseInterventions.push(clonePauseIntervention(record)); - await this.onMutation(); - return clonePauseIntervention(record); - } - - async getActive( - sessionId: string, - scope: AcpSessionScope, - ): Promise { - validateScope(scope); - const record = this.state.pauseInterventions - .filter( - (p) => - p.sessionId === sessionId && - p.tenantId === scope.tenantId && - p.ownerKeyId === scope.ownerKeyId && - (p.status === 'paused' || p.status === 'intervening'), - ) - .sort((a, b) => b.updatedAt.getTime() - a.updatedAt.getTime())[0]; - return record === undefined ? null : clonePauseIntervention(record); - } - - async getLatest( - sessionId: string, - scope: AcpSessionScope, - ): Promise { - validateScope(scope); - const record = this.state.pauseInterventions - .filter( - (p) => - p.sessionId === sessionId && - p.tenantId === scope.tenantId && - p.ownerKeyId === scope.ownerKeyId, - ) - .sort((a, b) => b.updatedAt.getTime() - a.updatedAt.getTime())[0]; - return record === undefined ? null : clonePauseIntervention(record); - } - - async startIntervention( - input: AcpStartInterventionInput, - ): Promise { - validateScope(input); - const record = this.state.pauseInterventions.find( - (p) => - p.sessionId === input.sessionId && - p.tenantId === input.tenantId && - p.ownerKeyId === input.ownerKeyId && - p.status === 'paused', - ); - if (record === undefined) return null; - record.status = 'intervening'; - record.interventionId = input.interventionId; - record.interventionBy = input.interventionBy; - record.interventionStartedAt = input.startedAt ?? 
new Date(); - record.updatedAt = new Date(); - await this.onMutation(); - return clonePauseIntervention(record); - } - - async completeIntervention( - input: AcpCompleteInterventionInput, - ): Promise { - validateScope(input); - const record = this.state.pauseInterventions.find( - (p) => - p.sessionId === input.sessionId && - p.tenantId === input.tenantId && - p.ownerKeyId === input.ownerKeyId && - p.status === 'intervening' && - p.interventionId === input.interventionId, - ); - if (record === undefined) return null; - record.interventionCompletedBy = input.completedBy; - record.interventionCompletedAt = input.completedAt ?? new Date(); - record.guidance = input.guidance; - record.updatedAt = new Date(); - await this.onMutation(); - return clonePauseIntervention(record); - } - - async resume(input: AcpResumeSessionInput): Promise { - validateScope(input); - const record = this.state.pauseInterventions.find( - (p) => - p.sessionId === input.sessionId && - p.tenantId === input.tenantId && - p.ownerKeyId === input.ownerKeyId && - (p.status === 'paused' || p.status === 'intervening'), - ); - if (record === undefined) return null; - record.status = 'resumed'; - record.resumeId = input.resumeId; - record.resumedBy = input.resumedBy; - record.resumedAt = input.resumedAt ?? 
new Date(); - record.updatedAt = new Date(); - await this.onMutation(); - return clonePauseIntervention(record); - } - - replaceState(state: LocalState): void { - this.state = state; - } -} - -function createEmptyState(): LocalState { - return { - sessions: [], - events: [], - actions: [], - actionOrder: new Map(), - nextActionOrder: 0, - pauseInterventions: [], - lastEventSeqBySession: new Map(), - }; -} - -async function loadState(filePath: string): Promise { - try { - return deserializeState(JSON.parse(await readFile(filePath, 'utf8'))); - } catch (error) { - if (isNodeError(error) && error.code === 'ENOENT') return createEmptyState(); - if (error instanceof SyntaxError) return createEmptyState(); - throw error; - } -} - -interface SerializedEvent extends Omit { - occurredAt: string; - ingestedAt: string; -} - -interface SerializedAction - extends Omit< - AcpActionRecord, - 'createdAt' | 'availableAt' | 'leasedUntil' | 'completedAt' | 'failedAt' | 'cancelledAt' - > { - createdAt: string; - availableAt: string; - leasedUntil?: string; - completedAt?: string; - failedAt?: string; - cancelledAt?: string; -} - -interface SerializedPauseIntervention - extends Omit< - AcpPauseInterventionRecord, - 'requestedAt' | 'updatedAt' | 'interventionStartedAt' | 'interventionCompletedAt' | 'resumedAt' - > { - requestedAt: string; - updatedAt: string; - interventionStartedAt?: string; - interventionCompletedAt?: string; - resumedAt?: string; -} - -interface SerializedState { - version: 1; - sessions: AcpSessionRecord[]; - events: SerializedEvent[]; - actions: SerializedAction[]; - pauseInterventions: SerializedPauseIntervention[]; -} - -/** - * Issue #4032: Lightweight serialization for the persist path. - * Skips structuredClone since we're about to JSON.stringify anyway. - * The stringifier creates a fresh value tree, so cloning is redundant. 
- */ -function serializeStateLightweight(state: LocalState): SerializedState { - return { - version: 1, - sessions: state.sessions.map(s => ({ - ...s, - backendMetadata: s.backendMetadata === undefined ? undefined : { ...s.backendMetadata }, - })), - events: state.events.map(event => ({ - ...event, - occurredAt: event.occurredAt.toISOString(), - ingestedAt: event.ingestedAt.toISOString(), - // No structuredClone — JSON.stringify handles the payload as-is. - })), - actions: state.actions.map(action => ({ - ...action, - createdAt: action.createdAt.toISOString(), - availableAt: action.availableAt.toISOString(), - leasedUntil: action.leasedUntil?.toISOString(), - completedAt: action.completedAt?.toISOString(), - failedAt: action.failedAt?.toISOString(), - cancelledAt: action.cancelledAt?.toISOString(), - metadata: action.metadata === undefined ? undefined : { ...action.metadata }, - resultMetadata: action.resultMetadata === undefined ? undefined : { ...action.resultMetadata }, - errorMetadata: action.errorMetadata === undefined ? undefined : { ...action.errorMetadata }, - })), - pauseInterventions: state.pauseInterventions.map(record => ({ - ...record, - requestedAt: record.requestedAt.toISOString(), - updatedAt: record.updatedAt.toISOString(), - interventionStartedAt: record.interventionStartedAt?.toISOString(), - interventionCompletedAt: record.interventionCompletedAt?.toISOString(), - resumedAt: record.resumedAt?.toISOString(), - metadata: record.metadata === undefined ? undefined : { ...record.metadata }, - resumeMetadata: record.resumeMetadata === undefined ? 
undefined : { ...record.resumeMetadata }, - })), - }; -} - -function deserializeState(value: unknown): LocalState { - if (!isSerializedState(value)) { - throw new Error('FileAcpLocalStorageProfile: invalid storage file'); - } - const actions = value.actions.map(action => ({ - ...action, - createdAt: parseDate(action.createdAt, 'action.createdAt'), - availableAt: parseDate(action.availableAt, 'action.availableAt'), - leasedUntil: parseOptionalDate(action.leasedUntil, 'action.leasedUntil'), - completedAt: parseOptionalDate(action.completedAt, 'action.completedAt'), - failedAt: parseOptionalDate(action.failedAt, 'action.failedAt'), - cancelledAt: parseOptionalDate(action.cancelledAt, 'action.cancelledAt'), - })); - const actionOrder = new Map(); - actions.forEach((action, index) => actionOrder.set(action.actionId, index)); - - // Issue #4032: Build incremental seq tracking from loaded events. - const lastEventSeqBySession = new Map(); - const events = value.events.map(event => { - if ((event.eventSeq ?? 0) > 0) { - const current = lastEventSeqBySession.get(event.sessionId) ?? 0; - if (event.eventSeq > current) { - lastEventSeqBySession.set(event.sessionId, event.eventSeq); - } - } - return { - ...event, - occurredAt: parseDate(event.occurredAt, 'event.occurredAt'), - ingestedAt: parseDate(event.ingestedAt, 'event.ingestedAt'), - payload: clonePayload(event.payload), - }; - }); - - return { - sessions: value.sessions.map(cloneSession), - events, - actions, - actionOrder, - nextActionOrder: actions.length, - pauseInterventions: (value.pauseInterventions ?? 
[]).map((record: SerializedPauseIntervention) => ({ - ...record, - requestedAt: parseDate(record.requestedAt, 'pauseIntervention.requestedAt'), - updatedAt: parseDate(record.updatedAt, 'pauseIntervention.updatedAt'), - interventionStartedAt: parseOptionalDate(record.interventionStartedAt, 'pauseIntervention.interventionStartedAt'), - interventionCompletedAt: parseOptionalDate(record.interventionCompletedAt, 'pauseIntervention.interventionCompletedAt'), - resumedAt: parseOptionalDate(record.resumedAt, 'pauseIntervention.resumedAt'), - })), - lastEventSeqBySession, - }; -} - -function isSerializedState(value: unknown): value is SerializedState { - if (!isRecord(value) || value.version !== 1) return false; - return Array.isArray(value.sessions) && Array.isArray(value.events) && Array.isArray(value.actions) && Array.isArray(value.pauseInterventions ?? []); -} - -function validateAppendInput(input: AcpAppendEventInput): AcpAppendEventInput & { occurredAt: Date } { - return { - sessionId: requireNonEmptyString(input.sessionId, 'sessionId'), - tenantId: requireNonEmptyString(input.tenantId, 'tenantId'), - ownerKeyId: requireNonEmptyString(input.ownerKeyId, 'ownerKeyId'), - backendRunId: requireOptionalNonEmptyString(input.backendRunId, 'backendRunId'), - eventType: requireNonEmptyString(input.eventType, 'eventType'), - occurredAt: resolveOccurredAt(input.occurredAt), - payload: clonePayload(input.payload), - payloadRef: requireOptionalNonEmptyString(input.payloadRef, 'payloadRef'), - }; -} - -function validateListInput(input: AcpListEventsInput): void { - requireNonEmptyString(input.sessionId, 'sessionId'); - requireNonEmptyString(input.tenantId, 'tenantId'); - requireNonEmptyString(input.ownerKeyId, 'ownerKeyId'); -} - -function compareLeaseOrder(state: LocalState, left: AcpActionRecord, right: AcpActionRecord): number { - const availableDelta = left.availableAt.getTime() - right.availableAt.getTime(); - if (availableDelta !== 0) return availableDelta; - const 
createdDelta = left.createdAt.getTime() - right.createdAt.getTime(); - if (createdDelta !== 0) return createdDelta; - return (state.actionOrder.get(left.actionId) ?? 0) - (state.actionOrder.get(right.actionId) ?? 0); -} - -function resolveOccurredAt(value: Date | undefined): Date { - if (value === undefined) return new Date(); - assertValidDate(value, 'event occurredAt'); - return new Date(value.getTime()); -} - -function resolveAfterEventSeq(value: number | undefined): number { - if (value === undefined) return 0; - if (!Number.isSafeInteger(value) || value < 0) { - throw new AcpValidationError('ACP afterEventSeq must be a non-negative safe integer'); - } - return value; -} - -function resolveSessionListLimit(value: number | undefined): number { - if (value === undefined) return DEFAULT_LIST_LIMIT; - if (!Number.isSafeInteger(value) || value <= 0 || value > MAX_LIST_LIMIT) { - throw new AcpValidationError(`ACP session list limit must be between 1 and ${MAX_LIST_LIMIT}`); - } - return value; -} - -function resolveLimit(value: number | undefined): number { - if (value === undefined) return DEFAULT_LIST_LIMIT; - if (!Number.isSafeInteger(value) || value <= 0 || value > MAX_LIST_LIMIT) { - throw new AcpValidationError(`ACP event replay limit must be between 1 and ${MAX_LIST_LIMIT}`); - } - return value; -} - -function validateActionIdAndScope(actionId: string, scope: AcpSessionScope): void { - requireNonEmptyString(actionId, 'action id'); - validateScope(scope); -} - -function validateScope(scope: AcpSessionScope): void { - requireNonEmptyString(scope.tenantId, 'tenant id'); - requireNonEmptyString(scope.ownerKeyId, 'owner key id'); -} - -function requireNonEmptyString(value: unknown, label: string): string { - if (typeof value !== 'string' || value.trim() === '') { - throw new AcpValidationError(`ACP ${label} must be a non-empty string`); - } - return value; -} - -function requireOptionalNonEmptyString(value: unknown, label: string): string | undefined { - if (value 
=== undefined) return undefined; - return requireNonEmptyString(value, label); -} - -function assertValidDate(value: Date, label: string): void { - if (!(value instanceof Date) || Number.isNaN(value.getTime())) { - throw new AcpValidationError(`ACP ${label} must be a valid Date`); - } -} - -function cloneSession(record: AcpSessionRecord): AcpSessionRecord { - return { - ...record, - backendMetadata: record.backendMetadata === undefined ? undefined : { ...record.backendMetadata }, - }; -} - -function cloneEvent(record: AcpEventRecord): AcpEventRecord { - return { - ...record, - occurredAt: new Date(record.occurredAt.getTime()), - ingestedAt: new Date(record.ingestedAt.getTime()), - payload: clonePayload(record.payload), - }; -} - -function cloneAction(record: AcpActionRecord): AcpActionRecord { - return { - ...record, - createdAt: new Date(record.createdAt.getTime()), - availableAt: new Date(record.availableAt.getTime()), - leasedUntil: cloneOptionalDate(record.leasedUntil), - completedAt: cloneOptionalDate(record.completedAt), - failedAt: cloneOptionalDate(record.failedAt), - cancelledAt: cloneOptionalDate(record.cancelledAt), - metadata: record.metadata === undefined ? undefined : { ...record.metadata }, - resultMetadata: record.resultMetadata === undefined ? undefined : { ...record.resultMetadata }, - errorMetadata: record.errorMetadata === undefined ? 
undefined : { ...record.errorMetadata }, - }; -} - -function clonePayload(value: unknown): AcpEventPayload { - validateJsonValue(value, 'payload', new WeakSet()); - return structuredClone(value); -} - -function validateJsonValue( - value: unknown, - label: string, - seen: WeakSet -): asserts value is AcpEventPayload { - if (value === null || typeof value === 'string' || typeof value === 'boolean') return; - if (typeof value === 'number') { - if (!Number.isFinite(value)) throw new AcpValidationError(`ACP ${label} number must be finite`); - return; - } - if (Array.isArray(value)) { - if (seen.has(value)) throw new AcpValidationError(`ACP ${label} cannot be circular`); - seen.add(value); - for (let index = 0; index < value.length; index += 1) { - if (!(index in value)) throw new AcpValidationError(`ACP ${label} cannot contain sparse arrays`); - validateJsonValue(value[index], label, seen); - } - seen.delete(value); - return; - } - if (isRecord(value)) { - const prototype = Object.getPrototypeOf(value); - if (prototype !== Object.prototype && prototype !== null) { - throw new AcpValidationError(`ACP ${label} object must be a plain JSON object`); - } - if (seen.has(value)) throw new AcpValidationError(`ACP ${label} cannot be circular`); - seen.add(value); - Object.values(value).forEach(property => validateJsonValue(property, label, seen)); - seen.delete(value); - return; - } - throw new AcpValidationError(`ACP ${label} must be serializable JSON`); -} - -function parseDate(value: string, label: string): Date { - const date = new Date(value); - if (Number.isNaN(date.getTime())) { - throw new Error(`FileAcpLocalStorageProfile: ${label} must be a valid date string`); - } - return date; -} - -function parseOptionalDate(value: string | undefined, label: string): Date | undefined { - return value === undefined ? undefined : parseDate(value, label); -} - -function cloneOptionalDate(value: Date | undefined): Date | undefined { - return value === undefined ? 
undefined : new Date(value.getTime()); -} - -function clonePauseIntervention(record: AcpPauseInterventionRecord): AcpPauseInterventionRecord { - return { - ...record, - requestedAt: new Date(record.requestedAt.getTime()), - updatedAt: new Date(record.updatedAt.getTime()), - interventionStartedAt: cloneOptionalDate(record.interventionStartedAt), - interventionCompletedAt: cloneOptionalDate(record.interventionCompletedAt), - resumedAt: cloneOptionalDate(record.resumedAt), - metadata: record.metadata === undefined ? undefined : { ...record.metadata }, - resumeMetadata: record.resumeMetadata === undefined ? undefined : { ...record.resumeMetadata }, - }; -} - -function isRecord(value: unknown): value is Record { - return typeof value === 'object' && value !== null && !Array.isArray(value); -} - -function isNodeError(error: unknown): error is NodeJS.ErrnoException { - return error instanceof Error && 'code' in error; -} +export { + DEFAULT_LIST_LIMIT, + DEFAULT_MAX_EVENTS_PER_SESSION, + DEFAULT_PERSIST_DEBOUNCE_MS, + MAX_LIST_LIMIT, + PRUNABLE_SESSION_STATUSES, + createEmptyState, + noopMutationHook, +} from './local-storage/index.js'; +export type { + AcpLocalStorageProfile, + FileAcpLocalStorageProfileConfig, + LocalState, + MutationHook, +} from './local-storage/index.js'; +export { MemoryAcpSessionStore } from './local-storage/index.js'; +export { MemoryAcpEventStore } from './local-storage/index.js'; +export { MemoryAcpActionQueue } from './local-storage/index.js'; +export { MemoryAcpPauseInterventionStore } from './local-storage/index.js'; +export { + FileAcpLocalStorageProfile, + MemoryAcpLocalStorageProfile, + createFileAcpLocalStorageProfile, + createMemoryAcpLocalStorageProfile, +} from './local-storage/index.js'; diff --git a/src/services/acp/local-storage/clone.ts b/src/services/acp/local-storage/clone.ts new file mode 100644 index 00000000..52d361d5 --- /dev/null +++ b/src/services/acp/local-storage/clone.ts @@ -0,0 +1,99 @@ +import { AcpValidationError } 
/**
 * Issue #4032: Defensive clone helpers. Persistence relies on these to keep
 * store state isolated from caller-mutated inputs and from payloads that were
 * already written to disk. Event payloads are cloned deeply; the metadata bags
 * on records are copied one level deep.
 */

/**
 * Validates `value` as plain JSON data and returns a detached copy of it.
 *
 * @param value - Candidate event payload.
 * @returns A `structuredClone` of the validated value.
 * @throws AcpValidationError when `validateJsonValue` rejects the value.
 */
export function clonePayload(value: unknown): AcpEventPayload {
  validateJsonValue(value, 'payload', new WeakSet<object>());
  return structuredClone(value);
}

/**
 * Asserts that `value` consists only of JSON-representable data: `null`,
 * strings, booleans, finite numbers, dense arrays and plain objects.
 *
 * @param value - Value to inspect; containers are walked recursively.
 * @param label - Name used in the error messages (e.g. `payload`).
 * @param seen - Containers on the current descent path. A container is removed
 *   again once it has been fully visited, so the same object may be referenced
 *   from several places as long as it is never its own ancestor.
 * @throws AcpValidationError for non-finite numbers, sparse or circular
 *   containers, objects with a custom prototype, and every other type
 *   (`undefined`, functions, symbols, bigint, ...).
 */
export function validateJsonValue(
  value: unknown,
  label: string,
  seen: WeakSet<object>
): asserts value is AcpEventPayload {
  if (value === null) return;

  switch (typeof value) {
    case 'string':
    case 'boolean':
      return;
    case 'number':
      if (!Number.isFinite(value)) throw new AcpValidationError(`ACP ${label} number must be finite`);
      return;
    default:
      break;
  }

  if (Array.isArray(value)) {
    if (seen.has(value)) throw new AcpValidationError(`ACP ${label} cannot be circular`);
    seen.add(value);
    for (let index = 0; index < value.length; index += 1) {
      // `in` tells a hole apart from an element that was explicitly set.
      if (!(index in value)) throw new AcpValidationError(`ACP ${label} cannot contain sparse arrays`);
      validateJsonValue(value[index], label, seen);
    }
    seen.delete(value);
    return;
  }

  if (isRecord(value)) {
    const prototype = Object.getPrototypeOf(value);
    if (prototype !== Object.prototype && prototype !== null) {
      throw new AcpValidationError(`ACP ${label} object must be a plain JSON object`);
    }
    if (seen.has(value)) throw new AcpValidationError(`ACP ${label} cannot be circular`);
    seen.add(value);
    for (const property of Object.values(value)) {
      validateJsonValue(property, label, seen);
    }
    seen.delete(value);
    return;
  }

  throw new AcpValidationError(`ACP ${label} must be serializable JSON`);
}
/** Builds a new Date carrying the same instant as `source`. */
function copyDate(source: Date): Date {
  return new Date(source.getTime());
}

/** Copies a metadata bag one level deep; `undefined` is passed through. */
function copyBag<T extends object>(bag: T | undefined): T | undefined {
  if (bag === undefined) return undefined;
  return { ...bag };
}

/**
 * Copies an optional timestamp.
 *
 * @returns `undefined` for `undefined`, otherwise a fresh Date with the same time value.
 */
export function cloneOptionalDate(value: Date | undefined): Date | undefined {
  if (value === undefined) return undefined;
  return copyDate(value);
}

/**
 * Copies a session record; `backendMetadata` gets its own one-level copy.
 */
export function cloneSession(record: AcpSessionRecord): AcpSessionRecord {
  const backendMetadata = copyBag(record.backendMetadata);
  return { ...record, backendMetadata };
}

/**
 * Copies an event record with fresh Date instances and a validated,
 * structured-cloned payload (see `clonePayload`).
 */
export function cloneEvent(record: AcpEventRecord): AcpEventRecord {
  const occurredAt = copyDate(record.occurredAt);
  const ingestedAt = copyDate(record.ingestedAt);
  const payload = clonePayload(record.payload);
  return { ...record, occurredAt, ingestedAt, payload };
}

/**
 * Copies an action record: every timestamp becomes a new Date and each of the
 * three metadata bags is copied one level deep.
 */
export function cloneAction(record: AcpActionRecord): AcpActionRecord {
  const timestamps = {
    createdAt: copyDate(record.createdAt),
    availableAt: copyDate(record.availableAt),
    leasedUntil: cloneOptionalDate(record.leasedUntil),
    completedAt: cloneOptionalDate(record.completedAt),
    failedAt: cloneOptionalDate(record.failedAt),
    cancelledAt: cloneOptionalDate(record.cancelledAt),
  };
  const bags = {
    metadata: copyBag(record.metadata),
    resultMetadata: copyBag(record.resultMetadata),
    errorMetadata: copyBag(record.errorMetadata),
  };
  return { ...record, ...timestamps, ...bags };
}

/**
 * Copies a pause/intervention record: every timestamp becomes a new Date and
 * both metadata bags are copied one level deep.
 */
export function clonePauseIntervention(record: AcpPauseInterventionRecord): AcpPauseInterventionRecord {
  const timestamps = {
    requestedAt: copyDate(record.requestedAt),
    updatedAt: copyDate(record.updatedAt),
    interventionStartedAt: cloneOptionalDate(record.interventionStartedAt),
    interventionCompletedAt: cloneOptionalDate(record.interventionCompletedAt),
    resumedAt: cloneOptionalDate(record.resumedAt),
  };
  const bags = {
    metadata: copyBag(record.metadata),
    resumeMetadata: copyBag(record.resumeMetadata),
  };
  return { ...record, ...timestamps, ...bags };
}
/**
 * In-memory ACP action queue operating on a shared `LocalState`.
 *
 * Every method that changes state awaits the `onMutation` hook before
 * returning, and every record handed to a caller is a clone, so callers never
 * hold a reference into the queue's own state.
 */
export class MemoryAcpActionQueue implements AcpActionQueue {
  constructor(
    private state: LocalState = createEmptyState(),
    private readonly onMutation: MutationHook = noopMutationHook
  ) {}

  /**
   * Adds a control action in the `queued` state.
   *
   * When `input.idempotencyKey` is set and an action with the same tenant,
   * owner key, session and idempotency key already exists, that action is
   * returned and nothing is added.
   *
   * @param input - Control action to enqueue.
   * @param options - `availableAt` defaults to the current time.
   * @returns A clone of the stored (or pre-existing) action.
   * @throws AcpDurableIdentityError when `input.actionId` is already in use.
   */
  async enqueue(
    input: AcpControlActionInput,
    options: AcpEnqueueActionOptions = {}
  ): Promise<AcpActionRecord> {
    validateAcpControlActionInput(input);
    const metadata = normalizeAcpActionMetadata(input.metadata) ?? {};
    const availableAt = options.availableAt ?? new Date();
    assertValidDate(availableAt, 'action availableAt');

    if (input.idempotencyKey !== undefined) {
      const existing = this.findByIdempotencyKey(input, input.idempotencyKey);
      if (existing !== null) return existing;
    }
    if (this.state.actions.some(action => action.actionId === input.actionId)) {
      throw new AcpDurableIdentityError(`ACP action id already exists: ${input.actionId}`);
    }

    const record: AcpActionRecord = {
      actionId: input.actionId,
      sessionId: input.sessionId,
      tenantId: input.tenantId,
      ownerKeyId: input.ownerKeyId,
      actionType: input.type,
      idempotencyKey: input.idempotencyKey,
      status: 'queued',
      createdAt: new Date(),
      availableAt: new Date(availableAt.getTime()),
      attemptCount: 0,
      approvalId: input.approvalId,
      controlRequestId: input.controlRequestId,
      metadata,
    };
    this.state.actions.push(cloneAction(record));
    // Insertion order is the final tie-breaker used by compareLeaseOrder.
    this.state.actionOrder.set(record.actionId, this.state.nextActionOrder);
    this.state.nextActionOrder += 1;
    await this.onMutation();
    return cloneAction(record);
  }

  /**
   * Leases the first queued action in `scope` whose `availableAt` is not after
   * `now`, ordered by `compareLeaseOrder`.
   *
   * @param scope - Tenant and owner key the action must belong to.
   * @param options - `leaseUntil` is required and must be after `now`
   *   (`now` defaults to the current time).
   * @returns A clone of the leased action, or `null` when nothing is eligible.
   * @throws AcpValidationError when `leaseUntil` is not after `now`.
   */
  async leaseNext(
    scope: AcpSessionScope,
    options: AcpLeaseActionOptions
  ): Promise<AcpActionRecord | null> {
    validateScope(scope);
    const now = options.now ?? new Date();
    assertValidDate(now, 'action lease now');
    assertValidDate(options.leaseUntil, 'action leaseUntil');
    if (options.leaseUntil.getTime() <= now.getTime()) {
      throw new AcpValidationError('ACP action leaseUntil must be after now');
    }
    const candidate = this.state.actions
      .filter(
        action =>
          action.tenantId === scope.tenantId &&
          action.ownerKeyId === scope.ownerKeyId &&
          action.status === 'queued' &&
          action.availableAt.getTime() <= now.getTime()
      )
      .sort((left, right) => compareLeaseOrder(this.state, left, right))[0];
    if (candidate === undefined) return null;
    candidate.status = 'leased';
    candidate.leasedUntil = new Date(options.leaseUntil.getTime());
    candidate.attemptCount += 1;
    await this.onMutation();
    return cloneAction(candidate);
  }

  /**
   * Marks a leased action as completed.
   *
   * @returns A clone of the updated action, or `null` when no action with this
   *   id exists in `scope` or it is not currently leased.
   */
  async complete(
    actionId: string,
    scope: AcpSessionScope,
    options: AcpCompleteActionOptions = {}
  ): Promise<AcpActionRecord | null> {
    const resultMetadata = normalizeAcpActionMetadata(
      options.resultMetadata,
      'action result metadata'
    );
    return this.finishLeased(actionId, scope, options.now, 'completed', resultMetadata);
  }

  /**
   * Marks a leased action as failed.
   *
   * @returns A clone of the updated action, or `null` when no action with this
   *   id exists in `scope` or it is not currently leased.
   */
  async fail(
    actionId: string,
    scope: AcpSessionScope,
    options: AcpFailActionOptions = {}
  ): Promise<AcpActionRecord | null> {
    const errorMetadata = normalizeAcpActionMetadata(options.errorMetadata, 'action error metadata');
    return this.finishLeased(actionId, scope, options.now, 'failed', errorMetadata);
  }

  /**
   * Cancels a queued or leased action.
   *
   * @returns A clone of the cancelled action, or `null` when no action with
   *   this id exists in `scope` or it is neither queued nor leased.
   */
  async cancel(
    actionId: string,
    scope: AcpSessionScope,
    options: AcpCancelActionOptions = {}
  ): Promise<AcpActionRecord | null> {
    validateActionIdAndScope(actionId, scope);
    const now = options.now ?? new Date();
    assertValidDate(now, 'action cancellation time');
    const action = this.findAction(actionId, scope);
    if (action === null || (action.status !== 'queued' && action.status !== 'leased')) return null;
    // Normalize before touching the record: if the metadata is rejected, the
    // action must not be left half-cancelled in state.
    const errorMetadata = normalizeAcpActionMetadata(options.errorMetadata, 'action error metadata');
    action.status = 'cancelled';
    action.errorMetadata = errorMetadata;
    action.cancelledAt = new Date(now.getTime());
    action.leasedUntil = undefined;
    await this.onMutation();
    return cloneAction(action);
  }

  /** Points this queue at a different state container. */
  replaceState(state: LocalState): void {
    this.state = state;
  }

  /**
   * Fails every leased action whose `leasedUntil` is at or before `now` and
   * stamps it with sweeper error metadata.
   *
   * @param now - Reference time; defaults to the current time.
   * @returns Clones of the actions that were failed (empty when none expired).
   */
  async sweepOrphanedActions(now: Date = new Date()): Promise<AcpActionRecord[]> {
    const recovered: AcpActionRecord[] = [];
    for (const action of this.state.actions) {
      if (
        action.status === 'leased' &&
        action.leasedUntil !== undefined &&
        action.leasedUntil.getTime() <= now.getTime()
      ) {
        action.status = 'failed';
        action.failedAt = new Date(now.getTime());
        action.leasedUntil = undefined;
        action.errorMetadata = {
          sweeper: true,
          reason: 'lease_expired',
          recoveredAt: now.toISOString(),
        };
        recovered.push(cloneAction(action));
      }
    }
    // Only notify the hook when something actually changed.
    if (recovered.length > 0) {
      await this.onMutation();
    }
    return recovered;
  }

  /**
   * Looks up an action by tenant, owner key, session and idempotency key.
   *
   * @returns A clone of the match, or `null`.
   */
  private findByIdempotencyKey(
    input: AcpControlActionInput,
    idempotencyKey: string
  ): AcpActionRecord | null {
    const action = this.state.actions.find(
      record =>
        record.tenantId === input.tenantId &&
        record.ownerKeyId === input.ownerKeyId &&
        record.sessionId === input.sessionId &&
        record.idempotencyKey === idempotencyKey
    );
    return action === undefined ? null : cloneAction(action);
  }

  /**
   * Shared implementation of `complete` and `fail`: moves a leased action to
   * its terminal status and records the matching metadata and timestamp.
   *
   * @returns A clone of the updated action, or `null` when the action is
   *   missing from `scope` or not currently leased.
   */
  private async finishLeased(
    actionId: string,
    scope: AcpSessionScope,
    nowValue: Date | undefined,
    status: 'completed' | 'failed',
    metadata: AcpActionMetadata | undefined
  ): Promise<AcpActionRecord | null> {
    validateActionIdAndScope(actionId, scope);
    const now = nowValue ?? new Date();
    assertValidDate(now, status === 'completed' ? 'action completion time' : 'action failure time');
    const action = this.findAction(actionId, scope);
    if (action === null || action.status !== 'leased') return null;
    action.status = status;
    action.leasedUntil = undefined;
    if (status === 'completed') {
      action.resultMetadata = metadata;
      action.completedAt = new Date(now.getTime());
    } else {
      action.errorMetadata = metadata;
      action.failedAt = new Date(now.getTime());
    }
    await this.onMutation();
    return cloneAction(action);
  }

  /**
   * Finds the stored action with this id inside `scope`.
   *
   * @returns The live record from state (not a clone), or `null`.
   */
  private findAction(actionId: string, scope: AcpSessionScope): AcpActionRecord | null {
    return (
      this.state.actions.find(
        action =>
          action.actionId === actionId &&
          action.tenantId === scope.tenantId &&
          action.ownerKeyId === scope.ownerKeyId
      ) ?? null
    );
  }
}
/**
 * Issue #4032: In-memory event store with a configurable per-session event
 * limit and incremental sequence tracking.
 *
 * Operates on a shared `LocalState`; `append` awaits the `onMutation` hook
 * after changing state, and all returned records are clones.
 */
export class MemoryAcpEventStore implements AcpEventStore {
  private readonly maxEventsPerSession: number;

  constructor(
    private state: LocalState = createEmptyState(),
    private readonly onMutation: MutationHook = noopMutationHook,
    maxEventsPerSession: number = DEFAULT_MAX_EVENTS_PER_SESSION,
  ) {
    this.maxEventsPerSession = maxEventsPerSession;
  }

  /**
   * Appends an event to its session stream and assigns the next sequence number.
   *
   * @param input - Event to append; validated and copied by `validateAppendInput`.
   * @returns A clone of the stored event, including `eventSeq` and `eventId`.
   * @throws Error when the session already has events recorded under a
   *   different tenant or owner key.
   */
  async append(input: AcpAppendEventInput): Promise<AcpEventRecord> {
    const validated = validateAppendInput(input);
    const scopeConflict = this.state.events.find(
      event =>
        event.sessionId === validated.sessionId &&
        (event.tenantId !== validated.tenantId || event.ownerKeyId !== validated.ownerKeyId)
    );
    if (scopeConflict !== undefined) {
      throw new Error('MemoryAcpEventStore: session scope does not match existing event stream');
    }
    const eventSeq = this.nextEventSeqIncremental(validated.sessionId);
    const record: AcpEventRecord = {
      ...validated,
      eventSeq,
      eventId: `${validated.tenantId}:${validated.ownerKeyId}:${validated.sessionId}:${eventSeq}`,
      occurredAt: new Date(validated.occurredAt.getTime()),
      ingestedAt: new Date(),
      payload: clonePayload(validated.payload),
    };
    this.state.events.push(cloneEvent(record));
    // Record the new high-water mark so the next append needs no scan.
    this.state.lastEventSeqBySession.set(validated.sessionId, eventSeq);
    // Drop the oldest events of this session if the limit is now exceeded.
    this.pruneSessionEvents(validated.sessionId);
    await this.onMutation();
    return cloneEvent(record);
  }

  /**
   * Lists a session's events in ascending `eventSeq` order.
   *
   * @param input - Session, tenant and owner key to match, plus optional
   *   `afterEventSeq` (exclusive lower bound) and `limit`.
   * @returns Clones of the matching events, at most `limit` of them.
   */
  async list(input: AcpListEventsInput): Promise<AcpEventRecord[]> {
    validateListInput(input);
    const afterEventSeq = resolveAfterEventSeq(input.afterEventSeq);
    const limit = resolveLimit(input.limit);
    return this.state.events
      .filter(
        event =>
          event.sessionId === input.sessionId &&
          event.tenantId === input.tenantId &&
          event.ownerKeyId === input.ownerKeyId &&
          event.eventSeq > afterEventSeq
      )
      .sort((left, right) => left.eventSeq - right.eventSeq)
      .slice(0, limit)
      .map(cloneEvent);
  }

  /** Points this store at a different state container. */
  replaceState(state: LocalState): void {
    this.state = state;
  }

  /**
   * Issue #4032: Returns the sequence number for the session's next event.
   *
   * Uses the tracked high-water mark when one exists. Otherwise it scans the
   * stored events once; the scan is a plain loop rather than
   * `Math.max(...seqs)` so that a very large event list cannot exceed the
   * engine's argument limit.
   */
  private nextEventSeqIncremental(sessionId: string): number {
    const lastSeq = this.state.lastEventSeqBySession.get(sessionId);
    if (lastSeq !== undefined) return lastSeq + 1;

    let highest = 0;
    for (const event of this.state.events) {
      if (event.sessionId === sessionId && event.eventSeq > highest) {
        highest = event.eventSeq;
      }
    }
    return highest + 1;
  }

  /**
   * Issue #4032: Removes the oldest events of a session once it holds more
   * than `maxEventsPerSession`, and logs how many were dropped.
   */
  private pruneSessionEvents(sessionId: string): void {
    const sessionEvents = this.state.events.filter(e => e.sessionId === sessionId);
    if (sessionEvents.length <= this.maxEventsPerSession) return;

    const pruneCount = sessionEvents.length - this.maxEventsPerSession;
    // `sessionEvents` is a filtered copy, so sorting it leaves state order intact.
    sessionEvents.sort((a, b) => a.eventSeq - b.eventSeq);
    const prunableSeqs = new Set(
      sessionEvents.slice(0, pruneCount).map(e => e.eventSeq)
    );

    const before = this.state.events.length;
    this.state.events = this.state.events.filter(
      e => !(e.sessionId === sessionId && prunableSeqs.has(e.eventSeq))
    );

    if (this.state.events.length < before) {
      logger.info({
        component: 'acp-local-storage',
        operation: 'pruneSessionEvents',
        attributes: { sessionId, prunedCount: before - this.state.events.length, remainingForSession: this.maxEventsPerSession },
      });
    }
  }
}
/**
 * Returns the session's active pause record, i.e. one whose status is
 * `paused` or `intervening`; if several match, the most recently updated wins.
 *
 * @returns A clone of the record, or `null` when the session has no active pause.
 */
async getActive(
  sessionId: string,
  scope: AcpSessionScope,
): Promise<AcpPauseInterventionRecord | null> {
  validateScope(scope);
  const record = this.state.pauseInterventions
    .filter(
      (p) =>
        p.sessionId === sessionId &&
        p.tenantId === scope.tenantId &&
        p.ownerKeyId === scope.ownerKeyId &&
        (p.status === 'paused' || p.status === 'intervening'),
    )
    .sort((a, b) => b.updatedAt.getTime() - a.updatedAt.getTime())[0];
  return record === undefined ? null : clonePauseIntervention(record);
}

/**
 * Returns the session's most recently updated pause record regardless of status.
 *
 * @returns A clone of the record, or `null` when the session has none.
 */
async getLatest(
  sessionId: string,
  scope: AcpSessionScope,
): Promise<AcpPauseInterventionRecord | null> {
  validateScope(scope);
  const record = this.state.pauseInterventions
    .filter(
      (p) =>
        p.sessionId === sessionId &&
        p.tenantId === scope.tenantId &&
        p.ownerKeyId === scope.ownerKeyId,
    )
    .sort((a, b) => b.updatedAt.getTime() - a.updatedAt.getTime())[0];
  return record === undefined ? null : clonePauseIntervention(record);
}

/**
 * Moves the session's `paused` record to `intervening`.
 *
 * @returns A clone of the updated record, or `null` when the session has no
 *   record in the `paused` state.
 */
async startIntervention(
  input: AcpStartInterventionInput,
): Promise<AcpPauseInterventionRecord | null> {
  validateScope(input);
  const record = this.state.pauseInterventions.find(
    (p) =>
      p.sessionId === input.sessionId &&
      p.tenantId === input.tenantId &&
      p.ownerKeyId === input.ownerKeyId &&
      p.status === 'paused',
  );
  if (record === undefined) return null;
  record.status = 'intervening';
  record.interventionId = input.interventionId;
  record.interventionBy = input.interventionBy;
  // Copy the timestamp so later changes to the caller's Date cannot alter state.
  record.interventionStartedAt = new Date((input.startedAt ?? new Date()).getTime());
  record.updatedAt = new Date();
  await this.onMutation();
  return clonePauseIntervention(record);
}

/**
 * Records completion details on the `intervening` record that carries
 * `input.interventionId`. The record's status is not changed here.
 *
 * @returns A clone of the updated record, or `null` when no such record exists.
 */
async completeIntervention(
  input: AcpCompleteInterventionInput,
): Promise<AcpPauseInterventionRecord | null> {
  validateScope(input);
  const record = this.state.pauseInterventions.find(
    (p) =>
      p.sessionId === input.sessionId &&
      p.tenantId === input.tenantId &&
      p.ownerKeyId === input.ownerKeyId &&
      p.status === 'intervening' &&
      p.interventionId === input.interventionId,
  );
  if (record === undefined) return null;
  record.interventionCompletedBy = input.completedBy;
  // Copy the timestamp so later changes to the caller's Date cannot alter state.
  record.interventionCompletedAt = new Date((input.completedAt ?? new Date()).getTime());
  record.guidance = input.guidance;
  record.updatedAt = new Date();
  await this.onMutation();
  return clonePauseIntervention(record);
}

/**
 * Marks the session's active (`paused` or `intervening`) record as `resumed`.
 *
 * @returns A clone of the updated record, or `null` when the session has no
 *   active pause.
 */
async resume(input: AcpResumeSessionInput): Promise<AcpPauseInterventionRecord | null> {
  validateScope(input);
  const record = this.state.pauseInterventions.find(
    (p) =>
      p.sessionId === input.sessionId &&
      p.tenantId === input.tenantId &&
      p.ownerKeyId === input.ownerKeyId &&
      (p.status === 'paused' || p.status === 'intervening'),
  );
  if (record === undefined) return null;
  record.status = 'resumed';
  record.resumeId = input.resumeId;
  record.resumedBy = input.resumedBy;
  // Copy the timestamp so later changes to the caller's Date cannot alter state.
  record.resumedAt = new Date((input.resumedAt ?? new Date()).getTime());
  record.updatedAt = new Date();
  await this.onMutation();
  return clonePauseIntervention(record);
}

/** Points this store at a different state container. */
replaceState(state: LocalState): void {
  this.state = state;
}
/**
 * In-memory implementation of the session store. Shares a `LocalState`
 * container with the other Memory* stores and the file-backed profile, and
 * awaits the `onMutation` hook after every change so persistence can react.
 */
export class MemoryAcpSessionStore implements AcpSessionStore {
  constructor(
    private state: LocalState = createEmptyState(),
    private readonly onMutation: MutationHook = noopMutationHook
  ) {}

  /**
   * Stores a clone of `record`.
   *
   * @throws AcpDurableIdentityError when a session with the same id, tenant
   *   and owner key is already stored.
   */
  async create(record: AcpSessionRecord): Promise<void> {
    // Same scope check as get/update/list: without it a record with a blank
    // tenant or owner key could be stored but never read back, because every
    // read path rejects that scope.
    validateScope(record);
    const duplicate = this.state.sessions.some(
      session =>
        session.id === record.id &&
        session.tenantId === record.tenantId &&
        session.ownerKeyId === record.ownerKeyId
    );
    if (duplicate) {
      throw new AcpDurableIdentityError(`ACP session already exists: ${record.id}`);
    }
    this.state.sessions.push(cloneSession(record));
    await this.onMutation();
  }

  /** Returns a clone of the session with this id in `scope`, or `null`. */
  async get(id: string, scope: AcpSessionScope): Promise<AcpSessionRecord | null> {
    validateScope(scope);
    const record = this.state.sessions.find(
      session =>
        session.id === id &&
        session.tenantId === scope.tenantId &&
        session.ownerKeyId === scope.ownerKeyId
    );
    return record === undefined ? null : cloneSession(record);
  }

  /**
   * Overwrites the mutable fields of a stored session; all other fields keep
   * their stored values.
   *
   * @returns A clone of the stored result, or `null` when the session does
   *   not exist in `scope`.
   * @throws Error when `record` carries a different tenant or owner key than
   *   `scope`.
   */
  async update(record: AcpSessionRecord, scope: AcpSessionScope): Promise<AcpSessionRecord | null> {
    validateScope(scope);
    if (record.tenantId !== scope.tenantId || record.ownerKeyId !== scope.ownerKeyId) {
      throw new Error('MemoryAcpSessionStore: record scope does not match requested scope');
    }
    const index = this.state.sessions.findIndex(
      session =>
        session.id === record.id &&
        session.tenantId === scope.tenantId &&
        session.ownerKeyId === scope.ownerKeyId
    );
    // Handle "not found" explicitly instead of relying on sessions[-1].
    const previous = index === -1 ? undefined : this.state.sessions[index];
    if (previous === undefined) return null;
    const persisted: AcpSessionRecord = {
      ...previous,
      acpAgentSessionId: record.acpAgentSessionId,
      claudeSessionId: record.claudeSessionId,
      currentBackendRunId: record.currentBackendRunId,
      status: record.status,
      updatedAt: record.updatedAt,
      closedAt: record.closedAt,
      failedAt: record.failedAt,
      backendMetadata: record.backendMetadata === undefined ? undefined : { ...record.backendMetadata },
      validationWarnings: record.validationWarnings,
    };
    this.state.sessions[index] = cloneSession(persisted);
    await this.onMutation();
    return cloneSession(persisted);
  }

  /**
   * Lists sessions in the caller's scope, newest `updatedAt` first, filtered
   * by the optional `statuses` and `updatedAfter` criteria and capped at the
   * resolved limit. Results are clones.
   */
  async list(input: AcpListSessionsInput): Promise<AcpSessionRecord[]> {
    validateScope(input);
    const limit = resolveSessionListLimit(input.limit);
    return this.state.sessions
      .filter(
        session =>
          session.tenantId === input.tenantId &&
          session.ownerKeyId === input.ownerKeyId &&
          (input.statuses === undefined || input.statuses.includes(session.status)) &&
          (input.updatedAfter === undefined || session.updatedAt > input.updatedAfter)
      )
      .sort((a, b) => b.updatedAt - a.updatedAt)
      .slice(0, limit)
      .map(cloneSession);
  }

  /** Points the store at a different shared state container. */
  replaceState(state: LocalState): void {
    this.state = state;
  }
}
/**
 * Issue #4032: builds the JSON-ready form of the state for the persist path.
 *
 * Dates become ISO-8601 strings and metadata bags are shallow-copied; event
 * payloads are passed through by reference because the caller stringifies
 * the result straight away. Optional date fields that are absent stay
 * present as `undefined` keys.
 *
 * @param state In-memory state to serialise; it is not modified.
 * @returns A version-1 snapshot suitable for `JSON.stringify`.
 */
export function serializeStateLightweight(state: LocalState): SerializedState {
  const copyBag = <T extends object>(bag: T | undefined): T | undefined =>
    bag === undefined ? undefined : { ...bag };

  const sessions = state.sessions.map(session => ({
    ...session,
    backendMetadata: copyBag(session.backendMetadata),
  }));

  const events = state.events.map(event => {
    const occurredAt = event.occurredAt.toISOString();
    const ingestedAt = event.ingestedAt.toISOString();
    // Payload is intentionally not cloned here.
    return { ...event, occurredAt, ingestedAt };
  });

  const actions = state.actions.map(action => ({
    ...action,
    createdAt: action.createdAt.toISOString(),
    availableAt: action.availableAt.toISOString(),
    leasedUntil: action.leasedUntil?.toISOString(),
    completedAt: action.completedAt?.toISOString(),
    failedAt: action.failedAt?.toISOString(),
    cancelledAt: action.cancelledAt?.toISOString(),
    metadata: copyBag(action.metadata),
    resultMetadata: copyBag(action.resultMetadata),
    errorMetadata: copyBag(action.errorMetadata),
  }));

  const pauseInterventions = state.pauseInterventions.map(pause => ({
    ...pause,
    requestedAt: pause.requestedAt.toISOString(),
    updatedAt: pause.updatedAt.toISOString(),
    interventionStartedAt: pause.interventionStartedAt?.toISOString(),
    interventionCompletedAt: pause.interventionCompletedAt?.toISOString(),
    resumedAt: pause.resumedAt?.toISOString(),
    metadata: copyBag(pause.metadata),
    resumeMetadata: copyBag(pause.resumeMetadata),
  }));

  return { version: 1, sessions, events, actions, pauseInterventions };
}
/**
 * Parses a required date field read from the storage file.
 *
 * The value comes from `JSON.parse`, so its static `string` type is not
 * guaranteed at runtime. Non-strings are rejected explicitly: `new Date(null)`
 * and `new Date(0)` both produce a valid epoch date and would otherwise turn
 * a corrupt field into 1970-01-01 without any error.
 *
 * @param value Serialised date string.
 * @param label Field name used in the error message.
 * @returns The parsed date.
 * @throws Error when `value` is not a string or does not parse to a valid date.
 */
function parseDate(value: string, label: string): Date {
  if (typeof value === 'string') {
    const date = new Date(value);
    if (!Number.isNaN(date.getTime())) return date;
  }
  throw new Error(`FileAcpLocalStorageProfile: ${label} must be a valid date string`);
}

/**
 * Parses an optional date field read from the storage file.
 *
 * `undefined` means the field is absent. JSON has no `undefined`, so an
 * explicit `null` is treated as absent too rather than being parsed as the
 * epoch.
 *
 * @param value Serialised date string, or nothing.
 * @param label Field name used in the error message.
 * @returns The parsed date, or `undefined` when the field is absent.
 * @throws Error when a present value is not a valid date string.
 */
function parseOptionalDate(value: string | undefined, label: string): Date | undefined {
  return value == null ? undefined : parseDate(value, label);
}
/**
 * Comparator giving actions a stable lease order.
 *
 * Keys are compared in sequence and the first non-zero difference wins:
 * `availableAt`, then `createdAt`, then the action's index in
 * `state.actionOrder`. An action with no recorded index is treated as
 * index 0.
 *
 * @param state State whose `actionOrder` map supplies the final tie-breaker.
 * @param left First action.
 * @param right Second action.
 * @returns Negative when `left` sorts first, positive when `right` does,
 *   zero when all three keys tie.
 */
export function compareLeaseOrder(state: LocalState, left: AcpActionRecord, right: AcpActionRecord): number {
  const timeKeys: Array<(action: AcpActionRecord) => Date> = [
    action => action.availableAt,
    action => action.createdAt,
  ];
  // Evaluate keys lazily so a later key is only read when earlier ones tie.
  for (const keyOf of timeKeys) {
    const delta = keyOf(left).getTime() - keyOf(right).getTime();
    if (delta !== 0) return delta;
  }
  const insertionIndex = (action: AcpActionRecord): number =>
    state.actionOrder.get(action.actionId) ?? 0;
  return insertionIndex(left) - insertionIndex(right);
}
/**
 * Purely in-memory ACP local storage profile.
 *
 * Wires the four Memory* stores to one shared `LocalState` and forwards the
 * same mutation hook to each of them. Nothing is written to disk, so the
 * lifecycle methods do nothing and no persist error is ever reported.
 */
export class MemoryAcpLocalStorageProfile implements AcpLocalStorageProfile {
  private readonly state: LocalState;
  readonly sessionStore: AcpSessionStore;
  readonly eventStore: AcpEventStore;
  readonly actionQueue: AcpActionQueue;
  readonly pauseInterventionStore: MemoryAcpPauseInterventionStore;

  /**
   * @param state Shared state container; defaults to a fresh empty state.
   * @param onMutation Hook handed to every store; defaults to a no-op.
   */
  constructor(state: LocalState = createEmptyState(), onMutation = noopMutationHook) {
    this.state = state;
    this.sessionStore = new MemoryAcpSessionStore(this.state, onMutation);
    this.eventStore = new MemoryAcpEventStore(this.state, onMutation);
    this.actionQueue = new MemoryAcpActionQueue(this.state, onMutation);
    this.pauseInterventionStore = new MemoryAcpPauseInterventionStore(this.state, onMutation);
  }

  /** No-op: there is nothing to load. */
  async start(): Promise<void> {}

  /** No-op: there is nothing to flush. */
  async stop(_signal?: AbortSignal): Promise<void> {}

  /** Always reports healthy. */
  async health(): Promise<ServiceHealth> {
    return { healthy: true, details: 'memory ACP local storage profile ok' };
  }

  /** Always `null`: this profile never persists. */
  getPersistError(): Error | null {
    return null;
  }
}
/**
 * File-backed ACP local storage profile.
 *
 * Issue #4032: Hardened against OOM via:
 * - Event compaction: max events per session, prunable terminal sessions
 * - Debounced persistence: coalesces rapid mutations into single disk writes
 * - Incremental event seq tracking: O(1) instead of O(n) per append
 * - Lightweight serialization: skips structuredClone on persist path
 *
 * Persist failures never reject: they are recorded and surfaced through
 * `getPersistError()` and `health()`.
 */
export class FileAcpLocalStorageProfile implements AcpLocalStorageProfile {
  private state: LocalState = createEmptyState();
  private started = false;
  /** Tail of the serialized write queue. Never rejects. */
  private writeChain: Promise<void> = Promise.resolve();
  private persistError: Error | null = null;
  /**
   * Issue #4032: Dirty flag. True while the in-memory state holds changes
   * that no successful write has captured yet.
   */
  private dirty = false;
  /** Issue #4032: Debounce timer for persist. */
  private persistTimer: ReturnType<typeof setTimeout> | null = null;
  /** Issue #4032: Resolvers for pending persist promises. */
  private pendingPersistResolvers: Array<() => void> = [];
  private readonly maxEventsPerSession: number;
  private readonly persistDebounceMs: number;
  private readonly memorySessionStore: MemoryAcpSessionStore;
  private readonly memoryEventStore: MemoryAcpEventStore;
  private readonly memoryActionQueue: MemoryAcpActionQueue;
  private readonly memoryPauseInterventionStore: MemoryAcpPauseInterventionStore;

  readonly sessionStore: AcpSessionStore;
  readonly eventStore: AcpEventStore;
  readonly actionQueue: AcpActionQueue;
  readonly pauseInterventionStore: MemoryAcpPauseInterventionStore;

  constructor(private readonly config: FileAcpLocalStorageProfileConfig) {
    this.maxEventsPerSession = config.maxEventsPerSession ?? DEFAULT_MAX_EVENTS_PER_SESSION;
    this.persistDebounceMs = config.persistDebounceMs ?? DEFAULT_PERSIST_DEBOUNCE_MS;
    const schedulePersist = (): Promise<void> => this.schedulePersist();
    this.memorySessionStore = new MemoryAcpSessionStore(this.state, schedulePersist);
    this.memoryEventStore = new MemoryAcpEventStore(this.state, schedulePersist, this.maxEventsPerSession);
    this.memoryActionQueue = new MemoryAcpActionQueue(this.state, schedulePersist);
    this.memoryPauseInterventionStore = new MemoryAcpPauseInterventionStore(this.state, schedulePersist);
    this.sessionStore = this.memorySessionStore;
    this.eventStore = this.memoryEventStore;
    this.actionQueue = this.memoryActionQueue;
    this.pauseInterventionStore = this.memoryPauseInterventionStore;
  }

  /**
   * Creates the storage directory, loads the state file, prunes events of
   * terminal sessions, points every store at the loaded state and writes it
   * back once. Calling it again after a successful start does nothing.
   */
  async start(): Promise<void> {
    if (this.started) return;
    await mkdir(path.dirname(this.config.filePath), { recursive: true });
    this.state = await loadState(this.config.filePath);
    // Issue #4032: Prune events for terminal sessions on startup.
    this.pruneCompletedSessionEvents();
    this.memorySessionStore.replaceState(this.state);
    this.memoryEventStore.replaceState(this.state);
    this.memoryActionQueue.replaceState(this.state);
    this.memoryPauseInterventionStore.replaceState(this.state);
    this.started = true;
    this.dirty = true;
    // Issue #4032: Initial persist after load (which may have pruned events).
    await this.flush();
  }

  /**
   * Cancels any pending debounce, flushes unsaved state and waits for the
   * write queue to drain. Errors are swallowed so shutdown always completes.
   */
  async stop(_signal?: AbortSignal): Promise<void> {
    if (!this.started) return;
    // Issue #4032: Flush any pending dirty state before shutdown.
    if (this.persistTimer !== null) {
      clearTimeout(this.persistTimer);
      this.persistTimer = null;
    }
    await this.flush().catch(() => {});
    // Best-effort final write chain — swallow errors so shutdown completes
    await this.writeChain.catch(() => {});
    this.started = false;
  }

  /** Unhealthy before `start()` and while the last persist attempt failed. */
  async health(): Promise<ServiceHealth> {
    if (!this.started) {
      return { healthy: false, details: 'file ACP local storage profile not started' };
    }
    if (this.persistError !== null) {
      return { healthy: false, details: `persist error: ${this.persistError.message}` };
    }
    return { healthy: true, details: 'file ACP local storage profile ok' };
  }

  /** Error recorded by the most recent persist attempt, or `null` if it succeeded. */
  getPersistError(): Error | null {
    return this.persistError;
  }

  /**
   * Issue #4032: Schedules a debounced persist, restarting the debounce
   * window on every call. The returned promise resolves once the next
   * persist attempt has finished, whether it succeeded or failed; failures
   * are reported through `getPersistError()`.
   */
  private schedulePersist(): Promise<void> {
    this.dirty = true;
    if (this.persistTimer !== null) {
      clearTimeout(this.persistTimer);
    }
    return new Promise<void>(resolve => {
      this.pendingPersistResolvers.push(resolve);
      this.persistTimer = setTimeout(() => {
        this.persistTimer = null;
        void this.persistNow();
      }, this.persistDebounceMs);
    });
  }

  /**
   * Force a flush of any pending writes. Used at start/stop boundaries.
   */
  private async flush(): Promise<void> {
    if (this.persistTimer !== null) {
      clearTimeout(this.persistTimer);
      this.persistTimer = null;
    }
    if (this.dirty) {
      await this.persistNow();
    } else {
      this.drainPendingResolvers();
    }
  }

  /**
   * Issue #4032: Coalesced persist — snapshots the state, writes it to a
   * temp file, renames it over the target and then resolves all pending
   * waiters. Writes are queued one after another. Never rejects.
   */
  private async persistNow(): Promise<void> {
    const serialized = serializeStateLightweight(this.state);
    // Clear the flag when the snapshot is taken, not when the write ends:
    // a mutation that lands while the write is in flight sets it again and
    // must still be seen by a later flush().
    this.dirty = false;
    const tmpPath = `${this.config.filePath}.tmp.${process.pid}`;

    const previous = this.writeChain;
    const attempt = (async (): Promise<void> => {
      // Wait for the preceding write; its outcome does not matter here.
      await previous.catch(() => {});
      try {
        await writeFile(tmpPath, JSON.stringify(serialized));
        await rename(tmpPath, this.config.filePath);
        this.persistError = null;
      } catch (err: unknown) {
        // Record the failure instead of rejecting so callers never see an
        // unhandled rejection.
        this.persistError = err instanceof Error ? err : new Error(String(err));
        // The snapshot never reached disk: let the next flush retry it.
        this.dirty = true;
        logger.error({
          component: 'acp-local-storage',
          operation: 'persistNow',
          attributes: { filePath: this.config.filePath, error: this.persistError.message },
        });
        // Remove a stale temp file before the next queued write starts, so
        // a late unlink cannot delete that write's temp file. Best-effort.
        try {
          const { unlink } = await import('node:fs/promises');
          await unlink(tmpPath);
        } catch {
          // Nothing to clean up, or cleanup itself failed.
        }
      }
    })();
    // `attempt` never rejects, so the queue needs no reset after a failure
    // and a later write chained onto it is never overwritten.
    this.writeChain = attempt;

    try {
      await attempt;
    } finally {
      this.drainPendingResolvers();
    }
  }

  /** Resolves and forgets every promise handed out by `schedulePersist()`. */
  private drainPendingResolvers(): void {
    const resolvers = this.pendingPersistResolvers;
    this.pendingPersistResolvers = [];
    for (const resolve of resolvers) resolve();
  }

  /**
   * Issue #4032: At startup, drop events belonging to terminal sessions so
   * the file does not grow unbounded across restarts.
   */
  private pruneCompletedSessionEvents(): void {
    const prunableSessionIds = new Set<string>();
    for (const session of this.state.sessions) {
      if (PRUNABLE_SESSION_STATUSES.has(session.status)) {
        prunableSessionIds.add(session.id);
      }
    }
    if (prunableSessionIds.size === 0) return;
    const before = this.state.events.length;
    this.state.events = this.state.events.filter(event => !prunableSessionIds.has(event.sessionId));
    const pruned = before - this.state.events.length;
    if (pruned > 0) {
      logger.info({
        component: 'acp-local-storage',
        operation: 'pruneCompletedSessionEvents',
        attributes: { prunedEventCount: pruned, prunedSessionCount: prunableSessionIds.size },
      });
    }
  }
}
/**
 * Factory for an empty `LocalState`. Lives with the type so the four
 * Memory* stores and the file-backed profile share one source of truth.
 *
 * @returns A state whose arrays and maps are fresh instances on every call.
 */
export function createEmptyState(): LocalState {
  return {
    sessions: [],
    events: [],
    actions: [],
    // action id -> insertion index
    actionOrder: new Map<string, number>(),
    nextActionOrder: 0,
    pauseInterventions: [],
    // session id -> highest event sequence number seen
    lastEventSeqBySession: new Map<string, number>(),
  };
}

/**
 * Object-guard used by clone + persistence for safe property access.
 *
 * @param value Any value.
 * @returns `true` for non-null objects that are not arrays.
 */
export function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}
/** Requires the session, tenant and owner key ids of a list request to be non-empty strings. */
export function validateListInput(input: AcpListEventsInput): void {
  const required: Array<[unknown, string]> = [
    [input.sessionId, 'sessionId'],
    [input.tenantId, 'tenantId'],
    [input.ownerKeyId, 'ownerKeyId'],
  ];
  for (const [value, label] of required) requireNonEmptyString(value, label);
}

/**
 * Returns the event timestamp to store: the current time when none is
 * given, otherwise a copy of the validated input date.
 */
export function resolveOccurredAt(value: Date | undefined): Date {
  if (value !== undefined) {
    assertValidDate(value, 'event occurredAt');
    return new Date(value.getTime());
  }
  return new Date();
}

/** Returns the replay cursor, defaulting to 0; rejects anything but a non-negative safe integer. */
export function resolveAfterEventSeq(value: number | undefined): number {
  if (value === undefined) return 0;
  const acceptable = Number.isSafeInteger(value) && value >= 0;
  if (!acceptable) {
    throw new AcpValidationError('ACP afterEventSeq must be a non-negative safe integer');
  }
  return value;
}

/** Shared limit check: default when omitted, otherwise a safe integer in 1..MAX_LIST_LIMIT. */
function resolveBoundedLimit(value: number | undefined, subject: string): number {
  if (value === undefined) return DEFAULT_LIST_LIMIT;
  if (!Number.isSafeInteger(value) || value <= 0 || value > MAX_LIST_LIMIT) {
    throw new AcpValidationError(`ACP ${subject} limit must be between 1 and ${MAX_LIST_LIMIT}`);
  }
  return value;
}

/** Resolves the page size for session listings. */
export function resolveSessionListLimit(value: number | undefined): number {
  return resolveBoundedLimit(value, 'session list');
}

/** Resolves the page size for event replay. */
export function resolveLimit(value: number | undefined): number {
  return resolveBoundedLimit(value, 'event replay');
}

/** Validates an action id together with the caller's scope. */
export function validateActionIdAndScope(actionId: string, scope: AcpSessionScope): void {
  requireNonEmptyString(actionId, 'action id');
  validateScope(scope);
}

/** Requires both scope identifiers to be non-empty strings. */
export function validateScope(scope: AcpSessionScope): void {
  requireNonEmptyString(scope.tenantId, 'tenant id');
  requireNonEmptyString(scope.ownerKeyId, 'owner key id');
}

/**
 * Returns `value` unchanged when it is a string containing at least one
 * non-whitespace character.
 *
 * @throws AcpValidationError otherwise.
 */
export function requireNonEmptyString(value: unknown, label: string): string {
  const usable = typeof value === 'string' && value.trim() !== '';
  if (!usable) {
    throw new AcpValidationError(`ACP ${label} must be a non-empty string`);
  }
  return value;
}

/** Like `requireNonEmptyString`, but lets `undefined` through untouched. */
export function requireOptionalNonEmptyString(value: unknown, label: string): string | undefined {
  return value === undefined ? undefined : requireNonEmptyString(value, label);
}

/**
 * Requires `value` to be a `Date` instance holding a real point in time.
 *
 * @throws AcpValidationError otherwise.
 */
export function assertValidDate(value: Date, label: string): void {
  const valid = value instanceof Date && !Number.isNaN(value.getTime());
  if (!valid) {
    throw new AcpValidationError(`ACP ${label} must be a valid Date`);
  }
}