Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This file is used to ignore files which are generated
# ----------------------------------------------------------------------------

.vscode
*~
*.autosave
*.a
Expand Down
15 changes: 13 additions & 2 deletions data_tamer_cpp/include/data_tamer/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ class LogChannel : public std::enable_shared_from_this<LogChannel>
* @param value pointer to the vectors of values.
* @return the ID to be used to unregister or enable/disable the values.
*/
template <template <class, class> class Container, class T, class... TArgs,
std::enable_if_t<!has_TypeDefinition<Container<T, TArgs...>>::value, bool> = true>
template <
template <class, class> class Container, class T, class... TArgs,
std::enable_if_t<!has_TypeDefinition<Container<T, TArgs...>>::value, bool> = true>
RegistrationID registerValue(const std::string& name,
const Container<T, TArgs...>* value);

Expand Down Expand Up @@ -153,6 +154,16 @@ class LogChannel : public std::enable_shared_from_this<LogChannel>
*/
void addDataSink(std::shared_ptr<DataSinkBase> sink);

/**
* @brief removeDataSink remove a sink, i.e. a class collecting our snapshots.
*/
void removeDataSink(std::shared_ptr<DataSinkBase> sink);

/**
* @brief getNumberOfSinks returns the number of registered sinks.
*/
size_t getNumberOfSinks() const;

/**
* @brief takeSnapshot copies the current value of all your registered values
* and send an instance of Snapshot to all your Sinks.
Expand Down
43 changes: 37 additions & 6 deletions data_tamer_cpp/src/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct LogChannel::Pimpl
Schema schema;
bool logging_started = false;

mutable Mutex sinks_mutex;
std::unordered_set<std::shared_ptr<DataSinkBase>> sinks;
};

Expand Down Expand Up @@ -160,9 +161,29 @@ void LogChannel::unregister(const RegistrationID& id)

void LogChannel::addDataSink(std::shared_ptr<DataSinkBase> sink)
{
std::lock_guard const lock_sinks(_p->sinks_mutex);

// if we haven't already started logging, then takeSnapshot() handles adding the channel
// otherwise it must be done here so the sink knows about the existing schema
if(_p->logging_started)
{
sink->addChannel(_p->channel_name, _p->schema);
}
_p->sinks.insert(sink);
}

void LogChannel::removeDataSink(std::shared_ptr<DataSinkBase> sink)
{
std::lock_guard const lock(_p->sinks_mutex);
_p->sinks.erase(sink);
}

size_t LogChannel::getNumberOfSinks() const
{
std::lock_guard const lock(_p->sinks_mutex);
return _p->sinks.size();
}

Schema LogChannel::getSchema() const
{
std::lock_guard const lock(_p->mutex);
Expand All @@ -183,12 +204,16 @@ void LogChannel::addCustomType(const std::string& custom_type_name,
bool LogChannel::takeSnapshot(std::chrono::nanoseconds timestamp)
{
{
std::lock_guard const lock(_p->mutex);

std::lock_guard const lock_sinks(_p->sinks_mutex);
if(_p->sinks.empty())
{
return false;
}
}

{
std::lock_guard const lock(_p->mutex);

// update the _p->snapshot.active_mask if necessary
if(_p->mask_dirty)
{
Expand All @@ -215,11 +240,14 @@ bool LogChannel::takeSnapshot(std::chrono::nanoseconds timestamp)
}
_p->snapshot.payload.resize(payload_size);

// call sink->addChannel (usually done once)
// set up the channel if we haven't begun logging
if(!_p->logging_started)
{
_p->logging_started = true;
_p->snapshot.schema_hash = _p->schema.hash;

std::lock_guard const lock_sinks(_p->sinks_mutex);
// start logging inside the sinks_mutex so that addDataSink does not have an incorrect value due to a race condition
_p->logging_started = true;
for(auto const& sink : _p->sinks)
{
sink->addChannel(_p->channel_name, _p->schema);
Expand All @@ -243,9 +271,12 @@ bool LogChannel::takeSnapshot(std::chrono::nanoseconds timestamp)
}

bool all_pushed = true;
for(auto& sink : _p->sinks)
{
all_pushed &= sink->pushSnapshot(_p->snapshot);
std::lock_guard const lock_sinks(_p->sinks_mutex);
for(auto& sink : _p->sinks)
{
all_pushed &= sink->pushSnapshot(_p->snapshot);
}
}
return all_pushed;
}
Expand Down
3 changes: 2 additions & 1 deletion data_tamer_cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ else()
add_executable(datatamer_test
dt_tests.cpp
custom_types_tests.cpp
parser_tests.cpp)
parser_tests.cpp
add_remove_sink_tests.cpp)
gtest_discover_tests(datatamer_test DISCOVERY_MODE PRE_TEST)

target_include_directories(datatamer_test
Expand Down
71 changes: 71 additions & 0 deletions data_tamer_cpp/tests/add_remove_sink_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#include "data_tamer/channel.hpp"
#include "data_tamer/sinks/dummy_sink.hpp"

#include <gtest/gtest.h>
#include <string>
#include <thread>

using namespace DataTamer;

void take_snapshots(std::shared_ptr<LogChannel> channel, int count)
{
for(int i = 0; i < count; i++)
{
channel->takeSnapshot();
std::this_thread::sleep_for(std::chrono::microseconds(50));
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}

TEST(DataTamerSinkRegistry, AddSinkIncreasesCountAndRef)
{
auto channel = LogChannel::create("chan");
auto sink = std::make_shared<DummySink>();
channel->addDataSink(sink);

std::vector<double> dummyData = { 10, 11, 12 };
channel->registerValue("valsA", &dummyData);

ASSERT_EQ(channel->getNumberOfSinks(), 1);
}

TEST(DataTamerSinkRegistry, SnapshotsAreRecordedWhileSinkPresent)
{
auto channel = LogChannel::create("chan");
auto sink = std::make_shared<DummySink>();
channel->addDataSink(sink);

std::vector<double> dummyData = { 10, 11, 12 };
channel->registerValue("valsA", &dummyData);

const int snapshot_count = 10;
take_snapshots(channel, snapshot_count);

const auto hash = channel->getSchema().hash;
ASSERT_EQ(sink->snapshots_count[hash], snapshot_count);
}

TEST(DataTamerSinkRegistry, RemoveSinkStopsRecording)
{
auto channel = LogChannel::create("chan");
auto sink = std::make_shared<DummySink>();
channel->addDataSink(sink);

std::vector<double> dummyData = { 10, 11, 12 };
channel->registerValue("valsA", &dummyData);

const int snapshot_count = 10;
take_snapshots(channel, snapshot_count);

const auto hash = channel->getSchema().hash;
ASSERT_EQ(sink->snapshots_count[hash], snapshot_count);

channel->removeDataSink(sink);

ASSERT_EQ(channel->getNumberOfSinks(), 0);

// Taking more snapshots, should not be recorded in the sink (i.e does not increase snapshots_count)
take_snapshots(channel, snapshot_count);

ASSERT_EQ(sink->snapshots_count[hash], snapshot_count);
}
Loading