Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/memory/__tests__/knowledge-graph.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ describe('KnowledgeGraphManager', () => {
const newEntities = await manager.createEntities([]);
expect(newEntities).toHaveLength(0);
});
it('should preserve concurrent entity writes', async () => {
const entities = Array.from({ length: 20 }, (_, index) => ({
name: `Entity ${index}`,
entityType: 'test',
observations: [],
}));

await Promise.all(entities.map(entity => manager.createEntities([entity])));

const graph = await manager.readGraph();
expect(graph.entities).toHaveLength(20);
expect(new Set(graph.entities.map(entity => entity.name)).size).toBe(20);
});

});

describe('createRelations', () => {
Expand Down
88 changes: 50 additions & 38 deletions src/memory/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ export interface KnowledgeGraph {

// The KnowledgeGraphManager class contains all operations to interact with the knowledge graph
export class KnowledgeGraphManager {
private writeQueue: Promise<void> = Promise.resolve();

constructor(private memoryFilePath: string) {}

private async loadGraph(): Promise<KnowledgeGraph> {
Expand Down Expand Up @@ -114,70 +116,80 @@ export class KnowledgeGraphManager {
relationType: r.relationType
})),
];
await fs.writeFile(this.memoryFilePath, lines.join("\n"));
const tempPath = `${this.memoryFilePath}.${process.pid}.${Date.now()}.${Math.random().toString(36).slice(2)}.tmp`;
await fs.writeFile(tempPath, lines.join("\n"));
await fs.rename(tempPath, this.memoryFilePath);
}

private async updateGraph<T>(operation: (graph: KnowledgeGraph) => T | Promise<T>): Promise<T> {
const run = this.writeQueue.then(async () => {
const graph = await this.loadGraph();
const result = await operation(graph);
await this.saveGraph(graph);
return result;
});
this.writeQueue = run.then(() => undefined, () => undefined);
return run;
}

async createEntities(entities: Entity[]): Promise<Entity[]> {
const graph = await this.loadGraph();
const newEntities = entities.filter(e => !graph.entities.some(existingEntity => existingEntity.name === e.name));
graph.entities.push(...newEntities);
await this.saveGraph(graph);
return newEntities;
return this.updateGraph(graph => {
const newEntities = entities.filter(e => !graph.entities.some(existingEntity => existingEntity.name === e.name));
graph.entities.push(...newEntities);
return newEntities;
});
}

async createRelations(relations: Relation[]): Promise<Relation[]> {
const graph = await this.loadGraph();
const newRelations = relations.filter(r => !graph.relations.some(existingRelation =>
existingRelation.from === r.from &&
existingRelation.to === r.to &&
existingRelation.relationType === r.relationType
));
graph.relations.push(...newRelations);
await this.saveGraph(graph);
return newRelations;
return this.updateGraph(graph => {
const newRelations = relations.filter(r => !graph.relations.some(existingRelation =>
existingRelation.from === r.from &&
existingRelation.to === r.to &&
existingRelation.relationType === r.relationType
));
graph.relations.push(...newRelations);
return newRelations;
});
}

async addObservations(observations: { entityName: string; contents: string[] }[]): Promise<{ entityName: string; addedObservations: string[] }[]> {
const graph = await this.loadGraph();
const results = observations.map(o => {
return this.updateGraph(graph => observations.map(o => {
const entity = graph.entities.find(e => e.name === o.entityName);
if (!entity) {
throw new Error(`Entity with name ${o.entityName} not found`);
}
const newObservations = o.contents.filter(content => !entity.observations.includes(content));
entity.observations.push(...newObservations);
return { entityName: o.entityName, addedObservations: newObservations };
});
await this.saveGraph(graph);
return results;
}));
}

async deleteEntities(entityNames: string[]): Promise<void> {
const graph = await this.loadGraph();
graph.entities = graph.entities.filter(e => !entityNames.includes(e.name));
graph.relations = graph.relations.filter(r => !entityNames.includes(r.from) && !entityNames.includes(r.to));
await this.saveGraph(graph);
await this.updateGraph(graph => {
graph.entities = graph.entities.filter(e => !entityNames.includes(e.name));
graph.relations = graph.relations.filter(r => !entityNames.includes(r.from) && !entityNames.includes(r.to));
});
}

async deleteObservations(deletions: { entityName: string; observations: string[] }[]): Promise<void> {
const graph = await this.loadGraph();
deletions.forEach(d => {
const entity = graph.entities.find(e => e.name === d.entityName);
if (entity) {
entity.observations = entity.observations.filter(o => !d.observations.includes(o));
}
await this.updateGraph(graph => {
deletions.forEach(d => {
const entity = graph.entities.find(e => e.name === d.entityName);
if (entity) {
entity.observations = entity.observations.filter(o => !d.observations.includes(o));
}
});
});
await this.saveGraph(graph);
}

async deleteRelations(relations: Relation[]): Promise<void> {
const graph = await this.loadGraph();
graph.relations = graph.relations.filter(r => !relations.some(delRelation =>
r.from === delRelation.from &&
r.to === delRelation.to &&
r.relationType === delRelation.relationType
));
await this.saveGraph(graph);
await this.updateGraph(graph => {
graph.relations = graph.relations.filter(r => !relations.some(delRelation =>
r.from === delRelation.from &&
r.to === delRelation.to &&
r.relationType === delRelation.relationType
));
});
}

async readGraph(): Promise<KnowledgeGraph> {
Expand Down
Loading