From ffdd2be1c60a01e7cf158fba7e1a610e240aa850 Mon Sep 17 00:00:00 2001 From: Dmytro Aleksandrov Date: Tue, 7 Feb 2012 14:50:43 +0200 Subject: [PATCH 1/5] added possibility to resolve saga instances via external factory (IoC container) --- .../EventStoreRepository.cs | 230 +++++++++--------- .../SagaEventStoreRepository.cs | 18 +- .../CommonDomain.Persistence.csproj | 1 + .../IConstructSagas.cs | 12 + .../ISagaRepository.cs | 2 +- 5 files changed, 139 insertions(+), 124 deletions(-) create mode 100644 src/proj/CommonDomain.Persistence/IConstructSagas.cs diff --git a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs index 89e101c..65eb5f4 100644 --- a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs +++ b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs @@ -1,48 +1,48 @@ -namespace CommonDomain.Persistence.EventStore -{ - using System; - using System.Collections.Generic; - using System.Linq; - using global::EventStore; - using global::EventStore.Persistence; - - public class EventStoreRepository : IRepository, IDisposable - { - private const string AggregateTypeHeader = "AggregateType"; - private readonly IDictionary snapshots = new Dictionary(); - private readonly IDictionary streams = new Dictionary(); - private readonly IStoreEvents eventStore; - private readonly IConstructAggregates factory; - private readonly IDetectConflicts conflictDetector; - - public EventStoreRepository( - IStoreEvents eventStore, - IConstructAggregates factory, - IDetectConflicts conflictDetector) - { - this.eventStore = eventStore; - this.factory = factory; - this.conflictDetector = conflictDetector; - } - - public void Dispose() - { - this.Dispose(true); - GC.SuppressFinalize(this); - } - protected virtual void Dispose(bool disposing) - { - if (!disposing) - return; - - lock (this.streams) - { - foreach (var stream in this.streams) - stream.Value.Dispose(); - - this.snapshots.Clear(); - this.streams.Clear(); - } +namespace CommonDomain.Persistence.EventStore +{ + using System; + using System.Collections.Generic; + using System.Linq; + using global::EventStore; + using global::EventStore.Persistence; + + public class EventStoreRepository : IRepository, IDisposable + { + private const string AggregateTypeHeader = "AggregateType"; + private readonly IDictionary snapshots = new Dictionary(); + private readonly IDictionary streams = new Dictionary(); + private readonly IStoreEvents eventStore; + private readonly IConstructAggregates factory; + private readonly IDetectConflicts conflictDetector; + + public EventStoreRepository( + IStoreEvents eventStore, + IConstructAggregates factory, + IDetectConflicts conflictDetector) + { + this.eventStore = eventStore; + this.factory = factory; + this.conflictDetector = conflictDetector; + } + + public void Dispose() + { + this.Dispose(true); + GC.SuppressFinalize(this); + } + protected virtual void Dispose(bool disposing) + { + if (!disposing) + return; + + lock (this.streams) + { + foreach (var stream in this.streams) + stream.Value.Dispose(); + + this.snapshots.Clear(); + this.streams.Clear(); + } } public virtual TAggregate GetById(Guid id) where TAggregate : class, IAggregate @@ -50,48 +50,48 @@ public virtual TAggregate GetById(Guid id) where TAggregate : class, return GetById(id, int.MaxValue); } - public virtual TAggregate GetById(Guid id, int versionToLoad) where TAggregate : class, IAggregate - { - var snapshot = this.GetSnapshot(id, versionToLoad); - var stream = this.OpenStream(id, versionToLoad, snapshot); - var aggregate = this.GetAggregate(snapshot, stream); - - ApplyEventsToAggregate(versionToLoad, stream, aggregate); - - return aggregate as TAggregate; - } - private static void ApplyEventsToAggregate(int versionToLoad, IEventStream stream, IAggregate aggregate) - { - if (versionToLoad == 0 || aggregate.Version < versionToLoad) - foreach (var @event in stream.CommittedEvents.Select(x => x.Body)) - aggregate.ApplyEvent(@event); - } - private IAggregate GetAggregate(Snapshot snapshot, IEventStream stream) - { - var memento = snapshot == null ? null : snapshot.Payload as IMemento; - return this.factory.Build(typeof(TAggregate), stream.StreamId, memento); - } - private Snapshot GetSnapshot(Guid id, int version) - { - Snapshot snapshot; - if (!this.snapshots.TryGetValue(id, out snapshot)) - this.snapshots[id] = snapshot = this.eventStore.Advanced.GetSnapshot(id, version); - - return snapshot; - } - private IEventStream OpenStream(Guid id, int version, Snapshot snapshot) - { - IEventStream stream; - if (this.streams.TryGetValue(id, out stream)) - return stream; - - stream = snapshot == null - ? this.eventStore.OpenStream(id, 0, version) - : this.eventStore.OpenStream(snapshot, version); - - return this.streams[id] = stream; - } - + public virtual TAggregate GetById(Guid id, int versionToLoad) where TAggregate : class, IAggregate + { + var snapshot = this.GetSnapshot(id, versionToLoad); + var stream = this.OpenStream(id, versionToLoad, snapshot); + var aggregate = this.GetAggregate(snapshot, stream); + + ApplyEventsToAggregate(versionToLoad, stream, aggregate); + + return aggregate as TAggregate; + } + private static void ApplyEventsToAggregate(int versionToLoad, IEventStream stream, IAggregate aggregate) + { + if (versionToLoad == 0 || aggregate.Version < versionToLoad) + foreach (var @event in stream.CommittedEvents.Select(x => x.Body)) + aggregate.ApplyEvent(@event); + } + private IAggregate GetAggregate(Snapshot snapshot, IEventStream stream) + { + var memento = snapshot == null ? null : snapshot.Payload as IMemento; + return this.factory.Build(typeof(TAggregate), stream.StreamId, memento); + } + private Snapshot GetSnapshot(Guid id, int version) + { + Snapshot snapshot; + if (!this.snapshots.TryGetValue(id, out snapshot)) + this.snapshots[id] = snapshot = this.eventStore.Advanced.GetSnapshot(id, version); + + return snapshot; + } + private IEventStream OpenStream(Guid id, int version, Snapshot snapshot) + { + IEventStream stream; + if (this.streams.TryGetValue(id, out stream)) + return stream; + + stream = snapshot == null + ? this.eventStore.OpenStream(id, 0, version) + : this.eventStore.OpenStream(snapshot, version); + + return this.streams[id] = stream; + } + public virtual void Save(IAggregate aggregate, Guid commitId, Action> updateHeaders) { var headers = PrepareHeaders(aggregate, updateHeaders); @@ -123,14 +123,14 @@ public virtual void Save(IAggregate aggregate, Guid commitId, Action headers) - { - IEventStream stream; - if (!this.streams.TryGetValue(aggregate.Id, out stream)) - this.streams[aggregate.Id] = stream = this.eventStore.CreateStream(aggregate.Id); - - foreach (var item in headers) + } + private IEventStream PrepareStream(IAggregate aggregate, Dictionary headers) + { + IEventStream stream; + if (!this.streams.TryGetValue(aggregate.Id, out stream)) + this.streams[aggregate.Id] = stream = this.eventStore.CreateStream(aggregate.Id); + + foreach (var item in headers) stream.UncommittedHeaders[item.Key] = item.Value; aggregate.GetUncommittedEvents() @@ -138,24 +138,24 @@ private IEventStream PrepareStream(IAggregate aggregate, Dictionary new EventMessage { Body = x }) .ToList() .ForEach(stream.Add); - - return stream; - } - private static Dictionary PrepareHeaders(IAggregate aggregate, Action> updateHeaders) - { - var headers = new Dictionary(); - - headers[AggregateTypeHeader] = aggregate.GetType().FullName; - if (updateHeaders != null) - updateHeaders(headers); - - return headers; - } - private bool ThrowOnConflict(IEventStream stream, int skip) - { - var committed = stream.CommittedEvents.Skip(skip).Select(x => x.Body); - var uncommitted = stream.UncommittedEvents.Select(x => x.Body); - return this.conflictDetector.ConflictsWith(uncommitted, committed); - } - } + + return stream; + } + private static Dictionary PrepareHeaders(IAggregate aggregate, Action> updateHeaders) + { + var headers = new Dictionary(); + + headers[AggregateTypeHeader] = aggregate.GetType().FullName; + if (updateHeaders != null) + updateHeaders(headers); + + return headers; + } + private bool ThrowOnConflict(IEventStream stream, int skip) + { + var committed = stream.CommittedEvents.Skip(skip).Select(x => x.Body); + var uncommitted = stream.UncommittedEvents.Select(x => x.Body); + return this.conflictDetector.ConflictsWith(uncommitted, committed); + } + } } \ No newline at end of file diff --git a/src/proj/CommonDomain.Persistence.EventStore/SagaEventStoreRepository.cs b/src/proj/CommonDomain.Persistence.EventStore/SagaEventStoreRepository.cs index 438d436..bb5ef21 100644 --- a/src/proj/CommonDomain.Persistence.EventStore/SagaEventStoreRepository.cs +++ b/src/proj/CommonDomain.Persistence.EventStore/SagaEventStoreRepository.cs @@ -12,13 +12,15 @@ public class SagaEventStoreRepository : ISagaRepository, IDisposable private const string UndispatchedMessageHeader = "UndispatchedMessage."; private readonly IDictionary streams = new Dictionary(); private readonly IStoreEvents eventStore; + private readonly IConstructSagas sagaFactory; - public SagaEventStoreRepository(IStoreEvents eventStore) + public SagaEventStoreRepository(IStoreEvents eventStore, IConstructSagas sagaFactory) { - this.eventStore = eventStore; + this.eventStore = eventStore; + this.sagaFactory = sagaFactory; } - public void Dispose() + public void Dispose() { this.Dispose(true); GC.SuppressFinalize(this); @@ -37,9 +39,9 @@ protected virtual void Dispose(bool disposing) } } - public TSaga GetById(Guid sagaId) where TSaga : class, ISaga, new() + public TSaga GetById(Guid sagaId) where TSaga : class, ISaga { - return BuildSaga(this.OpenStream(sagaId)); + return BuildSaga(sagaId, this.OpenStream(sagaId)); } private IEventStream OpenStream(Guid sagaId) { @@ -59,9 +61,9 @@ private IEventStream OpenStream(Guid sagaId) return this.streams[sagaId] = stream; } - private static TSaga BuildSaga(IEventStream stream) where TSaga : class, ISaga, new() + private TSaga BuildSaga(Guid sagaId, IEventStream stream) where TSaga : class, ISaga { - var saga = new TSaga(); + var saga = sagaFactory.Build(sagaId); foreach (var @event in stream.CommittedEvents.Select(x => x.Body)) saga.Transition(@event); @@ -128,7 +130,7 @@ private static void Persist(IEventStream stream, Guid commitId) catch (StorageException e) { throw new PersistenceException(e.Message, e); - } + } } } } \ No newline at end of file diff --git a/src/proj/CommonDomain.Persistence/CommonDomain.Persistence.csproj b/src/proj/CommonDomain.Persistence/CommonDomain.Persistence.csproj index 4af358c..e90f23b 100644 --- a/src/proj/CommonDomain.Persistence/CommonDomain.Persistence.csproj +++ b/src/proj/CommonDomain.Persistence/CommonDomain.Persistence.csproj @@ -46,6 +46,7 @@ + diff --git a/src/proj/CommonDomain.Persistence/IConstructSagas.cs b/src/proj/CommonDomain.Persistence/IConstructSagas.cs new file mode 100644 index 0000000..03a3e69 --- /dev/null +++ b/src/proj/CommonDomain.Persistence/IConstructSagas.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace CommonDomain.Persistence +{ + public interface IConstructSagas + { + TSaga Build(Guid id) where TSaga : ISaga; + } +} diff --git a/src/proj/CommonDomain.Persistence/ISagaRepository.cs b/src/proj/CommonDomain.Persistence/ISagaRepository.cs index ed02c49..716654f 100644 --- a/src/proj/CommonDomain.Persistence/ISagaRepository.cs +++ b/src/proj/CommonDomain.Persistence/ISagaRepository.cs @@ -5,7 +5,7 @@ namespace CommonDomain.Persistence public interface ISagaRepository { - TSaga GetById(Guid sagaId) where TSaga : class, ISaga, new(); + TSaga GetById(Guid sagaId) where TSaga : class, ISaga; void Save(ISaga saga, Guid commitId, Action> updateHeaders); } } \ No newline at end of file From 8ec23e55c2de2f3f6e31f94aaac1b9cb894e28d5 Mon Sep 17 00:00:00 2001 From: Dmytro Aleksandrov Date: Wed, 14 Mar 2012 14:00:15 +0200 Subject: [PATCH 2/5] passed stream headers to aggregate factory --- .../EventStoreRepository.cs | 2 +- src/proj/CommonDomain.Persistence/IConstructAggregates.cs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs index 65eb5f4..802e49a 100644 --- a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs +++ b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs @@ -69,7 +69,7 @@ private static void ApplyEventsToAggregate(int versionToLoad, IEventStream strea private IAggregate GetAggregate(Snapshot snapshot, IEventStream stream) { var memento = snapshot == null ? null : snapshot.Payload as IMemento; - return this.factory.Build(typeof(TAggregate), stream.StreamId, memento); + return this.factory.Build(typeof(TAggregate), stream.StreamId, memento, stream.CommittedHeaders); } private Snapshot GetSnapshot(Guid id, int version) { diff --git a/src/proj/CommonDomain.Persistence/IConstructAggregates.cs b/src/proj/CommonDomain.Persistence/IConstructAggregates.cs index 965ce57..3f1925d 100644 --- a/src/proj/CommonDomain.Persistence/IConstructAggregates.cs +++ b/src/proj/CommonDomain.Persistence/IConstructAggregates.cs @@ -1,9 +1,11 @@ +using System.Collections.Generic; + namespace CommonDomain.Persistence { using System; public interface IConstructAggregates { - IAggregate Build(Type type, Guid id, IMemento snapshot); + IAggregate Build(Type type, Guid id, IMemento snapshot, IDictionary headers); } } \ No newline at end of file From 1f3feaecb378775fdd4f4a4ca2f99cd30da789fc Mon Sep 17 00:00:00 2001 From: Dmytro Aleksandrov Date: Wed, 14 Mar 2012 16:51:57 +0200 Subject: [PATCH 3/5] passed stream headers to aggregate factory --- .../EventStoreRepository.cs | 2 +- src/proj/CommonDomain.Persistence/IConstructAggregates.cs | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs index 89e101c..f28b4ac 100644 --- a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs +++ b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs @@ -69,7 +69,7 @@ private static void ApplyEventsToAggregate(int versionToLoad, IEventStream strea private IAggregate GetAggregate(Snapshot snapshot, IEventStream stream) { var memento = snapshot == null ? null : snapshot.Payload as IMemento; - return this.factory.Build(typeof(TAggregate), stream.StreamId, memento); + return this.factory.Build(typeof(TAggregate), stream.StreamId, memento, stream.CommittedHeaders); } private Snapshot GetSnapshot(Guid id, int version) { diff --git a/src/proj/CommonDomain.Persistence/IConstructAggregates.cs b/src/proj/CommonDomain.Persistence/IConstructAggregates.cs index 965ce57..3f1925d 100644 --- a/src/proj/CommonDomain.Persistence/IConstructAggregates.cs +++ b/src/proj/CommonDomain.Persistence/IConstructAggregates.cs @@ -1,9 +1,11 @@ +using System.Collections.Generic; + namespace CommonDomain.Persistence { using System; public interface IConstructAggregates { - IAggregate Build(Type type, Guid id, IMemento snapshot); + IAggregate Build(Type type, Guid id, IMemento snapshot, IDictionary headers); } } \ No newline at end of file From fbee824aa82ec27376cb02243ac6118978353b33 Mon Sep 17 00:00:00 2001 From: Dmytro Aleksandrov Date: Sun, 30 Sep 2012 08:36:40 +0300 Subject: [PATCH 4/5] Issue #28 - stream caching disabled --- .../EventStoreRepository.cs | 114 +++++++----------- 1 file changed, 42 insertions(+), 72 deletions(-) diff --git a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs index 802e49a..8b18b4d 100644 --- a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs +++ b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs @@ -6,11 +6,9 @@ namespace CommonDomain.Persistence.EventStore using global::EventStore; using global::EventStore.Persistence; - public class EventStoreRepository : IRepository, IDisposable + public class EventStoreRepository : IRepository { private const string AggregateTypeHeader = "AggregateType"; - private readonly IDictionary snapshots = new Dictionary(); - private readonly IDictionary streams = new Dictionary(); private readonly IStoreEvents eventStore; private readonly IConstructAggregates factory; private readonly IDetectConflicts conflictDetector; @@ -25,26 +23,6 @@ public EventStoreRepository( this.conflictDetector = conflictDetector; } - public void Dispose() - { - this.Dispose(true); - GC.SuppressFinalize(this); - } - protected virtual void Dispose(bool disposing) - { - if (!disposing) - return; - - lock (this.streams) - { - foreach (var stream in this.streams) - stream.Value.Dispose(); - - this.snapshots.Clear(); - this.streams.Clear(); - } - } - public virtual TAggregate GetById(Guid id) where TAggregate : class, IAggregate { return GetById(id, int.MaxValue); @@ -52,13 +30,15 @@ public virtual TAggregate GetById(Guid id) where TAggregate : class, public virtual TAggregate GetById(Guid id, int versionToLoad) where TAggregate : class, IAggregate { - var snapshot = this.GetSnapshot(id, versionToLoad); - var stream = this.OpenStream(id, versionToLoad, snapshot); - var aggregate = this.GetAggregate(snapshot, stream); + var snapshot = GetSnapshot(id, versionToLoad); + using (var stream = OpenStream(id, versionToLoad, snapshot)) + { + var aggregate = GetAggregate(snapshot, stream); - ApplyEventsToAggregate(versionToLoad, stream, aggregate); + ApplyEventsToAggregate(versionToLoad, stream, aggregate); - return aggregate as TAggregate; + return aggregate as TAggregate; + } } private static void ApplyEventsToAggregate(int versionToLoad, IEventStream stream, IAggregate aggregate) { @@ -69,27 +49,17 @@ private static void ApplyEventsToAggregate(int versionToLoad, IEventStream strea private IAggregate GetAggregate(Snapshot snapshot, IEventStream stream) { var memento = snapshot == null ? null : snapshot.Payload as IMemento; - return this.factory.Build(typeof(TAggregate), stream.StreamId, memento, stream.CommittedHeaders); + return factory.Build(typeof(TAggregate), stream.StreamId, memento, stream.CommittedHeaders); } private Snapshot GetSnapshot(Guid id, int version) { - Snapshot snapshot; - if (!this.snapshots.TryGetValue(id, out snapshot)) - this.snapshots[id] = snapshot = this.eventStore.Advanced.GetSnapshot(id, version); - - return snapshot; + return eventStore.Advanced.GetSnapshot(id, version); } private IEventStream OpenStream(Guid id, int version, Snapshot snapshot) { - IEventStream stream; - if (this.streams.TryGetValue(id, out stream)) - return stream; - - stream = snapshot == null - ? this.eventStore.OpenStream(id, 0, version) - : this.eventStore.OpenStream(snapshot, version); - - return this.streams[id] = stream; + return snapshot == null + ? eventStore.OpenStream(id, 0, version) + : eventStore.OpenStream(snapshot, version); } public virtual void Save(IAggregate aggregate, Guid commitId, Action> updateHeaders) @@ -97,38 +67,38 @@ public virtual void Save(IAggregate aggregate, Guid commitId, Action headers) { - IEventStream stream; - if (!this.streams.TryGetValue(aggregate.Id, out stream)) - this.streams[aggregate.Id] = stream = this.eventStore.CreateStream(aggregate.Id); + IEventStream stream = eventStore.OpenStream(aggregate.Id, 0, int.MaxValue); foreach (var item in headers) stream.UncommittedHeaders[item.Key] = item.Value; @@ -155,7 +125,7 @@ private bool ThrowOnConflict(IEventStream stream, int skip) { var committed = stream.CommittedEvents.Skip(skip).Select(x => x.Body); var uncommitted = stream.UncommittedEvents.Select(x => x.Body); - return this.conflictDetector.ConflictsWith(uncommitted, committed); + return conflictDetector.ConflictsWith(uncommitted, committed); } } } \ No newline at end of file From f6ef48afca2821712c6f8238783e64f72cdb7d89 Mon Sep 17 00:00:00 2001 From: zxsanny Date: Thu, 25 Apr 2013 13:36:45 +0300 Subject: [PATCH 5/5] Added support for applying Ilist (for specific converter Event > IList) --- nuget/CommonDomain.nuspec | 2 +- src/proj/CommonDomain.Core/AggregateBase.cs | 7 +++++-- .../CommonDomain.Persistence.EventStore.csproj | 5 +++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/nuget/CommonDomain.nuspec b/nuget/CommonDomain.nuspec index 7d3da07..34c6b9b 100644 --- a/nuget/CommonDomain.nuspec +++ b/nuget/CommonDomain.nuspec @@ -12,7 +12,7 @@ A domain project for quickly implementing CQRS functionality in domain models. A domain project for quickly implementing CQRS functionality in domain models. - + diff --git a/src/proj/CommonDomain.Core/AggregateBase.cs b/src/proj/CommonDomain.Core/AggregateBase.cs index ceace40..cf07553 100644 --- a/src/proj/CommonDomain.Core/AggregateBase.cs +++ b/src/proj/CommonDomain.Core/AggregateBase.cs @@ -53,8 +53,11 @@ protected void RaiseEvent(object @event) } void IAggregate.ApplyEvent(object @event) { - this.RegisteredRoutes.Dispatch(@event); - this.Version++; + foreach (var item in ( ( @event is IEnumerable ) ? @event as IEnumerable : new []{ @event } )) + { + this.RegisteredRoutes.Dispatch(item); + this.Version++; + } } ICollection IAggregate.GetUncommittedEvents() { diff --git a/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj b/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj index 7585c39..db53fea 100644 --- a/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj +++ b/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj @@ -71,8 +71,9 @@ - - ..\..\packages\EventStore.3.0.11305.44\lib\net40\EventStore.dll + + False + ..\..\..\_lib\EventStore.dll