diff --git a/Source/EventFlow.MongoDB/ReadStores/MongoDbReadModelStore.cs b/Source/EventFlow.MongoDB/ReadStores/MongoDbReadModelStore.cs index d3c0f18e8..78a49baa6 100644 --- a/Source/EventFlow.MongoDB/ReadStores/MongoDbReadModelStore.cs +++ b/Source/EventFlow.MongoDB/ReadStores/MongoDbReadModelStore.cs @@ -182,17 +182,110 @@ public async Task UpdateAsync(IReadOnlyCollection readModelUpda { var readModelDescription = _readModelDescriptionProvider.GetReadModelDescription(); - foreach (var readModelUpdate in readModelUpdates) + if (readModelUpdates.Count == 1) { await _transientFaultHandler.TryAsync( - c => UpdateReadModelAsync(readModelDescription, readModelUpdate, readModelContextFactory, - updateReadModel, c), + c => UpdateReadModelAsync(readModelDescription, + readModelUpdates.First(), readModelContextFactory, updateReadModel, c), Label.Named("mongodb-read-model-update"), cancellationToken) .ConfigureAwait(false); + return; } + + + var modelIds = readModelUpdates + .Select(x => x.ReadModelId) + .ToList(); + + var collection = _mongoDatabase.GetCollection(readModelDescription.RootCollectionName.Value); + var filter = Builders.Filter.In(readModel => readModel.Id, modelIds); + var results = await collection.Find(filter).ToListAsync(cancellationToken: cancellationToken); + + await _transientFaultHandler + .TryAsync( + async c => + { + var envelopeTasks = readModelUpdates + .Select(async readModelUpdate => + { + var result = results + .FirstOrDefault(x => x.Id == readModelUpdate.ReadModelId); + + var hasResult = result != null; + var readModelEnvelope = hasResult + ? ReadModelEnvelope.With(readModelUpdate.ReadModelId, result) + : ReadModelEnvelope.Empty(readModelUpdate.ReadModelId); + + var readModelContext = readModelContextFactory + .Create(readModelUpdate.ReadModelId, hasResult); + var readModelUpdateResult = + await updateReadModel(readModelContext, readModelUpdate.DomainEvents, readModelEnvelope, + cancellationToken).ConfigureAwait(false); + + if (!readModelUpdateResult.IsModified) + { + return (EnvelopReadModel?)null; + } + + if (readModelContext.IsMarkedForDeletion) + { + await DeleteAsync(readModelUpdate.ReadModelId, c); + return null; + } + + readModelEnvelope = readModelUpdateResult.Envelope; + var originalVersion = readModelEnvelope.ReadModel.Version; + readModelEnvelope.ReadModel.Version = readModelEnvelope.Version; + + return new EnvelopReadModel + { + ReadModelEnvelope = readModelEnvelope, + OriginalVersion = originalVersion + }; + }) + .ToList(); + + var envelopes = await Task.WhenAll(envelopeTasks).ConfigureAwait(false); + var nonNullEnvelopes = envelopes + .Where(x => x.HasValue) + .Select(x => x.Value); + + try + { + await collection + .BulkWriteAsync(nonNullEnvelopes + .Select(x => + { + var filterId = Builders.Filter + .Eq(y => y.Id, x.ReadModelEnvelope.ReadModelId); + var filterVer = Builders.Filter + .Eq(y => y.Version, x.OriginalVersion); + var replaceOneModel = new ReplaceOneModel( + Builders.Filter.And(filterId, filterVer), + x.ReadModelEnvelope.ReadModel) { IsUpsert = true }; + + return replaceOneModel; + }), cancellationToken: c); + } + catch (MongoWriteException e) + { + throw new OptimisticConcurrencyException( + $"Read model ?? updated by another", + e); + } + }, + Label.Named("mongodb-read-model-update"), + cancellationToken) + .ConfigureAwait(false); } + private struct EnvelopReadModel + { + public ReadModelEnvelope ReadModelEnvelope { get; set; } + public long? OriginalVersion { get; set; } + } + public IQueryable AsQueryable() { var readModelDescription = _readModelDescriptionProvider.GetReadModelDescription();