feat: Add streaming write support to enable writing larger-than-ram dask-backed arrays and datasets #108
Merged: terraputix merged 28 commits into open-meteo:main from theokb7:feat/streaming-write-and-dataset-api on Mar 25, 2026.
Commits
75bb535  feat: add streaming write API for iterator-based array writing (theokb7)
3665d74  feat: add xarray Dataset write support (theokb7)
7fdaad5  feat: add dask array integration (theokb7)
c3aafdc  refactor: Remove `resource` import (theokb7)
ca8b232  feat: Add memory usage test (theokb7)
b1f2664  feat: add and test fsspec xarray support (theokb7)
55497a9  Merge branch 'main' into feat/streaming-write-and-dataset-api (terraputix)
a423c24  fix linter (terraputix)
9e2ec1e  use similar code for streaming and full writes (terraputix)
b5d09b5  revert unnecessary changes on this branch (terraputix)
0925814  remove dask related changes in xarray tests (terraputix)
13f0ba3  add missing method (terraputix)
13b850a  remove unnecessary dtype from test (terraputix)
10c3d69  use enum based disctinction (terraputix)
c9a07fb  remove example (terraputix)
9669d3d  use freestanding tests and separate into two files (terraputix)
c65f173  fix pyright errors (terraputix)
9124d6d  use numpy dtype instead of string (terraputix)
ae99fb5  use same pattern for temporary file as in other tests (terraputix)
5a5c1c8  raise top level import error if dask not available (terraputix)
0a37954  use fixture for second empty file as well (terraputix)
a857360  delete memory usage test (terraputix)
f9ccd5e  consistency in doc comments (terraputix)
7b09377  cleanup (terraputix)
d76ccb6  use ndindex over itertools (terraputix)
eee9f03  guard against mismatching chunk shapes (terraputix)
12f861b  improve iter type hint (terraputix)
05d5859  validate all dimensions (terraputix)
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
New file (+139 lines):

```python
"""Dask array integration for writing to OM files."""

import math
from typing import Iterator, Optional, Sequence

import numpy as np

from omfiles._rust import OmFileWriter, OmVariable

try:
    import dask.array as da
except ImportError:
    raise ImportError("omfiles[dask] is required for dask functionality")


def _validate_chunk_alignment(
    data_chunks: tuple,
    om_chunks: list[int],
    array_shape: tuple,
) -> None:
    """
    Validate dask chunks are compatible with OM chunks for block-level streaming.

    Every non-last dask chunk along each dimension must be an exact multiple
    of the corresponding OM chunk size (the last chunk may be smaller).
    Additionally, for the leftmost dimension where a dask block contains more
    than one OM chunk, every trailing dimension must be fully covered by each
    dask block. This ensures the local chunk traversal inside a block matches
    the global file order.
    """
    ndim = len(om_chunks)

    for d in range(ndim):
        dim_chunks = data_chunks[d]
        for i, c in enumerate(dim_chunks[:-1]):
            if c % om_chunks[d] != 0:
                raise ValueError(
                    f"Dask chunk size {c} along dimension {d} (block {i}) "
                    f"is not a multiple of the OM chunk size {om_chunks[d]}."
                )

    first_multi = None
    for d in range(ndim):
        local_n = max(math.ceil(c / om_chunks[d]) for c in data_chunks[d])
        if local_n > 1:
            first_multi = d
            break

    if first_multi is not None:
        for d in range(first_multi + 1, ndim):
            dim_chunks = data_chunks[d]
            if not (len(dim_chunks) == 1 and dim_chunks[0] == array_shape[d]):
                raise ValueError(
                    f"Dask blocks have multiple OM chunks in dimension {first_multi}, "
                    f"but dimension {d} is not fully covered by each dask block "
                    f"(dask chunks {dim_chunks} vs array size {array_shape[d]}). "
                    f"Rechunk so trailing dimensions are fully covered."
                )
```
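To make the alignment rule above concrete, here is a minimal standalone sketch (pure Python, no dask) that mirrors the same two checks with hypothetical inputs; `chunks_aligned` is an illustrative helper, not part of the PR:

```python
import math

def chunks_aligned(data_chunks, om_chunks, shape):
    """Return True if dask-style chunks satisfy the alignment rule sketched above."""
    ndim = len(om_chunks)
    # Check 1: every non-last block must be an exact multiple of the OM chunk size.
    for d in range(ndim):
        for c in data_chunks[d][:-1]:
            if c % om_chunks[d] != 0:
                return False
    # Find the leftmost dimension whose blocks span more than one OM chunk.
    first_multi = next(
        (d for d in range(ndim)
         if max(math.ceil(c / om_chunks[d]) for c in data_chunks[d]) > 1),
        None,
    )
    # Check 2: all trailing dimensions must then be covered by a single block.
    if first_multi is not None:
        for d in range(first_multi + 1, ndim):
            if not (len(data_chunks[d]) == 1 and data_chunks[d][0] == shape[d]):
                return False
    return True

# Dask chunks (124, 8192) over an (8192, 8192) array with OM chunks (124, 124):
# dimension 1 spans many OM chunks, but no trailing dimension follows it, so OK.
print(chunks_aligned(((124,) * 66 + (8,), (8192,)), [124, 124], (8192, 8192)))
# True

# Reversed layout (8192, 124): dimension 0 spans many OM chunks while
# dimension 1 is split into many blocks, so the rule rejects it.
print(chunks_aligned(((8192,), (124,) * 66 + (8,)), [124, 124], (8192, 8192)))
# False
```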
```python
def _dask_block_iterator(dask_array: da.Array) -> Iterator[np.ndarray]:
    """
    Yield computed numpy arrays from a dask array in C-order block traversal.

    The OM file format requires chunks to be written in sequential order
    corresponding to a row-major (C-order) traversal of the chunk grid.
    np.ndindex yields indices in C-order: the last axis index varies fastest.
    """
    for block_indices in np.ndindex(*dask_array.numblocks):
        yield dask_array.blocks[block_indices].compute()
```
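`np.ndindex` yields the same row-major order as `itertools.product` over the per-axis ranges (the commit "use ndindex over itertools" swapped between these equivalents). A stdlib-only illustration of "last axis varies fastest" for a 2 x 3 block grid:

```python
from itertools import product

# Row-major (C-order) traversal of a 2 x 3 block grid:
# the second index cycles through 0..2 before the first index advances.
order = list(product(range(2), range(3)))
print(order)
# [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2)]
```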
```python
def write_dask_array(
    writer: OmFileWriter,
    data: da.Array,
    chunks: Optional[Sequence[int]] = None,
    scale_factor: float = 1.0,
    add_offset: float = 0.0,
    compression: str = "pfor_delta_2d",
    name: str = "data",
    children: Optional[Sequence[OmVariable]] = None,
) -> OmVariable:
    """
    Write a dask array to an OM file using streaming/incremental writes.

    Iterates over the blocks of the dask array, computing each block
    on-the-fly, and streams them to the OM file writer. Only one block
    is held in memory at a time.

    The dask array's chunk structure is used to determine the OM file's
    chunk dimensions by default. Dask chunks must be multiples of the OM
    chunk sizes (except the last chunk along each dimension, which may be
    smaller). When a dask block contains more than one OM chunk in a
    dimension, all trailing dimensions must be fully covered by each block.

    Performance: write speed depends on the number of dask tasks, not just
    data size. For best performance, use dask chunks much larger than the
    OM chunk sizes, ideally covering the full extent of trailing dimensions.
    For example, with OM chunks of (124, 124) on an (8192, 8192) array,
    dask chunks of (124, 8192) will write ~10x faster than (124, 124).

    Args:
        writer: An open OmFileWriter instance.
        data: A dask array to write.
        chunks: OM file chunk sizes per dimension. If None, uses the dask
            array's chunk sizes. Dask chunks must be multiples of these.
        scale_factor: Scale factor for float compression (default: 1.0).
        add_offset: Offset for float compression (default: 0.0).
        compression: Compression algorithm (default: "pfor_delta_2d").
        name: Variable name (default: "data").
        children: Child variables (default: None).

    Returns:
        OmVariable representing the written array.

    Raises:
        TypeError: If data is not a dask array.
        ValueError: If dask chunks are incompatible with OM chunks.
    """
    if not isinstance(data, da.Array):
        raise TypeError(f"Expected a dask array, got {type(data)}")

    if chunks is not None and len(chunks) != data.ndim:
        raise ValueError(f"chunks has {len(chunks)} element(s) but data has {data.ndim} dimension(s).")

    om_chunks: list[int] = list(chunks) if chunks is not None else [c[0] for c in data.chunks]
    _validate_chunk_alignment(data.chunks, om_chunks, data.shape)

    return writer.write_array_streaming(
        dimensions=[int(d) for d in data.shape],
        chunks=om_chunks,
        chunk_iterator=_dask_block_iterator(data),
        dtype=data.dtype,
        scale_factor=scale_factor,
        add_offset=add_offset,
        compression=compression,
        name=name,
        children=list(children) if children is not None else [],
    )
```
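When `chunks` is None, the OM chunk sizes default to the first block size along each dimension, via `[c[0] for c in data.chunks]`. A dask-free sketch of that derivation (dask stores `.chunks` as a tuple of per-dimension block-size tuples; the layout below is hypothetical):

```python
# Hypothetical .chunks for an (8192, 8192) array chunked as (124, 8192):
# dimension 0 splits into 66 blocks of 124 plus a smaller last block of 8,
# dimension 1 is a single block covering the full extent.
data_chunks = ((124,) * 66 + (8,), (8192,))

# Default OM chunk sizes: the first block size along each dimension.
om_chunks = [c[0] for c in data_chunks]
print(om_chunks)
# [124, 8192]
```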