
Commit f204e02

VSadov and Copilot authored

Reliability fixes for the Thread Pool (#121887)
When inserting work items into thread pool queues, we must always guarantee that for every work item there will be some worker at some point in the future that will certainly notice the presence of the work item and execute it.

There was an attempt to relax that requirement in #100506. Sadly, it leads to occasional deadlocks when items are present in the work queues and no workers are coming to pick them up. The change was made in all three thread pools - IO completion, Sockets, and the general purpose ThreadPool - so the fix is applied to all three as well.

We have seen reports about deadlocks when running on net9 or later releases:
- In Windows IO completion: #121608
- In the general purpose ThreadPool: #119043
- In Unix Sockets: no known reports so far. Perhaps we have not yet seen a case where adding more work items is indirectly conditioned on existing work items being executed. Without that, incoming work items may "unwedge" the thread pool, and it is just a pause rather than a deadlock, so it could go unnoticed.

The fix will need to be ported to net10 and net9, thus this PR tries to restore just the part which changed the enqueuer/worker handshake algorithm. More has been piled onto the thread pool since that change, so doing the minimal fix without disturbing the rest is somewhat tricky.

Fixes: #121608 (definitely; I have tried with the repro)
Fixes: #119043 (likely; I do not have a repro to try, but the symptoms look like the same issue)

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
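The enqueuer/worker handshake that this commit restores can be modeled outside the runtime. Below is a minimal Java sketch of the protocol under the assumptions stated in the commit message; all class and method names are ours, not from the PR, with `AtomicInteger.getAndSet` standing in for `Interlocked.Exchange` and `VarHandle.fullFence` for `Interlocked.MemoryBarrier`. The invariant it maintains: every enqueued item is eventually seen by some worker, because an enqueuer either observes the flag at 0 and requests a worker itself, or observes it at 1, in which case the outstanding worker resets the flag before checking the queue and therefore cannot miss the item.

```java
import java.lang.invoke.VarHandle;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HandshakeModel {
    static final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    static final AtomicInteger hasOutstandingRequest = new AtomicInteger(0);
    static final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Enqueue side: add the item first, THEN guarantee a worker will look.
    static void enqueue(Runnable item) {
        queue.add(item);
        ensureWorkerScheduled();
    }

    // Only the caller that flips the flag 0 -> 1 schedules a worker,
    // mitigating the thundering-herd problem.
    static void ensureWorkerScheduled() {
        if (hasOutstandingRequest.getAndSet(1) == 0) {
            pool.execute(HandshakeModel::workerStep);
        }
    }

    static void workerStep() {
        // Reset the flag BEFORE checking the queue; an enqueuer that adds
        // an item after this point will request a fresh worker itself.
        hasOutstandingRequest.set(0);
        VarHandle.fullFence();

        Runnable item = queue.poll();
        if (item == null) {
            return; // nothing to do; any later enqueue requests its own worker
        }

        // The item we are about to run may take unknown time or even block,
        // so ensure one more worker is coming if the queue is not empty.
        if (!queue.isEmpty()) {
            ensureWorkerScheduled();
        }
        item.run();
    }

    public static void main(String[] args) throws Exception {
        int n = 1000;
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            enqueue(done::countDown);
        }
        boolean ok = done.await(10, TimeUnit.SECONDS);
        pool.shutdown();
        System.out.println(ok ? "all processed" : "deadlock/lost items");
    }
}
```

This sketch omits the real implementation's batching and time slicing; it only demonstrates the wakeup guarantee the commit message describes.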
1 parent aa500a4 commit f204e02

File tree

2 files changed

+113
-299
lines changed


src/libraries/System.Net.Sockets/src/System/Net/Sockets/SocketAsyncEngine.Unix.cs

Lines changed: 37 additions & 77 deletions
```diff
@@ -91,23 +91,18 @@ private static SocketAsyncEngine[] CreateEngines()
         //
         private readonly ConcurrentQueue<SocketIOEvent> _eventQueue = new ConcurrentQueue<SocketIOEvent>();

-        // The scheme works as follows:
-        // - From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them.
-        // - From Scheduled, the only transition is to Determining right before trying to dequeue an event.
-        // - From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them)
-        //   or Scheduled if the queue is still not empty (let the current work item handle parallelization as convenient).
-        //
-        // The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed.
-        // Another work item isn't enqueued to the thread pool hastily while the state is Determining,
-        // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
-        private enum EventQueueProcessingStage
-        {
-            NotScheduled,
-            Determining,
-            Scheduled
-        }
-
-        private EventQueueProcessingStage _eventQueueProcessingStage;
+        // This flag is used for communication between item enqueuing and workers that process the items.
+        // There are two states of this flag:
+        // 0: has no guarantees
+        // 1: means a worker will check work queues and ensure that
+        //    any work items inserted in work queue before setting the flag
+        //    are picked up.
+        // Note: The state must be cleared by the worker thread _before_
+        //       checking. Otherwise there is a window between finding no work
+        //       and resetting the flag, when the flag is in a wrong state.
+        //       A new work item may be added right before the flag is reset
+        //       without asking for a worker, while the last worker is quitting.
+        private int _hasOutstandingThreadRequest;

         //
         // Registers the Socket with a SocketAsyncEngine, and returns the associated engine.
```
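The flag semantics described in the new comment can be illustrated in isolation. In this small Java sketch (all names are ours; `AtomicInteger.getAndSet` stands in for `Interlocked.Exchange`), only the caller that flips the flag from 0 to 1 actually requests a worker, so any number of concurrent enqueues produces at most one outstanding request:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SingleRequestDemo {
    static final AtomicInteger flag = new AtomicInteger(0);
    static final AtomicInteger workersRequested = new AtomicInteger(0);

    // Mirrors the shape of EnsureWorkerScheduled:
    // only the 0 -> 1 transition schedules a worker.
    static void ensureWorkerScheduled() {
        if (flag.getAndSet(1) == 0) {
            workersRequested.incrementAndGet(); // stand-in for UnsafeQueueUserWorkItem
        }
    }

    public static void main(String[] args) {
        // 100 back-to-back requests collapse into a single worker request.
        for (int i = 0; i < 100; i++) {
            ensureWorkerScheduled();
        }
        System.out.println("workers requested: " + workersRequested.get()); // prints 1

        // A worker resets the flag before checking the queues; after that,
        // the next enqueue requests a fresh worker.
        flag.set(0);
        ensureWorkerScheduled();
        System.out.println("workers requested: " + workersRequested.get()); // prints 2
    }
}
```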
```diff
@@ -231,14 +226,9 @@ private void EventLoop()
                         // The native shim is responsible for ensuring this condition.
                         Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");

-                        // Only enqueue a work item if the stage is NotScheduled.
-                        // Otherwise there must be a work item already queued or another thread already handling parallelization.
-                        if (handler.HandleSocketEvents(numEvents) &&
-                            Interlocked.Exchange(
-                                ref _eventQueueProcessingStage,
-                                EventQueueProcessingStage.Scheduled) == EventQueueProcessingStage.NotScheduled)
+                        if (handler.HandleSocketEvents(numEvents))
                         {
-                            ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
+                            EnsureWorkerScheduled();
                         }
                     }
                 }
```
```diff
@@ -248,70 +238,40 @@ private void EventLoop()
             }
         }

-        private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty)
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private void EnsureWorkerScheduled()
         {
-            if (!isEventQueueEmpty)
-            {
-                // There are more events to process, set stage to Scheduled and enqueue a work item.
-                _eventQueueProcessingStage = EventQueueProcessingStage.Scheduled;
-            }
-            else
+            // Only one worker is requested at a time to mitigate Thundering Herd problem.
+            if (Interlocked.Exchange(ref _hasOutstandingThreadRequest, 1) == 0)
             {
-                // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
-                // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
-                // would not have scheduled a work item to process the work, so schedule one now.
-                EventQueueProcessingStage stageBeforeUpdate =
-                    Interlocked.CompareExchange(
-                        ref _eventQueueProcessingStage,
-                        EventQueueProcessingStage.NotScheduled,
-                        EventQueueProcessingStage.Determining);
-                Debug.Assert(stageBeforeUpdate != EventQueueProcessingStage.NotScheduled);
-                if (stageBeforeUpdate == EventQueueProcessingStage.Determining)
-                {
-                    return;
-                }
+                ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
             }
-
-            ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
         }

         void IThreadPoolWorkItem.Execute()
         {
-            ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
-            SocketIOEvent ev;
-            while (true)
-            {
-                Debug.Assert(_eventQueueProcessingStage == EventQueueProcessingStage.Scheduled);
+            // We are asking for one worker at a time, thus the state should be 1.
+            Debug.Assert(_hasOutstandingThreadRequest == 1);
+            _hasOutstandingThreadRequest = 0;

-                // The change needs to be visible to other threads that may request a worker thread before a work item is attempted
-                // to be dequeued by the current thread. In particular, if an enqueuer queues a work item and does not request a
-                // thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing
-                // work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of
-                // Scheduled, and try to dequeue again or request another thread.
-                _eventQueueProcessingStage = EventQueueProcessingStage.Determining;
-                Interlocked.MemoryBarrier();
+            // Checking for items must happen after resetting the processing state.
+            Interlocked.MemoryBarrier();

-                if (eventQueue.TryDequeue(out ev))
-                {
-                    break;
-                }
-
-                // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
-                // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
-                // would not have scheduled a work item to process the work, so try to dequeue a work item again.
-                EventQueueProcessingStage stageBeforeUpdate =
-                    Interlocked.CompareExchange(
-                        ref _eventQueueProcessingStage,
-                        EventQueueProcessingStage.NotScheduled,
-                        EventQueueProcessingStage.Determining);
-                Debug.Assert(stageBeforeUpdate != EventQueueProcessingStage.NotScheduled);
-                if (stageBeforeUpdate == EventQueueProcessingStage.Determining)
-                {
-                    return;
-                }
+            ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
+            if (!eventQueue.TryDequeue(out SocketIOEvent ev))
+            {
+                return;
             }

-            UpdateEventQueueProcessingStage(eventQueue.IsEmpty);
+            // The batch that is currently in the queue could have asked only for one worker.
+            // We are going to process a workitem, which may take unknown time or even block.
+            // In a worst case the current workitem will indirectly depend on progress of other
+            // items and that would lead to a deadlock if no one else checks the queue.
+            // We must ensure at least one more worker is coming if the queue is not empty.
+            if (!eventQueue.IsEmpty)
+            {
+                EnsureWorkerScheduled();
+            }

             int startTimeMs = Environment.TickCount;
             do
```
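To see why the worker must clear the flag before checking the queue, as the new comment insists, consider the forbidden ordering played out step by step. This single-threaded Java sketch (all names are ours) interleaves a worker and an enqueuer in exactly the racy order, with the worker checking the queue first and resetting the flag last:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class LostWakeupDemo {
    static final Queue<String> queue = new ArrayDeque<>();
    static int flag = 1;             // a worker is currently outstanding
    static int workersRequested = 0;

    static void enqueue(String item) {
        queue.add(item);
        // Models "getAndSet(1) == 0": request a worker only on the 0 -> 1 flip.
        if (flag == 0) {
            flag = 1;
            workersRequested++;
        }
    }

    public static void main(String[] args) {
        // Forbidden ordering, interleaved step by step on one thread:
        boolean sawEmpty = queue.isEmpty(); // worker: checks the queue first -> empty
        enqueue("X");                       // enqueuer: adds item, sees flag == 1, no request
        flag = 0;                           // worker: resets the flag last and quits
        // Result: "X" sits in the queue, the flag is 0, and no worker is coming.
        boolean stranded = sawEmpty && !queue.isEmpty() && flag == 0 && workersRequested == 0;
        System.out.println("stranded=" + stranded); // prints stranded=true
    }
}
```

Resetting the flag before the queue check closes this window: the enqueuer would then observe flag 0 and request a fresh worker itself.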

0 commit comments
