2323
2424namespace MongoDB . Driver
2525{
26- internal class AggregateFluent < TDocument , TResult > : AggregateFluentBase < TResult >
26+ internal abstract class AggregateFluent < TInput , TResult > : AggregateFluentBase < TResult >
2727 {
2828 // fields
29- private readonly IMongoCollection < TDocument > _collection ;
30- private readonly AggregateOptions _options ;
31- private readonly PipelineDefinition < TDocument , TResult > _pipeline ;
32- private readonly IClientSessionHandle _session ;
29+ protected readonly AggregateOptions _options ;
30+ protected readonly PipelineDefinition < TInput , TResult > _pipeline ;
31+ protected readonly IClientSessionHandle _session ;
3332
3433 // constructors
35- public AggregateFluent ( IClientSessionHandle session , IMongoCollection < TDocument > collection , PipelineDefinition < TDocument , TResult > pipeline , AggregateOptions options )
34+ protected AggregateFluent ( IClientSessionHandle session , PipelineDefinition < TInput , TResult > pipeline , AggregateOptions options )
3635 {
3736 _session = session ; // can be null
38- _collection = Ensure . IsNotNull ( collection , nameof ( collection ) ) ;
3937 _pipeline = Ensure . IsNotNull ( pipeline , nameof ( pipeline ) ) ;
4038 _options = Ensure . IsNotNull ( options , nameof ( options ) ) ;
4139 }
4240
4341 // properties
44- public override IMongoDatabase Database
45- {
46- get { return _collection . Database ; }
47- }
48-
4942 public override AggregateOptions Options
5043 {
5144 get { return _options ; }
@@ -143,7 +136,7 @@ public override IAggregateFluent<TResult> Limit(int limit)
143136 public override IAggregateFluent < TNewResult > Lookup < TForeignDocument , TNewResult > ( string foreignCollectionName , FieldDefinition < TResult > localField , FieldDefinition < TForeignDocument > foreignField , FieldDefinition < TNewResult > @as , AggregateLookupOptions < TForeignDocument , TNewResult > options )
144137 {
145138 Ensure . IsNotNull ( foreignCollectionName , nameof ( foreignCollectionName ) ) ;
146- var foreignCollection = _collection . Database . GetCollection < TForeignDocument > ( foreignCollectionName ) ;
139+ var foreignCollection = Database . GetCollection < TForeignDocument > ( foreignCollectionName ) ;
147140 return WithPipeline ( _pipeline . Lookup ( foreignCollection , localField , foreignField , @as , options ) ) ;
148141 }
149142
@@ -218,7 +211,7 @@ public override IOrderedAggregateFluent<TResult> ThenBy(SortDefinition<TResult>
218211 var combinedSort = Builders < TResult > . Sort . Combine ( oldSort , newSort ) ;
219212 var combinedSortStage = PipelineStageDefinitionBuilder . Sort ( combinedSort ) ;
220213 stages [ stages . Count - 1 ] = combinedSortStage ;
221- var newPipeline = new PipelineStagePipelineDefinition < TDocument , TResult > ( stages ) ;
214+ var newPipeline = new PipelineStagePipelineDefinition < TInput , TResult > ( stages ) ;
222215 return ( IOrderedAggregateFluent < TResult > ) WithPipeline ( newPipeline ) ;
223216 }
224217
@@ -232,6 +225,34 @@ public override IAggregateFluent<TNewResult> Unwind<TNewResult>(FieldDefinition<
232225 return WithPipeline ( _pipeline . Unwind ( field , options ) ) ;
233226 }
234227
228+ public override string ToString ( )
229+ {
230+ return $ "aggregate({ _pipeline } )";
231+ }
232+
233+ protected abstract IAggregateFluent < TNewResult > WithPipeline < TNewResult > ( PipelineDefinition < TInput , TNewResult > pipeline ) ;
234+ }
235+
236+ internal class CollectionAggregateFluent < TDocument , TResult > : AggregateFluent < TDocument , TResult >
237+ {
238+ // private fields
239+ private readonly IMongoCollection < TDocument > _collection ;
240+
241+ // constructors
242+ public CollectionAggregateFluent (
243+ IClientSessionHandle session ,
244+ IMongoCollection < TDocument > collection ,
245+ PipelineDefinition < TDocument , TResult > pipeline ,
246+ AggregateOptions options )
247+ : base ( session , pipeline , options )
248+ {
249+ _collection = Ensure . IsNotNull ( collection , nameof ( collection ) ) ;
250+ }
251+
252+ // public properties
253+ public override IMongoDatabase Database => _collection . Database ;
254+
255+ // public methods
235256 public override IAsyncCursor < TResult > ToCursor ( CancellationToken cancellationToken )
236257 {
237258 if ( _session == null )
@@ -256,14 +277,61 @@ public override Task<IAsyncCursor<TResult>> ToCursorAsync(CancellationToken canc
256277 }
257278 }
258279
259- public override string ToString ( )
280+ // protected methods
281+ protected override IAggregateFluent < TNewResult > WithPipeline < TNewResult > ( PipelineDefinition < TDocument , TNewResult > pipeline )
260282 {
261- return $ "aggregate({ _pipeline } )";
283+ return new CollectionAggregateFluent < TDocument , TNewResult > ( _session , _collection , pipeline , _options ) ;
284+ }
285+ }
286+
287+ internal class DatabaseAggregateFluent < TResult > : AggregateFluent < NoPipelineInput , TResult >
288+ {
289+ // private fields
290+ private readonly IMongoDatabase _database ;
291+
292+ // constructors
293+ public DatabaseAggregateFluent (
294+ IClientSessionHandle session ,
295+ IMongoDatabase database ,
296+ PipelineDefinition < NoPipelineInput , TResult > pipeline ,
297+ AggregateOptions options )
298+ : base ( session , pipeline , options )
299+ {
300+ _database = Ensure . IsNotNull ( database , nameof ( database ) ) ;
301+ }
302+
303+ // public properties
304+ public override IMongoDatabase Database => _database ;
305+
306+ // public methods
307+ public override IAsyncCursor < TResult > ToCursor ( CancellationToken cancellationToken )
308+ {
309+ if ( _session == null )
310+ {
311+ return _database . Aggregate ( _pipeline , _options , cancellationToken ) ;
312+ }
313+ else
314+ {
315+ return _database . Aggregate ( _session , _pipeline , _options , cancellationToken ) ;
316+ }
317+ }
318+
319+ public override Task < IAsyncCursor < TResult > > ToCursorAsync ( CancellationToken cancellationToken )
320+ {
321+ if ( _session == null )
322+ {
323+ return _database . AggregateAsync ( _pipeline , _options , cancellationToken ) ;
324+ }
325+ else
326+ {
327+ return _database . AggregateAsync ( _session , _pipeline , _options , cancellationToken ) ;
328+ }
262329 }
263330
264- public IAggregateFluent < TNewResult > WithPipeline < TNewResult > ( PipelineDefinition < TDocument , TNewResult > pipeline )
331+ // protected methods
332+ protected override IAggregateFluent < TNewResult > WithPipeline < TNewResult > ( PipelineDefinition < NoPipelineInput , TNewResult > pipeline )
265333 {
266- return new AggregateFluent < TDocument , TNewResult > ( _session , _collection , pipeline , _options ) ;
334+ return new DatabaseAggregateFluent < TNewResult > ( _session , _database , pipeline , _options ) ;
267335 }
268336 }
269337}
0 commit comments