Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/node_mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class ConditionVariableBase {
inline void Broadcast(const ScopedLock&);
inline void Signal(const ScopedLock&);
inline void Wait(const ScopedLock& scoped_lock);
// Returns 0 if signaled, UV_ETIMEDOUT on timeout.
inline int TimedWait(const ScopedLock& scoped_lock, uint64_t timeout);

ConditionVariableBase(const ConditionVariableBase&) = delete;
ConditionVariableBase& operator=(const ConditionVariableBase&) = delete;
Expand Down Expand Up @@ -175,6 +177,11 @@ struct LibuvMutexTraits {
uv_cond_wait(cond, mutex);
}

static inline int cond_timedwait(CondT* cond, MutexT* mutex,
uint64_t timeout) {
return uv_cond_timedwait(cond, mutex, timeout);
}

static inline void mutex_destroy(MutexT* mutex) {
uv_mutex_destroy(mutex);
}
Expand Down Expand Up @@ -249,6 +256,12 @@ void ConditionVariableBase<Traits>::Wait(const ScopedLock& scoped_lock) {
Traits::cond_wait(&cond_, &scoped_lock.mutex_.mutex_);
}

template <typename Traits>
int ConditionVariableBase<Traits>::TimedWait(const ScopedLock& scoped_lock,
uint64_t timeout) {
return Traits::cond_timedwait(&cond_, &scoped_lock.mutex_.mutex_, timeout);
}

template <typename Traits>
MutexBase<Traits>::MutexBase() {
CHECK_EQ(0, Traits::mutex_init(&mutex_));
Expand Down
57 changes: 38 additions & 19 deletions src/node_platform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,14 @@ void WorkerThreadsTaskRunner::BlockingDrain() {
pending_worker_tasks_.Lock().BlockingDrain();
}

bool WorkerThreadsTaskRunner::TimedBlockingDrain(uint64_t timeout_in_ns) {
return pending_worker_tasks_.Lock().TimedBlockingDrain(timeout_in_ns);
}

bool WorkerThreadsTaskRunner::HasOutstandingTasks() {
return pending_worker_tasks_.Lock().HasOutstandingTasks();
}

void WorkerThreadsTaskRunner::Shutdown() {
pending_worker_tasks_.Lock().Stop();
delayed_task_scheduler_->Stop();
Expand Down Expand Up @@ -581,26 +589,23 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
if (!per_isolate) return;

do {
// FIXME(54918): we should not be blocking on the worker tasks on the
// main thread in one go. Doing so leads to two problems:
// 1. If any of the worker tasks post another foreground task and wait
// for it to complete, and that foreground task is posted right after
// we flush the foreground task queue and before the foreground thread
// goes into sleep, we'll never be able to wake up to execute that
// foreground task and in turn the worker task will never complete, and
// we have a deadlock.
// 2. Worker tasks can be posted from any thread, not necessarily associated
// with the current isolate, and we can be blocking on a worker task that
// is associated with a completely unrelated isolate in the event loop.
// This is suboptimal.
// Worker tasks (e.g. V8 JIT compilation) may post foreground tasks and
// wait for their completion. If we block indefinitely on worker tasks
// without flushing foreground tasks, those worker tasks can never finish,
// causing a deadlock (see https://github.com/nodejs/node/issues/54918).
//
// However, not blocking on the worker tasks at all can lead to loss of some
// critical user-blocking worker tasks e.g. wasm async compilation tasks,
// which should block the main thread until they are completed, as the
// documentation suggets. As a compromise, we currently only block on
// user-blocking tasks to reduce the chance of deadlocks while making sure
// that criticl user-blocking tasks are not lost.
worker_thread_task_runner_->BlockingDrain();
// To avoid this, we interleave: wait briefly for worker tasks to complete,
// then flush any foreground tasks that may have been posted, and repeat.
// This ensures foreground tasks posted by workers get a chance to run.
while (worker_thread_task_runner_->HasOutstandingTasks()) {
// Wait up to 1ms for outstanding worker tasks to complete.
constexpr uint64_t kDrainTimeoutNs = 1'000'000; // 1ms
if (worker_thread_task_runner_->TimedBlockingDrain(kDrainTimeoutNs)) {
break; // All outstanding tasks drained.
}
// Flush foreground tasks that worker tasks may be waiting on.
per_isolate->FlushForegroundTasksInternal();
}
} while (per_isolate->FlushForegroundTasksInternal());
}

Expand Down Expand Up @@ -832,6 +837,20 @@ void TaskQueue<T>::Locked::BlockingDrain() {
}
}

template <class T>
bool TaskQueue<T>::Locked::TimedBlockingDrain(uint64_t timeout_in_ns) {
while (queue_->outstanding_tasks_ > 0) {
int r = queue_->outstanding_tasks_drained_.TimedWait(lock_, timeout_in_ns);
if (r != 0) return false; // Timed out, still has outstanding tasks.
}
return true;
}

template <class T>
bool TaskQueue<T>::Locked::HasOutstandingTasks() {
return queue_->outstanding_tasks_ > 0;
}

template <class T>
void TaskQueue<T>::Locked::Stop() {
queue_->stopped_ = true;
Expand Down
5 changes: 5 additions & 0 deletions src/node_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class TaskQueue {
std::unique_ptr<T> BlockingPop();
void NotifyOfOutstandingCompletion();
void BlockingDrain();
// Returns true if all outstanding tasks are drained, false on timeout.
bool TimedBlockingDrain(uint64_t timeout_in_ns);
bool HasOutstandingTasks();
void Stop();
PriorityQueue PopAll();

Expand Down Expand Up @@ -196,6 +199,8 @@ class WorkerThreadsTaskRunner {
double delay_in_seconds);

void BlockingDrain();
bool TimedBlockingDrain(uint64_t timeout_in_ns);
bool HasOutstandingTasks();
void Shutdown();

int NumberOfWorkerThreads() const;
Expand Down
Loading