Skip to content

Commit 4869000

Browse files
committed
DPL use Signposts to debug consumeWhenAll
1 parent e5fd3db commit 4869000

1 file changed

Lines changed: 10 additions & 1 deletion

File tree

Framework/Core/src/CompletionPolicyHelpers.cxx

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
#include "Framework/TimesliceIndex.h"
1919
#include "Framework/TimingInfo.h"
2020
#include "DecongestionService.h"
21+
#include "Framework/Signpost.h"
2122

2223
#include <cassert>
2324
#include <regex>
2425

26+
O2_DECLARE_DYNAMIC_LOG(completion);
27+
2528
namespace o2::framework
2629
{
2730

@@ -108,6 +111,8 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
108111
{
109112
auto callback = [](InputSpan const& inputs, std::vector<InputSpec> const& specs, ServiceRegistryRef& ref) -> CompletionPolicy::CompletionOp {
110113
assert(inputs.size() == specs.size());
114+
O2_SIGNPOST_ID_GENERATE(sid, completion);
115+
O2_SIGNPOST_START(completion, sid, "consumeWhenAll", "Completion policy invoked");
111116

112117
size_t si = 0;
113118
bool missingSporadic = false;
@@ -117,15 +122,18 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
117122
assert(si < specs.size());
118123
auto& spec = specs[si++];
119124
if (input.header == nullptr && spec.lifetime != Lifetime::Sporadic) {
125+
O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s due to missing input %lu", "Wait", si);
120126
return CompletionPolicy::CompletionOp::Wait;
121127
}
122128
if (input.header == nullptr && spec.lifetime == Lifetime::Sporadic) {
129+
O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "Missing sporadic found for route index %lu", si);
123130
missingSporadic = true;
124131
}
125132
if (input.header != nullptr && currentTimeslice == -1) {
126133
auto const* dph = framework::DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(input);
127134
if (dph && !TimingInfo::timesliceIsTimer(dph->startTime)) {
128135
currentTimeslice = dph->startTime;
136+
O2_SIGNPOST_EVENT_EMIT(completion, sid, "consumeWhenAll", "currentTimeslice %lu from route index %lu", currentTimeslice, si);
129137
}
130138
}
131139
if (input.header != nullptr && spec.lifetime != Lifetime::Condition) {
@@ -138,9 +146,10 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAll(const char* name, Compl
138146
auto oldestPossibleTimeslice = timesliceIndex.getOldestPossibleInput().timeslice.value;
139147

140148
if (missingSporadic && currentTimeslice >= oldestPossibleTimeslice) {
149+
O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu > oldestPossibleTimeslice %lu", "Retry", currentTimeslice, oldestPossibleTimeslice);
141150
return CompletionPolicy::CompletionOp::Retry;
142151
}
143-
// We only consume if we have something which needs processing.
152+
O2_SIGNPOST_END(completion, sid, "consumeWhenAll", "Completion policy returned %{public}s for timeslice %lu <= oldestPossibleTimeslice %lu", needsProcessing ? "Consume" : "Discard", currentTimeslice, oldestPossibleTimeslice);
144153
return needsProcessing ? CompletionPolicy::CompletionOp::Consume : CompletionPolicy::CompletionOp::Discard;
145154
};
146155
return CompletionPolicy{name, matcher, callback};

0 commit comments

Comments
 (0)