Skip to content
This repository was archived by the owner on Mar 25, 2025. It is now read-only.

Commit ddd6b56

Browse files
committed
This commit contains a bunch of minor changes.
First it allows a ClientProfile to be updated with new reservation, weight, and limit values by adding an update function. Second it adds an ability to invoke callbacks when a ClientInfo object is removed due to the idle timeout. Testing this functionality has been added to the unit tests. Third we add the ability to clear a heap to help with a more controlled tear-down. Finally, dmclock-servers "cleaning" has been renamed "idle erase" to better indicate the role. Types and variable names have been adjusted accordingly. Signed-off-by: J. Eric Ivancich <ivancich@redhat.com>
1 parent effea49 commit ddd6b56

File tree

3 files changed

+72
-37
lines changed

3 files changed

+72
-37
lines changed

src/dmclock_server.h

Lines changed: 55 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,18 @@ namespace crimson {
8484
double limit_inv;
8585

8686
// order parameters -- min, "normal", max
87-
ClientInfo(double _reservation, double _weight, double _limit) :
88-
reservation(_reservation),
89-
weight(_weight),
90-
limit(_limit),
91-
reservation_inv(0.0 == reservation ? 0.0 : 1.0 / reservation),
92-
weight_inv( 0.0 == weight ? 0.0 : 1.0 / weight),
93-
limit_inv( 0.0 == limit ? 0.0 : 1.0 / limit)
94-
{
95-
// empty
87+
ClientInfo(double _reservation, double _weight, double _limit) {
88+
update(_reservation, _weight, _limit);
9689
}
9790

91+
inline void update(double _reservation, double _weight, double _limit) {
92+
reservation = _reservation;
93+
weight = _weight;
94+
limit = _limit;
95+
reservation_inv = (0.0 == reservation) ? 0.0 : 1.0 / reservation;
96+
weight_inv = (0.0 == weight) ? 0.0 : 1.0 / weight;
97+
limit_inv = (0.0 == limit) ? 0.0 : 1.0 / limit;
98+
}
9899

99100
friend std::ostream& operator<<(std::ostream& out,
100101
const ClientInfo& client) {
@@ -492,6 +493,12 @@ namespace crimson {
492493
// a function that can be called to look up client information
493494
using ClientInfoFunc = std::function<const ClientInfo*(const C&)>;
494495

496+
// a function that can be called when dmclock decides to idle a
497+
// client and remove its information; if the owner's
498+
// ClientInfoFunc is maintaining information about the client,
499+
// that information can be removed
500+
using IdleEraseListener = std::function<void(const C&)>;
501+
495502

496503
bool empty() const {
497504
DataGuard g(data_mtx);
@@ -708,6 +715,7 @@ namespace crimson {
708715
};
709716

710717
ClientInfoFunc client_info_f;
718+
IdleEraseListener idle_erase_f;
711719
static constexpr bool is_dynamic_cli_info_f = U1;
712720

713721
mutable std::mutex data_mtx;
@@ -766,12 +774,11 @@ namespace crimson {
766774
Duration idle_age;
767775
Duration erase_age;
768776
Duration check_time;
769-
std::deque<MarkPoint> clean_mark_points;
777+
std::deque<MarkPoint> idle_erase_mark_points;
770778

771779
// NB: All threads declared at end, so they're destructed first!
772780

773-
std::unique_ptr<RunEvery> cleaning_job;
774-
781+
std::unique_ptr<RunEvery> idle_erase_job;
775782

776783
// COMMON constructor that others feed into; we can accept three
777784
// different variations of durations
@@ -781,8 +788,10 @@ namespace crimson {
781788
std::chrono::duration<Rep,Per> _erase_age,
782789
std::chrono::duration<Rep,Per> _check_time,
783790
bool _allow_limit_break,
784-
double _anticipation_timeout) :
791+
double _anticipation_timeout,
792+
IdleEraseListener _idle_erase_f) :
785793
client_info_f(_client_info_f),
794+
idle_erase_f(_idle_erase_f),
786795
allow_limit_break(_allow_limit_break),
787796
anticipation_timeout(_anticipation_timeout),
788797
finishing(false),
@@ -792,10 +801,10 @@ namespace crimson {
792801
{
793802
assert(_erase_age >= _idle_age);
794803
assert(_check_time < _idle_age);
795-
cleaning_job =
804+
idle_erase_job =
796805
std::unique_ptr<RunEvery>(
797806
new RunEvery(check_time,
798-
std::bind(&PriorityQueueBase::do_clean, this)));
807+
std::bind(&PriorityQueueBase::do_idle_erase, this)));
799808
}
800809

801810

@@ -1016,8 +1025,8 @@ namespace crimson {
10161025
void reduce_reservation_tags(const C& client_id) {
10171026
auto client_it = client_map.find(client_id);
10181027

1019-
// means the client was cleaned from map; should never happen
1020-
// as long as cleaning times are long enough
1028+
// means the client was idle-erased from map; should never
1029+
// happen as long as idle erase times are long enough
10211030
assert(client_map.end() != client_it);
10221031
reduce_reservation_tags(*client_it->second);
10231032
}
@@ -1111,27 +1120,27 @@ namespace crimson {
11111120
* This is being called regularly by RunEvery. Every time it's
11121121
* called it notes the time and delta counter (mark point) in a
11131122
* deque. It also looks at the deque to find the most recent
1114-
* mark point that is older than clean_age. It then walks the
1115-
* map and delete all server entries that were last used before
1116-
* that mark point.
1123+
* mark point that is older than idle-erase age. It then walks
1124+
* the map and delete all server entries that were last used
1125+
* before that mark point.
11171126
*/
1118-
void do_clean() {
1127+
void do_idle_erase() {
11191128
TimePoint now = std::chrono::steady_clock::now();
11201129
DataGuard g(data_mtx);
1121-
clean_mark_points.emplace_back(MarkPoint(now, tick));
1130+
idle_erase_mark_points.emplace_back(MarkPoint(now, tick));
11221131

11231132
// first erase the super-old client records
11241133

11251134
Counter erase_point = 0;
1126-
auto point = clean_mark_points.front();
1135+
auto point = idle_erase_mark_points.front();
11271136
while (point.first <= now - erase_age) {
11281137
erase_point = point.second;
1129-
clean_mark_points.pop_front();
1130-
point = clean_mark_points.front();
1138+
idle_erase_mark_points.pop_front();
1139+
point = idle_erase_mark_points.front();
11311140
}
11321141

11331142
Counter idle_point = 0;
1134-
for (auto i : clean_mark_points) {
1143+
for (auto i : idle_erase_mark_points) {
11351144
if (i.first <= now - idle_age) {
11361145
idle_point = i.second;
11371146
} else {
@@ -1143,14 +1152,17 @@ namespace crimson {
11431152
for (auto i = client_map.begin(); i != client_map.end(); /* empty */) {
11441153
auto i2 = i++;
11451154
if (erase_point && i2->second->last_tick <= erase_point) {
1155+
if (nullptr != idle_erase_f) {
1156+
idle_erase_f(i2->second->client);
1157+
}
11461158
delete_from_heaps(i2->second);
11471159
client_map.erase(i2);
11481160
} else if (idle_point && i2->second->last_tick <= idle_point) {
11491161
i2->second->idle = true;
11501162
}
11511163
} // for
11521164
} // if
1153-
} // do_clean
1165+
} // do_idle_erase
11541166

11551167

11561168
// data_mtx must be held by caller
@@ -1215,10 +1227,13 @@ namespace crimson {
12151227
std::chrono::duration<Rep,Per> _erase_age,
12161228
std::chrono::duration<Rep,Per> _check_time,
12171229
bool _allow_limit_break = false,
1218-
double _anticipation_timeout = 0.0) :
1230+
double _anticipation_timeout = 0.0,
1231+
typename super::IdleEraseListener _idle_erase_f = nullptr) :
12191232
super(_client_info_f,
12201233
_idle_age, _erase_age, _check_time,
1221-
_allow_limit_break, _anticipation_timeout)
1234+
_allow_limit_break, _anticipation_timeout,
1235+
_idle_erase_f)
1236+
12221237
{
12231238
// empty
12241239
}
@@ -1227,13 +1242,15 @@ namespace crimson {
12271242
// pull convenience constructor
12281243
PullPriorityQueue(typename super::ClientInfoFunc _client_info_f,
12291244
bool _allow_limit_break = false,
1230-
double _anticipation_timeout = 0.0) :
1245+
double _anticipation_timeout = 0.0,
1246+
typename super::IdleEraseListener _idle_erase_f = nullptr) :
12311247
PullPriorityQueue(_client_info_f,
12321248
std::chrono::minutes(10),
12331249
std::chrono::minutes(15),
12341250
std::chrono::minutes(6),
12351251
_allow_limit_break,
1236-
_anticipation_timeout)
1252+
_anticipation_timeout,
1253+
_idle_erase_f)
12371254
{
12381255
// empty
12391256
}
@@ -1446,10 +1463,11 @@ namespace crimson {
14461463
std::chrono::duration<Rep,Per> _erase_age,
14471464
std::chrono::duration<Rep,Per> _check_time,
14481465
bool _allow_limit_break = false,
1449-
double anticipation_timeout = 0.0) :
1466+
double anticipation_timeout = 0.0,
1467+
typename super::IdleEraseListener _idle_erase_f = nullptr) :
14501468
super(_client_info_f,
14511469
_idle_age, _erase_age, _check_time,
1452-
_allow_limit_break, anticipation_timeout)
1470+
_allow_limit_break, anticipation_timeout, _idle_erase_f)
14531471
{
14541472
can_handle_f = _can_handle_f;
14551473
handle_f = _handle_f;
@@ -1462,15 +1480,17 @@ namespace crimson {
14621480
CanHandleRequestFunc _can_handle_f,
14631481
HandleRequestFunc _handle_f,
14641482
bool _allow_limit_break = false,
1465-
double _anticipation_timeout = 0.0) :
1483+
double _anticipation_timeout = 0.0,
1484+
typename super::IdleEraseListener _idle_erase_f = nullptr) :
14661485
PushPriorityQueue(_client_info_f,
14671486
_can_handle_f,
14681487
_handle_f,
14691488
std::chrono::minutes(10),
14701489
std::chrono::minutes(15),
14711490
std::chrono::minutes(6),
14721491
_allow_limit_break,
1473-
_anticipation_timeout)
1492+
_anticipation_timeout,
1493+
_idle_erase_f)
14741494
{
14751495
// empty
14761496
}
@@ -1482,7 +1502,6 @@ namespace crimson {
14821502
sched_ahead_thd.join();
14831503
}
14841504

1485-
public:
14861505

14871506
inline void add_request(R&& request,
14881507
const C& client_id,

support/src/indirect_intrusive_heap.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ namespace crimson {
227227

228228
bool empty() const { return 0 == count; }
229229

230+
void clear() { data.clear(); count = 0; }
231+
230232
size_t size() const { return (size_t) count; }
231233

232234
T& top() { return *data[0]; }

test/test_dmclock_server.cc

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <iostream>
1919
#include <list>
2020
#include <vector>
21+
#include <atomic>
2122

2223

2324
#include "dmclock_server.h"
@@ -97,6 +98,7 @@ namespace crimson {
9798
using Queue = dmc::PushPriorityQueue<ClientId,Request>;
9899
int client = 17;
99100
double reservation = 100.0;
101+
std::atomic<std::uint32_t> idle_erase_counter(0u);
100102

101103
dmc::ClientInfo ci(reservation, 1.0, 0.0);
102104
auto client_info_f = [&] (ClientId c) -> const dmc::ClientInfo* {
@@ -109,14 +111,19 @@ namespace crimson {
109111
uint64_t req_cost) {
110112
// empty; do nothing
111113
};
114+
auto idle_erase_listener_f = [&idle_erase_counter](const ClientId& c) {
115+
idle_erase_counter++;
116+
};
112117

113118
Queue pq(client_info_f,
114119
server_ready_f,
115120
submit_req_f,
116121
std::chrono::seconds(3),
117122
std::chrono::seconds(5),
118123
std::chrono::seconds(2),
119-
false);
124+
false,
125+
0.0,
126+
idle_erase_listener_f);
120127

121128
auto lock_pq = [&](std::function<void()> code) {
122129
test_locked(pq.data_mtx, code);
@@ -171,12 +178,19 @@ namespace crimson {
171178
"after idle age client map entry shows idle.";
172179
});
173180

181+
EXPECT_EQ(0u, idle_erase_counter) <<
182+
"idle erase counter is still 0 since client has not yet been "
183+
"idle-erased";
184+
174185
std::this_thread::sleep_for(std::chrono::seconds(2));
175186

176187
lock_pq([&] () {
177188
EXPECT_EQ(0u, pq.client_map.size()) <<
178189
"client map loses its entry after erase age";
179190
});
191+
192+
EXPECT_EQ(1u, idle_erase_counter) <<
193+
"idle erase counter is now 1 since client has been idle-erased";
180194
} // TEST
181195

182196

0 commit comments

Comments
 (0)