Skip to content

Commit e5fd3db

Browse files
committed
DPL: add proper tracing for DataProcessingDevice socket callbacks
1 parent dccea09 commit e5fd3db

1 file changed

Lines changed: 26 additions & 14 deletions

File tree

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
#include "Framework/TMessageSerializer.h"
3636
#include "Framework/InputRecord.h"
3737
#include "Framework/InputSpan.h"
38+
#if defined(__APPLE__) || defined(NDEBUG)
39+
#define O2_SIGNPOST_IMPLEMENTATION
40+
#endif
3841
#include "Framework/Signpost.h"
3942
#include "Framework/TimingHelpers.h"
4043
#include "Framework/SourceInfoHeader.h"
@@ -80,6 +83,8 @@
8083
#include <sstream>
8184
#include <boost/property_tree/json_parser.hpp>
8285

86+
O2_DECLARE_DYNAMIC_LOG(device);
87+
8388
using namespace o2::framework;
8489
using ConfigurationInterface = o2::configuration::ConfigurationInterface;
8590
using DataHeader = o2::header::DataHeader;
@@ -274,21 +279,22 @@ struct PollerContext {
274279
void on_socket_polled(uv_poll_t* poller, int status, int events)
275280
{
276281
auto* context = (PollerContext*)poller->data;
282+
assert(context);
283+
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
277284
context->state->loopReason |= DeviceState::DATA_SOCKET_POLLED;
278285
switch (events) {
279286
case UV_READABLE: {
280-
ZoneScopedN("socket readable event");
281-
LOG(debug) << "socket polled UV_READABLE: " << context->name;
287+
O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for channel %{public}s", context->name);
282288
context->state->loopReason |= DeviceState::DATA_INCOMING;
283289
} break;
284290
case UV_WRITABLE: {
285-
ZoneScopedN("socket writeable");
291+
O2_SIGNPOST_END(device, sid, "socket_state", "Socket connected for channel %{public}s", context->name);
286292
if (context->read) {
287-
LOG(debug) << "socket polled UV_CONNECT" << context->name;
293+
O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for read in context %{public}s", context->name);
288294
uv_poll_start(poller, UV_READABLE | UV_DISCONNECT | UV_PRIORITIZED, &on_socket_polled);
289295
context->state->loopReason |= DeviceState::DATA_CONNECTED;
290296
} else {
291-
LOG(debug) << "socket polled UV_WRITABLE" << context->name;
297+
O2_SIGNPOST_START(device, sid, "socket_state", "Socket connected for write for channel %{public}s", context->name);
292298
context->state->loopReason |= DeviceState::DATA_OUTGOING;
293299
// If the socket is writable, fairmq will handle the rest, so we can stop polling and
294300
// just wait for the disconnect.
@@ -297,12 +303,10 @@ void on_socket_polled(uv_poll_t* poller, int status, int events)
297303
context->pollerState = PollerContext::PollerState::Connected;
298304
} break;
299305
case UV_DISCONNECT: {
300-
ZoneScopedN("socket disconnect");
301-
LOG(debug) << "socket polled UV_DISCONNECT";
306+
O2_SIGNPOST_END(device, sid, "socket_state", "Socket disconnected in context %{public}s", context->name);
302307
} break;
303308
case UV_PRIORITIZED: {
304-
ZoneScopedN("socket prioritized");
305-
LOG(debug) << "socket polled UV_PRIORITIZED";
309+
O2_SIGNPOST_EVENT_EMIT(device, sid, "socket_state", "Data pending on socket for context %{public}s", context->name);
306310
} break;
307311
}
308312
// We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
@@ -873,15 +877,19 @@ void DataProcessingDevice::startPollers()
873877
auto& deviceContext = ref.get<DeviceContext>();
874878
auto& state = ref.get<DeviceState>();
875879

876-
for (auto& poller : state.activeInputPollers) {
880+
for (auto* poller : state.activeInputPollers) {
881+
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
882+
O2_SIGNPOST_START(device, sid, "socket_state", "Input socket waiting for connection.");
877883
uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
878884
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
879885
}
880886
for (auto& poller : state.activeOutOfBandPollers) {
881887
uv_poll_start(poller, UV_WRITABLE, &on_out_of_band_polled);
882888
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
883889
}
884-
for (auto& poller : state.activeOutputPollers) {
890+
for (auto* poller : state.activeOutputPollers) {
891+
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
892+
O2_SIGNPOST_START(device, sid, "socket_state", "Output socket waiting for connection.");
885893
uv_poll_start(poller, UV_WRITABLE, &on_socket_polled);
886894
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Disconnected;
887895
}
@@ -897,17 +905,21 @@ void DataProcessingDevice::stopPollers()
897905
auto& deviceContext = ref.get<DeviceContext>();
898906
auto& state = ref.get<DeviceState>();
899907
LOGP(detail, "Stopping {} input pollers", state.activeInputPollers.size());
900-
for (auto& poller : state.activeInputPollers) {
908+
for (auto* poller : state.activeInputPollers) {
909+
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
910+
O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
901911
uv_poll_stop(poller);
902912
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
903913
}
904914
LOGP(detail, "Stopping {} out of band pollers", state.activeOutOfBandPollers.size());
905-
for (auto& poller : state.activeOutOfBandPollers) {
915+
for (auto* poller : state.activeOutOfBandPollers) {
906916
uv_poll_stop(poller);
907917
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
908918
}
909919
LOGP(detail, "Stopping {} output pollers", state.activeOutOfBandPollers.size());
910-
for (auto& poller : state.activeOutputPollers) {
920+
for (auto* poller : state.activeOutputPollers) {
921+
O2_SIGNPOST_ID_FROM_POINTER(sid, device, poller);
922+
O2_SIGNPOST_END(device, sid, "socket_state", "Output socket closed.");
911923
uv_poll_stop(poller);
912924
((PollerContext*)poller->data)->pollerState = PollerContext::PollerState::Stopped;
913925
}

0 commit comments

Comments
 (0)