From 6b30f8b6f06a3f3c09d076097cd8311225e585b0 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 9 Oct 2023 17:46:13 +0200 Subject: [PATCH 1/2] Add load test for SQL Server subscription with 55K events produced and consumed --- .../Filters/ConsumerFilter.cs | 3 - .../Eventuous.SqlServer.csproj | 6 ++ .../src/Eventuous.SqlServer/Schema.cs | 18 ++--- .../Scripts/ReadAllBackwards.sql | 15 ++++ .../Scripts/ReadStreamBackwards.sql | 23 ++++++ .../src/Eventuous.SqlServer/SqlServerStore.cs | 25 +++++-- .../Fixtures/IntegrationFixture.cs | 11 +-- .../Fixtures/SubscriptionFixture.cs | 11 +-- .../Subscriptions/LoadTest.cs | 72 +++++++++++++++++++ .../Subscriptions/SubscribeToStream.cs | 7 +- test/Eventuous.Sut.Domain/BookingEvents.cs | 21 ++---- test/Eventuous.Sut.Subs/TestEventHandler.cs | 3 +- 12 files changed, 165 insertions(+), 50 deletions(-) create mode 100644 src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadAllBackwards.sql create mode 100644 src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadStreamBackwards.sql create mode 100644 src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/LoadTest.cs diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/ConsumerFilter.cs b/src/Core/src/Eventuous.Subscriptions/Filters/ConsumerFilter.cs index 22ad566c..01dbf483 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/ConsumerFilter.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/ConsumerFilter.cs @@ -1,15 +1,12 @@ // Copyright (C) Ubiquitous AS. All rights reserved // Licensed under the Apache License, Version 2.0. -using System.Runtime.CompilerServices; - namespace Eventuous.Subscriptions.Filters; using Consumers; using Context; public class ConsumerFilter(IMessageConsumer consumer) : ConsumeFilter { - [MethodImpl(MethodImplOptions.AggressiveInlining)] protected override ValueTask Send(IMessageConsumeContext context, LinkedListNode? next) => consumer.Consume(context); } diff --git a/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj b/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj index 5c7deb19..a4dd9384 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj +++ b/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj @@ -12,7 +12,9 @@ + + @@ -26,4 +28,8 @@ + + + + diff --git a/src/SqlServer/src/Eventuous.SqlServer/Schema.cs b/src/SqlServer/src/Eventuous.SqlServer/Schema.cs index 28984de2..a8c5ee8d 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Schema.cs +++ b/src/SqlServer/src/Eventuous.SqlServer/Schema.cs @@ -8,14 +8,16 @@ namespace Eventuous.SqlServer; public class Schema(string schema = Schema.DefaultSchema) { public const string DefaultSchema = "eventuous"; - public string AppendEvents => $"{schema}.append_events"; - public string ReadStreamForwards => $"{schema}.read_stream_forwards"; - public string ReadStreamSub => $"{schema}.read_stream_sub"; - public string ReadAllForwards => $"{schema}.read_all_forwards"; - public string CheckStream => $"{schema}.check_stream"; - public string StreamExists => $"SELECT CAST(IIF(EXISTS(SELECT 1 FROM {schema}.Streams WHERE StreamName = (@name)), 1, 0) AS BIT)"; - public string GetCheckpointSql => $"SELECT Position FROM {schema}.Checkpoints where Id=(@checkpointId)"; - public string AddCheckpointSql => $"INSERT INTO {schema}.Checkpoints (Id) VALUES ((@checkpointId))"; + public string AppendEvents => $"{schema}.append_events"; + public string ReadStreamForwards => $"{schema}.read_stream_forwards"; + public string ReadStreamBackwards => $"{schema}.read_stream_backwards"; + public string ReadStreamSub => $"{schema}.read_stream_sub"; + public string ReadAllForwards => $"{schema}.read_all_forwards"; + public string ReadAllBackwards => $"{schema}.read_all_backwards"; + public string CheckStream => $"{schema}.check_stream"; + public string StreamExists => $"SELECT CAST(IIF(EXISTS(SELECT 1 FROM {schema}.Streams WHERE StreamName = (@name)), 1, 0) AS BIT)"; + public string GetCheckpointSql => $"SELECT Position FROM {schema}.Checkpoints where Id=(@checkpointId)"; + public string AddCheckpointSql => $"INSERT INTO {schema}.Checkpoints (Id) VALUES ((@checkpointId))"; public string UpdateCheckpointSql => $"UPDATE {schema}.Checkpoints set Position=(@position) where Id=(@checkpointId)"; diff --git a/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadAllBackwards.sql b/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadAllBackwards.sql new file mode 100644 index 00000000..f1c0d657 --- /dev/null +++ b/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadAllBackwards.sql @@ -0,0 +1,15 @@ +CREATE OR ALTER PROCEDURE __schema__.read_all_backwards + @from_position bigint, + @count int + AS +BEGIN + +SELECT TOP (@count) + MessageId, MessageType, StreamPosition, GlobalPosition, + JsonData, JsonMetadata, Created, StreamName +FROM __schema__.Messages +INNER JOIN __schema__.Streams ON Messages.StreamId = Streams.StreamId +WHERE Messages.GlobalPosition <= @from_position +ORDER BY Messages.GlobalPosition DESC + +END \ No newline at end of file diff --git a/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadStreamBackwards.sql b/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadStreamBackwards.sql new file mode 100644 index 00000000..ea9039e7 --- /dev/null +++ b/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadStreamBackwards.sql @@ -0,0 +1,23 @@ +CREATE OR ALTER PROCEDURE __schema__.read_stream_backwards + @stream_name NVARCHAR(850), + @count INT +AS +BEGIN + +DECLARE @current_version int, @stream_id int + +SELECT @current_version = Version, @stream_id = StreamId +FROM __schema__.Streams +WHERE StreamName = @stream_name + +IF @stream_id IS NULL + THROW 50001, 'StreamNotFound', 1; + +SELECT TOP (@count) + MessageId, MessageType, StreamPosition, GlobalPosition, + JsonData, JsonMetadata, Created +FROM __schema__.Messages +WHERE StreamId = @stream_id AND StreamPosition <= @current_version +ORDER BY Messages.GlobalPosition DESC + +END \ No newline at end of file diff --git a/src/SqlServer/src/Eventuous.SqlServer/SqlServerStore.cs b/src/SqlServer/src/Eventuous.SqlServer/SqlServerStore.cs index 9a1324a3..5582a14e 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/SqlServerStore.cs +++ b/src/SqlServer/src/Eventuous.SqlServer/SqlServerStore.cs @@ -59,8 +59,23 @@ CancellationToken cancellationToken } } - public Task ReadEventsBackwards(StreamName stream, int count, CancellationToken cancellationToken) - => throw new NotImplementedException(); + public async Task ReadEventsBackwards(StreamName stream, int count, CancellationToken cancellationToken) { + await using var connection = await OpenConnection(cancellationToken).NoContext(); + + await using var cmd = connection.GetStoredProcCommand(_schema.ReadStreamBackwards) + .Add("@stream_name", SqlDbType.NVarChar, stream.ToString()) + .Add("@count", SqlDbType.Int, count); + + try { + await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).NoContext(); + + var result = reader.ReadEvents(cancellationToken); + + return await result.Select(x => ToStreamEvent(x)).ToArrayAsync(cancellationToken).NoContext(); + } catch (SqlException e) when (e.Message.StartsWith("StreamNotFound")) { + throw new StreamNotFound(stream); + } + } public async Task AppendEvents( StreamName stream, @@ -130,11 +145,7 @@ CancellationToken cancellationToken ) => throw new NotImplementedException(); - public Task DeleteStream( - StreamName stream, - ExpectedStreamVersion expectedVersion, - CancellationToken cancellationToken - ) + public Task DeleteStream(StreamName stream, ExpectedStreamVersion expectedVersion, CancellationToken cancellationToken) => throw new NotImplementedException(); StreamEvent ToStreamEvent(PersistedEvent evt) { diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs index 304c380e..5d950955 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs @@ -16,6 +16,7 @@ public sealed class IntegrationFixture : IAsyncLifetime { public IFixture Auto { get; } = new Fixture().Customize(new NodaTimeCustomization()); public GetSqlServerConnection GetConnection { get; private set; } = null!; public Faker Faker { get; } = new(); + public Schema Schema { get; set; } public string SchemaName { get; } @@ -35,15 +36,17 @@ public sealed class IntegrationFixture : IAsyncLifetime { public async Task InitializeAsync() { _sqlServer = new SqlEdgeBuilder() .WithImage("mcr.microsoft.com/azure-sql-edge:latest") + .WithAutoRemove(false) + .WithCleanUp(false) .Build(); await _sqlServer.StartAsync(); - var schema = new Schema(SchemaName); + Schema = new Schema(SchemaName); var connString = _sqlServer.GetConnectionString(); GetConnection = () => GetConn(connString); - await schema.CreateSchema(GetConnection); + await Schema.CreateSchema(GetConnection); DefaultEventSerializer.SetDefaultSerializer(Serializer); - EventStore = new SqlServerStore(GetConnection, new SqlServerStoreOptions(SchemaName), Serializer); + EventStore = new SqlServerStore(GetConnection, new SqlServerStoreOptions(SchemaName), Serializer); ActivitySource.AddActivityListener(_listener); return; @@ -52,7 +55,7 @@ public async Task InitializeAsync() { } public async Task DisposeAsync() { - await _sqlServer.DisposeAsync(); + // await _sqlServer.DisposeAsync(); _listener.Dispose(); } } diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs index de2d2834..8b785d57 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs @@ -20,7 +20,7 @@ public abstract class SubscriptionFixture : IClassFixture protected SubscriptionFixture( IntegrationFixture fixture, - ITestOutputHelper outputHelper, + ITestOutputHelper output, T handler, bool subscribeToAll, bool autoStart = true, @@ -30,8 +30,8 @@ protected SubscriptionFixture( _fixture = fixture; _subscribeToAll = subscribeToAll; Stream = new StreamName(fixture.Auto.Create()); - SchemaName = fixture.GetSchemaName(); - _loggerFactory = TestHelpers.Logging.GetLoggerFactory(outputHelper, logLevel); + SchemaName = fixture.SchemaName; + _loggerFactory = TestHelpers.Logging.GetLoggerFactory(output, logLevel); _listener = new LoggingEventListener(_loggerFactory); SubscriptionId = $"test-{Guid.NewGuid():N}"; Handler = handler; @@ -39,6 +39,7 @@ protected SubscriptionFixture( } protected string SubscriptionId { get; } + protected Schema Schema { get; set; } protected ValueTask Start() => Subscription.SubscribeWithLog(Log); @@ -51,8 +52,8 @@ protected SubscriptionFixture( readonly ILoggerFactory _loggerFactory; public virtual async Task InitializeAsync() { - var schema = new Schema(SchemaName); - await schema.CreateSchema(_fixture.GetConnection); + this.Schema = new Schema(SchemaName); + await Schema.CreateSchema(_fixture.GetConnection); CheckpointStoreOptions = new SqlServerCheckpointStoreOptions { Schema = SchemaName }; CheckpointStore = new SqlServerCheckpointStore(_fixture.GetConnection, CheckpointStoreOptions); diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/LoadTest.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/LoadTest.cs new file mode 100644 index 00000000..42df2be3 --- /dev/null +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/LoadTest.cs @@ -0,0 +1,72 @@ +using System.Data; +using Eventuous.SqlServer; +using Eventuous.SqlServer.Extensions; +using Eventuous.Sut.App; +using Eventuous.Sut.Domain; +using Eventuous.Sut.Subs; +using Eventuous.Tests.SqlServer.Fixtures; +using Hypothesist; + +namespace Eventuous.Tests.SqlServer.Subscriptions; + +public class LoadTest : SubscriptionFixture { + readonly IntegrationFixture _fixture; + readonly BookingService _service; + + public LoadTest(IntegrationFixture fixture, ITestOutputHelper output) + : base(fixture, output, new TestEventHandler(), true, autoStart: false, logLevel: LogLevel.Debug) { + _fixture = fixture; + var eventStore = new SqlServerStore(fixture.GetConnection, new SqlServerStoreOptions(SchemaName)); + var store = new AggregateStore(eventStore); + _service = new BookingService(store); + } + + [Fact] + public async Task ProduceAndConsumeManyEvents() { + const int count = 55000; + Handler.AssertThat().Any(_ => true); + + var generateTask = Task.Run(() => GenerateAndHandleCommands(count)); + + await Start(); + await Task.Delay(TimeSpan.FromMinutes(7)); + await Stop(); + Handler.Count.Should().Be(count); + + var checkpoint = await CheckpointStore.GetLastCheckpoint(SubscriptionId, default); + checkpoint.Position.Value.Should().Be(count - 1); + + await using var connection = _fixture.GetConnection(); + await connection.OpenAsync(); + + await using var cmd = connection.GetStoredProcCommand(Schema.ReadAllBackwards) + .Add("@from_position", SqlDbType.BigInt, long.MaxValue) + .Add("@count", SqlDbType.Int, 1); + await using var reader = await cmd.ExecuteReaderAsync(CancellationToken.None); + + var result = reader.ReadEvents(CancellationToken.None); + + var lastEvent = await result.LastAsync(); + lastEvent.GlobalPosition.Should().Be(count - 1); + } + + async Task> GenerateAndHandleCommands(int count) { + var commands = Enumerable + .Range(0, count) + .Select(_ => DomainFixture.CreateImportBooking()) + .ToList(); + + foreach (var cmd in commands) { + var result = await _service.Handle(cmd, default); + + if (result is ErrorResult error) { + throw error.Exception ?? new Exception(error.Message); + } + } + + return commands; + } + + static BookingEvents.BookingImported ToEvent(Commands.ImportBooking cmd) + => new(cmd.RoomId, cmd.Price, cmd.CheckIn, cmd.CheckOut); +} diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToStream.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToStream.cs index ea93fe7e..fa7cb18f 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToStream.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToStream.cs @@ -59,12 +59,7 @@ async Task> GenerateAndProduceEvents(int count) { var streamEvents = events.Select(x => new StreamEvent(Guid.NewGuid(), x, new Metadata(), "", 0)); - await _eventStore.AppendEvents( - Stream, - ExpectedStreamVersion.Any, - streamEvents.ToArray(), - default - ); + await _eventStore.AppendEvents(Stream, ExpectedStreamVersion.Any, streamEvents.ToArray(), default); return events; } diff --git a/test/Eventuous.Sut.Domain/BookingEvents.cs b/test/Eventuous.Sut.Domain/BookingEvents.cs index 3645a4b5..9b3f67ce 100644 --- a/test/Eventuous.Sut.Domain/BookingEvents.cs +++ b/test/Eventuous.Sut.Domain/BookingEvents.cs @@ -6,19 +6,13 @@ namespace Eventuous.Sut.Domain; public static class BookingEvents { [EventType("RoomBooked")] - public record RoomBooked( - string RoomId, - LocalDate CheckIn, - LocalDate CheckOut, - float Price, - string? GuestId = null - ); + public record RoomBooked(string RoomId, LocalDate CheckIn, LocalDate CheckOut, float Price, string? GuestId = null); [EventType("PaymentRegistered")] public record BookingPaymentRegistered( - string PaymentId, - float AmountPaid - ); + string PaymentId, + float AmountPaid + ); [EventType("OutstandingAmountChanged")] public record BookingOutstandingAmountChanged(float OutstandingAmount); @@ -33,12 +27,7 @@ public record BookingOverpaid(float OverpaidAmount); public record BookingCancelled; [EventType("V1.BookingImported")] - public record BookingImported( - string RoomId, - float Price, - LocalDate CheckIn, - LocalDate CheckOut - ); + public record BookingImported(string RoomId, float Price, LocalDate CheckIn, LocalDate CheckOut); // These constants are for test purpose, use inline names in real apps public static class TypeNames { diff --git a/test/Eventuous.Sut.Subs/TestEventHandler.cs b/test/Eventuous.Sut.Subs/TestEventHandler.cs index 509fa9a9..63be527c 100644 --- a/test/Eventuous.Sut.Subs/TestEventHandler.cs +++ b/test/Eventuous.Sut.Subs/TestEventHandler.cs @@ -13,6 +13,7 @@ public record TestEvent(string Data, int Number) { public class TestEventHandler(TimeSpan? delay = null, ITestOutputHelper? output = null) : BaseEventHandler { readonly TimeSpan _delay = delay ?? TimeSpan.Zero; + readonly string _id = Guid.NewGuid().ToString("N"); public int Count { get; private set; } @@ -27,9 +28,9 @@ public IHypothesis AssertThat() { public Task Validate(TimeSpan timeout) => EnsureHypothesis.Validate(timeout); public override async ValueTask HandleEvent(IMessageConsumeContext context) { - output?.WriteLine(context.Message!.ToString()); await Task.Delay(_delay); await EnsureHypothesis.Test(context.Message!, context.CancellationToken); + output?.WriteLine($"[{_id}] Handled event {context.GlobalPosition}, count is {Count}"); Count++; return EventHandlingStatus.Success; From 5e52912e9c010886ac59f7676ef9f5f49b56a98c Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 9 Oct 2023 18:05:35 +0200 Subject: [PATCH 2/2] Don't reuse schema --- .../Fixtures/IntegrationFixture.cs | 6 +++--- .../Fixtures/SubscriptionFixture.cs | 4 ++-- .../Subscriptions/SubscribeToAll.cs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs index 5d950955..0760c4e6 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs @@ -36,8 +36,8 @@ public sealed class IntegrationFixture : IAsyncLifetime { public async Task InitializeAsync() { _sqlServer = new SqlEdgeBuilder() .WithImage("mcr.microsoft.com/azure-sql-edge:latest") - .WithAutoRemove(false) - .WithCleanUp(false) + // .WithAutoRemove(false) + // .WithCleanUp(false) .Build(); await _sqlServer.StartAsync(); @@ -55,7 +55,7 @@ public async Task InitializeAsync() { } public async Task DisposeAsync() { - // await _sqlServer.DisposeAsync(); + await _sqlServer.DisposeAsync(); _listener.Dispose(); } } diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs index 8b785d57..d0907ac1 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs @@ -30,7 +30,7 @@ protected SubscriptionFixture( _fixture = fixture; _subscribeToAll = subscribeToAll; Stream = new StreamName(fixture.Auto.Create()); - SchemaName = fixture.SchemaName; + SchemaName = fixture.GetSchemaName(); _loggerFactory = TestHelpers.Logging.GetLoggerFactory(output, logLevel); _listener = new LoggingEventListener(_loggerFactory); SubscriptionId = $"test-{Guid.NewGuid():N}"; @@ -52,7 +52,7 @@ protected SubscriptionFixture( readonly ILoggerFactory _loggerFactory; public virtual async Task InitializeAsync() { - this.Schema = new Schema(SchemaName); + Schema = new Schema(SchemaName); await Schema.CreateSchema(_fixture.GetConnection); CheckpointStoreOptions = new SqlServerCheckpointStoreOptions { Schema = SchemaName }; diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToAll.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToAll.cs index a90264ab..a24f021d 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToAll.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToAll.cs @@ -30,7 +30,7 @@ public async Task ShouldConsumeProducedEvents() { await Start(); await Handler.Validate(2.Seconds()); - Handler.Count.Should().Be(10); + Handler.Count.Should().Be(testEvents.Count); await Stop(); }