Skip to content

Commit 01b7ce7

Browse files
authored
DPL: fix dropped obsolete messages with faster rates (#5462)
This should reduce the number of messages being dropped if the rate becomes too high.
1 parent 8d63de6 commit 01b7ce7

File tree

4 files changed

+89
-28
lines changed

4 files changed

+89
-28
lines changed

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ foreach(w
266266
Forwarding
267267
ParallelPipeline
268268
ParallelProducer
269+
SlowConsumer
269270
SimpleDataProcessingDevice01
270271
SimpleRDataFrameProcessing
271272
SimpleStatefulProcessing01

Framework/Core/include/Framework/ChannelInfo.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ enum struct InputChannelState {
3131
/// updated by Control or by the incoming flow of messages.
3232
struct InputChannelInfo {
3333
/// Current state of the channel; a freshly constructed info starts as Running.
InputChannelState state = InputChannelState::Running;
34+
/// Event mask most recently written by the channel socket's Events() call in
/// DataProcessingDevice::doPrepare. Zero means nothing is known to be pending,
/// so the socket is polled again before receiving; a non-zero value left over
/// from the previous pass makes doPrepare attempt a receive without polling first.
uint32_t hasPendingEvents = 0;
3435
};
3536

3637
} // namespace o2::framework

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -489,42 +489,44 @@ void DataProcessingDevice::doPrepare(DataProcessorContext& context)
489489
int64_t result = -2;
490490
auto& fairMQChannel = context.device->GetChannel(channel.name, 0);
491491
auto& socket = fairMQChannel.GetSocket();
492-
uint32_t events;
493-
socket.Events(&events);
494-
if ((events & 1) == 0) {
495-
continue;
492+
// If we have pending events from a previous iteration,
493+
// we do receive in any case.
494+
// Otherwise we check if there is any pending event and skip
495+
// this channel in case there is none.
496+
if (info.hasPendingEvents == 0) {
497+
socket.Events(&info.hasPendingEvents);
498+
// If we do not read, we can continue.
499+
if ((info.hasPendingEvents & 1) == 0) {
500+
continue;
501+
}
496502
}
497503
// Notice that there seems to be a difference between the documentation
498504
// of zeromq and the observed behavior. The fact that ZMQ_POLLIN
499505
// is raised does not mean that a message is immediately available to
500506
// read, just that it will be available soon, so the receive can
501507
// still return -2. To avoid this we keep receiving on the socket until
502-
// we get a message, consume all the consecutive messages, and then go back
503-
// to the usual loop.
504-
do {
505-
if (events & 1) {
506-
bool oneMessage = false;
507-
while (true) {
508-
FairMQParts parts;
509-
result = fairMQChannel.Receive(parts, 0);
510-
if (result >= 0) {
511-
// Receiving data counts as activity now, so that
512-
// We can make sure we process all the pending
513-
// messages without hanging on the uv_run.
514-
*context.wasActive = true;
515-
DataProcessingDevice::handleData(context, parts, info);
516-
oneMessage = true;
517-
} else {
518-
if (oneMessage) {
519-
break;
520-
}
521-
}
522-
}
523-
} else {
508+
// we get a message. In order not to overflow the DPL queue we process
509+
// one message at a time and we keep track of whether there were more
510+
// to process.
511+
while (true) {
512+
FairMQParts parts;
513+
result = fairMQChannel.Receive(parts, 0);
514+
if (result >= 0) {
515+
DataProcessingDevice::handleData(context, parts, info);
516+
// Receiving data counts as activity now, so that
517+
// We can make sure we process all the pending
518+
// messages without hanging on the uv_run.
524519
break;
525520
}
526-
socket.Events(&events);
527-
} while (events & 1);
521+
}
522+
// We check once again for pending events, keeping track if this was the
523+
// case so that we can immediately repeat this loop and avoid remaining
524+
// stuck in uv_run. This is because we will not get notified on the socket
525+
// if more events are pending, because zeromq signals its file descriptor in an edge-triggered fashion.
526+
socket.Events(&info.hasPendingEvents);
527+
if (info.hasPendingEvents) {
528+
*context.wasActive |= true;
529+
}
528530
}
529531
}
530532

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#include "Framework/ConfigParamSpec.h"
11+
#include "Framework/CompletionPolicyHelpers.h"
12+
#include "Framework/DeviceSpec.h"
13+
#include "Framework/RawDeviceService.h"
14+
#include "Framework/ControlService.h"
15+
#include <FairMQDevice.h>
16+
#include <InfoLogger/InfoLogger.hxx>
17+
18+
#include <chrono>
19+
#include <thread>
20+
#include <vector>
21+
22+
#include "Framework/runDataProcessing.h"
23+
using namespace o2::framework;
24+
25+
// This is how you can define your processing in a declarative way
26+
// Builds a two-device workflow used to check that no message is lost when the
// consumer is slower than the producer:
//  - "A" has no inputs and publishes an increasing int counter on TST/A.
//  - "B" consumes TST/A, waits 3 ms per message and verifies that the values
//    arrive in sequence without gaps.
// The `specs` argument is not used by this workflow.
WorkflowSpec defineDataProcessing(ConfigContext const& specs)
27+
{
28+
return WorkflowSpec{
29+
// Producer: no inputs, a single output bound as "a" (origin "TST", description "A").
{"A",
30+
Inputs{},
31+
{OutputSpec{{"a"}, "TST", "A"}},
32+
AlgorithmSpec{adaptStateful([]() { return adaptStateless(
33+
// NOTE(review): `device` is not used in this callback.
[](DataAllocator& outputs, RawDeviceService& device, ControlService& control) {
34+
// Value to publish next; function-local static, so it persists across invocations.
static int count = 0;
35+
auto& aData = outputs.make<int>(OutputRef{"a"});
36+
LOG(info) << count;
37+
// Publish the current value, then advance the counter.
aData = count++;
38+
// Once the value 3000 has been published (3001 messages, 0 through 3000),
// signal end of stream and ask for this device to quit.
if (count > 3000) {
39+
control.endOfStream();
40+
control.readyToQuit(QuitRequest::Me);
41+
}
42+
}); })}},
43+
// Consumer: one Timeframe input bound as "x" matching TST/A, no outputs.
{"B",
44+
{InputSpec{"x", "TST", "A", Lifetime::Timeframe}},
45+
{},
46+
AlgorithmSpec{adaptStateful([]() { return adaptStateless(
47+
[](InputRecord& inputs, RawDeviceService& device, ControlService& control) {
48+
// Value the next message is expected to carry; persists across invocations.
static int expected = 0;
49+
// Wait 3 ms on every message so that this device consumes slowly
// (presumably slower than "A" produces; verify against the producer rate).
device.device()->WaitFor(std::chrono::milliseconds(3));
50+
auto& count = inputs.get<int>("x");
51+
// A value different from the expected one is reported as a missing message
// and a quit of all devices is requested.
if (expected != count) {
52+
LOGP(ERROR, "Missing message. Expected: {}, Found {}.", expected, count);
53+
control.readyToQuit(QuitRequest::All);
54+
}
55+
// The expectation advances by one even after a mismatch.
expected++;
56+
}); })}}};
57+
}

0 commit comments

Comments
 (0)