diff --git a/Directory.Packages.props b/Directory.Packages.props index 26b5371b..7a38340c 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -22,7 +22,13 @@ 0.77.3 + + + + + + diff --git a/Eventuous.slnx b/Eventuous.slnx index 9866ba98..a4215337 100644 --- a/Eventuous.slnx +++ b/Eventuous.slnx @@ -114,6 +114,7 @@ + @@ -140,6 +141,11 @@ + + + + + diff --git a/samples/banking/Banking.Api/Banking.Api.csproj b/samples/banking/Banking.Api/Banking.Api.csproj new file mode 100644 index 00000000..53090d69 --- /dev/null +++ b/samples/banking/Banking.Api/Banking.Api.csproj @@ -0,0 +1,25 @@ + + + + net10.0 + enable + enable + + + + + + + + + + + + + + + + + + + diff --git a/samples/banking/Banking.Api/Program.cs b/samples/banking/Banking.Api/Program.cs new file mode 100644 index 00000000..bf985e32 --- /dev/null +++ b/samples/banking/Banking.Api/Program.cs @@ -0,0 +1,65 @@ +using Banking.Api.Services; +using Banking.Domain.Accounts; +using Eventuous.KurrentDB; +using Microsoft.AspNetCore.Mvc; + +var builder = WebApplication.CreateBuilder(args); + +//---------------------------------------------------------------- +// Snapshot store registration + +var postgresSnapshotDbConnectionString = builder.Configuration.GetConnectionString("postgresSnapshotsDb"); +if (postgresSnapshotDbConnectionString == null) { + throw new InvalidOperationException("postgres snapshots db conenction string should be not null"); +} + +//builder.Services.AddPostgresSnapshotStore(postgresSnapshotDbConnectionString, initializeDatabase: true); + +var sqlServerSnapshotsDbConnectionString = builder.Configuration.GetConnectionString("sqlServerSnapshotsDb"); +if (sqlServerSnapshotsDbConnectionString == null) { + throw new InvalidOperationException("sqlServer snapshots db connection string should be not null"); +} + +//builder.Services.AddSqlServerSnapshotStore(sqlServerSnapshotsDbConnectionString, initializeDatabase: true); + +var mongoDbSnapshotsDbConnectionString = builder.Configuration.GetConnectionString("mongoDbSnapshotsDb"); +if (mongoDbSnapshotsDbConnectionString == null) { + throw new InvalidOperationException("mongodb snapshots db connection string should be not null"); +} + +//builder.Services.AddMongoSnapshotStore(mongoDbSnapshotsDbConnectionString, initializeIndexes: true); + +var redisConnectionString = builder.Configuration.GetConnectionString("redis"); +if (redisConnectionString == null) { + throw new InvalidOperationException("redis connection string should be not null"); +} + +builder.Services.AddRedisSnapshotStore(redisConnectionString, database: 0); + +//---------------------------------------------------------------- +// Event store registration + +builder.AddKurrentDBClient("kurrentdb"); +builder.Services.AddEventStore(); + +//---------------------------------------------------------------- + +builder.Services.AddCommandService(); + +var app = builder.Build(); + +app.MapGet("/accounts/{id}/deposit/{amount}", async ([FromRoute] string id, [FromRoute] decimal amount, [FromServices] AccountService accountService) => { + var cmd = new AccountService.Deposit(id, amount); + var res = await accountService.Handle(cmd, default); + + return res.Match(ok => ok, err => err); +}); + +app.MapGet("/accounts/{id}/withdraw/{amount}", async ([FromRoute] string id, [FromRoute] decimal amount, [FromServices] AccountService accountService) => { + var cmd = new AccountService.Withdraw(id, amount); + var res = await accountService.Handle(cmd, default); + + return res.Match(ok => ok, err => err); +}); + +app.Run(); diff --git a/samples/banking/Banking.Api/Services/AccountService.cs b/samples/banking/Banking.Api/Services/AccountService.cs new file mode 100644 index 00000000..3445be5d --- /dev/null +++ b/samples/banking/Banking.Api/Services/AccountService.cs @@ -0,0 +1,35 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Banking.Domain.Accounts; +using Eventuous; + +namespace Banking.Api.Services; + +public class AccountService : CommandService { + public record Deposit(string AccountId, decimal Amount); + public record Withdraw(string AccountId, decimal Amount); + + public AccountService(IEventStore store, ISnapshotStore snapshotStore) : base(store, snapshotStore: snapshotStore) { + + UseSnapshotStrategy( + predicate: (events, _) => events.Count() >= 5, + produce: (_, state) => new AccountEvents.V1.Snapshot(state.Balance)); + + On() + .InState(ExpectedState.Any) + .GetStream(cmd => StreamName.ForState(cmd.AccountId)) + .Act((_, __, cmd) => [new AccountEvents.V1.Deposited(cmd.Amount)]); + + On() + .InState(ExpectedState.Any) + .GetStream(cmd => StreamName.ForState(cmd.AccountId)) + .Act(static (state, __, cmd) => { + if (state.Balance < cmd.Amount) { + throw new InvalidOperationException(); + } + + return [new AccountEvents.V1.Withdrawn(cmd.Amount)]; + }); + } +} diff --git a/samples/banking/Banking.Api/appsettings.Development.json b/samples/banking/Banking.Api/appsettings.Development.json new file mode 100644 index 00000000..0c208ae9 --- /dev/null +++ b/samples/banking/Banking.Api/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/samples/banking/Banking.Api/appsettings.json b/samples/banking/Banking.Api/appsettings.json new file mode 100644 index 00000000..10f68b8c --- /dev/null +++ b/samples/banking/Banking.Api/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +} diff --git a/samples/banking/Banking.AppHost/AppHost.cs b/samples/banking/Banking.AppHost/AppHost.cs new file mode 100644 index 00000000..f6101a37 --- /dev/null +++ b/samples/banking/Banking.AppHost/AppHost.cs @@ -0,0 +1,37 @@ +var builder = DistributedApplication.CreateBuilder(args); + +var kurrentdb = builder.AddKurrentDB("eventuous-kurrentdb", 2113) + .WithEnvironment("EVENTSTORE_ENABLE_ATOM_PUB_OVER_HTTP", "true"); + +var postgres = builder.AddPostgres("eventuous-postgres") + .WithPgWeb(); + +var postgresSnapshotsDb = postgres.AddDatabase("snapshots"); + +var sqlServer = builder.AddSqlServer("eventuous-sqlserver"); +var sqlServerSnapshotsDb = sqlServer.AddDatabase("eventuous-sqlserver-snapshotsdb", "snapshots"); + +var mongodb = builder.AddMongoDB("eventuous-mongodb") + .WithMongoExpress(); + +var mongodbSnapshotsDb = mongodb.AddDatabase("eventuous-mongodb-snapshotsdb", "snapshots"); + +var redis = builder.AddRedis("eventuous-redis") + .WithRedisInsight(); + +builder + .AddProject("banking-api") + .WithReference(kurrentdb, "kurrentdb") + .WaitFor(kurrentdb) + .WithReference(postgresSnapshotsDb, "postgresSnapshotsDb") + .WaitFor(postgresSnapshotsDb) + .WithReference(sqlServerSnapshotsDb, "sqlServerSnapshotsDb") + .WaitFor(sqlServerSnapshotsDb) + .WithReference(mongodbSnapshotsDb, "mongoDbSnapshotsDb") + .WaitFor(mongodbSnapshotsDb) + .WithReference(redis, "redis") + .WaitFor(redis); + +builder + .Build() + .Run(); diff --git a/samples/banking/Banking.AppHost/Banking.AppHost.csproj b/samples/banking/Banking.AppHost/Banking.AppHost.csproj new file mode 100644 index 00000000..a728b1a0 --- /dev/null +++ b/samples/banking/Banking.AppHost/Banking.AppHost.csproj @@ -0,0 +1,23 @@ + + + + Exe + net10.0 + enable + enable + 3f5f156a-5139-482c-a37e-99163d5f39d7 + + + + + + + + + + + + + + + diff --git a/samples/banking/Banking.AppHost/appsettings.Development.json b/samples/banking/Banking.AppHost/appsettings.Development.json new file mode 100644 index 00000000..0c208ae9 --- /dev/null +++ b/samples/banking/Banking.AppHost/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/samples/banking/Banking.AppHost/appsettings.json b/samples/banking/Banking.AppHost/appsettings.json new file mode 100644 index 00000000..31c092aa --- /dev/null +++ b/samples/banking/Banking.AppHost/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning", + "Aspire.Hosting.Dcp": "Warning" + } + } +} diff --git a/samples/banking/Banking.Domain/Accounts/AccountEvents.cs b/samples/banking/Banking.Domain/Accounts/AccountEvents.cs new file mode 100644 index 00000000..304180c7 --- /dev/null +++ b/samples/banking/Banking.Domain/Accounts/AccountEvents.cs @@ -0,0 +1,16 @@ +using Eventuous; + +namespace Banking.Domain.Accounts; + +public static class AccountEvents { + public static class V1 { + [EventType("V1.Deposited")] + public record Deposited(decimal Amount); + + [EventType("V1.Withdrawn")] + public record Withdrawn(decimal Amount); + + [EventType("V1.Snapshot")] + public record Snapshot(decimal Balance); + } +} diff --git a/samples/banking/Banking.Domain/Accounts/AccountState.cs b/samples/banking/Banking.Domain/Accounts/AccountState.cs new file mode 100644 index 00000000..86c6cc58 --- /dev/null +++ b/samples/banking/Banking.Domain/Accounts/AccountState.cs @@ -0,0 +1,26 @@ +using Eventuous; + +namespace Banking.Domain.Accounts; + +[Snapshots(typeof(AccountEvents.V1.Snapshot), StorageStrategy = SnapshotStorageStrategy.SeparateStore)] +public record AccountState : State { + public decimal Balance { get; init; } + + public AccountState() { + On(When); + On(When); + On(When); + } + + private AccountState When(AccountState state, AccountEvents.V1.Snapshot e) => state with { + Balance = e.Balance + }; + + private AccountState When(AccountState state, AccountEvents.V1.Deposited e) => state with { + Balance = state.Balance + e.Amount + }; + + private AccountState When(AccountState state, AccountEvents.V1.Withdrawn e) => state with { + Balance = state.Balance - e.Amount + }; +} diff --git a/samples/banking/Banking.Domain/Banking.Domain.csproj b/samples/banking/Banking.Domain/Banking.Domain.csproj new file mode 100644 index 00000000..d482b1e0 --- /dev/null +++ b/samples/banking/Banking.Domain/Banking.Domain.csproj @@ -0,0 +1,6 @@ + + + + + + diff --git a/samples/banking/README.md b/samples/banking/README.md new file mode 100644 index 00000000..3addf873 --- /dev/null +++ b/samples/banking/README.md @@ -0,0 +1,13 @@ +### Project description + +A small example demonstrating the use of snapshots stored as an event in the aggregate stream + +### How to use snapshots? + +First, declare the type of event that represents a snapshot of the current state. In this example, see AccountEvents.V1.Snapshot. This name is chosen just for the example; ideally, it should correspond to something from your domain, for example, the closing of a shift + +Once we have a declared snapshot type, we need to link it to the state. Note the declaration of the AccountState type - it has the Snapshots attribute applied to it, which is given a list of types that are snapshot events. This attribute helps the EventReader understand that it needs to read the stream in reverse order up to the first encountered snapshot, and not load events prior to the snapshot + +You also need to define the logic according to which the snapshot event will occur. Since a snapshot is a regular event, you can create it at any moment during command processing, based on the available data to make the decision. An example can be seen in AccountService - the ApplySnapshot method. + +Essentially, this is all you need to start using snapshots (it is assumed that a package with source generators has been added to the project). \ No newline at end of file diff --git a/src/Core/gen/Eventuous.Shared.Generators/Constants.cs b/src/Core/gen/Eventuous.Shared.Generators/Constants.cs index 42b3fd4d..2dfac918 100644 --- a/src/Core/gen/Eventuous.Shared.Generators/Constants.cs +++ b/src/Core/gen/Eventuous.Shared.Generators/Constants.cs @@ -5,6 +5,9 @@ namespace Eventuous.Shared.Generators; internal static class Constants { public const string BaseNamespace = "Eventuous"; + public const string StateType = "State"; public const string EventTypeAttribute = "EventTypeAttribute"; public const string EventTypeAttrFqcn = $"{BaseNamespace}.{EventTypeAttribute}"; + public const string SnapshotsAttribute = "SnapshotsAttribute"; + public const string SnapshotsAttrFqcn = $"{BaseNamespace}.{SnapshotsAttribute}"; } diff --git a/src/Core/gen/Eventuous.Shared.Generators/Helpers.cs b/src/Core/gen/Eventuous.Shared.Generators/Helpers.cs new file mode 100644 index 00000000..9a04cdfa --- /dev/null +++ b/src/Core/gen/Eventuous.Shared.Generators/Helpers.cs @@ -0,0 +1,8 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.Shared.Generators; + +internal static class Helpers { + public static string MakeGlobal(string typeName) => !typeName.StartsWith("global::") ? $"global::{typeName}" : typeName; +} diff --git a/src/Core/gen/Eventuous.Shared.Generators/SnapshotMappingsGenerator.cs b/src/Core/gen/Eventuous.Shared.Generators/SnapshotMappingsGenerator.cs new file mode 100644 index 00000000..2cd2391b --- /dev/null +++ b/src/Core/gen/Eventuous.Shared.Generators/SnapshotMappingsGenerator.cs @@ -0,0 +1,245 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Microsoft.CodeAnalysis; +using Microsoft.CodeAnalysis.CSharp; +using Microsoft.CodeAnalysis.CSharp.Syntax; +using System.Collections.Immutable; +using System.Text; +using static Eventuous.Shared.Generators.Constants; +using static Eventuous.Shared.Generators.Helpers; + +namespace Eventuous.Shared.Generators; + +[Generator(LanguageNames.CSharp)] +public sealed class SnapshotMappingsGenerator : IIncrementalGenerator { + + static Map? GetMapFromSnapshotsAttribute(GeneratorSyntaxContext context) { + var classSyntax = (ClassDeclarationSyntax)context.Node; + var classSymbol = context.SemanticModel.GetDeclaredSymbol(classSyntax); + if (classSymbol == null || !IsState(classSymbol)) return null; + + var snapshotsAttr = classSymbol.GetAttributes() + .FirstOrDefault(attr => attr.AttributeClass?.Name == "SnapshotsAttribute"); + + if (snapshotsAttr == null) return null; + + var snapshotTypes = new HashSet(); + + foreach (var arg in snapshotsAttr.ConstructorArguments) { + if (arg.Kind == TypedConstantKind.Array) { + foreach (var typeConstant in arg.Values) { + if (typeConstant.Value is ITypeSymbol typeSymbol) { + snapshotTypes.Add(MakeGlobal(typeSymbol.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat))); + } + } + } + else if (arg.Kind == TypedConstantKind.Type && arg.Value is ITypeSymbol singleType) { + snapshotTypes.Add(MakeGlobal(singleType.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat))); + } + } + + var storageStrategy = GetStorageStrategyFromAttribute(snapshotsAttr); + + var stateType = classSymbol.BaseType?.TypeArguments[0]; + if (stateType == null) return null; + + return new Map { + SnapshotTypes = snapshotTypes, + StateType = MakeGlobal(classSymbol.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)), + StorageStrategy = storageStrategy + }; + } + + static HashSet GetTypesFromSnapshotsAttribute(AttributeData attributeData) { + var result = new HashSet(); + + foreach (var arg in attributeData.ConstructorArguments) { + if (arg.Kind == TypedConstantKind.Array) { + foreach (var typeConstant in arg.Values) { + if (typeConstant.Value is ITypeSymbol typeSymbol) { + result.Add(typeSymbol.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)); + } + } + } + else if (arg.Kind == TypedConstantKind.Type) { + if (arg.Value is ITypeSymbol typeSymbol) { + result.Add(typeSymbol.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)); + } + } + } + + foreach (var namedArg in attributeData.NamedArguments) { + if (namedArg.Value.Kind == TypedConstantKind.Array) { + foreach (var typeConstant in namedArg.Value.Values) { + if (typeConstant.Value is ITypeSymbol typeSymbol) { + result.Add(typeSymbol.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)); + } + } + } + else if (namedArg.Value.Kind == TypedConstantKind.Type && namedArg.Value.Value is ITypeSymbol typeSymbol) { + result.Add(typeSymbol.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)); + } + } + + return result; + } + + static ImmutableArray GetMapFromSnapshotsAttribute(Compilation compilation) { + var builder = ImmutableArray.CreateBuilder(); + + // Current assembly + ProcessNamespace(compilation.Assembly.GlobalNamespace); + + // Referenced assemblies + foreach (var ra in compilation.SourceModule.ReferencedAssemblySymbols) { + ProcessNamespace(ra.GlobalNamespace); + } + + return builder.ToImmutable(); + + void ProcessType(INamedTypeSymbol type) { + var attr = GetSnapshotsAttribute(type); + if (attr is not null) { + var map = new Map { + SnapshotTypes = GetTypesFromSnapshotsAttribute(attr), + StateType = MakeGlobal(type.ToDisplayString(SymbolDisplayFormat.FullyQualifiedFormat)), + StorageStrategy = GetStorageStrategyFromAttribute(attr) + }; + + builder.Add(map); + } + + foreach (var nt in type.GetTypeMembers()) { + ProcessType(nt); + } + } + + void ProcessNamespace(INamespaceSymbol ns) { + foreach (var member in ns.GetMembers()) { + switch (member) { + case INamespaceSymbol cns: + ProcessNamespace(cns); break; + case INamedTypeSymbol type: + ProcessType(type); break; + } + } + } + } + + static void Output(SourceProductionContext context, ImmutableArray maps) { + var sb = new StringBuilder(); + sb.AppendLine("// "); + sb.AppendLine("#pragma warning disable CS8019"); + sb.AppendLine("#nullable enable"); + sb.AppendLine(); + sb.AppendLine("using System;"); + sb.AppendLine("using System.Runtime.CompilerServices;"); + sb.AppendLine(); + sb.AppendLine("namespace Eventuous;"); + sb.AppendLine(); + sb.AppendLine("internal static class SnapshotTypeMappings {"); + sb.AppendLine(" [ModuleInitializer]"); + sb.AppendLine(" internal static void Initialize() {"); + + foreach (var map in maps) { + var strategyValue = map.StorageStrategy switch { + "SameStream" => "SnapshotStorageStrategy.SameStream", + "SeparateStream" => "SnapshotStorageStrategy.SeparateStream", + "SeparateStore" => "SnapshotStorageStrategy.SeparateStore", + _ => "SnapshotStorageStrategy.SameStream" + }; + + foreach (var snapshotType in map.SnapshotTypes) { + sb.AppendLine($" SnapshotTypeMap.Register(typeof({map.StateType}), typeof({snapshotType}), {strategyValue});"); + } + } + + sb.AppendLine(" }"); + sb.AppendLine("}"); + + context.AddSource("SnapshotTypeMappings.g.cs", sb.ToString()); + } + + public void Initialize(IncrementalGeneratorInitializationContext context) { + var maps = context.SyntaxProvider + .CreateSyntaxProvider( + predicate: static (s, _) => s is ClassDeclarationSyntax, + transform: static (c, _) => GetMapFromSnapshotsAttribute(c)) + .Where(static m => m is not null) + .Collect(); + + var mapsFromReferencedAssmeblies = context.CompilationProvider + .Select(static (c, _) => GetMapFromSnapshotsAttribute(c)); + + var mergedMaps = maps + .Combine(mapsFromReferencedAssmeblies) + .Select(static (pair, _) => pair.Left.AddRange((IEnumerable)pair.Right)); + + context.RegisterSourceOutput(mergedMaps, Output!); + + } + + static bool IsState(INamedTypeSymbol type) { + var baseType = type.BaseType; + + return baseType is not null + && baseType.Name == StateType + && baseType.ContainingNamespace.ToDisplayString() == BaseNamespace + && baseType.IsGenericType; + } + + static AttributeData? GetSnapshotsAttribute(ISymbol symbol) { + foreach (var data in symbol.GetAttributes()) { + var attrClass = data.AttributeClass; + if (attrClass is null) continue; + + var name = attrClass.ToDisplayString(); + if (name == SnapshotsAttrFqcn || attrClass.Name is SnapshotsAttribute) return data; + } + + return null; + } + + static string GetStorageStrategyFromAttribute(AttributeData attributeData) { + foreach (var namedArg in attributeData.NamedArguments) { + if (namedArg.Key == "StorageStrategy" && namedArg.Value.Kind == TypedConstantKind.Enum) { + // Try to get the enum value name from the typed constant + var enumType = namedArg.Value.Type; + if (enumType != null) { + // Get the enum value as a string representation + var enumValue = namedArg.Value.Value; + if (enumValue != null) { + // Try to find the enum member with this value + var enumMembers = enumType.GetMembers().OfType() + .Where(f => f.IsStatic && f.IsDefinition && f.ConstantValue != null); + + foreach (var member in enumMembers) { + if (Equals(member.ConstantValue, enumValue)) { + return member.Name; + } + } + + // Fallback: map numeric values + if (enumValue is int intValue) { + return intValue switch { + 0 => "SameStream", + 1 => "SeparateStream", + 2 => "SeparateStore", + _ => "SameStream" + }; + } + } + } + } + } + + return "SameStream"; // Default value + } + + sealed record Map { + public string StateType { get; set; } = null!; + public HashSet SnapshotTypes { get; set; } = []; + public string StorageStrategy { get; set; } = "SameStream"; + } +} \ No newline at end of file diff --git a/src/Core/gen/Eventuous.Shared.Generators/TypeMappingsGenerator.cs b/src/Core/gen/Eventuous.Shared.Generators/TypeMappingsGenerator.cs index d68b13e8..1c1899fd 100644 --- a/src/Core/gen/Eventuous.Shared.Generators/TypeMappingsGenerator.cs +++ b/src/Core/gen/Eventuous.Shared.Generators/TypeMappingsGenerator.cs @@ -7,6 +7,7 @@ using Microsoft.CodeAnalysis.CSharp; using Microsoft.CodeAnalysis.CSharp.Syntax; using static Eventuous.Shared.Generators.Constants; +using static Eventuous.Shared.Generators.Helpers; namespace Eventuous.Shared.Generators; @@ -202,6 +203,4 @@ static string SanitizeIdentifier(string name) { return sb.ToString(); } - - static string MakeGlobal(string typeName) => !typeName.StartsWith("global::") ? $"global::{typeName}" : typeName; } diff --git a/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs b/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs index 45111b4d..c08cae41 100644 --- a/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs +++ b/src/Core/src/Eventuous.Application/AggregateService/CommandService.cs @@ -20,7 +20,8 @@ public abstract partial class CommandService<[DynamicallyAccessedMembers(Dynamic AggregateFactoryRegistry? factoryRegistry = null, StreamNameMap? streamNameMap = null, ITypeMapper? typeMap = null, - AmendEvent? amendEvent = null + AmendEvent? amendEvent = null, + ISnapshotStore? snapshotStore = null ) : ICommandService where TAggregate : Aggregate @@ -31,8 +32,9 @@ protected CommandService( AggregateFactoryRegistry? factoryRegistry = null, StreamNameMap? streamNameMap = null, ITypeMapper? typeMap = null, - AmendEvent? amendEvent = null - ) : this(store, store, factoryRegistry, streamNameMap, typeMap, amendEvent) { } + AmendEvent? amendEvent = null, + ISnapshotStore? snapshotStore = null + ) : this(store, store, factoryRegistry, streamNameMap, typeMap, amendEvent, snapshotStore) { } [PublicAPI] protected IEventReader? Reader { get; } = reader; @@ -43,6 +45,8 @@ protected CommandService( readonly AggregateFactoryRegistry _factoryRegistry = factoryRegistry ?? AggregateFactoryRegistry.Instance; readonly StreamNameMap _streamNameMap = streamNameMap ?? new StreamNameMap(); readonly ITypeMapper _typeMap = typeMap ?? TypeMap.Instance; + readonly ISnapshotStore? _snapshotStore = snapshotStore; + SnapshotStrategy? _snapshotStrategy; /// /// Returns the command handler builder for the specified command type. @@ -52,6 +56,19 @@ protected CommandService( protected IDefineExpectedState On() where TCommand : class => new CommandHandlerBuilder(this, Reader, Writer); + /// + /// Configures a snapshot strategy that determines when and how to create snapshot events. + /// The snapshot event type must be registered in for the state type. + /// + /// Function that takes all events (original + new) and state, returns true if snapshot should be created + /// Function that takes all events and state, returns the snapshot event object + protected void UseSnapshotStrategy( + Func predicate, + Func produce + ) { + _snapshotStrategy = new SnapshotStrategy(predicate, produce); + } + /// /// The command handler. Call this function from your edge (API). /// @@ -70,16 +87,17 @@ public async Task> Handle(TCommand command, Cancellatio } var aggregateId = await registeredHandler.GetId(command, cancellationToken).NoContext(); - var reader = registeredHandler.ResolveReader(command); var stream = _streamNameMap.GetStreamName(aggregateId); + var reader = registeredHandler.ResolveReader(command); + var writer = registeredHandler.ResolveWriter(command); try { var aggregate = registeredHandler.ExpectedState switch { ExpectedState.Any => await reader - .LoadAggregate(aggregateId, _streamNameMap, false, _factoryRegistry, cancellationToken) + .LoadAggregate(aggregateId, _streamNameMap, false, _factoryRegistry, _snapshotStore, cancellationToken) .NoContext(), ExpectedState.Existing => await reader - .LoadAggregate(aggregateId, _streamNameMap, true, _factoryRegistry, cancellationToken) + .LoadAggregate(aggregateId, _streamNameMap, true, _factoryRegistry, _snapshotStore, cancellationToken) .NoContext(), ExpectedState.New => Create(aggregateId), ExpectedState.Unknown => null, @@ -88,14 +106,38 @@ public async Task> Handle(TCommand command, Cancellatio var result = await registeredHandler.Handler(aggregate!, command, cancellationToken).NoContext(); + var newEvents = result.Changes.Select(x => new ProposedEvent(x, [])).ToArray(); + var newState = result.State; + + // Apply snapshot strategy if configured + if (_snapshotStrategy != null) { + var allEvents = result.Current; + if (_snapshotStrategy.Predicate(allEvents, result.State)) { + var snapshotEvent = _snapshotStrategy.Produce(allEvents, result.State); + newState = newState.When(snapshotEvent); + newEvents = [.. newEvents, new(snapshotEvent, [])]; + } + } + // Zero in the global position would mean nothing, so the receiver needs to check the Changes.Length - if (result.Changes.Count == 0) return Result.FromSuccess(result.State, [], 0); + if (newEvents.Length == 0) return Result.FromSuccess(newState, [], 0); + + // Separate snapshots from regular events based on storage strategy + var snapshotTypes = SnapshotTypeMap.GetSnapshotTypes(); + var storageStrategy = SnapshotTypeMap.GetStorageStrategy(); + var (regularEvents, snapshotEvents) = SeparateSnapshots(newEvents, snapshotTypes, storageStrategy); - var proposed = new ProposedAppend(stream, new(result.OriginalVersion), result.Changes.Select(x => new ProposedEvent(x, new())).ToArray()); + // Store regular events first + var proposed = new ProposedAppend(stream, new(result.OriginalVersion), regularEvents); var final = registeredHandler.AmendAppend?.Invoke(proposed, command) ?? proposed; - var writer = registeredHandler.ResolveWriter(command); var storeResult = await writer.Store(final, Amend, cancellationToken).NoContext(); - var changes = result.Changes.Select(x => Change.FromEvent(x, _typeMap)); + + // Handle snapshots based on strategy + if (snapshotEvents.Length > 0 && storageStrategy != SnapshotStorageStrategy.SameStream) { + await HandleSnapshots(stream, snapshotEvents, storeResult.NextExpectedVersion, storageStrategy, writer, cancellationToken).NoContext(); + } + + var changes = result.Changes.Select(x => Change.FromEvent(x, _typeMap)); Log.CommandHandled(); return Result.FromSuccess(result.State, changes, storeResult.GlobalPosition); @@ -116,4 +158,89 @@ NewStreamEvent Amend(NewStreamEvent streamEvent) { internal void AddHandler(RegisteredHandler handler) where TCommand : class => _handlers.AddHandler(handler); + + static (ProposedEvent[] RegularEvents, ProposedEvent[] SnapshotEvents) SeparateSnapshots( + ProposedEvent[] events, + HashSet snapshotTypes, + SnapshotStorageStrategy strategy + ) { + if (strategy == SnapshotStorageStrategy.SameStream || snapshotTypes.Count == 0) { + return (events, []); + } + + var regularEvents = new List(); + var snapshotEvents = new List(); + + foreach (var evt in events) { + if (evt.Data != null && snapshotTypes.Contains(evt.Data.GetType())) { + snapshotEvents.Add(evt); + } else { + regularEvents.Add(evt); + } + } + + return (regularEvents.ToArray(), snapshotEvents.ToArray()); + } + + [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] + [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] + async Task HandleSnapshots( + StreamName streamName, + ProposedEvent[] snapshotEvents, + long streamRevision, + SnapshotStorageStrategy strategy, + IEventWriter writer, + CancellationToken cancellationToken + ) { + if (snapshotEvents.Length == 0) return; + + // Take the last snapshot if multiple + var snapshotEvent = snapshotEvents[^1]; + + switch (strategy) { + case SnapshotStorageStrategy.SeparateStream: { + var snapshotStreamName = StreamName.ForSnapshot(streamName); + var store = writer as IEventStore; + if (store == null) { + throw new InvalidOperationException($"IEventStore is required for {nameof(SnapshotStorageStrategy.SeparateStream)} strategy. IEventWriter must implement IEventStore."); + } + + var snapshotAppend = new ProposedAppend( + snapshotStreamName, + ExpectedStreamVersion.Any, + [snapshotEvent] + ); + + var result = await writer.Store( + snapshotAppend, + (@event) => { + @event.Metadata.With("revision", streamRevision.ToString()); + return @event; + }, + cancellationToken) + .NoContext(); + + await store.TruncateStream( + snapshotStreamName, + new StreamTruncatePosition(result.NextExpectedVersion), + ExpectedStreamVersion.Any, + cancellationToken); + + break; + } + + case SnapshotStorageStrategy.SeparateStore: { + if (_snapshotStore == null) { + throw new InvalidOperationException($"Snapshot store is required for {nameof(SnapshotStorageStrategy.SeparateStore)} strategy"); + } + + var snapshot = new Snapshot { + Revision = streamRevision, + Payload = snapshotEvent.Data + }; + await _snapshotStore.Write(streamName, snapshot, cancellationToken).NoContext(); + break; + } + } + } } diff --git a/src/Core/src/Eventuous.Application/FunctionalService/CommandService.cs b/src/Core/src/Eventuous.Application/FunctionalService/CommandService.cs index 5563b2f2..cf7efd5b 100644 --- a/src/Core/src/Eventuous.Application/FunctionalService/CommandService.cs +++ b/src/Core/src/Eventuous.Application/FunctionalService/CommandService.cs @@ -8,10 +8,10 @@ namespace Eventuous; using static Diagnostics.ApplicationEventSource; [Obsolete("Use CommandService")] -public abstract class FunctionalCommandService(IEventReader reader, IEventWriter writer, ITypeMapper? typeMap = null, AmendEvent? amendEvent = null) - : CommandService(reader, writer, typeMap, amendEvent) where TState : State, new() { - protected FunctionalCommandService(IEventStore store, ITypeMapper? typeMap = null, AmendEvent? amendEvent = null) - : this(store, store, typeMap, amendEvent) { } +public abstract class FunctionalCommandService(IEventReader reader, IEventWriter writer, ITypeMapper? typeMap = null, AmendEvent? amendEvent = null, ISnapshotStore? snapshotStore = null) + : CommandService(reader, writer, typeMap, amendEvent, snapshotStore) where TState : State, new() { + protected FunctionalCommandService(IEventStore store, ITypeMapper? typeMap = null, AmendEvent? amendEvent = null, ISnapshotStore? snapshotStore = null) + : this(store, store, typeMap, amendEvent, snapshotStore) { } [Obsolete("Use On().InState(ExpectedState.New).GetStream(...).Act(...) instead")] protected void OnNew(Func getStreamName, Func action) where TCommand : class @@ -36,11 +36,14 @@ protected void OnAny(Func getStreamName, FuncEvent writer or event store /// instance or null to use the default type mapper /// Optional function to add extra information to the event before it gets stored +/// Optional snapshot store for SeparateStore strategy /// State object type -public abstract class CommandService(IEventReader reader, IEventWriter writer, ITypeMapper? typeMap = null, AmendEvent? amendEvent = null) +public abstract class CommandService(IEventReader reader, IEventWriter writer, ITypeMapper? typeMap = null, AmendEvent? amendEvent = null, ISnapshotStore? snapshotStore = null) : ICommandService where TState : State, new() { - readonly ITypeMapper _typeMap = typeMap ?? TypeMap.Instance; - readonly HandlersMap _handlers = new(); + readonly ITypeMapper _typeMap = typeMap ?? TypeMap.Instance; + readonly HandlersMap _handlers = new(); + readonly ISnapshotStore? _snapshotStore = snapshotStore; + SnapshotStrategy? _snapshotStrategy; /// /// Alternative constructor for the functional command service, which uses an instance for both reading and writing. @@ -48,8 +51,9 @@ public abstract class CommandService(IEventReader reader, IEventWriter w /// Event store /// instance or null to use the default type mapper /// Optional function to add extra information to the event before it gets stored + /// Optional snapshot store for SeparateStore strategy // ReSharper disable once UnusedMember.Global - protected CommandService(IEventStore store, ITypeMapper? typeMap = null, AmendEvent? amendEvent = null) : this(store, store, typeMap, amendEvent) { } + protected CommandService(IEventStore store, ITypeMapper? typeMap = null, AmendEvent? amendEvent = null, ISnapshotStore? snapshotStore = null) : this(store, store, typeMap, amendEvent, snapshotStore) { } /// /// Returns the command handler builder for the specified command type. @@ -58,6 +62,19 @@ protected CommandService(IEventStore store, ITypeMapper? typeMap = null, AmendEv /// protected IDefineExpectedState On() where TCommand : class => new CommandHandlerBuilder(this, reader, writer); + /// + /// Configures a snapshot strategy that determines when and how to create snapshot events. + /// The snapshot event type must be registered in for the state type. + /// + /// Function that takes all events (original + new) and state, returns true if snapshot should be created + /// Function that takes all events and state, returns the snapshot event object + protected void UseSnapshotStrategy( + Func, TState, bool> predicate, + Func, TState, object> produce + ) { + _snapshotStrategy = new SnapshotStrategy(predicate, produce); + } + /// /// Function to handle a command and return the resulting state and changes. /// @@ -82,24 +99,47 @@ public async Task> Handle(TCommand command, Cancellatio try { var loadedState = registeredHandler.ExpectedState switch { - ExpectedState.Any => await resolvedReader.LoadState(streamName, false, cancellationToken).NoContext(), - ExpectedState.Existing => await resolvedReader.LoadState(streamName, true, cancellationToken).NoContext(), + ExpectedState.Any => await resolvedReader.LoadState(streamName, false, _snapshotStore, cancellationToken).NoContext(), + ExpectedState.Existing => await resolvedReader.LoadState(streamName, true, _snapshotStore, cancellationToken).NoContext(), ExpectedState.New => new(streamName, ExpectedStreamVersion.NoStream, []), _ => throw new ArgumentOutOfRangeException(null, "Unknown expected state") }; var result = (await registeredHandler.Handler(loadedState.State, loadedState.Events, command, cancellationToken).NoContext()).ToArray(); - var newEvents = result.Select(x => new ProposedEvent(x, new())).ToArray(); + var newEvents = result.Select(x => new ProposedEvent(x, [])).ToArray(); var newState = newEvents.Aggregate(loadedState.State, (current, evt) => current.When(evt.Data)); + // Apply snapshot strategy if configured + if (_snapshotStrategy != null) { + var allEvents = loadedState.Events.Concat(result); + if (_snapshotStrategy.Predicate(allEvents, newState)) { + var snapshotEvent = _snapshotStrategy.Produce(allEvents, newState); + newState = newState.When(snapshotEvent); + result = [.. result, snapshotEvent]; + newEvents = [.. newEvents, new ProposedEvent(snapshotEvent, [])]; + } + } + // Zero in the global position would mean nothing, so the receiver needs to check the Changes.Length if (newEvents.Length == 0) return Result.FromSuccess(newState, [], 0); - var proposed = new ProposedAppend(streamName, loadedState.StreamVersion, newEvents); + // Separate snapshots from regular events based on storage strategy + var snapshotTypes = SnapshotTypeMap.GetSnapshotTypes(); + var storageStrategy = SnapshotTypeMap.GetStorageStrategy(); + var (regularEvents, snapshotEvents) = SeparateSnapshots(newEvents, snapshotTypes, storageStrategy); + + // Store regular events first + var proposed = new ProposedAppend(streamName, loadedState.StreamVersion, regularEvents); var final = registeredHandler.AmendAppend?.Invoke(proposed, command) ?? proposed; var storeResult = await resolvedWriter.Store(final, Amend, cancellationToken).NoContext(); - var changes = result.Select(x => Change.FromEvent(x, _typeMap)); + + // Handle snapshots based on strategy + if (snapshotEvents.Length > 0 && storageStrategy != SnapshotStorageStrategy.SameStream) { + await HandleSnapshots(streamName, snapshotEvents, storeResult.NextExpectedVersion, storageStrategy, resolvedWriter, cancellationToken).NoContext(); + } + + var changes = result.Select(x => Change.FromEvent(x, _typeMap)); Log.CommandHandled(); return Result.FromSuccess(newState, changes, storeResult.GlobalPosition); @@ -119,4 +159,90 @@ NewStreamEvent Amend(NewStreamEvent streamEvent) { protected static StreamName GetStream(string id) => StreamName.ForState(id); internal void AddHandler(RegisteredHandler handler) where TCommand : class => _handlers.AddHandler(handler); + + static (ProposedEvent[] RegularEvents, ProposedEvent[] SnapshotEvents) SeparateSnapshots( + ProposedEvent[] events, + HashSet snapshotTypes, + SnapshotStorageStrategy strategy + ) { + if (strategy == SnapshotStorageStrategy.SameStream || snapshotTypes.Count == 0) { + return (events, []); + } + + var regularEvents = new List(); + var snapshotEvents = new List(); + + foreach (var evt in events) { + if (evt.Data != null && snapshotTypes.Contains(evt.Data.GetType())) { + snapshotEvents.Add(evt); + } else { + regularEvents.Add(evt); + } + } + + return (regularEvents.ToArray(), snapshotEvents.ToArray()); + } + + [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] + [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] + async Task HandleSnapshots( + StreamName streamName, + ProposedEvent[] snapshotEvents, + long streamRevision, + SnapshotStorageStrategy strategy, + IEventWriter writer, + CancellationToken cancellationToken + ) { + if (snapshotEvents.Length == 0) return; + + // Take the last snapshot if multiple + var snapshotEvent = snapshotEvents[^1]; + + switch (strategy) { + case SnapshotStorageStrategy.SeparateStream: { + var snapshotStreamName = StreamName.ForSnapshot(streamName); + var store = writer as IEventStore; + + if (store == null) { + throw new InvalidOperationException($"IEventStore is required for {nameof(SnapshotStorageStrategy.SeparateStream)} strategy. IEventWriter must implement IEventStore."); + } + + var snapshotAppend = new ProposedAppend( + snapshotStreamName, + ExpectedStreamVersion.Any, + [snapshotEvent] + ); + + var result = await writer.Store( + snapshotAppend, + (@event) => { + @event.Metadata.With("revision", streamRevision.ToString()); + return @event; + }, + cancellationToken) + .NoContext(); + + await store.TruncateStream( + snapshotStreamName, + new StreamTruncatePosition(result.NextExpectedVersion), + ExpectedStreamVersion.Any, + cancellationToken); + + break; + } + + case SnapshotStorageStrategy.SeparateStore: { + if (_snapshotStore == null) { + throw new InvalidOperationException($"Snapshot store is required for {nameof(SnapshotStorageStrategy.SeparateStore)} strategy"); + } + + var snapshot = new Snapshot { + Revision = streamRevision, + Payload = snapshotEvent.Data + }; + await _snapshotStore.Write(streamName, snapshot, cancellationToken).NoContext(); + break; + } + } + } } diff --git a/src/Core/src/Eventuous.Application/SnapshotStrategy.cs b/src/Core/src/Eventuous.Application/SnapshotStrategy.cs new file mode 100644 index 00000000..d4bfdf46 --- /dev/null +++ b/src/Core/src/Eventuous.Application/SnapshotStrategy.cs @@ -0,0 +1,17 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +/// +/// Represents a snapshot strategy that determines when and how to create snapshot events. +/// +/// The state type +sealed class SnapshotStrategy( + Func predicate, + Func produce +) where TState : State, new() { + public Func Predicate { get; } = predicate; + public Func Produce { get; } = produce; +} + diff --git a/src/Core/src/Eventuous.Persistence/AggregateStore/AggregatePersistenceExtensions.cs b/src/Core/src/Eventuous.Persistence/AggregateStore/AggregatePersistenceExtensions.cs index 1907e364..c58af799 100644 --- a/src/Core/src/Eventuous.Persistence/AggregateStore/AggregatePersistenceExtensions.cs +++ b/src/Core/src/Eventuous.Persistence/AggregateStore/AggregatePersistenceExtensions.cs @@ -109,6 +109,7 @@ public Task StoreAggregate( /// Name of the aggregate stream /// Either fail if the stream is not found, default is false /// Optional: aggregate factory registry. Default instance will be used if the argument isn't provided. + /// Optional snapshot store for SeparateStore strategy /// Cancellation token /// Aggregate type /// Aggregate state type @@ -121,13 +122,78 @@ public Task StoreAggregate( StreamName streamName, bool failIfNotFound = true, AggregateFactoryRegistry? factoryRegistry = null, + ISnapshotStore? snapshotStore = null, CancellationToken cancellationToken = default ) where TAggregate : Aggregate where TState : State, new() { var aggregate = (factoryRegistry ?? AggregateFactoryRegistry.Instance).CreateInstance(); try { - var events = await eventReader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); + StreamEvent[] events; + var snapshotTypes = SnapshotTypeMap.GetSnapshotTypes(); + var storageStrategy = SnapshotTypeMap.GetStorageStrategy(); + + if (snapshotTypes.Count != 0) { + switch (storageStrategy) { + case SnapshotStorageStrategy.SameStream: + events = await eventReader.ReadStreamAfterSnapshot(streamName, snapshotTypes, failIfNotFound, cancellationToken).NoContext(); + break; + + case SnapshotStorageStrategy.SeparateStream: { + var snapshotStreamName = StreamName.ForSnapshot(streamName); + var snapshotEvents = await eventReader.ReadEventsBackwards(snapshotStreamName, StreamReadPosition.End, 1, false, cancellationToken).NoContext(); + + StreamEvent? snapshotEvent = null; + + if (snapshotEvents.Length > 0) { + var candidate = snapshotEvents[0]; + if (candidate.Payload != null && snapshotTypes.Contains(candidate.Payload.GetType())) { + snapshotEvent = candidate with { + Revision = long.Parse(candidate.Metadata.GetString("revision")!) + }; + } + } + + if (snapshotEvent.HasValue) { + var eventsAfterSnapshot = await eventReader.ReadStream(streamName, new(snapshotEvent.Value.Revision + 1), failIfNotFound, cancellationToken).NoContext(); + events = [snapshotEvent.Value, ..eventsAfterSnapshot]; + } else { + events = await eventReader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); + } + break; + } + + case SnapshotStorageStrategy.SeparateStore: { + if (snapshotStore == null) { + throw new InvalidOperationException($"Snapshot store is required for {nameof(SnapshotStorageStrategy.SeparateStore)} strategy"); + } + + var snapshot = await snapshotStore.Read(streamName, cancellationToken).NoContext(); + + if (snapshot != null) { + var snapshotEvent = new StreamEvent( + Guid.Empty, + snapshot.Payload, + [], + string.Empty, + snapshot.Revision + ); + var eventsAfterSnapshot = await eventReader.ReadStream(streamName, new(snapshot.Revision + 1), failIfNotFound, cancellationToken).NoContext(); + events = [snapshotEvent, ..eventsAfterSnapshot]; + } else { + events = await eventReader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); + } + break; + } + + default: + events = await eventReader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); + break; + } + } else { + events = await eventReader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); + } + if (events.Length == 0) return aggregate; aggregate.Load(events[^1].Revision, events.Select(x => x.Payload)); } catch (StreamNotFound) when (!failIfNotFound) { @@ -148,6 +214,7 @@ public Task StoreAggregate( /// Optional: stream name map. Default instance is used when argument isn't provided. /// Either fail if the stream is not found, default is false /// Optional: aggregate factory registry. Default instance will be used if the argument isn't provided. + /// Optional snapshot store for SeparateStore strategy /// Cancellation token /// Aggregate type /// Aggregate state type @@ -162,12 +229,13 @@ public Task StoreAggregate( StreamNameMap? streamNameMap = null, bool failIfNotFound = true, AggregateFactoryRegistry? factoryRegistry = null, + ISnapshotStore? snapshotStore = null, CancellationToken cancellationToken = default ) where TAggregate : Aggregate where TState : State, new() where TId : Id { var streamName = streamNameMap?.GetStreamName(aggregateId) ?? StreamNameFactory.For(aggregateId); - var aggregate = await eventReader.LoadAggregate(streamName, failIfNotFound, factoryRegistry, cancellationToken).NoContext(); + var aggregate = await eventReader.LoadAggregate(streamName, failIfNotFound, factoryRegistry, snapshotStore, cancellationToken).NoContext(); return aggregate.WithId(aggregateId); } diff --git a/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStore.cs b/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStore.cs index aeda1b03..a91a75bd 100644 --- a/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStore.cs +++ b/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStore.cs @@ -43,7 +43,7 @@ public Task Store(StreamName streamName, [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] public Task Load<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicParameterlessConstructor)] T, TState>(StreamName streamName, CancellationToken cancellationToken) where T : Aggregate where TState : State, new() - => _eventReader.LoadAggregate(streamName, true, _factoryRegistry, cancellationToken); + => _eventReader.LoadAggregate(streamName, true, _factoryRegistry, null, cancellationToken); /// [Obsolete("Use IEventReader.LoadAggregate instead.")] @@ -51,5 +51,5 @@ public Task Store(StreamName streamName, [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] public Task LoadOrNew<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicParameterlessConstructor)] T, TState>(StreamName streamName, CancellationToken cancellationToken) where T : Aggregate where TState : State, new() - => _eventReader.LoadAggregate(streamName, false, _factoryRegistry, cancellationToken); + => _eventReader.LoadAggregate(streamName, false, _factoryRegistry, null, cancellationToken); } diff --git a/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStoreWithArchive.cs b/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStoreWithArchive.cs index 52ae93be..11e7ad7d 100644 --- a/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStoreWithArchive.cs +++ b/src/Core/src/Eventuous.Persistence/AggregateStore/AggregateStoreWithArchive.cs @@ -26,12 +26,12 @@ public Task Store(StreamName streamName, [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] public Task Load<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicParameterlessConstructor)] TAggregate, TState>(StreamName streamName, CancellationToken cancellationToken) where TAggregate : Aggregate where TState : State, new() - => _tieredEventStore.LoadAggregate(streamName, true, _factoryRegistry, cancellationToken); + => _tieredEventStore.LoadAggregate(streamName, true, _factoryRegistry, null, cancellationToken); /// [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] public Task LoadOrNew<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicParameterlessConstructor)] TAggregate, TState>(StreamName streamName, CancellationToken cancellationToken) where TAggregate : Aggregate where TState : State, new() - => _tieredEventStore.LoadAggregate(streamName, false, _factoryRegistry, cancellationToken); + => _tieredEventStore.LoadAggregate(streamName, false, _factoryRegistry, null, cancellationToken); } diff --git a/src/Core/src/Eventuous.Persistence/EventStore/StoreFunctions.cs b/src/Core/src/Eventuous.Persistence/EventStore/StoreFunctions.cs index 2234a3fb..f13f0011 100644 --- a/src/Core/src/Eventuous.Persistence/EventStore/StoreFunctions.cs +++ b/src/Core/src/Eventuous.Persistence/EventStore/StoreFunctions.cs @@ -93,4 +93,52 @@ public static async Task ReadStream( return streamEvents.ToArray(); } + + [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] + [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] + public static async Task ReadStreamAfterSnapshot( + this IEventReader eventReader, + StreamName streamName, + HashSet snapshotTypes, + bool failIfNotFound = true, + CancellationToken cancellationToken = default + ) { + const int pageSize = 500; + + var streamEvents = new List(); + + var position = StreamReadPosition.End; + + try { + while (true) { + var events = await eventReader.ReadEventsBackwards(streamName, position, pageSize, failIfNotFound, cancellationToken).NoContext(); + + var snapshotIndex = (int?) null; + + for (var i = 0; i < events.Length; i++) { + var payload = events[i].Payload; + if (payload is not null && snapshotTypes.Contains(payload.GetType())) { + snapshotIndex = i; + break; + } + } + + if (snapshotIndex.HasValue) { + streamEvents.AddRange(events[..(snapshotIndex.Value + 1)]); + break; + } else { + streamEvents.AddRange(events); + } + + if (events.Length < pageSize) break; + + position = new(position.Value - events.Length); + } + } catch (StreamNotFound) when (!failIfNotFound) { + return []; + } + + streamEvents.Reverse(); + return [.. streamEvents]; + } } diff --git a/src/Core/src/Eventuous.Persistence/SnapshotStore/ISnapshotStore.cs b/src/Core/src/Eventuous.Persistence/SnapshotStore/ISnapshotStore.cs new file mode 100644 index 00000000..3b2ccd99 --- /dev/null +++ b/src/Core/src/Eventuous.Persistence/SnapshotStore/ISnapshotStore.cs @@ -0,0 +1,41 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +/// +/// Snapshot store for storing and retrieving snapshot events separately from the main event stream. +/// Used when is configured. +/// +[PublicAPI] +public interface ISnapshotStore { + /// + /// Reads the latest snapshot for the specified stream + /// + /// Stream name to read snapshot for + /// Cancellation token + /// Snapshot data if found, null otherwise + [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] + [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] + Task Read(StreamName streamName, CancellationToken cancellationToken = default); + + /// + /// Writes a snapshot to the store + /// + /// Stream name to write snapshot for + /// Snapshot data to write + /// Cancellation token + /// Task representing the write operation + [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] + [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] + Task Write(StreamName streamName, Snapshot snapshot, CancellationToken cancellationToken = default); + + /// + /// Deletes a snapshot for the specified stream + /// + /// Stream name to delete snapshot for + /// Cancellation token + /// Task representing the delete operation + Task Delete(StreamName streamName, CancellationToken cancellationToken = default); +} + diff --git a/src/Core/src/Eventuous.Persistence/SnapshotStore/Snapshot.cs b/src/Core/src/Eventuous.Persistence/SnapshotStore/Snapshot.cs new file mode 100644 index 00000000..0673ddf3 --- /dev/null +++ b/src/Core/src/Eventuous.Persistence/SnapshotStore/Snapshot.cs @@ -0,0 +1,20 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +/// +/// Represents a snapshot event with its metadata +/// +public record Snapshot { + /// + /// Stream revision at the time of snapshot creation + /// + public long Revision { get; init; } + + /// + /// The snapshot event payload + /// + public object? Payload { get; init; } +} + diff --git a/src/Core/src/Eventuous.Persistence/StateStore/StateStoreFunctions.cs b/src/Core/src/Eventuous.Persistence/StateStore/StateStoreFunctions.cs index 75d4667f..6fba0d46 100644 --- a/src/Core/src/Eventuous.Persistence/StateStore/StateStoreFunctions.cs +++ b/src/Core/src/Eventuous.Persistence/StateStore/StateStoreFunctions.cs @@ -13,6 +13,7 @@ public static class StateStoreFunctions { /// /// Name of the stream to read from /// When set to false and there's no stream, the function will return an empty instance. + /// Optional snapshot store for SeparateStore strategy /// Cancellation token /// State object type /// Instance of containing events and folded state @@ -22,11 +23,76 @@ public static class StateStoreFunctions { public async Task> LoadState( StreamName streamName, bool failIfNotFound = true, + ISnapshotStore? snapshotStore = null, CancellationToken cancellationToken = default ) where TState : State, new() { try { - var streamEvents = await reader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); - var events = streamEvents.Select(x => x.Payload!).ToArray(); + StreamEvent[] streamEvents; + var snapshotTypes = SnapshotTypeMap.GetSnapshotTypes(); + var storageStrategy = SnapshotTypeMap.GetStorageStrategy(); + + if (snapshotTypes.Count != 0) { + switch (storageStrategy) { + case SnapshotStorageStrategy.SameStream: + streamEvents = await reader.ReadStreamAfterSnapshot(streamName, snapshotTypes, failIfNotFound, cancellationToken); + break; + + case SnapshotStorageStrategy.SeparateStream: { + var snapshotStreamName = StreamName.ForSnapshot(streamName); + var snapshotEvents = await reader.ReadEventsBackwards(snapshotStreamName, StreamReadPosition.End, 1, false, cancellationToken).NoContext(); + + StreamEvent? snapshotEvent = null; + + if (snapshotEvents.Length > 0) { + var candidate = snapshotEvents[0]; + if (candidate.Payload != null && snapshotTypes.Contains(candidate.Payload.GetType())) { + snapshotEvent = candidate with { + Revision = long.Parse(candidate.Metadata.GetString("revision")!) + }; + } + } + + if (snapshotEvent.HasValue) { + var eventsAfterSnapshot = await reader.ReadStream(streamName, new(snapshotEvent.Value.Revision + 1), failIfNotFound, cancellationToken).NoContext(); + streamEvents = [snapshotEvent.Value, ..eventsAfterSnapshot]; + } else { + streamEvents = await reader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); + } + break; + } + + case SnapshotStorageStrategy.SeparateStore: { + if (snapshotStore == null) { + throw new InvalidOperationException($"Snapshot store is required for {nameof(SnapshotStorageStrategy.SeparateStore)} strategy"); + } + + var snapshot = await snapshotStore.Read(streamName, cancellationToken).NoContext(); + + if (snapshot != null) { + var snapshotEvent = new StreamEvent( + Guid.Empty, + snapshot.Payload, + [], + string.Empty, + snapshot.Revision + ); + var eventsAfterSnapshot = await reader.ReadStream(streamName, new(snapshot.Revision + 1), failIfNotFound, cancellationToken).NoContext(); + streamEvents = [snapshotEvent, ..eventsAfterSnapshot]; + } else { + streamEvents = await reader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); + } + break; + } + + default: + streamEvents = await reader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); + break; + } + } else { + streamEvents = await reader.ReadStream(streamName, StreamReadPosition.Start, failIfNotFound, cancellationToken).NoContext(); + } + + var events = streamEvents.Select(x => x.Payload!).ToArray(); var expectedVersion = events.Length == 0 ? ExpectedStreamVersion.NoStream : new(streamEvents.Last().Revision); return (new(streamName, expectedVersion, events)); @@ -44,6 +110,7 @@ public async Task> LoadState( /// /// State identity value /// When set to false and there's no stream, the function will return an empty instance. + /// Optional snapshot store for SeparateStore strategy /// Cancellation token /// Mapper between identity and stream name /// State object type @@ -55,10 +122,11 @@ public async Task> LoadState( StreamNameMap streamNameMap, TId id, bool failIfNotFound = true, + ISnapshotStore? snapshotStore = null, CancellationToken cancellationToken = default ) where TState : State, new() where TId : Id { - var foldedStream = await reader.LoadState(streamNameMap.GetStreamName(id), failIfNotFound, cancellationToken).NoContext(); + var foldedStream = await reader.LoadState(streamNameMap.GetStreamName(id), failIfNotFound, snapshotStore, cancellationToken).NoContext(); return foldedStream with { State = foldedStream.State.WithId(id) }; } diff --git a/src/Core/src/Eventuous.Shared/SnapshotMap/SnapshotStorageStrategy.cs b/src/Core/src/Eventuous.Shared/SnapshotMap/SnapshotStorageStrategy.cs new file mode 100644 index 00000000..11e31c66 --- /dev/null +++ b/src/Core/src/Eventuous.Shared/SnapshotMap/SnapshotStorageStrategy.cs @@ -0,0 +1,24 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +/// +/// Strategy for storing snapshot events +/// +public enum SnapshotStorageStrategy { + /// + /// Store snapshots in the same stream as other aggregate events + /// + SameStream, + + /// + /// Store snapshots in a separate stream + /// + SeparateStream, + + /// + /// Store snapshots in a separate storage (e.g., PostgreSQL, Redis, etc.) + /// + SeparateStore +} diff --git a/src/Core/src/Eventuous.Shared/SnapshotMap/SnapshotTypeMap.cs b/src/Core/src/Eventuous.Shared/SnapshotMap/SnapshotTypeMap.cs new file mode 100644 index 00000000..6e842b58 --- /dev/null +++ b/src/Core/src/Eventuous.Shared/SnapshotMap/SnapshotTypeMap.cs @@ -0,0 +1,36 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +public static class SnapshotTypeMap { + + static readonly Dictionary> StateToSnapshots = []; + static readonly Dictionary StateToStorageStrategy = []; + + public static void Register(Type stateType, Type eventType, SnapshotStorageStrategy storageStrategy = SnapshotStorageStrategy.SameStream) { + if (StateToSnapshots.TryGetValue(stateType, out var value)) { + value.Add(eventType); + } else { + StateToSnapshots[stateType] = [eventType]; + // Store strategy for the state type when first registering snapshots for this state + StateToStorageStrategy[stateType] = storageStrategy; + } + } + + public static HashSet GetSnapshotTypes() => StateToSnapshots.GetValueOrDefault(typeof(TState), []); + + /// + /// Gets the storage strategy for the specified state type + /// + /// The state type + /// The storage strategy, or if not specified + public static SnapshotStorageStrategy GetStorageStrategy() => GetStorageStrategy(typeof(TState)); + + /// + /// Gets the storage strategy for the specified state type + /// + /// The state type + /// The storage strategy, or if not specified + public static SnapshotStorageStrategy GetStorageStrategy(Type stateType) => StateToStorageStrategy.GetValueOrDefault(stateType, SnapshotStorageStrategy.SameStream); +} diff --git a/src/Core/src/Eventuous.Shared/SnapshotMap/SnapshotsAttribute.cs b/src/Core/src/Eventuous.Shared/SnapshotMap/SnapshotsAttribute.cs new file mode 100644 index 00000000..619aa660 --- /dev/null +++ b/src/Core/src/Eventuous.Shared/SnapshotMap/SnapshotsAttribute.cs @@ -0,0 +1,18 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous; + +[AttributeUsage(AttributeTargets.Class)] +public class SnapshotsAttribute : Attribute { + public Type[] SnapshotTypes { get; } + + /// + /// Storage strategy for snapshot events + /// + public SnapshotStorageStrategy StorageStrategy { get; set; } = SnapshotStorageStrategy.SameStream; + + public SnapshotsAttribute(params Type[] snapshotTypes) { + SnapshotTypes = snapshotTypes; + } +} \ No newline at end of file diff --git a/src/Core/src/Eventuous.Shared/Store/StreamName.cs b/src/Core/src/Eventuous.Shared/Store/StreamName.cs index 2f02d9be..6efc9b48 100644 --- a/src/Core/src/Eventuous.Shared/Store/StreamName.cs +++ b/src/Core/src/Eventuous.Shared/Store/StreamName.cs @@ -30,6 +30,17 @@ public static StreamName ForState(string entityId) { public readonly string GetId() => Value[(Value.IndexOf('-') + 1)..]; public readonly string GetCategory() => Value[..Value.IndexOf('-')]; + /// + /// Creates a snapshot stream name by adding "Snapshot" suffix after the category + /// + /// Original stream name + /// Snapshot stream name in format: {Category}Snapshot-{Id} + public static StreamName ForSnapshot(StreamName originalStream) { + var category = originalStream.GetCategory(); + var id = originalStream.GetId(); + return new($"{category}Snapshot-{id}"); + } + public static implicit operator string(StreamName streamName) => streamName.Value; public override readonly string ToString() => Value; diff --git a/src/Mongo/src/Eventuous.MongoDB/Eventuous.MongoDB.csproj b/src/Mongo/src/Eventuous.MongoDB/Eventuous.MongoDB.csproj new file mode 100644 index 00000000..ee3f296b --- /dev/null +++ b/src/Mongo/src/Eventuous.MongoDB/Eventuous.MongoDB.csproj @@ -0,0 +1,21 @@ + + + + + + + + + + + + Tools\TaskExtensions.cs + + + Tools\Ensure.cs + + + + + + diff --git a/src/Mongo/src/Eventuous.MongoDB/Extensions/RegistrationExtensions.cs b/src/Mongo/src/Eventuous.MongoDB/Extensions/RegistrationExtensions.cs new file mode 100644 index 00000000..dbac37d8 --- /dev/null +++ b/src/Mongo/src/Eventuous.MongoDB/Extensions/RegistrationExtensions.cs @@ -0,0 +1,99 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.MongoDB.Snapshots; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection; + +/// +/// Extension methods for registering MongoDB snapshot store. +/// +public static class ServiceCollectionExtensions { + /// Service collection + extension(IServiceCollection services) { + /// + /// Adds MongoDB snapshot store to the DI container. + /// + /// MongoDB connection string + /// Database name. Default is "eventuous" + /// Collection name. Default is "snapshots" + /// Set to true to initialize indexes on startup. Default is false + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddMongoSnapshotStore( + string connectionString, + string databaseName = "eventuous", + string collectionName = "snapshots", + bool initializeIndexes = false + ) { + var options = new MongoSnapshotStoreOptions { + ConnectionString = Ensure.NotEmptyString(connectionString), + DatabaseName = databaseName, + CollectionName = collectionName, + InitializeIndexes = initializeIndexes + }; + + services.AddSingleton(options); + services.AddSingleton(sp => { + var opts = sp.GetRequiredService(); + var mongoSettings = MongoClientSettings.FromConnectionString(opts.ConnectionString); + var client = new MongoClient(mongoSettings); + var database = client.GetDatabase(opts.DatabaseName); + return new MongoSnapshotStore(database, opts, sp.GetService()); + }); + services.AddSingleton(sp => sp.GetRequiredService()); + + if (options.InitializeIndexes) { + services.AddHostedService(sp => { + var opts = sp.GetRequiredService(); + var mongoSettings = MongoClientSettings.FromConnectionString(opts.ConnectionString); + var client = new MongoClient(mongoSettings); + var database = client.GetDatabase(opts.DatabaseName); + var loggerFactory = sp.GetService(); + return new SnapshotIndexInitializer(database, opts, loggerFactory); + }); + } + + return services; + } + + /// + /// Adds MongoDB snapshot store to the DI container using configuration. + /// + /// Configuration section for MongoDB snapshot store options + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddMongoSnapshotStore(IConfiguration config) { + services.Configure(config); + services.AddSingleton(sp => sp.GetRequiredService>().Value); + + services.AddSingleton(sp => { + var opts = sp.GetRequiredService(); + var mongoSettings = MongoClientSettings.FromConnectionString(Ensure.NotEmptyString(opts.ConnectionString)); + var client = new MongoClient(mongoSettings); + var database = client.GetDatabase(opts.DatabaseName); + return new MongoSnapshotStore(database, opts, sp.GetService()); + }); + services.AddSingleton(sp => sp.GetRequiredService()); + + // Register index initializer only if InitializeIndexes is enabled + // The initializer will check the option and skip if false + services.AddHostedService(sp => { + var opts = sp.GetRequiredService(); + var mongoSettings = MongoClientSettings.FromConnectionString(Ensure.NotEmptyString(opts.ConnectionString)); + var client = new MongoClient(mongoSettings); + var database = client.GetDatabase(opts.DatabaseName); + var loggerFactory = sp.GetService(); + return new SnapshotIndexInitializer(database, opts, loggerFactory); + }); + + return services; + } + } +} + diff --git a/src/Mongo/src/Eventuous.MongoDB/Snapshots/MongoSnapshotStore.cs b/src/Mongo/src/Eventuous.MongoDB/Snapshots/MongoSnapshotStore.cs new file mode 100644 index 00000000..d819811b --- /dev/null +++ b/src/Mongo/src/Eventuous.MongoDB/Snapshots/MongoSnapshotStore.cs @@ -0,0 +1,94 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Runtime.Serialization; +using System.Text; +using static Eventuous.DeserializationResult; + +namespace Eventuous.MongoDB.Snapshots; + +/// +/// MongoDB snapshot store implementation for storing snapshots separately from event streams. +/// +public class MongoSnapshotStore : ISnapshotStore { + readonly IMongoCollection _collection; + readonly IEventSerializer _serializer; + const string ContentType = "application/json"; + + public MongoSnapshotStore( + IMongoDatabase database, + MongoSnapshotStoreOptions? options, + IEventSerializer? serializer = null + ) { + var mongoOptions = options ?? new MongoSnapshotStoreOptions(); + _collection = Ensure.NotNull(database).GetCollection(mongoOptions.CollectionName); + _serializer = serializer ?? DefaultEventSerializer.Instance; + } + + /// + [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")] + [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")] + public async Task Read(StreamName streamName, CancellationToken cancellationToken = default) { + var document = await _collection + .Find(Builders.Filter.Eq(x => x.StreamName, streamName.ToString())) + .SingleOrDefaultAsync(cancellationToken) + .NoContext(); + + if (document == null) { + return null; + } + + var deserialized = _serializer.DeserializeEvent( + Encoding.UTF8.GetBytes(document.JsonData), + document.EventType, + ContentType + ); + + return deserialized switch { + SuccessfullyDeserialized success => new Snapshot { + Revision = document.Revision, + Payload = success.Payload + }, + FailedToDeserialize failed => throw new SerializationException($"Can't deserialize snapshot {document.EventType}: {failed.Error}"), + _ => throw new("Unknown deserialization result") + }; + } + + /// + [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")] + [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")] + public async Task Write(StreamName streamName, Snapshot snapshot, CancellationToken cancellationToken = default) { + if (snapshot.Payload == null) { + throw new ArgumentException("Snapshot payload cannot be null", nameof(snapshot)); + } + + var serialized = _serializer.SerializeEvent(snapshot.Payload); + var jsonData = Encoding.UTF8.GetString(serialized.Payload); + + var document = new SnapshotDocument { + StreamName = streamName.ToString(), + Revision = snapshot.Revision, + EventType = serialized.EventType, + JsonData = jsonData, + Created = DateTime.UtcNow + }; + + await _collection.ReplaceOneAsync( + Builders.Filter.Eq(x => x.StreamName, streamName.ToString()), + document, + new ReplaceOptions { IsUpsert = true }, + cancellationToken + ) + .NoContext(); + } + + /// + public async Task Delete(StreamName streamName, CancellationToken cancellationToken = default) { + await _collection.DeleteOneAsync( + Builders.Filter.Eq(x => x.StreamName, streamName.ToString()), + cancellationToken + ) + .NoContext(); + } +} + diff --git a/src/Mongo/src/Eventuous.MongoDB/Snapshots/MongoSnapshotStoreOptions.cs b/src/Mongo/src/Eventuous.MongoDB/Snapshots/MongoSnapshotStoreOptions.cs new file mode 100644 index 00000000..fe48ccc5 --- /dev/null +++ b/src/Mongo/src/Eventuous.MongoDB/Snapshots/MongoSnapshotStoreOptions.cs @@ -0,0 +1,33 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +// ReSharper disable PropertyCanBeMadeInitOnly.Global + +namespace Eventuous.MongoDB.Snapshots; + +/// +/// MongoDB snapshot store options. +/// +[PublicAPI] +public class MongoSnapshotStoreOptions { + /// + /// MongoDB connection string. + /// + public string ConnectionString { get; set; } = null!; + + /// + /// Database name. Default is "eventuous". + /// + public string DatabaseName { get; set; } = "eventuous"; + + /// + /// Collection name for snapshots. Default is "snapshots". + /// + public string CollectionName { get; set; } = "snapshots"; + + /// + /// Set to true to initialize indexes on startup. Default is false. + /// + public bool InitializeIndexes { get; set; } +} + diff --git a/src/Mongo/src/Eventuous.MongoDB/Snapshots/SnapshotDocument.cs b/src/Mongo/src/Eventuous.MongoDB/Snapshots/SnapshotDocument.cs new file mode 100644 index 00000000..efcd38f7 --- /dev/null +++ b/src/Mongo/src/Eventuous.MongoDB/Snapshots/SnapshotDocument.cs @@ -0,0 +1,19 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using MongoDB.Bson.Serialization.Attributes; + +namespace Eventuous.MongoDB.Snapshots; + +/// +/// MongoDB document representation of a snapshot. +/// +record SnapshotDocument { + [BsonId] + public string StreamName { get; init; } = null!; + public long Revision { get; init; } + public string EventType { get; init; } = null!; + public string JsonData { get; init; } = null!; + public DateTime Created { get; init; } +} + diff --git a/src/Mongo/src/Eventuous.MongoDB/Snapshots/SnapshotIndexInitializer.cs b/src/Mongo/src/Eventuous.MongoDB/Snapshots/SnapshotIndexInitializer.cs new file mode 100644 index 00000000..d1d64fd7 --- /dev/null +++ b/src/Mongo/src/Eventuous.MongoDB/Snapshots/SnapshotIndexInitializer.cs @@ -0,0 +1,57 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Eventuous.MongoDB.Snapshots; + +/// +/// Initializes MongoDB indexes for snapshot collection on startup. +/// +public class SnapshotIndexInitializer : IHostedService { + readonly IMongoDatabase _database; + readonly MongoSnapshotStoreOptions _options; + readonly ILogger? _logger; + + public SnapshotIndexInitializer( + IMongoDatabase database, + MongoSnapshotStoreOptions options, + ILoggerFactory? loggerFactory = null + ) { + _database = Ensure.NotNull(database); + _options = Ensure.NotNull(options); + _logger = loggerFactory?.CreateLogger(); + } + + public async Task StartAsync(CancellationToken cancellationToken) { + if (!_options.InitializeIndexes) return; + + _logger?.LogInformation("Initializing snapshot indexes for collection {CollectionName}", _options.CollectionName); + + var collection = _database.GetCollection(_options.CollectionName); + + try { + // Note: StreamName is marked as [BsonId], so it's automatically indexed as _id + // No need to create a separate index on StreamName + + // Create index on Created for potential queries/filtering + var createdIndex = new CreateIndexModel( + Builders.IndexKeys.Ascending(x => x.Created), + new CreateIndexOptions { Name = "IX_Snapshots_Created" } + ); + + await collection.Indexes.CreateOneAsync(createdIndex, cancellationToken: cancellationToken).NoContext(); + + _logger?.LogInformation("Snapshot indexes initialized successfully"); + } catch (MongoCommandException ex) when (ex.CodeName == "IndexOptionsConflict" || ex.Message.Contains("already exists")) { + _logger?.LogWarning("Index already exists, skipping creation"); + } catch (Exception ex) { + _logger?.LogError(ex, "Failed to initialize snapshot indexes"); + throw; + } + } + + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; +} + diff --git a/src/Postgres/src/Eventuous.Postgresql/Eventuous.Postgresql.csproj b/src/Postgres/src/Eventuous.Postgresql/Eventuous.Postgresql.csproj index 7d3ed25c..f4e99a06 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Eventuous.Postgresql.csproj +++ b/src/Postgres/src/Eventuous.Postgresql/Eventuous.Postgresql.csproj @@ -1,25 +1,26 @@ - - - - + + + + - - - + + + - - - - - - - - - + + + + + + + + + + @@ -30,11 +31,11 @@ - + - - - + + + diff --git a/src/Postgres/src/Eventuous.Postgresql/Extensions/RegistrationExtensions.cs b/src/Postgres/src/Eventuous.Postgresql/Extensions/RegistrationExtensions.cs index 7909e50f..afdbf3ee 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Extensions/RegistrationExtensions.cs +++ b/src/Postgres/src/Eventuous.Postgresql/Extensions/RegistrationExtensions.cs @@ -3,6 +3,7 @@ using System.Data.Common; using Eventuous.Postgresql; +using Eventuous.Postgresql.Snapshots; using Eventuous.Postgresql.Subscriptions; using Eventuous.Sql.Base; using Microsoft.Extensions.Configuration; @@ -135,5 +136,109 @@ public IServiceCollection AddPostgresCheckpointStore() { } ); } + + /// + /// Adds PostgreSQL snapshot store and the necessary schema to the DI container. + /// + /// Connection string + /// Schema name + /// Set to true if you want the schema to be created on startup + /// Optional: function to configure the data source builder + /// Optional: lifetime of the connection, default is transient + /// Optional: lifetime of the data source, default is singleton + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddPostgresSnapshotStore( + string connectionString, + string schema = SnapshotSchema.DefaultSchema, + bool initializeDatabase = false, + Action? configureBuilder = null, + ServiceLifetime connectionLifetime = ServiceLifetime.Transient, + ServiceLifetime dataSourceLifetime = ServiceLifetime.Singleton + ) { + var options = new PostgresSnapshotStoreOptions { + Schema = schema, + ConnectionString = connectionString, + InitializeDatabase = initializeDatabase + }; + + services.AddNpgsqlSnapshotDataSourceCore( + _ => connectionString, + configureBuilder, + connectionLifetime, + dataSourceLifetime + ); + services.AddSingleton(options); + services.AddSingleton(sp => { + var snapshotDataSource = sp.GetRequiredService(); + return new PostgresSnapshotStore(snapshotDataSource.DataSource, options, sp.GetService()); + }); + services.AddSingleton(sp => sp.GetRequiredService()); + services.AddHostedService(); + + return services; + } + + /// + /// Adds PostgreSQL snapshot store and the necessary schema to the DI container. + /// + /// Configuration section for PostgreSQL snapshot store options + /// Optional: function to configure the data source builder + /// Optional: lifetime of the connection, default is transient + /// Optional: lifetime of the data source, default is singleton + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddPostgresSnapshotStore( + IConfiguration config, + Action? configureBuilder = null, + ServiceLifetime connectionLifetime = ServiceLifetime.Transient, + ServiceLifetime dataSourceLifetime = ServiceLifetime.Singleton + ) { + services.Configure(config); + services.AddSingleton(sp => sp.GetRequiredService>().Value); + + services.AddNpgsqlSnapshotDataSourceCore( + sp => Ensure.NotEmptyString(sp.GetRequiredService().ConnectionString), + configureBuilder, + connectionLifetime, + dataSourceLifetime + ); + + services.AddSingleton(sp => { + var options = sp.GetRequiredService(); + var snapshotDataSource = sp.GetRequiredService(); + return new PostgresSnapshotStore(snapshotDataSource.DataSource, options, sp.GetService()); + }); + services.AddSingleton(sp => sp.GetRequiredService()); + services.AddHostedService(); + + return services; + } + + void AddNpgsqlSnapshotDataSourceCore( + Func getConnectionString, + Action? configureDataSource, + ServiceLifetime connectionLifetime, + ServiceLifetime dataSourceLifetime + ) { + // Register snapshot-specific data source as singleton + services.TryAdd( + new ServiceDescriptor( + typeof(PostgresSnapshotDataSource), + sp => { + var dataSourceBuilder = new NpgsqlDataSourceBuilder(getConnectionString(sp)); + dataSourceBuilder.UseLoggerFactory(sp.GetService()); + configureDataSource?.Invoke(sp, dataSourceBuilder); + return new PostgresSnapshotDataSource(dataSourceBuilder.Build()); + }, + dataSourceLifetime + ) + ); + } + } + + // Wrapper to distinguish snapshot data source from event store data source + internal class PostgresSnapshotDataSource(NpgsqlDataSource dataSource) { + public NpgsqlDataSource DataSource { get; } = dataSource; } } diff --git a/src/Postgres/src/Eventuous.Postgresql/Schema.cs b/src/Postgres/src/Eventuous.Postgresql/Schema.cs index bed75ac0..44c04f3b 100644 --- a/src/Postgres/src/Eventuous.Postgresql/Schema.cs +++ b/src/Postgres/src/Eventuous.Postgresql/Schema.cs @@ -35,7 +35,9 @@ public class Schema(string schema = Schema.DefaultSchema) { public async Task CreateSchema(NpgsqlDataSource dataSource, ILogger? log, CancellationToken cancellationToken = default) { log?.LogInformation("Creating schema {Schema}", schema); - var names = Assembly.GetManifestResourceNames().Where(x => x.EndsWith(".sql")).OrderBy(x => x); + var names = Assembly.GetManifestResourceNames() + .Where(x => x.EndsWith(".sql") && !x.Contains("Snapshots.Scripts")) + .OrderBy(x => x); await using var connection = await dataSource.OpenConnectionAsync(cancellationToken).NoContext(); diff --git a/src/Postgres/src/Eventuous.Postgresql/Snapshots/PostgresSnapshotStore.cs b/src/Postgres/src/Eventuous.Postgresql/Snapshots/PostgresSnapshotStore.cs new file mode 100644 index 00000000..2bcc75cf --- /dev/null +++ b/src/Postgres/src/Eventuous.Postgresql/Snapshots/PostgresSnapshotStore.cs @@ -0,0 +1,95 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Runtime.Serialization; +using System.Text; +using Eventuous.Postgresql.Extensions; +using static Eventuous.DeserializationResult; + +namespace Eventuous.Postgresql.Snapshots; + +/// +/// PostgreSQL snapshot store implementation for storing snapshots separately from event streams. +/// +public class PostgresSnapshotStore : ISnapshotStore { + readonly NpgsqlDataSource _dataSource; + readonly SnapshotSchema _schema; + readonly IEventSerializer _serializer; + const string ContentType = "application/json"; + + public PostgresSnapshotStore( + NpgsqlDataSource dataSource, + PostgresSnapshotStoreOptions? options, + IEventSerializer? serializer = null + ) { + var pgOptions = options ?? new PostgresSnapshotStoreOptions(); + _schema = new SnapshotSchema(pgOptions.Schema); + _dataSource = Ensure.NotNull(dataSource, "Data Source"); + _serializer = serializer ?? DefaultEventSerializer.Instance; + } + + /// + [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")] + [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")] + public async Task Read(StreamName streamName, CancellationToken cancellationToken = default) { + await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).NoContext(); + await using var cmd = connection.GetCommand(_schema.ReadSnapshot) + .Add("stream_name", NpgsqlDbType.Varchar, streamName.ToString()); + + await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).NoContext(); + + if (!await reader.ReadAsync(cancellationToken).NoContext()) { + return null; + } + + var revision = reader.GetInt64(0); + var eventType = reader.GetString(1); + var jsonData = reader.GetString(2); + + var deserialized = _serializer.DeserializeEvent( + Encoding.UTF8.GetBytes(jsonData), + eventType, + ContentType + ); + + return deserialized switch { + SuccessfullyDeserialized success => new Snapshot { + Revision = revision, + Payload = success.Payload + }, + FailedToDeserialize failed => throw new SerializationException($"Can't deserialize snapshot {eventType}: {failed.Error}"), + _ => throw new("Unknown deserialization result") + }; + } + + /// + [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")] + [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")] + public async Task Write(StreamName streamName, Snapshot snapshot, CancellationToken cancellationToken = default) { + if (snapshot.Payload == null) { + throw new ArgumentException("Snapshot payload cannot be null", nameof(snapshot)); + } + + var serialized = _serializer.SerializeEvent(snapshot.Payload); + var jsonData = Encoding.UTF8.GetString(serialized.Payload); + + await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).NoContext(); + await using var cmd = connection.GetCommand(_schema.WriteSnapshot) + .Add("stream_name", NpgsqlDbType.Varchar, streamName.ToString()) + .Add("revision", NpgsqlDbType.Bigint, snapshot.Revision) + .Add("event_type", NpgsqlDbType.Varchar, serialized.EventType) + .Add("json_data", NpgsqlDbType.Jsonb, jsonData); + + await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + } + + /// + public async Task Delete(StreamName streamName, CancellationToken cancellationToken = default) { + await using var connection = await _dataSource.OpenConnectionAsync(cancellationToken).NoContext(); + await using var cmd = connection.GetCommand(_schema.DeleteSnapshot) + .Add("stream_name", NpgsqlDbType.Varchar, streamName.ToString()); + + await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + } +} + diff --git a/src/Postgres/src/Eventuous.Postgresql/Snapshots/PostgresSnapshotStoreOptions.cs b/src/Postgres/src/Eventuous.Postgresql/Snapshots/PostgresSnapshotStoreOptions.cs new file mode 100644 index 00000000..55eee872 --- /dev/null +++ b/src/Postgres/src/Eventuous.Postgresql/Snapshots/PostgresSnapshotStoreOptions.cs @@ -0,0 +1,26 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +// ReSharper disable PropertyCanBeMadeInitOnly.Global + +namespace Eventuous.Postgresql.Snapshots; + +public class PostgresSnapshotStoreOptions(string schema) { + public PostgresSnapshotStoreOptions() : this(SnapshotSchema.DefaultSchema) { } + + /// + /// Override the default schema name. + /// + public string Schema { get; set; } = schema; + + /// + /// PostgreSQL connection string. + /// + public string ConnectionString { get; set; } = null!; + + /// + /// Set to true to initialize the database schema on startup. Default is false. + /// + public bool InitializeDatabase { get; set; } +} + diff --git a/src/Postgres/src/Eventuous.Postgresql/Snapshots/Scripts/1_SnapshotSchema.sql b/src/Postgres/src/Eventuous.Postgresql/Snapshots/Scripts/1_SnapshotSchema.sql new file mode 100644 index 00000000..18b21da5 --- /dev/null +++ b/src/Postgres/src/Eventuous.Postgresql/Snapshots/Scripts/1_SnapshotSchema.sql @@ -0,0 +1,14 @@ +create schema if not exists __schema__; + +create table if not exists __schema__.snapshots ( + stream_name varchar(1000) not null, + revision bigint not null, + event_type varchar(128) not null, + json_data jsonb not null, + created timestamp not null default (now() at time zone 'utc'), + constraint pk_snapshots primary key (stream_name), + constraint ck_revision_gte_zero check (revision >= 0) +); + +create index if not exists snapshots_stream_name_idx on __schema__.snapshots (stream_name); + diff --git a/src/Postgres/src/Eventuous.Postgresql/Snapshots/SnapshotSchema.cs b/src/Postgres/src/Eventuous.Postgresql/Snapshots/SnapshotSchema.cs new file mode 100644 index 00000000..58affba5 --- /dev/null +++ b/src/Postgres/src/Eventuous.Postgresql/Snapshots/SnapshotSchema.cs @@ -0,0 +1,62 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Reflection; +using Microsoft.Extensions.Logging; + +namespace Eventuous.Postgresql.Snapshots; + +/// +/// Instantiate a new SnapshotSchema object with the specified schema name. The default schema name is "eventuous" +/// +/// +public class SnapshotSchema(string schema = SnapshotSchema.DefaultSchema) { + public const string DefaultSchema = "eventuous"; + + public string Name => schema; + + public string ReadSnapshot => $"SELECT revision, event_type, json_data FROM {schema}.snapshots WHERE stream_name = @stream_name"; + public string WriteSnapshot => $"INSERT INTO {schema}.snapshots (stream_name, revision, event_type, json_data) VALUES (@stream_name, @revision, @event_type, @json_data) ON CONFLICT (stream_name) DO UPDATE SET revision = @revision, event_type = @event_type, json_data = @json_data, created = now() at time zone 'utc'"; + public string DeleteSnapshot => $"DELETE FROM {schema}.snapshots WHERE stream_name = @stream_name"; + + static readonly Assembly Assembly = typeof(SnapshotSchema).Assembly; + + public async Task CreateSchema(NpgsqlDataSource dataSource, ILogger? log, CancellationToken cancellationToken = default) { + log?.LogInformation("Creating snapshot schema {Schema}", schema); + const string scriptName = "Eventuous.Postgresql.Snapshots.Scripts.1_SnapshotSchema.sql"; + + await using var connection = await dataSource.OpenConnectionAsync(cancellationToken).NoContext(); + + var transaction = await connection.BeginTransactionAsync(cancellationToken).NoContext(); + + try { + log?.LogInformation("Executing {Script}", scriptName); + await using var stream = Assembly.GetManifestResourceStream(scriptName); + if (stream == null) { + throw new InvalidOperationException($"Embedded resource {scriptName} not found"); + } + + using var reader = new StreamReader(stream); + +#if NET7_0_OR_GREATER + var script = await reader.ReadToEndAsync(cancellationToken).NoContext(); +#else + var script = await reader.ReadToEndAsync().NoContext(); +#endif + var cmdScript = script.Replace("__schema__", schema); + + await using var cmd = new NpgsqlCommand(cmdScript, connection, transaction); + + await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + } catch (Exception e) { + log?.LogCritical(e, "Unable to initialize the snapshot database schema"); + await transaction.RollbackAsync(cancellationToken); + + throw; + } + + await transaction.CommitAsync(cancellationToken).NoContext(); + log?.LogInformation("Snapshot database schema initialized"); + } +} + diff --git a/src/Postgres/src/Eventuous.Postgresql/Snapshots/SnapshotSchemaInitializer.cs b/src/Postgres/src/Eventuous.Postgresql/Snapshots/SnapshotSchemaInitializer.cs new file mode 100644 index 00000000..ee8c9689 --- /dev/null +++ b/src/Postgres/src/Eventuous.Postgresql/Snapshots/SnapshotSchemaInitializer.cs @@ -0,0 +1,19 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Eventuous.Postgresql.Snapshots; + +public class SnapshotSchemaInitializer(PostgresSnapshotStoreOptions options, ILoggerFactory? loggerFactory = null) : IHostedService { + public Task StartAsync(CancellationToken cancellationToken) { + if (!options.InitializeDatabase) return Task.CompletedTask; + var dataSource = new NpgsqlDataSourceBuilder(options.ConnectionString).Build(); + var schema = new SnapshotSchema(options.Schema); + return schema.CreateSchema(dataSource, loggerFactory?.CreateLogger(), cancellationToken); + } + + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; +} + diff --git a/src/Redis/src/Eventuous.Redis/Extensions/RegistrationExtensions.cs b/src/Redis/src/Eventuous.Redis/Extensions/RegistrationExtensions.cs new file mode 100644 index 00000000..df0cdbc5 --- /dev/null +++ b/src/Redis/src/Eventuous.Redis/Extensions/RegistrationExtensions.cs @@ -0,0 +1,212 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Redis; +using Eventuous.Redis.Snapshots; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; +using StackExchange.Redis; + +// ReSharper disable UnusedMethodReturnValue.Global +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection; + +public static class ServiceCollectionExtensions { + /// Service collection + extension(IServiceCollection services) { + /// + /// Adds Redis snapshot store to the DI container. + /// + /// Function to get Redis database instance + /// Snapshot store options + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddRedisSnapshotStore( + GetRedisDatabase getDatabase, + RedisSnapshotStoreOptions? options + ) { + var snapshotOptions = options ?? new RedisSnapshotStoreOptions(); + services.AddSingleton(snapshotOptions); + services.AddSingleton(sp => { + var snapshotOptions = sp.GetRequiredService(); + return new RedisSnapshotStore(getDatabase, snapshotOptions, sp.GetService()); + }); + services.AddSingleton(sp => sp.GetRequiredService()); + + return services; + } + + /// + /// Adds Redis snapshot store to the DI container with default options. + /// + /// Function to get Redis database instance + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddRedisSnapshotStore(GetRedisDatabase getDatabase) { + return services.AddRedisSnapshotStore(getDatabase, (RedisSnapshotStoreOptions?)null); + } + + /// + /// Adds Redis snapshot store to the DI container using connection multiplexer. + /// + /// Redis connection multiplexer + /// Database number (default: 0) + /// Snapshot store options + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddRedisSnapshotStore( + IConnectionMultiplexer connectionMultiplexer, + int database, + RedisSnapshotStoreOptions? options + ) { + GetRedisDatabase getDatabase = () => connectionMultiplexer.GetDatabase(database); + return services.AddRedisSnapshotStore(getDatabase, options); + } + + /// + /// Adds Redis snapshot store to the DI container using connection multiplexer with default options. + /// + /// Redis connection multiplexer + /// Database number (default: 0) + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddRedisSnapshotStore( + IConnectionMultiplexer connectionMultiplexer, + int database = 0 + ) { + return services.AddRedisSnapshotStore(connectionMultiplexer, database, null); + } + + /// + /// Adds Redis snapshot store to the DI container using configuration. + /// + /// Function to get Redis database instance + /// Configuration section for Redis snapshot store options + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddRedisSnapshotStore( + GetRedisDatabase getDatabase, + IConfiguration config + ) { + var options = new RedisSnapshotStoreOptions(); + if (config["KeyPrefix"] != null) { + options.KeyPrefix = config["KeyPrefix"]; + } + services.AddSingleton(options); + + services.AddSingleton(sp => { + var snapshotOptions = sp.GetRequiredService(); + return new RedisSnapshotStore(getDatabase, snapshotOptions, sp.GetService()); + }); + services.AddSingleton(sp => sp.GetRequiredService()); + + return services; + } + + /// + /// Adds Redis snapshot store to the DI container using connection multiplexer and configuration. + /// + /// Redis connection multiplexer + /// Configuration section for Redis snapshot store options + /// Database number (default: 0) + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddRedisSnapshotStore( + IConnectionMultiplexer connectionMultiplexer, + IConfiguration config, + int database = 0 + ) { + var options = new RedisSnapshotStoreOptions(); + if (config["KeyPrefix"] != null) { + options.KeyPrefix = config["KeyPrefix"]; + } + services.AddSingleton(options); + + GetRedisDatabase getDatabase = () => connectionMultiplexer.GetDatabase(database); + services.AddSingleton(sp => { + var snapshotOptions = sp.GetRequiredService(); + return new RedisSnapshotStore(getDatabase, snapshotOptions, sp.GetService()); + }); + services.AddSingleton(sp => sp.GetRequiredService()); + + return services; + } + + /// + /// Adds Redis snapshot store to the DI container using connection string. + /// + /// Redis connection string + /// Database number (default: 0) + /// Snapshot store options + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddRedisSnapshotStore( + string connectionString, + int database, + RedisSnapshotStoreOptions? options + ) { + services.AddSingleton(_ => ConnectionMultiplexer.Connect(connectionString)); + + var snapshotOptions = options ?? new RedisSnapshotStoreOptions(); + services.AddSingleton(snapshotOptions); + + services.AddSingleton(sp => { + var snapshotOptions = sp.GetRequiredService(); + var muxer = sp.GetRequiredService(); + GetRedisDatabase getDatabase = () => muxer.GetDatabase(database); + return new RedisSnapshotStore(getDatabase, snapshotOptions, sp.GetService()); + }); + services.AddSingleton(sp => sp.GetRequiredService()); + + return services; + } + + /// + /// Adds Redis snapshot store to the DI container using connection string with default options. + /// + /// Redis connection string + /// Database number (default: 0) + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddRedisSnapshotStore( + string connectionString, + int database = 0 + ) { + return services.AddRedisSnapshotStore(connectionString, database, null); + } + + /// + /// Adds Redis snapshot store to the DI container using connection string and configuration. + /// + /// Redis connection string + /// Configuration section for Redis snapshot store options + /// Database number (default: 0) + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddRedisSnapshotStore( + string connectionString, + IConfiguration config, + int database = 0 + ) { + services.AddSingleton(_ => ConnectionMultiplexer.Connect(connectionString)); + + var options = new RedisSnapshotStoreOptions(); + if (config["KeyPrefix"] != null) { + options.KeyPrefix = config["KeyPrefix"]; + } + services.AddSingleton(options); + + services.AddSingleton(sp => { + var snapshotOptions = sp.GetRequiredService(); + var muxer = sp.GetRequiredService(); + GetRedisDatabase getDatabase = () => muxer.GetDatabase(database); + return new RedisSnapshotStore(getDatabase, snapshotOptions, sp.GetService()); + }); + services.AddSingleton(sp => sp.GetRequiredService()); + + return services; + } + } +} + diff --git a/src/Redis/src/Eventuous.Redis/RedisKeys.cs b/src/Redis/src/Eventuous.Redis/RedisKeys.cs index 9ffec171..291c394b 100644 --- a/src/Redis/src/Eventuous.Redis/RedisKeys.cs +++ b/src/Redis/src/Eventuous.Redis/RedisKeys.cs @@ -11,4 +11,8 @@ public static class EventuousRedisKeys { public const string Position = "position"; public const string MessageId = "message_id"; public const string MessageType = "message_type"; + + // Snapshot keys + public const string Revision = "revision"; + public const string EventType = "event_type"; } diff --git a/src/Redis/src/Eventuous.Redis/Snapshots/RedisSnapshotStore.cs b/src/Redis/src/Eventuous.Redis/Snapshots/RedisSnapshotStore.cs new file mode 100644 index 00000000..7cfc7b3d --- /dev/null +++ b/src/Redis/src/Eventuous.Redis/Snapshots/RedisSnapshotStore.cs @@ -0,0 +1,105 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Runtime.Serialization; +using System.Text; +using Eventuous; +using static Eventuous.DeserializationResult; +using static Eventuous.Redis.EventuousRedisKeys; + +namespace Eventuous.Redis.Snapshots; + +using Tools; + +/// +/// Redis snapshot store implementation for storing snapshots separately from event streams. +/// +public class RedisSnapshotStore : ISnapshotStore { + readonly GetRedisDatabase _getDatabase; + readonly RedisSnapshotStoreOptions _options; + readonly IEventSerializer _serializer; + const string ContentType = "application/json"; + + public RedisSnapshotStore( + GetRedisDatabase getDatabase, + RedisSnapshotStoreOptions? options = null, + IEventSerializer? serializer = null + ) { + _getDatabase = Ensure.NotNull(getDatabase, "Connection factory"); + _options = options ?? new RedisSnapshotStoreOptions(); + _serializer = serializer ?? DefaultEventSerializer.Instance; + } + + string GetSnapshotKey(StreamName streamName) => $"{_options.KeyPrefix}:{streamName}"; + + /// + [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")] + [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")] + public async Task Read(StreamName streamName, CancellationToken cancellationToken = default) { + var database = _getDatabase(); + var key = GetSnapshotKey(streamName); + + var revisionValue = await database.HashGetAsync(key, Revision).NoContext(); + var eventTypeValue = await database.HashGetAsync(key, EventType).NoContext(); + var jsonDataValue = await database.HashGetAsync(key, JsonData).NoContext(); + + if (revisionValue.IsNull || eventTypeValue.IsNull || jsonDataValue.IsNull) { + return null; + } + + var revision = revisionValue.ToString(); + var eventType = eventTypeValue.ToString(); + var jsonData = jsonDataValue.ToString(); + + if (string.IsNullOrEmpty(revision) || string.IsNullOrEmpty(eventType) || string.IsNullOrEmpty(jsonData)) { + return null; + } + + var deserialized = _serializer.DeserializeEvent( + Encoding.UTF8.GetBytes(jsonData), + eventType, + ContentType + ); + + return deserialized switch { + SuccessfullyDeserialized success => new Snapshot { + Revision = long.Parse(revision), + Payload = success.Payload + }, + FailedToDeserialize failed => throw new SerializationException($"Can't deserialize snapshot {eventType}: {failed.Error}"), + _ => throw new("Unknown deserialization result") + }; + } + + /// + [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")] + [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")] + public async Task Write(StreamName streamName, Snapshot snapshot, CancellationToken cancellationToken = default) { + if (snapshot.Payload == null) { + throw new ArgumentException("Snapshot payload cannot be null", nameof(snapshot)); + } + + var serialized = _serializer.SerializeEvent(snapshot.Payload); + var jsonData = Encoding.UTF8.GetString(serialized.Payload); + var key = GetSnapshotKey(streamName); + var database = _getDatabase(); + + var hashFields = new HashEntry[] { + new(Revision, snapshot.Revision.ToString()), + new(EventType, serialized.EventType), + new(JsonData, jsonData), + new(Created, DateTime.UtcNow.ToString("O")) + }; + + await database.HashSetAsync(key, hashFields).NoContext(); + } + + /// + public async Task Delete(StreamName streamName, CancellationToken cancellationToken = default) { + var database = _getDatabase(); + var key = GetSnapshotKey(streamName); + + await database.KeyDeleteAsync(key).NoContext(); + } +} + diff --git a/src/Redis/src/Eventuous.Redis/Snapshots/RedisSnapshotStoreOptions.cs b/src/Redis/src/Eventuous.Redis/Snapshots/RedisSnapshotStoreOptions.cs new file mode 100644 index 00000000..18bbd3df --- /dev/null +++ b/src/Redis/src/Eventuous.Redis/Snapshots/RedisSnapshotStoreOptions.cs @@ -0,0 +1,14 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +// ReSharper disable PropertyCanBeMadeInitOnly.Global + +namespace Eventuous.Redis.Snapshots; + +public class RedisSnapshotStoreOptions { + /// + /// Redis key prefix for snapshots. Default is "snapshot". + /// + public string KeyPrefix { get; set; } = "snapshot"; +} + diff --git a/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj b/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj index 8ba83d74..92bbc07c 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj +++ b/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj @@ -19,6 +19,7 @@ + diff --git a/src/SqlServer/src/Eventuous.SqlServer/Extensions/RegistrationExtensions.cs b/src/SqlServer/src/Eventuous.SqlServer/Extensions/RegistrationExtensions.cs index f65f0420..2b989be4 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Extensions/RegistrationExtensions.cs +++ b/src/SqlServer/src/Eventuous.SqlServer/Extensions/RegistrationExtensions.cs @@ -3,6 +3,7 @@ using Eventuous.SqlServer; using Eventuous.SqlServer.Projections; +using Eventuous.SqlServer.Snapshots; using Eventuous.SqlServer.Subscriptions; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -84,5 +85,55 @@ public IServiceCollection AddSqlServerCheckpointStore() return new(Ensure.NotNull(connectionString), schema, loggerFactory); } ); + + /// + /// Adds SQL Server snapshot store and the necessary schema to the DI container. + /// + /// Connection string + /// Schema name + /// Set to true if you want the schema to be created on startup + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddSqlServerSnapshotStore( + string connectionString, + string schema = SnapshotSchema.DefaultSchema, + bool initializeDatabase = false + ) { + var options = new SqlServerSnapshotStoreOptions { + Schema = schema, + ConnectionString = connectionString, + InitializeDatabase = initializeDatabase + }; + + services.AddSingleton(options); + services.AddSingleton(sp => { + var snapshotOptions = sp.GetRequiredService(); + return new SqlServerSnapshotStore(snapshotOptions.ConnectionString, snapshotOptions, sp.GetService()); + }); + services.AddSingleton(sp => sp.GetRequiredService()); + services.AddHostedService(); + + return services; + } + + /// + /// Adds SQL Server snapshot store and the necessary schema to the DI container using the configuration. + /// + /// Configuration section for SQL Server snapshot store options + /// Services collection + // ReSharper disable once UnusedMethodReturnValue.Global + public IServiceCollection AddSqlServerSnapshotStore(IConfiguration config) { + services.Configure(config); + services.AddSingleton(sp => sp.GetRequiredService>().Value); + + services.AddSingleton(sp => { + var options = sp.GetRequiredService(); + return new SqlServerSnapshotStore(options.ConnectionString, options, sp.GetService()); + }); + services.AddSingleton(sp => sp.GetRequiredService()); + services.AddHostedService(); + + return services; + } } } diff --git a/src/SqlServer/src/Eventuous.SqlServer/Snapshots/Scripts/1_SnapshotSchema.sql b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/Scripts/1_SnapshotSchema.sql new file mode 100644 index 00000000..d118344e --- /dev/null +++ b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/Scripts/1_SnapshotSchema.sql @@ -0,0 +1,22 @@ +IF (SCHEMA_ID(N'__schema__') IS NULL) + BEGIN + EXEC ('CREATE SCHEMA [__schema__] AUTHORIZATION [dbo]') + END + +IF OBJECT_ID('__schema__.snapshots', 'U') IS NULL + BEGIN + CREATE TABLE __schema__.snapshots + ( + stream_name NVARCHAR(1000) NOT NULL, + revision BIGINT NOT NULL, + event_type NVARCHAR(128) NOT NULL, + json_data NVARCHAR(MAX) NOT NULL, + created DATETIME2(7) NOT NULL DEFAULT GETUTCDATE(), + CONSTRAINT PK_Snapshots PRIMARY KEY CLUSTERED (stream_name), + CONSTRAINT CK_Snapshots_RevisionGteZero CHECK (revision >= 0), + CONSTRAINT CK_Snapshots_JsonDataIsJson CHECK (ISJSON(json_data) = 1) + ); + + CREATE INDEX IDX_SnapshotsStreamName ON __schema__.snapshots (stream_name); + END + diff --git a/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchema.cs b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchema.cs new file mode 100644 index 00000000..4b901a36 --- /dev/null +++ b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchema.cs @@ -0,0 +1,62 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Reflection; +using Eventuous.SqlServer; +using Microsoft.Extensions.Logging; + +namespace Eventuous.SqlServer.Snapshots; + +/// +/// Instantiate a new SnapshotSchema object with the specified schema name. The default schema name is "eventuous" +/// +/// +public class SnapshotSchema(string schema = SnapshotSchema.DefaultSchema) { + public const string DefaultSchema = "eventuous"; + + public string Name => schema; + + public string ReadSnapshot => $"SELECT revision, event_type, json_data FROM {schema}.snapshots WHERE stream_name = @stream_name"; + public string WriteSnapshot => $"MERGE {schema}.snapshots AS target USING (VALUES (@stream_name, @revision, @event_type, @json_data)) AS source (stream_name, revision, event_type, json_data) ON target.stream_name = source.stream_name WHEN MATCHED THEN UPDATE SET revision = source.revision, event_type = source.event_type, json_data = source.json_data, created = GETUTCDATE() WHEN NOT MATCHED THEN INSERT (stream_name, revision, event_type, json_data, created) VALUES (source.stream_name, source.revision, source.event_type, source.json_data, GETUTCDATE());"; + public string DeleteSnapshot => $"DELETE FROM {schema}.snapshots WHERE stream_name = @stream_name"; + + static readonly Assembly Assembly = typeof(SnapshotSchema).Assembly; + + public async Task CreateSchema(string connectionString, ILogger? log, CancellationToken cancellationToken = default) { + log?.LogInformation("Creating snapshot schema {Schema}", schema); + const string scriptName = "Eventuous.SqlServer.Snapshots.Scripts.1_SnapshotSchema.sql"; + + await using var connection = await ConnectionFactory.GetConnection(connectionString, cancellationToken).NoContext(); + await using var transaction = (SqlTransaction)await connection.BeginTransactionAsync(cancellationToken).NoContext(); + + try { + log?.LogInformation("Executing {Script}", scriptName); + await using var stream = Assembly.GetManifestResourceStream(scriptName); + if (stream == null) { + throw new InvalidOperationException($"Embedded resource {scriptName} not found"); + } + + using var reader = new StreamReader(stream); + +#if NET7_0_OR_GREATER + var script = await reader.ReadToEndAsync(cancellationToken).NoContext(); +#else + var script = await reader.ReadToEndAsync().NoContext(); +#endif + var cmdScript = script.Replace("__schema__", schema); + + await using var cmd = new SqlCommand(cmdScript, connection, transaction); + + await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + } catch (Exception e) { + log?.LogCritical(e, "Unable to initialize the snapshot database schema"); + await transaction.RollbackAsync(cancellationToken); + + throw; + } + + await transaction.CommitAsync(cancellationToken).NoContext(); + log?.LogInformation("Snapshot database schema initialized"); + } +} + diff --git a/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchemaInitializer.cs b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchemaInitializer.cs new file mode 100644 index 00000000..33e1bfb7 --- /dev/null +++ b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchemaInitializer.cs @@ -0,0 +1,38 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Eventuous.SqlServer.Snapshots; + +public class SnapshotSchemaInitializer(SqlServerSnapshotStoreOptions options, ILoggerFactory? loggerFactory = null) : IHostedService { + readonly ILogger? _log = loggerFactory?.CreateLogger(); + + public async Task StartAsync(CancellationToken cancellationToken) { + if (!options.InitializeDatabase) return; + + var schema = new SnapshotSchema(options.Schema); + var connectionString = Ensure.NotEmptyString(options.ConnectionString); + + Exception? ex = null; + + for (var i = 0; i < 10; i++) { + try { + await schema.CreateSchema(connectionString, _log, cancellationToken); + + return; + } catch (SqlException e) { + _log?.LogError("Unable to initialize the snapshot database schema: {Message}", e.Message); + ex = e; + } + + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + } + + throw ex!; + } + + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; +} + diff --git a/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStore.cs b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStore.cs new file mode 100644 index 00000000..e9142ba5 --- /dev/null +++ b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStore.cs @@ -0,0 +1,97 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Runtime.Serialization; +using System.Text; +using Eventuous; +using Eventuous.SqlServer; +using Eventuous.SqlServer.Extensions; +using static Eventuous.DeserializationResult; + +namespace Eventuous.SqlServer.Snapshots; + +/// +/// SQL Server snapshot store implementation for storing snapshots separately from event streams. +/// +public class SqlServerSnapshotStore : ISnapshotStore { + readonly string _connectionString; + readonly SnapshotSchema _schema; + readonly IEventSerializer _serializer; + const string ContentType = "application/json"; + + public SqlServerSnapshotStore( + string connectionString, + SqlServerSnapshotStoreOptions? options, + IEventSerializer? serializer = null + ) { + var sqlOptions = options ?? new SqlServerSnapshotStoreOptions(); + _schema = new SnapshotSchema(sqlOptions.Schema); + _connectionString = Ensure.NotEmptyString(connectionString); + _serializer = serializer ?? DefaultEventSerializer.Instance; + } + + /// + [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")] + [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")] + public async Task Read(StreamName streamName, CancellationToken cancellationToken = default) { + await using var connection = await ConnectionFactory.GetConnection(_connectionString, cancellationToken).NoContext(); + await using var cmd = connection.GetTextCommand(_schema.ReadSnapshot) + .Add("@stream_name", SqlDbType.NVarChar, streamName.ToString()); + + await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).NoContext(); + + if (!await reader.ReadAsync(cancellationToken).NoContext()) { + return null; + } + + var revision = reader.GetInt64(0); + var eventType = reader.GetString(1); + var jsonData = reader.GetString(2); + + var deserialized = _serializer.DeserializeEvent( + Encoding.UTF8.GetBytes(jsonData), + eventType, + ContentType + ); + + return deserialized switch { + SuccessfullyDeserialized success => new Snapshot { + Revision = revision, + Payload = success.Payload + }, + FailedToDeserialize failed => throw new SerializationException($"Can't deserialize snapshot {eventType}: {failed.Error}"), + _ => throw new("Unknown deserialization result") + }; + } + + /// + [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")] + [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")] + public async Task Write(StreamName streamName, Snapshot snapshot, CancellationToken cancellationToken = default) { + if (snapshot.Payload == null) { + throw new ArgumentException("Snapshot payload cannot be null", nameof(snapshot)); + } + + var serialized = _serializer.SerializeEvent(snapshot.Payload); + var jsonData = Encoding.UTF8.GetString(serialized.Payload); + + await using var connection = await ConnectionFactory.GetConnection(_connectionString, cancellationToken).NoContext(); + await using var cmd = connection.GetTextCommand(_schema.WriteSnapshot) + .Add("@stream_name", SqlDbType.NVarChar, streamName.ToString()) + .Add("@revision", SqlDbType.BigInt, snapshot.Revision) + .Add("@event_type", SqlDbType.NVarChar, serialized.EventType) + .Add("@json_data", SqlDbType.NVarChar, jsonData); + + await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + } + + /// + public async Task Delete(StreamName streamName, CancellationToken cancellationToken = default) { + await using var connection = await ConnectionFactory.GetConnection(_connectionString, cancellationToken).NoContext(); + await using var cmd = connection.GetTextCommand(_schema.DeleteSnapshot) + .Add("@stream_name", SqlDbType.NVarChar, streamName.ToString()); + + await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + } +} + diff --git a/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStoreOptions.cs b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStoreOptions.cs new file mode 100644 index 00000000..3cfb886e --- /dev/null +++ b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStoreOptions.cs @@ -0,0 +1,26 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +// ReSharper disable PropertyCanBeMadeInitOnly.Global + +namespace Eventuous.SqlServer.Snapshots; + +public class SqlServerSnapshotStoreOptions(string schema) { + public SqlServerSnapshotStoreOptions() : this(SnapshotSchema.DefaultSchema) { } + + /// + /// Override the default schema name. + /// + public string Schema { get; set; } = schema; + + /// + /// SQL Server connection string. + /// + public string ConnectionString { get; set; } = null!; + + /// + /// Set to true to initialize the database schema on startup. Default is false. + /// + public bool InitializeDatabase { get; set; } +} +