21 changes: 19 additions & 2 deletions rust/lance-io/src/object_writer.rs
@@ -47,8 +47,12 @@ fn max_conn_reset_retries() -> u16 {
     })
 }

-/// Maximum part size in GCS and S3: 5GB.
-const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5;
+/// Maximum body size for a single S3 PUT: strictly less than 5 GiB.
+/// AWS rejects single-PUT bodies of exactly 5 GiB (= 5 * 1024^3) with
+/// `EntityTooLarge`, so we clamp `LANCE_INITIAL_UPLOAD_SIZE` one byte
+/// below that threshold to keep the buffer-fills-to-clamp single-PUT
+/// path safe. See lance#6750 for the related txn-file write fix.
+const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5 - 1;

Member

Is this limit S3-specific or common across all object stores? Should we limit the slow path to S3?

Contributor Author

It's not actually S3-specific — GCS's single-shot Objects.copy has the same ~5 GiB limit. So rather than gate the slow path to S3, I reframed the threshold as the most-conservative server-side-copy limit across backends and renamed the constant to MAX_SERVER_SIDE_COPY_BYTES.


 /// Clamps a requested upload part size to the valid [5MB, 5GB] range.
 /// Returns the clamped value and whether clamping was necessary.
@@ -898,4 +902,17 @@ mod tests {
             (MAX_UPLOAD_PART_SIZE, true)
         );
     }
+
+    /// Regression for the foot-gun where `LANCE_INITIAL_UPLOAD_SIZE=5368709120`
+    /// (exactly 5 GiB, Pucheng's setting) caused a single-PUT of 5 GiB on
+    /// shutdown — which S3 rejects with `EntityTooLarge`. After tightening
+    /// `MAX_UPLOAD_PART_SIZE` to 5 GiB - 1, raw 5 GiB must clamp DOWN.
+    #[test]
+    fn clamp_initial_upload_size_at_5gib_clamps_down() {
+        let exactly_5_gib: usize = 5 * 1024 * 1024 * 1024;
+        assert_eq!(
+            clamp_initial_upload_size(exactly_5_gib),
+            (MAX_UPLOAD_PART_SIZE, true)
+        );
+    }
 }
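
The clamping behaviour this regression test pins down can be sketched as follows. This is a minimal, self-contained sketch rather than the code in `object_writer.rs`: the body of `clamp_initial_upload_size` is not shown in the diff, so the implementation below and the `MIN_UPLOAD_PART_SIZE` name are assumptions, with the 5 MiB lower bound taken from the `[5MB, 5GB]` doc comment.

```rust
/// Assumed lower bound (hypothetical name): 5 MiB, per the "[5MB, 5GB]" doc comment.
const MIN_UPLOAD_PART_SIZE: usize = 5 * 1024 * 1024;
/// Upper bound as tightened by this PR: one byte below 5 GiB.
const MAX_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 1024 * 5 - 1;

/// Illustrative stand-in for the real function: clamp `requested` into
/// [MIN_UPLOAD_PART_SIZE, MAX_UPLOAD_PART_SIZE] and report whether the
/// value had to be changed.
fn clamp_initial_upload_size(requested: usize) -> (usize, bool) {
    let clamped = requested.clamp(MIN_UPLOAD_PART_SIZE, MAX_UPLOAD_PART_SIZE);
    (clamped, clamped != requested)
}
```

Under these assumptions a request of exactly 5 GiB comes back as `(MAX_UPLOAD_PART_SIZE, true)`, so a buffer that fills to the clamp can never produce a single PUT of a full 5 GiB.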
141 changes: 132 additions & 9 deletions rust/lance-table/src/io/commit/external_manifest.rs
@@ -8,6 +8,8 @@
 use std::sync::Arc;

 use async_trait::async_trait;
+use bytes::Bytes;
+use futures::StreamExt;
 use lance_core::utils::tracing::{
     AUDIT_MODE_CREATE, AUDIT_MODE_DELETE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT,
 };
@@ -123,7 +125,7 @@ pub trait ExternalManifestStore: std::fmt::Debug + Send + Sync {

         // Step 2: Copy staging to final path
         let final_path = naming_scheme.manifest_path(base_path, version);
-        let copied = match object_store.copy(staging_path, &final_path).await {
+        let copied = match copy_size_aware(object_store, staging_path, &final_path, size).await {
             Ok(_) => true,
             Err(ObjectStoreError::NotFound { .. }) => false,
             Err(e) => return Err(e.into()),
@@ -213,6 +215,129 @@ pub(crate) fn detect_naming_scheme_from_path(path: &Path) -> Result<ManifestNami
     })
 }

+/// The most conservative server-side-copy size limit across the object
+/// stores we support. This is not S3-specific: S3's `CopyObject` and GCS's
+/// single-shot `Objects: copy` both reject sources above ~5 GiB, so we use
+/// 5 GiB as a backend-agnostic threshold. Above it we stream the source
+/// through the client and re-upload via multipart instead of relying on a
+/// server-side copy. Stores that have no such cap (e.g. local filesystem)
+/// also take the fallback above this size — correctness is preserved; only
+/// the rare >5 GiB copy is slower than a native copy would be.
+const MAX_SERVER_SIDE_COPY_BYTES: u64 = 5 * 1024 * 1024 * 1024;
+
+/// Part size for the read+rewrite fallback. Multipart-capable stores
+/// (S3, GCS) require every part except the last to be ≥5 MB and allow up to
+/// 10,000 parts. 100 MB sits comfortably inside both bounds and keeps the
+/// part count low (~140 parts for a 14 GB manifest) without large per-part
+/// RAM.
+const COPY_REWRITE_PART_SIZE: usize = 100 * 1024 * 1024;
+
+/// Copy `from` to `to`, falling back to a multipart-equivalent read+rewrite
+/// when the source exceeds the server-side-copy size limit
+/// (`MAX_SERVER_SIDE_COPY_BYTES`).
+///
+/// For sources below the limit, this is the same fast server-side
+/// `store.copy()` as before. For larger sources, the source is streamed
+/// through the client and re-uploaded as a multipart upload at `to`. This
+/// doubles bytes-on-the-wire for the rare large case while preserving the
+/// cheap fast path for the common small case.
+///
+/// `size` is the known source size. It is required: the only caller already
+/// has it, and the alternative (an extra `head(from)` round-trip) is work
+/// the caller can avoid by passing what it already knows.
+///
+/// `NotFound` errors on `from` propagate unchanged so callers can keep
+/// existing `Err(NotFound { .. })` arms.
+///
+/// This is a workaround for the missing `UploadPartCopy` primitive in the
+/// upstream `object_store` crate. Once that lands, this helper can be
+/// deleted and the call sites can go back to plain `store.copy()`.
+async fn copy_size_aware(
+    store: &dyn OSObjectStore,
+    from: &Path,
+    to: &Path,
+    size: u64,
+) -> std::result::Result<(), ObjectStoreError> {
+    if size < MAX_SERVER_SIDE_COPY_BYTES {
+        store.copy(from, to).await
+    } else {
+        copy_via_read_rewrite(store, from, to).await
+    }
+}
+
+// NOTE: parts are uploaded sequentially. This could be parallelized (a
+// bounded JoinSet, like lance-io/src/object_writer.rs's
+// LANCE_UPLOAD_CONCURRENCY) or sidestepped entirely by switching to
+// `object_store::WriteMultipart` (which also handles abort-on-drop). Left
+// sequential here: this is a cold path (only >5 GiB manifests) and the
+// helper is itself a stopgap until `object_store` exposes UploadPartCopy.
+async fn copy_via_read_rewrite(
+    store: &dyn OSObjectStore,
+    from: &Path,
+    to: &Path,
+) -> std::result::Result<(), ObjectStoreError> {
+    // NotFound here propagates upward unchanged.
+    let mut stream = store.get(from).await?.into_stream();
+
+    // From here on, errors must `abort()` the upload to avoid leaving an
+    // orphan multipart upload on stores that support them (e.g. S3, GCS),
+    // which would otherwise incur storage charges until the bucket's
+    // lifecycle policy cleans it up.
+    //
+    // Note: this does NOT cover task cancellation — `MultipartUpload`'s
+    // upstream Drop is documented as a no-op for S3/GCS. Callers that
+    // need cancellation cleanliness should run this with a guard or
+    // switch to `object_store::WriteMultipart` (planned follow-up).

Member

Has a follow-up issue actually been created? If so, can we reference it directly here?

Contributor Author

I reworded this from a "deferred to a follow-up" TODO into a plain inline NOTE rather than referencing a tracked issue.
It's a cold path (only triggers for >5 GiB manifests) and the whole read+rewrite helper is a stopgap until object_store exposes UploadPartCopy, so I didn't think a standalone tracking issue was warranted — a self-contained note explaining the tradeoff seemed more useful than an issue link that'd likely sit untouched.
Happy to file one if you'd prefer it tracked.

+    let mut upload = store.put_multipart(to).await?;
+    let mut part_buf: Vec<u8> = Vec::with_capacity(COPY_REWRITE_PART_SIZE);
+
+    while let Some(chunk) = stream.next().await {
+        let chunk = match chunk {
+            Ok(b) => b,
+            Err(e) => {
+                let _ = upload.abort().await;
+                return Err(e);
+            }
+        };
+        // Append the chunk in COPY_REWRITE_PART_SIZE-bounded slices so a
+        // single oversized chunk (e.g., LocalFileSystem returning a whole
+        // file) cannot push part_buf past the backend's per-part size limit
+        // (5 GiB on S3/GCS). COPY_REWRITE_PART_SIZE is well under every
+        // backend's cap, so each flushed part is always valid.
+        let mut offset = 0;
+        while offset < chunk.len() {
+            let want = COPY_REWRITE_PART_SIZE - part_buf.len();
+            let take = want.min(chunk.len() - offset);
+            part_buf.extend_from_slice(&chunk[offset..offset + take]);
+            offset += take;
+
+            if part_buf.len() >= COPY_REWRITE_PART_SIZE {
+                let payload =
+                    std::mem::replace(&mut part_buf, Vec::with_capacity(COPY_REWRITE_PART_SIZE));
+                if let Err(e) = upload.put_part(Bytes::from(payload).into()).await {
+                    let _ = upload.abort().await;
+                    return Err(e);
+                }
Comment on lines +317 to +320

Member

This could be optimized to allow concurrent calls to put_part, but perhaps that will be addressed by the previously mentioned follow-up?

Contributor Author

Right — left sequential for now, covered by the reworded note just above. It can be parallelized with a bounded JoinSet (mirroring object_writer.rs's LANCE_UPLOAD_CONCURRENCY) or sidestepped by switching to object_store::WriteMultipart, which handles concurrency and abort-on-drop for free. Deferred since this only runs for >5 GiB manifests and the helper itself is interim until UploadPartCopy exists.

+            }
+        }
+    }
+
+    // Flush the final (possibly-short) part. The last part of a multipart
+    // upload is exempt from the per-part minimum on S3/GCS.
+    if !part_buf.is_empty()
+        && let Err(e) = upload.put_part(Bytes::from(part_buf).into()).await
+    {
+        let _ = upload.abort().await;
+        return Err(e);
+    }
+
+    if let Err(e) = upload.complete().await {
+        let _ = upload.abort().await;
+        return Err(e);
+    }
+    Ok(())
+}
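
The re-slicing loop in `copy_via_read_rewrite` can be studied in isolation from the object store. The sketch below is a synchronous simplification under stated assumptions: `reslice_into_parts` is a hypothetical helper (not part of the PR) that collects parts into a `Vec` instead of calling `put_part`, and it is meant to be exercised with a tiny part size purely for illustration.

```rust
/// Hypothetical helper, for illustration only: re-slice arbitrarily sized
/// input chunks into parts of exactly `part_size` bytes, followed by one
/// final part that may be shorter. Mirrors the buffering loop in the diff,
/// with `parts.push(..)` standing in for `upload.put_part(..)`.
fn reslice_into_parts(chunks: &[&[u8]], part_size: usize) -> Vec<Vec<u8>> {
    assert!(part_size > 0, "part_size must be non-zero");
    let mut parts: Vec<Vec<u8>> = Vec::new();
    let mut part_buf: Vec<u8> = Vec::with_capacity(part_size);

    for chunk in chunks {
        let mut offset = 0;
        while offset < chunk.len() {
            // Take only as much as still fits into the current part.
            let want = part_size - part_buf.len();
            let take = want.min(chunk.len() - offset);
            part_buf.extend_from_slice(&chunk[offset..offset + take]);
            offset += take;

            // A full buffer is flushed as one part and replaced by a fresh one.
            if part_buf.len() == part_size {
                parts.push(std::mem::replace(
                    &mut part_buf,
                    Vec::with_capacity(part_size),
                ));
            }
        }
    }

    // Flush the final, possibly short, part.
    if !part_buf.is_empty() {
        parts.push(part_buf);
    }
    parts
}
```

With `part_size = 4`, input chunks of 7, 2 and 1 bytes come out as parts of 4, 4 and 2 bytes. Every part except the last is exactly `part_size`, which is the property the per-part minimum on S3/GCS relies on, and no single oversized chunk can inflate a part.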

 /// External manifest commit handler
 /// This handler is used to commit a manifest to an external store
 /// for detailed design, see <https://github.com/lance-format/lance/issues/1183>
@@ -245,14 +370,12 @@ impl ExternalManifestCommitHandler {
         // step 1: copy the manifest to the final location
         let final_manifest_path = naming_scheme.manifest_path(base_path, version);

-        let copied = match store
-            .copy(staging_manifest_path, &final_manifest_path)
-            .await
-        {
-            Ok(_) => true,
-            Err(ObjectStoreError::NotFound { .. }) => false, // Another writer beat us to it.
-            Err(e) => return Err(e.into()),
-        };
+        let copied =
+            match copy_size_aware(store, staging_manifest_path, &final_manifest_path, size).await {
+                Ok(_) => true,
+                Err(ObjectStoreError::NotFound { .. }) => false, // Another writer beat us to it.
+                Err(e) => return Err(e.into()),
+            };
         if copied {
             info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = final_manifest_path.as_ref());
         }
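
To make the dispatch rule and the part-count arithmetic concrete, here is a small sketch. `CopyPlan` and `plan_copy` are hypothetical names used only for illustration; the two constants mirror the values in the diff (with the part size widened to `u64` for the arithmetic), and `u64::div_ceil` assumes Rust 1.73 or newer.

```rust
/// Threshold from the diff: sources at or above this take the fallback.
const MAX_SERVER_SIDE_COPY_BYTES: u64 = 5 * 1024 * 1024 * 1024;
/// Part size from the diff, as u64 here so it can divide a u64 size.
const COPY_REWRITE_PART_SIZE: u64 = 100 * 1024 * 1024;

/// Hypothetical type describing which path a copy of a given size takes.
#[derive(Debug, PartialEq)]
enum CopyPlan {
    /// Plain server-side `store.copy()`.
    ServerSide,
    /// Client-side read + multipart re-upload in `parts` parts.
    ReadRewrite { parts: u64 },
}

/// Hypothetical helper: same strict `<` comparison as `copy_size_aware`,
/// plus the number of parts the fallback would upload.
fn plan_copy(size: u64) -> CopyPlan {
    if size < MAX_SERVER_SIDE_COPY_BYTES {
        CopyPlan::ServerSide
    } else {
        CopyPlan::ReadRewrite {
            // Round up: a trailing short part still counts as a part.
            parts: size.div_ceil(COPY_REWRITE_PART_SIZE),
        }
    }
}
```

Under these assumptions a source of exactly 5 GiB already takes the fallback (the comparison is strict) and is split into 52 parts, while a 14 GiB source needs 144 parts, in line with the ~140 estimate in the `COPY_REWRITE_PART_SIZE` comment and far below the 10,000-part cap.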