Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 31 additions & 26 deletions py_hamt/hamt_to_sharded_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@
import asyncio
import time

import xarray as xr
from multiformats import CID

from .hamt import HAMT
from .sharded_zarr_store import ShardedZarrStore
from .store_httpx import KuboCAS
from .zarr_hamt_store import ZarrHAMTStore
from .sharded_zarr_store import SHARDED_ZARR_V2, ShardedZarrStore
from .store_httpx import ContentAddressedStore, KuboCAS


def _is_zarr_chunk_key(key: str) -> bool:
if key.endswith(("zarr.json", ".zarray", ".zattrs", ".zgroup")):
return False
return key.startswith("c/") or "/c/" in key


async def convert_hamt_to_sharded(
cas: KuboCAS, hamt_root_cid: str, chunks_per_shard: int
cas: ContentAddressedStore, hamt_root_cid: str, chunks_per_shard: int
Comment on lines 18 to +19

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Narrow the CAS contract back to CID-backed implementations.

ContentAddressedStore allows non-CID immutable IDs, but this code immediately enforces CID and base32-encodes pointers before handing them to ShardedZarrStore, which also decodes them as CIDs. That means a valid non-CID ContentAddressedStore now type-checks at the boundary and then fails on the first migrated key. Either narrow the annotation/docstring again or reject non-CID-backed stores up front.

Also applies to: 61-76

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@py_hamt/hamt_to_sharded_converter.py` around lines 18 - 19, The
convert_hamt_to_sharded entrypoint currently advertises a generic
ContentAddressedStore even though the implementation only works with CID-backed
stores. Update convert_hamt_to_sharded to either narrow the cas annotation/docs
to the CID-backed store type or add an upfront validation that rejects non-CID
immutable IDs before any sharded migration work begins. Make sure the pointer
encoding/decoding path in convert_hamt_to_sharded and the ShardedZarrStore
handoff stays consistent with CID assumptions.

) -> str:
"""
Converts a Zarr dataset from a HAMT-based store to a ShardedZarrStore.
Expand All @@ -32,43 +36,44 @@ async def convert_hamt_to_sharded(
hamt_ro = await HAMT.build(
cas=cas, root_node_id=hamt_root_cid, values_are_bytes=True, read_only=True
)
source_store = ZarrHAMTStore(hamt_ro, read_only=True)
source_dataset = xr.open_zarr(store=source_store, consolidated=True)
# 2. Introspect the source array to get its configuration
print("Reading metadata from source store...")

# Read the stores metadata to get array shape and chunk shape
data_var_name = next(iter(source_dataset.data_vars))
ordered_dims = list(source_dataset[data_var_name].dims)
array_shape_tuple = tuple(source_dataset.sizes[dim] for dim in ordered_dims)
chunk_shape_tuple = tuple(source_dataset.chunks[dim][0] for dim in ordered_dims)
array_shape = array_shape_tuple
chunk_shape = chunk_shape_tuple

# 3. Create the destination ShardedZarrStore for writing

# 2. Create the destination ShardedZarrStore for writing.
print(
f"Initializing new ShardedZarrStore with {chunks_per_shard} chunks per shard..."
f"Initializing new ShardedZarrStore v2 with {chunks_per_shard} chunks per shard..."
)
dest_store = await ShardedZarrStore.open(
cas=cas,
read_only=False,
array_shape=array_shape,
chunk_shape=chunk_shape,
chunks_per_shard=chunks_per_shard,
manifest_version=SHARDED_ZARR_V2,
)

print("Destination store initialized.")

# 4. Iterate and copy all data from source to destination
# 3. Copy metadata first so each chunked array path registers its own shard
# index before chunk pointers are inserted.
print("Starting data migration...")
count = 0
async for key in hamt_ro.keys():
if _is_zarr_chunk_key(key):
continue
count += 1
# Read the raw data (metadata or chunk) from the source
cid: CID = await hamt_ro.get_pointer(key)
cid = await hamt_ro.get_pointer(key)
if not isinstance(cid, CID): # pragma: no cover
raise TypeError(f"Expected CID pointer for key {key!r}.")
cid_base32_str = str(cid.encode("base32"))
await dest_store.set_pointer(key, cid_base32_str)
if count % 200 == 0: # pragma: no cover
print(f"Migrated {count} keys...") # pragma: no cover

# Write the exact same key-value pair to the destination.
async for key in hamt_ro.keys():
if not _is_zarr_chunk_key(key):
continue
count += 1
cid = await hamt_ro.get_pointer(key)
if not isinstance(cid, CID): # pragma: no cover
raise TypeError(f"Expected CID pointer for key {key!r}.")
cid_base32_str = str(cid.encode("base32"))
await dest_store.set_pointer(key, cid_base32_str)
if count % 200 == 0: # pragma: no cover
print(f"Migrated {count} keys...") # pragma: no cover
Expand Down
Loading
Loading