diff --git a/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj b/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj
index 7585c39..1d849d0 100644
--- a/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj
+++ b/src/proj/CommonDomain.Persistence.EventStore/CommonDomain.Persistence.EventStore.csproj
@@ -54,6 +54,7 @@
Code
+
diff --git a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs
index 89e101c..2781328 100644
--- a/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs
+++ b/src/proj/CommonDomain.Persistence.EventStore/EventStoreRepository.cs
@@ -1,97 +1,100 @@
-namespace CommonDomain.Persistence.EventStore
-{
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using global::EventStore;
- using global::EventStore.Persistence;
-
- public class EventStoreRepository : IRepository, IDisposable
- {
- private const string AggregateTypeHeader = "AggregateType";
- private readonly IDictionary snapshots = new Dictionary();
- private readonly IDictionary streams = new Dictionary();
- private readonly IStoreEvents eventStore;
- private readonly IConstructAggregates factory;
- private readonly IDetectConflicts conflictDetector;
-
- public EventStoreRepository(
- IStoreEvents eventStore,
- IConstructAggregates factory,
- IDetectConflicts conflictDetector)
- {
- this.eventStore = eventStore;
- this.factory = factory;
- this.conflictDetector = conflictDetector;
- }
-
- public void Dispose()
- {
- this.Dispose(true);
- GC.SuppressFinalize(this);
- }
- protected virtual void Dispose(bool disposing)
- {
- if (!disposing)
- return;
-
- lock (this.streams)
- {
- foreach (var stream in this.streams)
- stream.Value.Dispose();
-
- this.snapshots.Clear();
- this.streams.Clear();
- }
+namespace CommonDomain.Persistence.EventStore
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using global::EventStore;
+ using global::EventStore.Persistence;
+
+ public class EventStoreRepository : IRepository, IDisposable
+ {
+ private const string AggregateTypeHeader = "AggregateType";
+ private readonly IDictionary snapshots = new Dictionary();
+ private readonly IDictionary streams = new Dictionary();
+ private readonly IStoreEvents eventStore;
+ private readonly IConstructAggregates factory;
+ private readonly IDetectConflicts conflictDetector;
+
+ public EventStoreRepository(
+ IStoreEvents eventStore,
+ IConstructAggregates factory,
+ IDetectConflicts conflictDetector)
+ {
+ this.eventStore = eventStore;
+ this.factory = factory;
+ this.conflictDetector = conflictDetector;
+ }
+
+ public void Dispose()
+ {
+ this.Dispose(true);
+ GC.SuppressFinalize(this);
+ }
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!disposing)
+ return;
+
+ lock (this.streams)
+ {
+ foreach (var stream in this.streams)
+ stream.Value.Dispose();
+
+ this.snapshots.Clear();
+ this.streams.Clear();
+ }
}
- public virtual TAggregate GetById(Guid id) where TAggregate : class, IAggregate
+ public virtual IAggregate GetById(Type aggregateType, Guid id)
{
- return GetById(id, int.MaxValue);
+ return GetById(aggregateType, id, int.MaxValue);
}
- public virtual TAggregate GetById(Guid id, int versionToLoad) where TAggregate : class, IAggregate
- {
- var snapshot = this.GetSnapshot(id, versionToLoad);
- var stream = this.OpenStream(id, versionToLoad, snapshot);
- var aggregate = this.GetAggregate(snapshot, stream);
-
- ApplyEventsToAggregate(versionToLoad, stream, aggregate);
-
- return aggregate as TAggregate;
- }
- private static void ApplyEventsToAggregate(int versionToLoad, IEventStream stream, IAggregate aggregate)
- {
- if (versionToLoad == 0 || aggregate.Version < versionToLoad)
- foreach (var @event in stream.CommittedEvents.Select(x => x.Body))
- aggregate.ApplyEvent(@event);
- }
- private IAggregate GetAggregate(Snapshot snapshot, IEventStream stream)
- {
- var memento = snapshot == null ? null : snapshot.Payload as IMemento;
- return this.factory.Build(typeof(TAggregate), stream.StreamId, memento);
- }
- private Snapshot GetSnapshot(Guid id, int version)
- {
- Snapshot snapshot;
- if (!this.snapshots.TryGetValue(id, out snapshot))
- this.snapshots[id] = snapshot = this.eventStore.Advanced.GetSnapshot(id, version);
-
- return snapshot;
- }
- private IEventStream OpenStream(Guid id, int version, Snapshot snapshot)
- {
- IEventStream stream;
- if (this.streams.TryGetValue(id, out stream))
- return stream;
-
- stream = snapshot == null
- ? this.eventStore.OpenStream(id, 0, version)
- : this.eventStore.OpenStream(snapshot, version);
-
- return this.streams[id] = stream;
- }
-
+ public virtual IAggregate GetById(Type aggregateType, Guid id, int versionToLoad)
+ {
+ var snapshot = this.GetSnapshot(id, versionToLoad);
+ var stream = this.OpenStream(id, versionToLoad, snapshot);
+ var aggregate = this.GetAggregate(aggregateType, snapshot, stream);
+
+ ApplyEventsToAggregate(versionToLoad, stream, aggregate);
+
+ return aggregate;
+ }
+ private static void ApplyEventsToAggregate(int versionToLoad, IEventStream stream, IAggregate aggregate)
+ {
+ if (versionToLoad == 0 || aggregate.Version < versionToLoad)
+ foreach (var @event in stream.CommittedEvents.Select(x => x.Body))
+ aggregate.ApplyEvent(@event);
+ }
+ private IAggregate GetAggregate(Type aggregateType, Snapshot snapshot, IEventStream stream)
+ {
+ if (aggregateType == null) throw new ArgumentNullException("aggregateType");
+ if (!aggregateType.Implements(typeof(IAggregate))) throw new ArgumentException(ExceptionMessages.NotAggregateType, "aggregateType");
+
+ var memento = snapshot == null ? null : snapshot.Payload as IMemento;
+ return this.factory.Build(aggregateType, stream.StreamId, memento);
+ }
+ private Snapshot GetSnapshot(Guid id, int version)
+ {
+ Snapshot snapshot;
+ if (!this.snapshots.TryGetValue(id, out snapshot))
+ this.snapshots[id] = snapshot = this.eventStore.Advanced.GetSnapshot(id, version);
+
+ return snapshot;
+ }
+ private IEventStream OpenStream(Guid id, int version, Snapshot snapshot)
+ {
+ IEventStream stream;
+ if (this.streams.TryGetValue(id, out stream))
+ return stream;
+
+ stream = snapshot == null
+ ? this.eventStore.OpenStream(id, 0, version)
+ : this.eventStore.OpenStream(snapshot, version);
+
+ return this.streams[id] = stream;
+ }
+
public virtual void Save(IAggregate aggregate, Guid commitId, Action> updateHeaders)
{
var headers = PrepareHeaders(aggregate, updateHeaders);
@@ -123,14 +126,14 @@ public virtual void Save(IAggregate aggregate, Guid commitId, Action headers)
- {
- IEventStream stream;
- if (!this.streams.TryGetValue(aggregate.Id, out stream))
- this.streams[aggregate.Id] = stream = this.eventStore.CreateStream(aggregate.Id);
-
- foreach (var item in headers)
+ }
+ private IEventStream PrepareStream(IAggregate aggregate, Dictionary headers)
+ {
+ IEventStream stream;
+ if (!this.streams.TryGetValue(aggregate.Id, out stream))
+ this.streams[aggregate.Id] = stream = this.eventStore.CreateStream(aggregate.Id);
+
+ foreach (var item in headers)
stream.UncommittedHeaders[item.Key] = item.Value;
aggregate.GetUncommittedEvents()
@@ -138,24 +141,24 @@ private IEventStream PrepareStream(IAggregate aggregate, Dictionary new EventMessage { Body = x })
.ToList()
.ForEach(stream.Add);
-
- return stream;
- }
- private static Dictionary PrepareHeaders(IAggregate aggregate, Action> updateHeaders)
- {
- var headers = new Dictionary();
-
- headers[AggregateTypeHeader] = aggregate.GetType().FullName;
- if (updateHeaders != null)
- updateHeaders(headers);
-
- return headers;
- }
- private bool ThrowOnConflict(IEventStream stream, int skip)
- {
- var committed = stream.CommittedEvents.Skip(skip).Select(x => x.Body);
- var uncommitted = stream.UncommittedEvents.Select(x => x.Body);
- return this.conflictDetector.ConflictsWith(uncommitted, committed);
- }
- }
+
+ return stream;
+ }
+ private static Dictionary PrepareHeaders(IAggregate aggregate, Action> updateHeaders)
+ {
+ var headers = new Dictionary();
+
+ headers[AggregateTypeHeader] = aggregate.GetType().FullName;
+ if (updateHeaders != null)
+ updateHeaders(headers);
+
+ return headers;
+ }
+ private bool ThrowOnConflict(IEventStream stream, int skip)
+ {
+ var committed = stream.CommittedEvents.Skip(skip).Select(x => x.Body);
+ var uncommitted = stream.UncommittedEvents.Select(x => x.Body);
+ return this.conflictDetector.ConflictsWith(uncommitted, committed);
+ }
+ }
}
\ No newline at end of file
diff --git a/src/proj/CommonDomain.Persistence.EventStore/ExceptionMessages.Designer.cs b/src/proj/CommonDomain.Persistence.EventStore/ExceptionMessages.Designer.cs
index bdf12ce..b644858 100644
--- a/src/proj/CommonDomain.Persistence.EventStore/ExceptionMessages.Designer.cs
+++ b/src/proj/CommonDomain.Persistence.EventStore/ExceptionMessages.Designer.cs
@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
//
// This code was generated by a tool.
-// Runtime Version:4.0.30319.1
+// Runtime Version:4.0.30319.239
//
// Changes to this file may cause incorrect behavior and will be lost if
// the code is regenerated.
@@ -69,6 +69,15 @@ internal static string ConflictingCommand {
}
}
+ ///
+ /// Looks up a localized string similar to The argument must implement IAggregate..
+ ///
+ internal static string NotAggregateType {
+ get {
+ return ResourceManager.GetString("NotAggregateType", resourceCulture);
+ }
+ }
+
///
/// Looks up a localized string similar to There were no uncommitted changes to persist. When attempting to save an aggregate there must be at least one uncommitted event to persist..
///
diff --git a/src/proj/CommonDomain.Persistence.EventStore/ExceptionMessages.resx b/src/proj/CommonDomain.Persistence.EventStore/ExceptionMessages.resx
index d8014a6..9a753e0 100644
--- a/src/proj/CommonDomain.Persistence.EventStore/ExceptionMessages.resx
+++ b/src/proj/CommonDomain.Persistence.EventStore/ExceptionMessages.resx
@@ -120,6 +120,9 @@
The command issued conflicted with another command that was sent by another user, actor, or process in the system. The change could not be automatically merged. Please review the data that has changed and try your change again.
+
+ The argument must implement IAggregate.
+
There were no uncommitted changes to persist. When attempting to save an aggregate there must be at least one uncommitted event to persist.
diff --git a/src/proj/CommonDomain.Persistence.EventStore/TypeExtensions.cs b/src/proj/CommonDomain.Persistence.EventStore/TypeExtensions.cs
new file mode 100644
index 0000000..658cbc8
--- /dev/null
+++ b/src/proj/CommonDomain.Persistence.EventStore/TypeExtensions.cs
@@ -0,0 +1,17 @@
+using System;
+using System.Linq;
+
+namespace CommonDomain.Persistence.EventStore
+{
+ public static class TypeExtensions
+ {
+ public static Boolean Implements(this Type type, Type interfaceType)
+ {
+ return interfaceType != null &&
+ type != null &&
+ !type.IsAbstract &&
+ type.IsClass &&
+ type.GetInterfaces().Any(item => interfaceType.IsGenericTypeDefinition ? item.IsGenericType && item.GetGenericTypeDefinition() == interfaceType : item == interfaceType);
+ }
+ }
+}
diff --git a/src/proj/CommonDomain.Persistence/IRepository.cs b/src/proj/CommonDomain.Persistence/IRepository.cs
index 03de363..897e888 100644
--- a/src/proj/CommonDomain.Persistence/IRepository.cs
+++ b/src/proj/CommonDomain.Persistence/IRepository.cs
@@ -4,9 +4,9 @@ namespace CommonDomain.Persistence
using System.Collections.Generic;
public interface IRepository
- {
- TAggregate GetById(Guid id) where TAggregate : class, IAggregate;
- TAggregate GetById(Guid id, int version) where TAggregate : class, IAggregate;
+ {
+ IAggregate GetById(Type aggregateType, Guid id);
+ IAggregate GetById(Type aggregateType, Guid id, int version);
void Save(IAggregate aggregate, Guid commitId, Action> updateHeaders);
}
}
\ No newline at end of file
diff --git a/src/proj/CommonDomain.Persistence/RepositoryExtensions.cs b/src/proj/CommonDomain.Persistence/RepositoryExtensions.cs
index 37e1acb..ab5b993 100644
--- a/src/proj/CommonDomain.Persistence/RepositoryExtensions.cs
+++ b/src/proj/CommonDomain.Persistence/RepositoryExtensions.cs
@@ -2,11 +2,21 @@
namespace CommonDomain.Persistence
{
- public static class RepositoryExtensions
+ public static class RepositoryExtensions
+ {
+ public static TAggregate GetById(this IRepository repository, Guid id) where TAggregate : class, IAggregate
{
- public static void Save(this IRepository repository, IAggregate aggregate, Guid commitId)
- {
- repository.Save(aggregate, commitId, a => {});
- }
+ return repository.GetById(typeof(TAggregate), id, int.MaxValue) as TAggregate;
}
+
+ public static TAggregate GetById(this IRepository repository, Guid id, int versionToLoad) where TAggregate : class, IAggregate
+ {
+ return repository.GetById(typeof(TAggregate), id, versionToLoad) as TAggregate;
+ }
+
+ public static void Save(this IRepository repository, IAggregate aggregate, Guid commitId)
+ {
+ repository.Save(aggregate, commitId, a => { });
+ }
+ }
}
\ No newline at end of file