-
Notifications
You must be signed in to change notification settings - Fork 730
fix: support manifests >5 GB via size-aware copy #7047
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,8 @@ | |
| use std::sync::Arc; | ||
|
|
||
| use async_trait::async_trait; | ||
| use bytes::Bytes; | ||
| use futures::StreamExt; | ||
| use lance_core::utils::tracing::{ | ||
| AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT, | ||
| }; | ||
|
|
@@ -123,7 +125,7 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync { | |
|
|
||
| // Step 2: Copy staging to final path | ||
| let final_path = naming_scheme.manifest_path(base_path, version); | ||
| let copied = match object_store.copy(staging_path, &final_path).await { | ||
| let copied = match copy_size_aware(object_store, staging_path, &final_path, size).await { | ||
| Ok(_) => true, | ||
| Err(ObjectStoreError::NotFound { .. }) => false, | ||
| Err(e) => return Err(e.into()), | ||
|
|
@@ -213,6 +215,129 @@ pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNami | |
| }) | ||
| } | ||
|
|
||
| /// The most conservative server-side-copy size limit across the object | ||
| /// stores we support. This is not S3-specific: S3's `CopyObject` and GCS's | ||
| /// single-shot `Objects: copy` both reject sources above ~5 GiB, so we use | ||
| /// 5 GiB as a backend-agnostic threshold. Above it we stream the source | ||
| /// through the client and re-upload via multipart instead of relying on a | ||
| /// server-side copy. Stores that have no such cap (e.g. local filesystem) | ||
| /// also take the fallback above this size — correctness is preserved; only | ||
| /// the rare >5 GiB copy is slower than a native copy would be. | ||
| const MAX_SERVER_SIDE_COPY_BYTES: u64 = 5 * 1024 * 1024 * 1024; | ||
|
|
||
| /// Part size for the read+rewrite fallback. Multipart-capable stores | ||
| /// (S3, GCS) require every part except the last to be ≥5 MB and allow up to | ||
| /// 10,000 parts. 100 MB sits comfortably inside both bounds and keeps the | ||
| /// part count low (~140 parts for a 14 GB manifest) without large per-part | ||
| /// RAM. | ||
| const COPY_REWRITE_PART_SIZE: usize = 100 * 1024 * 1024; | ||
|
|
||
| /// Copy `from` to `to`, falling back to a multipart-equivalent read+rewrite | ||
| /// when the source exceeds the server-side-copy size limit | ||
| /// (`MAX_SERVER_SIDE_COPY_BYTES`). | ||
| /// | ||
| /// For sources below the limit, this is the same fast server-side | ||
| /// `store.copy()` as before. For larger sources, the source is streamed | ||
| /// through the client and re-uploaded as a multipart upload at `to`. This | ||
| /// doubles bytes-on-the-wire for the rare large case while preserving the | ||
| /// cheap fast path for the common small case. | ||
| /// | ||
| /// `size` is the known source size. It is required: the only caller already | ||
| /// has it, and the alternative (an extra `head(from)` round-trip) is work | ||
| /// the caller can avoid by passing what it already knows. | ||
| /// | ||
| /// `NotFound` errors on `from` propagate unchanged so callers can keep | ||
| /// existing `Err(NotFound { .. })` arms. | ||
| /// | ||
| /// This is a workaround for the missing `UploadPartCopy` primitive in the | ||
| /// upstream `object_store` crate. Once that lands, this helper can be | ||
| /// deleted and the call sites can go back to plain `store.copy()`. | ||
| async fn copy_size_aware( | ||
| store: &dyn OSObjectStore, | ||
| from: &Path, | ||
| to: &Path, | ||
| size: u64, | ||
| ) -> std::result::Result<(), ObjectStoreError> { | ||
| if size < MAX_SERVER_SIDE_COPY_BYTES { | ||
| store.copy(from, to).await | ||
| } else { | ||
| copy_via_read_rewrite(store, from, to).await | ||
| } | ||
| } | ||
|
|
||
| // NOTE: parts are uploaded sequentially. This could be parallelized (a | ||
| // bounded JoinSet, like lance-io/src/object_writer.rs's | ||
| // LANCE_UPLOAD_CONCURRENCY) or sidestepped entirely by switching to | ||
| // `object_store::WriteMultipart` (which also handles abort-on-drop). Left | ||
| // sequential here: this is a cold path (only >5 GiB manifests) and the | ||
| // helper is itself a stopgap until `object_store` exposes UploadPartCopy. | ||
| async fn copy_via_read_rewrite( | ||
| store: &dyn OSObjectStore, | ||
| from: &Path, | ||
| to: &Path, | ||
| ) -> std::result::Result<(), ObjectStoreError> { | ||
| // NotFound here propagates upward unchanged. | ||
| let mut stream = store.get(from).await?.into_stream(); | ||
|
|
||
| // From here on, errors must `abort()` the upload to avoid leaving an | ||
| // orphan multipart upload on stores that support them (e.g. S3, GCS), | ||
| // which would otherwise incur storage charges until the bucket's | ||
| // lifecycle policy cleans it up. | ||
| // | ||
| // Note: this does NOT cover task cancellation — `MultipartUpload`'s | ||
| // upstream Drop is documented as a no-op for S3/GCS. Callers that | ||
| // need cancellation cleanliness should run this with a guard or | ||
| // switch to `object_store::WriteMultipart` (planned follow-up). | ||
|
Member
Is there actually a follow-up issue created? Can we specify the issue directly?
Contributor
Author
I reworded this from a "deferred to a follow-up" TODO into a plain inline NOTE rather than referencing a tracked issue. |
||
| let mut upload = store.put_multipart(to).await?; | ||
| let mut part_buf: Vec<u8> = Vec::with_capacity(COPY_REWRITE_PART_SIZE); | ||
|
|
||
| while let Some(chunk) = stream.next().await { | ||
| let chunk = match chunk { | ||
| Ok(b) => b, | ||
| Err(e) => { | ||
| let _ = upload.abort().await; | ||
| return Err(e); | ||
| } | ||
| }; | ||
| // Append the chunk in COPY_REWRITE_PART_SIZE-bounded slices so a | ||
| // single oversized chunk (e.g., LocalFileSystem returning a whole | ||
| // file) cannot push part_buf past the backend's per-part size limit | ||
| // (5 GiB on S3/GCS). COPY_REWRITE_PART_SIZE is well under every | ||
| // backend's cap, so each flushed part is always valid. | ||
| let mut offset = 0; | ||
| while offset < chunk.len() { | ||
| let want = COPY_REWRITE_PART_SIZE - part_buf.len(); | ||
| let take = want.min(chunk.len() - offset); | ||
| part_buf.extend_from_slice(&chunk[offset..offset + take]); | ||
| offset += take; | ||
|
|
||
| if part_buf.len() >= COPY_REWRITE_PART_SIZE { | ||
| let payload = | ||
| std::mem::replace(&mut part_buf, Vec::with_capacity(COPY_REWRITE_PART_SIZE)); | ||
| if let Err(e) = upload.put_part(Bytes::from(payload).into()).await { | ||
| let _ = upload.abort().await; | ||
| return Err(e); | ||
| } | ||
|
Comment on lines +317 to +320
Member
This could maybe be optimized to allow for concurrent calls to `put_part`.
Contributor
Author
Right — left sequential for now, covered by the reworded note just above. It can be parallelized with a bounded JoinSet (mirroring object_writer.rs's LANCE_UPLOAD_CONCURRENCY) or sidestepped by switching to object_store::WriteMultipart, which handles concurrency and abort-on-drop for free. Deferred since this only runs for >5 GiB manifests and the helper itself is interim until UploadPartCopy exists. |
||
| } | ||
| } | ||
| } | ||
|
|
||
| // Flush the final (possibly-short) part. The last part of a multipart | ||
| // upload is exempt from the per-part minimum on S3/GCS. | ||
| if !part_buf.is_empty() | ||
| && let Err(e) = upload.put_part(Bytes::from(part_buf).into()).await | ||
| { | ||
| let _ = upload.abort().await; | ||
| return Err(e); | ||
| } | ||
|
|
||
| if let Err(e) = upload.complete().await { | ||
| let _ = upload.abort().await; | ||
| return Err(e); | ||
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| /// External manifest commit handler | ||
| /// This handler is used to commit a manifest to an external store | ||
| /// for detailed design, see <https://github.com/lance-format/lance/issues/1183> | ||
|
|
@@ -245,14 +370,12 @@ impl ExternalManifestCommitHandler { | |
| // step 1: copy the manifest to the final location | ||
| let final_manifest_path = naming_scheme.manifest_path(base_path, version); | ||
|
|
||
| let copied = match store | ||
| .copy(staging_manifest_path, &final_manifest_path) | ||
| .await | ||
| { | ||
| Ok(_) => true, | ||
| Err(ObjectStoreError::NotFound { .. }) => false, // Another writer beat us to it. | ||
| Err(e) => return Err(e.into()), | ||
| }; | ||
| let copied = | ||
| match copy_size_aware(store, staging_manifest_path, &final_manifest_path, size).await { | ||
| Ok(_) => true, | ||
| Err(ObjectStoreError::NotFound { .. }) => false, // Another writer beat us to it. | ||
| Err(e) => return Err(e.into()), | ||
| }; | ||
| if copied { | ||
| info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref()); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this limit S3-specific or common across all object stores? Should we limit the slow path to S3?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not actually S3-specific — GCS's single-shot Objects.copy has the same ~5 GiB limit. So rather than gate the slow path to S3, I reframed the threshold as the most-conservative server-side-copy limit across backends and renamed the constant to MAX_SERVER_SIDE_COPY_BYTES.