diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs index 4b9bb901446..0fd0a30f9e7 100644 --- a/rust/lance-io/src/object_writer.rs +++ b/rust/lance-io/src/object_writer.rs @@ -47,8 +47,12 @@ fn max_conn_reset_retries() -> u16 { }) } -/// Maximum part size in GCS and S3: 5GB. -const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5; +/// Maximum body size for a single S3 PUT: strictly less than 5 GiB. +/// AWS rejects single-PUT bodies of exactly 5 GiB (= 5 * 1024^3) with +/// `EntityTooLarge`, so we clamp `LANCE_INITIAL_UPLOAD_SIZE` one byte +/// below that threshold to keep the buffer-fills-to-clamp single-PUT +/// path safe. See lance#6750 for the related txn-file write fix. +const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5 - 1; /// Clamps a requested upload part size to the valid [5MB, 5GB] range. /// Returns the clamped value and whether clamping was necessary. @@ -898,4 +902,17 @@ mod tests { (MAX_UPLOAD_PART_SIZE, true) ); } + + /// Regression test for the case where `LANCE_INITIAL_UPLOAD_SIZE=5368709120` + /// (exactly 5 GiB) caused a single PUT of 5 GiB on shutdown, which S3 + /// rejects with `EntityTooLarge`. With `MAX_UPLOAD_PART_SIZE` tightened to + /// 5 GiB - 1, a requested size of exactly 5 GiB must be clamped down. 
+ #[test] + fn clamp_initial_upload_size_at_5gib_clamps_down() { + let exactly_5_gib: usize = 5 * 1024 * 1024 * 1024; + assert_eq!( + clamp_initial_upload_size(exactly_5_gib), + (MAX_UPLOAD_PART_SIZE, true) + ); + } } diff --git a/rust/lance-table/src/io/commit/external_manifest.rs b/rust/lance-table/src/io/commit/external_manifest.rs index a6c9bbaa90d..22ebaa10b4a 100644 --- a/rust/lance-table/src/io/commit/external_manifest.rs +++ b/rust/lance-table/src/io/commit/external_manifest.rs @@ -8,6 +8,8 @@ use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; +use futures::StreamExt; use lance_core::utils::tracing::{ AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT, }; @@ -123,7 +125,7 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { // Step 2: Copy staging to final path let final_path = naming_scheme.manifest_path(base_path, version); - let copied = match object_store.copy(staging_path, &final_path).await { + let copied = match copy_size_aware(object_store, staging_path, &final_path, size).await { Ok(_) => true, Err(ObjectStoreError::NotFound { .. }) => false, Err(e) => return Err(e.into()), @@ -213,6 +215,129 @@ pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result5 GiB copy is slower than a native copy would be. +const MAX_SERVER_SIDE_COPY_BYTES: u64 = 5 * 1024 * 1024 * 1024; + +/// Part size for the read+rewrite fallback. Multipart-capable stores +/// (S3, GCS) require every part except the last to be ≥5 MB and allow up to +/// 10,000 parts. 100 MB sits comfortably inside both bounds and keeps the +/// part count low (~140 parts for a 14 GB manifest) without large per-part +/// RAM. +const COPY_REWRITE_PART_SIZE: usize = 100 * 1024 * 1024; + +/// Copy `from` to `to`, falling back to a multipart-equivalent read+rewrite +/// when the source size is at or above the server-side-copy size limit +/// (`MAX_SERVER_SIDE_COPY_BYTES`). 
+/// +/// For sources below the limit, this is the same fast server-side +/// `store.copy()` as before. For larger sources, the source is streamed +/// through the client and re-uploaded as a multipart upload at `to`. This +/// doubles bytes-on-the-wire for the rare large case while preserving the +/// cheap fast path for the common small case. +/// +/// `size` is the known source size. It is required: the only caller already +/// has it, and the alternative (an extra `head(from)` round-trip) is work +/// the caller can avoid by passing what it already knows. +/// +/// `NotFound` errors on `from` propagate unchanged so callers can keep +/// existing `Err(NotFound { .. })` arms. +/// +/// This is a workaround for the missing `UploadPartCopy` primitive in the +/// upstream `object_store` crate. Once that lands, this helper can be +/// deleted and the call sites can go back to plain `store.copy()`. +async fn copy_size_aware( + store: &dyn OSObjectStore, + from: &Path, + to: &Path, + size: u64, +) -> std::result::Result<(), ObjectStoreError> { + if size < MAX_SERVER_SIDE_COPY_BYTES { + store.copy(from, to).await + } else { + copy_via_read_rewrite(store, from, to).await + } +} + +// NOTE: parts are uploaded sequentially. This could be parallelized (a +// bounded JoinSet, like lance-io/src/object_writer.rs's +// LANCE_UPLOAD_CONCURRENCY) or sidestepped entirely by switching to +// `object_store::WriteMultipart` (which also handles abort-on-drop). Left +// sequential here: this is a cold path (only >5 GiB manifests) and the +// helper is itself a stopgap until `object_store` exposes UploadPartCopy. +async fn copy_via_read_rewrite( + store: &dyn OSObjectStore, + from: &Path, + to: &Path, +) -> std::result::Result<(), ObjectStoreError> { + // NotFound here propagates upward unchanged. 
+ let mut stream = store.get(from).await?.into_stream(); + + // From here on, errors must `abort()` the upload to avoid leaving an + // orphan multipart upload on stores that support them (e.g. S3, GCS), + // which would otherwise incur storage charges until the bucket's + // lifecycle policy cleans it up. + // + // Note: this does NOT cover task cancellation — `MultipartUpload`'s + // upstream Drop is documented as a no-op for S3/GCS. Callers that + // need cancellation cleanliness should run this with a guard or + // switch to `object_store::WriteMultipart` (planned follow-up). + let mut upload = store.put_multipart(to).await?; + let mut part_buf: Vec = Vec::with_capacity(COPY_REWRITE_PART_SIZE); + + while let Some(chunk) = stream.next().await { + let chunk = match chunk { + Ok(b) => b, + Err(e) => { + let _ = upload.abort().await; + return Err(e); + } + }; + // Append the chunk in COPY_REWRITE_PART_SIZE-bounded slices so a + // single oversized chunk (e.g., LocalFileSystem returning a whole + // file) cannot push part_buf past the backend's per-part size limit + // (5 GiB on S3/GCS). COPY_REWRITE_PART_SIZE is well under every + // backend's cap, so each flushed part is always valid. + let mut offset = 0; + while offset < chunk.len() { + let want = COPY_REWRITE_PART_SIZE - part_buf.len(); + let take = want.min(chunk.len() - offset); + part_buf.extend_from_slice(&chunk[offset..offset + take]); + offset += take; + + if part_buf.len() >= COPY_REWRITE_PART_SIZE { + let payload = + std::mem::replace(&mut part_buf, Vec::with_capacity(COPY_REWRITE_PART_SIZE)); + if let Err(e) = upload.put_part(Bytes::from(payload).into()).await { + let _ = upload.abort().await; + return Err(e); + } + } + } + } + + // Flush the final (possibly-short) part. The last part of a multipart + // upload is exempt from the per-part minimum on S3/GCS. 
+ if !part_buf.is_empty() + && let Err(e) = upload.put_part(Bytes::from(part_buf).into()).await + { + let _ = upload.abort().await; + return Err(e); + } + + if let Err(e) = upload.complete().await { + let _ = upload.abort().await; + return Err(e); + } + Ok(()) +} + /// External manifest commit handler /// This handler is used to commit a manifest to an external store /// for detailed design, see @@ -245,14 +370,12 @@ impl ExternalManifestCommitHandler { // step 1: copy the manifest to the final location let final_manifest_path = naming_scheme.manifest_path(base_path, version); - let copied = match store - .copy(staging_manifest_path, &final_manifest_path) - .await - { - Ok(_) => true, - Err(ObjectStoreError::NotFound { .. }) => false, // Another writer beat us to it. - Err(e) => return Err(e.into()), - }; + let copied = + match copy_size_aware(store, staging_manifest_path, &final_manifest_path, size).await { + Ok(_) => true, + Err(ObjectStoreError::NotFound { .. }) => false, // Another writer beat us to it. + Err(e) => return Err(e.into()), + }; if copied { info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref()); } diff --git a/rust/lance/src/io/commit/external_manifest.rs b/rust/lance/src/io/commit/external_manifest.rs index eee4fbf07b6..850d10f9a23 100644 --- a/rust/lance/src/io/commit/external_manifest.rs +++ b/rust/lance/src/io/commit/external_manifest.rs @@ -4,10 +4,14 @@ /// Keep the tests in `lance` crate because it has dependency on [Dataset]. 
#[cfg(test)] mod test { + use std::ops::Range; use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; use std::{collections::HashMap, time::Duration}; use async_trait::async_trait; + use bytes::Bytes; + use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt, future::join_all}; use lance_core::{Error, Result}; use lance_table::io::commit::external_manifest::{ @@ -15,7 +19,12 @@ mod test { }; use lance_table::io::commit::{CommitHandler, ManifestNamingScheme}; use lance_testing::datagen::{BatchGenerator, IncrementingInt32}; - use object_store::{ObjectStoreExt, local::LocalFileSystem, path::Path}; + use object_store::memory::InMemory; + use object_store::{ + CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, + ObjectStore as OSObjectStore, ObjectStoreExt, PutMultipartOptions, PutOptions, PutPayload, + PutResult, RenameOptions, Result as OSResult, local::LocalFileSystem, path::Path, + }; use tokio::sync::Mutex; use crate::dataset::builder::DatasetBuilder; @@ -420,4 +429,288 @@ mod test { .collect::>(); assert!(unexpected_entries.is_empty(), "{:?}", unexpected_entries); } + + /// S3's `CopyObject` API has a hard 5 GB cap on the source object size. + /// Above that, callers must use multipart copy (`UploadPartCopy`) instead. + /// `lance-table::io::commit::external_manifest` used to call + /// `object_store.copy(staging, final)` unconditionally on the manifest + /// commit path — which failed for manifests >5 GB. + /// + /// The `CopyCapStore` wrapper below enforces this cap on top of any inner + /// store, so the regression can be reproduced in-process without S3. + /// + /// It also lets the test override `head().size` for a chosen path, so the + /// staging file can *appear* to be 14 GB without actually putting that + /// many bytes into the inner store. 
+ const S3_COPY_OBJECT_CAP_BYTES: u64 = 5 * 1024 * 1024 * 1024; + + #[derive(Debug)] + struct CopyCapStore { + inner: Arc, + /// path → fake size returned by head(); overrides the inner store. + head_size_overrides: Arc>>, + /// Counts `copy_opts` calls that pass the size-cap check (the fast + /// path); calls rejected by the cap are not counted. Tests use this to + /// assert which branch of `copy_size_aware` was taken — success alone + /// is not enough, since the slow path can also succeed for small files. + copy_calls: AtomicUsize, + /// Counts calls to `put_multipart_opts` (the slow read+rewrite path). + put_multipart_calls: AtomicUsize, + } + + impl CopyCapStore { + fn new(inner: Arc) -> Self { + Self { + inner, + head_size_overrides: Arc::new(Mutex::new(HashMap::new())), + copy_calls: AtomicUsize::new(0), + put_multipart_calls: AtomicUsize::new(0), + } + } + + async fn override_size(&self, path: &Path, size: u64) { + self.head_size_overrides + .lock() + .await + .insert(path.to_string(), size); + } + + async fn effective_size(&self, location: &Path, real: u64) -> u64 { + self.head_size_overrides + .lock() + .await + .get(&location.to_string()) + .copied() + .unwrap_or(real) + } + + fn copy_calls(&self) -> usize { + self.copy_calls.load(Ordering::SeqCst) + } + + fn put_multipart_calls(&self) -> usize { + self.put_multipart_calls.load(Ordering::SeqCst) + } + } + + impl std::fmt::Display for CopyCapStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CopyCapStore({})", self.inner) + } + } + + #[async_trait] + impl OSObjectStore for CopyCapStore { + async fn put_opts( + &self, + location: &Path, + bytes: PutPayload, + opts: PutOptions, + ) -> OSResult { + self.inner.put_opts(location, bytes, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> OSResult> { + self.put_multipart_calls.fetch_add(1, Ordering::SeqCst); + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts(&self, location: &Path, options: 
GetOptions) -> OSResult { + // `head()` is a default method on `ObjectStore` that delegates to + // `get_opts(location, GetOptions { head: true, .. })`. To make a + // staging file *appear* to be 14 GB without holding 14 GB in + // memory, we override the size in the returned ObjectMeta here. + let mut res = self.inner.get_opts(location, options).await?; + let overridden = self.effective_size(location, res.meta.size).await; + res.meta.size = overridden; + Ok(res) + } + + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> OSResult> { + self.inner.get_ranges(location, ranges).await + } + + // `head` and `delete` are default methods on `ObjectStore`, derived + // from `get_opts`/`delete_stream`. We override `head` indirectly by + // overriding `get_opts` above — it returns size based on the + // overrides table for the chosen path. + fn delete_stream( + &self, + locations: BoxStream<'static, OSResult>, + ) -> BoxStream<'static, OSResult> { + self.inner.delete_stream(locations) + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, OSResult> { + self.inner.list(prefix) + } + + fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> BoxStream<'static, OSResult> { + self.inner.list_with_offset(prefix, offset) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy_opts(&self, from: &Path, to: &Path, opts: CopyOptions) -> OSResult<()> { + // Mimic S3's CopyObject 5 GB hard cap: read the (possibly-overridden) + // size of the source via head() and reject if it reaches the cap. 
+ let meta = self.head(from).await?; + if meta.size >= S3_COPY_OBJECT_CAP_BYTES { + return Err(object_store::Error::Generic { + store: "S3", + source: format!( + "EntityTooLarge: ProposedSize {} exceeds CopyObject 5GB cap", + meta.size + ) + .into(), + }); + } + self.copy_calls.fetch_add(1, Ordering::SeqCst); + self.inner.copy_opts(from, to, opts).await + } + + async fn rename_opts(&self, from: &Path, to: &Path, opts: RenameOptions) -> OSResult<()> { + self.inner.rename_opts(from, to, opts).await + } + } + + /// Regression test for the manifest >5 GB bug. + /// + /// Drives `ExternalManifestStore::put` (the default impl) against a + /// staging file whose `head().size` is reported as 14 GB. That `put` + /// used to call `object_store.copy(staging, final)` unconditionally, + /// which our `CopyCapStore` wrapper rejects with an `EntityTooLarge` + /// error mimicking the one S3 returns in production. + /// + /// With `copy_size_aware` in place the commit must succeed: sources at + /// or above the cap skip server-side copy and fall back to the + /// read+rewrite path, which streams the source and re-uploads it via + /// `put_multipart`. + #[tokio::test] + async fn manifest_commit_succeeds_when_staging_exceeds_5gb_copy_cap() { + let inner: Arc = Arc::new(InMemory::new()); + let capped = Arc::new(CopyCapStore::new(inner)); + + // Write a small staging file, then lie about its size so it looks + // like a 14 GB source without holding 14 GB in memory. + let base_path = Path::from("repro"); + let staging_path = Path::from("repro/_versions/1.manifest.staging-abcd"); + let body = b"fake manifest body"; + capped + .put(&staging_path, PutPayload::from_static(body)) + .await + .expect("seed staging file"); + capped + .override_size(&staging_path, 14_961_429_442) // matches the production failure + .await; + + // Spin up an ExternalManifestStore and drive `put` (the same code + // path the failing CTAS hits via ExternalManifestCommitHandler). 
+ let external = SleepyExternalManifestStore::new(); + let head_meta = capped.head(&staging_path).await.unwrap(); + + let location = external + .put( + &base_path, + 1, + &staging_path, + head_meta.size, + head_meta.e_tag, + capped.as_ref(), + ManifestNamingScheme::V2, + ) + .await + .expect( + "manifest commit should succeed for a >5 GB staging file via multipart-aware copy", + ); + + // Branch-taken assertions: the slow read+rewrite path was used. + assert_eq!( + capped.copy_calls(), + 0, + "CopyObject must not be attempted for >5 GiB sources" + ); + assert!( + capped.put_multipart_calls() >= 1, + "read+rewrite path must initiate a multipart upload" + ); + + // End-state assertions: final manifest exists with the original + // bytes, and the staging file was deleted. + let final_get = capped + .inner + .get(&location.path) + .await + .expect("final manifest must exist on the inner store") + .bytes() + .await + .unwrap(); + assert_eq!(final_get.as_ref(), body); + let staging_after = capped.inner.head(&staging_path).await; + assert!( + matches!(staging_after, Err(object_store::Error::NotFound { .. })), + "staging file must be cleaned up after commit, got: {:?}", + staging_after + ); + } + + /// Counterpart to manifest_commit_succeeds_when_staging_exceeds_5gb_copy_cap. + /// Confirms that for staging files BELOW the 5 GB cap, the fast-path + /// server-side `copy()` is still used — i.e. we haven't accidentally + /// regressed every commit to read+rewrite. 
+ #[tokio::test] + async fn manifest_commit_uses_fast_copy_for_small_staging() { + let inner: Arc = Arc::new(InMemory::new()); + let capped = Arc::new(CopyCapStore::new(inner)); + + let base_path = Path::from("repro"); + let staging_path = Path::from("repro/_versions/1.manifest.staging-abcd"); + capped + .put( + &staging_path, + PutPayload::from_static(b"small manifest body"), + ) + .await + .expect("seed staging file"); + // No size override — the staging file's real size is ~20 bytes, + // well below the 5 GB cap, so copy_size_aware must take the fast + // path. + + let external = SleepyExternalManifestStore::new(); + let head_meta = capped.head(&staging_path).await.unwrap(); + + external + .put( + &base_path, + 1, + &staging_path, + head_meta.size, + head_meta.e_tag, + capped.as_ref(), + ManifestNamingScheme::V2, + ) + .await + .expect("small manifest commit must succeed via fast-path copy"); + + // The branch-taken assertion: fast path was used, slow path was not. + assert!( + capped.copy_calls() >= 1, + "small-file commit must use server-side CopyObject" + ); + assert_eq!( + capped.put_multipart_calls(), + 0, + "small-file commit must NOT initiate a multipart upload" + ); + } }