
Commit 5fec415

Joined arrays in ADIOS2 (#1382)
* Extract verifyChunk
* Main implementation
  Use magic number instead of API call (impl)
* Python bindings
* Throw errors if unsupported
* Testing
  Only test if ADIOS2 version at least 2.9
* Documentation
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* Fix ADIOS2 checks for multidimensional joined arrays
* Python test
* Expose this to the sliced Python API
* Fix
* Fix formatting
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 045e416 commit 5fec415

21 files changed (+843, -152 lines)

docs/source/usage/workflow.rst

Lines changed: 43 additions & 0 deletions

@@ -3,6 +3,49 @@
 Workflow
 ========
 
+Storing and reading chunks
+--------------------------
+
+1. **Chunks within an n-dimensional dataset**
+
+   Most commonly, chunks within an n-dimensional dataset are identified by their offset and extent.
+   The extent is the size of the chunk in each dimension, NOT the absolute coordinate within the entire dataset.
+
+   In the Python API, this is modeled to conform to the conventional ``__setitem__``/``__getitem__`` protocol.
+
+2. **Joined arrays (write only)**
+
+   (Currently) only supported in ADIOS2 no older than v2.9.0 under the conditions listed in the `ADIOS2 documentation on joined arrays <https://adios2.readthedocs.io/en/latest/components/components.html#shapes>`_.
+
+   In some cases, the concrete chunk position within a dataset does not matter and the computation of indexes is a needless computational and mental overhead.
+   This commonly occurs for particle data, which the openPMD-standard models as a list of particles.
+   The order of particles does not matter greatly, and making different parallel processes agree on indexing is error-prone boilerplate.
+
+   In such a case, at most one *joined dimension* can be specified in the Dataset, e.g. ``{Dataset::JOINED_DIMENSION, 128, 128}`` (3D for the sake of explanation, particle data would normally be 1D).
+   The chunk is then stored by specifying an empty offset vector ``{}``.
+   The chunk extent vector must be equivalent to the global extent in all non-joined dimensions (i.e. joined arrays allow no further sub-chunking other than concatenation along the joined dimension).
+   The entry at the joined dimension of the extent vector specifies the extent that this piece should have along the joined dimension.
+   In the Python API, the slice-based setter syntax can be used as an abbreviation since the necessary information is determined from the passed array, e.g. ``record_component[()] = local_data``.
+   The global extent of the dataset along the joined dimension will then be the sum of all local chunk extents along the joined dimension.
+
+   Since openPMD follows a struct-of-array layout of data, it is important not to lose correlation of data between components. E.g., joining an array must take care that ``particles/e/position/x`` and ``particles/e/position/y`` are joined in a uniform way.
+
+   The openPMD-api makes the **following guarantee**:
+
+   Consider a Series written from ``N`` parallel processes between two (collective) flush points. For each parallel process ``n`` and dataset ``D``, let:
+
+   * ``chunk(D, n, i)`` be the ``i``'th chunk written to dataset ``D`` on process ``n``
+   * ``num_chunks(D, n)`` be the count of chunks written by ``n`` to ``D``
+   * ``joined_index(D, c)`` be the index of chunk ``c`` in the joining order of ``D``
+
+   Then for any two datasets ``x`` and ``y``:
+
+   * If for any parallel process ``n`` the condition holds that ``num_chunks(x, n) = num_chunks(y, n)`` (between the two flush points!)...
+   * ...then for any parallel process ``n`` and chunk index ``i`` less than ``num_chunks(x, n)``: ``joined_index(x, chunk(x, n, i)) = joined_index(y, chunk(y, n, i))``.
+
+   **TLDR:** Writing chunks to two joined arrays in a synchronous way (**1.** same order of store operations and **2.** between the same flush operations) will result in the same joining order in both arrays.
+
+
 Access modes
 ------------

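For orientation, here is a minimal C++ sketch of the joined-array write path described in the documentation above. The file name, mesh name, and extents are illustrative only and not part of this commit; joined arrays require the ADIOS2 backend (>= v2.9.0).

    #include <openPMD/openPMD.hpp>
    #include <vector>

    int main()
    {
        using namespace openPMD;

        Series series("joined_example.bp", Access::CREATE);
        auto rc = series.iterations[1].meshes["mymesh"]["x"];

        // First axis is the joined dimension; its global extent becomes the
        // sum of all chunk extents stored along that axis.
        rc.resetDataset(Dataset(Datatype::DOUBLE, {Dataset::JOINED_DIMENSION, 300}));

        std::vector<double> local_data(10 * 300, 42.0);
        // Joined arrays take an empty offset; only this piece's extent is given.
        rc.storeChunk(local_data, {}, {10, 300});

        series.flush();
    }

In the Python API, the same store reduces to ``record_component[()] = local_data``, as noted above.
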
examples/5_write_parallel.py

Lines changed: 24 additions & 4 deletions

@@ -14,13 +14,21 @@
 import numpy as np
 import openpmd_api as io
 
+try:
+    import adios2
+    from packaging import version
+    USE_JOINED_DIMENSION = \
+        version.parse(adios2.__version__) >= version.parse('2.9.0')
+except ImportError:
+    USE_JOINED_DIMENSION = False
+
 if __name__ == "__main__":
     # also works with any other MPI communicator
     comm = MPI.COMM_WORLD
 
     # global data set to write: [MPI_Size * 10, 300]
     # each rank writes a 10x300 slice with its MPI rank as values
-    local_value = comm.size
+    local_value = comm.rank
     local_data = np.ones(10 * 300,
                          dtype=np.double).reshape(10, 300) * local_value
     if 0 == comm.rank:
@@ -29,7 +37,9 @@
 
     # open file for writing
     series = io.Series(
-        "../samples/5_parallel_write_py.h5",
+        "../samples/5_parallel_write_py.bp"
+        if USE_JOINED_DIMENSION
+        else "../samples/5_parallel_write_py.h5",
         io.Access.create,
         comm
     )
@@ -51,7 +61,9 @@
         meshes["mymesh"]
 
     # example 1D domain decomposition in first index
-    global_extent = [comm.size * 10, 300]
+    global_extent = [io.Dataset.JOINED_DIMENSION, 300] \
+        if USE_JOINED_DIMENSION else [comm.size * 10, 300]
+
     dataset = io.Dataset(local_data.dtype, global_extent)
 
     if 0 == comm.rank:
@@ -64,7 +76,15 @@
               "mymesh in iteration 1")
 
     # example shows a 1D domain decomposition in first index
-    mymesh[comm.rank*10:(comm.rank+1)*10, :] = local_data
+
+    if USE_JOINED_DIMENSION:
+        # explicit API
+        # mymesh.store_chunk(local_data, [], [10, 300])
+        mymesh[:, :] = local_data
+        # or short:
+        # mymesh[()] = local_data
+    else:
+        mymesh[comm.rank*10:(comm.rank+1)*10, :] = local_data
     if 0 == comm.rank:
         print("Registered a single chunk per MPI rank containing its "
               "contribution, ready to write content to disk")

include/openPMD/Dataset.hpp

Lines changed: 11 additions & 0 deletions

@@ -22,7 +22,9 @@
 
 #include "openPMD/Datatype.hpp"
 
+#include <limits>
 #include <memory>
+#include <optional>
 #include <string>
 #include <type_traits>
 #include <vector>
@@ -37,6 +39,11 @@ class Dataset
     friend class RecordComponent;
 
 public:
+    enum : std::uint64_t
+    {
+        JOINED_DIMENSION = std::numeric_limits<std::uint64_t>::max()
+    };
+
     Dataset(Datatype, Extent, std::string options = "{}");
 
     /**
@@ -53,5 +60,9 @@
     Datatype dtype;
     uint8_t rank;
    std::string options = "{}"; //!< backend-dependent JSON configuration
+
+    bool empty() const;
+
+    std::optional<size_t> joinedDimension() const;
 };
 } // namespace openPMD

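As a sketch of the new Dataset members above (not part of this commit, and assuming joinedDimension() reports the index of the marked axis): JOINED_DIMENSION is a sentinel extent value, and joinedDimension() tells which axis, if any, carries it. The concrete extents below are made up.

    #include <openPMD/openPMD.hpp>
    #include <cassert>

    int main()
    {
        using namespace openPMD;

        // First axis is marked as joined via the sentinel value.
        Dataset joined(Datatype::DOUBLE, {Dataset::JOINED_DIMENSION, 128, 128});
        Dataset regular(Datatype::DOUBLE, {256, 128, 128});

        assert(joined.joinedDimension().has_value());
        assert(*joined.joinedDimension() == 0);
        assert(!regular.joinedDimension().has_value());
    }
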
include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp

Lines changed: 28 additions & 3 deletions

@@ -60,6 +60,8 @@ namespace openPMD
 {
 #if openPMD_HAVE_ADIOS2
 
+std::optional<size_t> joinedDimension(adios2::Dims const &dims);
+
 class ADIOS2IOHandler;
 
 namespace detail
@@ -443,12 +445,35 @@ class ADIOS2IOHandlerImpl
                     std::to_string(actualDim) + ")");
             }
         }
-        for (unsigned int i = 0; i < actualDim; i++)
+        auto joinedDim = joinedDimension(shape);
+        if (joinedDim.has_value())
         {
-            if (offset[i] + extent[i] > shape[i])
+            if (!offset.empty())
             {
                 throw std::runtime_error(
-                    "[ADIOS2] Dataset access out of bounds.");
+                    "[ADIOS2] Offset must be an empty vector in case of joined "
+                    "array.");
+            }
+            for (unsigned int i = 0; i < actualDim; i++)
+            {
+                if (*joinedDim != i && extent[i] != shape[i])
+                {
+                    throw std::runtime_error(
+                        "[ADIOS2] store_chunk extent of non-joined dimensions "
+                        "must be equivalent to the total extent.");
+                }
+            }
+        }
+        else
+        {
+            for (unsigned int i = 0; i < actualDim; i++)
+            {
+                if (!(joinedDim.has_value() && *joinedDim == i) &&
+                    offset[i] + extent[i] > shape[i])
+                {
+                    throw std::runtime_error(
+                        "[ADIOS2] Dataset access out of bounds.");
+                }
             }
         }

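To illustrate the new checks, a hedged sketch of which store_chunk calls the verification above accepts or rejects for a joined dataset; the record component, helper name, and data are placeholders, not part of this commit.

    #include <openPMD/openPMD.hpp>
    #include <vector>

    using namespace openPMD;

    // Assumes rc was reset with Dataset(Datatype::DOUBLE, {Dataset::JOINED_DIMENSION, 300})
    // and rows holds a multiple of 300 doubles.
    void illustrateJoinedChecks(RecordComponent &rc, std::vector<double> &rows)
    {
        Extent local = {rows.size() / 300, 300};

        // Accepted: empty offset, non-joined extent equal to the total extent (300).
        rc.storeChunk(rows, {}, local);

        // Rejected: the offset must be an empty vector for a joined array.
        // rc.storeChunk(rows, {0, 0}, local);

        // Rejected: the extent of non-joined dimensions must equal the total extent.
        // rc.storeChunk(rows, {}, {local[0], 150});
    }
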
include/openPMD/IO/IOTask.hpp

Lines changed: 1 addition & 0 deletions

@@ -326,6 +326,7 @@ struct OPENPMDAPI_EXPORT Parameter<Operation::CREATE_DATASET>
     Extent extent = {};
     Datatype dtype = Datatype::UNDEFINED;
     std::string options = "{}";
+    std::optional<size_t> joinedDimension;
 
     /** Warn about unused JSON paramters
      *

include/openPMD/RecordComponent.hpp

Lines changed: 5 additions & 0 deletions

@@ -537,6 +537,11 @@ OPENPMD_protected
     }
 
     void readBase(bool require_unit_si);
+
+    template <typename T>
+    void verifyChunk(Offset const &, Extent const &) const;
+
+    void verifyChunk(Datatype, Offset const &, Extent const &) const;
 }; // RecordComponent
 
 } // namespace openPMD

include/openPMD/RecordComponent.tpp

Lines changed: 18 additions & 34 deletions

@@ -259,8 +259,17 @@ RecordComponent::storeChunk(T_ContiguousContainer &data, Offset o, Extent e)
     // default arguments
     // offset = {0u}: expand to right dim {0u, 0u, ...}
     Offset offset = o;
-    if (o.size() == 1u && o.at(0) == 0u && dim > 1u)
-        offset = Offset(dim, 0u);
+    if (o.size() == 1u && o.at(0) == 0u)
+    {
+        if (joinedDimension().has_value())
+        {
+            offset.clear();
+        }
+        else if (dim > 1u)
+        {
+            offset = Offset(dim, 0u);
+        }
+    }
 
     // extent = {-1u}: take full size
     Extent extent(dim, 1u);
@@ -278,38 +287,7 @@ template <typename T, typename F>
 inline DynamicMemoryView<T>
 RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
 {
-    if (constant())
-        throw std::runtime_error(
-            "Chunks cannot be written for a constant RecordComponent.");
-    if (empty())
-        throw std::runtime_error(
-            "Chunks cannot be written for an empty RecordComponent.");
-    Datatype dtype = determineDatatype<T>();
-    if (dtype != getDatatype())
-    {
-        std::ostringstream oss;
-        oss << "Datatypes of chunk data (" << dtype
-            << ") and record component (" << getDatatype() << ") do not match.";
-        throw std::runtime_error(oss.str());
-    }
-    uint8_t dim = getDimensionality();
-    if (e.size() != dim || o.size() != dim)
-    {
-        std::ostringstream oss;
-        oss << "Dimensionality of chunk ("
-            << "offset=" << o.size() << "D, "
-            << "extent=" << e.size() << "D) "
-            << "and record component (" << int(dim) << "D) "
-            << "do not match.";
-        throw std::runtime_error(oss.str());
-    }
-    Extent dse = getExtent();
-    for (uint8_t i = 0; i < dim; ++i)
-        if (dse[i] < o[i] + e[i])
-            throw std::runtime_error(
-                "Chunk does not reside inside dataset (Dimension on index " +
-                std::to_string(i) + ". DS: " + std::to_string(dse[i]) +
-                " - Chunk: " + std::to_string(o[i] + e[i]) + ")");
+    verifyChunk<T>(o, e);
 
     /*
      * The openPMD backend might not yet know about this dataset.
@@ -334,6 +312,7 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
     dCreate.name = rc.m_name;
     dCreate.extent = getExtent();
     dCreate.dtype = getDatatype();
+    dCreate.joinedDimension = joinedDimension();
     if (!rc.m_dataset.has_value())
     {
         throw error::WrongAPIUsage(
@@ -407,4 +386,9 @@ auto RecordComponent::visit(Args &&...args)
         getDatatype(), *this, std::forward<Args>(args)...);
 }
 
+template <typename T>
+void RecordComponent::verifyChunk(Offset const &o, Extent const &e) const
+{
+    verifyChunk(determineDatatype<T>(), o, e);
+}
 } // namespace openPMD

include/openPMD/backend/BaseRecordComponent.hpp

Lines changed: 2 additions & 0 deletions

@@ -143,6 +143,8 @@ class BaseRecordComponent : virtual public Attributable
      */
     bool constant() const;
 
+    std::optional<size_t> joinedDimension() const;
+
     /**
     * Get data chunks that are available to be loaded from the backend.
     * Note that this is backend-dependent information and the returned

include/openPMD/backend/PatchRecordComponent.hpp

Lines changed: 33 additions & 0 deletions

@@ -20,6 +20,7 @@
  */
 #pragma once
 
+#include "openPMD/Error.hpp"
 #include "openPMD/RecordComponent.hpp"
 #include "openPMD/auxiliary/ShareRawInternal.hpp"
 #include "openPMD/backend/BaseRecordComponent.hpp"
@@ -85,6 +86,9 @@ class PatchRecordComponent : public RecordComponent
     template <typename T>
     void store(uint64_t idx, T);
 
+    template <typename T>
+    void store(T);
+
     // clang-format off
     OPENPMD_private
     // clang-format on
@@ -180,4 +184,33 @@ inline void PatchRecordComponent::store(uint64_t idx, T data)
     auto &rc = get();
     rc.m_chunks.push(IOTask(this, std::move(dWrite)));
 }
+
+template <typename T>
+inline void PatchRecordComponent::store(T data)
+{
+    Datatype dtype = determineDatatype<T>();
+    if (dtype != getDatatype())
+    {
+        std::ostringstream oss;
+        oss << "Datatypes of patch data (" << dtype << ") and dataset ("
+            << getDatatype() << ") do not match.";
+        throw std::runtime_error(oss.str());
+    }
+
+    if (!joinedDimension().has_value())
+    {
+        throw error::WrongAPIUsage(
+            "[PatchRecordComponent::store] API call without explicit "
+            "specification of index only allowed when a joined dimension is "
+            "specified.");
+    }
+
+    Parameter<Operation::WRITE_DATASET> dWrite;
+    dWrite.offset = {};
+    dWrite.extent = {1};
+    dWrite.dtype = dtype;
+    dWrite.data = std::make_shared<T>(data);
+    auto &rc = get();
+    rc.m_chunks.push(IOTask(this, std::move(dWrite)));
+}
 } // namespace openPMD

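For context on the new index-free store overload, a minimal hedged sketch; the helper and variable names are placeholders, and the patch record component is assumed to have been reset with a joined 1D dataset beforehand.

    #include <openPMD/openPMD.hpp>
    #include <cstdint>

    using namespace openPMD;

    // Assumes prc.resetDataset(Dataset(determineDatatype<uint64_t>(), {Dataset::JOINED_DIMENSION}))
    // was called earlier on this component.
    void storePatchEntry(PatchRecordComponent &prc, std::uint64_t local_value)
    {
        // Old API: every rank must agree on a global index, e.g. prc.store(idx, local_value).
        // With a joined dimension, entries are simply appended in the joining order
        // guaranteed in docs/source/usage/workflow.rst.
        prc.store(local_value);
    }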