From 2062ebcdce6a4373be673f208408f919e95c0fb6 Mon Sep 17 00:00:00 2001 From: Nathan Evans Date: Wed, 11 Feb 2026 13:49:45 -0800 Subject: [PATCH 1/2] Add async iterator support to InputReader and use in load workflows InputReader now implements __aiter__ so it can be used as `async for doc in reader`. The core iteration logic is in _iterate_files(), and read_files() delegates to the iterator for batch loading. Both load_input_documents and load_update_documents workflows now use the async iterator with dataclasses.asdict for DataFrame construction. --- .../patch-20260211214912747264.json | 4 +++ .../graphrag_input/input_reader.py | 34 +++++++++++++------ .../index/workflows/load_input_documents.py | 4 ++- .../index/workflows/load_update_documents.py | 6 +++- 4 files changed, 35 insertions(+), 13 deletions(-) create mode 100644 .semversioner/next-release/patch-20260211214912747264.json diff --git a/.semversioner/next-release/patch-20260211214912747264.json b/.semversioner/next-release/patch-20260211214912747264.json new file mode 100644 index 0000000000..e65444da75 --- /dev/null +++ b/.semversioner/next-release/patch-20260211214912747264.json @@ -0,0 +1,4 @@ +{ + "type": "patch", + "description": "Add async iterator support to InputReader and use it in load_input_documents and load_update_documents workflows." +} diff --git a/packages/graphrag-input/graphrag_input/input_reader.py b/packages/graphrag-input/graphrag_input/input_reader.py index be95168336..ae840eb8f2 100644 --- a/packages/graphrag-input/graphrag_input/input_reader.py +++ b/packages/graphrag-input/graphrag_input/input_reader.py @@ -11,6 +11,8 @@ from typing import TYPE_CHECKING if TYPE_CHECKING: + from collections.abc import AsyncIterator + from graphrag_storage import Storage from graphrag_input.text_document import TextDocument @@ -33,34 +35,44 @@ def __init__( self._file_pattern = file_pattern async def read_files(self) -> list[TextDocument]: - """Load files from storage and apply a loader function based on file type. Process metadata on the results if needed.""" + """Load all files from storage and return them as a single list.""" + return [doc async for doc in self] + + def __aiter__(self) -> AsyncIterator[TextDocument]: + """Return the async iterator, enabling `async for doc in reader`.""" + return self._iterate_files() + + async def _iterate_files(self) -> AsyncIterator[TextDocument]: + """Async generator that yields documents one at a time as files are loaded.""" files = list(self._storage.find(re.compile(self._file_pattern))) if len(files) == 0: msg = f"No {self._file_pattern} matches found in storage" logger.warning(msg) - files = [] + return - documents: list[TextDocument] = [] + file_count = len(files) + doc_count = 0 for file in files: try: - documents.extend(await self.read_file(file)) + for doc in await self.read_file(file): + doc_count += 1 + yield doc except Exception as e: # noqa: BLE001 (catching Exception is fine here) logger.warning("Warning! Error loading file %s. Skipping...", file) logger.warning("Error: %s", e) logger.info( "Found %d %s files, loading %d", - len(files), + file_count, self._file_pattern, - len(documents), + doc_count, ) - total_files_log = ( - f"Total number of unfiltered {self._file_pattern} rows: {len(documents)}" + logger.info( + "Total number of unfiltered %s rows: %d", + self._file_pattern, + doc_count, ) - logger.info(total_files_log) - - return documents @abstractmethod async def read_file(self, path: str) -> list[TextDocument]: diff --git a/packages/graphrag/graphrag/index/workflows/load_input_documents.py b/packages/graphrag/graphrag/index/workflows/load_input_documents.py index ed7f83c8e2..aacc2965b3 100644 --- a/packages/graphrag/graphrag/index/workflows/load_input_documents.py +++ b/packages/graphrag/graphrag/index/workflows/load_input_documents.py @@ -4,6 +4,7 @@ """A module containing run_workflow method definition.""" import logging +from dataclasses import asdict import pandas as pd from graphrag_input import InputReader, create_input_reader @@ -39,4 +40,5 @@ async def run_workflow( async def load_input_documents(input_reader: InputReader) -> pd.DataFrame: """Load and parse input documents into a standard format.""" - return pd.DataFrame(await input_reader.read_files()) + documents = [asdict(doc) async for doc in input_reader] + return pd.DataFrame(documents) diff --git a/packages/graphrag/graphrag/index/workflows/load_update_documents.py b/packages/graphrag/graphrag/index/workflows/load_update_documents.py index 3f4417d3e1..8b138b3767 100644 --- a/packages/graphrag/graphrag/index/workflows/load_update_documents.py +++ b/packages/graphrag/graphrag/index/workflows/load_update_documents.py @@ -4,6 +4,7 @@ """A module containing run_workflow method definition.""" import logging +from dataclasses import asdict import pandas as pd from graphrag_input.input_reader import InputReader @@ -50,7 +51,10 @@ async def load_update_documents( previous_table_provider: TableProvider, ) -> pd.DataFrame: """Load and parse update-only input documents into a standard format.""" - input_documents = pd.DataFrame(await input_reader.read_files()) + input_documents = [] + async for doc in input_reader: + input_documents.append(asdict(doc)) + input_documents = pd.DataFrame(input_documents) # previous table provider has the output of the previous run # we'll use this to diff the input from the prior delta_documents = await get_delta_docs(input_documents, previous_table_provider) From 113ef062489ff5cda7c22d7b5aa86f0cb5b16869 Mon Sep 17 00:00:00 2001 From: Nathan Evans Date: Wed, 11 Feb 2026 13:54:15 -0800 Subject: [PATCH 2/2] Format --- docs/examples_notebooks/api_overview.ipynb | 5 ++--- docs/examples_notebooks/input_documents.ipynb | 5 ++--- .../graphrag/index/workflows/load_update_documents.py | 4 +--- tests/verbs/test_create_community_reports.py | 8 ++++---- unified-search-app/app/app_logic.py | 3 +-- 5 files changed, 10 insertions(+), 15 deletions(-) diff --git a/docs/examples_notebooks/api_overview.ipynb b/docs/examples_notebooks/api_overview.ipynb index abcd7832fc..2a0c0f15de 100644 --- a/docs/examples_notebooks/api_overview.ipynb +++ b/docs/examples_notebooks/api_overview.ipynb @@ -28,11 +28,10 @@ "from pathlib import Path\n", "from pprint import pprint\n", "\n", + "import graphrag.api as api\n", "import pandas as pd\n", "from graphrag.config.load_config import load_config\n", - "from graphrag.index.typing.pipeline_run_result import PipelineRunResult\n", - "\n", - "import graphrag.api as api" + "from graphrag.index.typing.pipeline_run_result import PipelineRunResult" ] }, { diff --git a/docs/examples_notebooks/input_documents.ipynb b/docs/examples_notebooks/input_documents.ipynb index 505c0fe1f3..5657770eaf 100644 --- a/docs/examples_notebooks/input_documents.ipynb +++ b/docs/examples_notebooks/input_documents.ipynb @@ -30,11 +30,10 @@ "from pathlib import Path\n", "from pprint import pprint\n", "\n", + "import graphrag.api as api\n", "import pandas as pd\n", "from graphrag.config.load_config import load_config\n", - "from graphrag.index.typing.pipeline_run_result import PipelineRunResult\n", - "\n", - "import graphrag.api as api" + "from graphrag.index.typing.pipeline_run_result import PipelineRunResult" ] }, { diff --git a/packages/graphrag/graphrag/index/workflows/load_update_documents.py b/packages/graphrag/graphrag/index/workflows/load_update_documents.py index 8b138b3767..f29d02f09e 100644 --- a/packages/graphrag/graphrag/index/workflows/load_update_documents.py +++ b/packages/graphrag/graphrag/index/workflows/load_update_documents.py @@ -51,9 +51,7 @@ async def load_update_documents( previous_table_provider: TableProvider, ) -> pd.DataFrame: """Load and parse update-only input documents into a standard format.""" - input_documents = [] - async for doc in input_reader: - input_documents.append(asdict(doc)) + input_documents = [asdict(doc) async for doc in input_reader] input_documents = pd.DataFrame(input_documents) # previous table provider has the output of the previous run # we'll use this to diff the input from the prior diff --git a/tests/verbs/test_create_community_reports.py b/tests/verbs/test_create_community_reports.py index 5a40b05449..68d8d1be9c 100644 --- a/tests/verbs/test_create_community_reports.py +++ b/tests/verbs/test_create_community_reports.py @@ -3,14 +3,14 @@ from graphrag.data_model.schemas import COMMUNITY_REPORTS_FINAL_COLUMNS -from graphrag.index.workflows.create_community_reports import ( - run_workflow, -) - from graphrag.index.operations.summarize_communities.community_reports_extractor import ( CommunityReportResponse, FindingModel, ) +from graphrag.index.workflows.create_community_reports import ( + run_workflow, +) + from tests.unit.config.utils import get_default_graphrag_config from .util import ( diff --git a/unified-search-app/app/app_logic.py b/unified-search-app/app/app_logic.py index a573b9daa5..dc64e0e77c 100644 --- a/unified-search-app/app/app_logic.py +++ b/unified-search-app/app/app_logic.py @@ -7,6 +7,7 @@ import logging from typing import TYPE_CHECKING +import graphrag.api as api import streamlit as st from knowledge_loader.data_sources.loader import ( create_datasource, @@ -17,8 +18,6 @@ from state.session_variables import SessionVariables from ui.search import display_search_result -import graphrag.api as api - if TYPE_CHECKING: import pandas as pd