Ingesting native zarr stores of arbitrary size is not trivial. There are some really big ones out there.
Why this is hard
Unfortunately we have to iterate over every chunk in the store (see #850 (comment)) to find out:
If it was initialized (for both Icechunk and Kerchunk),
The object's size (for Icechunk).
This exposes several problems:
Just storing all this information in memory at once becomes problematic with many millions of refs.
Icechunk currently has a maximum number of references you can write in a single commit: 50 million.
Python is a bad language for this sort of performance-sensitive iteration (and our current ZarrParser implementation is pure python all the way up until we instantiate the resulting ChunkManifest objects).
VirtualiZarr's parser abstraction was designed for parsing a single file at a time, and files are never anywhere near this big for any other format (as the max object size most object stores support is 1TB, and the zarr stores listed above are many PBs each).
Desired performance
We want the peak memory usage to be bounded by some (configurable) constant, and we want the run time to be at worst O(n_chunks_per_array * n_arrays).
We also want to minimize the number of API calls to the object store, so we clearly should be LISTing the chunks. (Listing won't work for HTTP stores - let's worry about that another time.)
Idea: Batched arrow streams
This problem calls for batching streams.
Luckily, we're already using obstore, which provides python access to efficient paginated object store list operations in rust that return results as streams of PyArrow RecordBatches (!!!).
In #892 we are working on rewriting the ZarrParser to use PyArrow and obstore's list function, and in #861 we're working on ensuring that setting millions of refs at once with Icechunk doesn't cause excessive memory usage.
But we still need to split that arrow stream up somehow (both to bound memory usage and due to maximum commit size) and commit between each batch.
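To make the splitting concrete, here is a minimal sketch of re-batching a stream into groups of bounded size. FakeBatch is a stand-in for a PyArrow RecordBatch (a real RecordBatch exposes the same num_rows and slice(offset, length)); obstore and Icechunk are not actually called here, and BATCH_SIZE and all names are illustrative.

```python
from typing import Iterable, Iterator, List


class FakeBatch:
    """Stand-in for a PyArrow RecordBatch: just num_rows and slice()."""

    def __init__(self, num_rows: int) -> None:
        self.num_rows = num_rows

    def slice(self, offset: int, length: int) -> "FakeBatch":
        return FakeBatch(min(length, self.num_rows - offset))


def rebatch(stream: Iterable, batch_size: int) -> Iterator[List]:
    """Group a stream of record batches into groups of at most
    `batch_size` total rows, splitting individual batches as needed.
    Peak memory is bounded by one group (~batch_size rows); the caller
    would set the refs in each group and commit before pulling the next."""
    group: List = []
    rows_in_group = 0
    for batch in stream:
        offset = 0
        while offset < batch.num_rows:
            take = min(batch_size - rows_in_group, batch.num_rows - offset)
            group.append(batch.slice(offset, take))
            rows_in_group += take
            offset += take
            if rows_in_group == batch_size:
                yield group  # commit point
                group, rows_in_group = [], 0
    if group:
        yield group  # final partial commit


# e.g. three incoming batches of 400 rows, committed every 500 refs:
groups = list(rebatch([FakeBatch(400) for _ in range(3)], batch_size=500))
sizes = [sum(b.num_rows for b in g) for g in groups]
# sizes == [500, 500, 200]
```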
Quick and dirty way
If we just wanted to get a single native zarr store into Icechunk with no modifications, we could write a rust program which:
Uses arrow_rs_object_store's list method to list all chunks of an array as a paginated stream,
Sets each ref directly using icechunk's API in a loop,
Every BATCH_SIZE elements, stops, commits, then continues consuming the stream,
Repeats for all the other arrays in the store.
This should work, but I don't love how it:
Commits chunks in an unintuitive order (the order they are listed in, which will be ordered by leading dimension of the chunk grid first),
Doesn't align commits with regions of the array, as for example the first commit could include all chunks for row 1 and only some of the chunks for row 2,
Is therefore a bit inconvenient to restart if the ingestion process dies,
Commits one array at a time, as opposed to committing logically related parts of arrays together,
Doesn't have any facility for appending to an existing Zarr store, so we would have to re-implement that logic (which could be hard because of the out-of-order problem), or be restricted to ingesting only existing stores exactly as-is.
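For concreteness, the shape of that loop can be sketched in Python (the real version would be rust, with the object store's paginated list and Icechunk commits in place of the stand-ins below; list_chunks, ingest, and BATCH_SIZE are all illustrative names). It also demonstrates the ordering drawback noted above: the first commit straddles rows of the chunk grid.

```python
from itertools import islice

BATCH_SIZE = 3  # illustrative; real value chosen to bound memory / commit size


def list_chunks(shape):
    """Stand-in for the object store's paginated list: yield chunk keys
    for a 2D chunk grid in lexicographic (listing) order."""
    for i in range(shape[0]):
        for j in range(shape[1]):
            yield f"c/{i}.{j}"


def ingest(shape, batch_size=BATCH_SIZE):
    """Consume the stream; every `batch_size` refs, stop and 'commit'
    (here: record the batch), then continue consuming."""
    commits = []
    stream = list_chunks(shape)
    while True:
        batch = list(islice(stream, batch_size))
        if not batch:
            break
        # real version: set each ref via icechunk's API, then commit
        commits.append(batch)
    return commits


commits = ingest((2, 2))
# First commit straddles rows: ['c/0.0', 'c/0.1', 'c/1.0'] -- the
# unintuitive, non-region-aligned ordering described above.
```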
General way
Alternatively, we could try to make an appending workflow like this work in VirtualiZarr:
```python
from virtualizarr.parsers import ZarrParser

zarr_store = zarr.open(store_url)

# set up object store registry blah blah not important here

time_slices = <slices along some dim, e.g. along time>

for time_slice in time_slices:
    # create a vds of only a subset of all zarr arrays
    region_parser = ZarrParser(region=time_slice)
    vds = vz.open_virtual_dataset(zarr_store, region_parser)

    # append and commit that subset
    session = ic_repo.writable_session("main")
    vds.vz.to_icechunk(session.store, append_dim="time")
    session.commit(f"wrote chunks for time_slice={time_slice}")
```
I don't love having to add a region kwarg to the parser, but I do love how this:
Commits only a well-defined subset of refs at a time,
Allows concatenating multiple Zarr stores together along any dimension, which is a common use case,
Matches what a user might intuitively expect should work,
Echoes patterns we recommend in other contexts (e.g. ingesting data from operational pipelines),
Is convenient to restart from if the process fails between commits,
Doesn't force the user to know about the arrow streaming tricks being used under the hood,
Can build on the region writes in vds.vz.to_icechunk proposed in Implement to_icechunk(region=...) #873,
Should work for writing to Kerchunk (Parquet) too.
The main difficulty with implementing this is: how do we efficiently list only chunks from a specific region of the store?
One naive way is to list all chunks every time we parse, then filter the returned arrays for only the chunks in the specified region. But even paginating the listing, this would be O(n_chunks_per_array^2) in run time.
Instead I think we can use the prefix and offset args to list_with_offset (exposed in obstore.list) together to list only specified regions of the chunk grid.
We can control:
the axis of the chunk grid we want to iterate over, by varying the prefix,
the index along that axis we want to start listing at, using offset,
the index we want to stop listing at, by simply ceasing to consume the stream after the expected number of elements (or after the final expected element is encountered).
For example, imagine we have a store with chunk keys c/0.0.0, c/0.0.1, c/0.1.0, & c/0.1.1. Passing prefix="{base_prefix}/c/0.", offset="{base_prefix}/c/0.1" (and only streaming 2 elements), we should get back c/0.1.0 & c/0.1.1, but never list c/0.0.0 or c/0.0.1.
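This example can be checked with a small simulation of the listing semantics (the in-memory key list and helper names are illustrative; a real implementation would pass prefix and offset to obstore.list and consume the stream):

```python
def list_with_offset_sim(keys, prefix, offset):
    """Simulate object-store list semantics: return keys under `prefix`
    that sort strictly after `offset`, in lexicographic order."""
    return sorted(k for k in keys if k.startswith(prefix) and k > offset)


def region_keys(keys, base_prefix, leading_index, start, n_expected):
    """List only one slab of the chunk grid, as in the example above:
    fix the leading index via `prefix`, start at `start` along the next
    axis via `offset`, and stop by ceasing to consume the stream after
    `n_expected` elements."""
    prefix = f"{base_prefix}/c/{leading_index}."
    offset = f"{base_prefix}/c/{leading_index}.{start}"
    return list_with_offset_sim(keys, prefix, offset)[:n_expected]


keys = ["b/c/0.0.0", "b/c/0.0.1", "b/c/0.1.0", "b/c/0.1.1"]
# Matches the worked example: c/0.1.0 and c/0.1.1 come back, and the
# c/0.0.* keys are never listed.
assert region_keys(keys, "b", 0, 1, 2) == ["b/c/0.1.0", "b/c/0.1.1"]
```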
This only allows us to split into regions along one dimension, but that should be enough.
(For this approach to work we would need developmentseed/obstore#545.)