@@ -16,10 +16,12 @@ public class Server
1616 {
1717 private readonly ServerBuilder _builder ;
1818 private readonly ILogger < Server > _logger ;
19- private readonly ConcurrentDictionary < EndPoint , RunningListener > _listeners = new ConcurrentDictionary < EndPoint , RunningListener > ( ) ;
19+ private readonly Dictionary < EndPoint , RunningListener > _listeners = new Dictionary < EndPoint , RunningListener > ( ) ;
2020 private readonly TaskCompletionSource < object > _shutdownTcs = new TaskCompletionSource < object > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
2121 private readonly TimerAwaitable _timerAwaitable ;
22+ private readonly SemaphoreSlim _listenerSemaphore = new SemaphoreSlim ( initialCount : 1 ) ;
2223 private Task _timerTask = Task . CompletedTask ;
24+ private int _stopping ;
2325
2426 internal Server ( ServerBuilder builder )
2527 {
@@ -75,59 +77,55 @@ private async Task StartTimerAsync()
7577
7678 public async Task StopAsync ( CancellationToken cancellationToken = default )
7779 {
78- // TODO: Svar: Oh, you're right! Didn't think about that :) I don't often come across areas where I have to think about thread safety in my daily work, but I've given it a go here. Let me know if there are any gotchas or something special to look out for...
79- // TODO: Question: When stopping the server, do we need to prevent new listeners from being added?
80- // TODO: Question: The "take values and clear" isn't an atomic operation. Is that ok?
81- // TODO: Question: Should Add/RemoveSocketListenerAsync take Endpoint as input?
82-
83- // Svar: To be honest, I'm not really sure how to solve this the best way. I started out with a ConcurrentDictionary
84- // for the RunningListeners, but I ended up with an issue where I can't guarantee that RunningListeners aren't added/removed
85- // while stopping the server. So I went for a simple locking instead. This way, the server will have to wait until any
86- // Add/Remove operation has finished. But it would have felt better if I could have solved it with the ConcurrentDictionary
87- // and no locking...
88-
89- // TODO: KOLLA MED ENDPOINTPOOL OCH LOCKING
90- // Get the listeners and clear the ConcurrentDictionary, so that they can't be touched anywhere else
91- // TODO: lock or something else that we check in add/remove
92- // TODO: Om jag kör lock, behöver jag kanske inte ha concurrentdictionary heller?
93- // TODO: Får inte kör StopAsync om det pågår en Add/RemoveSockerListener
94- var listeners = _listeners . Values . ToList ( ) ;
95- _listeners . Clear ( ) ;
96-
97- var tasks = new Task [ listeners . Count ] ;
98-
99- for ( int i = 0 ; i < listeners . Count ; i ++ )
80+ if ( Interlocked . Exchange ( ref _stopping , 1 ) == 1 )
10081 {
101- tasks [ i ] = listeners [ i ] . Listener . UnbindAsync ( cancellationToken ) . AsTask ( ) ;
82+ return ;
10283 }
10384
104- await Task . WhenAll ( tasks ) . ConfigureAwait ( false ) ;
85+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
86+ try
87+ {
88+ var listeners = _listeners . Values . ToList ( ) ;
10589
106- // Signal to all of the listeners that it's time to start the shutdown process
107- // We call this after unbind so that we're not touching the listener anymore (each loop will dispose the listener)
108- _shutdownTcs . TrySetResult ( null ) ;
90+ var tasks = new Task [ listeners . Count ] ;
10991
110- for ( int i = 0 ; i < listeners . Count ; i ++ )
111- {
112- tasks [ i ] = listeners [ i ] . ExecutionTask ;
113- }
92+ for ( int i = 0 ; i < listeners . Count ; i ++ )
93+ {
94+ tasks [ i ] = listeners [ i ] . Listener . UnbindAsync ( cancellationToken ) . AsTask ( ) ;
95+ }
11496
115- var shutdownTask = Task . WhenAll ( tasks ) ;
97+ await Task . WhenAll ( tasks ) . ConfigureAwait ( false ) ;
11698
117- if ( cancellationToken . CanBeCanceled )
118- {
119- await shutdownTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
120- }
121- else
122- {
123- await shutdownTask . ConfigureAwait ( false ) ;
124- }
99+ // Signal to all of the listeners that it's time to start the shutdown process
100+ // We call this after unbind so that we're not touching the listener anymore (each loop will dispose the listener)
101+ _shutdownTcs . TrySetResult ( null ) ;
125102
126- if ( _timerAwaitable != null )
127- {
128- _timerAwaitable . Stop ( ) ;
103+ for ( int i = 0 ; i < listeners . Count ; i ++ )
104+ {
105+ tasks [ i ] = listeners [ i ] . ExecutionTask ;
106+ }
107+
108+ var shutdownTask = Task . WhenAll ( tasks ) ;
109+
110+ if ( cancellationToken . CanBeCanceled )
111+ {
112+ await shutdownTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
113+ }
114+ else
115+ {
116+ await shutdownTask . ConfigureAwait ( false ) ;
117+ }
118+
119+ if ( _timerAwaitable != null )
120+ {
121+ _timerAwaitable . Stop ( ) ;
129122
130- await _timerTask . ConfigureAwait ( false ) ;
123+ await _timerTask . ConfigureAwait ( false ) ;
124+ }
125+ }
126+ finally
127+ {
128+ _listenerSemaphore . Release ( ) ;
131129 }
132130 }
133131
@@ -144,39 +142,67 @@ public Task AddSocketListenerAsync(EndPoint endpoint, Action<IConnectionBuilder>
144142
145143 public async Task RemoveSocketListener ( EndPoint endpoint , CancellationToken cancellationToken = default )
146144 {
147- if ( ! _listeners . TryRemove ( endpoint , out var listener ) )
145+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
146+
147+ if ( _stopping == 1 )
148148 {
149- return ;
149+ throw new InvalidOperationException ( "The server has already been stopped." ) ;
150150 }
151151
152- await listener . Listener . UnbindAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
152+ try
153+ {
154+ if ( ! _listeners . Remove ( endpoint , out var listener ) )
155+ {
156+ return ;
157+ }
158+
159+ await listener . Listener . UnbindAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
153160
154- // Signal to the listener that it's time to start the shutdown process
155- // We call this after unbind so that we're not touching the listener anymore
156- listener . ShutdownTcs . TrySetResult ( null ) ;
161+ // Signal to the listener that it's time to start the shutdown process
162+ // We call this after unbind so that we're not touching the listener anymore
163+ listener . ShutdownTcs . TrySetResult ( null ) ;
157164
158- if ( cancellationToken . CanBeCanceled )
159- {
160- await listener . ExecutionTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
165+ if ( cancellationToken . CanBeCanceled )
166+ {
167+ await listener . ExecutionTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
168+ }
169+ else
170+ {
171+ await listener . ExecutionTask . ConfigureAwait ( false ) ;
172+ }
161173 }
162- else
174+ finally
163175 {
164- await listener . ExecutionTask . ConfigureAwait ( false ) ;
176+ _listenerSemaphore . Release ( ) ;
165177 }
166178 }
167179
168180 private async Task StartRunningListenersAsync ( ServerBinding binding , CancellationToken cancellationToken = default )
169181 {
170- await foreach ( var listener in binding . BindAsync ( cancellationToken ) . ConfigureAwait ( false ) )
182+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
183+
184+ if ( _stopping == 1 )
185+ {
186+ throw new InvalidOperationException ( "The server has already been stopped." ) ;
187+ }
188+
189+ try
171190 {
172- var runningListener = new RunningListener ( this , binding , listener ) ;
173- if ( ! _listeners . TryAdd ( runningListener . Listener . EndPoint , runningListener ) )
191+ await foreach ( var listener in binding . BindAsync ( cancellationToken ) . ConfigureAwait ( false ) )
174192 {
175- _logger . LogWarning ( "Will not start RunningListener, EndPoint already exist" ) ;
176- continue ;
177- }
193+ var runningListener = new RunningListener ( this , binding , listener ) ;
194+ if ( ! _listeners . TryAdd ( runningListener . Listener . EndPoint , runningListener ) )
195+ {
196+ _logger . LogWarning ( "Will not start RunningListener, EndPoint already exist" ) ;
197+ continue ;
198+ }
178199
179- runningListener . Start ( ) ;
200+ runningListener . Start ( ) ;
201+ }
202+ }
203+ finally
204+ {
205+ _listenerSemaphore . Release ( ) ;
180206 }
181207 }
182208
0 commit comments