Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 99 additions & 27 deletions py_hamt/hamt_to_sharded_converter.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,84 @@
import argparse
import asyncio
import time
from collections.abc import Mapping

import xarray as xr
from multiformats import CID

from .hamt import HAMT
from .sharded_zarr_store import ShardedZarrStore
from .store_httpx import KuboCAS
from .zarr_hamt_store import ZarrHAMTStore
from .sharded_zarr_store import SHARDED_ZARR_V2, ArrayIndex, ShardedZarrStore
from .store_httpx import ContentAddressedStore, KuboCAS

ZARR_METADATA_SUFFIXES = ("zarr.json", ".zarray", ".zattrs", ".zgroup", ".zmetadata")


def _is_zarr_metadata_key(key: str) -> bool:
return key.endswith(ZARR_METADATA_SUFFIXES)


def _classic_dotted_chunk_key_to_v3(key: str) -> str | None:
array_path, _, coord_part = key.rpartition("/")
if "." not in coord_part:
return None

parts = coord_part.split(".")
if not parts or not all(part.isdecimal() for part in parts):
return None
return ShardedZarrStore._format_chunk_key(
array_path, tuple(int(part) for part in parts)
)


def _classic_slash_chunk_key_to_v3(
key: str, array_indices: Mapping[str, ArrayIndex]
) -> str | None:
for array_path, array_index in sorted(
array_indices.items(), key=lambda item: len(item[0]), reverse=True
):
prefix = f"{array_path}/" if array_path else ""
if prefix:
if not key.startswith(prefix):
continue
coord_part = key[len(prefix) :]
else:
coord_part = key

parts = coord_part.split("/")
if len(parts) != len(array_index.chunks_per_dim):
continue
if all(part.isdecimal() for part in parts):
return ShardedZarrStore._format_chunk_key(
array_path, tuple(int(part) for part in parts)
)
return None


def _normalize_zarr_chunk_key(
key: str, array_indices: Mapping[str, ArrayIndex] | None = None
) -> str | None:
if _is_zarr_metadata_key(key):
return None

classic_key = _classic_dotted_chunk_key_to_v3(key)
if classic_key is not None:
return classic_key

if array_indices is not None:
classic_key = _classic_slash_chunk_key_to_v3(key, array_indices)
if classic_key is not None:
return classic_key

if key.startswith("c/") or "/c/" in key:
return key
return None


def _is_zarr_chunk_key(key: str) -> bool:
return _normalize_zarr_chunk_key(key) is not None


async def convert_hamt_to_sharded(
cas: KuboCAS, hamt_root_cid: str, chunks_per_shard: int
cas: ContentAddressedStore, hamt_root_cid: str, chunks_per_shard: int
) -> str:
"""
Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.
Expand All @@ -32,47 +98,53 @@ async def convert_hamt_to_sharded(
hamt_ro = await HAMT.build(
cas=cas, root_node_id=hamt_root_cid, values_are_bytes=True, read_only=True
)
source_store = ZarrHAMTStore(hamt_ro, read_only=True)
source_dataset = xr.open_zarr(store=source_store, consolidated=True)
# 2. Introspect the source array to get its configuration
print("Reading metadata from source store...")

# Read the stores metadata to get array shape and chunk shape
data_var_name = next(iter(source_dataset.data_vars))
ordered_dims = list(source_dataset[data_var_name].dims)
array_shape_tuple = tuple(source_dataset.sizes[dim] for dim in ordered_dims)
chunk_shape_tuple = tuple(source_dataset.chunks[dim][0] for dim in ordered_dims)
array_shape = array_shape_tuple
chunk_shape = chunk_shape_tuple

# 3. Create the destination ShardedZarrStore for writing

# 2. Create the destination ShardedZarrStore for writing.
print(
f"Initializing new ShardedZarrStore with {chunks_per_shard} chunks per shard..."
f"Initializing new ShardedZarrStore v2 with {chunks_per_shard} chunks per shard..."
)
dest_store = await ShardedZarrStore.open(
cas=cas,
read_only=False,
array_shape=array_shape,
chunk_shape=chunk_shape,
chunks_per_shard=chunks_per_shard,
manifest_version=SHARDED_ZARR_V2,
)

print("Destination store initialized.")

# 4. Iterate and copy all data from source to destination
# 3. Copy metadata first so each chunked array path registers its own shard
# index before chunk pointers are inserted.
print("Starting data migration...")
count = 0
async for key in hamt_ro.keys():
if not _is_zarr_metadata_key(key):
continue
count += 1
# Read the raw data (metadata or chunk) from the source
cid: CID = await hamt_ro.get_pointer(key)
cid = await hamt_ro.get_pointer(key)
if not isinstance(cid, CID): # pragma: no cover
raise TypeError(f"Expected CID pointer for key {key!r}.")
cid_base32_str = str(cid.encode("base32"))

# Write the exact same key-value pair to the destination.
await dest_store.set_pointer(key, cid_base32_str)
if count % 200 == 0: # pragma: no cover
print(f"Migrated {count} keys...") # pragma: no cover

async for key in hamt_ro.keys():
chunk_key = _normalize_zarr_chunk_key(key, dest_store.array_indices)
if chunk_key is None:
if _is_zarr_metadata_key(key):
continue
raise ValueError(
f"Cannot classify Zarr key {key!r} as metadata or chunk during conversion."
)
count += 1
cid = await hamt_ro.get_pointer(key)
if not isinstance(cid, CID): # pragma: no cover
raise TypeError(f"Expected CID pointer for key {key!r}.")
cid_base32_str = str(cid.encode("base32"))
await dest_store.set_pointer(chunk_key, cid_base32_str)
if count % 200 == 0: # pragma: no cover
print(f"Migrated {count} keys...") # pragma: no cover

print(f"Migration of {count} total keys complete.")

# 5. Finalize the new store by flushing it to the CAS
Expand Down
Loading
Loading