From 70dae823cb391e59dda38438394d691a293cad89 Mon Sep 17 00:00:00 2001
From: Gabriel Nieves
Date: Wed, 19 Mar 2025 17:04:48 +0000
Subject: [PATCH 1/4] Added support for embeddings chunking as defined by the config.

---
 graphrag/prompt_tune/loader/input.py | 66 ++++++++++++++++++++++++++--
 1 file changed, 62 insertions(+), 4 deletions(-)

diff --git a/graphrag/prompt_tune/loader/input.py b/graphrag/prompt_tune/loader/input.py
index 17af53ec98..222fe7c4a9 100644
--- a/graphrag/prompt_tune/loader/input.py
+++ b/graphrag/prompt_tune/loader/input.py
@@ -3,12 +3,20 @@

 """Input loading module."""

+import asyncio
+
 import numpy as np
 import pandas as pd

 from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
 from graphrag.config.models.graph_rag_config import GraphRagConfig
+from graphrag.config.models.language_model_config import LanguageModelConfig
 from graphrag.index.input.factory import create_input
+from graphrag.index.operations.embed_text.strategies.openai import (
+    _create_text_batches,
+    _prepare_embed_texts,
+)
+from graphrag.index.text_splitting.text_splitting import TokenTextSplitter
 from graphrag.index.workflows.create_base_text_units import create_base_text_units
 from graphrag.language_model.manager import ModelManager
 from graphrag.language_model.protocol.base import EmbeddingModel
@@ -21,15 +29,52 @@
 from graphrag.prompt_tune.types import DocSelectionType


+async def _embed(
+    model: EmbeddingModel, chunk: list[str], semaphore: asyncio.Semaphore
+) -> np.ndarray[float, np.dtype[np.float_]]:
+    async with semaphore:
+        chunk_embeddings = await model.aembed_batch(chunk)
+        return np.array(chunk_embeddings)
+
+
 async def _embed_chunks(
+    batch_size: int,
+    batch_max_tokens: int,
     text_chunks: pd.DataFrame,
     embedding_llm: EmbeddingModel,
+    config: LanguageModelConfig,
+    splitter: TokenTextSplitter,
+    logger: ProgressLogger,
     n_subset_max: int = N_SUBSET_MAX,
 ) -> tuple[pd.DataFrame, np.ndarray]:
     """Convert text chunks into dense text embeddings."""
-    sampled_text_chunks = text_chunks.sample(n=min(n_subset_max, len(text_chunks)))
-    embeddings = await embedding_llm.aembed_batch(sampled_text_chunks["text"].tolist())
-    return text_chunks, np.array(embeddings)
+    sampled_text_chunks = text_chunks.sample(n=min(n_subset_max, len(text_chunks)))[
+        "text"
+    ].tolist()
+    preped_text_chunks, _ = _prepare_embed_texts(sampled_text_chunks, splitter)
+    semaphore: asyncio.Semaphore = asyncio.Semaphore(config.concurrent_requests)
+
+    # Break up the input texts. The sizes here indicate how many snippets are in each input text
+    sampled_batches = _create_text_batches(
+        preped_text_chunks,
+        batch_size,
+        batch_max_tokens,
+        splitter,
+    )
+    logger.info(
+        (  # noqa: UP034
+            f"embedding {len(sampled_text_chunks)} inputs "  # noqa: G004
+            f"via {len(preped_text_chunks)} snippets "
+            f"using {len(sampled_batches)} batches. "
+            f"max_batch_size={batch_size}, max_tokens={batch_max_tokens}"
+        )
+    )
+
+    # Embed each chunk of snippets
+    futures = [_embed(embedding_llm, batch, semaphore) for batch in sampled_batches]
+    embeddings = await asyncio.gather(*futures)
+    # merge results in a single list of lists (reduce the collect dimension)
+    return text_chunks, np.array([item for sublist in embeddings for item in sublist])


 def _sample_chunks_from_embeddings(
@@ -60,6 +105,12 @@ async def load_docs_in_chunks(
     embeddings_llm_settings = config.get_language_model_config(
         config.embed_text.model_id
     )
+    batch_size = config.embed_text.batch_size
+    batch_max_tokens = config.embed_text.batch_max_tokens
+    splitter = TokenTextSplitter(
+        encoding_name=embeddings_llm_settings.encoding_model,
+        chunk_size=batch_max_tokens,
+    )

     dataset = await create_input(config.input, logger, root)
     chunk_config = config.chunks
@@ -97,7 +148,14 @@ async def load_docs_in_chunks(
     )

     chunks_df, embeddings = await _embed_chunks(
-        chunks_df, embedding_llm, n_subset_max=n_subset_max
+        batch_size,
+        batch_max_tokens,
+        splitter=splitter,
+        text_chunks=chunks_df,
+        embedding_llm=embedding_llm,
+        config=embeddings_llm_settings,
+        logger=logger,
+        n_subset_max=n_subset_max,
     )

     chunks_df = _sample_chunks_from_embeddings(chunks_df, embeddings, k=k)

From cc72f5fdb96df2c4be37dbe5d010f8a5b62cedbd Mon Sep 17 00:00:00 2001
From: Gabriel Nieves
Date: Wed, 19 Mar 2025 18:27:03 +0000
Subject: [PATCH 2/4] ran semversioner -t patch

---
 .semversioner/next-release/patch-20250319182609055856.json | 4 ++++
 1 file changed, 4 insertions(+)
 create mode 100644 .semversioner/next-release/patch-20250319182609055856.json

diff --git a/.semversioner/next-release/patch-20250319182609055856.json b/.semversioner/next-release/patch-20250319182609055856.json
new file mode 100644
index 0000000000..87cc8a25eb
--- /dev/null
+++ b/.semversioner/next-release/patch-20250319182609055856.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "Added batching logic to the prompt tuning autoselection embeddings workflow"
+}

From 3bcc028c54c57cb22ba5c79558b5099d686133bd Mon Sep 17 00:00:00 2001
From: Gabriel Nieves
Date: Wed, 19 Mar 2025 20:48:50 +0000
Subject: [PATCH 3/4] Eliminated redundant code by using the embed_text strategy directly

---
 graphrag/api/prompt_tune.py          |  2 +-
 graphrag/prompt_tune/loader/input.py | 99 ++++++----------------------
 2 files changed, 20 insertions(+), 81 deletions(-)

diff --git a/graphrag/api/prompt_tune.py b/graphrag/api/prompt_tune.py
index 0c09714825..92f6ba94f3 100644
--- a/graphrag/api/prompt_tune.py
+++ b/graphrag/api/prompt_tune.py
@@ -111,7 +111,7 @@ async def generate_indexing_prompts(

     # if max_retries is not set, inject a dynamically assigned value based on the number of expected LLM calls
     # to be made or fallback to a default value in the worst case
-    if default_llm_settings.max_retries == -1:
+    if default_llm_settings.max_retries < 0:
         default_llm_settings.max_retries = min(
             len(doc_list), language_model_defaults.max_retries
         )
diff --git a/graphrag/prompt_tune/loader/input.py b/graphrag/prompt_tune/loader/input.py
index 222fe7c4a9..7c24f02cf3 100644
--- a/graphrag/prompt_tune/loader/input.py
+++ b/graphrag/prompt_tune/loader/input.py
@@ -3,23 +3,17 @@

 """Input loading module."""

-import asyncio
-
 import numpy as np
 import pandas as pd

+from graphrag.cache.noop_pipeline_cache import NoopPipelineCache
 from graphrag.callbacks.noop_workflow_callbacks import NoopWorkflowCallbacks
 from graphrag.config.models.graph_rag_config import GraphRagConfig
-from graphrag.config.models.language_model_config import LanguageModelConfig
 from graphrag.index.input.factory import create_input
 from graphrag.index.operations.embed_text.strategies.openai import (
-    _create_text_batches,
-    _prepare_embed_texts,
+    run as run_embed_text,
 )
-from graphrag.index.text_splitting.text_splitting import TokenTextSplitter
 from graphrag.index.workflows.create_base_text_units import create_base_text_units
-from graphrag.language_model.manager import ModelManager
-from graphrag.language_model.protocol.base import EmbeddingModel
 from graphrag.logger.base import ProgressLogger
 from graphrag.prompt_tune.defaults import (
     LIMIT,
@@ -29,57 +23,9 @@
 from graphrag.prompt_tune.types import DocSelectionType


-async def _embed(
-    model: EmbeddingModel, chunk: list[str], semaphore: asyncio.Semaphore
-) -> np.ndarray[float, np.dtype[np.float_]]:
-    async with semaphore:
-        chunk_embeddings = await model.aembed_batch(chunk)
-        return np.array(chunk_embeddings)
-
-
-async def _embed_chunks(
-    batch_size: int,
-    batch_max_tokens: int,
-    text_chunks: pd.DataFrame,
-    embedding_llm: EmbeddingModel,
-    config: LanguageModelConfig,
-    splitter: TokenTextSplitter,
-    logger: ProgressLogger,
-    n_subset_max: int = N_SUBSET_MAX,
-) -> tuple[pd.DataFrame, np.ndarray]:
-    """Convert text chunks into dense text embeddings."""
-    sampled_text_chunks = text_chunks.sample(n=min(n_subset_max, len(text_chunks)))[
-        "text"
-    ].tolist()
-    preped_text_chunks, _ = _prepare_embed_texts(sampled_text_chunks, splitter)
-    semaphore: asyncio.Semaphore = asyncio.Semaphore(config.concurrent_requests)
-
-    # Break up the input texts. The sizes here indicate how many snippets are in each input text
-    sampled_batches = _create_text_batches(
-        preped_text_chunks,
-        batch_size,
-        batch_max_tokens,
-        splitter,
-    )
-    logger.info(
-        (  # noqa: UP034
-            f"embedding {len(sampled_text_chunks)} inputs "  # noqa: G004
-            f"via {len(preped_text_chunks)} snippets "
-            f"using {len(sampled_batches)} batches. "
-            f"max_batch_size={batch_size}, max_tokens={batch_max_tokens}"
-        )
-    )
-
-    # Embed each chunk of snippets
-    futures = [_embed(embedding_llm, batch, semaphore) for batch in sampled_batches]
-    embeddings = await asyncio.gather(*futures)
-    # merge results in a single list of lists (reduce the collect dimension)
-    return text_chunks, np.array([item for sublist in embeddings for item in sublist])
-
-
 def _sample_chunks_from_embeddings(
     text_chunks: pd.DataFrame,
-    embeddings,
+    embeddings: np.ndarray[float, np.dtype[np.float_]],
     k: int = K,
 ) -> pd.DataFrame:
     """Sample text chunks from embeddings."""
@@ -105,13 +51,6 @@ async def load_docs_in_chunks(
     embeddings_llm_settings = config.get_language_model_config(
         config.embed_text.model_id
     )
-    batch_size = config.embed_text.batch_size
-    batch_max_tokens = config.embed_text.batch_max_tokens
-    splitter = TokenTextSplitter(
-        encoding_name=embeddings_llm_settings.encoding_model,
-        chunk_size=batch_max_tokens,
-    )
-
     dataset = await create_input(config.input, logger, root)
     chunk_config = config.chunks
     chunks_df = create_base_text_units(
@@ -139,24 +78,24 @@ async def load_docs_in_chunks(
     if k is None or k <= 0:
         msg = "k must be an integer > 0"
         raise ValueError(msg)
-    embedding_llm = ModelManager().register_embedding(
-        name="prompt_tuning_embeddings",
-        model_type=embeddings_llm_settings.type,
-        config=embeddings_llm_settings,
-        callbacks=NoopWorkflowCallbacks(),
-        cache=None,
-    )

-    chunks_df, embeddings = await _embed_chunks(
-        batch_size,
-        batch_max_tokens,
-        splitter=splitter,
-        text_chunks=chunks_df,
-        embedding_llm=embedding_llm,
-        config=embeddings_llm_settings,
-        logger=logger,
-        n_subset_max=n_subset_max,
+    # Convert text chunks into dense text embeddings.
+    sampled_text_chunks = chunks_df.sample(n=min(n_subset_max, len(chunks_df)))[
+        "text"
+    ].tolist()
+
+    embedding_results = await run_embed_text(
+        sampled_text_chunks,
+        callbacks=NoopWorkflowCallbacks(),
+        cache=NoopPipelineCache(),
+        args={
+            "llm": embeddings_llm_settings.model_dump(),
+            "num_threads": embeddings_llm_settings.concurrent_requests,
+            "batch_size": config.embed_text.batch_size,
+            "batch_max_tokens": config.embed_text.batch_max_tokens,
+        },
     )
+    embeddings = np.array(embedding_results.embeddings)

     chunks_df = _sample_chunks_from_embeddings(chunks_df, embeddings, k=k)
     # Convert the dataset to list form, so we have a list of documents

From 02283799f22590355c087832651519e5cdab7cf1 Mon Sep 17 00:00:00 2001
From: Gabriel Nieves
Date: Mon, 24 Mar 2025 23:41:47 +0000
Subject: [PATCH 4/4] Added fix to support brackets within the corpus text; for example, inline LaTeX within a markdown file

---
 graphrag/prompt_tune/loader/input.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/graphrag/prompt_tune/loader/input.py b/graphrag/prompt_tune/loader/input.py
index 7c24f02cf3..fa49ebeeb9 100644
--- a/graphrag/prompt_tune/loader/input.py
+++ b/graphrag/prompt_tune/loader/input.py
@@ -99,4 +99,8 @@ async def load_docs_in_chunks(

     chunks_df = _sample_chunks_from_embeddings(chunks_df, embeddings, k=k)
     # Convert the dataset to list form, so we have a list of documents
-    return chunks_df["text"].tolist()
+    return [
+        # need this to prevent the str.format() function from breaking when parsing LaTeX from markdown files
+        i.replace("{", "{{").replace("}", "}}")
+        for i in chunks_df["text"]
+    ]