From b0439995f2ce3e52d86c40c75bd9eca4fca1fafc Mon Sep 17 00:00:00 2001 From: oaslananka <285490571+oaslananka@users.noreply.github.com> Date: Tue, 30 Jun 2026 02:57:24 +0300 Subject: [PATCH] Serialize memory graph writes --- src/memory/__tests__/knowledge-graph.test.ts | 14 ++++ src/memory/index.ts | 88 +++++++++++--------- 2 files changed, 64 insertions(+), 38 deletions(-) diff --git a/src/memory/__tests__/knowledge-graph.test.ts b/src/memory/__tests__/knowledge-graph.test.ts index 236242413a..852a5254b4 100644 --- a/src/memory/__tests__/knowledge-graph.test.ts +++ b/src/memory/__tests__/knowledge-graph.test.ts @@ -59,6 +59,20 @@ describe('KnowledgeGraphManager', () => { const newEntities = await manager.createEntities([]); expect(newEntities).toHaveLength(0); }); + it('should preserve concurrent entity writes', async () => { + const entities = Array.from({ length: 20 }, (_, index) => ({ + name: `Entity ${index}`, + entityType: 'test', + observations: [], + })); + + await Promise.all(entities.map(entity => manager.createEntities([entity]))); + + const graph = await manager.readGraph(); + expect(graph.entities).toHaveLength(20); + expect(new Set(graph.entities.map(entity => entity.name)).size).toBe(20); + }); + }); describe('createRelations', () => { diff --git a/src/memory/index.ts b/src/memory/index.ts index 9865c5318e..c75e0b956a 100644 --- a/src/memory/index.ts +++ b/src/memory/index.ts @@ -67,6 +67,8 @@ export interface KnowledgeGraph { // The KnowledgeGraphManager class contains all operations to interact with the knowledge graph export class KnowledgeGraphManager { + private writeQueue: Promise = Promise.resolve(); + constructor(private memoryFilePath: string) {} private async loadGraph(): Promise { @@ -114,32 +116,44 @@ export class KnowledgeGraphManager { relationType: r.relationType })), ]; - await fs.writeFile(this.memoryFilePath, lines.join("\n")); + const tempPath = `${this.memoryFilePath}.${process.pid}.${Date.now()}.${Math.random().toString(36).slice(2)}.tmp`; + await fs.writeFile(tempPath, lines.join("\n")); + await fs.rename(tempPath, this.memoryFilePath); + } + + private async updateGraph(operation: (graph: KnowledgeGraph) => T | Promise): Promise { + const run = this.writeQueue.then(async () => { + const graph = await this.loadGraph(); + const result = await operation(graph); + await this.saveGraph(graph); + return result; + }); + this.writeQueue = run.then(() => undefined, () => undefined); + return run; } async createEntities(entities: Entity[]): Promise { - const graph = await this.loadGraph(); - const newEntities = entities.filter(e => !graph.entities.some(existingEntity => existingEntity.name === e.name)); - graph.entities.push(...newEntities); - await this.saveGraph(graph); - return newEntities; + return this.updateGraph(graph => { + const newEntities = entities.filter(e => !graph.entities.some(existingEntity => existingEntity.name === e.name)); + graph.entities.push(...newEntities); + return newEntities; + }); } async createRelations(relations: Relation[]): Promise { - const graph = await this.loadGraph(); - const newRelations = relations.filter(r => !graph.relations.some(existingRelation => - existingRelation.from === r.from && - existingRelation.to === r.to && - existingRelation.relationType === r.relationType - )); - graph.relations.push(...newRelations); - await this.saveGraph(graph); - return newRelations; + return this.updateGraph(graph => { + const newRelations = relations.filter(r => !graph.relations.some(existingRelation => + existingRelation.from === r.from && + existingRelation.to === r.to && + existingRelation.relationType === r.relationType + )); + graph.relations.push(...newRelations); + return newRelations; + }); } async addObservations(observations: { entityName: string; contents: string[] }[]): Promise<{ entityName: string; addedObservations: string[] }[]> { - const graph = await this.loadGraph(); - const results = observations.map(o => { + return this.updateGraph(graph => observations.map(o => { const entity = graph.entities.find(e => e.name === o.entityName); if (!entity) { throw new Error(`Entity with name ${o.entityName} not found`); @@ -147,37 +161,35 @@ export class KnowledgeGraphManager { const newObservations = o.contents.filter(content => !entity.observations.includes(content)); entity.observations.push(...newObservations); return { entityName: o.entityName, addedObservations: newObservations }; - }); - await this.saveGraph(graph); - return results; + })); } async deleteEntities(entityNames: string[]): Promise { - const graph = await this.loadGraph(); - graph.entities = graph.entities.filter(e => !entityNames.includes(e.name)); - graph.relations = graph.relations.filter(r => !entityNames.includes(r.from) && !entityNames.includes(r.to)); - await this.saveGraph(graph); + await this.updateGraph(graph => { + graph.entities = graph.entities.filter(e => !entityNames.includes(e.name)); + graph.relations = graph.relations.filter(r => !entityNames.includes(r.from) && !entityNames.includes(r.to)); + }); } async deleteObservations(deletions: { entityName: string; observations: string[] }[]): Promise { - const graph = await this.loadGraph(); - deletions.forEach(d => { - const entity = graph.entities.find(e => e.name === d.entityName); - if (entity) { - entity.observations = entity.observations.filter(o => !d.observations.includes(o)); - } + await this.updateGraph(graph => { + deletions.forEach(d => { + const entity = graph.entities.find(e => e.name === d.entityName); + if (entity) { + entity.observations = entity.observations.filter(o => !d.observations.includes(o)); + } + }); }); - await this.saveGraph(graph); } async deleteRelations(relations: Relation[]): Promise { - const graph = await this.loadGraph(); - graph.relations = graph.relations.filter(r => !relations.some(delRelation => - r.from === delRelation.from && - r.to === delRelation.to && - r.relationType === delRelation.relationType - )); - await this.saveGraph(graph); + await this.updateGraph(graph => { + graph.relations = graph.relations.filter(r => !relations.some(delRelation => + r.from === delRelation.from && + r.to === delRelation.to && + r.relationType === delRelation.relationType + )); + }); } async readGraph(): Promise {