Skip to content

Commit 3acf565

Browse files
Python bindings: Release GIL during IO wait operations (openPMD#1381)
* Release GIL: Easy cases * ReadIterations iterator * Iteration::open, Iteration::close * Some fixes * Ensure that first iteration is seen in Python API
1 parent 413b1c5 commit 3acf565

File tree

6 files changed

+151
-32
lines changed

6 files changed

+151
-32
lines changed

include/openPMD/Iteration.hpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,4 +420,24 @@ inline T Iteration::dt() const
420420
{
421421
return this->readFloatingpoint<T>("dt");
422422
}
423+
424+
/**
425+
* @brief Subclass of Iteration that knows its own index withing the containing
426+
* Series.
427+
*/
428+
class IndexedIteration : public Iteration
429+
{
430+
friend class SeriesIterator;
431+
friend class WriteIterations;
432+
433+
public:
434+
using index_t = Iteration::IterationIndex_t;
435+
index_t const iterationIndex;
436+
437+
private:
438+
template <typename Iteration_t>
439+
IndexedIteration(Iteration_t &&it, index_t index)
440+
: Iteration(std::forward<Iteration_t>(it)), iterationIndex(index)
441+
{}
442+
};
423443
} // namespace openPMD

include/openPMD/ReadIterations.hpp

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,26 +31,6 @@
3131

3232
namespace openPMD
3333
{
34-
/**
35-
* @brief Subclass of Iteration that knows its own index withing the containing
36-
* Series.
37-
*/
38-
class IndexedIteration : public Iteration
39-
{
40-
friend class SeriesIterator;
41-
42-
public:
43-
using iterations_t = decltype(internal::SeriesData::iterations);
44-
using index_t = iterations_t::key_type;
45-
index_t const iterationIndex;
46-
47-
private:
48-
template <typename Iteration_t>
49-
IndexedIteration(Iteration_t &&it, index_t index)
50-
: Iteration(std::forward<Iteration_t>(it)), iterationIndex(index)
51-
{}
52-
};
53-
5434
class SeriesIterator
5535
{
5636
using iteration_index_t = IndexedIteration::index_t;

include/openPMD/WriteIterations.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,5 +87,10 @@ class WriteIterations
8787
public:
8888
mapped_type &operator[](key_type const &key);
8989
mapped_type &operator[](key_type &&key);
90+
91+
/**
92+
* Return the iteration that is currently being written to, if it exists.
93+
*/
94+
std::optional<IndexedIteration> currentIteration();
9095
};
9196
} // namespace openPMD

src/WriteIterations.cpp

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,17 @@ WriteIterations::mapped_type &WriteIterations::operator[](key_type &&key)
6969
"[WriteIterations] Trying to access after closing Series.");
7070
}
7171
auto &s = shared->value();
72-
if (s.currentlyOpen.has_value())
72+
auto lastIteration = currentIteration();
73+
if (lastIteration.has_value())
7374
{
74-
auto lastIterationIndex = s.currentlyOpen.value();
75-
auto &lastIteration = s.iterations.at(lastIterationIndex);
76-
if (lastIterationIndex != key && !lastIteration.closed())
75+
auto lastIteration_v = lastIteration.value();
76+
if (lastIteration_v.iterationIndex == key)
7777
{
78-
lastIteration.close();
78+
return s.iterations.at(std::move(key));
79+
}
80+
else
81+
{
82+
lastIteration_v.close(); // continue below
7983
}
8084
}
8185
s.currentlyOpen = key;
@@ -87,4 +91,24 @@ WriteIterations::mapped_type &WriteIterations::operator[](key_type &&key)
8791
}
8892
return res;
8993
}
94+
95+
std::optional<IndexedIteration> WriteIterations::currentIteration()
96+
{
97+
if (!shared || !shared->has_value())
98+
{
99+
return std::nullopt;
100+
}
101+
auto &s = shared->value();
102+
if (!s.currentlyOpen.has_value())
103+
{
104+
return std::nullopt;
105+
}
106+
Iteration &currentIteration = s.iterations.at(s.currentlyOpen.value());
107+
if (currentIteration.closed())
108+
{
109+
return std::nullopt;
110+
}
111+
return std::make_optional<IndexedIteration>(
112+
IndexedIteration(currentIteration, s.currentlyOpen.value()));
113+
}
90114
} // namespace openPMD

src/binding/python/Iteration.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,20 @@ void init_Iteration(py::module &m)
6363
"dt", &Iteration::dt<long double>, &Iteration::setDt<double>)
6464
.def_property(
6565
"time_unit_SI", &Iteration::timeUnitSI, &Iteration::setTimeUnitSI)
66-
.def("open", &Iteration::open)
67-
.def("close", &Iteration::close, py::arg("flush") = true)
66+
.def(
67+
"open",
68+
[](Iteration &it) {
69+
py::gil_scoped_release release;
70+
return it.open();
71+
})
72+
.def(
73+
"close",
74+
/*
75+
* Cannot release the GIL here since Python buffers might be
76+
* accessed in deferred tasks
77+
*/
78+
&Iteration::close,
79+
py::arg("flush") = true)
6880

6981
// TODO remove in future versions (deprecated)
7082
.def("set_time", &Iteration::setTime<double>)

src/binding/python/Series.cpp

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,32 +53,103 @@ struct openPMD_PyMPICommObject
5353
using openPMD_PyMPIIntracommObject = openPMD_PyMPICommObject;
5454
#endif
5555

56+
struct SeriesIteratorPythonAdaptor : SeriesIterator
57+
{
58+
SeriesIteratorPythonAdaptor(SeriesIterator it)
59+
: SeriesIterator(std::move(it))
60+
{}
61+
62+
/*
63+
* Python iterators are weird and call `__next__()` already for getting the
64+
* first element.
65+
* In that case, no `operator++()` must be called...
66+
*/
67+
bool first_iteration = true;
68+
};
69+
5670
void init_Series(py::module &m)
5771
{
5872
py::class_<WriteIterations>(m, "WriteIterations")
5973
.def(
6074
"__getitem__",
6175
[](WriteIterations writeIterations, Series::IterationIndex_t key) {
76+
auto lastIteration = writeIterations.currentIteration();
77+
if (lastIteration.has_value() &&
78+
lastIteration.value().iterationIndex != key)
79+
{
80+
// this must happen under the GIL
81+
lastIteration.value().close();
82+
}
83+
py::gil_scoped_release release;
6284
return writeIterations[key];
6385
},
6486
// copy + keepalive
65-
py::return_value_policy::copy);
87+
py::return_value_policy::copy)
88+
.def(
89+
"current_iteration",
90+
&WriteIterations::currentIteration,
91+
"Return the iteration that is currently being written to, if it "
92+
"exists.");
6693
py::class_<IndexedIteration, Iteration>(m, "IndexedIteration")
6794
.def_readonly("iteration_index", &IndexedIteration::iterationIndex);
95+
96+
py::class_<SeriesIteratorPythonAdaptor>(m, "SeriesIterator")
97+
.def(
98+
"__next__",
99+
[](SeriesIteratorPythonAdaptor &iterator) {
100+
if (iterator == SeriesIterator::end())
101+
{
102+
throw py::stop_iteration();
103+
}
104+
/*
105+
* Closing the iteration must happen under the GIL lock since
106+
* Python buffers might be accessed
107+
*/
108+
if (!iterator.first_iteration)
109+
{
110+
if (!(*iterator).closed())
111+
{
112+
(*iterator).close();
113+
}
114+
py::gil_scoped_release release;
115+
++iterator;
116+
}
117+
iterator.first_iteration = false;
118+
if (iterator == SeriesIterator::end())
119+
{
120+
throw py::stop_iteration();
121+
}
122+
else
123+
{
124+
return *iterator;
125+
}
126+
}
127+
128+
);
129+
68130
py::class_<ReadIterations>(m, "ReadIterations")
69131
.def(
70132
"__iter__",
71133
[](ReadIterations &readIterations) {
72-
return py::make_iterator(
73-
readIterations.begin(), readIterations.end());
134+
// Simple iterator implementation:
135+
// But we need to release the GIL inside
136+
// SeriesIterator::operator++, so manually it is
137+
// return py::make_iterator(
138+
// readIterations.begin(), readIterations.end());
139+
return SeriesIteratorPythonAdaptor(readIterations.begin());
74140
},
75141
// keep handle alive while iterator exists
76142
py::keep_alive<0, 1>());
77143

78144
py::class_<Series, Attributable>(m, "Series")
79145

80146
.def(
81-
py::init<std::string const &, Access, std::string const &>(),
147+
py::init([](std::string const &filepath,
148+
Access at,
149+
std::string const &options) {
150+
py::gil_scoped_release release;
151+
return new Series(filepath, at, options);
152+
}),
82153
py::arg("filepath"),
83154
py::arg("access"),
84155
py::arg("options") = "{}")
@@ -145,6 +216,7 @@ void init_Series(py::module &m)
145216
"(Mismatched MPI at compile vs. runtime?)");
146217
}
147218

219+
py::gil_scoped_release release;
148220
return new Series(filepath, at, *mpiCommPtr, options);
149221
}),
150222
py::arg("filepath"),
@@ -232,7 +304,13 @@ this method.
232304
py::return_value_policy::reference,
233305
// garbage collection: return value must be freed before Series
234306
py::keep_alive<1, 0>())
235-
.def("read_iterations", &Series::readIterations, py::keep_alive<0, 1>())
307+
.def(
308+
"read_iterations",
309+
[](Series &s) {
310+
py::gil_scoped_release release;
311+
return s.readIterations();
312+
},
313+
py::keep_alive<0, 1>())
236314
.def(
237315
"write_iterations",
238316
&Series::writeIterations,

0 commit comments

Comments
 (0)