diff --git a/rust/lance-core/src/deepsize.rs b/rust/lance-core/src/deepsize.rs index b6c145bb504..350c52b146c 100644 --- a/rust/lance-core/src/deepsize.rs +++ b/rust/lance-core/src/deepsize.rs @@ -87,6 +87,16 @@ impl DeepSizeOf for String { } } +impl DeepSizeOf for bytes::Bytes { + fn deep_size_of_children(&self, context: &mut Context) -> usize { + if context.mark_seen(self.as_ptr() as usize) { + self.len() + } else { + 0 + } + } +} + impl DeepSizeOf for AtomicU64 { fn deep_size_of_children(&self, _context: &mut Context) -> usize { 0 diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index a30d5ed93a9..bb46e725098 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -225,13 +225,14 @@ use futures::future::{BoxFuture, MaybeDone, maybe_done}; use futures::stream::{self, BoxStream}; use futures::{FutureExt, StreamExt}; use lance_arrow::DataTypeExt; -use lance_core::cache::LanceCache; +use lance_core::cache::{Context, DeepSizeOf, LanceCache}; use lance_core::datatypes::{ BLOB_DESC_LANCE_FIELD, Field, Schema, validate_fixed_size_list_dimensions, }; use lance_core::utils::futures::{FinallyStreamExt, StreamOnDropExt}; use lance_core::utils::parse::parse_env_as_bool; use log::{debug, trace, warn}; +use prost::Message; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{self, unbounded_channel}; @@ -299,6 +300,15 @@ pub enum PageEncoding { Structural(pb21::PageLayout), } +impl DeepSizeOf for PageEncoding { + fn deep_size_of_children(&self, _context: &mut Context) -> usize { + match self { + Self::Legacy(encoding) => encoding.encoded_len() * 4, + Self::Structural(encoding) => encoding.encoded_len() * 4, + } + } +} + impl PageEncoding { pub fn as_legacy(&self) -> &pb::ArrayEncoding { match self { @@ -336,6 +346,13 @@ pub struct PageInfo { pub buffer_offsets_and_sizes: Arc<[(u64, u64)]>, } +impl DeepSizeOf for PageInfo { + fn deep_size_of_children(&self, context: &mut Context) -> usize { + self.encoding.deep_size_of_children(context) + + self.buffer_offsets_and_sizes.deep_size_of_children(context) + } +} + /// Metadata describing a column in a file /// /// This is typically created by reading the metadata section of a Lance file @@ -350,6 +367,14 @@ pub struct ColumnInfo { pub encoding: pb::ColumnEncoding, } +impl DeepSizeOf for ColumnInfo { + fn deep_size_of_children(&self, context: &mut Context) -> usize { + self.page_infos.deep_size_of_children(context) + + self.buffer_offsets_and_sizes.deep_size_of_children(context) + + self.encoding.encoded_len() * 4 + } +} + impl ColumnInfo { /// Create a new instance pub fn new( diff --git a/rust/lance-encoding/src/version.rs b/rust/lance-encoding/src/version.rs index 41640d0bf2d..cd8f09b011b 100644 --- a/rust/lance-encoding/src/version.rs +++ b/rust/lance-encoding/src/version.rs @@ -5,6 +5,7 @@ use std::str::FromStr; use lance_arrow::DataTypeExt; use lance_core::datatypes::Field; +use lance_core::deepsize::{Context, DeepSizeOf}; use lance_core::{Error, Result}; pub const LEGACY_FORMAT_VERSION: &str = "0.1"; @@ -38,6 +39,12 @@ pub enum LanceFileVersion { V2_3, } +impl DeepSizeOf for LanceFileVersion { + fn deep_size_of_children(&self, _context: &mut Context) -> usize { + 0 + } +} + impl LanceFileVersion { /// Convert Stable or Next to the actual version pub fn resolve(&self) -> Self { diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index c454f73819e..d5355c089b2 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use std::{ + borrow::Cow, collections::{BTreeMap, BTreeSet}, io::Cursor, ops::Range, @@ -31,7 +32,7 @@ use prost::{Message, Name}; use lance_core::{ Error, Result, - cache::LanceCache, + cache::{CacheKey, LanceCache}, datatypes::{Field, Schema}, }; use lance_encoding::format::pb as pbenc; @@ -127,6 +128,14 @@ impl CachedFileMetadata { } } +fn column_metadata_deep_size(column_metadatas: &[pbfile::ColumnMetadata]) -> usize { + column_metadatas + .iter() + .map(|cm| cm.encoded_len() * 4) + .sum::() + + std::mem::size_of_val(column_metadatas) +} + impl DeepSizeOf for CachedFileMetadata { fn deep_size_of_children(&self, context: &mut Context) -> usize { let schema_size = self.file_schema.deep_size_of_children(context); @@ -142,49 +151,15 @@ impl DeepSizeOf for CachedFileMetadata { // as a proxy for in-memory size. The decoded representation is typically // several times larger than the wire format due to heap-allocated // repeated/string/bytes fields, so we apply a 4x multiplier. - let column_metadatas_size: usize = self - .column_metadatas - .iter() - .map(|cm| cm.encoded_len() * 4) - .sum::() - + std::mem::size_of_val(self.column_metadatas.as_slice()); + let column_metadatas_size = column_metadata_deep_size(self.column_metadatas.as_slice()); // column_infos is Vec>. Each ColumnInfo contains // page_infos (with protobuf PageEncoding), buffer offsets, and a // column-level ColumnEncoding protobuf. - let column_infos_size: usize = self - .column_infos - .iter() - .map(|ci| { - let pages_size: usize = ci - .page_infos - .iter() - .map(|pi| { - let enc_size = match &pi.encoding { - lance_encoding::decoder::PageEncoding::Legacy(e) => e.encoded_len() * 4, - lance_encoding::decoder::PageEncoding::Structural(e) => { - e.encoded_len() * 4 - } - }; - enc_size - + std::mem::size_of_val(pi.buffer_offsets_and_sizes.as_ref()) - + std::mem::size_of::() * 2 // num_rows + priority - }) - .sum(); - pages_size - + std::mem::size_of_val(ci.buffer_offsets_and_sizes.as_ref()) - + ci.encoding.encoded_len() * 4 - + std::mem::size_of::() // index - + std::mem::size_of::() * 2 // Arc overhead - }) - .sum(); + let column_infos_size = self.column_infos.deep_size_of_children(context); // Global buffer bytes retained for zero-IO reads (copied out of the tail). - let retained_buffers_size: usize = self - .retained_global_buffers - .values() - .map(|buf| buf.len()) - .sum(); + let retained_buffers_size = self.retained_global_buffers.deep_size_of_children(context); schema_size + buffers_size @@ -194,6 +169,65 @@ impl DeepSizeOf for CachedFileMetadata { } } +/// Lightweight file metadata used to locate per-column metadata on demand. +/// +/// This contains the file-level schema, row count, global buffer descriptors, +/// and column metadata offset table. Unlike [`CachedFileMetadata`], it does not +/// hold decoded metadata for every column. +#[derive(Debug, DeepSizeOf)] +pub struct FileMetadataIndex { + file_schema: Arc, + num_rows: u64, + file_buffers: Vec, + column_metadata_offsets: Arc<[(u64, u64)]>, + num_columns: u32, + version: LanceFileVersion, + file_size_bytes: u64, + retained_global_buffers: BTreeMap, +} + +impl FileMetadataIndex { + /// Returns the total size of the file in bytes. + pub fn file_size(&self) -> u64 { + self.file_size_bytes + } + + /// Returns the number of physical columns in the file. + pub fn num_columns(&self) -> u32 { + self.num_columns + } +} + +#[derive(Debug)] +struct CachedColumnMetadata { + column_metadata: pbfile::ColumnMetadata, + column_info: Arc, +} + +impl DeepSizeOf for CachedColumnMetadata { + fn deep_size_of_children(&self, context: &mut Context) -> usize { + column_metadata_deep_size(std::slice::from_ref(&self.column_metadata)) + + self.column_info.deep_size_of_children(context) + } +} + +#[derive(Debug, Clone)] +struct ColumnMetadataCacheKey { + column_index: u32, +} + +impl CacheKey for ColumnMetadataCacheKey { + type ValueType = CachedColumnMetadata; + + fn key(&self) -> Cow<'_, str> { + Cow::Owned(format!("column_metadata/{}", self.column_index)) + } + + fn type_name() -> &'static str { + "ColumnMetadata" + } +} + impl CachedFileMetadata { pub fn version(&self) -> LanceFileVersion { match (self.major_version, self.minor_version) { @@ -430,16 +464,45 @@ impl Default for FileReaderOptions { } #[derive(Debug, Clone)] -pub struct FileReader { +struct PreparedProjection { + column_infos: Vec>, + decoder_projection: ReaderProjection, +} + +#[derive(Debug, Clone)] +enum FileMetadataProvider { + Full(Arc), + Indexed(Arc), +} + +#[derive(Debug, Clone)] +struct FileReadCore { scheduler: Arc, - // The default projection to be applied to all reads base_projection: ReaderProjection, - num_rows: u64, - metadata: Arc, + metadata_provider: FileMetadataProvider, decoder_plugins: Arc, cache: Arc, options: FileReaderOptions, } + +/// A projection-scoped reader for Lance files. +/// +/// This reader fixes a base projection at construction time. All later reads +/// must stay within that projection, which lets the reader load only the column +/// metadata needed by the base projection when opening from a [`FileMetadataIndex`]. +/// It intentionally does not expose APIs that require synchronous access to full +/// file metadata. +#[derive(Debug, Clone)] +pub struct ProjectedFileReader { + core: FileReadCore, +} + +/// A Lance file reader backed by fully decoded file metadata. +#[derive(Debug, Clone)] +pub struct FileReader { + core: FileReadCore, + metadata: Arc, +} #[derive(Debug)] struct Footer { #[allow(dead_code)] @@ -460,13 +523,8 @@ const FOOTER_LEN: usize = 40; impl FileReader { pub fn with_scheduler(&self, scheduler: Arc) -> Self { Self { - scheduler, - base_projection: self.base_projection.clone(), - cache: self.cache.clone(), - decoder_plugins: self.decoder_plugins.clone(), + core: self.core.with_scheduler(scheduler), metadata: self.metadata.clone(), - options: self.options.clone(), - num_rows: self.num_rows, } } @@ -481,23 +539,23 @@ impl FileReader { &self, stats: Arc, ) -> Self { - match self.scheduler.with_io_stats(stats) { + match self.core.scheduler.with_io_stats(stats) { Some(scheduler) => self.with_scheduler(scheduler), None => self.clone(), } } pub fn num_rows(&self) -> u64 { - self.num_rows + self.core.num_rows() } pub fn metadata(&self) -> &Arc { &self.metadata } - pub fn file_statistics(&self) -> FileStatistics { - let column_metadatas = &self.metadata().column_metadatas; - + fn statistics_from_column_metadata( + column_metadatas: &[pbfile::ColumnMetadata], + ) -> FileStatistics { let column_stats = column_metadatas .iter() .map(|col_metadata| { @@ -519,22 +577,12 @@ impl FileReader { } } - pub async fn read_global_buffer(&self, index: u32) -> Result { - let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len())))?; - - // If the buffer's bytes were captured by the tail read at open, serve them - // from memory with no additional I/O. Larger buffers (outside the window) - // are not retained and fall back to a dedicated read. - if let Some(bytes) = self.metadata.retained_global_buffers.get(&index) { - return Ok(bytes.clone()); - } + pub fn file_statistics(&self) -> FileStatistics { + Self::statistics_from_column_metadata(&self.metadata().column_metadatas) + } - self.scheduler - .submit_single( - buffer_desc.position..buffer_desc.position + buffer_desc.size, - 0, - ) - .await + pub async fn read_global_buffer(&self, index: u32) -> Result { + self.core.read_global_buffer(index).await } async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> { @@ -548,6 +596,47 @@ impl FileReader { Ok((tail_bytes, file_size)) } + async fn read_range_from_tail_or_scheduler( + tail_bytes: &Bytes, + tail_offset: u64, + scheduler: &FileScheduler, + range: Range, + ) -> Result { + let tail_end = tail_offset + tail_bytes.len() as u64; + if range.start >= tail_offset && range.end <= tail_end { + let rel_start = (range.start - tail_offset) as usize; + let rel_end = (range.end - tail_offset) as usize; + Ok(tail_bytes.slice(rel_start..rel_end)) + } else { + scheduler.submit_single(range, 0).await + } + } + + fn retained_global_buffers_from_tail( + gbo_table: &[BufferDescriptor], + tail_bytes: &Bytes, + tail_offset: u64, + ) -> BTreeMap { + let tail_end = tail_offset + tail_bytes.len() as u64; + gbo_table + .iter() + .enumerate() + .skip(1) + .filter_map(|(index, buffer)| { + let start = buffer.position; + let end = buffer.position + buffer.size; + if start >= tail_offset && end <= tail_end { + let rel_start = (start - tail_offset) as usize; + let rel_end = (end - tail_offset) as usize; + let bytes = Bytes::copy_from_slice(&tail_bytes[rel_start..rel_end]); + Some((index as u32, bytes)) + } else { + None + } + }) + .collect() + } + // Checks to make sure the footer is written correctly and returns the // position of the file descriptor (which comes from the footer) fn decode_footer(footer_bytes: &Bytes) -> Result