Skip to content

Commit 9a0d111

Browse files
committed
feat: initial packet-stats implementation
1 parent e5b4e51 commit 9a0d111

File tree

5 files changed

+205
-15
lines changed

5 files changed

+205
-15
lines changed

src/main.cc

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ Tools to process ARIB TS streams.
7272
[--pre-streaming] [<file>]
7373
mirakc-arib filter-program-metadata [--sid=<sid>] [<file>]
7474
mirakc-arib record-service --sid=<sid> --file=<file>
75-
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [<file>]
75+
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [--packet-stats] [<file>]
7676
mirakc-arib track-airtime --sid=<sid> --eid=<eid> [<file>]
7777
mirakc-arib seek-start --sid=<sid>
7878
[--max-duration=<ms>] [--max-packets=<num>] [<file>]
@@ -629,7 +629,7 @@ Record a service stream into a ring buffer file
629629
630630
Usage:
631631
mirakc-arib record-service --sid=<sid> --file=<file>
632-
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [<file>]
632+
--chunk-size=<bytes> --num-chunks=<num> [--start-pos=<pos>] [--packet-stats] [<file>]
633633
634634
Options:
635635
-h --help
@@ -651,6 +651,9 @@ Record a service stream into a ring buffer file
651651
--start-pos=<pos> [default: 0]
652652
A file position to start recoring.
653653
The value must be a multiple of the chunk size.
654+
655+
--packet-stats
656+
Enables statistics on TS packets.
654657
655658
Arguments:
656659
<file>
@@ -1221,6 +1224,7 @@ void LoadOption(const Args& args, ServiceRecorderOption* opt) {
12211224
static const std::string kChunkSize = "--chunk-size";
12221225
static const std::string kNumChunks = "--num-chunks";
12231226
static const std::string kStartPos = "--start-pos";
1227+
static const std::string kPacketStats = "--packet-stats";
12241228

12251229
opt->sid = static_cast<uint16_t>(args.at(kSid).asLong());
12261230
opt->file = args.at(kFile).asString();
@@ -1258,9 +1262,13 @@ void LoadOption(const Args& args, ServiceRecorderOption* opt) {
12581262
std::abort();
12591263
}
12601264
}
1265+
if (args.at(kPacketStats).asBool()) {
1266+
opt->packet_stats = true;
1267+
}
12611268
MIRAKC_ARIB_INFO(
1262-
"ServiceRecorderOptions: sid={:04X} file={} chunk-size={} num-chunks={} start-pos={}",
1263-
opt->sid, opt->file, opt->chunk_size, opt->num_chunks, opt->start_pos);
1269+
"ServiceRecorderOptions: sid={:04X} file={} chunk-size={} num-chunks={} start-pos={} "
1270+
"packet-stats={}",
1271+
opt->sid, opt->file, opt->chunk_size, opt->num_chunks, opt->start_pos, opt->packet_stats);
12641272
}
12651273

12661274
void LoadOption(const Args& args, AirtimeTrackerOption* opt) {

src/packet_stats_collector.hh

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
#pragma once
2+
3+
#include <string.h>
4+
#include <array>
5+
#include <cstdint>
6+
#include "base.hh"
7+
8+
namespace {
9+
10+
class PacketStatsCollector final {
11+
public:
12+
PacketStatsCollector() {}
13+
~PacketStatsCollector() {}
14+
15+
void CollectPacketStats(const ts::TSPacket& packet) {
16+
auto pid = packet.getPID();
17+
auto last_cc = stats[pid].last_cc;
18+
auto cc = packet.getCC();
19+
auto has_payload = packet.hasPayload();
20+
auto tei = packet.getTEI();
21+
stats[pid].last_cc = cc;
22+
stats[pid].last_packet = packet;
23+
24+
if (packet.getScrambling()) {
25+
++scrambled_packets_;
26+
}
27+
28+
auto duplicate_found = false;
29+
30+
if (packet.getDiscontinuityIndicator() || packet.getPID() == ts::PID_NULL) {
31+
// do nothing
32+
} else if (tei) {
33+
++error_packets_;
34+
} else if (last_cc != ts::INVALID_CC) {
35+
if (has_payload) {
36+
if (last_cc == cc) {
37+
if (stats[pid].last_packet != packet) {
38+
// non-duplicate packet
39+
++dropped_packets_;
40+
} else {
41+
// duplicate packet
42+
duplicate_found = true;
43+
}
44+
} else {
45+
// regular packet
46+
uint8_t expectedCC = (last_cc + 1) & ts::CC_MASK;
47+
if (expectedCC != cc) {
48+
++dropped_packets_;
49+
}
50+
}
51+
} else {
52+
// Continuity counter should not increment if packet has no payload
53+
if (last_cc != cc) {
54+
++dropped_packets_;
55+
}
56+
}
57+
}
58+
59+
if (duplicate_found) {
60+
// duplicate packet is only allowed once
61+
++stats[pid].duplicate_packets;
62+
if (stats[pid].duplicate_packets > 1) {
63+
++dropped_packets_;
64+
}
65+
} else {
66+
stats[pid].duplicate_packets = 0;
67+
}
68+
}
69+
70+
void ResetPacketStats() {
71+
error_packets_ = 0;
72+
dropped_packets_ = 0;
73+
scrambled_packets_ = 0;
74+
}
75+
76+
uint64_t GetErrorPackets() const {
77+
return error_packets_;
78+
}
79+
80+
uint64_t GetDroppedPackets() const {
81+
return dropped_packets_;
82+
}
83+
84+
uint64_t GetScrambledPackets() const {
85+
return scrambled_packets_;
86+
}
87+
88+
private:
89+
struct PacketStat {
90+
uint8_t last_cc = ts::INVALID_CC;
91+
uint8_t duplicate_packets = 0;
92+
ts::TSPacket last_packet;
93+
};
94+
95+
MIRAKC_ARIB_NON_COPYABLE(PacketStatsCollector);
96+
std::array<PacketStat, ts::PID_MAX> stats;
97+
uint64_t error_packets_ = 0;
98+
uint64_t dropped_packets_ = 0;
99+
uint64_t scrambled_packets_ = 0;
100+
};
101+
102+
} // namespace

src/service_recorder.hh

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "jsonl_source.hh"
1313
#include "logging.hh"
1414
#include "packet_sink.hh"
15+
#include "packet_stats_collector.hh"
1516
#include "tsduck_helper.hh"
1617

1718
#define MIRAKC_ARIB_SERVICE_RECORDER_TRACE(...) MIRAKC_ARIB_TRACE("service-recorder: " __VA_ARGS__)
@@ -28,6 +29,7 @@ struct ServiceRecorderOption final {
2829
size_t chunk_size = 0;
2930
size_t num_chunks = 0;
3031
uint64_t start_pos = 0;
32+
bool packet_stats = false;
3133
};
3234

3335
class ServiceRecorder final : public PacketSink,
@@ -91,6 +93,10 @@ class ServiceRecorder final : public PacketSink,
9193

9294
demux_.feedPacket(packet);
9395

96+
if (option_.packet_stats) {
97+
packet_stats_collector_.CollectPacketStats(packet);
98+
}
99+
94100
switch (state_) {
95101
case State::kPreparing:
96102
return OnPreparing(packet);
@@ -113,6 +119,7 @@ class ServiceRecorder final : public PacketSink,
113119
// The application may purge expired programs in the message handler for
114120
// the `chunk` message. So, the program data must be updated before that.
115121
SendEventUpdateMessage(eit_, now, pos);
122+
SendPacketStatsMessage();
116123
SendChunkMessage(now, pos);
117124
}
118125

@@ -310,17 +317,15 @@ class ServiceRecorder final : public PacketSink,
310317
if (event_changed) {
311318
MIRAKC_ARIB_SERVICE_RECORDER_WARN("Event#{:04X} has started before Event#{:04X} ends",
312319
GetEvent(new_eit).event_id, GetEvent(eit).event_id);
313-
UpdateEventBoundary(now, sink_->pos());
314-
SendEventEndMessage(eit);
320+
HandleEventEnd(now, eit);
315321
SendEventStartMessage(new_eit);
316322
} else {
317323
if (IsUnspecifiedEventEndTime(GetEvent(eit))) {
318324
// Continue recording as the current program until the event changes.
319325
} else {
320326
auto end_time = GetEventEndTime(GetEvent(eit));
321327
if (now >= end_time) {
322-
UpdateEventBoundary(end_time, sink_->pos());
323-
SendEventEndMessage(eit);
328+
HandleEventEnd(end_time, eit);
324329
event_started_ = false; // wait for new event
325330
}
326331
}
@@ -340,6 +345,12 @@ class ServiceRecorder final : public PacketSink,
340345
event_boundary_pos_ = pos;
341346
}
342347

348+
void HandleEventEnd(const ts::Time& endTime, const std::shared_ptr<ts::EIT>& eit) {
349+
UpdateEventBoundary(endTime, sink_->pos());
350+
SendPacketStatsMessage();
351+
SendEventEndMessage(eit);
352+
}
353+
343354
void SendStartMessage() {
344355
MIRAKC_ARIB_SERVICE_RECORDER_INFO("Started recording SID#{:04X}", option_.sid);
345356

@@ -411,6 +422,32 @@ class ServiceRecorder final : public PacketSink,
411422
SendEventMessage("event-end", eit, event_boundary_time_, event_boundary_pos_);
412423
}
413424

425+
void SendPacketStatsMessage() {
426+
if (!option_.packet_stats) {
427+
return;
428+
}
429+
430+
auto error_packets = packet_stats_collector_.GetErrorPackets();
431+
auto dropped_packets = packet_stats_collector_.GetDroppedPackets();
432+
auto scrambled_packets = packet_stats_collector_.GetScrambledPackets();
433+
MIRAKC_ARIB_SERVICE_RECORDER_INFO("PacketStats: Error: {}, Dropped {}, Scrambled: {}",
434+
error_packets, dropped_packets, scrambled_packets);
435+
436+
rapidjson::Document doc(rapidjson::kObjectType);
437+
auto& allocator = doc.GetAllocator();
438+
439+
rapidjson::Value data(rapidjson::kObjectType);
440+
data.AddMember("errorPackets", error_packets, allocator);
441+
data.AddMember("droppedPackets", dropped_packets, allocator);
442+
data.AddMember("scrambledPackets", scrambled_packets, allocator);
443+
444+
doc.AddMember("type", "packet-stats", allocator);
445+
doc.AddMember("data", data, allocator);
446+
447+
FeedDocument(doc);
448+
packet_stats_collector_.ResetPacketStats();
449+
}
450+
414451
void SendEventMessage(const std::string& type, const std::shared_ptr<ts::EIT>& eit,
415452
const ts::Time& time, uint64_t pos) {
416453
MIRAKC_ARIB_ASSERT(eit);
@@ -466,6 +503,7 @@ class ServiceRecorder final : public PacketSink,
466503
ts::PID pmt_pid_ = ts::PID_NULL;
467504
State state_ = State::kPreparing;
468505
bool event_started_ = false;
506+
PacketStatsCollector packet_stats_collector_;
469507

470508
MIRAKC_ARIB_NON_COPYABLE(ServiceRecorder);
471509
};

test/cli_tests.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ assert 0 "$MIRAKC_ARIB filter-program-metadata --sid=0xFFFF"
7272
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=1"
7373
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=1 --start-pos=0"
7474
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=2 --start-pos=8192"
75+
assert 0 "$MIRAKC_ARIB record-service --sid=1 --file=$TMPFILE --chunk-size=8192 --num-chunks=2 --packet-stats"
7576
if [ -z "$CI" ]
7677
then
7778
# This test fails in GitHub Actions.

test/service_recorder_test.cc

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ constexpr size_t kNumChunks = 2;
1515
constexpr size_t kChunkSize = RingFileSink::kBufferSize * kNumBuffers;
1616
constexpr uint64_t kRingSize = kChunkSize * kNumChunks;
1717
const ServiceRecorderOption kOption{"/dev/null", 3, kChunkSize, kNumChunks};
18+
struct ServiceRecorderTest : testing::TestWithParam<ServiceRecorderOption> {};
1819
} // namespace
1920

2021
TEST(ServiceRecorderTest, NoPacket) {
@@ -136,8 +137,8 @@ TEST(ServiceRecorderTest, EventStart) {
136137
EXPECT_TRUE(src.IsEmpty());
137138
}
138139

139-
TEST(ServiceRecorderTest, EventProgress) {
140-
ServiceRecorderOption option = kOption;
140+
TEST_P(ServiceRecorderTest, EventProgress) {
141+
ServiceRecorderOption option = GetParam();
141142

142143
TableSource src;
143144
auto ring_sink = std::make_unique<MockRingSink>(option.chunk_size, option.num_chunks);
@@ -226,6 +227,18 @@ TEST(ServiceRecorderTest, EventProgress) {
226227
MockJsonlSink::Stringify(doc));
227228
return true;
228229
});
230+
if (option.packet_stats) {
231+
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
232+
EXPECT_EQ(R"({"type":"packet-stats","data":{)"
233+
R"("errorPackets":0,)"
234+
R"("droppedPackets":0,)"
235+
R"("scrambledPackets":0)"
236+
R"(})"
237+
R"(})",
238+
MockJsonlSink::Stringify(doc));
239+
return true;
240+
});
241+
}
229242
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
230243
EXPECT_EQ(R"({"type":"chunk","data":{"chunk":{)"
231244
R"("timestamp":1609426800000,"pos":16384)"
@@ -253,6 +266,18 @@ TEST(ServiceRecorderTest, EventProgress) {
253266
MockJsonlSink::Stringify(doc));
254267
return true;
255268
});
269+
if (option.packet_stats) {
270+
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
271+
EXPECT_EQ(R"({"type":"packet-stats","data":{)"
272+
R"("errorPackets":0,)"
273+
R"("droppedPackets":0,)"
274+
R"("scrambledPackets":0)"
275+
R"(})"
276+
R"(})",
277+
MockJsonlSink::Stringify(doc));
278+
return true;
279+
});
280+
}
256281
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
257282
EXPECT_EQ(R"({"type":"chunk","data":{"chunk":{)"
258283
R"("timestamp":1609426800000,"pos":0)"
@@ -267,16 +292,16 @@ TEST(ServiceRecorderTest, EventProgress) {
267292
EXPECT_CALL(*ring_sink, End).WillOnce(testing::Return());
268293
}
269294

270-
auto recorder = std::make_unique<ServiceRecorder>(kOption);
295+
auto recorder = std::make_unique<ServiceRecorder>(option);
271296
recorder->ServiceRecorder::Connect(std::move(ring_sink));
272297
recorder->JsonlSource::Connect(std::move(json_sink));
273298
src.Connect(std::move(recorder));
274299
EXPECT_EQ(EXIT_SUCCESS, src.FeedPackets());
275300
EXPECT_TRUE(src.IsEmpty());
276301
}
277302

278-
TEST(ServiceRecorderTest, EventEnd) {
279-
ServiceRecorderOption option = kOption;
303+
TEST_P(ServiceRecorderTest, EventEnd) {
304+
ServiceRecorderOption option = GetParam();
280305

281306
TableSource src;
282307
auto file = std::make_unique<MockFile>();
@@ -353,6 +378,18 @@ TEST(ServiceRecorderTest, EventEnd) {
353378
MockJsonlSink::Stringify(doc));
354379
return true;
355380
});
381+
if (option.packet_stats) {
382+
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
383+
EXPECT_EQ(R"({"type":"packet-stats","data":{)"
384+
R"("errorPackets":0,)"
385+
R"("droppedPackets":1,)"
386+
R"("scrambledPackets":0)"
387+
R"(})"
388+
R"(})",
389+
MockJsonlSink::Stringify(doc));
390+
return true;
391+
});
392+
}
356393
EXPECT_CALL(*json_sink, HandleDocument).WillOnce([](const rapidjson::Document& doc) {
357394
EXPECT_EQ(R"({"type":"event-end","data":{)"
358395
R"("originalNetworkId":1,)"
@@ -406,13 +443,13 @@ TEST(ServiceRecorderTest, EventEnd) {
406443

407444
auto ring =
408445
std::make_unique<RingFileSink>(std::move(file), option.chunk_size, option.num_chunks);
409-
auto recorder = std::make_unique<ServiceRecorder>(kOption);
446+
auto recorder = std::make_unique<ServiceRecorder>(option);
410447
recorder->ServiceRecorder::Connect(std::move(ring));
411448
recorder->JsonlSource::Connect(std::move(json_sink));
412449
src.Connect(std::move(recorder));
413450
EXPECT_EQ(EXIT_SUCCESS, src.FeedPackets());
414451
EXPECT_TRUE(src.IsEmpty());
415-
}
452+
};
416453

417454
TEST(ServiceRecorderTest, EventStartBeforeEventEnd) {
418455
ServiceRecorderOption option = kOption;
@@ -791,3 +828,7 @@ TEST(ServiceRecorderTest, UnspecifiedEventEnd) {
791828
EXPECT_EQ(EXIT_SUCCESS, src.FeedPackets());
792829
EXPECT_TRUE(src.IsEmpty());
793830
}
831+
832+
INSTANTIATE_TEST_SUITE_P(EventTestWithPacketStats, ServiceRecorderTest,
833+
testing::Values(ServiceRecorderOption{"/dev/null", 3, kChunkSize, kNumChunks, 0, false},
834+
ServiceRecorderOption{"/dev/null", 3, kChunkSize, kNumChunks, 0, true}));

0 commit comments

Comments
 (0)