Skip to content

Fix getAny race condition#1644

Open
asahtik wants to merge 23 commits intodevelopfrom
bugfix/get_any_race_condition
Open

Fix getAny race condition#1644
asahtik wants to merge 23 commits intodevelopfrom
bugfix/get_any_race_condition

Conversation

@asahtik
Copy link
Copy Markdown
Contributor

@asahtik asahtik commented Jan 21, 2026

Fixes race condition in getAny by implementing BinarySemaphore (should be replaced with std::binary_semaphore when we switch to C++20) and ManyToOneNotifier. Also adds waitAny.

Might need some extra eyes on this 👀

Here's why I believe this works:

func waitAny():
  1. add notifiers
  // If we get a message here the next line will evaluate to true + notifier semaphore will have available set
  2. check if any messages are already present
  // If we get a message here it will be caught in the notifier predicate + notifier semaphore will have available set
  3. check predicate
  // If we get a message here the notifier semaphore will have available set
  4. acquire semaphore
  // From here on the semaphore should ensure that the thread continues when available is set
  5. go to 3. which should now return true
// This is implemented according to the reference so it should be correct
func Sem::acquire:
  6. lock the available variable
  // If there were any new messages from point 3 to 6, the next line will evaluate to true
  7. check predicate (is available true - in cv wait)
  8. unlock
  9. sleep & wait for notify

Summary by CodeRabbit

Release Notes

  • New Features

    • Added optional timeout parameter to multi-queue waiting operations for improved control over blocking behavior.
    • Introduced new synchronization utilities for advanced waiting and notification scenarios.
  • Improvements

    • Enhanced thread-safe access to queue status checks.
    • Updated multi-queue message retrieval with more efficient synchronization mechanisms.
  • API Changes

    • Queue listener registration methods renamed for clarity.
  • Tests

    • Added comprehensive test coverage for multi-queue operations and timeout scenarios.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request fixes a race condition in the getAny method by replacing condition variables with a new synchronization mechanism consisting of BinarySemaphore and ManyToOneNotifier. The race condition occurred when a message arrived between checking for messages and waiting on the condition variable, causing the notification to be missed. The new implementation uses a binary semaphore that "remembers" notifications, ensuring no messages are lost.

Changes:

  • Introduces BinarySemaphore class as a temporary C++17-compatible alternative to std::binary_semaphore
  • Implements ManyToOneNotifier for many-to-one thread synchronization with predicate support
  • Refactors MessageQueue to use notifiers instead of condition variables, with a new waitAny static method
  • Adds thread-safe locking to LockingQueue::isDestroyed() method

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
include/depthai/utility/BinarySemaphore.hpp New binary semaphore implementation with acquire/release semantics and timeout support
include/depthai/utility/ManyToOneNotifier.hpp New notifier class that wraps BinarySemaphore with predicate-based waiting
include/depthai/pipeline/MessageQueue.hpp Updated to use notifiers instead of condition variables, added waitAny declaration
src/pipeline/MessageQueue.cpp Refactored getAny to use new waitAny method, renamed methods from condVar to notifier terminology
include/depthai/utility/LockingQueue.hpp Added mutex lock to isDestroyed() for thread safety
tests/src/onhost_tests/calibration_handler_test.cpp Reordered includes to follow standard library first convention

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 86 to 92
queue = std::move(m.queue);
name = std::move(m.name);
callbacks = std::move(m.callbacks);
notifiers = std::move(m.notifiers);
uniqueCallbackId = m.uniqueCallbackId;
pipelineEventDispatcher = m.pipelineEventDispatcher;
return *this;
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing uniqueNotifierId in assignment operators. The move assignment operator copies uniqueCallbackId (line 90) but doesn't copy/move uniqueNotifierId. This is inconsistent and could lead to ID collisions if notifiers are added after assignment, since the ID counter starts at 0 but the notifiers map may contain entries with higher IDs from the source object.

Copilot uses AI. Check for mistakes.
Comment thread src/pipeline/MessageQueue.cpp Outdated
Comment on lines +264 to +265
removeNotifiers();
return false;
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unreachable code detected. After the try-catch block completes successfully (line 259 returns true), execution will never reach lines 264-265. The removeNotifiers() call on line 264 will never execute, which means notifiers added to the queues will not be removed in the success case. This will cause a resource leak as notifiers accumulate in the MessageQueue instances.

Copilot uses AI. Check for mistakes.
CallbackId uniqueCondVarId{0};
std::mutex notifierMtx;
std::unordered_map<CallbackId, std::shared_ptr<ManyToOneNotifier>> notifiers;
;
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extraneous semicolon after member declaration. This line contains only a semicolon which creates an unnecessary empty declaration.

Suggested change
;

Copilot uses AI. Check for mistakes.
Comment on lines 76 to 82
queue = c.queue;
name = c.name;
callbacks = c.callbacks;
notifiers = c.notifiers;
uniqueCallbackId = c.uniqueCallbackId;
pipelineEventDispatcher = c.pipelineEventDispatcher;
return *this;
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing uniqueNotifierId in assignment operators. The copy and move assignment operators copy uniqueCallbackId (lines 80 and 90) but don't copy uniqueNotifierId. This is inconsistent and could lead to ID collisions if notifiers are added after assignment, since the ID counter starts at 0 but the notifiers map may contain entries with higher IDs from the source object.

Copilot uses AI. Check for mistakes.
Comment thread src/pipeline/MessageQueue.cpp Outdated
std::lock_guard<std::mutex> lock(cvNotifyMtx);
std::lock_guard<std::mutex> lock(notifierMtx);

// Call all callbacks
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misleading comment. The comment says "Call all callbacks" but the code actually notifies all notifiers, not callbacks. The comment should be updated to say "Notify all notifiers" or "Notify all listeners" to match the function name.

Suggested change
// Call all callbacks
// Notify all listeners

Copilot uses AI. Check for mistakes.
Comment thread src/pipeline/MessageQueue.cpp Outdated
std::unique_lock<std::mutex> lock(cvNotifyMtx);
std::unique_lock<std::mutex> lock(notifierMtx);

// If callback with id 'callbackId' doesn't exists, return false
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorrect variable reference in comment. The comment refers to 'callbackId' but the parameter is named 'notifierId'. The comment should be updated to: "If notifier with id 'notifierId' doesn't exist, return false".

Suggested change
// If callback with id 'callbackId' doesn't exists, return false
// If notifier with id 'notifierId' doesn't exist, return false

Copilot uses AI. Check for mistakes.
Comment on lines +1 to +59
#pragma once

#include <chrono>
#include <condition_variable>
#include <mutex>

/**
* @brief A binary semaphore implementation
* Should be removed and replaced with std::binary_semaphore if and when C++20 is supported
*/
class BinarySemaphore {
std::mutex mtx;
std::condition_variable cv;
bool available = false; // zero-initialized

public:
BinarySemaphore() = default;
explicit BinarySemaphore(bool initiallyAvailable) : available(initiallyAvailable) {}

// signal / post / V
void release() {
{
std::lock_guard<std::mutex> lk(mtx);
available = true;
}
cv.notify_one();
}

// wait / P
void acquire() {
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, [&] { return available; });
available = false; // consume
}

// try-wait
bool tryAcquire() {
std::lock_guard<std::mutex> lk(mtx);
if(!available) return false;
available = false;
return true;
}

template <class Rep, class Period>
bool tryAcquireFor(const std::chrono::duration<Rep, Period>& timeout) {
std::unique_lock<std::mutex> lk(mtx);
if(!cv.wait_for(lk, timeout, [&] { return available; })) return false;
available = false;
return true;
}

template <class Clock, class Duration>
bool tryAcquireUntil(const std::chrono::time_point<Clock, Duration>& deadline) {
std::unique_lock<std::mutex> lk(mtx);
if(!cv.wait_until(lk, deadline, [&] { return available; })) return false;
available = false; // consume
return true;
}
};
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing namespace declaration. The BinarySemaphore class is defined in the global namespace, which could lead to naming conflicts. Other utility classes in the depthai project (like LockingQueue) are defined within the 'dai' namespace. Consider wrapping this class in the 'dai' namespace for consistency and to avoid potential naming conflicts.

Copilot uses AI. Check for mistakes.
Comment on lines +1 to +66
#pragma once

#include <atomic>
#include <stdexcept>

#include "BinarySemaphore.hpp"

class ManyToOneNotifier {
BinarySemaphore semaphore{false};
std::atomic<bool> waiting{false};

public:
ManyToOneNotifier() = default;

/**
* @brief Notify the waiting thread
* Should always be called after the state change that the waiting thread is waiting for
*/
void notifyOne() {
semaphore.release();
}

/**
* @brief Wait until notified
* @arg pred Predicate to check after each notification - IMPORTANT: must be thread-safe
*/
template <typename Pred>
void wait(Pred pred) {
if(waiting.exchange(true)) {
throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported");
}
try {
while(!pred()) semaphore.acquire();
} catch(...) {
waiting = false;
throw;
}
waiting = false;
}

/**
* @brief Wait until notified or timeout occurs
* @arg pred Predicate to check after each notification - IMPORTANT: must be thread-safe
*/
template <typename Pred, typename Rep, typename Period>
bool waitFor(Pred pred, std::chrono::duration<Rep, Period> timeout) {
if(waiting.exchange(true)) {
throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported");
}

try {
auto deadline = std::chrono::steady_clock::now() + timeout;

while(!pred()) {
if(!semaphore.tryAcquireUntil(deadline)) {
return pred();
}
}
return true;
} catch(...) {
waiting = false;
throw;
}
waiting = false;
}
};
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing namespace declaration. The ManyToOneNotifier class is defined in the global namespace, which could lead to naming conflicts. Other utility classes in the depthai project (like LockingQueue) are defined within the 'dai' namespace. Consider wrapping this class in the 'dai' namespace for consistency and to avoid potential naming conflicts.

Copilot uses AI. Check for mistakes.
#pragma once

#include <atomic>
#include <stdexcept>
Copy link

Copilot AI Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing chrono header include. The waitFor method uses std::chrono::duration and std::chrono::steady_clock but the header is not included. This works only because BinarySemaphore.hpp includes , creating a hidden dependency. Add #include for proper self-contained header design.

Suggested change
#include <stdexcept>
#include <stdexcept>
#include <chrono>

Copilot uses AI. Check for mistakes.
template <typename Pred>
void wait(Pred pred) {
if(waiting.exchange(true)) {
throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of the condition that only one thread can wait for a notifier; what is that beneficial for?
In the message queues, you typically want that many threads wait in a notification queue and only one thread gets notified.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this class is for one main thread to wait on any one of multiple worker threads. This check ensures that someone using this class doesn't get unexpected results if they try to wait from multiple threads.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that a beneficial behavior?
If you did not use a BinarySemaphore but used a class cv + mutex, then there could be more waiting threads waiting while Worker threads could be notifying them.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I used a mutex I'd have to lock the queue.push method and no output would be able to send while another watched queue is being sent to or while checking the predicate. I'd also have to add a lock for each waiting thread which would increase code complexity, which is not necessary since this accomplishes the goal of this PR.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we have the following scenario - and correct me, if I am wrong.

  1. Multiple writers. One writer doesn't wait for a signal, it tries to push to a particular queue. But there may be multiple writers trying to push data to the same queue.
  2. Multiple readers on the particular queue. A reader is waiting to get a signal that the queue has any data available - and it reads the data and processes further.

What about this solution then:

  1. A worker thread (writer) that pushes data to a queue
    1.1. The thread locks the queue with queue's related queueMutex
    1.2. Data is pushed into queue
    1.3. The queue is unlocked with queue's related queueMutex
    1.4. A conditional variable dataReady is called to notify max. one thread

  2. A worker thread (reader) that waits for a data from a queue:
    2.1 The queue locks the queue with queue's related queueMutex
    2.2 Conditional variable dataReady->wait(queueMutex...) is called:
    2.2.1 Checks the predicate and if true, return
    2.2.2 Unlocks the queueMutex
    2.2.3 Waits for a signal from dataReady (with unlocked queueMutex)
    2.2.4 Locks the queueMutex and goes to 2.2.1
    2.2 The worker thread reads the data from the queue
    2.3 The worker thread unlocks the queueMutex

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, you're describing the way queue.get() works. This is a separate thing. Each message queue already has its own conditional variable and mutex that handles pushes and pops from the queue.

The issue I had for getAny is that there isn't a mechanism built in to only wait until one conditional variable notifies. To do this you need to pass a single condition variable instance to multiple queues and wait until one notifies.

So the way it works now is that each time you want to wait on any of a list of queues, a semaphore is created and added to every queue in the list. When a new message arrives to any of the queues (or when a queue is destroyed) the semaphore is notified, and the waiting thread gets woken up.

std::lock_guard<std::mutex> lk(mtx);
available = true;
}
cv.notify_one();
Copy link
Copy Markdown
Collaborator

@majvan majvan Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we talk about BinarySemaphore (only one waiting), then you can notify conditionally.
If a semaphore was already released and anybody would call "release", do not notify. Take it as an erroneous situation with no action.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's no waiting thread then there's practically no overhead to notifying, which would be the case if available was already true. If I conditionally notified, I'd have to do it under the lock which would mean that the waiting thread would wake up and then immediately block and wait for the mutex to be released which would cause two extra context switches.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure you understood me. Let's make it clear.

This is one scenario I was thinking about:

    void release() {
        ASSERT(available == true, "There is a bug in the SW, a thread calls release while the semaphore is not acquired. We will not call notify_one");
        {
            std::lock_guard<std::mutex> lk(mtx);
            available = true;
        }
        cv.notify_one();
    }

Another one, without ASSERT but with

   void release() {
       bool was_available = false;
       {
           std::lock_guard<std::mutex> lk(mtx);
           was_available = available;
           available = true;
       }
       if (!was_available)
           cv.notify_one();  // call notify one only in an expected scenario.
   }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. notify_one has little overhead when there are no waiters but it certainly doesn't hurt to add this. Thanks

@asahtik asahtik added the testable PR is ready to be tested label Jan 26, 2026
@asahtik asahtik requested review from majvan and removed request for moratom January 26, 2026 15:18
@asahtik
Copy link
Copy Markdown
Contributor Author

asahtik commented Jan 27, 2026

@majvan Do you have any more concerns or can this be merged?

throw std::runtime_error("ManyToOneNotifier: Multiple threads waiting on the same notifier is not supported");
}
try {
while(!pred()) semaphore.acquire();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two issues.

[1]
We check pred() and then (maybe) acquire a semaphore
This function has then two expected outcomes then:

  • the pred was fulfilled BUT NO semaphore acquired (the case when the pred() returns true the first time)
  • the pred was fulfilled AND semaphore acquired (the case when the pred() returns true the next time)
    The semaphore acquisition is not causally linked to the predicate becoming true.

[2]
Typically the Pred in the wait (like cv::wait) are used just because the whole pred() is running protected under the mutex. The reason is that the mutex ensures the atomicity of the operation together with cv's waiting cycle.

It's not a case here. If pred() is an unprotected function, it can be preempted by other thread which can access the state of the pred() - and this creates a potential RACE CONDITION.

Example:
// We need to ensure that the queue has data and that camera is on (an abstract example)
bool pred() {
if (!camera_on)
return false;
if (queue.empty)
return false;
return true;
}

Step 0 - initial state:
camera_on = true;
queue.empty = true;

Step 1 - ThreadA acquires a semaphore and continues:
gets into pred() and computes camera_on = true
-- preemptive context switch --

Step 2 - Thread C runs:
{
lock(camera_and_queue);
camera_on = false; // just set that the camera is off
}
-- preemptive context switch --

Step 3 - ThreadA continues:
continues the pred() and computes queue.empty = false and returns true
ManyToWaitNotifier::wait() returns but the state is:
camera_on = false; <-- RACE CONDITION: we wanted to wait only till camera is on

The predicate reads shared state (camera_on, queue.empty) without holding the mutex that protects it. This allows concurrent modification by other threads, resulting in undefined behavior and inconsistent observations of the system state.
In contrast, std::condition_variable::wait(lock, pred) guarantees that the predicate is evaluated while holding the mutex, making the check and sleep cycle atomic.

Another outcome of [2] is a possible deadlock:

Step 1: ThreadA - runs:
if (!pred())
semaphore.acquire(); // waits in the semaphore for the cv

Step 2: ThreadB - is about to release the semaphore:
{
std::lock(camera_and_queue)
queue.empty = false;
notifyOne(); -> semaphore.release(); // notify there is next some data, release the semaphore
}

Step 3: ThreadA - continues:
semaphore.acquire(); // finishes the semaphore acquiring
-- preemptive context switch --

Step 4: ThreadC - runs:
{
lock(camera_and_queue);
camera_on = false; // just set that the camera is off
}
-- preemptive context switch --

Step 5 - ThreadA continues:
if (!pred())
semaphore.acquire(); // DEADLOCK <-- semaphore count is zero and no further release will occur; ThreadA will get not awaken anymore

The semaphore release is not tied to the predicate remaining true.
A thread may wake, consume the semaphore, and then observe the predicate as false due to a concurrent state change. Since the semaphore signal has already been consumed and no further releases are guaranteed, the waiting thread may block forever.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second issue is documented:

Should always be called after the state change that the waiting thread is waiting for

(maybe the wording is not clear, should it be 'after any state change'?)

ThreadC should notify after setting camera_on to false.

The first issue is a good point. I'll document this limitation.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constraint works for the deadlock, but these rules have to be followed.
And it brings additional unnecessary wakeups even in the case your state is going "against" the wakeup (why would you wakeup threads when you switch off the camera when you want them to work when the camera is switched on?)

Using std::condition_variable is much easier and exactly what you need here without reinventing it. :)

Comment thread src/pipeline/MessageQueue.cpp Outdated
std::mutex inputsWaitMutex;
auto inputsWaitCv = std::make_shared<std::condition_variable>();
};
auto checkAllClosed = [&]() { return boost::algorithm::all_of(queues, [](const std::reference_wrapper<MessageQueue> q) { return q.get().isClosed(); }); };
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no boost please!

Comment thread src/pipeline/MessageQueue.cpp Outdated
if(gotAny) {
for(const auto& kv : queues) {
auto& input = kv.second;
if(!input.isClosed() && input.has()) inputs[kv.first] = input.get<ADatatype>();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be this atomically checked and retrieved?
Get an input only when it is not closed and has data. Any modification and readout of the queue state should be mutex-ed.

@moratom
Copy link
Copy Markdown
Collaborator

moratom commented Mar 19, 2026

@CodeRabbit review

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 19, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 19, 2026

📝 Walkthrough

Walkthrough

This pull request refactors MessageQueue's synchronization mechanism, replacing condition-variable-based notification with a new notifier abstraction. Two new utility classes—BinarySemaphore and WaitAnyNotifier—are introduced to manage thread coordination. MessageQueue's public API is updated with renamed methods and timeout support for multi-queue waiting. Thread safety in LockingQueue is improved, and comprehensive test coverage for multi-queue operations is added.

Changes

Cohort / File(s) Summary
Synchronization Utilities
include/depthai/utility/BinarySemaphore.hpp, include/depthai/utility/WaitAnyNotifier.hpp
New low-level synchronization primitives: BinarySemaphore provides acquire/release semantics with timeout variants, and WaitAnyNotifier wraps a BinarySemaphore to notify waiting threads with predicate-based wakeup logic and concurrent-wait detection.
MessageQueue API Refactoring
include/depthai/pipeline/MessageQueue.hpp
Public method signatures updated: addCondVar/removeCondVar renamed to addNotifier/removeNotifier, and waitAny() now accepts optional timeout parameter. Internal storage switched from condition_variable map to WaitAnyNotifier map.
MessageQueue Implementation
src/pipeline/MessageQueue.cpp
Core notification logic refactored: notifyCondVars() renamed to notifyListeners() and now calls notifyOne() on WaitAnyNotifier instances. waitAny() reimplemented to use shared WaitAnyNotifier with predicate-based waiting and proper exception handling. getAny() updated to leverage new waitAny() signature.
Queue Access Safety
include/depthai/utility/LockingQueue.hpp
isDestroyed() method now acquires mutex guard before reading destructed flag, ensuring thread-safe access consistent with other queue accessors.
Test Coverage
tests/src/onhost_tests/message_queue_test.cpp
Added four new Catch2 tests covering multi-queue getAny() and waitAny() scenarios: basic multi-queue retrieval, synchronous waiting, asynchronous waiting with concurrent sends, and timeout behavior.

Sequence Diagram

sequenceDiagram
    participant Client
    participant MessageQueue as MessageQueue<br/>(Primary)
    participant Notifier as WaitAnyNotifier
    participant Semaphore as BinarySemaphore
    participant WaitThread as Waiting Thread

    Client->>MessageQueue: waitAny(queues, timeout)
    MessageQueue->>Notifier: create shared instance
    MessageQueue->>MessageQueue: subscribe all queues to notifier
    MessageQueue->>WaitThread: begin waiting
    WaitThread->>Notifier: wait(predicate)
    Notifier->>Semaphore: acquire with timeout
    Semaphore-->>Notifier: block until available
    
    Client->>MessageQueue: send(data) on queue[i]
    MessageQueue->>Notifier: notifyListeners()
    Notifier->>Notifier: notifyOne()
    Notifier->>Semaphore: release()
    Semaphore-->>WaitThread: unblock
    
    WaitThread->>Notifier: evaluate predicate
    Notifier-->>Notifier: check if any queue has data
    alt Data Available
        Notifier-->>WaitThread: return true
        WaitThread->>MessageQueue: unsubscribe notifier
        MessageQueue-->>Client: return bool (true)
    else Timeout
        Semaphore-->>WaitThread: timeout expires
        Notifier-->>WaitThread: return false
        WaitThread->>MessageQueue: unsubscribe notifier
        MessageQueue-->>Client: return bool (false)
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 Behold, dear friends, the notifier's grace,
BinarySemaphore keeps time and pace,
Condition variables fade away,
New threads wake up without delay,
With timeout futures, we synchronize the race!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 11.11% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Fix getAny race condition' directly addresses the main objective of the PR—resolving a race condition in the getAny function through refactored synchronization primitives and addition of waitAny.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch bugfix/get_any_race_condition
📝 Coding Plan
  • Generate coding plan for human review comments

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
include/depthai/utility/BinarySemaphore.hpp (1)

24-31: 🧹 Nitpick | 🔵 Trivial

Only notify on the false -> true transition.

notify is always set to true, so a duplicate release() still wakes a waiter even when available was already true. That defeats the conditional-notify behavior discussed earlier.

Suggested fix
     void release() {
         bool notify = false;
         {
             std::lock_guard<std::mutex> lk(mtx);
-            available = true;
-            notify = true;
+            notify = !available;
+            available = true;
         }
         if(notify) cv.notify_one();
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/depthai/utility/BinarySemaphore.hpp` around lines 24 - 31, In
BinarySemaphore.hpp's release() method the notify flag is unconditionally true
so calling release() when available is already true will still wake a waiter;
change the locked section in release() (function name: release, members:
available, mtx, cv) to only set notify = true when available was false (i.e.,
check previous state under the mutex and set available = true and notify = true
only on the false->true transition) so cv.notify_one() runs only when the
semaphore actually became available.
src/pipeline/MessageQueue.cpp (1)

229-260: ⚠️ Potential issue | 🟠 Major

checkForMessages() can still break on a close race.

It builds readiness from !isClosed() and has() under separate queue calls. If a queue closes between those two checks, has() throws QueueException; pred() converts that into true, but Line 260 calls the same helper again outside the catch. That means a close racing with this path can either end waitAny() early or leak an unexpected QueueException even though closed queues are supposed to be tolerated here. Please snapshot {closed, has_message} under one queue lock instead of composing two accessors.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/pipeline/MessageQueue.cpp` around lines 229 - 260, The predicate
checkForMessages() races because it calls q.get().isClosed() and q.get().has()
separately; if the queue closes between those calls has() may throw and the
later call to checkForMessages() (line assigning gotMessage) can propagate
QueueException. Change checkForMessages() to query each queue under a single
atomic snapshot API or lock: for each input use one call that returns both
closed and has_message together (or add a
MessageQueue::snapshotStatus()/tryHasMessage() that acquires the queue lock and
returns {closed, has}), then base readiness on that single snapshot; update
pred() and the final gotMessage = ... line to use this snapshot helper so no
separate isClosed()/has() calls are made outside the exception-safe block (refer
to functions checkForMessages, pred, and the final gotMessage assignment).
include/depthai/utility/WaitAnyNotifier.hpp (1)

39-46: ⚠️ Potential issue | 🔴 Critical

The semaphore wake can still be lost to a competing consumer.

wait() and waitFor() only consume a token and then re-read pred() outside the state's mutex. In src/pipeline/MessageQueue.cpp, any concurrent get()/tryGet()/getAny() on the same queues can drain the notified message before the waiter rechecks checkForMessages(). Once that token is consumed, the next loop iteration can block forever. The current comment only bans another waiting thread, but a non-waiting consumer is enough to trigger the same race.

Also applies to: 74-82

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/depthai/utility/WaitAnyNotifier.hpp` around lines 39 - 46, The wake
token can be stolen by a competing non-waiting consumer because wait(Pred) and
waitFor(...) consume the semaphore then re-check pred() outside synchronization;
fix by making the wake check and token consumption atomic-relative to
notifications: add a monotonic sequence/version counter in the WaitAnyNotifier
class that notify() increments, have wait(Pred) and waitFor(...) sample the
current version before waiting and loop until the sampled version changes and
pred() is true (use the semaphore only as a wake hint), or alternatively hold
the same mutex while checking pred() and consuming the semaphore so no other
consumer can drain the signalled message between the pred check and semaphore
acquire; update the implementations of wait(Pred), waitFor(...), and notify()
accordingly (referencing wait(Pred), waitFor(Pred, ...), and notify()).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/pipeline/MessageQueue.cpp`:
- Around line 281-285: The exception handler in MessageQueue.cpp is comparing
e.what() and CLOSED_QUEUE_MESSAGE with != which compares pointers; change it to
compare string contents (e.g., use std::string(e.what()) != CLOSED_QUEUE_MESSAGE
or strcmp(e.what(), CLOSED_QUEUE_MESSAGE) != 0) so the closed-queue message is
detected correctly and only other exceptions are rethrown; locate the try/catch
around input.tryGet<ADatatype>() and the QueueException handling to apply this
fix.

---

Duplicate comments:
In `@include/depthai/utility/BinarySemaphore.hpp`:
- Around line 24-31: In BinarySemaphore.hpp's release() method the notify flag
is unconditionally true so calling release() when available is already true will
still wake a waiter; change the locked section in release() (function name:
release, members: available, mtx, cv) to only set notify = true when available
was false (i.e., check previous state under the mutex and set available = true
and notify = true only on the false->true transition) so cv.notify_one() runs
only when the semaphore actually became available.

In `@include/depthai/utility/WaitAnyNotifier.hpp`:
- Around line 39-46: The wake token can be stolen by a competing non-waiting
consumer because wait(Pred) and waitFor(...) consume the semaphore then re-check
pred() outside synchronization; fix by making the wake check and token
consumption atomic-relative to notifications: add a monotonic sequence/version
counter in the WaitAnyNotifier class that notify() increments, have wait(Pred)
and waitFor(...) sample the current version before waiting and loop until the
sampled version changes and pred() is true (use the semaphore only as a wake
hint), or alternatively hold the same mutex while checking pred() and consuming
the semaphore so no other consumer can drain the signalled message between the
pred check and semaphore acquire; update the implementations of wait(Pred),
waitFor(...), and notify() accordingly (referencing wait(Pred), waitFor(Pred,
...), and notify()).

In `@src/pipeline/MessageQueue.cpp`:
- Around line 229-260: The predicate checkForMessages() races because it calls
q.get().isClosed() and q.get().has() separately; if the queue closes between
those calls has() may throw and the later call to checkForMessages() (line
assigning gotMessage) can propagate QueueException. Change checkForMessages() to
query each queue under a single atomic snapshot API or lock: for each input use
one call that returns both closed and has_message together (or add a
MessageQueue::snapshotStatus()/tryHasMessage() that acquires the queue lock and
returns {closed, has}), then base readiness on that single snapshot; update
pred() and the final gotMessage = ... line to use this snapshot helper so no
separate isClosed()/has() calls are made outside the exception-safe block (refer
to functions checkForMessages, pred, and the final gotMessage assignment).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: eaa89e5d-54e6-41ae-b3ca-a5a430959072

📥 Commits

Reviewing files that changed from the base of the PR and between 9f4d381 and 020f145.

📒 Files selected for processing (6)
  • include/depthai/pipeline/MessageQueue.hpp
  • include/depthai/utility/BinarySemaphore.hpp
  • include/depthai/utility/LockingQueue.hpp
  • include/depthai/utility/WaitAnyNotifier.hpp
  • src/pipeline/MessageQueue.cpp
  • tests/src/onhost_tests/message_queue_test.cpp
📜 Review details
🧰 Additional context used
🧬 Code graph analysis (3)
include/depthai/pipeline/MessageQueue.hpp (2)
tests/src/onhost_tests/message_queue_test.cpp (1)
  • pipelineEventDispatcher (530-532)
src/pipeline/MessageQueue.cpp (2)
  • getAny (268-292)
  • getAny (268-269)
src/pipeline/MessageQueue.cpp (1)
include/depthai/pipeline/MessageQueue.hpp (16)
  • notifier (206-206)
  • callbackId (214-214)
  • notifierId (222-222)
  • QueueException (28-28)
  • QueueException (28-28)
  • QueueException (29-29)
  • msg (47-47)
  • msg (527-527)
  • msg (536-536)
  • msg (545-545)
  • msg (552-552)
  • queue (241-246)
  • queues (94-94)
  • queues (95-96)
  • getAny (98-109)
  • getAny (98-99)
tests/src/onhost_tests/message_queue_test.cpp (2)
include/depthai/pipeline/MessageQueue.hpp (9)
  • queues (94-94)
  • queues (95-96)
  • getAny (98-109)
  • getAny (98-99)
  • msg (47-47)
  • msg (527-527)
  • msg (536-536)
  • msg (545-545)
  • msg (552-552)
src/pipeline/MessageQueue.cpp (4)
  • getAny (268-292)
  • getAny (268-269)
  • waitAny (220-267)
  • waitAny (220-220)
🪛 Cppcheck (2.20.0)
src/pipeline/MessageQueue.cpp

[information] 5-5: Include file

(missingIncludeSystem)


[information] 5-5: Include file

(missingIncludeSystem)


[information] 5-5: Include file

(missingIncludeSystem)


[information] 5-5: Include file

(missingIncludeSystem)


[information] 5-5: Include file

(missingIncludeSystem)


[information] 5-5: Include file

(missingIncludeSystem)


[information] 5-5: Include file

(missingIncludeSystem)


[information] 5-5: Include file

(missingIncludeSystem)


[information] 5-5: Include file

(missingIncludeSystem)


[information] 5-5: Include file

(missingIncludeSystem)

Comment on lines +281 to +285
try {
msg = input.tryGet<ADatatype>();
} catch(QueueException& e) {
if(e.what() != CLOSED_QUEUE_MESSAGE) {
throw;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🌐 Web query:

In C++, when comparing const char*values likestd::runtime_error::what()and a string literal with!=, does the operator compare pointer addresses or string contents?

💡 Result:

In C++, std::runtime_error::what() returns a const char* (a pointer) to a null-terminated character array. (gcc.gnu.org)

So an expression like:

if (e.what() != "some message") { ... }

uses the built-in operator!= for pointers and compares pointer addresses, not the string contents. (It will only be “equal” if both pointers point to the exact same address.) (web.stanford.edu)

To compare the text, use std::strcmp(e.what(), "some message") == 0 (or convert to std::string / std::string_view and compare). (en.cppreference.com)

Citations:


🏁 Script executed:

# First, locate the MessageQueue.cpp file
fd MessageQueue.cpp

Repository: luxonis/depthai-core

Length of output: 93


🏁 Script executed:

# Once we have the file, read the context around lines 281-285
if [ -f "src/pipeline/MessageQueue.cpp" ]; then
  echo "=== Lines 275-290 of MessageQueue.cpp ==="
  sed -n '275,290p' src/pipeline/MessageQueue.cpp
fi

Repository: luxonis/depthai-core

Length of output: 587


🏁 Script executed:

# Search for CLOSED_QUEUE_MESSAGE definition
rg "CLOSED_QUEUE_MESSAGE" --type cpp -B 2 -A 2

Repository: luxonis/depthai-core

Length of output: 6207


Fix pointer comparison to string comparison in exception handler.

e.what() and CLOSED_QUEUE_MESSAGE are both const char*, so != compares pointer addresses, not string contents. This causes the closed-queue exception to be rethrown instead of silently ignored, breaking the intended exception filtering logic.

Suggested fix
-                if(e.what() != CLOSED_QUEUE_MESSAGE) {
+                if(std::string{e.what()} != CLOSED_QUEUE_MESSAGE) {
                     throw;
                 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
try {
msg = input.tryGet<ADatatype>();
} catch(QueueException& e) {
if(e.what() != CLOSED_QUEUE_MESSAGE) {
throw;
try {
msg = input.tryGet<ADatatype>();
} catch(QueueException& e) {
if(std::string{e.what()} != CLOSED_QUEUE_MESSAGE) {
throw;
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/pipeline/MessageQueue.cpp` around lines 281 - 285, The exception handler
in MessageQueue.cpp is comparing e.what() and CLOSED_QUEUE_MESSAGE with != which
compares pointers; change it to compare string contents (e.g., use
std::string(e.what()) != CLOSED_QUEUE_MESSAGE or strcmp(e.what(),
CLOSED_QUEUE_MESSAGE) != 0) so the closed-queue message is detected correctly
and only other exceptions are rethrown; locate the try/catch around
input.tryGet<ADatatype>() and the QueueException handling to apply this fix.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

testable PR is ready to be tested

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants