Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion python/python/lance/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# Arrow field-metadata key under which blob_field() stores the
# ``dedicated_size_threshold`` value (written as a decimal byte string).
_BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY = (
b"lance-encoding:blob-dedicated-size-threshold"
)
# Arrow field-metadata key under which blob_field() stores the
# ``pack_file_size_threshold`` value (written as a decimal byte string).
_BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY = (
b"lance-encoding:blob-pack-file-size-threshold"
)
# Largest value a C ``size_t`` can hold on this platform: converting -1 to the
# unsigned type wraps around to its maximum.
# NOTE(review): presumably the upper bound behind the "must fit in a Rust usize"
# threshold validation errors -- confirm against _validate_threshold.
_MAX_RUST_USIZE = ctypes.c_size_t(-1).value


Expand Down Expand Up @@ -217,6 +220,7 @@ def blob_field(
nullable: bool = True,
inline_size_threshold: Optional[int] = None,
dedicated_size_threshold: Optional[int] = None,
pack_file_size_threshold: Optional[int] = None,
) -> pa.Field:
"""
Construct an Arrow field for a Lance blob column.
Expand All @@ -234,14 +238,24 @@ def blob_field(
Maximum payload size in bytes to store in packed blob storage before
using dedicated blob storage. This threshold is checked before
``inline_size_threshold``.
pack_file_size_threshold : optional, int
Maximum size in bytes of a single packed blob sidecar (``.pack``) file.
Once a sidecar reaches this size a new one is started.
"""
_validate_threshold("inline_size_threshold", inline_size_threshold, allow_zero=True)
_validate_threshold(
"dedicated_size_threshold", dedicated_size_threshold, allow_zero=False
)
_validate_threshold(
"pack_file_size_threshold", pack_file_size_threshold, allow_zero=False
)

field = pa.field(name, BlobType(), nullable=nullable)
if inline_size_threshold is None and dedicated_size_threshold is None:
if (
inline_size_threshold is None
and dedicated_size_threshold is None
and pack_file_size_threshold is None
):
return field

metadata = dict(field.metadata or {})
Expand All @@ -253,6 +267,10 @@ def blob_field(
metadata[_BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY] = str(
dedicated_size_threshold
).encode()
if pack_file_size_threshold is not None:
metadata[_BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY] = str(
pack_file_size_threshold
).encode()
return field.with_metadata(metadata)


Expand Down
73 changes: 73 additions & 0 deletions python/python/tests/test_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sys
import tarfile
import textwrap
from pathlib import Path

import lance
import pandas as pd
Expand Down Expand Up @@ -588,10 +589,14 @@ def test_blob_field_threshold_metadata():
"blob",
inline_size_threshold=16 * 1024,
dedicated_size_threshold=2 * 1024 * 1024,
pack_file_size_threshold=512 * 1024 * 1024,
)

assert field.metadata[b"lance-encoding:blob-inline-size-threshold"] == b"16384"
assert field.metadata[b"lance-encoding:blob-dedicated-size-threshold"] == b"2097152"
assert (
field.metadata[b"lance-encoding:blob-pack-file-size-threshold"] == b"536870912"
)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -645,6 +650,30 @@ def test_blob_field_threshold_metadata():
"dedicated_size_threshold must fit in a Rust usize",
id="overflow_dedicated",
),
pytest.param(
{"pack_file_size_threshold": 0},
ValueError,
"pack_file_size_threshold must be positive",
id="zero_pack_file",
),
pytest.param(
{"pack_file_size_threshold": -1},
ValueError,
"pack_file_size_threshold must be positive",
id="negative_pack_file",
),
pytest.param(
{"pack_file_size_threshold": True},
TypeError,
"pack_file_size_threshold must be an int",
id="bool_pack_file",
),
pytest.param(
{"pack_file_size_threshold": 2**100},
OverflowError,
"pack_file_size_threshold must fit in a Rust usize",
id="overflow_pack_file",
),
],
)
def test_blob_field_rejects_invalid_thresholds(kwargs, error, message):
Expand Down Expand Up @@ -715,6 +744,50 @@ def test_blob_extension_append_rejects_explicit_threshold_mismatch(tmp_path):
lance.write_dataset(append, dataset_path, mode="append")


def test_blob_extension_pack_file_threshold_metadata_persists_after_reopen(
    tmp_path: Path,
):
    """The pack-file size threshold on a blob field survives write + reopen.

    Writes a one-row blob dataset whose field carries an explicit
    ``pack_file_size_threshold`` and checks that the reopened dataset's schema
    still exposes the same value under the pack-file threshold metadata key.
    """
    meta_key = b"lance-encoding:blob-pack-file-size-threshold"
    pack_limit = 512 * 1024 * 1024
    uri = tmp_path / "test_ds_v2_pack_file_threshold_persists"

    blob_column = lance.blob_field("blob", pack_file_size_threshold=pack_limit)
    data = pa.table(
        {"blob": lance.blob_array([b"x"])},
        schema=pa.schema([blob_column]),
    )
    lance.write_dataset(data, uri, data_storage_version="2.2")

    # Read the schema back from a freshly opened dataset, not from `data`.
    stored_metadata = lance.dataset(uri).schema.field("blob").metadata
    assert stored_metadata[meta_key] == str(pack_limit).encode()


def test_blob_extension_append_rejects_pack_file_threshold_mismatch(tmp_path: Path):
    """Appending with a different pack-file size threshold must be rejected.

    The dataset is created with a 512 MiB ``pack_file_size_threshold``; an
    append whose blob field declares 256 MiB is expected to raise ``OSError``.
    """

    def _single_blob_table(pack_limit):
        # One-row table whose blob field carries the given pack-file threshold.
        column = lance.blob_field("blob", pack_file_size_threshold=pack_limit)
        return pa.table(
            {"blob": lance.blob_array([b"x" * 2048])},
            schema=pa.schema([column]),
        )

    uri = tmp_path / "test_ds_v2_append_pack_file_mismatch"
    original = _single_blob_table(512 * 1024 * 1024)
    lance.write_dataset(original, uri, data_storage_version="2.2")

    # Build the mismatching table outside the `raises` block so that only the
    # append itself is allowed to raise.
    mismatched = _single_blob_table(256 * 1024 * 1024)

    expected_message = "Cannot append data with blob threshold metadata"
    with pytest.raises(OSError, match=expected_message):
        lance.write_dataset(mismatched, uri, mode="append")


def test_blob_extension_dedicated_threshold_precedes_inline_threshold(tmp_path):
payload = b"x" * 2048
schema = pa.schema(
Expand Down
3 changes: 3 additions & 0 deletions rust/lance-arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str =
"lance-encoding:blob-dedicated-size-threshold";
/// Metadata key for overriding the inline blob size threshold (in bytes)
pub const BLOB_INLINE_SIZE_THRESHOLD_META_KEY: &str = "lance-encoding:blob-inline-size-threshold";
/// Metadata key for overriding the maximum size (in bytes) of a packed blob sidecar file
///
/// This is a field-level metadata key. The Python `blob_field` helper writes the
/// value as a decimal integer string and uses the identical key bytes, so the
/// string below must stay in sync with the Python bindings.
pub const BLOB_PACK_FILE_SIZE_THRESHOLD_META_KEY: &str =
"lance-encoding:blob-pack-file-size-threshold";

type Result<T> = std::result::Result<T, ArrowError>;

Expand Down
Loading
Loading