12 changes: 6 additions & 6 deletions MODULE.bazel.lock

Some generated files are not rendered by default.

30 changes: 30 additions & 0 deletions score/mw/com/benchmark/BUILD
@@ -0,0 +1,30 @@
# *******************************************************************************
# Copyright (c) 2025 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache License Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0
#
# SPDX-License-Identifier: Apache-2.0
# *******************************************************************************
load("@score-baselibs//score/language/safecpp:toolchain_features.bzl", "COMPILER_WARNING_FEATURES")

cc_binary(
name = "benchmark",
srcs = [
"benchmark.cpp",
"benchmark.h",
],
data = ["etc/mw_com_config.json"],
features = COMPILER_WARNING_FEATURES + [
"aborts_upon_exception",
],
deps = [
"//score/mw/com",
"@score-baselibs//score/mw/log",
"@score-baselibs//score/language/futurecpp",
],
)
241 changes: 241 additions & 0 deletions score/mw/com/benchmark/benchmark.cpp
@@ -0,0 +1,241 @@
/********************************************************************************
* Copyright (c) 2025 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
#include "score/mw/com/impl/instance_specifier.h"
#include "benchmark.h"

#include "score/concurrency/notification.h"

#include <cstdlib>
#include <iostream>
#include <sys/syscall.h>
#include <thread>
#include <score/latch.hpp>

#include "score/mw/com/runtime.h"

using namespace std::chrono_literals;
using namespace score::mw::com;
using namespace std::chrono;

score::cpp::latch benchmark_ab_start_point{3}, benchmark_ab_finish_point{3}, init_ab_sync_point{3}, deinit_ab_sync_point{2};

void SetupThread(int cpu)
{
auto id = std::this_thread::get_id();
auto native_handle = *reinterpret_cast<std::thread::native_handle_type*>(&id);

int max_priority = sched_get_priority_max(SCHED_RR);
Contributor: I guess this (POSIX) func needs a specific include -> <sched.h>?

struct sched_param params;
params.sched_priority = max_priority;
if (pthread_setschedparam(native_handle, SCHED_RR, &params))
Contributor: Don't we need to include <pthread.h> for this?
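
Taking this and the <sched.h> remark above together, a minimal sketch of the extra includes these POSIX calls would presumably need (plus <cstring> for the std::strerror calls below; on glibc, cpu_set_t and pthread_setaffinity_np are additionally gated behind _GNU_SOURCE):

#include <cstring>   // std::strerror
#include <pthread.h> // pthread_setschedparam, pthread_setaffinity_np
#include <sched.h>   // sched_get_priority_max, SCHED_RR, cpu_set_t, CPU_ZERO, CPU_SET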

{
std::cout << "Failed to setschedparam: " << std::strerror(errno) << std::endl;
std::cout << "App needs to be run as root" << std::endl;
return;
}

cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset);
if (pthread_setaffinity_np(native_handle, sizeof(cpu_set_t), &cpuset))
Contributor: I guess this is Linux-specific (thus the "non-portable" / _np extension). Then you should adjust the Bazel BUILD accordingly, annotating that the binary is "Linux" only (since a major/typical goal in score is imho to achieve QNX compatibility).

{
std::cout << "Failed to setaffinity_np: " << std::strerror(errno) << std::endl;
std::cout << "App needs to be run as root" << std::endl;
}
}
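
Given the portability concern above, a hedged sketch of one option: guard the non-portable affinity call per platform (illustrative only; the QNX branch is merely hinted at, and the alternative is restricting the target to Linux in the BUILD file):

#if defined(__linux__)
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset);
if (pthread_setaffinity_np(native_handle, sizeof(cpu_set_t), &cpuset))
{
std::cout << "Failed to setaffinity_np: " << std::strerror(errno) << std::endl;
}
#else
// QNX provides its own affinity mechanism (e.g. ThreadCtl with _NTO_TCTL_RUNMASK).
static_cast<void>(cpu);
#endif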

void Transceiver(int cpu, bool starter, impl::InstanceSpecifier skeleton_instance_specifier, impl::InstanceSpecifier proxy_instance_specifier)
{
SetupThread(cpu);

auto create_result = BenchmarkSkeleton::Create(skeleton_instance_specifier);
if (!create_result.has_value())
{
std::cerr << "Unable to construct skeleton: " << create_result.error() << "!" << std::endl;
return;
}
auto& skeleton = create_result.value();
const auto offer_result = skeleton.OfferService();
if (!offer_result.has_value())
{
std::cerr << "Unable to offer service for skeleton: " << offer_result.error() << "!" << std::endl;
return;
}

ServiceHandleContainer<impl::HandleType> handle{};
Contributor: As it is a container (it could even contain multiple handles), I would prefer to rename:
handle -> handle_container
or at least:
handle -> handles

do
{
auto handles_result = BenchmarkProxy::FindService(proxy_instance_specifier);
if (!handles_result.has_value())
{
std::cerr << "Unable to find service: " << handles_result.error() << "!" << std::endl;
return;
}
handle = std::move(handles_result).value();
if (handle.size() == 0)
{
std::this_thread::sleep_for(500ms);
}
} while (handle.size() == 0);

auto proxy_result = BenchmarkProxy::Create(std::move(handle.front()));
Contributor: This I do not get. Why are you creating a proxy/consumer within the "Transmitter" func? The job of subscribing to and consuming data provided by the skeleton/provider should be the job of the "subscriber" thread, which calls the Subscriber function ...

And for naming: I would prefer to have either Provider or Sender instead of Transmitter! This is more natural, as the Skeleton is the "Service Provider" and the Proxy is the "Service Consumer". Or you name it "Sender", because you are actually calling the Send API on the skeleton side ...

Author: Actually the function was supposed to be named Transceiver, which puts some light on the overall functionality. There are transceivers and subscribers. Renamed.

if (!proxy_result.has_value())
{
std::cerr << "Unable to construct proxy: " << proxy_result.error() << "!" << std::endl;
return;
}
auto& proxy = proxy_result.value();

impl::ProxyEvent<score::mw::com::DummyBenchmarkData>& dummy_data_event = proxy.dummy_benchmark_data_;
score::Result<score::mw::com::impl::SampleAllocateePtr<score::mw::com::DummyBenchmarkData>> sample_result;

dummy_data_event.Subscribe(1);

init_ab_sync_point.arrive_and_wait();
benchmark_ab_start_point.arrive_and_wait();
if (starter)
{
do {
sample_result = skeleton.dummy_benchmark_data_.Allocate();
} while (!sample_result.has_value());
skeleton.dummy_benchmark_data_.Send(std::move(sample_result).value());
Contributor: Why this "starter" logic? So only one provider/skeleton initially provides/sends an event before it gets read/consumed in line 131? Why not in the case of the other provider?

Author: Explained in other comment.
}
for (std::size_t cycle = 0U; cycle < kIterations; cycle++)
Contributor: The "testing-sequence" in the loop looks a bit odd?
So basically it is:

  • Consumer side: Try to acquire a new sample (new since the last call to GetNewSamples).
  • Provider side: Allocate memory for a new sample to be sent.
  • Provider side: Send the allocated sample.

The code is imho very confusing, as in some steps (but not all!) you tried to add "error" handling in the sense that you simply repeat the API call in case of error.
But why aren't you doing this for the last step (Send())? It can also return an error ...
And why do you use a while-loop with an empty body for the first step and a do-while-loop for the second step?

Author: The while-loop with an empty body was an incorrect use of the API - a wrong assumption that receiving 0 samples returns an error.
The workflow is further explained in another comment.

{
while (true) {
auto result = dummy_data_event.GetNewSamples((
[](SamplePtr<DummyBenchmarkData> sample) noexcept {
std::ignore = sample;
}),1);
if (result.has_value())
{
if (result.value() == 0)
{
continue;
} else {
break;
}
}
};
Comment on lines +115 to +129

Contributor: Hm. This is still "odd". If the call to GetNewSamples() gives you an error, it will most likely indicate an issue which won't self-heal! But then you have an endless loop here.
So my proposal:

Suggested change
while (true) {
auto result = dummy_data_event.GetNewSamples((
[](SamplePtr<DummyBenchmarkData> sample) noexcept {
std::ignore = sample;
}),1);
if (!result.has_value())
{
std::cerr << "Transceiver: GetNewSamples() failed: " << result.error() << "!" << std::endl;
break;
}
if (result.value() == 0)
{
continue;
} else
{
break;
}
};

do {
sample_result = skeleton.dummy_benchmark_data_.Allocate();
Contributor: Here you have the "same" problem as above. When you have a problem doing the allocation ... there is a likelihood that this problem does not self-heal and you end up in an endless loop.
} while (!sample_result.has_value());
skeleton.dummy_benchmark_data_.Send(std::move(sample_result).value());
Contributor: Send may fail ... and you don't have any error handling ... if you don't want to do error handling ... then you should at least be consistent and explicitly ignore the return value of Send() ...


}
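
For illustration, a hedged sketch of how both points raised above (bounded Allocate retries and a checked Send) could be handled inside the loop body, assuming Send() - like Allocate() - returns a score::Result (an assumption; its exact return type is not visible in this diff) and using a hypothetical retry bound:

constexpr std::size_t kMaxAllocateRetries = 100U; // hypothetical bound
std::size_t attempts = 0U;
do {
sample_result = skeleton.dummy_benchmark_data_.Allocate();
} while (!sample_result.has_value() && ++attempts < kMaxAllocateRetries);
if (!sample_result.has_value())
{
std::cerr << "Allocate() failed: " << sample_result.error() << "!" << std::endl;
break;
}
const auto send_result = skeleton.dummy_benchmark_data_.Send(std::move(sample_result).value());
if (!send_result.has_value())
{
std::cerr << "Send() failed: " << send_result.error() << "!" << std::endl;
break;
}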
benchmark_ab_finish_point.arrive_and_wait();
dummy_data_event.Unsubscribe();
deinit_ab_sync_point.arrive_and_wait();
skeleton.StopOfferService();
}

void Subscriber(int cpu, impl::InstanceSpecifier proxy_instance_specifier)
{
SetupThread(cpu);

ServiceHandleContainer<impl::HandleType> handle{};
do
{
auto handles_result = BenchmarkProxy::FindService(proxy_instance_specifier);
if (!handles_result.has_value())
{
std::cerr << "Unable to find service: " << handles_result.error() << "!" << std::endl;
return;
}
handle = std::move(handles_result).value();
if (handle.size() == 0)
{
std::this_thread::sleep_for(500ms);
}
} while (handle.size() == 0);

auto proxy_result = BenchmarkProxy::Create(std::move(handle.front()));
if (!proxy_result.has_value())
{
std::cerr << "Unable to construct proxy: " << proxy_result.error() << "!" << std::endl;
return;
}
auto& proxy = proxy_result.value();

impl::ProxyEvent<score::mw::com::DummyBenchmarkData>& dummy_data_event = proxy.dummy_benchmark_data_;

dummy_data_event.Subscribe(1);
for (std::size_t cycle = 0U; cycle < kIterations; cycle++)
{
while (true) {
auto result = dummy_data_event.GetNewSamples((
[](SamplePtr<DummyBenchmarkData> sample) noexcept {
std::ignore = sample;
}),1);
if (result.has_value())
{
if (result.value() == 0)
{
continue;
} else {
break;
}
}
};
}
dummy_data_event.Unsubscribe();
}

int main()
{
int cpu = 0;
const auto instance_specifier_instance_a_result = InstanceSpecifier::Create("benchmark/InstanceA");
const auto instance_specifier_instance_b_result = InstanceSpecifier::Create("benchmark/InstanceB");

if (!instance_specifier_instance_a_result.has_value() || !instance_specifier_instance_b_result.has_value())
{
std::cerr << "Invalid instance specifier, terminating." << std::endl;
return EXIT_FAILURE;
}
const auto& instance_specifier_instance_a = instance_specifier_instance_a_result.value();
const auto& instance_specifier_instance_b = instance_specifier_instance_b_result.value();

std::cout << "Starting benchmark" << std::endl;

std::thread transceiverA(Transceiver, cpu++, true, std::ref(instance_specifier_instance_a), std::ref(instance_specifier_instance_b));
std::thread transceiverB(Transceiver, cpu++, false, std::ref(instance_specifier_instance_b), std::ref(instance_specifier_instance_a));
#if kSubscribers > 0
Contributor: Why do you still have the additional subscriber threads in here? They are not explicitly taken into account by interacting with specific timing checkpoints.
So they run "in the background", thus creating CPU load, and because they also access the provided service instances, they potentially affect the Transceivers (which work on the same control data structures).

So is this exactly the idea? Checking how an additional number of consumers might affect the "latency" of a given communication? If so -> that should be explicitly documented.

std::thread subscribers[kSubscribers];
if (kSubscribers > 0)
{
subscribers[0] = std::thread(Subscriber, cpu++, std::ref(instance_specifier_instance_a));
subscribers[1] = std::thread(Subscriber, cpu++, std::ref(instance_specifier_instance_b));
}
#endif
init_ab_sync_point.arrive_and_wait();
benchmark_ab_start_point.arrive_and_wait();
const auto benchmark_ab_start_time = std::chrono::steady_clock::now();
benchmark_ab_finish_point.arrive_and_wait();
const auto benchmark_ab_stop_time = std::chrono::steady_clock::now();
const auto benchmark_ab_time = benchmark_ab_stop_time - benchmark_ab_start_time;

transceiverA.join();
transceiverB.join();
#if kSubscribers > 0
for (std::size_t i = 0; i < kSubscribers; i++)
{
subscribers[i].join();
}
#endif
std::cout << "Results:" << "\t" <<
"Iterations: " << kIterations << ", " << "\t" <<
"Time: " << duration<float>(benchmark_ab_time).count() << "s, " << "\t" <<
"Latency: " << duration_cast<nanoseconds>(benchmark_ab_time).count() / (kIterations * 2) << "ns, " << "\t" <<
"Sample Size: " << kSampleSize <<
"Additional subscribers: " << kSubscribers << std::endl;
}
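
The Latency figure appears to divide the measured time by kIterations * 2 because each of the kIterations ping-pong cycles between the two transceivers contains two one-way transfers; a worked example with hypothetical numbers:

constexpr std::uint64_t kExampleTotalNs = 2'000'000'000ULL; // hypothetical: 2 s measured for 1'000'000 cycles
constexpr std::uint64_t kExampleOneWayNs = kExampleTotalNs / (1'000'000ULL * 2ULL);
static_assert(kExampleOneWayNs == 1000ULL, "would be reported as 1000 ns average one-way latency");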
56 changes: 56 additions & 0 deletions score/mw/com/benchmark/benchmark.h
@@ -0,0 +1,56 @@
/********************************************************************************
* Copyright (c) 2025 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
#ifndef SCORE_IPC_BRIDGE_BENCHMARK_H
#define SCORE_IPC_BRIDGE_BENCHMARK_H

#include "score/mw/com/types.h"

namespace score::mw::com
{

constexpr std::size_t kSampleSize = 8192;
constexpr std::uint32_t kIterations = 1000000;
constexpr std::size_t kSubscribers = 2;
constexpr std::size_t kThreadsMultiTotal = kSubscribers + 2;

struct DummyBenchmarkData
{
DummyBenchmarkData() = default;

DummyBenchmarkData(DummyBenchmarkData&&) = default;

DummyBenchmarkData(const DummyBenchmarkData&) = default;

DummyBenchmarkData& operator=(DummyBenchmarkData&&) = default;

DummyBenchmarkData& operator=(const DummyBenchmarkData&) = default;

std::array<std::uint32_t, kSampleSize / sizeof(std::uint32_t)> dummy_data;

};

template <typename Trait>
class IpcBridgeInterface : public Trait::Base
{
public:
using Trait::Base::Base;

typename Trait::template Event<DummyBenchmarkData> dummy_benchmark_data_{*this, "dummy_data_arrived"};
};

using BenchmarkProxy = AsProxy<IpcBridgeInterface>;
using BenchmarkSkeleton = AsSkeleton<IpcBridgeInterface>;

} // namespace score::mw::com

#endif // SCORE_IPC_BRIDGE_BENCHMARK_H
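
Side note: given the definitions above, each sample is evidently meant to occupy kSampleSize bytes (an array of kSampleSize / sizeof(std::uint32_t) elements of std::uint32_t); a sanity-check sketch one could add, assuming no padding is introduced:

static_assert(sizeof(score::mw::com::DummyBenchmarkData) == score::mw::com::kSampleSize,
              "DummyBenchmarkData is expected to occupy exactly kSampleSize bytes");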
8 changes: 8 additions & 0 deletions score/mw/com/benchmark/etc/logging.json
@@ -0,0 +1,8 @@
{
"appId": "IPBR",
"appDesc": "ipc_bridge",
"logLevel": "kOff",
"logLevelThresholdConsole": "kOff",
"logMode": "kConsole",
"dynamicDatarouterIdentifiers" : true
}