4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20250430211223127781.json
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Update as workflows"
}
2 changes: 1 addition & 1 deletion graphrag/api/index.py
@@ -65,7 +65,7 @@ async def build_index(
if memory_profile:
log.warning("New pipeline does not yet support memory profiling.")

pipeline = PipelineFactory.create_pipeline(config, method)
pipeline = PipelineFactory.create_pipeline(config, method, is_update_run)

workflow_callbacks.pipeline_start(pipeline.names())

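Note: the change above threads `is_update_run` through to pipeline construction. The factory's internals are not part of this diff, so the snippet below is only a minimal sketch of the pattern the new argument implies — selecting an update-mode workflow list instead of the standard one. The registry names and the `Workflow` alias are illustrative assumptions, not repository code.

```python
from typing import Any, Callable

Workflow = Callable[[Any, Any], Any]  # (config, context) -> result, simplified

# Assumed registries mapping a method name to its workflow list (illustrative only).
_standard: dict[str, list[Workflow]] = {}
_update: dict[str, list[Workflow]] = {}


def create_pipeline(config: Any, method: str, is_update_run: bool = False) -> list[Workflow]:
    """Return the workflows for `method`, swapping in update workflows when requested."""
    registry = _update if is_update_run else _standard
    return list(registry.get(method, []))
```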
60 changes: 23 additions & 37 deletions graphrag/index/run/run_pipeline.py
@@ -13,19 +13,14 @@

import pandas as pd

from graphrag.cache.pipeline_cache import PipelineCache
from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.index.input.factory import create_input
from graphrag.index.run.utils import create_run_context
from graphrag.index.typing.context import PipelineRunContext
from graphrag.index.typing.pipeline import Pipeline
from graphrag.index.typing.pipeline_run_result import PipelineRunResult
from graphrag.index.update.incremental_index import (
get_delta_docs,
update_dataframe_outputs,
)
from graphrag.index.update.incremental_index import get_delta_docs
from graphrag.logger.base import ProgressLogger
from graphrag.logger.progress import Progress
from graphrag.storage.pipeline_storage import PipelineStorage
@@ -50,6 +45,10 @@ async def run_pipeline(

dataset = await create_input(config.input, logger, root_dir)

# load existing state in case any workflows are stateful
state_json = await storage.get("context.json")
state = json.loads(state_json) if state_json else {}

if is_update_run:
logger.info("Running incremental indexing.")

@@ -62,48 +61,45 @@
else:
update_storage = create_storage_from_config(config.update_index_output)
# we use this to store the new subset index, and will merge its content with the previous index
timestamped_storage = update_storage.child(time.strftime("%Y%m%d-%H%M%S"))
update_timestamp = time.strftime("%Y%m%d-%H%M%S")
timestamped_storage = update_storage.child(update_timestamp)
delta_storage = timestamped_storage.child("delta")
# copy the previous output to a backup folder, so we can replace it with the update
# we'll read from this later when we merge the old and new indexes
previous_storage = timestamped_storage.child("previous")
await _copy_previous_output(storage, previous_storage)

state["update_timestamp"] = update_timestamp

context = create_run_context(
storage=delta_storage, cache=cache, callbacks=callbacks, state=state
)

# Run the pipeline on the new documents
async for table in _run_pipeline(
pipeline=pipeline,
config=config,
dataset=delta_dataset.new_inputs,
cache=cache,
storage=delta_storage,
callbacks=callbacks,
logger=logger,
context=context,
):
yield table

logger.success("Finished running workflows on new documents.")

await update_dataframe_outputs(
previous_storage=previous_storage,
delta_storage=delta_storage,
output_storage=storage,
config=config,
cache=cache,
callbacks=NoopWorkflowCallbacks(),
progress_logger=logger,
)

else:
logger.info("Running standard indexing.")

context = create_run_context(
storage=storage, cache=cache, callbacks=callbacks, state=state
)

async for table in _run_pipeline(
pipeline=pipeline,
config=config,
dataset=dataset,
cache=cache,
storage=storage,
callbacks=callbacks,
logger=logger,
context=context,
):
yield table

@@ -112,21 +108,11 @@ async def _run_pipeline(
pipeline: Pipeline,
config: GraphRagConfig,
dataset: pd.DataFrame,
cache: PipelineCache,
storage: PipelineStorage,
callbacks: WorkflowCallbacks,
logger: ProgressLogger,
context: PipelineRunContext,
) -> AsyncIterable[PipelineRunResult]:
start_time = time.time()

# load existing state in case any workflows are stateful
state_json = await storage.get("context.json")
state = json.loads(state_json) if state_json else {}

context = create_run_context(
storage=storage, cache=cache, callbacks=callbacks, state=state
)

log.info("Final # of rows loaded: %s", len(dataset))
context.stats.num_documents = len(dataset)
last_workflow = "starting documents"
@@ -138,11 +124,11 @@
for name, workflow_function in pipeline.run():
last_workflow = name
progress = logger.child(name, transient=False)
callbacks.workflow_start(name, None)
context.callbacks.workflow_start(name, None)
work_time = time.time()
result = await workflow_function(config, context)
progress(Progress(percent=1))
callbacks.workflow_end(name, result)
context.callbacks.workflow_end(name, result)
yield PipelineRunResult(
workflow=name, result=result.result, state=context.state, errors=None
)
@@ -154,7 +140,7 @@

except Exception as e:
log.exception("error running workflow %s", last_workflow)
callbacks.error("Error running pipeline!", e, traceback.format_exc())
context.callbacks.error("Error running pipeline!", e, traceback.format_exc())
yield PipelineRunResult(
workflow=last_workflow, result=None, state=context.state, errors=[e]
)
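Note: the net effect of this file's changes is that `_run_pipeline` no longer assembles its own context from separate `cache`, `storage`, and `callbacks` arguments; `run_pipeline` loads persisted state from `context.json`, builds a single `PipelineRunContext` via `create_run_context`, and workflows reach callbacks and state through it. The sketch below illustrates that pattern with a stand-in dataclass; it is not the repository's actual `PipelineRunContext` definition.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RunContextSketch:
    """Illustrative stand-in for PipelineRunContext; real fields may differ."""

    storage: Any
    cache: Any
    callbacks: Any
    state: dict[str, Any] = field(default_factory=dict)


async def example_workflow(config: Any, context: RunContextSketch) -> str | None:
    # A stateful workflow can read values the runner placed into context.state,
    # e.g. the update timestamp that run_pipeline stores in this diff.
    return context.state.get("update_timestamp")
```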
15 changes: 15 additions & 0 deletions graphrag/index/run/utils.py
@@ -9,12 +9,14 @@
from graphrag.callbacks.progress_workflow_callbacks import ProgressWorkflowCallbacks
from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
from graphrag.callbacks.workflow_callbacks_manager import WorkflowCallbacksManager
from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.index.typing.context import PipelineRunContext
from graphrag.index.typing.state import PipelineState
from graphrag.index.typing.stats import PipelineRunStats
from graphrag.logger.base import ProgressLogger
from graphrag.storage.memory_pipeline_storage import MemoryPipelineStorage
from graphrag.storage.pipeline_storage import PipelineStorage
from graphrag.utils.api import create_storage_from_config


def create_run_context(
@@ -44,3 +46,16 @@ def create_callback_chain(
if progress is not None:
manager.register(ProgressWorkflowCallbacks(progress))
return manager


def get_update_storages(
config: GraphRagConfig, timestamp: str
) -> tuple[PipelineStorage, PipelineStorage, PipelineStorage]:
"""Get storage objects for the update index run."""
output_storage = create_storage_from_config(config.output)
update_storage = create_storage_from_config(config.update_index_output)
timestamped_storage = update_storage.child(timestamp)
delta_storage = timestamped_storage.child("delta")
previous_storage = timestamped_storage.child("previous")

return output_storage, previous_storage, delta_storage
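Note: a hedged usage example of the new helper. The consumer function below is hypothetical; only `get_update_storages` and the `update_timestamp` state key written by `run_pipeline` come from this diff.

```python
from graphrag.index.run.utils import get_update_storages


async def merge_update_outputs(config, context):
    # Hypothetical update-mode workflow: resolve the three storages for this run.
    timestamp = context.state["update_timestamp"]
    output_storage, previous_storage, delta_storage = get_update_storages(config, timestamp)
    # previous_storage holds the backed-up index, delta_storage the freshly built
    # subset, and output_storage is where a merged result would be written.
    return output_storage, previous_storage, delta_storage
```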