Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/session-fs-watchers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@moonshot-ai/kimi-code": patch
---

Release session filesystem watchers when sessions close.
20 changes: 19 additions & 1 deletion packages/agent-core/src/services/fs/fsWatcherService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ export class FsWatcherService extends Disposable implements IFsWatcher {
private readonly lookup: FsWatcherConnectionLookup,
options: FsWatcherServiceOptions,
@ILogService private readonly logger: ILogService,
@ISessionService _sessionService: ISessionService,
@ISessionService sessionService: ISessionService,
) {
super();
this.sessions = this._register(new DisposableMap<string, SessionEntry>());
Expand All @@ -126,6 +126,11 @@ export class FsWatcherService extends Disposable implements IFsWatcher {
persistent: false,
ignored: (p: string) => /(?:^|[/\\])\.git(?:$|[/\\])/.test(p),
}));
this._register(
sessionService.onDidClose(({ sessionId }) => {
this.disposeSessionEntry(sessionId);
}),
);
}

addPaths(
Expand Down Expand Up @@ -257,6 +262,19 @@ export class FsWatcherService extends Disposable implements IFsWatcher {
}
}

private disposeSessionEntry(sessionId: string): void {
const entry = this.sessions.get(sessionId);
if (!entry) return;
for (const connectionId of entry.connectionPathRefs.keys()) {
const connSessions = this.connections.get(connectionId);
connSessions?.delete(sessionId);
if (connSessions !== undefined && connSessions.size === 0) {
this.connections.delete(connectionId);
}
}
this.sessions.deleteAndDispose(sessionId);
}

private getOrCreateConnection(
connectionId: string,
): Map<string, Map<string, IReference<string>>> {
Expand Down
52 changes: 49 additions & 3 deletions packages/server/test/services.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { join } from 'node:path';

import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

import { InstantiationService, ServiceCollection, EventService, FsWatcherService, IApprovalService, IEventService, ILogService, IQuestionService, type ApprovalResponse, type QuestionResult, type FsWatcherServiceOptions, type IEnvironmentService, type ILogService as ILoggerT, type ISessionService } from '@moonshot-ai/agent-core';
import { Emitter, InstantiationService, ServiceCollection, EventService, FsWatcherService, IApprovalService, IEventService, ILogService, IQuestionService, type ApprovalResponse, type QuestionResult, type FsWatcherServiceOptions, type IEnvironmentService, type ILogService as ILoggerT, type ISessionService } from '@moonshot-ai/agent-core';
import type { Event } from '@moonshot-ai/protocol';

import { ApprovalService } from '#/services/approval/approvalService';
Expand Down Expand Up @@ -135,6 +135,29 @@ class FakeWatcher {

type TestFsWatcher = ReturnType<NonNullable<FsWatcherServiceOptions['watcherFactory']>>;

function makeSessionService(
closeEmitter = new Emitter<{ sessionId: string }>(),
): ISessionService {
const createEmitter = new Emitter<never>();
return {
_serviceBrand: undefined,
create: async () => { throw new Error('not implemented'); },
list: async () => ({ items: [], has_more: false }),
get: async () => { throw new Error('not implemented'); },
update: async () => { throw new Error('not implemented'); },
fork: async () => { throw new Error('not implemented'); },
listChildren: async () => ({ items: [], has_more: false }),
createChild: async () => { throw new Error('not implemented'); },
getStatus: async () => { throw new Error('not implemented'); },
getSessionWarnings: async () => [],
compact: async () => { throw new Error('not implemented'); },
undo: async () => { throw new Error('not implemented'); },
archive: async () => { throw new Error('not implemented'); },
onDidCreate: createEmitter.event,
onDidClose: closeEmitter.event,
};
}

let ix: InstantiationService;
let testLogger: TestLogger;

Expand Down Expand Up @@ -463,7 +486,7 @@ describe('FsWatcherService', () => {
{ resolve: () => undefined },
{ watcherFactory: () => watcher as unknown as TestFsWatcher },
testLogger,
{} as ISessionService,
makeSessionService(),
);
const path = '/workspace/src';

Expand Down Expand Up @@ -496,7 +519,7 @@ describe('FsWatcherService', () => {
{ resolve: () => undefined },
{ watcherFactory: () => watcher as unknown as TestFsWatcher },
testLogger,
{} as ISessionService,
makeSessionService(),
);
const paths = ['/workspace/src', '/workspace/docs', '/workspace/notes'];
watcher.unwatchErrors.set(paths[0]!, new Error('unwatch-src'));
Expand All @@ -517,6 +540,29 @@ describe('FsWatcherService', () => {
expect(watcher.closeCalls).toBe(0);
service.dispose();
});

it('releases a session watcher when the session closes', () => {
const watcher = new FakeWatcher();
const closeEmitter = new Emitter<{ sessionId: string }>();
const service = new FsWatcherService(
{ resolve: () => undefined },
{ watcherFactory: () => watcher as unknown as TestFsWatcher },
testLogger,
makeSessionService(closeEmitter),
);
const path = '/workspace/src';

service.addPaths('sid', 'conn-a', [path]);
service.addPaths('sid', 'conn-b', [path]);
closeEmitter.fire({ sessionId: 'sid' });

expect(watcher.closeCalls).toBe(1);
expect(service.watchedPaths('conn-a', 'sid')).toEqual([]);
expect(service.watchedPaths('conn-b', 'sid')).toEqual([]);
expect(service.countForConnection('conn-a')).toBe(0);
expect(service.countForConnection('conn-b')).toBe(0);
service.dispose();
});
});

describe('ApprovalService (broadcasts + resolve-by-approval_id)', () => {
Expand Down