Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20260220143557050413.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "finalize_graph streaming"
}
76 changes: 49 additions & 27 deletions packages/graphrag/graphrag/index/operations/finalize_entities.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,56 @@
# Copyright (c) 2024 Microsoft Corporation.
# Copyright (C) 2026 Microsoft
# Licensed under the MIT License

"""All the steps to transform final entities."""
"""Stream-finalize entity rows into an output Table."""

from typing import Any
from uuid import uuid4

import pandas as pd
from graphrag_storage.tables.table import Table

from graphrag.data_model.schemas import ENTITIES_FINAL_COLUMNS
from graphrag.graphs.compute_degree import compute_degree


def finalize_entities(
entities: pd.DataFrame,
relationships: pd.DataFrame,
) -> pd.DataFrame:
"""All the steps to transform final entities."""
degrees = compute_degree(relationships)
final_entities = entities.merge(degrees, on="title", how="left").drop_duplicates(
subset="title"
)
final_entities = final_entities.loc[entities["title"].notna()].reset_index()
# disconnected nodes and those with no community even at level 0 can be missing degree
final_entities["degree"] = final_entities["degree"].fillna(0).astype(int)
final_entities.reset_index(inplace=True)
final_entities["human_readable_id"] = final_entities.index
final_entities["id"] = final_entities["human_readable_id"].apply(
lambda _x: str(uuid4())
)
return final_entities.loc[
:,
ENTITIES_FINAL_COLUMNS,
]


async def finalize_entities(
entities_table: Table,
degree_map: dict[str, int],
) -> list[dict[str, Any]]:
"""Read entity rows, enrich with degree, and write back.

Streams through the entities table, deduplicates by title,
assigns degree from the pre-computed degree map, and writes
each finalized row back to the same table (safe when using
truncate=True, which reads from the original and writes to
a temp file).

Args
----
entities_table: Table
Opened table for both reading input and writing output.
degree_map: dict[str, int]
Pre-computed mapping of entity title to node degree.

Returns
-------
list[dict[str, Any]]
Sample of up to 5 entity rows for logging.
"""
sample_rows: list[dict[str, Any]] = []
seen_titles: set[str] = set()
human_readable_id = 0

async for row in entities_table:
title = row.get("title")
if not title or title in seen_titles:
continue
seen_titles.add(title)
row["degree"] = degree_map.get(title, 0)
row["human_readable_id"] = human_readable_id
row["id"] = str(uuid4())
human_readable_id += 1
out = {col: row.get(col) for col in ENTITIES_FINAL_COLUMNS}
await entities_table.write(out)
if len(sample_rows) < 5:
sample_rows.append(out)

return sample_rows
Original file line number Diff line number Diff line change
@@ -1,42 +1,55 @@
# Copyright (c) 2024 Microsoft Corporation.
# Copyright (C) 2026 Microsoft
# Licensed under the MIT License

"""All the steps to transform final relationships."""
"""Stream-finalize relationship rows into an output Table."""

from typing import Any
from uuid import uuid4

import pandas as pd
from graphrag_storage.tables.table import Table

from graphrag.data_model.schemas import RELATIONSHIPS_FINAL_COLUMNS
from graphrag.graphs.compute_degree import compute_degree
from graphrag.index.operations.compute_edge_combined_degree import (
compute_edge_combined_degree,
)


def finalize_relationships(
relationships: pd.DataFrame,
) -> pd.DataFrame:
"""All the steps to transform final relationships."""
degrees = compute_degree(relationships)

final_relationships = relationships.drop_duplicates(subset=["source", "target"])
final_relationships["combined_degree"] = compute_edge_combined_degree(
final_relationships,
degrees,
node_name_column="title",
node_degree_column="degree",
edge_source_column="source",
edge_target_column="target",
)

final_relationships.reset_index(inplace=True)
final_relationships["human_readable_id"] = final_relationships.index
final_relationships["id"] = final_relationships["human_readable_id"].apply(
lambda _x: str(uuid4())
)

return final_relationships.loc[
:,
RELATIONSHIPS_FINAL_COLUMNS,
]


async def finalize_relationships(
relationships_table: Table,
degree_map: dict[str, int],
) -> list[dict[str, Any]]:
"""Deduplicate relationships, enrich with combined degree, and write.

Streams through the relationships table, deduplicates by
(source, target) pair, computes combined_degree as the sum of
source and target node degrees, and writes each finalized row
back to the table.

Args
----
relationships_table: Table
Opened table for reading and writing relationship rows.
degree_map: dict[str, int]
Pre-computed mapping of entity title to node degree.

Returns
-------
list[dict[str, Any]]
Sample of up to 5 relationship rows for logging.
"""
sample_rows: list[dict[str, Any]] = []
seen: set[tuple[str, str]] = set()
human_readable_id = 0

async for row in relationships_table:
key = (row.get("source", ""), row.get("target", ""))
if key in seen:
continue
seen.add(key)
row["combined_degree"] = degree_map.get(key[0], 0) + degree_map.get(key[1], 0)
row["human_readable_id"] = human_readable_id
row["id"] = str(uuid4())
human_readable_id += 1
final = {col: row.get(col) for col in RELATIONSHIPS_FINAL_COLUMNS}
await relationships_table.write(final)
if len(sample_rows) < 5:
sample_rows.append(final)

return sample_rows
127 changes: 94 additions & 33 deletions packages/graphrag/graphrag/index/workflows/finalize_graph.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
# Copyright (c) 2024 Microsoft Corporation.
# Copyright (C) 2026 Microsoft
# Licensed under the MIT License

"""A module containing run_workflow method definition."""

import logging
from collections import Counter
from typing import Any

import pandas as pd
from graphrag_storage.tables.table import Table

from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.data_model.data_reader import DataReader
from graphrag.data_model.row_transformers import (
transform_entity_row,
transform_relationship_row,
)
from graphrag.index.operations.finalize_entities import finalize_entities
from graphrag.index.operations.finalize_relationships import finalize_relationships
from graphrag.index.operations.finalize_relationships import (
finalize_relationships,
)
from graphrag.index.operations.snapshot_graphml import snapshot_graphml
from graphrag.index.typing.context import PipelineRunContext
from graphrag.index.typing.workflow import WorkflowFunctionOutput
Expand All @@ -24,41 +31,95 @@ async def run_workflow(
) -> WorkflowFunctionOutput:
"""All the steps to create the base entity graph."""
logger.info("Workflow started: finalize_graph")
reader = DataReader(context.output_table_provider)
entities = await reader.entities()
relationships = await reader.relationships()

final_entities, final_relationships = finalize_graph(
entities,
relationships,
)

await context.output_table_provider.write_dataframe("entities", final_entities)
await context.output_table_provider.write_dataframe(
"relationships", final_relationships
)
async with (
context.output_table_provider.open(
"entities",
transformer=transform_entity_row,
) as entities_table,
context.output_table_provider.open(
"relationships",
transformer=transform_relationship_row,
) as relationships_table,
):
result = await finalize_graph(
entities_table,
relationships_table,
)

if config.snapshots.graphml:
rels = await context.output_table_provider.read_dataframe("relationships")
Comment thread
dayesouza marked this conversation as resolved.
await snapshot_graphml(
final_relationships,
rels,
name="graph",
storage=context.output_storage,
)

logger.info("Workflow completed: finalize_graph")
return WorkflowFunctionOutput(
result={
"entities": entities,
"relationships": relationships,
}
)


def finalize_graph(
entities: pd.DataFrame,
relationships: pd.DataFrame,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""All the steps to finalize the entity and relationship formats."""
final_entities = finalize_entities(entities, relationships)
final_relationships = finalize_relationships(relationships)
return (final_entities, final_relationships)
return WorkflowFunctionOutput(result=result)


async def finalize_graph(
entities_table: Table,
relationships_table: Table,
) -> dict[str, list[dict[str, Any]]]:
"""Compute degrees and finalize entities and relationships.

Streams relationship rows to build a degree map without
materializing a DataFrame, then delegates to the individual
finalize operations for streaming row-by-row enrichment and
writing.

Args
----
entities_table: Table
Opened table for reading and writing entity rows.
relationships_table: Table
Opened table for reading relationships into a DataFrame
and writing finalized relationship rows.

Returns
-------
dict[str, list[dict[str, Any]]]
Sample rows keyed by ``"entities"`` and
``"relationships"``, up to 5 each.
"""
degree_map = await _build_degree_map(relationships_table)

entity_samples = await finalize_entities(entities_table, degree_map)
relationship_samples = await finalize_relationships(relationships_table, degree_map)

return {
"entities": entity_samples,
"relationships": relationship_samples,
}


async def _build_degree_map(
relationships_table: Table,
) -> dict[str, int]:
"""Stream relationship rows to compute node degrees.

Normalizes each edge to an undirected pair and deduplicates
on the fly, matching the behavior of ``compute_degree`` but
without materializing a DataFrame.

Args
----
relationships_table: Table
Opened table to stream relationship rows from.

Returns
-------
dict[str, int]
Mapping of entity title to its node degree.
"""
seen: set[tuple[str, str]] = set()
degree: Counter[str] = Counter()
async for row in relationships_table:
lo, hi = sorted((row["source"], row["target"]))
if (lo, hi) not in seen:
seen.add((lo, hi))
degree[lo] += 1
degree[hi] += 1
return dict(degree)
Loading