From 6519368d8ebd1a19fad3a57e7e71ccf88e9ea871 Mon Sep 17 00:00:00 2001
From: Dayenne Souza
Date: Fri, 27 Feb 2026 19:56:03 +0000
Subject: [PATCH 1/4] vectors bulk load_documents

---
 .../graphrag_vectors/azure_ai_search.py      | 19 ++++--
 .../graphrag_vectors/cosmosdb.py             | 15 +++--
 .../graphrag_vectors/lancedb.py              | 65 ++++++++++---------
 .../graphrag_vectors/vector_store.py         |  7 +-
 4 files changed, 60 insertions(+), 46 deletions(-)

diff --git a/packages/graphrag-vectors/graphrag_vectors/azure_ai_search.py b/packages/graphrag-vectors/graphrag_vectors/azure_ai_search.py
index 2800f166c9..fe9634b6bb 100644
--- a/packages/graphrag-vectors/graphrag_vectors/azure_ai_search.py
+++ b/packages/graphrag-vectors/graphrag_vectors/azure_ai_search.py
@@ -165,22 +165,27 @@ def create_index(self) -> None:
             index,
         )
 
-    def insert(self, document: VectorStoreDocument) -> None:
-        """Insert a single document into Azure AI Search."""
-        self._prepare_document(document)
-        if document.vector is not None:
-            doc_dict = {
+    def load_documents(self, documents: list[VectorStoreDocument]) -> None:
+        """Load documents into Azure AI Search as a single batch upload."""
+        batch: list[dict[str, Any]] = []
+        for document in documents:
+            self._prepare_document(document)
+            if document.vector is None:
+                continue
+            doc_dict: dict[str, Any] = {
                 self.id_field: document.id,
                 self.vector_field: document.vector,
                 self.create_date_field: document.create_date,
                 self.update_date_field: document.update_date,
             }
-            # Add additional fields if they exist in the document data
             if document.data:
                 for field_name in self.fields:
                     if field_name in document.data:
                         doc_dict[field_name] = document.data[field_name]
-            self.db_connection.upload_documents([doc_dict])
+            batch.append(doc_dict)
+
+        if batch:
+            self.db_connection.upload_documents(batch)
 
     def _compile_filter(self, expr: FilterExpr) -> str:
         """Compile a FilterExpr into an Azure AI Search OData filter string."""
diff --git a/packages/graphrag-vectors/graphrag_vectors/cosmosdb.py b/packages/graphrag-vectors/graphrag_vectors/cosmosdb.py
index afc2873246..42bce4bae7 100644
--- a/packages/graphrag-vectors/graphrag_vectors/cosmosdb.py
+++ b/packages/graphrag-vectors/graphrag_vectors/cosmosdb.py
@@ -163,17 +163,22 @@ def create_index(self) -> None:
             msg = "Container client is not initialized."
             raise ValueError(msg)
 
-    def insert(self, document: VectorStoreDocument) -> None:
-        """Insert a single document into CosmosDB."""
-        self._prepare_document(document)
-        if document.vector is not None:
+    def load_documents(self, documents: list[VectorStoreDocument]) -> None:
+        """Load documents into CosmosDB.
+
+        CosmosDB does not support native batch upsert, so each
+        document is upserted individually after preparation.
+        """
+        for document in documents:
+            self._prepare_document(document)
+            if document.vector is None:
+                continue
             doc_json: dict[str, Any] = {
                 self.id_field: document.id,
                 self.vector_field: document.vector,
                 self.create_date_field: document.create_date,
                 self.update_date_field: document.update_date,
             }
-            # Add additional fields if they exist in the document data
             if document.data:
                 for field_name in self.fields:
                     if field_name in document.data:
diff --git a/packages/graphrag-vectors/graphrag_vectors/lancedb.py b/packages/graphrag-vectors/graphrag_vectors/lancedb.py
index 2855db293a..a7b1de7c01 100644
--- a/packages/graphrag-vectors/graphrag_vectors/lancedb.py
+++ b/packages/graphrag-vectors/graphrag_vectors/lancedb.py
@@ -78,38 +78,43 @@ def create_index(self) -> None:
         # Remove the dummy document used to set up the schema
         self.document_collection.delete(f"{self.id_field} = '__DUMMY__'")
 
-    def insert(self, document: VectorStoreDocument) -> None:
-        """Insert a single document into LanceDB."""
-        self._prepare_document(document)
-        if document.vector is not None:
-            vector = np.array(document.vector, dtype=np.float32)
-            flat_array = pa.array(vector, type=pa.float32())
-            vector_column = pa.FixedSizeListArray.from_arrays(
-                flat_array, self.vector_size
-            )
-
-            others = {}
+    def load_documents(self, documents: list[VectorStoreDocument]) -> None:
+        """Load documents into LanceDB as a single batch write."""
+        ids: list[str] = []
+        vectors: list[np.ndarray] = []
+        create_dates: list[str | None] = []
+        update_dates: list[str | None] = []
+        field_columns: dict[str, list[Any]] = {name: [] for name in self.fields}
+
+        for document in documents:
+            self._prepare_document(document)
+            if document.vector is None:
+                continue
+
+            ids.append(str(document.id))
+            vectors.append(np.array(document.vector, dtype=np.float32))
+            create_dates.append(document.create_date)
+            update_dates.append(document.update_date)
             for field_name in self.fields:
-                others[field_name] = (
-                    document.data.get(field_name) if document.data else None
-                )
-
-            data = pa.table({
-                self.id_field: pa.array([document.id], type=pa.string()),
-                self.vector_field: vector_column,
-                self.create_date_field: pa.array(
-                    [document.create_date], type=pa.string()
-                ),
-                self.update_date_field: pa.array(
-                    [document.update_date], type=pa.string()
-                ),
-                **{
-                    field_name: pa.array([value])
-                    for field_name, value in others.items()
-                },
-            })
+                value = document.data.get(field_name) if document.data else None
+                field_columns[field_name].append(value)
+
+        if not ids:
+            return
+
+        flat_vector = np.concatenate(vectors).astype(np.float32)
+        flat_array = pa.array(flat_vector, type=pa.float32())
+        vector_column = pa.FixedSizeListArray.from_arrays(flat_array, self.vector_size)
+
+        data = pa.table({
+            self.id_field: pa.array(ids, type=pa.string()),
+            self.vector_field: vector_column,
+            self.create_date_field: pa.array(create_dates, type=pa.string()),
+            self.update_date_field: pa.array(update_dates, type=pa.string()),
+            **{name: pa.array(values) for name, values in field_columns.items()},
+        })
 
-            self.document_collection.add(data)
+        self.document_collection.add(data)
 
     def _extract_data(
         self, doc: dict[str, Any], select: list[str] | None = None
diff --git a/packages/graphrag-vectors/graphrag_vectors/vector_store.py b/packages/graphrag-vectors/graphrag_vectors/vector_store.py
index 4676f6b265..3d44fa2e69 100644
--- a/packages/graphrag-vectors/graphrag_vectors/vector_store.py
+++ b/packages/graphrag-vectors/graphrag_vectors/vector_store.py
@@ -140,14 +140,13 @@ def connect(self) -> None:
     def create_index(self) -> None:
         """Create index."""
 
+    @abstractmethod
     def load_documents(self, documents: list[VectorStoreDocument]) -> None:
         """Load documents into the vector-store."""
-        for doc in documents:
-            self.insert(doc)
 
-    @abstractmethod
     def insert(self, document: VectorStoreDocument) -> None:
-        """Insert a single document into the vector-store."""
+        """Insert a single document by delegating to load_documents."""
+        self.load_documents([document])
 
     @abstractmethod
     def similarity_search_by_vector(

From 977dd2407516acd7f4894a0ab8c753b5ad1da83d Mon Sep 17 00:00:00 2001
From: Dayenne Souza
Date: Fri, 27 Feb 2026 20:27:28 +0000
Subject: [PATCH 2/4] vector load

---
 .semversioner/next-release/patch-20260227202720480258.json |  4 ++++
 .../index/operations/embed_text/embed_text.py              |  3 ++-
 .../operations/embed_text/test_embed_text.py               | 13 ++++++-------
 3 files changed, 12 insertions(+), 8 deletions(-)
 create mode 100644 .semversioner/next-release/patch-20260227202720480258.json

diff --git a/.semversioner/next-release/patch-20260227202720480258.json b/.semversioner/next-release/patch-20260227202720480258.json
new file mode 100644
index 0000000000..0ad3a4a632
--- /dev/null
+++ b/.semversioner/next-release/patch-20260227202720480258.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "vector load_documents in batches"
+}
diff --git a/packages/graphrag/graphrag/index/operations/embed_text/embed_text.py b/packages/graphrag/graphrag/index/operations/embed_text/embed_text.py
index 72ba2a2ec1..9c4366f3d7 100644
--- a/packages/graphrag/graphrag/index/operations/embed_text/embed_text.py
+++ b/packages/graphrag/graphrag/index/operations/embed_text/embed_text.py
@@ -38,6 +38,7 @@ async def embed_text(
 
     buffer: list[dict[str, Any]] = []
     total_rows = 0
+    flush_size = batch_size * 4
 
     async for row in input_table:
         text = row.get(embed_column)
@@ -49,7 +50,7 @@ async def embed_text(
             embed_column: text,
         })
 
-        if len(buffer) >= batch_size:
+        if len(buffer) >= flush_size:
             total_rows += await _flush_embedding_buffer(
                 buffer,
                 embed_column,
diff --git a/tests/unit/indexing/operations/embed_text/test_embed_text.py b/tests/unit/indexing/operations/embed_text/test_embed_text.py
index 9a519a6930..7a232b272a 100644
--- a/tests/unit/indexing/operations/embed_text/test_embed_text.py
+++ b/tests/unit/indexing/operations/embed_text/test_embed_text.py
@@ -150,8 +150,8 @@ async def test_embed_text_basic():
 
 @pytest.mark.asyncio
 async def test_embed_text_batching():
-    """Verify rows are flushed in batches when batch_size < total rows."""
-    rows = [{"id": str(i), "text": f"text {i}"} for i in range(5)]
+    """Verify rows are flushed in batches when buffer exceeds batch_size * 4."""
+    rows = [{"id": str(i), "text": f"text {i}"} for i in range(10)]
     input_table = FakeInputTable(rows)
     vector_store = _make_mock_vector_store()
 
     with patch(
@@ -160,9 +160,8 @@ async def test_embed_text_batching():
         new_callable=AsyncMock,
     ) as mock_run:
         mock_run.side_effect = [
-            _make_embedding_result(2, [1.0]),
+            _make_embedding_result(8, [1.0]),
             _make_embedding_result(2, [2.0]),
-            _make_embedding_result(1, [3.0]),
         ]
 
         count = await embed_text(
@@ -177,9 +176,9 @@ async def test_embed_text_batching():
             vector_store=vector_store,
         )
 
-        assert count == 5
-        assert mock_run.call_count == 3
-        assert vector_store.load_documents.call_count == 3
+        assert count == 10
+        assert mock_run.call_count == 2
+        assert vector_store.load_documents.call_count == 2

From fafb06b5898f8fb5462e05dc0bd46ca981ddd8cd Mon Sep 17 00:00:00 2001
From: Dayenne Souza
Date: Fri, 27 Feb 2026 20:36:41 +0000
Subject: [PATCH 3/4] add upsert word to dictionary

---
 dictionary.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dictionary.txt b/dictionary.txt
index 12bb974780..82d458ad28 100644
--- a/dictionary.txt
+++ b/dictionary.txt
@@ -133,6 +133,7 @@ retryer
 agenerate
 dropna
 notna
+upsert
 
 # LLM Terms
 AOAI

From 6aa6d921622536502fe74037af99fbc0d6e029dc Mon Sep 17 00:00:00 2001
From: Dayenne Souza
Date: Fri, 27 Feb 2026 20:47:45 +0000
Subject: [PATCH 4/4] fix dictionary

---
 dictionary.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dictionary.txt b/dictionary.txt
index 82d458ad28..13405e6061 100644
--- a/dictionary.txt
+++ b/dictionary.txt
@@ -133,7 +133,7 @@ retryer
 agenerate
 dropna
 notna
-upsert
+upserted
 
 # LLM Terms
 AOAI
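
Series note (illustrative, not part of the patches): the core change inverts the
delegation in vector_store.py. load_documents() is now the abstract primitive and
insert() a concrete convenience wrapper, which lets every backend batch its writes.
Below is a self-contained Python sketch of that contract, where Doc and MemoryStore
are hypothetical stand-ins for VectorStoreDocument and a concrete store (neither
name beyond those shown in the hunks is taken from the source):

    from dataclasses import dataclass
    from typing import Any

    @dataclass
    class Doc:
        """Stand-in for VectorStoreDocument: an id, a vector, optional payload."""
        id: str
        vector: list[float] | None = None
        data: dict[str, Any] | None = None

    class MemoryStore:
        """Toy backend: load_documents is the batch primitive, insert delegates."""

        def __init__(self) -> None:
            self.rows: list[Doc] = []

        def load_documents(self, documents: list[Doc]) -> None:
            # One batched write; documents without vectors are skipped, matching
            # the `if document.vector is None: continue` guards in the patches.
            self.rows.extend(d for d in documents if d.vector is not None)

        def insert(self, document: Doc) -> None:
            # Mirrors the new base-class behavior: a one-element batch.
            self.load_documents([document])

    store = MemoryStore()
    store.load_documents([Doc("a", [0.1, 0.2]), Doc("b", [0.3, 0.4]), Doc("c")])
    store.insert(Doc("d", [0.5, 0.6]))
    assert [d.id for d in store.rows] == ["a", "b", "d"]

Patch 2 applies the same idea upstream: the embedding buffer now flushes every
batch_size * 4 rows rather than every batch_size rows, so each flush hands the
vector store a larger load_documents batch (with batch_size 2 and 10 rows, the
test expects flushes of 8 and 2).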