diff --git a/Directory.Packages.props b/Directory.Packages.props
index 26b5371b..7a38340c 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -22,7 +22,13 @@
0.77.3
+
+
+
+
+
+
diff --git a/Eventuous.slnx b/Eventuous.slnx
index 9866ba98..a4215337 100644
--- a/Eventuous.slnx
+++ b/Eventuous.slnx
@@ -114,6 +114,7 @@
+
@@ -140,6 +141,11 @@
+
+
+
+
+
diff --git a/samples/banking/Banking.Api/Banking.Api.csproj b/samples/banking/Banking.Api/Banking.Api.csproj
new file mode 100644
index 00000000..53090d69
--- /dev/null
+++ b/samples/banking/Banking.Api/Banking.Api.csproj
@@ -0,0 +1,25 @@
+
+
+
+ net10.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/samples/banking/Banking.Api/Program.cs b/samples/banking/Banking.Api/Program.cs
new file mode 100644
index 00000000..bf985e32
--- /dev/null
+++ b/samples/banking/Banking.Api/Program.cs
@@ -0,0 +1,65 @@
+using Banking.Api.Services;
+using Banking.Domain.Accounts;
+using Eventuous.KurrentDB;
+using Microsoft.AspNetCore.Mvc;
+
+var builder = WebApplication.CreateBuilder(args);
+
+//----------------------------------------------------------------
+// Snapshot store registration
+
+var postgresSnapshotDbConnectionString = builder.Configuration.GetConnectionString("postgresSnapshotsDb");
+if (postgresSnapshotDbConnectionString == null) {
+ throw new InvalidOperationException("postgres snapshots db conenction string should be not null");
+}
+
+//builder.Services.AddPostgresSnapshotStore(postgresSnapshotDbConnectionString, initializeDatabase: true);
+
+var sqlServerSnapshotsDbConnectionString = builder.Configuration.GetConnectionString("sqlServerSnapshotsDb");
+if (sqlServerSnapshotsDbConnectionString == null) {
+ throw new InvalidOperationException("sqlServer snapshots db connection string should be not null");
+}
+
+//builder.Services.AddSqlServerSnapshotStore(sqlServerSnapshotsDbConnectionString, initializeDatabase: true);
+
+var mongoDbSnapshotsDbConnectionString = builder.Configuration.GetConnectionString("mongoDbSnapshotsDb");
+if (mongoDbSnapshotsDbConnectionString == null) {
+ throw new InvalidOperationException("mongodb snapshots db connection string should be not null");
+}
+
+//builder.Services.AddMongoSnapshotStore(mongoDbSnapshotsDbConnectionString, initializeIndexes: true);
+
+var redisConnectionString = builder.Configuration.GetConnectionString("redis");
+if (redisConnectionString == null) {
+ throw new InvalidOperationException("redis connection string should be not null");
+}
+
+builder.Services.AddRedisSnapshotStore(redisConnectionString, database: 0);
+
+//----------------------------------------------------------------
+// Event store registration
+
+builder.AddKurrentDBClient("kurrentdb");
+builder.Services.AddEventStore();
+
+//----------------------------------------------------------------
+
+builder.Services.AddCommandService();
+
+var app = builder.Build();
+
+app.MapGet("/accounts/{id}/deposit/{amount}", async ([FromRoute] string id, [FromRoute] decimal amount, [FromServices] AccountService accountService) => {
+ var cmd = new AccountService.Deposit(id, amount);
+ var res = await accountService.Handle(cmd, default);
+
+ return res.Match
diff --git a/src/SqlServer/src/Eventuous.SqlServer/Extensions/RegistrationExtensions.cs b/src/SqlServer/src/Eventuous.SqlServer/Extensions/RegistrationExtensions.cs
index f65f0420..2b989be4 100644
--- a/src/SqlServer/src/Eventuous.SqlServer/Extensions/RegistrationExtensions.cs
+++ b/src/SqlServer/src/Eventuous.SqlServer/Extensions/RegistrationExtensions.cs
@@ -3,6 +3,7 @@
using Eventuous.SqlServer;
using Eventuous.SqlServer.Projections;
+using Eventuous.SqlServer.Snapshots;
using Eventuous.SqlServer.Subscriptions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection.Extensions;
@@ -84,5 +85,55 @@ public IServiceCollection AddSqlServerCheckpointStore()
return new(Ensure.NotNull(connectionString), schema, loggerFactory);
}
);
+
+ ///
+ /// Adds SQL Server snapshot store and the necessary schema to the DI container.
+ ///
+ /// Connection string
+ /// Schema name
+ /// Set to true if you want the schema to be created on startup
+ /// Services collection
+ // ReSharper disable once UnusedMethodReturnValue.Global
+ public IServiceCollection AddSqlServerSnapshotStore(
+ string connectionString,
+ string schema = SnapshotSchema.DefaultSchema,
+ bool initializeDatabase = false
+ ) {
+ var options = new SqlServerSnapshotStoreOptions {
+ Schema = schema,
+ ConnectionString = connectionString,
+ InitializeDatabase = initializeDatabase
+ };
+
+ services.AddSingleton(options);
+ services.AddSingleton(sp => {
+ var snapshotOptions = sp.GetRequiredService();
+ return new SqlServerSnapshotStore(snapshotOptions.ConnectionString, snapshotOptions, sp.GetService());
+ });
+ services.AddSingleton(sp => sp.GetRequiredService());
+ services.AddHostedService();
+
+ return services;
+ }
+
+ ///
+ /// Adds SQL Server snapshot store and the necessary schema to the DI container using the configuration.
+ ///
+ /// Configuration section for SQL Server snapshot store options
+ /// Services collection
+ // ReSharper disable once UnusedMethodReturnValue.Global
+ public IServiceCollection AddSqlServerSnapshotStore(IConfiguration config) {
+ services.Configure(config);
+ services.AddSingleton(sp => sp.GetRequiredService>().Value);
+
+ services.AddSingleton(sp => {
+ var options = sp.GetRequiredService();
+ return new SqlServerSnapshotStore(options.ConnectionString, options, sp.GetService());
+ });
+ services.AddSingleton(sp => sp.GetRequiredService());
+ services.AddHostedService();
+
+ return services;
+ }
}
}
diff --git a/src/SqlServer/src/Eventuous.SqlServer/Snapshots/Scripts/1_SnapshotSchema.sql b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/Scripts/1_SnapshotSchema.sql
new file mode 100644
index 00000000..d118344e
--- /dev/null
+++ b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/Scripts/1_SnapshotSchema.sql
@@ -0,0 +1,22 @@
+IF (SCHEMA_ID(N'__schema__') IS NULL)
+ BEGIN
+ EXEC ('CREATE SCHEMA [__schema__] AUTHORIZATION [dbo]')
+ END
+
+IF OBJECT_ID('__schema__.snapshots', 'U') IS NULL
+ BEGIN
+ CREATE TABLE __schema__.snapshots
+ (
+ stream_name NVARCHAR(1000) NOT NULL,
+ revision BIGINT NOT NULL,
+ event_type NVARCHAR(128) NOT NULL,
+ json_data NVARCHAR(MAX) NOT NULL,
+ created DATETIME2(7) NOT NULL DEFAULT GETUTCDATE(),
+ CONSTRAINT PK_Snapshots PRIMARY KEY CLUSTERED (stream_name),
+ CONSTRAINT CK_Snapshots_RevisionGteZero CHECK (revision >= 0),
+ CONSTRAINT CK_Snapshots_JsonDataIsJson CHECK (ISJSON(json_data) = 1)
+ );
+
+ CREATE INDEX IDX_SnapshotsStreamName ON __schema__.snapshots (stream_name);
+ END
+
diff --git a/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchema.cs b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchema.cs
new file mode 100644
index 00000000..4b901a36
--- /dev/null
+++ b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchema.cs
@@ -0,0 +1,62 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using System.Reflection;
+using Eventuous.SqlServer;
+using Microsoft.Extensions.Logging;
+
+namespace Eventuous.SqlServer.Snapshots;
+
+///
+/// Instantiate a new SnapshotSchema object with the specified schema name. The default schema name is "eventuous"
+///
+///
+public class SnapshotSchema(string schema = SnapshotSchema.DefaultSchema) {
+ public const string DefaultSchema = "eventuous";
+
+ public string Name => schema;
+
+ public string ReadSnapshot => $"SELECT revision, event_type, json_data FROM {schema}.snapshots WHERE stream_name = @stream_name";
+ public string WriteSnapshot => $"MERGE {schema}.snapshots AS target USING (VALUES (@stream_name, @revision, @event_type, @json_data)) AS source (stream_name, revision, event_type, json_data) ON target.stream_name = source.stream_name WHEN MATCHED THEN UPDATE SET revision = source.revision, event_type = source.event_type, json_data = source.json_data, created = GETUTCDATE() WHEN NOT MATCHED THEN INSERT (stream_name, revision, event_type, json_data, created) VALUES (source.stream_name, source.revision, source.event_type, source.json_data, GETUTCDATE());";
+ public string DeleteSnapshot => $"DELETE FROM {schema}.snapshots WHERE stream_name = @stream_name";
+
+ static readonly Assembly Assembly = typeof(SnapshotSchema).Assembly;
+
+ public async Task CreateSchema(string connectionString, ILogger? log, CancellationToken cancellationToken = default) {
+ log?.LogInformation("Creating snapshot schema {Schema}", schema);
+ const string scriptName = "Eventuous.SqlServer.Snapshots.Scripts.1_SnapshotSchema.sql";
+
+ await using var connection = await ConnectionFactory.GetConnection(connectionString, cancellationToken).NoContext();
+ await using var transaction = (SqlTransaction)await connection.BeginTransactionAsync(cancellationToken).NoContext();
+
+ try {
+ log?.LogInformation("Executing {Script}", scriptName);
+ await using var stream = Assembly.GetManifestResourceStream(scriptName);
+ if (stream == null) {
+ throw new InvalidOperationException($"Embedded resource {scriptName} not found");
+ }
+
+ using var reader = new StreamReader(stream);
+
+#if NET7_0_OR_GREATER
+ var script = await reader.ReadToEndAsync(cancellationToken).NoContext();
+#else
+ var script = await reader.ReadToEndAsync().NoContext();
+#endif
+ var cmdScript = script.Replace("__schema__", schema);
+
+ await using var cmd = new SqlCommand(cmdScript, connection, transaction);
+
+ await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext();
+ } catch (Exception e) {
+ log?.LogCritical(e, "Unable to initialize the snapshot database schema");
+ await transaction.RollbackAsync(cancellationToken);
+
+ throw;
+ }
+
+ await transaction.CommitAsync(cancellationToken).NoContext();
+ log?.LogInformation("Snapshot database schema initialized");
+ }
+}
+
diff --git a/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchemaInitializer.cs b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchemaInitializer.cs
new file mode 100644
index 00000000..33e1bfb7
--- /dev/null
+++ b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SnapshotSchemaInitializer.cs
@@ -0,0 +1,38 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace Eventuous.SqlServer.Snapshots;
+
+public class SnapshotSchemaInitializer(SqlServerSnapshotStoreOptions options, ILoggerFactory? loggerFactory = null) : IHostedService {
+ readonly ILogger? _log = loggerFactory?.CreateLogger();
+
+ public async Task StartAsync(CancellationToken cancellationToken) {
+ if (!options.InitializeDatabase) return;
+
+ var schema = new SnapshotSchema(options.Schema);
+ var connectionString = Ensure.NotEmptyString(options.ConnectionString);
+
+ Exception? ex = null;
+
+ for (var i = 0; i < 10; i++) {
+ try {
+ await schema.CreateSchema(connectionString, _log, cancellationToken);
+
+ return;
+ } catch (SqlException e) {
+ _log?.LogError("Unable to initialize the snapshot database schema: {Message}", e.Message);
+ ex = e;
+ }
+
+ await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
+ }
+
+ throw ex!;
+ }
+
+ public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
+}
+
diff --git a/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStore.cs b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStore.cs
new file mode 100644
index 00000000..e9142ba5
--- /dev/null
+++ b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStore.cs
@@ -0,0 +1,97 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using System.Runtime.Serialization;
+using System.Text;
+using Eventuous;
+using Eventuous.SqlServer;
+using Eventuous.SqlServer.Extensions;
+using static Eventuous.DeserializationResult;
+
+namespace Eventuous.SqlServer.Snapshots;
+
+///
+/// SQL Server snapshot store implementation for storing snapshots separately from event streams.
+///
+public class SqlServerSnapshotStore : ISnapshotStore {
+ readonly string _connectionString;
+ readonly SnapshotSchema _schema;
+ readonly IEventSerializer _serializer;
+ const string ContentType = "application/json";
+
+ public SqlServerSnapshotStore(
+ string connectionString,
+ SqlServerSnapshotStoreOptions? options,
+ IEventSerializer? serializer = null
+ ) {
+ var sqlOptions = options ?? new SqlServerSnapshotStoreOptions();
+ _schema = new SnapshotSchema(sqlOptions.Schema);
+ _connectionString = Ensure.NotEmptyString(connectionString);
+ _serializer = serializer ?? DefaultEventSerializer.Instance;
+ }
+
+ ///
+ [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")]
+ [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")]
+ public async Task Read(StreamName streamName, CancellationToken cancellationToken = default) {
+ await using var connection = await ConnectionFactory.GetConnection(_connectionString, cancellationToken).NoContext();
+ await using var cmd = connection.GetTextCommand(_schema.ReadSnapshot)
+ .Add("@stream_name", SqlDbType.NVarChar, streamName.ToString());
+
+ await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).NoContext();
+
+ if (!await reader.ReadAsync(cancellationToken).NoContext()) {
+ return null;
+ }
+
+ var revision = reader.GetInt64(0);
+ var eventType = reader.GetString(1);
+ var jsonData = reader.GetString(2);
+
+ var deserialized = _serializer.DeserializeEvent(
+ Encoding.UTF8.GetBytes(jsonData),
+ eventType,
+ ContentType
+ );
+
+ return deserialized switch {
+ SuccessfullyDeserialized success => new Snapshot {
+ Revision = revision,
+ Payload = success.Payload
+ },
+ FailedToDeserialize failed => throw new SerializationException($"Can't deserialize snapshot {eventType}: {failed.Error}"),
+ _ => throw new("Unknown deserialization result")
+ };
+ }
+
+ ///
+ [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")]
+ [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")]
+ public async Task Write(StreamName streamName, Snapshot snapshot, CancellationToken cancellationToken = default) {
+ if (snapshot.Payload == null) {
+ throw new ArgumentException("Snapshot payload cannot be null", nameof(snapshot));
+ }
+
+ var serialized = _serializer.SerializeEvent(snapshot.Payload);
+ var jsonData = Encoding.UTF8.GetString(serialized.Payload);
+
+ await using var connection = await ConnectionFactory.GetConnection(_connectionString, cancellationToken).NoContext();
+ await using var cmd = connection.GetTextCommand(_schema.WriteSnapshot)
+ .Add("@stream_name", SqlDbType.NVarChar, streamName.ToString())
+ .Add("@revision", SqlDbType.BigInt, snapshot.Revision)
+ .Add("@event_type", SqlDbType.NVarChar, serialized.EventType)
+ .Add("@json_data", SqlDbType.NVarChar, jsonData);
+
+ await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext();
+ }
+
+ ///
+ public async Task Delete(StreamName streamName, CancellationToken cancellationToken = default) {
+ await using var connection = await ConnectionFactory.GetConnection(_connectionString, cancellationToken).NoContext();
+ await using var cmd = connection.GetTextCommand(_schema.DeleteSnapshot)
+ .Add("@stream_name", SqlDbType.NVarChar, streamName.ToString());
+
+ await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext();
+ }
+}
+
diff --git a/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStoreOptions.cs b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStoreOptions.cs
new file mode 100644
index 00000000..3cfb886e
--- /dev/null
+++ b/src/SqlServer/src/Eventuous.SqlServer/Snapshots/SqlServerSnapshotStoreOptions.cs
@@ -0,0 +1,26 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+// ReSharper disable PropertyCanBeMadeInitOnly.Global
+
+namespace Eventuous.SqlServer.Snapshots;
+
+public class SqlServerSnapshotStoreOptions(string schema) {
+ public SqlServerSnapshotStoreOptions() : this(SnapshotSchema.DefaultSchema) { }
+
+ ///
+ /// Override the default schema name.
+ ///
+ public string Schema { get; set; } = schema;
+
+ ///
+ /// SQL Server connection string.
+ ///
+ public string ConnectionString { get; set; } = null!;
+
+ ///
+ /// Set to true to initialize the database schema on startup. Default is false.
+ ///
+ public bool InitializeDatabase { get; set; }
+}
+