Skip to content

Commit 1c3449e

Browse files
committed
CSHARP-5798: Implement test to track if any UnobservedTaskExceptions were raised while test run
1 parent 83e8b52 commit 1c3449e

File tree

3 files changed

+53
-54
lines changed

3 files changed

+53
-54
lines changed

src/MongoDB.Driver/Core/Connections/TcpStreamFactory.cs

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -167,17 +167,16 @@ private void ConfigureConnectedSocket(Socket socket)
167167

168168
private void Connect(Socket socket, EndPoint endPoint, CancellationToken cancellationToken)
169169
{
170-
var callbackState = new ConnectOperationState(socket);
170+
var callbackState = new OperationCallbackState<Socket>(socket);
171171
using var timeoutCancellationTokenSource = new CancellationTokenSource(_settings.ConnectTimeout);
172172
using var combinedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCancellationTokenSource.Token);
173173
using var cancellationSubscription = combinedCancellationTokenSource.Token.Register(state =>
174174
{
175-
var operationState = (ConnectOperationState)state;
176-
if (operationState.IsSucceeded)
175+
var operationState = (OperationCallbackState<Socket>)state;
176+
if (operationState.TryChangeStatusFromInProgress(OperationCallbackState<Socket>.OperationStatus.Interrupted))
177177
{
178-
return;
178+
DisposeSocket(operationState.Subject);
179179
}
180-
DisposeSocket(operationState.Socket);
181180
}, callbackState);
182181

183182
try
@@ -195,8 +194,10 @@ private void Connect(Socket socket, EndPoint endPoint, CancellationToken cancell
195194
#else
196195
socket.Connect(endPoint);
197196
#endif
198-
EnsureConnected(socket);
199-
callbackState.IsSucceeded = true;
197+
if (!callbackState.TryChangeStatusFromInProgress(OperationCallbackState<Socket>.OperationStatus.Done))
198+
{
199+
throw new ObjectDisposedException(nameof(Socket));
200+
}
200201
}
201202
catch
202203
{
@@ -222,23 +223,6 @@ static void DisposeSocket(Socket socket)
222223
// Ignore any exceptions.
223224
}
224225
}
225-
226-
static void EnsureConnected(Socket socket)
227-
{
228-
bool originalBlockingState = socket.Blocking;
229-
230-
try
231-
{
232-
socket.Blocking = false;
233-
// Try to use the socket to ensure it's connected. On MacOS with net6.0 sometimes Connect is completed successfully even after the socket disposal.
234-
socket.Send(__ensureConnectedBuffer, 0, 0);
235-
}
236-
finally
237-
{
238-
// Restore original blocking state
239-
socket.Blocking = originalBlockingState;
240-
}
241-
}
242226
}
243227

244228
private async Task ConnectAsync(Socket socket, EndPoint endPoint, CancellationToken cancellationToken)
@@ -401,10 +385,5 @@ public int Compare(EndPoint x, EndPoint y)
401385
return 0;
402386
}
403387
}
404-
405-
private sealed record ConnectOperationState(Socket Socket)
406-
{
407-
public bool IsSucceeded { get; set; }
408-
}
409388
}
410389
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/* Copyright 2010-present MongoDB Inc.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
using System.Threading;
17+
18+
namespace MongoDB.Driver.Core.Misc;
19+
20+
internal sealed class OperationCallbackState<T>(T subject)
21+
{
22+
private int _status = (int)OperationStatus.InProgress;
23+
24+
public OperationStatus Status => (OperationStatus)_status;
25+
public T Subject => subject;
26+
public bool TryChangeStatusFromInProgress(OperationStatus newState) =>
27+
Interlocked.CompareExchange(ref _status, (int)newState, (int)OperationStatus.InProgress) == (int)OperationStatus.InProgress;
28+
29+
public enum OperationStatus
30+
{
31+
InProgress = 0,
32+
Done,
33+
Interrupted,
34+
}
35+
}
36+
37+

src/MongoDB.Driver/Core/Misc/StreamExtensionMethods.cs

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -287,33 +287,33 @@ private static void ExecuteOperationWithTimeout<TState>(Stream stream, TState st
287287
throw new TimeoutException();
288288
}
289289

290-
StreamDisposeCallbackState callbackState = null;
290+
OperationCallbackState<Stream> callbackState = null;
291291
Timer timer = null;
292292
CancellationTokenRegistration cancellationSubscription = default;
293293
if (timeoutMs > 0)
294294
{
295-
callbackState = new StreamDisposeCallbackState(stream);
295+
callbackState = new OperationCallbackState<Stream>(stream);
296296
timer = new Timer(DisposeStreamCallback, callbackState, timeoutMs, Timeout.Infinite);
297297
}
298298

299299
if (cancellationToken.CanBeCanceled)
300300
{
301-
callbackState ??= new StreamDisposeCallbackState(stream);
301+
callbackState ??= new OperationCallbackState<Stream>(stream);
302302
cancellationSubscription = cancellationToken.Register(DisposeStreamCallback, callbackState);
303303
}
304304

305305
try
306306
{
307307
operation(stream, state);
308-
if (callbackState?.TryChangeStateFromInProgress(OperationState.Done) == false)
308+
if (callbackState?.TryChangeStatusFromInProgress(OperationCallbackState<Stream>.OperationStatus.Done) == false)
309309
{
310310
// If the state can't be changed - then the stream was/will be disposed, throw here
311311
throw new IOException();
312312
}
313313
}
314314
catch (Exception ex)
315315
{
316-
if (callbackState?.OperationState == OperationState.Interrupted)
316+
if (callbackState?.Status == OperationCallbackState<Stream>.OperationStatus.Interrupted)
317317
{
318318
cancellationToken.ThrowIfCancellationRequested();
319319
throw new TimeoutException();
@@ -334,39 +334,22 @@ private static void ExecuteOperationWithTimeout<TState>(Stream stream, TState st
334334

335335
static void DisposeStreamCallback(object state)
336336
{
337-
var disposeCallbackState = (StreamDisposeCallbackState)state;
338-
if (!disposeCallbackState.TryChangeStateFromInProgress(OperationState.Interrupted))
337+
var disposeCallbackState = (OperationCallbackState<Stream>)state;
338+
if (!disposeCallbackState.TryChangeStatusFromInProgress(OperationCallbackState<Stream>.OperationStatus.Interrupted))
339339
{
340340
// If the state can't be changed - then I/O had already succeeded
341341
return;
342342
}
343343

344344
try
345345
{
346-
disposeCallbackState.Stream.Dispose();
346+
disposeCallbackState.Subject.Dispose();
347347
}
348348
catch (Exception)
349349
{
350350
// callbacks should not fail, suppress any exceptions here
351351
}
352352
}
353353
}
354-
355-
private record StreamDisposeCallbackState(Stream Stream)
356-
{
357-
private int _operationState = (int)OperationState.InProgress;
358-
359-
public OperationState OperationState => (OperationState)_operationState;
360-
361-
public bool TryChangeStateFromInProgress(OperationState newState) =>
362-
Interlocked.CompareExchange(ref _operationState, (int)newState, (int)OperationState.InProgress) == (int)OperationState.InProgress;
363-
}
364-
365-
private enum OperationState
366-
{
367-
InProgress = 0,
368-
Done,
369-
Interrupted,
370-
}
371354
}
372355
}

0 commit comments

Comments
 (0)