Conversation
There was a problem hiding this comment.
Pull request overview
This pull request fixes a race condition in the getAny method by replacing condition variables with a new synchronization mechanism consisting of BinarySemaphore and ManyToOneNotifier. The race condition occurred when a message arrived between checking for messages and waiting on the condition variable, causing the notification to be missed. The new implementation uses a binary semaphore that "remembers" notifications, ensuring no messages are lost.
Changes:
- Introduces
BinarySemaphoreclass as a temporary C++17-compatible alternative tostd::binary_semaphore - Implements
ManyToOneNotifierfor many-to-one thread synchronization with predicate support - Refactors
MessageQueueto use notifiers instead of condition variables, with a newwaitAnystatic method - Adds thread-safe locking to
LockingQueue::isDestroyed()method
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| include/depthai/utility/BinarySemaphore.hpp | New binary semaphore implementation with acquire/release semantics and timeout support |
| include/depthai/utility/ManyToOneNotifier.hpp | New notifier class that wraps BinarySemaphore with predicate-based waiting |
| include/depthai/pipeline/MessageQueue.hpp | Updated to use notifiers instead of condition variables, added waitAny declaration |
| src/pipeline/MessageQueue.cpp | Refactored getAny to use new waitAny method, renamed methods from condVar to notifier terminology |
| include/depthai/utility/LockingQueue.hpp | Added mutex lock to isDestroyed() for thread safety |
| tests/src/onhost_tests/calibration_handler_test.cpp | Reordered includes to follow standard library first convention |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| queue = std::move(m.queue); | ||
| name = std::move(m.name); | ||
| callbacks = std::move(m.callbacks); | ||
| notifiers = std::move(m.notifiers); | ||
| uniqueCallbackId = m.uniqueCallbackId; | ||
| pipelineEventDispatcher = m.pipelineEventDispatcher; | ||
| return *this; |
There was a problem hiding this comment.
Missing uniqueNotifierId in assignment operators. The move assignment operator copies uniqueCallbackId (line 90) but doesn't copy/move uniqueNotifierId. This is inconsistent and could lead to ID collisions if notifiers are added after assignment, since the ID counter starts at 0 but the notifiers map may contain entries with higher IDs from the source object.
| removeNotifiers(); | ||
| return false; |
There was a problem hiding this comment.
Unreachable code detected. After the try-catch block completes successfully (line 259 returns true), execution will never reach lines 264-265. The removeNotifiers() call on line 264 will never execute, which means notifiers added to the queues will not be removed in the success case. This will cause a resource leak as notifiers accumulate in the MessageQueue instances.
| CallbackId uniqueCondVarId{0}; | ||
| std::mutex notifierMtx; | ||
| std::unordered_map<CallbackId, std::shared_ptr<ManyToOneNotifier>> notifiers; | ||
| ; |
There was a problem hiding this comment.
Extraneous semicolon after member declaration. This line contains only a semicolon which creates an unnecessary empty declaration.
| ; |
| queue = c.queue; | ||
| name = c.name; | ||
| callbacks = c.callbacks; | ||
| notifiers = c.notifiers; | ||
| uniqueCallbackId = c.uniqueCallbackId; | ||
| pipelineEventDispatcher = c.pipelineEventDispatcher; | ||
| return *this; |
There was a problem hiding this comment.
Missing uniqueNotifierId in assignment operators. The copy and move assignment operators copy uniqueCallbackId (lines 80 and 90) but don't copy uniqueNotifierId. This is inconsistent and could lead to ID collisions if notifiers are added after assignment, since the ID counter starts at 0 but the notifiers map may contain entries with higher IDs from the source object.
| std::lock_guard<std::mutex> lock(cvNotifyMtx); | ||
| std::lock_guard<std::mutex> lock(notifierMtx); | ||
|
|
||
| // Call all callbacks |
There was a problem hiding this comment.
Misleading comment. The comment says "Call all callbacks" but the code actually notifies all notifiers, not callbacks. The comment should be updated to say "Notify all notifiers" or "Notify all listeners" to match the function name.
| // Call all callbacks | |
| // Notify all listeners |
| std::unique_lock<std::mutex> lock(cvNotifyMtx); | ||
| std::unique_lock<std::mutex> lock(notifierMtx); | ||
|
|
||
| // If callback with id 'callbackId' doesn't exists, return false |
There was a problem hiding this comment.
Incorrect variable reference in comment. The comment refers to 'callbackId' but the parameter is named 'notifierId'. The comment should be updated to: "If notifier with id 'notifierId' doesn't exist, return false".
| // If callback with id 'callbackId' doesn't exists, return false | |
| // If notifier with id 'notifierId' doesn't exist, return false |
| #pragma once | ||
|
|
||
| #include <chrono> | ||
| #include <condition_variable> | ||
| #include <mutex> | ||
|
|
||
| /** | ||
| * @brief A binary semaphore implementation | ||
| * Should be removed and replaced with std::binary_semaphore if and when C++20 is supported | ||
| */ | ||
| class BinarySemaphore { | ||
| std::mutex mtx; | ||
| std::condition_variable cv; | ||
| bool available = false; // zero-initialized | ||
|
|
||
| public: | ||
| BinarySemaphore() = default; | ||
| explicit BinarySemaphore(bool initiallyAvailable) : available(initiallyAvailable) {} | ||
|
|
||
| // signal / post / V | ||
| void release() { | ||
| { | ||
| std::lock_guard<std::mutex> lk(mtx); | ||
| available = true; | ||
| } | ||
| cv.notify_one(); | ||
| } | ||
|
|
||
| // wait / P | ||
| void acquire() { | ||
| std::unique_lock<std::mutex> lk(mtx); | ||
| cv.wait(lk, [&] { return available; }); | ||
| available = false; // consume | ||
| } | ||
|
|
||
| // try-wait | ||
| bool tryAcquire() { | ||
| std::lock_guard<std::mutex> lk(mtx); | ||
| if(!available) return false; | ||
| available = false; | ||
| return true; | ||
| } | ||
|
|
||
| template <class Rep, class Period> | ||
| bool tryAcquireFor(const std::chrono::duration<Rep, Period>& timeout) { | ||
| std::unique_lock<std::mutex> lk(mtx); | ||
| if(!cv.wait_for(lk, timeout, [&] { return available; })) return false; | ||
| available = false; | ||
| return true; | ||
| } | ||
|
|
||
| template <class Clock, class Duration> | ||
| bool tryAcquireUntil(const std::chrono::time_point<Clock, Duration>& deadline) { | ||
| std::unique_lock<std::mutex> lk(mtx); | ||
| if(!cv.wait_until(lk, deadline, [&] { return available; })) return false; | ||
| available = false; // consume | ||
| return true; | ||
| } | ||
| }; |
There was a problem hiding this comment.
Missing namespace declaration. The BinarySemaphore class is defined in the global namespace, which could lead to naming conflicts. Other utility classes in the depthai project (like LockingQueue) are defined within the 'dai' namespace. Consider wrapping this class in the 'dai' namespace for consistency and to avoid potential naming conflicts.
| #pragma once | ||
|
|
||
| #include <atomic> | ||
| #include <stdexcept> | ||
|
|
||
| #include "BinarySemaphore.hpp" | ||
|
|
||
| class ManyToOneNotifier { | ||
| BinarySemaphore semaphore{false}; | ||
| std::atomic<bool> waiting{false}; | ||
|
|
||
| public: | ||
| ManyToOneNotifier() = default; | ||
|
|
||
| /** | ||
| * @brief Notify the waiting thread | ||
| * Should always be called after the state change that the waiting thread is waiting for | ||
| */ | ||
| void notifyOne() { | ||
| semaphore.release(); | ||
| } | ||
|
|
||
| /** | ||
| * @brief Wait until notified | ||
| * @arg pred Predicate to check after each notification - IMPORTANT: must be thread-safe | ||
| */ | ||
| template <typename Pred> | ||
| void wait(Pred pred) { | ||
| if(waiting.exchange(true)) { | ||
| throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported"); | ||
| } | ||
| try { | ||
| while(!pred()) semaphore.acquire(); | ||
| } catch(...) { | ||
| waiting = false; | ||
| throw; | ||
| } | ||
| waiting = false; | ||
| } | ||
|
|
||
| /** | ||
| * @brief Wait until notified or timeout occurs | ||
| * @arg pred Predicate to check after each notification - IMPORTANT: must be thread-safe | ||
| */ | ||
| template <typename Pred, typename Rep, typename Period> | ||
| bool waitFor(Pred pred, std::chrono::duration<Rep, Period> timeout) { | ||
| if(waiting.exchange(true)) { | ||
| throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported"); | ||
| } | ||
|
|
||
| try { | ||
| auto deadline = std::chrono::steady_clock::now() + timeout; | ||
|
|
||
| while(!pred()) { | ||
| if(!semaphore.tryAcquireUntil(deadline)) { | ||
| return pred(); | ||
| } | ||
| } | ||
| return true; | ||
| } catch(...) { | ||
| waiting = false; | ||
| throw; | ||
| } | ||
| waiting = false; | ||
| } | ||
| }; |
There was a problem hiding this comment.
Missing namespace declaration. The ManyToOneNotifier class is defined in the global namespace, which could lead to naming conflicts. Other utility classes in the depthai project (like LockingQueue) are defined within the 'dai' namespace. Consider wrapping this class in the 'dai' namespace for consistency and to avoid potential naming conflicts.
| #pragma once | ||
|
|
||
| #include <atomic> | ||
| #include <stdexcept> |
There was a problem hiding this comment.
Missing chrono header include. The waitFor method uses std::chrono::duration and std::chrono::steady_clock but the header is not included. This works only because BinarySemaphore.hpp includes , creating a hidden dependency. Add #include for proper self-contained header design.
| #include <stdexcept> | |
| #include <stdexcept> | |
| #include <chrono> |
…/get_any_race_condition
| template <typename Pred> | ||
| void wait(Pred pred) { | ||
| if(waiting.exchange(true)) { | ||
| throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported"); |
There was a problem hiding this comment.
What is the purpose of the condition that only one thread can wait for a notifier; what is that beneficial for?
In the message queues, you typically want that many threads wait in a notification queue and only one thread gets notified.
There was a problem hiding this comment.
The purpose of this class is for one main thread to wait on any one of multiple worker threads. This check ensures that someone using this class doesn't get unexpected results if they try to wait from multiple threads.
There was a problem hiding this comment.
Is that a beneficial behavior?
If you did not use a BinarySemaphore but used a class cv + mutex, then there could be more waiting threads waiting while Worker threads could be notifying them.
There was a problem hiding this comment.
If I used a mutex I'd have to lock the queue.push method and no output would be able to send while another watched queue is being sent to or while checking the predicate. I'd also have to add a lock for each waiting thread which would increase code complexity, which is not necessary since this accomplishes the goal of this PR.
There was a problem hiding this comment.
I assume we have the following scenario - and correct me, if I am wrong.
- Multiple writers. One writer doesn't wait for a signal, it tries to push to a particular queue. But there may be multiple writers trying to push data to the same queue.
- Multiple readers on the particular queue. A reader is waiting to get a signal that the queue has any data available - and it reads the data and processes further.
What about this solution then:
-
A worker thread (writer) that pushes data to a queue
1.1. The thread locks the queue with queue's related queueMutex
1.2. Data is pushed into queue
1.3. The queue is unlocked with queue's related queueMutex
1.4. A conditional variable dataReady is called to notify max. one thread -
A worker thread (reader) that waits for a data from a queue:
2.1 The queue locks the queue with queue's related queueMutex
2.2 Conditional variable dataReady->wait(queueMutex...) is called:
2.2.1 Checks the predicate and if true, return
2.2.2 Unlocks the queueMutex
2.2.3 Waits for a signal from dataReady (with unlocked queueMutex)
2.2.4 Locks the queueMutex and goes to 2.2.1
2.2 The worker thread reads the data from the queue
2.3 The worker thread unlocks the queueMutex
There was a problem hiding this comment.
If I understand correctly, you're describing the way queue.get() works. This is a separate thing. Each message queue already has its own conditional variable and mutex that handles pushes and pops from the queue.
The issue I had for getAny is that there isn't a mechanism built in to only wait until one conditional variable notifies. To do this you need to pass a single condition variable instance to multiple queues and wait until one notifies.
So the way it works now is that each time you want to wait on any of a list of queues, a semaphore is created and added to every queue in the list. When a new message arrives to any of the queues (or when a queue is destroyed) the semaphore is notified, and the waiting thread gets woken up.
| std::lock_guard<std::mutex> lk(mtx); | ||
| available = true; | ||
| } | ||
| cv.notify_one(); |
There was a problem hiding this comment.
If we talk about BinarySemaphore (only one waiting), then you can notify conditionally.
If a semaphore was already released and anybody would call "release", do not notify. Take it as an erroneous situation with no action.
There was a problem hiding this comment.
If there's no waiting thread then there's practically no overhead to notifying, which would be the case if available was already true. If I conditionally notified, I'd have to do it under the lock which would mean that the waiting thread would wake up and then immediately block and wait for the mutex to be released which would cause two extra context switches.
There was a problem hiding this comment.
Not sure you understood me. Let's make it clear.
This is one scenario I was thinking about:
void release() {
ASSERT(available == true, "There is a bug in the SW, a thread calls release while the semaphore is not acquired. We will not call notify_one");
{
std::lock_guard<std::mutex> lk(mtx);
available = true;
}
cv.notify_one();
}
Another one, without ASSERT but with
void release() {
bool was_available = false;
{
std::lock_guard<std::mutex> lk(mtx);
was_available = available;
available = true;
}
if (!was_available)
cv.notify_one(); // call notify one only in an expected scenario.
}
There was a problem hiding this comment.
Ah I see. notify_one has little overhead when there are no waiters but it certainly doesn't hurt to add this. Thanks
…/get_any_race_condition
|
@majvan Do you have any more concerns or can this be merged? |
…/get_any_race_condition
…/get_any_race_condition
| throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported"); | ||
| } | ||
| try { | ||
| while(!pred()) semaphore.acquire(); |
There was a problem hiding this comment.
Two issues.
[1]
We check pred() and then (maybe) acquire a semaphore
This function has then two expected outcomes then:
- the pred was fulfilled BUT NO semaphore acquired (the case when the pred() returns true the first time)
- the pred was fulfilled AND semaphore acquired (the case when the pred() returns true the next time)
The semaphore acquisition is not causally linked to the predicate becoming true.
[2]
Typically the Pred in the wait (like cv::wait) are used just because the whole pred() is running protected under the mutex. The reason is that the mutex ensures the atomicity of the operation together with cv's waiting cycle.
It's not a case here. If pred() is an unprotected function, it can be preempted by other thread which can access the state of the pred() - and this creates a potential RACE CONDITION.
Example:
// We need to ensure that the queue has data and that camera is on (an abstract example)
bool pred() {
if (!camera_on)
return false;
if (queue.empty)
return false;
return true;
}
Step 0 - initial state:
camera_on = true;
queue.empty = true;
Step 1 - ThreadA acquires a semaphore and continues:
gets into pred() and computes camera_on = true
-- preemptive context switch --
Step 2 - Thread C runs:
{
lock(camera_and_queue);
camera_on = false; // just set that the camera is off
}
-- preemptive context switch --
Step 3 - ThreadA continues:
continues the pred() and computes queue.empty = false and returns true
ManyToWaitNotifier::wait() returns but the state is:
camera_on = false; <-- RACE CONDITION: we wanted to wait only till camera is on
The predicate reads shared state (camera_on, queue.empty) without holding the mutex that protects it. This allows concurrent modification by other threads, resulting in undefined behavior and inconsistent observations of the system state.
In contrast, std::condition_variable::wait(lock, pred) guarantees that the predicate is evaluated while holding the mutex, making the check and sleep cycle atomic.
Another outcome of [2] is a possible deadlock:
Step 1: ThreadA - runs:
if (!pred())
semaphore.acquire(); // waits in the semaphore for the cv
Step 2: ThreadB - is about to release the semaphore:
{
std::lock(camera_and_queue)
queue.empty = false;
notifyOne(); -> semaphore.release(); // notify there is next some data, release the semaphore
}
Step 3: ThreadA - continues:
semaphore.acquire(); // finishes the semaphore acquiring
-- preemptive context switch --
Step 4: ThreadC - runs:
{
lock(camera_and_queue);
camera_on = false; // just set that the camera is off
}
-- preemptive context switch --
Step 5 - ThreadA continues:
if (!pred())
semaphore.acquire(); // DEADLOCK <-- semaphore count is zero and no further release will occur; ThreadA will get not awaken anymore
The semaphore release is not tied to the predicate remaining true.
A thread may wake, consume the semaphore, and then observe the predicate as false due to a concurrent state change. Since the semaphore signal has already been consumed and no further releases are guaranteed, the waiting thread may block forever.
There was a problem hiding this comment.
The second issue is documented:
Should always be called after the state change that the waiting thread is waiting for
(maybe the wording is not clear, should it be 'after any state change'?)
ThreadC should notify after setting camera_on to false.
The first issue is a good point. I'll document this limitation.
There was a problem hiding this comment.
This constraint works for the deadlock, but these rules have to be followed.
And it brings additional unnecessary wakeups even in the case your state is going "against" the wakeup (why would you wakeup threads when you switch off the camera when you want them to work when the camera is switched on?)
Using std::condition_variable is much easier and exactly what you need here without reinventing it. :)
…/get_any_race_condition
…/get_any_race_condition
| std::mutex inputsWaitMutex; | ||
| auto inputsWaitCv = std::make_shared<std::condition_variable>(); | ||
| }; | ||
| auto checkAllClosed = [&]() { return boost::algorithm::all_of(queues, [](const std::reference_wrapper<MessageQueue> q) { return q.get().isClosed(); }); }; |
| if(gotAny) { | ||
| for(const auto& kv : queues) { | ||
| auto& input = kv.second; | ||
| if(!input.isClosed() && input.has()) inputs[kv.first] = input.get<ADatatype>(); |
There was a problem hiding this comment.
Shouldn't be this atomically checked and retrieved?
Get an input only when it is not closed and has data. Any modification and readout of the queue state should be mutex-ed.
|
@CodeRabbit review |
✅ Actions performedReview triggered.
|
📝 WalkthroughWalkthroughThis pull request refactors MessageQueue's synchronization mechanism, replacing condition-variable-based notification with a new notifier abstraction. Two new utility classes—BinarySemaphore and WaitAnyNotifier—are introduced to manage thread coordination. MessageQueue's public API is updated with renamed methods and timeout support for multi-queue waiting. Thread safety in LockingQueue is improved, and comprehensive test coverage for multi-queue operations is added. Changes
Sequence DiagramsequenceDiagram
participant Client
participant MessageQueue as MessageQueue<br/>(Primary)
participant Notifier as WaitAnyNotifier
participant Semaphore as BinarySemaphore
participant WaitThread as Waiting Thread
Client->>MessageQueue: waitAny(queues, timeout)
MessageQueue->>Notifier: create shared instance
MessageQueue->>MessageQueue: subscribe all queues to notifier
MessageQueue->>WaitThread: begin waiting
WaitThread->>Notifier: wait(predicate)
Notifier->>Semaphore: acquire with timeout
Semaphore-->>Notifier: block until available
Client->>MessageQueue: send(data) on queue[i]
MessageQueue->>Notifier: notifyListeners()
Notifier->>Notifier: notifyOne()
Notifier->>Semaphore: release()
Semaphore-->>WaitThread: unblock
WaitThread->>Notifier: evaluate predicate
Notifier-->>Notifier: check if any queue has data
alt Data Available
Notifier-->>WaitThread: return true
WaitThread->>MessageQueue: unsubscribe notifier
MessageQueue-->>Client: return bool (true)
else Timeout
Semaphore-->>WaitThread: timeout expires
Notifier-->>WaitThread: return false
WaitThread->>MessageQueue: unsubscribe notifier
MessageQueue-->>Client: return bool (false)
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
📝 Coding Plan
Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (3)
include/depthai/utility/BinarySemaphore.hpp (1)
24-31: 🧹 Nitpick | 🔵 TrivialOnly notify on the
false -> truetransition.
notifyis always set totrue, so a duplicaterelease()still wakes a waiter even whenavailablewas alreadytrue. That defeats the conditional-notify behavior discussed earlier.Suggested fix
void release() { bool notify = false; { std::lock_guard<std::mutex> lk(mtx); - available = true; - notify = true; + notify = !available; + available = true; } if(notify) cv.notify_one(); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@include/depthai/utility/BinarySemaphore.hpp` around lines 24 - 31, In BinarySemaphore.hpp's release() method the notify flag is unconditionally true so calling release() when available is already true will still wake a waiter; change the locked section in release() (function name: release, members: available, mtx, cv) to only set notify = true when available was false (i.e., check previous state under the mutex and set available = true and notify = true only on the false->true transition) so cv.notify_one() runs only when the semaphore actually became available.src/pipeline/MessageQueue.cpp (1)
229-260:⚠️ Potential issue | 🟠 Major
checkForMessages()can still break on a close race.It builds readiness from
!isClosed()andhas()under separate queue calls. If a queue closes between those two checks,has()throwsQueueException;pred()converts that intotrue, but Line 260 calls the same helper again outside the catch. That means a close racing with this path can either endwaitAny()early or leak an unexpectedQueueExceptioneven though closed queues are supposed to be tolerated here. Please snapshot{closed, has_message}under one queue lock instead of composing two accessors.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/pipeline/MessageQueue.cpp` around lines 229 - 260, The predicate checkForMessages() races because it calls q.get().isClosed() and q.get().has() separately; if the queue closes between those calls has() may throw and the later call to checkForMessages() (line assigning gotMessage) can propagate QueueException. Change checkForMessages() to query each queue under a single atomic snapshot API or lock: for each input use one call that returns both closed and has_message together (or add a MessageQueue::snapshotStatus()/tryHasMessage() that acquires the queue lock and returns {closed, has}), then base readiness on that single snapshot; update pred() and the final gotMessage = ... line to use this snapshot helper so no separate isClosed()/has() calls are made outside the exception-safe block (refer to functions checkForMessages, pred, and the final gotMessage assignment).include/depthai/utility/WaitAnyNotifier.hpp (1)
39-46:⚠️ Potential issue | 🔴 CriticalThe semaphore wake can still be lost to a competing consumer.
wait()andwaitFor()only consume a token and then re-readpred()outside the state's mutex. Insrc/pipeline/MessageQueue.cpp, any concurrentget()/tryGet()/getAny()on the same queues can drain the notified message before the waiter recheckscheckForMessages(). Once that token is consumed, the next loop iteration can block forever. The current comment only bans another waiting thread, but a non-waiting consumer is enough to trigger the same race.Also applies to: 74-82
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@include/depthai/utility/WaitAnyNotifier.hpp` around lines 39 - 46, The wake token can be stolen by a competing non-waiting consumer because wait(Pred) and waitFor(...) consume the semaphore then re-check pred() outside synchronization; fix by making the wake check and token consumption atomic-relative to notifications: add a monotonic sequence/version counter in the WaitAnyNotifier class that notify() increments, have wait(Pred) and waitFor(...) sample the current version before waiting and loop until the sampled version changes and pred() is true (use the semaphore only as a wake hint), or alternatively hold the same mutex while checking pred() and consuming the semaphore so no other consumer can drain the signalled message between the pred check and semaphore acquire; update the implementations of wait(Pred), waitFor(...), and notify() accordingly (referencing wait(Pred), waitFor(Pred, ...), and notify()).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/pipeline/MessageQueue.cpp`:
- Around line 281-285: The exception handler in MessageQueue.cpp is comparing
e.what() and CLOSED_QUEUE_MESSAGE with != which compares pointers; change it to
compare string contents (e.g., use std::string(e.what()) != CLOSED_QUEUE_MESSAGE
or strcmp(e.what(), CLOSED_QUEUE_MESSAGE) != 0) so the closed-queue message is
detected correctly and only other exceptions are rethrown; locate the try/catch
around input.tryGet<ADatatype>() and the QueueException handling to apply this
fix.
---
Duplicate comments:
In `@include/depthai/utility/BinarySemaphore.hpp`:
- Around line 24-31: In BinarySemaphore.hpp's release() method the notify flag
is unconditionally true so calling release() when available is already true will
still wake a waiter; change the locked section in release() (function name:
release, members: available, mtx, cv) to only set notify = true when available
was false (i.e., check previous state under the mutex and set available = true
and notify = true only on the false->true transition) so cv.notify_one() runs
only when the semaphore actually became available.
In `@include/depthai/utility/WaitAnyNotifier.hpp`:
- Around line 39-46: The wake token can be stolen by a competing non-waiting
consumer because wait(Pred) and waitFor(...) consume the semaphore then re-check
pred() outside synchronization; fix by making the wake check and token
consumption atomic-relative to notifications: add a monotonic sequence/version
counter in the WaitAnyNotifier class that notify() increments, have wait(Pred)
and waitFor(...) sample the current version before waiting and loop until the
sampled version changes and pred() is true (use the semaphore only as a wake
hint), or alternatively hold the same mutex while checking pred() and consuming
the semaphore so no other consumer can drain the signalled message between the
pred check and semaphore acquire; update the implementations of wait(Pred),
waitFor(...), and notify() accordingly (referencing wait(Pred), waitFor(Pred,
...), and notify()).
In `@src/pipeline/MessageQueue.cpp`:
- Around line 229-260: The predicate checkForMessages() races because it calls
q.get().isClosed() and q.get().has() separately; if the queue closes between
those calls has() may throw and the later call to checkForMessages() (line
assigning gotMessage) can propagate QueueException. Change checkForMessages() to
query each queue under a single atomic snapshot API or lock: for each input use
one call that returns both closed and has_message together (or add a
MessageQueue::snapshotStatus()/tryHasMessage() that acquires the queue lock and
returns {closed, has}), then base readiness on that single snapshot; update
pred() and the final gotMessage = ... line to use this snapshot helper so no
separate isClosed()/has() calls are made outside the exception-safe block (refer
to functions checkForMessages, pred, and the final gotMessage assignment).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: eaa89e5d-54e6-41ae-b3ca-a5a430959072
📒 Files selected for processing (6)
include/depthai/pipeline/MessageQueue.hppinclude/depthai/utility/BinarySemaphore.hppinclude/depthai/utility/LockingQueue.hppinclude/depthai/utility/WaitAnyNotifier.hppsrc/pipeline/MessageQueue.cpptests/src/onhost_tests/message_queue_test.cpp
📜 Review details
🧰 Additional context used
🧬 Code graph analysis (3)
include/depthai/pipeline/MessageQueue.hpp (2)
tests/src/onhost_tests/message_queue_test.cpp (1)
pipelineEventDispatcher(530-532)src/pipeline/MessageQueue.cpp (2)
getAny(268-292)getAny(268-269)
src/pipeline/MessageQueue.cpp (1)
include/depthai/pipeline/MessageQueue.hpp (16)
notifier(206-206)callbackId(214-214)notifierId(222-222)QueueException(28-28)QueueException(28-28)QueueException(29-29)msg(47-47)msg(527-527)msg(536-536)msg(545-545)msg(552-552)queue(241-246)queues(94-94)queues(95-96)getAny(98-109)getAny(98-99)
tests/src/onhost_tests/message_queue_test.cpp (2)
include/depthai/pipeline/MessageQueue.hpp (9)
queues(94-94)queues(95-96)getAny(98-109)getAny(98-99)msg(47-47)msg(527-527)msg(536-536)msg(545-545)msg(552-552)src/pipeline/MessageQueue.cpp (4)
getAny(268-292)getAny(268-269)waitAny(220-267)waitAny(220-220)
🪛 Cppcheck (2.20.0)
src/pipeline/MessageQueue.cpp
[information] 5-5: Include file
(missingIncludeSystem)
[information] 5-5: Include file
(missingIncludeSystem)
[information] 5-5: Include file
(missingIncludeSystem)
[information] 5-5: Include file
(missingIncludeSystem)
[information] 5-5: Include file
(missingIncludeSystem)
[information] 5-5: Include file
(missingIncludeSystem)
[information] 5-5: Include file
(missingIncludeSystem)
[information] 5-5: Include file
(missingIncludeSystem)
[information] 5-5: Include file
(missingIncludeSystem)
[information] 5-5: Include file
(missingIncludeSystem)
| try { | ||
| msg = input.tryGet<ADatatype>(); | ||
| } catch(QueueException& e) { | ||
| if(e.what() != CLOSED_QUEUE_MESSAGE) { | ||
| throw; |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In C++, when comparing const char*values likestd::runtime_error::what()and a string literal with!=, does the operator compare pointer addresses or string contents?
💡 Result:
In C++, std::runtime_error::what() returns a const char* (a pointer) to a null-terminated character array. (gcc.gnu.org)
So an expression like:
if (e.what() != "some message") { ... }uses the built-in operator!= for pointers and compares pointer addresses, not the string contents. (It will only be “equal” if both pointers point to the exact same address.) (web.stanford.edu)
To compare the text, use std::strcmp(e.what(), "some message") == 0 (or convert to std::string / std::string_view and compare). (en.cppreference.com)
Citations:
- 1: https://gcc.gnu.org/onlinedocs/gcc-12.5.0/libstdc%2B%2B/api/a03375.html?utm_source=openai
- 2: https://web.stanford.edu/class/archive/cs/cs107/cs107.1194/lectures/4/Lecture4.pdf?utm_source=openai
- 3: https://en.cppreference.com/w/cpp/string/byte/strcmp?utm_source=openai
🏁 Script executed:
# First, locate the MessageQueue.cpp file
fd MessageQueue.cppRepository: luxonis/depthai-core
Length of output: 93
🏁 Script executed:
# Once we have the file, read the context around lines 281-285
if [ -f "src/pipeline/MessageQueue.cpp" ]; then
echo "=== Lines 275-290 of MessageQueue.cpp ==="
sed -n '275,290p' src/pipeline/MessageQueue.cpp
fiRepository: luxonis/depthai-core
Length of output: 587
🏁 Script executed:
# Search for CLOSED_QUEUE_MESSAGE definition
rg "CLOSED_QUEUE_MESSAGE" --type cpp -B 2 -A 2Repository: luxonis/depthai-core
Length of output: 6207
Fix pointer comparison to string comparison in exception handler.
e.what() and CLOSED_QUEUE_MESSAGE are both const char*, so != compares pointer addresses, not string contents. This causes the closed-queue exception to be rethrown instead of silently ignored, breaking the intended exception filtering logic.
Suggested fix
- if(e.what() != CLOSED_QUEUE_MESSAGE) {
+ if(std::string{e.what()} != CLOSED_QUEUE_MESSAGE) {
throw;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| try { | |
| msg = input.tryGet<ADatatype>(); | |
| } catch(QueueException& e) { | |
| if(e.what() != CLOSED_QUEUE_MESSAGE) { | |
| throw; | |
| try { | |
| msg = input.tryGet<ADatatype>(); | |
| } catch(QueueException& e) { | |
| if(std::string{e.what()} != CLOSED_QUEUE_MESSAGE) { | |
| throw; | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/pipeline/MessageQueue.cpp` around lines 281 - 285, The exception handler
in MessageQueue.cpp is comparing e.what() and CLOSED_QUEUE_MESSAGE with != which
compares pointers; change it to compare string contents (e.g., use
std::string(e.what()) != CLOSED_QUEUE_MESSAGE or strcmp(e.what(),
CLOSED_QUEUE_MESSAGE) != 0) so the closed-queue message is detected correctly
and only other exceptions are rethrown; locate the try/catch around
input.tryGet<ADatatype>() and the QueueException handling to apply this fix.
Fixes race condition in
getAnyby implementingBinarySemaphore(should be replaced withstd::binary_semaphorewhen we switch to C++20) andManyToOneNotifier. Also addswaitAny.Might need some extra eyes on this 👀
Here's why I believe this works:
Summary by CodeRabbit
Release Notes
New Features
Improvements
API Changes
Tests