Skip to content

Commit 6772122

Browse files
authored
[Perfomance] Significantly reduce amount of copies during source operators (#564)
* Test for create * Test defer * Significantly fix iterate/just/from * Typo * Remove unused
1 parent 6b5065a commit 6772122

File tree

6 files changed

+334
-71
lines changed

6 files changed

+334
-71
lines changed

Rx/v2/src/rxcpp/rx-observable.hpp

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1676,16 +1676,16 @@ class observable<void, void>
16761676
/*! @copydoc rx-iterate.hpp
16771677
*/
16781678
template<class Collection>
1679-
static auto iterate(Collection c)
1680-
-> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
1681-
return rxs::iterate(std::move(c), identity_current_thread());
1679+
static auto iterate(Collection&& c)
1680+
-> decltype(rxs::iterate(std::forward<Collection>(c), identity_current_thread())) {
1681+
return rxs::iterate(std::forward<Collection>(c), identity_current_thread());
16821682
}
16831683
/*! @copydoc rx-iterate.hpp
16841684
*/
16851685
template<class Collection, class Coordination>
1686-
static auto iterate(Collection c, Coordination cn)
1687-
-> decltype(rxs::iterate(std::move(c), std::move(cn))) {
1688-
return rxs::iterate(std::move(c), std::move(cn));
1686+
static auto iterate(Collection&& c, Coordination cn)
1687+
-> decltype(rxs::iterate(std::forward<Collection>(c), std::move(cn))) {
1688+
return rxs::iterate(std::forward<Collection>(c), std::move(cn));
16891689
}
16901690

16911691
/*! @copydoc rxcpp::sources::from()
@@ -1706,41 +1706,41 @@ class observable<void, void>
17061706
/*! @copydoc rxcpp::sources::from(Value0 v0, ValueN... vn)
17071707
*/
17081708
template<class Value0, class... ValueN>
1709-
static auto from(Value0 v0, ValueN... vn)
1710-
-> typename std::enable_if<!is_coordination<Value0>::value,
1711-
decltype( rxs::from(v0, vn...))>::type {
1712-
return rxs::from(v0, vn...);
1709+
static auto from(Value0&& v0, ValueN&&... vn)
1710+
-> typename std::enable_if<!is_coordination<rxu::decay_t<Value0>>::value,
1711+
decltype( rxs::from(std::forward<Value0>(v0), std::forward<ValueN>(vn)...))>::type {
1712+
return rxs::from(std::forward<Value0>(v0), std::forward<ValueN>(vn)...);
17131713
}
17141714
/*! @copydoc rxcpp::sources::from(Coordination cn, Value0 v0, ValueN... vn)
17151715
*/
17161716
template<class Coordination, class Value0, class... ValueN>
1717-
static auto from(Coordination cn, Value0 v0, ValueN... vn)
1717+
static auto from(Coordination cn, Value0&& v0, ValueN&&... vn)
17181718
-> typename std::enable_if<is_coordination<Coordination>::value,
1719-
decltype( rxs::from(std::move(cn), v0, vn...))>::type {
1720-
return rxs::from(std::move(cn), v0, vn...);
1719+
decltype( rxs::from(std::move(cn), std::forward<Value0>(v0), std::forward<ValueN>(vn)...))>::type {
1720+
return rxs::from(std::move(cn), std::forward<Value0>(v0), std::forward<ValueN>(vn)...);
17211721
}
17221722

17231723
/*! @copydoc rxcpp::sources::just(Value0 v0)
17241724
*/
17251725
template<class T>
1726-
static auto just(T v)
1727-
-> decltype(rxs::just(std::move(v))) {
1728-
return rxs::just(std::move(v));
1726+
static auto just(T&& v)
1727+
-> decltype(rxs::just(std::forward<T>(v))) {
1728+
return rxs::just(std::forward<T>(v));
17291729
}
17301730
/*! @copydoc rxcpp::sources::just(Value0 v0, Coordination cn)
17311731
*/
17321732
template<class T, class Coordination>
1733-
static auto just(T v, Coordination cn)
1734-
-> decltype(rxs::just(std::move(v), std::move(cn))) {
1735-
return rxs::just(std::move(v), std::move(cn));
1733+
static auto just(T&& v, Coordination cn)
1734+
-> decltype(rxs::just(std::forward<T>(v), std::move(cn))) {
1735+
return rxs::just(std::forward<T>(v), std::move(cn));
17361736
}
17371737

17381738
/*! @copydoc rxcpp::sources::start_with(Observable o, Value0 v0, ValueN... vn)
17391739
*/
17401740
template<class Observable, class Value0, class... ValueN>
1741-
static auto start_with(Observable o, Value0 v0, ValueN... vn)
1742-
-> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
1743-
return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
1741+
static auto start_with(Observable o, Value0&& v0, ValueN&&... vn)
1742+
-> decltype(rxs::start_with(std::move(o), std::forward<Value0>(v0), std::forward<ValueN>(vn)...)) {
1743+
return rxs::start_with(std::move(o), std::forward<Value0>(v0), std::forward<ValueN>(vn)...);
17441744
}
17451745

17461746
/*! @copydoc rx-empty.hpp

Rx/v2/src/rxcpp/sources/rx-iterate.hpp

Lines changed: 57 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -52,39 +52,46 @@ struct is_iterable
5252
template<class Collection>
5353
struct iterate_traits
5454
{
55-
typedef rxu::decay_t<Collection> collection_type;
56-
typedef rxu::decay_t<decltype(std::begin(*(collection_type*)nullptr))> iterator_type;
57-
typedef rxu::value_type_t<std::iterator_traits<iterator_type>> value_type;
55+
// add const due to we don't want to modify original values!
56+
using collection_type = std::add_const_t<rxu::decay_t<Collection>>;
57+
using iterator_type = rxu::decay_t<decltype(std::begin(*(collection_type*)nullptr))>;
58+
using value_type = rxu::value_type_t<std::iterator_traits<iterator_type>>;
5859
};
5960

6061
template<class Collection, class Coordination>
6162
struct iterate : public source_base<rxu::value_type_t<iterate_traits<Collection>>>
6263
{
63-
typedef iterate<Collection, Coordination> this_type;
64-
typedef iterate_traits<Collection> traits;
64+
using this_type = iterate<Collection, Coordination>;
65+
using traits = iterate_traits<Collection>;
6566

66-
typedef rxu::decay_t<Coordination> coordination_type;
67-
typedef typename coordination_type::coordinator_type coordinator_type;
67+
using coordination_type = rxu::decay_t<Coordination>;
68+
using coordinator_type = typename coordination_type::coordinator_type;
6869

69-
typedef typename traits::collection_type collection_type;
70-
typedef typename traits::iterator_type iterator_type;
70+
using collection_type = typename traits::collection_type;
71+
using collection_type_ptr = std::shared_ptr<collection_type>;
72+
using iterator_type = typename traits::iterator_type;
7173

7274
struct iterate_initial_type
7375
{
74-
iterate_initial_type(collection_type c, coordination_type cn)
75-
: collection(std::move(c))
76-
, coordination(std::move(cn))
77-
{
78-
}
79-
collection_type collection;
80-
coordination_type coordination;
76+
iterate_initial_type(Collection&& c, coordination_type cn)
77+
: collection_ptr(std::make_shared<collection_type>(std::move(c)))
78+
, coordination(std::move(cn)) { }
79+
80+
iterate_initial_type(const Collection& c, coordination_type cn)
81+
: collection_ptr(std::make_shared<collection_type>(c))
82+
, coordination(std::move(cn)) { }
83+
84+
collection_type_ptr collection_ptr;
85+
coordination_type coordination;
8186
};
8287
iterate_initial_type initial;
8388

84-
iterate(collection_type c, coordination_type cn)
85-
: initial(std::move(c), std::move(cn))
86-
{
87-
}
89+
iterate(Collection&& c, coordination_type cn)
90+
: initial(std::move(c), std::move(cn)) { }
91+
92+
iterate(const Collection& c, coordination_type cn)
93+
: initial(c, std::move(cn)) { }
94+
8895
template<class Subscriber>
8996
void on_subscribe(Subscriber o) const {
9097
static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber");
@@ -96,18 +103,19 @@ struct iterate : public source_base<rxu::value_type_t<iterate_traits<Collection>
96103
{
97104
iterate_state_type(const iterate_initial_type& i, output_type o)
98105
: iterate_initial_type(i)
99-
, cursor(std::begin(iterate_initial_type::collection))
100-
, end(std::end(iterate_initial_type::collection))
106+
, cursor(std::begin(*iterate_initial_type::collection_ptr))
107+
, end(std::end(*iterate_initial_type::collection_ptr))
101108
, out(std::move(o))
102109
{
103110
}
104111
iterate_state_type(const iterate_state_type& o)
105112
: iterate_initial_type(o)
106-
, cursor(std::begin(iterate_initial_type::collection))
107-
, end(std::end(iterate_initial_type::collection))
113+
, cursor(std::begin(*iterate_initial_type::collection_ptr))
114+
, end(std::end(*iterate_initial_type::collection_ptr))
108115
, out(std::move(o.out)) // since lambda capture does not yet support move
109116
{
110117
}
118+
111119
mutable iterator_type cursor;
112120
iterator_type end;
113121
mutable output_type out;
@@ -157,18 +165,18 @@ struct iterate : public source_base<rxu::value_type_t<iterate_traits<Collection>
157165
/*! @copydoc rx-iterate.hpp
158166
*/
159167
template<class Collection>
160-
auto iterate(Collection c)
161-
-> observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>> {
162-
return observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, identity_one_worker>>(
163-
detail::iterate<Collection, identity_one_worker>(std::move(c), identity_immediate()));
168+
auto iterate(Collection&& c)
169+
-> observable<rxu::value_type_t<detail::iterate_traits<rxu::decay_t<Collection>>>, detail::iterate<rxu::decay_t<Collection>, identity_one_worker>> {
170+
return observable<rxu::value_type_t<detail::iterate_traits<rxu::decay_t<Collection>>>, detail::iterate<rxu::decay_t<Collection>, identity_one_worker>>(
171+
detail::iterate<rxu::decay_t<Collection>, identity_one_worker>(std::forward<Collection>(c), identity_immediate()));
164172
}
165173
/*! @copydoc rx-iterate.hpp
166174
*/
167175
template<class Collection, class Coordination>
168-
auto iterate(Collection c, Coordination cn)
169-
-> observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>> {
170-
return observable<rxu::value_type_t<detail::iterate_traits<Collection>>, detail::iterate<Collection, Coordination>>(
171-
detail::iterate<Collection, Coordination>(std::move(c), std::move(cn)));
176+
auto iterate(Collection&& c, Coordination cn)
177+
-> observable<rxu::value_type_t<detail::iterate_traits<rxu::decay_t<Collection>>>, detail::iterate<rxu::decay_t<Collection>, Coordination>> {
178+
return observable<rxu::value_type_t<detail::iterate_traits<rxu::decay_t<Collection>>>, detail::iterate<rxu::decay_t<Collection>, Coordination>>(
179+
detail::iterate<rxu::decay_t<Collection>, Coordination>(std::forward<Collection>(c), std::move(cn)));
172180
}
173181

174182
/*! Returns an observable that sends an empty set of values and then completes.
@@ -218,10 +226,10 @@ auto from(Coordination cn)
218226
\note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
219227
*/
220228
template<class Value0, class... ValueN>
221-
auto from(Value0 v0, ValueN... vn)
222-
-> typename std::enable_if<!is_coordination<Value0>::value,
223-
decltype(iterate(*(std::array<Value0, sizeof...(ValueN) + 1>*)nullptr, identity_immediate()))>::type {
224-
std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}};
229+
auto from(Value0&& v0, ValueN&&... vn)
230+
-> typename std::enable_if<!is_coordination<rxu::decay_t<Value0>>::value,
231+
decltype(iterate(*(std::array<rxu::decay_t<Value0>, sizeof...(ValueN) + 1>*)nullptr, identity_immediate()))>::type {
232+
std::array<rxu::decay_t<Value0>, sizeof...(ValueN) + 1> c{{std::forward<Value0>(v0), std::forward<ValueN>(vn)...}};
225233
return iterate(std::move(c), identity_immediate());
226234
}
227235
/*! Returns an observable that sends each value from its arguments list, on the specified scheduler.
@@ -243,10 +251,10 @@ auto from(Value0 v0, ValueN... vn)
243251
\note This operator is useful to send separated values. If they are stored as a collection, use observable<void,void>::iterate instead.
244252
*/
245253
template<class Coordination, class Value0, class... ValueN>
246-
auto from(Coordination cn, Value0 v0, ValueN... vn)
254+
auto from(Coordination cn, Value0&& v0, ValueN&&... vn)
247255
-> typename std::enable_if<is_coordination<Coordination>::value,
248-
decltype(iterate(*(std::array<Value0, sizeof...(ValueN) + 1>*)nullptr, std::move(cn)))>::type {
249-
std::array<Value0, sizeof...(ValueN) + 1> c{{v0, vn...}};
256+
decltype(iterate(*(std::array<rxu::decay_t<Value0>, sizeof...(ValueN) + 1>*)nullptr, std::move(cn)))>::type {
257+
std::array<rxu::decay_t<Value0>, sizeof...(ValueN) + 1> c{{std::forward<Value0>(v0), std::forward<ValueN>(vn)...}};
250258
return iterate(std::move(c), std::move(cn));
251259
}
252260

@@ -264,10 +272,10 @@ auto from(Coordination cn, Value0 v0, ValueN... vn)
264272
\snippet output.txt just sample
265273
*/
266274
template<class Value0>
267-
auto just(Value0 v0)
268-
-> typename std::enable_if<!is_coordination<Value0>::value,
269-
decltype(iterate(*(std::array<Value0, 1>*)nullptr, identity_immediate()))>::type {
270-
std::array<Value0, 1> c{{v0}};
275+
auto just(Value0&& v0)
276+
-> typename std::enable_if<!is_coordination<rxu::decay_t<Value0>>::value,
277+
decltype(iterate(*(std::array<rxu::decay_t<Value0>, 1>*)nullptr, identity_immediate()))>::type {
278+
std::array<rxu::decay_t<Value0>, 1> c{{std::forward<Value0>(v0)}};
271279
return iterate(std::move(c), identity_immediate());
272280
}
273281
/*! Returns an observable that sends the specified item to observer and then completes, on the specified scheduler.
@@ -285,10 +293,10 @@ auto just(Value0 v0)
285293
\snippet output.txt threaded just sample
286294
*/
287295
template<class Value0, class Coordination>
288-
auto just(Value0 v0, Coordination cn)
296+
auto just(Value0&& v0, Coordination cn)
289297
-> typename std::enable_if<is_coordination<Coordination>::value,
290-
decltype(iterate(*(std::array<Value0, 1>*)nullptr, std::move(cn)))>::type {
291-
std::array<Value0, 1> c{{v0}};
298+
decltype(iterate(*(std::array<rxu::decay_t<Value0>, 1>*)nullptr, std::move(cn)))>::type {
299+
std::array<rxu::decay_t<Value0>, 1> c{{std::forward<Value0>(v0)}};
292300
return iterate(std::move(c), std::move(cn));
293301
}
294302

@@ -313,9 +321,9 @@ auto just(Value0 v0, Coordination cn)
313321
\snippet output.txt short start_with sample
314322
*/
315323
template<class Observable, class Value0, class... ValueN>
316-
auto start_with(Observable o, Value0 v0, ValueN... vn)
317-
-> decltype(from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o)) {
318-
return from(rxu::value_type_t<Observable>(v0), rxu::value_type_t<Observable>(vn)...).concat(o);
324+
auto start_with(Observable o, Value0&& v0, ValueN&&... vn)
325+
-> decltype(from(rxu::value_type_t<Observable>(std::forward<Value0>(v0)), rxu::value_type_t<Observable>(std::forward<ValueN>(vn))...).concat(o)) {
326+
return from(rxu::value_type_t<Observable>(std::forward<Value0>(v0)), rxu::value_type_t<Observable>(std::forward<ValueN>(vn))...).concat(o);
319327
}
320328

321329
}

Rx/v2/test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ set(TEST_SOURCES
2727
${TEST_DIR}/sources/defer.cpp
2828
${TEST_DIR}/sources/empty.cpp
2929
${TEST_DIR}/sources/interval.cpp
30+
${TEST_DIR}/sources/iterate.cpp
3031
${TEST_DIR}/sources/scope.cpp
3132
${TEST_DIR}/sources/timer.cpp
3233
${TEST_DIR}/operators/all.cpp

Rx/v2/test/sources/create.cpp

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,56 @@ SCENARIO("when observer::on_next is overridden", "[create][observer][sources]"){
8282
}
8383
}
8484
}
85+
86+
87+
SCENARIO("create doesn't provide copies", "[create][sources][copies]")
88+
{
89+
GIVEN("observable and subscriber")
90+
{
91+
auto empty_on_next = [](copy_verifier) {};
92+
auto sub = rx::make_observer<copy_verifier>(empty_on_next);
93+
copy_verifier verifier{};
94+
auto obs = rxcpp::observable<>::create<copy_verifier>([&verifier](rxcpp::subscriber<copy_verifier> sub)
95+
{
96+
sub.on_next(verifier);
97+
sub.on_completed();
98+
});
99+
100+
WHEN("subscribe")
101+
{
102+
obs.subscribe(sub);
103+
THEN("no extra copies")
104+
{
105+
// 1 copy to final lambda
106+
REQUIRE(verifier.get_copy_count() == 1);
107+
REQUIRE(verifier.get_move_count() == 0);
108+
}
109+
}
110+
}
111+
}
112+
113+
114+
SCENARIO("create doesn't provide copies for move", "[create][sources][copies]")
115+
{
116+
GIVEN("observable and subscriber")
117+
{
118+
auto empty_on_next = [](copy_verifier) {};
119+
auto sub = rx::make_observer<copy_verifier>(empty_on_next);
120+
copy_verifier verifier{};
121+
auto obs = rxcpp::observable<>::create<copy_verifier>([&verifier](rxcpp::subscriber<copy_verifier> sub)
122+
{
123+
sub.on_next(std::move(verifier));
124+
sub.on_completed();
125+
});
126+
WHEN("subscribe")
127+
{
128+
obs.subscribe(sub);
129+
THEN("no extra copies")
130+
{
131+
REQUIRE(verifier.get_copy_count() == 0);
132+
// 1 move to final lambda
133+
REQUIRE(verifier.get_move_count() == 1);
134+
}
135+
}
136+
}
137+
}

Rx/v2/test/sources/defer.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,61 @@ SCENARIO("defer stops on completion", "[defer][sources]"){
5757
}
5858
}
5959
}
60+
61+
SCENARIO("defer doesn't provide copies", "[defer][sources][copies]")
62+
{
63+
GIVEN("observable and subscriber")
64+
{
65+
auto empty_on_next = [](copy_verifier) {};
66+
auto sub = rx::make_observer<copy_verifier>(empty_on_next);
67+
copy_verifier verifier{};
68+
auto obs = rxcpp::observable<>::defer([&verifier]()
69+
{
70+
return rxcpp::observable<>::create<copy_verifier>([&verifier](rxcpp::subscriber<copy_verifier> sub)
71+
{
72+
sub.on_next(verifier);
73+
sub.on_completed();
74+
});;
75+
});
76+
77+
WHEN("subscribe")
78+
{
79+
obs.subscribe(sub);
80+
THEN("no extra copies")
81+
{
82+
// 1 copy to final lambda
83+
REQUIRE(verifier.get_copy_count() == 1);
84+
REQUIRE(verifier.get_move_count() == 0);
85+
}
86+
}
87+
}
88+
}
89+
90+
91+
SCENARIO("defer doesn't provide copies for move", "[defer][sources][copies]")
92+
{
93+
GIVEN("observable and subscriber")
94+
{
95+
auto empty_on_next = [](copy_verifier) {};
96+
auto sub = rx::make_observer<copy_verifier>(empty_on_next);
97+
copy_verifier verifier{};
98+
auto obs = rxcpp::observable<>::defer([&verifier]()
99+
{
100+
return rxcpp::observable<>::create<copy_verifier>([&verifier](rxcpp::subscriber<copy_verifier> sub)
101+
{
102+
sub.on_next(std::move(verifier));
103+
sub.on_completed();
104+
});;
105+
});
106+
WHEN("subscribe")
107+
{
108+
obs.subscribe(sub);
109+
THEN("no extra copies")
110+
{
111+
REQUIRE(verifier.get_copy_count() == 0);
112+
// 1 move to final lambda
113+
REQUIRE(verifier.get_move_count() == 1);
114+
}
115+
}
116+
}
117+
}

0 commit comments

Comments
 (0)