Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions virtualizarr/manifests/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,7 @@ def __eq__( # type: ignore[override]
)

# do chunk-wise comparison
equal_chunk_paths = self.manifest._paths == other.manifest._paths
equal_chunk_offsets = self.manifest._offsets == other.manifest._offsets
equal_chunk_lengths = self.manifest._lengths == other.manifest._lengths

equal_chunks = (
equal_chunk_paths & equal_chunk_offsets & equal_chunk_lengths
)
equal_chunks = self.manifest.elementwise_eq(other.manifest)

if not equal_chunks.all():
# TODO expand chunk-wise comparison into an element-wise result instead of just returning all False
Expand Down
115 changes: 52 additions & 63 deletions virtualizarr/manifests/array_api.py
Comment thread
maxrjones marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -97,28 +97,8 @@ def concatenate(
new_shape[axis] = new_length_along_concat_axis

# do concatenation of entries in manifest
concatenated_paths = (
cast( # `np.concatenate` is type hinted as if the output could have Any dtype
np.ndarray[Any, np.dtypes.StringDType],
np.concatenate(
[arr.manifest._paths for arr in arrays],
axis=axis,
),
)
)
concatenated_offsets = np.concatenate(
[arr.manifest._offsets for arr in arrays],
axis=axis,
)
concatenated_lengths = np.concatenate(
[arr.manifest._lengths for arr in arrays],
axis=axis,
)
concatenated_manifest = ChunkManifest.from_arrays(
paths=concatenated_paths,
offsets=concatenated_offsets,
lengths=concatenated_lengths,
validate_paths=False,
concatenated_manifest = _concat_manifests(
[arr.manifest for arr in arrays], axis=axis
)

new_metadata = copy_and_replace_metadata(
Expand Down Expand Up @@ -165,27 +145,7 @@ def stack(
new_shape.insert(axis, length_along_new_stacked_axis)

# do stacking of entries in manifest
stacked_paths = cast( # `np.stack` apparently is type hinted as if the output could have Any dtype
np.ndarray[Any, np.dtypes.StringDType],
np.stack(
[arr.manifest._paths for arr in arrays],
axis=axis,
),
)
stacked_offsets = np.stack(
[arr.manifest._offsets for arr in arrays],
axis=axis,
)
stacked_lengths = np.stack(
[arr.manifest._lengths for arr in arrays],
axis=axis,
)
stacked_manifest = ChunkManifest.from_arrays(
paths=stacked_paths,
offsets=stacked_offsets,
lengths=stacked_lengths,
validate_paths=False,
)
stacked_manifest = _stack_manifests([arr.manifest for arr in arrays], axis=axis)

# chunk shape has changed because a length-1 axis has been inserted
old_chunks = first_arr.chunks
Expand Down Expand Up @@ -235,37 +195,66 @@ def broadcast_to(x: "ManifestArray", /, shape: tuple[int, ...]) -> "ManifestArra
new_chunk_grid_shape = determine_chunk_grid_shape(new_shape, new_chunk_shape)

# do broadcasting of entries in manifest
broadcasted_paths = cast( # `np.broadcast_to` apparently is type hinted as if the output could have Any dtype
broadcasted_manifest = _broadcast_manifest(x.manifest, shape=new_chunk_grid_shape)

new_metadata = copy_and_replace_metadata(
old_metadata=x.metadata,
new_shape=list(new_shape),
new_chunks=list(new_chunk_shape),
)

return ManifestArray(chunkmanifest=broadcasted_manifest, metadata=new_metadata)


def _concat_manifests(manifests: list[ChunkManifest], axis: int) -> ChunkManifest:
    """Concatenate manifests along an existing chunk-grid axis.

    Parameters
    ----------
    manifests
        Manifests whose chunk grids are joined end-to-end along ``axis``.
    axis
        Existing chunk-grid axis to concatenate along.

    Returns
    -------
    ChunkManifest
        A new manifest whose path/offset/length arrays are the concatenation
        of the inputs' arrays along ``axis``.
    """
    # `np.concatenate` is type-hinted as if the output could have Any dtype,
    # so narrow the paths result back to the string dtype for the type checker.
    concatenated_paths = cast(
        np.ndarray[Any, np.dtypes.StringDType],
        np.concatenate([m._paths for m in manifests], axis=axis),
    )
    concatenated_offsets = np.concatenate([m._offsets for m in manifests], axis=axis)
    concatenated_lengths = np.concatenate([m._lengths for m in manifests], axis=axis)
    # Paths were already validated when the input manifests were constructed,
    # so skip re-validating them here.
    return ChunkManifest.from_arrays(
        paths=concatenated_paths,
        offsets=concatenated_offsets,
        lengths=concatenated_lengths,
        validate_paths=False,
    )

def _stack_manifests(manifests: list[ChunkManifest], axis: int) -> ChunkManifest:
    """Stack manifests along a new chunk-grid axis.

    Parameters
    ----------
    manifests
        Manifests whose chunk grids are stacked along a newly inserted axis.
    axis
        Position of the new axis in the result's chunk grid.

    Returns
    -------
    ChunkManifest
        A new manifest with one extra chunk-grid dimension of length
        ``len(manifests)``.
    """
    # `np.stack` is type-hinted as if the output could have Any dtype,
    # so narrow the paths result back to the string dtype for the type checker.
    stacked_paths = cast(
        np.ndarray[Any, np.dtypes.StringDType],
        np.stack([m._paths for m in manifests], axis=axis),
    )
    stacked_offsets = np.stack([m._offsets for m in manifests], axis=axis)
    stacked_lengths = np.stack([m._lengths for m in manifests], axis=axis)
    # Paths were already validated when the input manifests were constructed.
    return ChunkManifest.from_arrays(
        paths=stacked_paths,
        offsets=stacked_offsets,
        lengths=stacked_lengths,
        validate_paths=False,
    )
def _broadcast_manifest(
    manifest: ChunkManifest, shape: tuple[int, ...]
) -> ChunkManifest:
    """Broadcast a manifest to a new chunk-grid shape.

    Parameters
    ----------
    manifest
        Manifest whose entries are repeated via numpy broadcasting rules.
    shape
        Target chunk-grid shape; must be broadcast-compatible with the
        manifest's current chunk-grid shape.

    Returns
    -------
    ChunkManifest
        A new manifest whose arrays are broadcast views repeated to ``shape``.
    """
    # `np.broadcast_to` is type-hinted as if the output could have Any dtype,
    # so narrow the paths result back to the string dtype for the type checker.
    broadcasted_paths = cast(
        np.ndarray[Any, np.dtypes.StringDType],
        np.broadcast_to(manifest._paths, shape=shape),
    )
    broadcasted_offsets = np.broadcast_to(manifest._offsets, shape=shape)
    broadcasted_lengths = np.broadcast_to(manifest._lengths, shape=shape)
    # Paths were already validated when the input manifest was constructed.
    return ChunkManifest.from_arrays(
        paths=broadcasted_paths,
        offsets=broadcasted_offsets,
        lengths=broadcasted_lengths,
        validate_paths=False,
    )


def _prepend_singleton_dimensions(shape: tuple[int, ...], ndim: int) -> tuple[int, ...]:
"""Prepend as many new length-1 axes to shape as necessary such that the result has ndim number of axes."""
Expand Down
41 changes: 41 additions & 0 deletions virtualizarr/manifests/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,47 @@ def __eq__(self, other: Any) -> bool:
lengths_equal = (self._lengths == other._lengths).all()
return paths_equal and offsets_equal and lengths_equal

def get_entry(self, indices: tuple[int, ...]) -> ChunkEntry | None:
    """Return the chunk entry at the given chunk-grid indices.

    A missing chunk is represented in the manifest by an empty path string;
    for such indices this returns None instead of a ChunkEntry.
    """
    path = self._paths[indices]
    if not path:
        # empty path string marks a missing chunk
        return None
    return ChunkEntry(
        path=path,
        offset=self._offsets[indices],
        length=self._lengths[indices],
    )

def elementwise_eq(self, other: "ChunkManifest") -> np.ndarray:
    """Chunk-wise comparison against another manifest.

    Returns a boolean array over the chunk grid in which an element is True
    iff the corresponding chunk's path, offset, and length all match.
    """
    paths_match = self._paths == other._paths
    offsets_match = self._offsets == other._offsets
    lengths_match = self._lengths == other._lengths
    return paths_match & offsets_match & lengths_match
Comment on lines +447 to +453
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will have to be updated in #938, to compare the values of inlined chunks.


def iter_nonempty_paths(self) -> Iterator[str]:
    """Yield every non-empty chunk path in the manifest, in flat (C) order."""
    # empty strings mark missing chunks and are filtered out
    yield from (path for path in self._paths.flat if path)

def iter_refs(self) -> Iterator[tuple[tuple[int, ...], ChunkEntry]]:
    """Yield (grid_indices, chunk_entry) for every non-missing chunk.

    Iterates the chunk grid in C order; chunks whose path is the empty
    string (i.e. missing chunks) are skipped.
    """
    # np.ndindex walks the grid lazily in C order, avoiding materializing the
    # ndim dense grid-sized coordinate arrays that np.mgrid would allocate.
    for indices in np.ndindex(*self.shape_chunk_grid):
        path = self._paths[indices]
        if path != "":
            yield (
                indices,
                ChunkEntry(
                    path=path,
                    # convert numpy scalars to plain python ints
                    offset=int(self._offsets[indices]),
                    length=int(self._lengths[indices]),
                ),
            )

def rename_paths(
self,
new: str | Callable[[str], str],
Expand Down
9 changes: 5 additions & 4 deletions virtualizarr/manifests/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,12 @@ async def get(
)
chunk_indexes = parse_manifest_index(key, separator, expand_pattern=True)

path = manifest._paths[chunk_indexes]
if path == "":
entry = manifest.get_entry(chunk_indexes)
if entry is None:
return None
offset = manifest._offsets[chunk_indexes]
length = manifest._lengths[chunk_indexes]
path = entry["path"]
offset = entry["offset"]
length = entry["length"]
# Get the configured object store instance that matches the path
store, path_after_prefix = self._registry.resolve(path)
if not store:
Expand Down
28 changes: 7 additions & 21 deletions virtualizarr/writers/icechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Iterable, List, Literal, Optional, Union, cast

import numpy as np
import xarray as xr
from xarray.backends.zarr import ZarrStore as XarrayZarrStore
from xarray.backends.zarr import encode_zarr_attr_value
Expand Down Expand Up @@ -272,9 +271,8 @@ def validate_virtual_chunk_containers(
for marr in manifestarrays:
# TODO this loop over every virtual reference is likely inefficient in python,
# is there a way to push this down to Icechunk? (see https://github.com/earth-mover/icechunk/issues/1167)
for ref in marr.manifest._paths.flat:
if ref:
validate_single_ref(ref, supported_prefixes)
for ref in marr.manifest.iter_nonempty_paths():
validate_single_ref(ref, supported_prefixes)


def validate_single_ref(ref: str, supported_prefixes: set[str]) -> None:
Expand Down Expand Up @@ -562,16 +560,6 @@ def write_manifest_virtual_refs(
# but Icechunk need to expose a suitable API first
# See https://github.com/earth-mover/icechunk/issues/401 for performance benchmark

it = np.nditer(
[manifest._paths, manifest._offsets, manifest._lengths], # type: ignore[arg-type]
flags=[
"refs_ok",
"multi_index",
"c_index",
],
op_flags=[["readonly"]] * 3, # type: ignore
)

if last_updated_at is None:
# Icechunk rounds timestamps to the nearest second, but filesystems have higher precision,
# so we need to add a buffer, so that if you immediately read data back from this icechunk store,
Expand All @@ -583,16 +571,14 @@ def write_manifest_virtual_refs(
virtual_chunk_spec_list = [
VirtualChunkSpec(
index=[
index + offset
for index, offset in zip(it.multi_index, chunk_index_offsets)
index + offset for index, offset in zip(grid_index, chunk_index_offsets)
],
location=path.item(),
offset=offset.item(),
length=length.item(),
location=entry["path"],
offset=entry["offset"],
length=entry["length"],
last_updated_at_checksum=last_updated_at,
)
for path, offset, length in it
if path
for grid_index, entry in manifest.iter_refs()
]

store.set_virtual_refs(
Expand Down
Loading