@@ -46,7 +46,7 @@ public static ReadPreferenceServerSelector Primary
4646 #endregion
4747
4848 // fields
49- private readonly TimeSpan ? _maxStaleness ; // with Zero and InfiniteTimespan converted to null
49+ private readonly TimeSpan ? _maxStaleness ; // with InfiniteTimespan converted to null
5050 private readonly ReadPreference _readPreference ;
5151
5252 // constructors
@@ -57,7 +57,7 @@ public static ReadPreferenceServerSelector Primary
5757 public ReadPreferenceServerSelector ( ReadPreference readPreference )
5858 {
5959 _readPreference = Ensure . IsNotNull ( readPreference , nameof ( readPreference ) ) ;
60- if ( readPreference . MaxStaleness == TimeSpan . Zero || readPreference . MaxStaleness == Timeout . InfiniteTimeSpan )
60+ if ( readPreference . MaxStaleness == Timeout . InfiniteTimeSpan )
6161 {
6262 _maxStaleness = null ;
6363 }
@@ -132,53 +132,42 @@ private IEnumerable<ServerDescription> SelectByTagSets(IEnumerable<ServerDescrip
132132
133133 private IEnumerable < ServerDescription > SelectForReplicaSet ( ClusterDescription cluster , IEnumerable < ServerDescription > servers )
134134 {
135- if ( _maxStaleness . HasValue )
136- {
137- var minHeartBeatIntervalTicks = servers . Select ( s => s . HeartbeatInterval . Ticks ) . Min ( ) ;
138- if ( _maxStaleness . Value . Ticks < 2 * minHeartBeatIntervalTicks )
139- {
140- throw new MongoClientException ( "MaxStaleness must be at least twice the heartbeat interval." ) ;
141- }
135+ EnsureMaxStalenessIsValid ( cluster ) ;
142136
143- servers = new CachedEnumerable < ServerDescription > ( SelectFreshServers ( cluster , servers ) ) ; // prevent multiple enumeration
144- }
145- else
146- {
147- servers = new CachedEnumerable < ServerDescription > ( servers ) ; // prevent multiple enumeration
148- }
137+ servers = new CachedEnumerable < ServerDescription > ( servers ) ; // prevent multiple enumeration
149138
150139 switch ( _readPreference . ReadPreferenceMode )
151140 {
152141 case ReadPreferenceMode . Primary :
153- return servers . Where ( n => n . Type == ServerType . ReplicaSetPrimary ) ;
142+ return SelectPrimary ( servers ) ;
154143
155144 case ReadPreferenceMode . PrimaryPreferred :
156- var primary = servers . FirstOrDefault ( n => n . Type == ServerType . ReplicaSetPrimary ) ;
157- if ( primary != null )
145+ var primary = SelectPrimary ( servers ) ;
146+ if ( primary . Count != 0 )
158147 {
159- return new [ ] { primary } ;
148+ return primary ;
160149 }
161150 else
162151 {
163- return SelectByTagSets ( servers . Where ( n => n . Type == ServerType . ReplicaSetSecondary ) ) ;
152+ return SelectByTagSets ( SelectFreshSecondaries ( cluster , servers ) ) ;
164153 }
165154
166155 case ReadPreferenceMode . Secondary :
167- return SelectByTagSets ( servers . Where ( n => n . Type == ServerType . ReplicaSetSecondary ) ) ;
156+ return SelectByTagSets ( SelectFreshSecondaries ( cluster , servers ) ) ;
168157
169158 case ReadPreferenceMode . SecondaryPreferred :
170- var matchingSecondaries = SelectByTagSets ( servers . Where ( n => n . Type == ServerType . ReplicaSetSecondary ) ) . ToList ( ) ;
171- if ( matchingSecondaries . Count != 0 )
159+ var selectedSecondaries = SelectByTagSets ( SelectFreshSecondaries ( cluster , servers ) ) . ToList ( ) ;
160+ if ( selectedSecondaries . Count != 0 )
172161 {
173- return matchingSecondaries ;
162+ return selectedSecondaries ;
174163 }
175164 else
176165 {
177- return servers . Where ( n => n . Type == ServerType . ReplicaSetPrimary ) ;
166+ return SelectPrimary ( servers ) ;
178167 }
179168
180169 case ReadPreferenceMode . Nearest :
181- return SelectByTagSets ( servers . Where ( n => n . Type == ServerType . ReplicaSetPrimary || n . Type == ServerType . ReplicaSetSecondary ) ) ;
170+ return SelectByTagSets ( SelectPrimary ( servers ) . Concat ( SelectFreshSecondaries ( cluster , servers ) ) ) ;
182171
183172 default :
184173 throw new ArgumentException ( "Invalid ReadPreferenceMode." ) ;
@@ -195,44 +184,93 @@ private IEnumerable<ServerDescription> SelectForStandaloneCluster(IEnumerable<Se
195184 return servers . Where ( n => n . Type == ServerType . Standalone ) ; // standalone servers match any ReadPreference (to facilitate testing)
196185 }
197186
198- private IReadOnlyList < ServerDescription > SelectFreshServers ( ClusterDescription cluster , IEnumerable < ServerDescription > servers )
187+ private List < ServerDescription > SelectPrimary ( IEnumerable < ServerDescription > servers )
199188 {
200- var primary = cluster . Servers . SingleOrDefault ( s => s . Type == ServerType . ReplicaSetPrimary ) ;
201- if ( primary == null )
189+ var primary = servers . Where ( s => s . Type == ServerType . ReplicaSetPrimary ) . ToList ( ) ;
190+ if ( primary . Count > 1 )
202191 {
203- return SelectFreshServersWithNoPrimary ( cluster , servers ) ;
192+ throw new MongoClientException ( $ "More than one primary found: [{ string . Join ( ", " , servers . Select ( s => s . ToString ( ) ) ) } ].") ;
193+ }
194+ return primary ; // returned as a list because otherwise some callers would have to create a new list
195+ }
196+
197+ private IEnumerable < ServerDescription > SelectSecondaries ( IEnumerable < ServerDescription > servers )
198+ {
199+ return servers . Where ( s => s . Type == ServerType . ReplicaSetSecondary ) ;
200+ }
201+
202+ private IEnumerable < ServerDescription > SelectFreshSecondaries ( ClusterDescription cluster , IEnumerable < ServerDescription > servers )
203+ {
204+ var secondaries = SelectSecondaries ( servers ) ;
205+
206+ if ( _maxStaleness . HasValue )
207+ {
208+ var primary = SelectPrimary ( cluster . Servers ) . SingleOrDefault ( ) ;
209+ if ( primary == null )
210+ {
211+ return SelectFreshSecondariesWithNoPrimary ( secondaries ) ;
212+ }
213+ else
214+ {
215+
216+ return SelectFreshSecondariesWithPrimary ( primary , secondaries ) ;
217+ }
204218 }
205219 else
206220 {
207- return SelectFreshServersWithPrimary ( cluster , primary , servers ) ;
221+ return secondaries ;
208222 }
209223 }
210224
211- private IReadOnlyList < ServerDescription > SelectFreshServersWithNoPrimary ( ClusterDescription cluster , IEnumerable < ServerDescription > servers )
225+ private IEnumerable < ServerDescription > SelectFreshSecondariesWithNoPrimary ( IEnumerable < ServerDescription > secondaries )
212226 {
213- var smax = servers
214- . Where ( s => s . Type == ServerType . ReplicaSetSecondary )
227+ var smax = secondaries
215228 . OrderByDescending ( s => s . LastWriteTimestamp )
216229 . FirstOrDefault ( ) ;
217- return servers
230+ if ( smax == null )
231+ {
232+ return Enumerable . Empty < ServerDescription > ( ) ;
233+ }
234+
235+ return secondaries
218236 . Where ( s =>
219237 {
220238 var estimatedStaleness = smax . LastWriteTimestamp . Value - s . LastWriteTimestamp . Value + s . HeartbeatInterval ;
221239 return estimatedStaleness <= _maxStaleness ;
222- } )
223- . ToList ( ) ;
240+ } ) ;
224241 }
225242
226- private IReadOnlyList < ServerDescription > SelectFreshServersWithPrimary ( ClusterDescription cluster , ServerDescription primary , IEnumerable < ServerDescription > servers )
243+ private IEnumerable < ServerDescription > SelectFreshSecondariesWithPrimary ( ServerDescription primary , IEnumerable < ServerDescription > secondaries )
227244 {
228245 var p = primary ;
229- return servers
246+ return secondaries
230247 . Where ( s =>
231- {
248+ {
232249 var estimatedStaleness = ( s . LastUpdateTimestamp - s . LastWriteTimestamp . Value ) - ( p . LastUpdateTimestamp - p . LastWriteTimestamp . Value ) + s . HeartbeatInterval ;
233250 return estimatedStaleness <= _maxStaleness ;
234- } )
235- . ToList ( ) ;
251+ } ) ;
252+ }
253+
254+ private void EnsureMaxStalenessIsValid ( ClusterDescription cluster )
255+ {
256+ if ( _maxStaleness . HasValue )
257+ {
258+ var primary = SelectPrimary ( cluster . Servers ) . SingleOrDefault ( ) ;
259+ var primaryIdleWritePeriod = primary ? . IdleWritePeriod ;
260+
261+ foreach ( var server in cluster . Servers )
262+ {
263+ if ( server . Type == ServerType . ReplicaSetPrimary || server . Type == ServerType . ReplicaSetSecondary )
264+ {
265+ var heartbeatInterval = server . HeartbeatInterval ;
266+ var idleWriteTime = primaryIdleWritePeriod ?? server . IdleWritePeriod ;
267+ if ( _maxStaleness . Value < heartbeatInterval + idleWriteTime )
268+ {
269+ throw new Exception ( "Max staleness must greater than or equal to heartbeat interval plus idle write period." ) ;
270+ }
271+ }
272+ }
273+ }
236274 }
237275 }
238276}
0 commit comments