11using System ;
22using System . Collections . Concurrent ;
33using System . Collections . Generic ;
4+ using System . Linq ;
45using System . Net ;
56using System . Threading ;
67using System . Threading . Tasks ;
78using Microsoft . AspNetCore . Connections ;
9+ using Microsoft . AspNetCore . Server . Kestrel . Transport . Sockets ;
810using Microsoft . Extensions . Logging ;
11+ using Microsoft . Extensions . Options ;
912
1013namespace Bedrock . Framework
1114{
1215 public class Server
1316 {
1417 private readonly ServerBuilder _builder ;
1518 private readonly ILogger < Server > _logger ;
16- private readonly List < RunningListener > _listeners = new List < RunningListener > ( ) ;
19+ private readonly Dictionary < EndPoint , RunningListener > _listeners = new Dictionary < EndPoint , RunningListener > ( ) ;
1720 private readonly TaskCompletionSource < object > _shutdownTcs = new TaskCompletionSource < object > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
1821 private readonly TimerAwaitable _timerAwaitable ;
22+ private readonly SemaphoreSlim _listenerSemaphore = new SemaphoreSlim ( initialCount : 1 ) ;
1923 private Task _timerTask = Task . CompletedTask ;
24+ private int _stopping ;
2025
2126 internal Server ( ServerBuilder builder )
2227 {
@@ -29,7 +34,7 @@ public IEnumerable<EndPoint> EndPoints
2934 {
3035 get
3136 {
32- foreach ( var listener in _listeners )
37+ foreach ( var listener in _listeners . Values )
3338 {
3439 yield return listener . Listener . EndPoint ;
3540 }
@@ -42,12 +47,7 @@ public async Task StartAsync(CancellationToken cancellationToken = default)
4247 {
4348 foreach ( var binding in _builder . Bindings )
4449 {
45- await foreach ( var listener in binding . BindAsync ( cancellationToken ) . ConfigureAwait ( false ) )
46- {
47- var runningListener = new RunningListener ( this , binding , listener ) ;
48- _listeners . Add ( runningListener ) ;
49- runningListener . Start ( ) ;
50- }
50+ await StartRunningListenersAsync ( binding , cancellationToken ) . ConfigureAwait ( false ) ;
5151 }
5252 }
5353 catch
@@ -67,7 +67,7 @@ private async Task StartTimerAsync()
6767 {
6868 while ( await _timerAwaitable )
6969 {
70- foreach ( var listener in _listeners )
70+ foreach ( var listener in _listeners . Values )
7171 {
7272 listener . TickHeartbeat ( ) ;
7373 }
@@ -77,40 +77,132 @@ private async Task StartTimerAsync()
7777
7878 public async Task StopAsync ( CancellationToken cancellationToken = default )
7979 {
80- var tasks = new Task [ _listeners . Count ] ;
80+ if ( Interlocked . Exchange ( ref _stopping , 1 ) == 1 )
81+ {
82+ return ;
83+ }
84+
85+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
86+ try
87+ {
88+ var listeners = _listeners . Values . ToList ( ) ;
89+
90+ var tasks = new Task [ listeners . Count ] ;
91+
92+ for ( int i = 0 ; i < listeners . Count ; i ++ )
93+ {
94+ tasks [ i ] = listeners [ i ] . Listener . UnbindAsync ( cancellationToken ) . AsTask ( ) ;
95+ }
96+
97+ await Task . WhenAll ( tasks ) . ConfigureAwait ( false ) ;
98+
99+ // Signal to all of the listeners that it's time to start the shutdown process
100+ // We call this after unbind so that we're not touching the listener anymore (each loop will dispose the listener)
101+ _shutdownTcs . TrySetResult ( null ) ;
102+
103+ for ( int i = 0 ; i < listeners . Count ; i ++ )
104+ {
105+ tasks [ i ] = listeners [ i ] . ExecutionTask ;
106+ }
107+
108+ var shutdownTask = Task . WhenAll ( tasks ) ;
109+
110+ if ( cancellationToken . CanBeCanceled )
111+ {
112+ await shutdownTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
113+ }
114+ else
115+ {
116+ await shutdownTask . ConfigureAwait ( false ) ;
117+ }
118+
119+ if ( _timerAwaitable != null )
120+ {
121+ _timerAwaitable . Stop ( ) ;
81122
82- for ( int i = 0 ; i < _listeners . Count ; i ++ )
123+ await _timerTask . ConfigureAwait ( false ) ;
124+ }
125+ }
126+ finally
83127 {
84- tasks [ i ] = _listeners [ i ] . Listener . UnbindAsync ( cancellationToken ) . AsTask ( ) ;
128+ _listenerSemaphore . Release ( ) ;
85129 }
130+ }
86131
87- await Task . WhenAll ( tasks ) . ConfigureAwait ( false ) ;
132+ public Task AddSocketListenerAsync ( EndPoint endpoint , Action < IConnectionBuilder > configure , CancellationToken cancellationToken = default )
133+ {
134+ var socketTransportFactory = new SocketTransportFactory ( Options . Create ( new SocketTransportOptions ( ) ) , _builder . ApplicationServices . GetLoggerFactory ( ) ) ;
135+ var connectionBuilder = new ConnectionBuilder ( _builder . ApplicationServices ) ;
88136
89- // Signal to all of the listeners that it's time to start the shutdown process
90- // We call this after unbind so that we're not touching the listener anymore (each loop will dispose the listener)
91- _shutdownTcs . TrySetResult ( null ) ;
137+ configure ( connectionBuilder ) ;
92138
93- for ( int i = 0 ; i < _listeners . Count ; i ++ )
139+ var binding = new EndPointBinding ( endpoint , connectionBuilder . Build ( ) , socketTransportFactory ) ;
140+ return StartRunningListenersAsync ( binding , cancellationToken ) ;
141+ }
142+
143+ public async Task RemoveSocketListener ( EndPoint endpoint , CancellationToken cancellationToken = default )
144+ {
145+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
146+
147+ if ( _stopping == 1 )
94148 {
95- tasks [ i ] = _listeners [ i ] . ExecutionTask ;
149+ throw new InvalidOperationException ( "The server has already been stopped." ) ;
96150 }
97151
98- var shutdownTask = Task . WhenAll ( tasks ) ;
152+ try
153+ {
154+ if ( ! _listeners . Remove ( endpoint , out var listener ) )
155+ {
156+ return ;
157+ }
158+
159+ await listener . Listener . UnbindAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
99160
100- if ( cancellationToken . CanBeCanceled )
161+ // Signal to the listener that it's time to start the shutdown process
162+ // We call this after unbind so that we're not touching the listener anymore
163+ listener . ShutdownTcs . TrySetResult ( null ) ;
164+
165+ if ( cancellationToken . CanBeCanceled )
166+ {
167+ await listener . ExecutionTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
168+ }
169+ else
170+ {
171+ await listener . ExecutionTask . ConfigureAwait ( false ) ;
172+ }
173+ }
174+ finally
101175 {
102- await shutdownTask . WithCancellation ( cancellationToken ) . ConfigureAwait ( false ) ;
176+ _listenerSemaphore . Release ( ) ;
103177 }
104- else
178+ }
179+
180+ private async Task StartRunningListenersAsync ( ServerBinding binding , CancellationToken cancellationToken = default )
181+ {
182+ await _listenerSemaphore . WaitAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
183+
184+ if ( _stopping == 1 )
105185 {
106- await shutdownTask . ConfigureAwait ( false ) ;
186+ throw new InvalidOperationException ( "The server has already been stopped." ) ;
107187 }
108188
109- if ( _timerAwaitable != null )
189+ try
110190 {
111- _timerAwaitable . Stop ( ) ;
191+ await foreach ( var listener in binding . BindAsync ( cancellationToken ) . ConfigureAwait ( false ) )
192+ {
193+ var runningListener = new RunningListener ( this , binding , listener ) ;
194+ if ( ! _listeners . TryAdd ( runningListener . Listener . EndPoint , runningListener ) )
195+ {
196+ _logger . LogWarning ( "Will not start RunningListener, EndPoint already exist" ) ;
197+ continue ;
198+ }
112199
113- await _timerTask . ConfigureAwait ( false ) ;
200+ runningListener . Start ( ) ;
201+ }
202+ }
203+ finally
204+ {
205+ _listenerSemaphore . Release ( ) ;
114206 }
115207 }
116208
@@ -130,10 +222,12 @@ public RunningListener(Server server, ServerBinding binding, IConnectionListener
130222 public void Start ( )
131223 {
132224 ExecutionTask = RunListenerAsync ( ) ;
225+ ShutdownTcs = new TaskCompletionSource < object > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
133226 }
134227
135228 public IConnectionListener Listener { get ; }
136229 public Task ExecutionTask { get ; private set ; }
230+ public TaskCompletionSource < object > ShutdownTcs { get ; private set ; }
137231
138232 public void TickHeartbeat ( )
139233 {
@@ -215,8 +309,11 @@ async Task ExecuteConnectionAsync(ServerConnection serverConnection)
215309 id ++ ;
216310 }
217311
218- // Don't shut down connections until entire server is shutting down
219- await _server . _shutdownTcs . Task . ConfigureAwait ( false ) ;
312+ // Don't shut down connections until this listener or the entire server is shutting down
313+ await Task . WhenAny (
314+ ShutdownTcs . Task ,
315+ _server . _shutdownTcs . Task )
316+ . ConfigureAwait ( false ) ;
220317
221318 // Give connections a chance to close gracefully
222319 var tasks = new List < Task > ( _connections . Count ) ;
@@ -241,7 +338,6 @@ async Task ExecuteConnectionAsync(ServerConnection serverConnection)
241338 await listener . DisposeAsync ( ) . ConfigureAwait ( false ) ;
242339 }
243340
244-
245341 private IDisposable BeginConnectionScope ( ServerConnection connection )
246342 {
247343 if ( _server . _logger . IsEnabled ( LogLevel . Critical ) )
@@ -253,4 +349,4 @@ private IDisposable BeginConnectionScope(ServerConnection connection)
253349 }
254350 }
255351 }
256- }
352+ }
0 commit comments