Skip to content

Commit b302343

Browse files
ktfdavidrohr
authored andcommitted
DPL: change ExpirationHandler::Creator to use services
This allows us to easily have some state attached to the creation of new timeslices and in particular we can then use the DecongestionService to keep track of the next enumeration to be created. This comes handy when we have dummy iterations which we want to "rewind" to avoid messages about empty timeslices.
1 parent c6eb51d commit b302343

4 files changed

Lines changed: 32 additions & 19 deletions

File tree

Framework/Core/include/Framework/ExpirationHandler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ struct TimesliceSlot;
3030
struct InputRecord;
3131

3232
struct ExpirationHandler {
33-
using Creator = std::function<TimesliceSlot(ChannelIndex, TimesliceIndex&)>;
33+
using Creator = std::function<TimesliceSlot(ServiceRegistryRef, ChannelIndex)>;
3434
/// Callback type to check if the record must be expired
3535
using Checker = std::function<bool(ServiceRegistryRef, uint64_t timestamp, InputSpan const& record)>;
3636
/// Callback type to actually materialise a given record

Framework/Core/src/DataRelayer.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<Expira
125125
for (auto& handler : expirationHandlers) {
126126
LOGP(debug, "handler.creator for {}", handler.name);
127127
auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
128-
slotsCreatedByHandlers.push_back(handler.creator(channelIndex, mTimesliceIndex));
128+
slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
129129
}
130130
}
131131
// Count how many slots are not invalid

Framework/Core/src/DecongestionService.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ namespace o2::framework
1818
struct DecongestionService {
1919
/// Wether we are a source in the processing chain
2020
bool isFirstInTopology = true;
21+
/// The last timeslice which the ExpirationHandler::Creator callback
22+
/// created. This can be used to skip dummy iterations.
23+
size_t nextEnumerationTimeslice = 0;
2124
/// Last timeslice we communicated. Notice this should never go backwards.
2225
int64_t lastTimeslice = 0;
2326
/// The next timeslice we should consume, when running in order,

Framework/Core/src/LifetimeHelpers.cxx

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
// granted to it by virtue of its status as an Intergovernmental Organization
1010
// or submit itself to any jurisdiction.
1111

12+
#include "DecongestionService.h"
1213
#include "Framework/DataProcessingHeader.h"
1314
#include "Framework/InputSpec.h"
1415
#include "Framework/LifetimeHelpers.h"
@@ -58,36 +59,42 @@ size_t getCurrentTime()
5859

5960
ExpirationHandler::Creator LifetimeHelpers::dataDrivenCreation()
6061
{
61-
return [](ChannelIndex, TimesliceIndex&) -> TimesliceSlot {
62+
return [](ServiceRegistryRef, ChannelIndex) -> TimesliceSlot {
6263
return {TimesliceSlot::ANY};
6364
};
6465
}
6566

6667
ExpirationHandler::Creator LifetimeHelpers::enumDrivenCreation(size_t start, size_t end, size_t step, size_t inputTimeslice, size_t maxInputTimeslices, size_t maxRepetitions)
6768
{
68-
auto last = std::make_shared<size_t>(start + inputTimeslice * step);
69+
size_t firstTimeslice = start + inputTimeslice * step;
6970
auto repetition = std::make_shared<size_t>(0);
7071

71-
return [end, step, last, maxInputTimeslices, maxRepetitions, repetition](ChannelIndex channelIndex, TimesliceIndex& index) -> TimesliceSlot {
72+
return [end, step, firstTimeslice, maxInputTimeslices, maxRepetitions, repetition](ServiceRegistryRef services, ChannelIndex channelIndex) -> TimesliceSlot {
73+
auto& index = services.get<TimesliceIndex>();
74+
auto& decongestion = services.get<DecongestionService>();
75+
if (decongestion.nextEnumerationTimeslice == 0) {
76+
decongestion.nextEnumerationTimeslice = firstTimeslice;
77+
}
78+
7279
for (size_t si = 0; si < index.size(); si++) {
73-
if (*last > end) {
80+
if (decongestion.nextEnumerationTimeslice > end) {
7481
LOGP(debug, "Last greater than end");
7582
return TimesliceSlot{TimesliceSlot::INVALID};
7683
}
7784
auto slot = TimesliceSlot{si};
7885
if (index.isValid(slot) == false) {
79-
TimesliceId timestamp{*last};
86+
TimesliceId timestamp{decongestion.nextEnumerationTimeslice};
8087
*repetition += 1;
8188
if (*repetition % maxRepetitions == 0) {
82-
*last += step * maxInputTimeslices;
89+
decongestion.nextEnumerationTimeslice += step * maxInputTimeslices;
8390
}
8491
LOGP(debug, "Associating timestamp {} to slot {}", timestamp.value, slot.index);
8592
index.associate(timestamp, slot);
8693
// We know that next association will bring in last
8794
// so we can state this will be the latest possible input for the channel
8895
// associated with this.
89-
LOG(debug) << "Oldest possible input is " << *last;
90-
auto newOldest = index.setOldestPossibleInput({*last}, channelIndex);
96+
LOG(debug) << "Oldest possible input is " << decongestion.nextEnumerationTimeslice;
97+
[[maybe_unused]] auto newOldest = index.setOldestPossibleInput({decongestion.nextEnumerationTimeslice}, channelIndex);
9198
index.updateOldestPossibleOutput();
9299
return slot;
93100
}
@@ -100,10 +107,9 @@ ExpirationHandler::Creator LifetimeHelpers::enumDrivenCreation(size_t start, siz
100107

101108
ExpirationHandler::Creator LifetimeHelpers::timeDrivenCreation(std::vector<std::chrono::microseconds> periods, std::vector<std::chrono::seconds> intervals, std::function<bool(void)> hasTimerFired, std::function<void(uint64_t, uint64_t)> updateTimerPeriod)
102109
{
103-
std::shared_ptr<size_t> last = std::make_shared<size_t>(0);
104110
std::shared_ptr<bool> stablePeriods = std::make_shared<bool>(false);
105111
// FIXME: should create timeslices when period expires....
106-
return [last, stablePeriods, periods, intervals, hasTimerFired, updateTimerPeriod](ChannelIndex channelIndex, TimesliceIndex& index) mutable -> TimesliceSlot {
112+
return [stablePeriods, periods, intervals, hasTimerFired, updateTimerPeriod](ServiceRegistryRef services, ChannelIndex channelIndex) mutable -> TimesliceSlot {
107113
// We start with a random offset to avoid all the devices
108114
// send their first message at the same time, bring down
109115
// the QC machine.
@@ -113,13 +119,16 @@ ExpirationHandler::Creator LifetimeHelpers::timeDrivenCreation(std::vector<std::
113119
// We do it here because if we do it in configure, long delays
114120
// between configure and run will cause this to behave
115121
// incorrectly.
122+
auto& index = services.get<TimesliceIndex>();
123+
auto& decongestion = services.get<DecongestionService>();
124+
116125
bool timerHasFired = hasTimerFired();
117-
if (*last == 0ULL || (index.didReceiveData() == false && timerHasFired)) {
126+
if (decongestion.nextEnumerationTimeslice == 0ULL || (index.didReceiveData() == false && timerHasFired)) {
118127
std::random_device r;
119128
std::default_random_engine e1(r());
120129
std::uniform_int_distribution<uint64_t> dist(0, periods.front().count() * 0.9);
121130
auto randomizedPeriodUs = static_cast<int64_t>(dist(e1) + periods.front().count() * 0.1);
122-
*last = getCurrentTime() - randomizedPeriodUs;
131+
decongestion.nextEnumerationTimeslice = getCurrentTime() - randomizedPeriodUs;
123132
updateTimerPeriod(randomizedPeriodUs / 1000, randomizedPeriodUs / 1000);
124133
*stablePeriods = false;
125134
LOG(debug) << "Timer updated to a randomized period of " << randomizedPeriodUs << "us";
@@ -130,7 +139,7 @@ ExpirationHandler::Creator LifetimeHelpers::timeDrivenCreation(std::vector<std::
130139
}
131140
// Nothing to do if the time has not expired yet.
132141
if (timerHasFired == false) {
133-
auto newOldest = index.setOldestPossibleInput({*last}, channelIndex);
142+
[[maybe_unused]] auto newOldest = index.setOldestPossibleInput({decongestion.nextEnumerationTimeslice}, channelIndex);
134143
index.updateOldestPossibleOutput();
135144
return TimesliceSlot{TimesliceSlot::INVALID};
136145
}
@@ -155,13 +164,13 @@ ExpirationHandler::Creator LifetimeHelpers::timeDrivenCreation(std::vector<std::
155164
}
156165
auto& variables = index.getVariablesForSlot(slot);
157166
if (VariableContextHelpers::getTimeslice(variables).value == current) {
158-
auto newOldest = index.setOldestPossibleInput({*last}, channelIndex);
167+
[[maybe_unused]] auto newOldest = index.setOldestPossibleInput({decongestion.nextEnumerationTimeslice}, channelIndex);
159168
index.updateOldestPossibleOutput();
160169
return TimesliceSlot{TimesliceSlot::INVALID};
161170
}
162171
}
163172

164-
*last = current;
173+
decongestion.nextEnumerationTimeslice = current;
165174
// If we are here the timer has expired and a new slice needs
166175
// to be created.
167176
data_matcher::VariableContext newContext;
@@ -179,7 +188,7 @@ ExpirationHandler::Creator LifetimeHelpers::timeDrivenCreation(std::vector<std::
179188
break;
180189
}
181190

182-
auto newOldest = index.setOldestPossibleInput({*last}, channelIndex);
191+
auto newOldest = index.setOldestPossibleInput({decongestion.nextEnumerationTimeslice}, channelIndex);
183192
index.updateOldestPossibleOutput();
184193
return slot;
185194
};
@@ -241,8 +250,9 @@ ExpirationHandler::Checker LifetimeHelpers::expireIfPresent(std::vector<InputRou
241250

242251
ExpirationHandler::Creator LifetimeHelpers::uvDrivenCreation(int requestedLoopReason, DeviceState& state)
243252
{
244-
return [requestedLoopReason, &state](ChannelIndex, TimesliceIndex& index) -> TimesliceSlot {
253+
return [requestedLoopReason, &state](ServiceRegistryRef services, ChannelIndex) -> TimesliceSlot {
245254
/// Not the expected loop reason, return an invalid slot.
255+
auto& index = services.get<TimesliceIndex>();
246256
if ((state.loopReason & requestedLoopReason) == 0) {
247257
LOGP(debug, "No expiration due to a loop event. Requested: {:b}, reported: {:b}, matching: {:b}",
248258
requestedLoopReason,

0 commit comments

Comments
 (0)