diff --git a/core/config.py b/core/config.py index 0d0b8fb2..1a4f81d3 100644 --- a/core/config.py +++ b/core/config.py @@ -123,6 +123,9 @@ class Settings(BaseSettings): MULTIVECTOR_STORE_PROVIDER: Literal["postgres", "morphik"] = "postgres" # Enable dual ingestion to both fast and slow multivector stores during migration ENABLE_DUAL_MULTIVECTOR_INGESTION: bool = False + MULTIVECTOR_BLOCK_STORAGE_ENABLED: bool = True + MULTIVECTOR_BLOCK_SIZE: int = 2048 + MULTIVECTOR_BLOCK_PREFIX: str = "multivector_blocks" # Colpali configuration ENABLE_COLPALI: bool diff --git a/core/vector_store/fast_multivector_store.py b/core/vector_store/fast_multivector_store.py index 378aa507..4dd457ae 100644 --- a/core/vector_store/fast_multivector_store.py +++ b/core/vector_store/fast_multivector_store.py @@ -5,6 +5,7 @@ import os import tempfile import time +from collections import defaultdict from contextlib import contextmanager from io import BytesIO from logging.handlers import RotatingFileHandler @@ -26,6 +27,7 @@ from core.storage.utils_file_extensions import detect_file_type from .base_vector_store import BaseVectorStore +from .multivector_block import BlockPointer, MultiVectorBlock, MultiVectorBlockEntry logger = logging.getLogger(__name__) @@ -225,15 +227,18 @@ def __init__(self, uri: str, tpuf_api_key: str, namespace: str = "public", regio self.vector_storage, self.vector_bucket = self._init_vector_storage() # Maintain legacy attribute for backwards compatibility with other components self.storage = self.chunk_storage - cache_settings = get_settings() - cache_enabled = cache_settings.CACHE_ENABLED - cache_path = Path(cache_settings.CACHE_PATH or "./storage/cache") - cache_limit = cache_settings.CACHE_MAX_BYTES + settings = get_settings() + cache_enabled = settings.CACHE_ENABLED + cache_path = Path(settings.CACHE_PATH or "./storage/cache") + cache_limit = settings.CACHE_MAX_BYTES self.cache = FileCacheManager( enabled=cache_enabled, base_dir=cache_path, max_bytes=cache_limit, ) + self.block_storage_enabled = bool(getattr(settings, "MULTIVECTOR_BLOCK_STORAGE_ENABLED", True)) + self.block_size = max(1, int(getattr(settings, "MULTIVECTOR_BLOCK_SIZE", 2048))) + self.block_prefix = getattr(settings, "MULTIVECTOR_BLOCK_PREFIX", "multivector_blocks") or "multivector_blocks" self.fde_config = fde.FixedDimensionalEncodingConfig( dimension=128, num_repetitions=20, @@ -328,9 +333,144 @@ def _create_storage( case _: raise ValueError(f"Unsupported storage provider: {provider}") + def _block_object_key(self, app_id: Optional[str]) -> str: + namespace = (app_id or self.namespace).strip("/ ") + prefix = (self.block_prefix or "multivector_blocks").strip("/ ") + return f"{prefix}/{namespace}/{uuid4().hex}.bin" + + def _coerce_multivector_array(self, embedding: Any) -> Optional[np.ndarray]: + """Convert raw embedding payload into a 2D numpy array or return None on failure.""" + try: + arr = np.asarray(embedding, dtype=np.float32) + except Exception as exc: # noqa: BLE001 + logger.debug("Failed to coerce embedding into array: %s", exc) + return None + + if arr.ndim == 1: + arr = arr.reshape(1, -1) + if arr.ndim != 2 or arr.size == 0: + return None + if not arr.flags["C_CONTIGUOUS"]: + arr = np.ascontiguousarray(arr) + return arr + + async def _upload_block( + self, entries: List[MultiVectorBlockEntry], app_id: Optional[str] + ) -> Tuple[str, str, int]: + """Serialize and upload a multivector block, returning (bucket, key, count).""" + block = MultiVectorBlock.build(entries) + payload = block.to_bytes() + object_key = 
self._block_object_key(app_id) + bucket_arg = "" if isinstance(self.vector_storage, LocalStorage) else (self.vector_bucket or "") + + logger.debug( + "Uploading multivector block %s with %d entries (%d bytes)", + object_key, + len(entries), + len(payload), + ) + + bucket, saved_key = await self.vector_storage.upload_file(payload, object_key, bucket=bucket_arg) + if isinstance(self.vector_storage, LocalStorage): + bucket = "" + + await self.cache.delete("vectors", bucket, saved_key) + return bucket, saved_key, len(entries) + + async def _flush_block_entries( + self, + staged: List[Tuple[int, DocumentChunk, np.ndarray]], + app_id: Optional[str], + results: List[Tuple[str, str]], + ) -> None: + """Upload staged entries as a block; fall back to legacy storage on failure.""" + try: + entries = [ + MultiVectorBlockEntry(chunk.document_id, chunk.chunk_number, arr) for _, chunk, arr in staged + ] + bucket, base_key, _ = await self._upload_block(entries, app_id) + for local_idx, (idx, _chunk, _arr) in enumerate(staged): + results[idx] = (bucket, f"{base_key}#{local_idx}") + return + except Exception as exc: # noqa: BLE001 + logger.error( + "Failed to upload multivector block (%d entries); falling back to legacy objects: %s", + len(staged), + exc, + ) + + for result_idx, chunk, _ in staged: + bucket, key = await self._save_legacy_multivector_to_storage(chunk) + results[result_idx] = (bucket, key) + def initialize(self): return True + async def _save_legacy_multivector_to_storage(self, chunk: DocumentChunk) -> Tuple[str, str]: + """Persist a single multivector as a standalone object (backwards-compatible path).""" + as_np = np.array(chunk.embedding) + save_path = f"multivector/{chunk.document_id}/{chunk.chunk_number}.npy" + with tempfile.NamedTemporaryFile(suffix=".npy") as temp_file: + np.save(temp_file, as_np) + if isinstance(self.vector_storage, S3Storage): + target_bucket = self.vector_bucket or self.vector_storage.default_bucket + self.vector_storage._ensure_bucket(target_bucket) # type: ignore[attr-defined] + self.vector_storage.s3_client.upload_file(temp_file.name, target_bucket, save_path) + bucket, key = target_bucket, save_path + else: + bucket_arg = "" if isinstance(self.vector_storage, LocalStorage) else (self.vector_bucket or "") + bucket, key = await self.vector_storage.upload_file(temp_file.name, save_path, bucket=bucket_arg) + if isinstance(self.vector_storage, LocalStorage): + bucket = "" + temp_file.close() + await self.cache.delete("vectors", bucket, key) + return bucket, key + + async def save_multivectors_to_blocks( + self, chunks: List[DocumentChunk], app_id: Optional[str] = None + ) -> List[Tuple[str, str]]: + """ + Store multivectors using block layout when enabled, falling back to legacy objects as needed. + Returns (bucket, key) pairs aligned with input chunks. 
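+        Block entries receive keys of the form "<base_key>#<index>"; legacy fallbacks keep plain object keys.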
+ """ + if not chunks: + return [] + + if not self.block_storage_enabled: + legacy_results = await asyncio.gather(*[self._save_legacy_multivector_to_storage(c) for c in chunks]) + return list(legacy_results) + + results: List[Tuple[str, str]] = [("", "") for _ in chunks] + staged: List[Tuple[int, DocumentChunk, np.ndarray]] = [] + legacy_chunks: List[Tuple[int, DocumentChunk]] = [] + + for idx, chunk in enumerate(chunks): + coerced = self._coerce_multivector_array(chunk.embedding) + if coerced is None: + legacy_chunks.append((idx, chunk)) + continue + + staged.append((idx, chunk, coerced)) + if len(staged) >= self.block_size: + await self._flush_block_entries(staged, app_id, results) + staged = [] + + if staged: + await self._flush_block_entries(staged, app_id, results) + + if legacy_chunks: + legacy_results = await asyncio.gather( + *[self._save_legacy_multivector_to_storage(chunk) for _, chunk in legacy_chunks] + ) + for (idx, _chunk), stored in zip(legacy_chunks, legacy_results): + results[idx] = stored + + missing = [i for i, (_bucket, key) in enumerate(results) if not key] + if missing: + raise ValueError(f"Failed to store multivectors for positions: {missing}") + + return results + async def store_embeddings( self, chunks: List[DocumentChunk], app_id: Optional[str] = None ) -> Tuple[bool, List[str]]: @@ -340,13 +480,15 @@ async def store_embeddings( ] storage_keys = await asyncio.gather(*[self._save_chunk_to_storage(chunk, app_id) for chunk in chunks]) stored_ids = [f"{chunk.document_id}-{chunk.chunk_number}" for chunk in chunks] - doc_ids, chunk_numbers, metdatas, multivecs = [], [], [], [] + doc_ids, chunk_numbers, metdatas = [], [], [] + multivector_refs = await self.save_multivectors_to_blocks(chunks, app_id) + multivecs = [[bucket, key] for bucket, key in multivector_refs] + block_keys = {key.split("#", 1)[0] for _, key in multivector_refs if "#" in key} + legacy_multivectors = sum(1 for _bucket, key in multivector_refs if "#" not in key) for chunk in chunks: doc_ids.append(chunk.document_id) chunk_numbers.append(chunk.chunk_number) metdatas.append(json.dumps(chunk.metadata)) - bucket, key = await self.save_multivector_to_storage(chunk) - multivecs.append([bucket, key]) result = await self.ns(app_id).write( upsert_columns={ "id": stored_ids, @@ -359,7 +501,13 @@ async def store_embeddings( }, distance_metric="cosine_distance", ) - logger.info(f"Stored {len(chunks)} chunks, tpuf ns: {result.model_dump_json()}") + logger.info( + "Stored %d chunks (%d block objects, %d legacy objects), tpuf ns: %s", + len(chunks), + len(block_keys), + legacy_multivectors, + result.model_dump_json(), + ) return True, stored_ids async def query_similar( @@ -402,10 +550,7 @@ async def query_similar( ) return [] - multivector_retrieval_tasks = [ - self.load_multivector_from_storage(r["multivector"][0], r["multivector"][1]) for r in result.rows - ] - multivectors = await asyncio.gather(*multivector_retrieval_tasks) + multivectors = await self._load_multivectors_for_rows(result.rows) t3 = time.perf_counter() logger.info(f"query_similar timing - load_multivectors: {(t3 - t2)*1000:.2f} ms") @@ -516,26 +661,18 @@ async def delete_chunks_by_document_id(self, document_id: str, app_id: Optional[ return True - async def save_multivector_to_storage(self, chunk: DocumentChunk) -> Tuple[str, str]: - as_np = np.array(chunk.embedding) - save_path = f"multivector/{chunk.document_id}/{chunk.chunk_number}.npy" - with tempfile.NamedTemporaryFile(suffix=".npy") as temp_file: - np.save(temp_file, as_np) # , 
allow_pickle=True) - if isinstance(self.vector_storage, S3Storage): - target_bucket = self.vector_bucket or self.vector_storage.default_bucket - self.vector_storage._ensure_bucket(target_bucket) # type: ignore[attr-defined] - self.vector_storage.s3_client.upload_file(temp_file.name, target_bucket, save_path) - bucket, key = target_bucket, save_path - else: - bucket_arg = "" if isinstance(self.vector_storage, LocalStorage) else (self.vector_bucket or "") - bucket, key = await self.vector_storage.upload_file(temp_file.name, save_path, bucket=bucket_arg) - if isinstance(self.vector_storage, LocalStorage): - bucket = "" - temp_file.close() - await self.cache.delete("vectors", bucket, key) - return bucket, key - - async def load_multivector_from_storage(self, bucket: str, key: str) -> torch.Tensor: + async def save_multivector_to_storage( + self, chunk: DocumentChunk, app_id: Optional[str] = None + ) -> Tuple[str, str]: + """Compatibility wrapper that stores a single multivector using block or legacy layout.""" + stored = await self.save_multivectors_to_blocks([chunk], app_id) + if not stored: + raise RuntimeError("Failed to store multivector for chunk.") + return stored[0] + + def _resolve_vector_bucket(self, bucket: str) -> Tuple[str, str]: + """Normalize bucket for local storage and derive cache bucket.""" + bucket = bucket or "" primary_bucket = bucket if isinstance(self.vector_storage, LocalStorage): storage_root = getattr(self.vector_storage, "storage_path", None) @@ -554,50 +691,137 @@ async def load_multivector_from_storage(self, bucket: str, key: str) -> torch.Te primary_bucket = "" cache_bucket = primary_bucket or bucket - cached_bytes = await self.cache.get("vectors", cache_bucket, key) - if cached_bytes is not None: - try: - as_np = np.load(BytesIO(cached_bytes)) - return torch.from_numpy(as_np).float() - except Exception as cache_exc: # noqa: BLE001 - logger.warning( - "Vector cache entry for bucket %s key %s is invalid; purging and reloading: %s", - cache_bucket, - key, - cache_exc, - ) - await self.cache.delete("vectors", cache_bucket, key) - cached_bytes = None + return primary_bucket, cache_bucket - if cached_bytes is None: - try: - content = await self.vector_storage.download_file(primary_bucket, key) - except Exception as primary_exc: # noqa: BLE001 - if self.vector_storage is self.chunk_storage or not bucket: - raise - logger.warning( - "Primary vector storage failed to load %s/%s, falling back to chunk storage: %s", - bucket, - key, - primary_exc, - ) - content = await self.chunk_storage.download_file(bucket, key) - await self.cache.put("vectors", cache_bucket, key, content) - cached_bytes = content + async def _download_vector_content(self, bucket: str, key: str, use_cache: bool = True) -> bytes: + """Download vector content with cache and storage fallback handling.""" + bucket = bucket or "" + primary_bucket, cache_bucket = self._resolve_vector_bucket(bucket) + + if use_cache: + cached_bytes = await self.cache.get("vectors", cache_bucket, key) + if cached_bytes is not None: + return cached_bytes try: - as_np = np.load(BytesIO(cached_bytes)) - except Exception as exc: # noqa: BLE001 - await self.cache.delete("vectors", cache_bucket, key) - logger.error( - "Failed to deserialize vector content for bucket %s key %s after refresh: %s", - cache_bucket, + content = await self.vector_storage.download_file(primary_bucket, key) + except Exception as primary_exc: # noqa: BLE001 + if self.vector_storage is self.chunk_storage or not bucket: + raise + logger.warning( + "Primary vector 
storage failed to load %s/%s, falling back to chunk storage: %s", + bucket, key, + primary_exc, + ) + content = await self.chunk_storage.download_file(bucket, key) + + await self.cache.put("vectors", cache_bucket, key, content) + return content + + async def _load_block(self, bucket: str, base_key: str) -> MultiVectorBlock: + """Load and decode a block, refreshing cache on decode failure.""" + _, cache_bucket = self._resolve_vector_bucket(bucket) + payload = await self._download_vector_content(bucket, base_key) + try: + return MultiVectorBlock.from_bytes(payload) + except Exception as exc: # noqa: BLE001 + logger.warning( + "Block decode failed for %s/%s, retrying with cache refresh: %s", + bucket, + base_key, exc, ) - raise + await self.cache.delete("vectors", cache_bucket, base_key) + payload = await self._download_vector_content(bucket, base_key, use_cache=False) + return MultiVectorBlock.from_bytes(payload) + + async def _load_legacy_multivector(self, bucket: str, key: str) -> torch.Tensor: + """Load a legacy per-chunk multivector object.""" + _, cache_bucket = self._resolve_vector_bucket(bucket) + payload = await self._download_vector_content(bucket, key) + try: + as_np = np.load(BytesIO(payload)) + except Exception as exc: # noqa: BLE001 + logger.warning("Legacy vector decode failed for %s/%s, refreshing cache: %s", bucket, key, exc) + await self.cache.delete("vectors", cache_bucket, key) + payload = await self._download_vector_content(bucket, key, use_cache=False) + as_np = np.load(BytesIO(payload)) return torch.from_numpy(as_np).float() + async def _load_multivectors_for_rows(self, rows: List[Any]) -> List[torch.Tensor]: + """Batch load multivectors, deduplicating block fetches and falling back to legacy objects.""" + pointers: List[BlockPointer] = [] + for row in rows: + multivector_ref = self._row_get(row, "multivector") + if not (isinstance(multivector_ref, (list, tuple)) and len(multivector_ref) == 2): + raise ValueError(f"Row missing multivector reference: {row}") + bucket, key = multivector_ref + if not isinstance(bucket, str): + bucket = bucket or "" + if not isinstance(key, str): + raise ValueError(f"Multivector key must be a string, got {type(key)}") + pointers.append(BlockPointer.parse(bucket, key)) + + block_groups: Dict[Tuple[str, str], List[Tuple[int, BlockPointer]]] = defaultdict(list) + results: List[Optional[torch.Tensor]] = [None for _ in rows] + legacy_indices: List[int] = [] + + for idx, pointer in enumerate(pointers): + if pointer.is_block and pointer.item_index is not None: + block_groups[(pointer.bucket, pointer.base_key)].append((idx, pointer)) + else: + legacy_indices.append(idx) + + if block_groups: + block_tasks = { + (bucket, base_key): asyncio.create_task(self._load_block(bucket, base_key)) + for bucket, base_key in block_groups + } + block_results = await asyncio.gather(*block_tasks.values(), return_exceptions=True) + for (bucket, base_key), block_obj in zip(block_tasks.keys(), block_results): + if isinstance(block_obj, Exception): + raise ValueError(f"Failed to load multivector block {bucket}/{base_key}: {block_obj}") from block_obj + + for idx, ptr in block_groups[(bucket, base_key)]: + try: + emb = block_obj.embedding_at(ptr.item_index or 0) + results[idx] = torch.from_numpy(emb).float() + except Exception as exc: # noqa: BLE001 + raise ValueError( + f"Failed to decode multivector index {ptr.item_index} in block {bucket}/{base_key}: {exc}" + ) from exc + + if legacy_indices: + legacy_tasks = [ + self._load_legacy_multivector(pointers[i].bucket, 
pointers[i].raw_key) for i in legacy_indices
+            ]
+            legacy_results = await asyncio.gather(*legacy_tasks)
+            for idx, tensor in zip(legacy_indices, legacy_results):
+                results[idx] = tensor
+
+        logger.debug(
+            "Loaded %d multivectors via %d block(s) and %d legacy object(s)",
+            len(results),
+            len(block_groups),
+            len(legacy_indices),
+        )
+
+        missing = [i for i, tensor in enumerate(results) if tensor is None]
+        if missing:
+            raise ValueError(f"Missing multivectors for result rows at positions: {missing}")
+
+        return [tensor for tensor in results if tensor is not None]
+
+    async def load_multivector_from_storage(self, bucket: str, key: str) -> torch.Tensor:
+        pointer = BlockPointer.parse(bucket or "", key)
+        if "#" in key and not pointer.is_block:
+            raise ValueError(f"Malformed block pointer key: {key}")
+        if pointer.is_block:
+            block = await self._load_block(pointer.bucket, pointer.base_key)
+            return torch.from_numpy(block.embedding_at(pointer.item_index or 0)).float()
+        return await self._load_legacy_multivector(bucket, key)
+
     @contextmanager
     def get_connection(self):
         """Get a PostgreSQL connection with retry logic.
@@ -749,9 +973,10 @@ def _is_storage_key(self, content: str) -> bool:
 
     @staticmethod
     def _normalize_storage_key(key: str) -> str:
-        if key.startswith(f"{MULTIVECTOR_CHUNKS_BUCKET}/"):
-            return key[len(MULTIVECTOR_CHUNKS_BUCKET) + 1 :]
-        return key
+        base_key = key.split("#", 1)[0]
+        if base_key.startswith(f"{MULTIVECTOR_CHUNKS_BUCKET}/"):
+            return base_key[len(MULTIVECTOR_CHUNKS_BUCKET) + 1 :]
+        return base_key
 
     async def _download_chunk_bytes(self, bucket: str, storage_key: str) -> Optional[bytes]:
         """Attempt to fetch chunk payload bytes from storage, considering legacy/variant keys.
@@ -917,8 +1142,9 @@ async def _collect_storage_targets(
             if isinstance(multivector, (list, tuple)) and len(multivector) == 2:
                 bucket, key = multivector
                 if isinstance(bucket, str) and isinstance(key, str):
-                    normalized_bucket = bucket if bucket else ""
-                    targets["vector"].add((normalized_bucket, self._normalize_storage_key(key)))
+                    pointer = BlockPointer.parse(bucket or "", key)
+                    normalized_bucket = pointer.cache_bucket
+                    targets["vector"].add((normalized_bucket, self._normalize_storage_key(pointer.base_key)))
 
             row_id = self._row_get(row, "id")
             if isinstance(row_id, str):
diff --git a/core/vector_store/multivector_block.py b/core/vector_store/multivector_block.py
new file mode 100644
index 00000000..defcc301
--- /dev/null
+++ b/core/vector_store/multivector_block.py
@@ -0,0 +1,224 @@
+from __future__ import annotations
+
+import struct
+from dataclasses import dataclass
+from typing import Iterable, List, Optional, Sequence
+
+import numpy as np
+
+_MAGIC = b"MVBL"
+_VERSION = 1
+_HEADER_STRUCT = struct.Struct("<4sBBI")  # magic, version, dtype_code, num_items
+_INDEX_STRUCT = struct.Struct("<HIIIII")  # doc_len, chunk_number, num_modalities, dim, offset, length
+
+# Supported payload dtypes and their on-disk codes.
+_DTYPE_TO_CODE = {
+    np.dtype(np.float16): 1,
+    np.dtype(np.float32): 2,
+}
+_CODE_TO_DTYPE = {code: dtype for dtype, code in _DTYPE_TO_CODE.items()}
+
+
+@dataclass
+class MultiVectorBlockEntry:
+    """In-memory multivector staged for block serialization."""
+
+    doc_id: str
+    chunk_number: int
+    embedding: np.ndarray
+
+
+@dataclass
+class MultiVectorBlockIndex:
+    """Index metadata locating one multivector inside a block payload."""
+
+    doc_id: str
+    chunk_number: int
+    num_modalities: int
+    dim: int
+    offset: int
+    length: int
+
+
+@dataclass
+class BlockPointer:
+    """Structured storage reference; block keys carry a `#<index>` suffix."""
+
+    bucket: str
+    base_key: str
+    item_index: Optional[int]
+
+    @property
+    def cache_bucket(self) -> str:
+        return self.bucket or ""
+
+    @property
+    def raw_key(self) -> str:
+        if self.item_index is None:
+            return self.base_key
+        return f"{self.base_key}#{self.item_index}"
+
+    @property
+    def is_block(self) -> bool:
+        return self.item_index is not None
+
+    @classmethod
+    def parse(cls, bucket: str, key: str) -> "BlockPointer":
+        """Parse raw storage bucket/key into a structured pointer."""
+        if "#" not in key:
+            return cls(bucket=bucket, base_key=key, item_index=None)
+
+        base, _, idx_str = key.rpartition("#")
+        try:
+            idx = int(idx_str)
+        except ValueError:
+            # Unexpected delimiter usage – treat as a plain object key.
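+            # Callers that require a block entry (see load_multivector_from_storage)
+            # re-validate and reject keys that still contain "#".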
+ return cls(bucket=bucket, base_key=key, item_index=None) + return cls(bucket=bucket, base_key=base, item_index=idx) + + +def _dtype_to_code(dtype: np.dtype) -> int: + code = _DTYPE_TO_CODE.get(np.dtype(dtype)) + if code is None: + raise ValueError(f"Unsupported dtype for block encoding: {dtype}") + return code + + +def _code_to_dtype(code: int) -> np.dtype: + dtype = _CODE_TO_DTYPE.get(code) + if dtype is None: + raise ValueError(f"Unknown dtype code in block payload: {code}") + return dtype + + +class MultiVectorBlock: + """Binary block containing many multivectors + lightweight index metadata.""" + + def __init__(self, dtype: np.dtype, indices: List[MultiVectorBlockIndex], payload: bytes | memoryview): + if not indices: + raise ValueError("Cannot create a block without indices.") + self.dtype = np.dtype(dtype) + self.indices = indices + self.payload = payload if isinstance(payload, memoryview) else memoryview(payload) + + @classmethod + def build(cls, entries: Sequence[MultiVectorBlockEntry], dtype: np.dtype | None = None) -> "MultiVectorBlock": + """Construct a block from in-memory entries.""" + if not entries: + raise ValueError("Cannot build a block with zero entries.") + + target_dtype = np.dtype(dtype or np.float16) + _dtype_to_code(target_dtype) # Validate support early. + + payload_chunks: List[bytes] = [] + indices: List[MultiVectorBlockIndex] = [] + offset = 0 + + for entry in entries: + doc_id_bytes = entry.doc_id.encode("utf-8") + if len(doc_id_bytes) > (2**16 - 1): + raise ValueError(f"Document id too long for block entry: {entry.doc_id!r}") + + arr = np.asarray(entry.embedding, dtype=np.float32) + if arr.ndim == 1: + arr = arr.reshape(1, -1) + if arr.ndim != 2 or arr.size == 0: + raise ValueError("Embeddings must be 2D arrays with at least one element.") + if not arr.flags["C_CONTIGUOUS"]: + arr = np.ascontiguousarray(arr) + + arr_cast = arr.astype(target_dtype, copy=False) + flat = arr_cast.reshape(-1) + length = int(flat.size) + indices.append( + MultiVectorBlockIndex( + doc_id=entry.doc_id, + chunk_number=entry.chunk_number, + num_modalities=int(arr.shape[0]), + dim=int(arr.shape[1]), + offset=int(offset), + length=length, + ) + ) + payload_chunks.append(flat.tobytes()) + offset += length + + payload = b"".join(payload_chunks) + return cls(dtype=target_dtype, indices=indices, payload=payload) + + def to_bytes(self) -> bytes: + """Serialize block to bytes.""" + dtype_code = _dtype_to_code(self.dtype) + parts: List[bytes] = [ + _HEADER_STRUCT.pack(_MAGIC, _VERSION, dtype_code, len(self.indices)), + ] + for idx in self.indices: + doc_bytes = idx.doc_id.encode("utf-8") + parts.append( + _INDEX_STRUCT.pack( + len(doc_bytes), + idx.chunk_number, + idx.num_modalities, + idx.dim, + idx.offset, + idx.length, + ) + ) + parts.append(doc_bytes) + + parts.append(self.payload.tobytes()) + return b"".join(parts) + + @classmethod + def from_bytes(cls, data: bytes | memoryview) -> "MultiVectorBlock": + """Parse a serialized block back into an in-memory structure.""" + buf = data if isinstance(data, memoryview) else memoryview(data) + if len(buf) < _HEADER_STRUCT.size: + raise ValueError("Block payload is too small to contain a header.") + + magic, version, dtype_code, num_items = _HEADER_STRUCT.unpack_from(buf, 0) + if magic != _MAGIC: + raise ValueError("Invalid block magic header.") + if version != _VERSION: + raise ValueError(f"Unsupported block version: {version}") + + dtype = _code_to_dtype(dtype_code) + + offset = _HEADER_STRUCT.size + indices: List[MultiVectorBlockIndex] = [] + 
for _ in range(num_items): + if offset + _INDEX_STRUCT.size > len(buf): + raise ValueError("Block index table truncated.") + doc_len, chunk_number, num_modalities, dim, vec_offset, length = _INDEX_STRUCT.unpack_from(buf, offset) + offset += _INDEX_STRUCT.size + + doc_end = offset + doc_len + if doc_end > len(buf): + raise ValueError("Block index entry extends past payload.") + doc_id = bytes(buf[offset:doc_end]).decode("utf-8", errors="replace") + offset = doc_end + + indices.append( + MultiVectorBlockIndex( + doc_id=doc_id, + chunk_number=chunk_number, + num_modalities=num_modalities, + dim=dim, + offset=vec_offset, + length=length, + ) + ) + + payload = buf[offset:] + return cls(dtype=dtype, indices=indices, payload=payload) + + def embedding_at(self, index: int) -> np.ndarray: + """Return multivector at index as float32 for downstream scoring.""" + if index < 0 or index >= len(self.indices): + raise IndexError(f"Block index {index} out of range for {len(self.indices)} items.") + meta = self.indices[index] + start = meta.offset * self.dtype.itemsize + end = start + meta.length * self.dtype.itemsize + arr = np.frombuffer(self.payload[start:end], dtype=self.dtype, count=meta.length) + return arr.reshape(meta.num_modalities, meta.dim).astype(np.float32) + + def iter_indices(self) -> Iterable[MultiVectorBlockIndex]: + """Expose indices for diagnostics or metadata scans.""" + return iter(self.indices)
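
As a quick sanity check for reviewers, here is a minimal round-trip of the new block format and pointer scheme using only the public API added in this diff. The bucket and key values are illustrative, and since `build` defaults to a float16 payload the example asserts shapes rather than exact values:

```python
import numpy as np

from core.vector_store.multivector_block import (
    BlockPointer,
    MultiVectorBlock,
    MultiVectorBlockEntry,
)

# Stage two chunks' multivectors (shape: num_modalities x dim).
entries = [
    MultiVectorBlockEntry("doc-a", 0, np.random.rand(3, 128).astype(np.float32)),
    MultiVectorBlockEntry("doc-a", 1, np.random.rand(5, 128).astype(np.float32)),
]

# build() casts to the block dtype (float16 by default) and records an index table.
block = MultiVectorBlock.build(entries)

# Round-trip through the serialized MVBL layout.
restored = MultiVectorBlock.from_bytes(block.to_bytes())
assert restored.embedding_at(1).shape == (5, 128)  # returned as float32 for scoring

# Storage rows reference individual entries as "<block key>#<index>".
ptr = BlockPointer.parse("vectors", "multivector_blocks/public/abc123.bin#1")
assert ptr.is_block and ptr.item_index == 1
assert ptr.base_key == "multivector_blocks/public/abc123.bin"
```

Packing many multivectors into one object trades a cheap in-memory decode for far fewer storage round-trips per query, while the `#<index>` suffix keeps each row's reference self-describing; `_normalize_storage_key` strips the suffix so deletes still target the underlying block object.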