Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
75bb535
feat: add streaming write API for iterator-based array writing
theokb7 Mar 9, 2026
3665d74
feat: add xarray Dataset write support
theokb7 Mar 9, 2026
7fdaad5
feat: add dask array integration
theokb7 Mar 9, 2026
c3aafdc
refactor: Remove `resource` import
theokb7 Mar 11, 2026
ca8b232
feat: Add memory usage test
theokb7 Mar 11, 2026
b1f2664
feat: add and test fsspec xarray support
theokb7 Mar 16, 2026
55497a9
Merge branch 'main' into feat/streaming-write-and-dataset-api
terraputix Mar 19, 2026
a423c24
fix linter
terraputix Mar 19, 2026
9e2ec1e
use similar code for streaming and full writes
terraputix Mar 20, 2026
b5d09b5
revert unnecessary changes on this branch
terraputix Mar 24, 2026
0925814
remove dask related changes in xarray tests
terraputix Mar 24, 2026
13f0ba3
add missing method
terraputix Mar 24, 2026
13b850a
remove unnecessary dtype from test
terraputix Mar 24, 2026
10c3d69
use enum based distinction
terraputix Mar 24, 2026
c9a07fb
remove example
terraputix Mar 24, 2026
9669d3d
use freestanding tests and separate into two files
terraputix Mar 24, 2026
c65f173
fix pyright errors
terraputix Mar 24, 2026
9124d6d
use numpy dtype instead of string
terraputix Mar 24, 2026
ae99fb5
use same pattern for temporary file as in other tests
terraputix Mar 24, 2026
5a5c1c8
raise top level import error if dask not available
terraputix Mar 24, 2026
0a37954
use fixture for second empty file as well
terraputix Mar 25, 2026
a857360
delete memory usage test
terraputix Mar 25, 2026
f9ccd5e
consistency in doc comments
terraputix Mar 25, 2026
7b09377
cleanup
terraputix Mar 25, 2026
d76ccb6
use ndindex over itertools
terraputix Mar 25, 2026
eee9f03
guard against mismatching chunk shapes
terraputix Mar 25, 2026
12f861b
improve iter type hint
terraputix Mar 25, 2026
05d5859
validate all dimensions
terraputix Mar 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ codec = [
xarray = ["xarray>=2023.1.0"]
fsspec = ["fsspec>=2023.1.0", "s3fs>=2023.1.0"]
grids = ["pyproj>=3.1.0"]
dask = ["dask[array]>=2023.1.0"]
all = [
"zarr>=2.18.2",
"numcodecs>=0.12.1",
"xarray>=2023.1.0",
"fsspec>=2023.10.0",
"s3fs>=2023.1.0",
"pyproj>=3.3.0"
"pyproj>=3.3.0",
"dask[array]>=2023.1.0",
]

[dependency-groups]
Expand Down
40 changes: 40 additions & 0 deletions python/omfiles/_rust/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,46 @@ class OmFileWriter:
Raises:
ValueError: If the data type is unsupported or if parameters are invalid
"""
def write_array_streaming(
    self,
    dimensions: typing.Sequence[builtins.int],
    chunks: typing.Sequence[builtins.int],
    chunk_iterator: typing.Iterator[numpy.ndarray],
    dtype: numpy.dtype,
    scale_factor: typing.Optional[builtins.float] = None,
    add_offset: typing.Optional[builtins.float] = None,
    compression: typing.Optional[builtins.str] = None,
    name: typing.Optional[builtins.str] = None,
    children: typing.Optional[typing.Sequence[OmVariable]] = None,
) -> OmVariable:
    r"""
    Write an array to the .om file by streaming chunks from a Python iterator.

    This method is designed for writing large arrays that do not fit in memory.
    Instead of providing the full array, you provide the full array dimensions
    and an iterator that yields numpy array chunks.

    Chunks MUST be yielded in row-major order (C-order) of the chunk grid.
    Each chunk's shape determines how many internal file chunks it covers.

    Args:
        dimensions: Shape of the full array (e.g., [1000, 2000])
        chunks: Chunk sizes for each dimension (e.g., [100, 200])
        chunk_iterator: Python iterator yielding numpy arrays, one per chunk region
        dtype: Numpy dtype of the array (e.g., np.dtype(np.float32))
        scale_factor: Scale factor for data compression (default: 1.0)
        add_offset: Offset value for data compression (default: 0.0)
        compression: Compression algorithm to use (default: "pfor_delta_2d")
        name: Name of the variable (default: "data")
        children: List of child variables (default: [])

    Returns:
        :py:data:`omfiles.OmVariable` representing the written array in the file structure

    Raises:
        ValueError: If the dtype is unsupported or parameters are invalid
        RuntimeError: If there's an error during compression or I/O
    """
def write_scalar(
self, value: typing.Any, name: builtins.str, children: typing.Optional[typing.Sequence[OmVariable]] = None
) -> OmVariable:
Expand Down
139 changes: 139 additions & 0 deletions python/omfiles/dask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Dask array integration for writing to OM files."""

import math
from typing import Iterator, Optional, Sequence

import numpy as np

from omfiles._rust import OmFileWriter, OmVariable

try:
import dask.array as da
except ImportError:
raise ImportError("omfiles[dask] is required for dask functionality")


def _validate_chunk_alignment(
data_chunks: tuple,
om_chunks: list[int],
array_shape: tuple,
) -> None:
"""
Validate dask chunks are compatible with OM chunks for block-level streaming.

Every non-last dask chunk along each dimension must be an exact multiple
of the corresponding OM chunk size (the last chunk may be smaller).
Additionally, for the leftmost dimension where a dask block contains more
than one OM chunk, every trailing dimension must be fully covered by each
dask block. This ensures the local chunk traversal inside a block matches
the global file order.
"""
ndim = len(om_chunks)

Comment thread
terraputix marked this conversation as resolved.
for d in range(ndim):
dim_chunks = data_chunks[d]
for i, c in enumerate(dim_chunks[:-1]):
if c % om_chunks[d] != 0:
raise ValueError(
f"Dask chunk size {c} along dimension {d} (block {i}) "
f"is not a multiple of the OM chunk size {om_chunks[d]}."
)

first_multi = None
for d in range(ndim):
local_n = max(math.ceil(c / om_chunks[d]) for c in data_chunks[d])
if local_n > 1:
first_multi = d
break

if first_multi is not None:
for d in range(first_multi + 1, ndim):
dim_chunks = data_chunks[d]
if not (len(dim_chunks) == 1 and dim_chunks[0] == array_shape[d]):
raise ValueError(
f"Dask blocks have multiple OM chunks in dimension {first_multi}, "
f"but dimension {d} is not fully covered by each dask block "
f"(dask chunks {dim_chunks} vs array size {array_shape[d]}). "
f"Rechunk so trailing dimensions are fully covered."
)


def _dask_block_iterator(dask_array: da.Array) -> Iterator[np.ndarray]:
    """
    Yield computed numpy arrays from a dask array in C-order block traversal.

    The OM file format requires chunks to be written in sequential order
    corresponding to a row-major (C-order) traversal of the chunk grid.
    np.ndindex yields indices in C-order: the last axis index varies fastest.
    """
    grid_shape = tuple(dask_array.numblocks)
    for block_index in np.ndindex(*grid_shape):
        block = dask_array.blocks[block_index]
        yield block.compute()


def write_dask_array(
    writer: OmFileWriter,
    data: da.Array,
    chunks: Optional[Sequence[int]] = None,
    scale_factor: float = 1.0,
    add_offset: float = 0.0,
    compression: str = "pfor_delta_2d",
    name: str = "data",
    children: Optional[Sequence[OmVariable]] = None,
) -> OmVariable:
    """
    Write a dask array to an OM file using streaming/incremental writes.

    Iterates over the blocks of the dask array, computing each block
    on-the-fly, and streams them to the OM file writer. Only one block
    is held in memory at a time.

    The dask array's chunk structure is used to determine the OM file's
    chunk dimensions by default. Dask chunks must be multiples of the OM
    chunk sizes (except the last chunk along each dimension which may be
    smaller). When a dask block contains more than one OM chunk in a
    dimension, all trailing dimensions must be fully covered by each block.

    Performance: write speed depends on the number of dask tasks, not just
    data size. For best performance, use dask chunks much larger than the
    OM chunk sizes — ideally covering the full extent of trailing dimensions.
    For example, with OM chunks of (124, 124) on an (8192, 8192) array,
    dask chunks of (124, 8192) will write ~10x faster than (124, 124).

    Args:
        writer: An open OmFileWriter instance.
        data: A dask array to write.
        chunks: OM file chunk sizes per dimension. If None, uses the dask
            array's chunk sizes. Dask chunks must be multiples of these.
        scale_factor: Scale factor for float compression (default: 1.0).
        add_offset: Offset for float compression (default: 0.0).
        compression: Compression algorithm (default: "pfor_delta_2d").
        name: Variable name (default: "data").
        children: Child variables (default: None).

    Returns:
        OmVariable representing the written array.

    Raises:
        TypeError: If data is not a dask array.
        ValueError: If dask chunks are incompatible with OM chunks.
    """
    if not isinstance(data, da.Array):
        raise TypeError(f"Expected a dask array, got {type(data)}")

    if chunks is not None and len(chunks) != data.ndim:
        raise ValueError(f"chunks has {len(chunks)} element(s) but data has {data.ndim} dimension(s).")

    # Cast chunk sizes to builtin ints, mirroring the int() cast applied to
    # `dimensions` below: the Rust binding declares Sequence[builtins.int],
    # and callers may pass numpy integers.
    if chunks is not None:
        om_chunks: list[int] = [int(c) for c in chunks]
    else:
        om_chunks = [int(c[0]) for c in data.chunks]

    _validate_chunk_alignment(data.chunks, om_chunks, data.shape)

    return writer.write_array_streaming(
        dimensions=[int(d) for d in data.shape],
        chunks=om_chunks,
        chunk_iterator=_dask_block_iterator(data),
        dtype=data.dtype,
        scale_factor=scale_factor,
        add_offset=add_offset,
        compression=compression,
        name=name,
        children=list(children) if children is not None else [],
    )
Loading
Loading