From 2ec455041a57a8135a0eb19e6e88cc93e4b86ed5 Mon Sep 17 00:00:00 2001 From: Valerio Balsamo Date: Mon, 8 Dec 2025 16:32:36 +0000 Subject: [PATCH 1/3] Map JSON metadata to Kafka message headers, add unit tests --- Kurrent.Replicator.slnx | 1 + .../KafkaHeadersBuilder.cs | 39 ++++ src/Kurrent.Replicator.Kafka/KafkaWriter.cs | 8 +- .../KafkaHeaderBuilderTests.cs | 74 ++++++ .../KafkaWriterTests.cs | 219 ++++++++++++++++++ .../Kurrent.Replicator.Kafka.Tests.csproj | 30 +++ 6 files changed, 369 insertions(+), 2 deletions(-) create mode 100644 src/Kurrent.Replicator.Kafka/KafkaHeadersBuilder.cs create mode 100644 test/Kurrent.Replicator.Kafka.Tests/KafkaHeaderBuilderTests.cs create mode 100644 test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs create mode 100644 test/Kurrent.Replicator.Kafka.Tests/Kurrent.Replicator.Kafka.Tests.csproj diff --git a/Kurrent.Replicator.slnx b/Kurrent.Replicator.slnx index 145ea9c5..a1643eb6 100644 --- a/Kurrent.Replicator.slnx +++ b/Kurrent.Replicator.slnx @@ -1,5 +1,6 @@ + diff --git a/src/Kurrent.Replicator.Kafka/KafkaHeadersBuilder.cs b/src/Kurrent.Replicator.Kafka/KafkaHeadersBuilder.cs new file mode 100644 index 00000000..af284bc9 --- /dev/null +++ b/src/Kurrent.Replicator.Kafka/KafkaHeadersBuilder.cs @@ -0,0 +1,39 @@ +// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements. +// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md). + +using System.Text; +using System.Text.Json; +using Kurrent.Replicator.Shared.Contracts; +using Kurrent.Replicator.Shared.Logging; + +namespace Kurrent.Replicator.Kafka; + +public static class KafkaHeadersBuilder { + static readonly ILog Log = LogProvider.GetCurrentClassLogger(); + + public static Headers? BuildHeaders(string contentType, byte[]? metadata) { + if (contentType != ContentTypes.Json || metadata is not { Length: > 0 }) + return null; + + try { + using var doc = JsonDocument.Parse(metadata); + var root = doc.RootElement; + if (root.ValueKind != JsonValueKind.Object) + return null; + + var headers = new Headers(); + foreach (var prop in root.EnumerateObject()) { + var valueBytes = prop.Value.ValueKind switch { + JsonValueKind.String => Encoding.UTF8.GetBytes(prop.Value.GetString() ?? string.Empty), + _ => Encoding.UTF8.GetBytes(prop.Value.GetRawText()) + }; + headers.Add(prop.Name, valueBytes); + } + + return headers.Count > 0 ? headers : null; + } catch (JsonException e) { + Log.Warn("Malformed json, skipping metadata to header mapping", e); + return null; + } + } +} diff --git a/src/Kurrent.Replicator.Kafka/KafkaWriter.cs b/src/Kurrent.Replicator.Kafka/KafkaWriter.cs index a60426a7..592ac936 100644 --- a/src/Kurrent.Replicator.Kafka/KafkaWriter.cs +++ b/src/Kurrent.Replicator.Kafka/KafkaWriter.cs @@ -50,12 +50,16 @@ async Task Append(ProposedEvent p) { ] ); - // TODO: Map meta to headers, but only for JSON var message = new Message { Key = partitionKey, Value = p.Data }; - + + // Map metadata to Kafka headers when the payload is JSON + var headers = KafkaHeadersBuilder.BuildHeaders(p.EventDetails.ContentType, p.Metadata); + if (headers is { Count: > 0 }) + message.Headers = headers; + var result = await _producer.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false); return result.Offset.Value; diff --git a/test/Kurrent.Replicator.Kafka.Tests/KafkaHeaderBuilderTests.cs b/test/Kurrent.Replicator.Kafka.Tests/KafkaHeaderBuilderTests.cs new file mode 100644 index 00000000..1da997f5 --- /dev/null +++ b/test/Kurrent.Replicator.Kafka.Tests/KafkaHeaderBuilderTests.cs @@ -0,0 +1,74 @@ +using System.Text.Json; +using Kurrent.Replicator.Shared.Contracts; +using NUnit.Framework; + +namespace Kurrent.Replicator.Kafka.Tests; + +[TestFixture] +public class KafkaHeadersBuilderMetadataHeaderTests { + [Test] + public void Should_map_top_level_json_properties_to_headers() { + var metadataObj = new { + s = "hello", + n = 123, + b = true, + obj = new { a = 1 }, + arr = new[] { 1, 2 } + }; + + var metadataBytes = JsonSerializer.SerializeToUtf8Bytes(metadataObj); + + var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, metadataBytes); + + Assert.That(headers, Is.Not.Null); + Assert.That(headers!.Count, Is.EqualTo(5)); + + var sHeader = headers.First(h => h.Key == "s"); + var nHeader = headers.First(h => h.Key == "n"); + var bHeader = headers.First(h => h.Key == "b"); + var objHeader = headers.First(h => h.Key == "obj"); + var arrHeader = headers.First(h => h.Key == "arr"); + + Assert.That(sHeader.GetValueBytes(), Is.EqualTo("hello"u8.ToArray())); + Assert.That(nHeader.GetValueBytes(), Is.EqualTo("123"u8.ToArray())); + Assert.That(bHeader.GetValueBytes(), Is.EqualTo("true"u8.ToArray())); + Assert.That(objHeader.GetValueBytes(), Is.EqualTo("{\"a\":1}"u8.ToArray())); + Assert.That(arrHeader.GetValueBytes(), Is.EqualTo("[1,2]"u8.ToArray())); + } + + [Test] + public void Should_ignore_when_content_type_not_json() { + var metadata = "{\"x\":1}"u8.ToArray(); + var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Binary, metadata); + Assert.That(headers, Is.Null); + } + + [Test] + public void Should_ignore_when_metadata_is_null_or_empty() { + var nullHeaders = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, null); + var emptyHeaders = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, Array.Empty()); + Assert.That(nullHeaders, Is.Null); + Assert.That(emptyHeaders, Is.Null); + } + + [Test] + public void Should_ignore_when_malformed_json() { + var bad = "{not json}"u8.ToArray(); + var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, bad); + Assert.That(headers, Is.Null); + } + + [Test] + public void Should_ignore_when_root_is_not_object() { + var arr = "[1,2,3]"u8.ToArray(); + var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, arr); + Assert.That(headers, Is.Null); + } + + [Test] + public void Should_ignore_when_object_has_no_properties() { + var emptyObj = "{}"u8.ToArray(); + var headers = KafkaHeadersBuilder.BuildHeaders(ContentTypes.Json, emptyObj); + Assert.That(headers, Is.Null); + } +} diff --git a/test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs b/test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs new file mode 100644 index 00000000..586d846b --- /dev/null +++ b/test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs @@ -0,0 +1,219 @@ +#nullable enable +using System.Reflection; +using System.Text.Json; +using Confluent.Kafka; +using Kurrent.Replicator.Shared.Observe; +using Kurrent.Replicator.Shared.Contracts; +using NUnit.Framework; +using Ubiquitous.Metrics; + +namespace Kurrent.Replicator.Kafka.Tests; + +[TestFixture] +public class KafkaWriterHeadersTests { + [OneTimeSetUp] + public void OneTimeSetup() { + // Configure metrics to avoid NREs in Metrics.Measure during tests + ReplicationMetrics.Configure(Metrics.CreateUsing()); + } + + [Test] + public async Task Should_map_metadata_to_headers_for_json_events() { + var writer = CreateWriterWithFakeProducer(out var producer); + + var metadataObj = new { + s = "hello", + n = 123, + b = true, + obj = new { a = 1 }, + arr = new[] { 1, 2 } + }; + + var metadataBytes = JsonSerializer.SerializeToUtf8Bytes(metadataObj); + + var proposedEvent = new ProposedEvent( + new("category-stream", Guid.NewGuid(), "TestEvent", ContentTypes.Json), + "data"u8.ToArray(), + metadataBytes, + new(0L, 0UL), + 0L + ); + + var result = await writer.WriteEvent(proposedEvent, CancellationToken.None); + + Assert.That(result, Is.EqualTo(42)); + + Assert.That(producer.LastTopic, Is.EqualTo("category")); + Assert.That(producer.LastMessage, Is.Not.Null); + + var headers = producer.LastMessage!.Headers; + Assert.That(headers, Is.Not.Null); + Assert.That(headers!.Count, Is.EqualTo(5)); + + var sHeader = headers.First(h => h.Key == "s"); + var nHeader = headers.First(h => h.Key == "n"); + var bHeader = headers.First(h => h.Key == "b"); + var objHeader = headers.First(h => h.Key == "obj"); + var arrHeader = headers.First(h => h.Key == "arr"); + + Assert.That(sHeader.GetValueBytes(), Is.EqualTo("hello"u8.ToArray())); + Assert.That(nHeader.GetValueBytes(), Is.EqualTo("123"u8.ToArray())); + Assert.That(bHeader.GetValueBytes(), Is.EqualTo("true"u8.ToArray())); + Assert.That(objHeader.GetValueBytes(), Is.EqualTo("{\"a\":1}"u8.ToArray())); + Assert.That(arrHeader.GetValueBytes(), Is.EqualTo("[1,2]"u8.ToArray())); + } + + [Test] + public async Task Should_not_set_headers_for_non_json_content_type() { + var writer = CreateWriterWithFakeProducer(out var producer); + + var metadata = "{\"x\":1}"u8.ToArray(); + + var proposedEvent = new ProposedEvent( + new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Binary), + "data"u8.ToArray(), + metadata, + new(0L, 0UL), + 0L + ); + + await writer.WriteEvent(proposedEvent, CancellationToken.None); + + Assert.That(producer.LastMessage, Is.Not.Null); + Assert.That(producer.LastMessage!.Headers, Is.Null); + } + + [Test] + public async Task Should_not_set_headers_when_metadata_is_null_or_empty() { + var writer = CreateWriterWithFakeProducer(out var producer); + + var withNullMetadata = new ProposedEvent( + new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Json), + "data"u8.ToArray(), + null, + new(0L, 0UL), + 0L + ); + + await writer.WriteEvent(withNullMetadata, CancellationToken.None); + + Assert.That(producer.LastMessage, Is.Not.Null); + Assert.That(producer.LastMessage!.Headers, Is.Null); + + var withEmptyMetadata = new ProposedEvent( + new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Json), + "data"u8.ToArray(), + Array.Empty(), + new(0L, 0UL), + 0L + ); + + await writer.WriteEvent(withEmptyMetadata, CancellationToken.None); + + Assert.That(producer.LastMessage, Is.Not.Null); + Assert.That(producer.LastMessage!.Headers, Is.Null); + } + + [Test] + public async Task Should_not_set_headers_when_metadata_is_malformed_json() { + var writer = CreateWriterWithFakeProducer(out var producer); + + var badMetadata = "{not json}"u8.ToArray(); + + var proposedEvent = new ProposedEvent( + new("stream-1", Guid.NewGuid(), "TestEvent", ContentTypes.Json), + "data"u8.ToArray(), + badMetadata, + new(0L, 0UL), + 0L + ); + + await writer.WriteEvent(proposedEvent, CancellationToken.None); + + Assert.That(producer.LastMessage, Is.Not.Null); + Assert.That(producer.LastMessage!.Headers, Is.Null); + } + + static KafkaWriter CreateWriterWithFakeProducer(out CapturingProducer producer) { + var config = new ProducerConfig { + BootstrapServers = "dummy:9092" + }; + + var writer = new KafkaWriter(config, null); + + var field = typeof(KafkaWriter).GetField("_producer", BindingFlags.Instance | BindingFlags.NonPublic); + Assert.That(field, Is.Not.Null); + + producer = new(); + field!.SetValue(writer, producer); + + return writer; + } + + class CapturingProducer : IProducer { + public string? LastTopic { get; private set; } + public Message? LastMessage { get; private set; } + + public string Name => "capturing-producer"; + + public Handle Handle => throw new NotSupportedException(); + + public Task> ProduceAsync(string topic, Message message, CancellationToken cancellationToken = default) { + LastTopic = topic; + LastMessage = message; + + var result = new DeliveryResult { + Topic = topic, + Message = message, + Offset = new(42), + Partition = new(0), + Status = PersistenceStatus.Persisted + }; + + return Task.FromResult(result); + } + + public Task> ProduceAsync(TopicPartition topicPartition, Message message, CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public void Produce(string topic, Message message, Action>? deliveryHandler = null) + => throw new NotSupportedException(); + + public void Produce(TopicPartition topicPartition, Message message, Action>? deliveryHandler = null) + => throw new NotSupportedException(); + + public int Flush(TimeSpan timeout) => 0; + + public void Flush(CancellationToken cancellationToken) { } + + public int Poll(TimeSpan timeout) => 0; + + public int AddBrokers(string brokers) => 0; + + public Metadata GetMetadata(string topic, TimeSpan timeout) => throw new NotSupportedException(); + + public Metadata GetMetadata(TimeSpan timeout) => throw new NotSupportedException(); + + public void InitTransactions(TimeSpan timeout) => throw new NotSupportedException(); + + public void BeginTransaction() => throw new NotSupportedException(); + + public void CommitTransaction(TimeSpan timeout) => throw new NotSupportedException(); + + public void CommitTransaction() => throw new NotSupportedException(); + + public void AbortTransaction(TimeSpan timeout) => throw new NotSupportedException(); + + public void AbortTransaction() => throw new NotSupportedException(); + + public void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout) + => throw new NotSupportedException(); + + public void SendOffsetsToTransaction(IEnumerable offsets, IConsumerGroupMetadata groupMetadata, CancellationToken cancellationToken = default) + => throw new NotSupportedException(); + + public void SetSaslCredentials(string username, string password) => throw new NotSupportedException(); + + public void Dispose() { } + } +} diff --git a/test/Kurrent.Replicator.Kafka.Tests/Kurrent.Replicator.Kafka.Tests.csproj b/test/Kurrent.Replicator.Kafka.Tests/Kurrent.Replicator.Kafka.Tests.csproj new file mode 100644 index 00000000..6dc0d36f --- /dev/null +++ b/test/Kurrent.Replicator.Kafka.Tests/Kurrent.Replicator.Kafka.Tests.csproj @@ -0,0 +1,30 @@ + + + + $([System.IO.Directory]::GetParent($(MSBuildThisFileDirectory)).Parent.FullName) + net9.0 + latest + false + enable + true + true + true + true + --report-trx --results-directory $(RepoRoot)/test-results/$(TargetFramework) + false + + + + + + + + + + + + + + + + From bf82dad3520cdcd1c534b23fc390fbbdd6b53c1f Mon Sep 17 00:00:00 2001 From: Valerio Balsamo Date: Thu, 11 Dec 2025 19:16:48 +0000 Subject: [PATCH 2/3] Update config --- src/Kurrent.Replicator/Partitioning/PartitionChannel.cs | 2 ++ src/replicator/Settings/ReplicatorSettings.cs | 2 +- src/replicator/Startup.cs | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Kurrent.Replicator/Partitioning/PartitionChannel.cs b/src/Kurrent.Replicator/Partitioning/PartitionChannel.cs index 280dfe1c..8f2de178 100644 --- a/src/Kurrent.Replicator/Partitioning/PartitionChannel.cs +++ b/src/Kurrent.Replicator/Partitioning/PartitionChannel.cs @@ -33,6 +33,8 @@ async Task Reader(ChannelReader reader) { if (context == null || pipe == null) throw new InvalidCastException("Wrong context type, expected SinkContext"); + Log.Warn("debug 0 === {proposedSequence} {writeSequence}", context.ProposedEvent.SequenceNumber, _writeSequence); + if (context.ProposedEvent.SequenceNumber < _writeSequence) Log.Warn("Wrong sequence for {Type}", context.ProposedEvent.EventDetails.EventType); _writeSequence = context.ProposedEvent.SequenceNumber; diff --git a/src/replicator/Settings/ReplicatorSettings.cs b/src/replicator/Settings/ReplicatorSettings.cs index bf89e68e..9a02e1bd 100644 --- a/src/replicator/Settings/ReplicatorSettings.cs +++ b/src/replicator/Settings/ReplicatorSettings.cs @@ -49,7 +49,7 @@ public record Replicator { public bool Scavenge { get; init; } public bool RestartOnFailure { get; init; } = true; public bool RunContinuously { get; init; } = true; - public int RestartDelayInSeconds { get; init; } = 5; + public long RestartDelayInMilliseconds { get; init; } = 5000; public int ReportMetricsFrequencyInSeconds { get; init; } = 5; public Checkpoint Checkpoint { get; init; } = new(); public TransformSettings Transform { get; init; } = new(); diff --git a/src/replicator/Startup.cs b/src/replicator/Startup.cs index 59bbee3b..cddef399 100644 --- a/src/replicator/Startup.cs +++ b/src/replicator/Startup.cs @@ -64,7 +64,7 @@ public static void ConfigureServices(WebApplicationBuilder builder) { new ReplicatorOptions( replicatorOptions.RestartOnFailure, replicatorOptions.RunContinuously, - TimeSpan.FromSeconds(replicatorOptions.RestartDelayInSeconds), + TimeSpan.FromMilliseconds(replicatorOptions.RestartDelayInMilliseconds), TimeSpan.FromSeconds(replicatorOptions.ReportMetricsFrequencyInSeconds) ) ); From 050a8dee7c5b9e79f1ae1a5391d8f58c6a83d0fe Mon Sep 17 00:00:00 2001 From: Valerio Balsamo Date: Thu, 11 Dec 2025 20:08:45 +0000 Subject: [PATCH 3/3] Fix checkpoint handling and improve partition sequencing diagnostics --- .../TcpEventReader.cs | 63 +++++-- src/Kurrent.Replicator.Http/HttpTransform.cs | 7 +- src/Kurrent.Replicator.Js/JsTransform.cs | 5 +- src/Kurrent.Replicator.Kafka/KafkaWriter.cs | 22 ++- .../GrpcEventReader.cs | 59 +++++-- .../MongoCheckpointStore.cs | 12 ++ .../Contracts/OriginalEvent.cs | 32 ++-- .../Contracts/ProposedEvent.cs | 31 ++-- .../Observe/ReplicationDebugOptions.cs | 14 ++ .../Observe/ReplicationRun.cs | 20 +++ .../Pipeline/Transforms.cs | 3 +- src/Kurrent.Replicator/FileCheckpointStore.cs | 63 ++++++- .../Partitioning/HashPartitioner.cs | 4 +- .../Partitioning/PartitionChannel.cs | 154 +++++++++++++++++- .../Partitioning/ValuePartitioner.cs | 2 +- .../Prepare/PrepareContext.cs | 3 +- .../Prepare/TransformFilter.cs | 9 +- src/Kurrent.Replicator/Replicator.cs | 16 +- src/Kurrent.Replicator/Sink/SinkPipe.cs | 2 +- .../Sink/SinkPipelineOptions.cs | 3 +- src/replicator/Settings/ReplicatorSettings.cs | 19 ++- src/replicator/Startup.cs | 27 ++- src/replicator/replicator.csproj | 12 +- .../KafkaWriterTests.cs | 15 +- 24 files changed, 489 insertions(+), 108 deletions(-) create mode 100644 src/Kurrent.Replicator.Shared/Observe/ReplicationDebugOptions.cs create mode 100644 src/Kurrent.Replicator.Shared/Observe/ReplicationRun.cs diff --git a/src/Kurrent.Replicator.EventStore/TcpEventReader.cs b/src/Kurrent.Replicator.EventStore/TcpEventReader.cs index 86a0d1e7..ceb17831 100644 --- a/src/Kurrent.Replicator.EventStore/TcpEventReader.cs +++ b/src/Kurrent.Replicator.EventStore/TcpEventReader.cs @@ -79,22 +79,49 @@ public async Task ReadEvents(LogPosition fromLogPosition, Func()) { + var replicationMessageId = Guid.NewGuid(); + + var eventType = sliceEvent.Event.EventType; + var isMetadata = eventType.StartsWith("$"); + if (sliceEvent.Event.EventType.StartsWith('$') && sliceEvent.Event.EventType != Predefined.MetadataEventType) { - await next(MapIgnored(sliceEvent, sequence++, activity)).ConfigureAwait(false); + await next(MapIgnored(sliceEvent, sequence++, activity, replicationMessageId)).ConfigureAwait(false); continue; } - if (Log.IsDebugEnabled()) + if (Log.IsDebugEnabled()) { Log.Debug( - "TCP: Read event with id {Id} of type {Type} from {Stream} at {Position}", + "TCP: Read event with id {Id} of type {Type} from {Stream} at {Position}, ReplicationMessageId={ReplicationMessageId}", sliceEvent.Event.EventId, - sliceEvent.Event.EventType, + eventType, sliceEvent.OriginalStreamId, - sliceEvent.OriginalPosition + sliceEvent.OriginalPosition, + replicationMessageId ); + if (ReplicationDebugOptions.DebugPartitionSequences) { + Log.Debug( + "Reader sequence. Position={Position}, EventType={EventType}, EventId={EventId}, Stream={Stream}, ReplicationMessageId={ReplicationMessageId}", + sliceEvent.OriginalPosition, + eventType, + sliceEvent.Event.EventId, + sliceEvent.OriginalStreamId, + replicationMessageId + ); + + Log.Debug( + "Event classification. Type={EventType}, IsMetadata={IsMetadata}, Stream={Stream}, Position={Position}, ReplicationMessageId={ReplicationMessageId}", + eventType, + isMetadata, + sliceEvent.OriginalStreamId, + sliceEvent.OriginalPosition, + replicationMessageId + ); + } + } + if (sliceEvent.Event.EventType == Predefined.MetadataEventType) { if (sliceEvent.Event.EventStreamId.StartsWith('$')) continue; @@ -102,10 +129,10 @@ public async Task ReadEvents(LogPosition fromLogPosition, Func Filter(BaseOriginalEvent originalEvent) => _filter.Filter(originalEvent); - static IgnoredOriginalEvent MapIgnored(ResolvedEvent evt, int sequence, Activity activity) + static IgnoredOriginalEvent MapIgnored(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId) => new( evt.OriginalEvent.Created, MapDetails(evt.OriginalEvent, evt.OriginalEvent.IsJson), MapPosition(evt), sequence, - new(activity.TraceId, activity.SpanId) + new(activity.TraceId, activity.SpanId), + replicationMessageId ); - static OriginalEvent Map(ResolvedEvent evt, int sequence, Activity activity) + static OriginalEvent Map(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId) => new( evt.OriginalEvent.Created, MapDetails(evt.OriginalEvent, evt.OriginalEvent.IsJson), @@ -156,10 +184,11 @@ static OriginalEvent Map(ResolvedEvent evt, int sequence, Activity activity) evt.OriginalEvent.Metadata, MapPosition(evt), sequence, - new(activity.TraceId, activity.SpanId) + new(activity.TraceId, activity.SpanId), + replicationMessageId ); - static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence, Activity activity) { + static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId) { var streamMeta = StreamMetadata.FromJsonBytes(evt.OriginalEvent.Data); return new( @@ -180,17 +209,19 @@ static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence, ), MapPosition(evt), sequence, - new(activity.TraceId, activity.SpanId) + new(activity.TraceId, activity.SpanId), + replicationMessageId ); } - static StreamDeletedOriginalEvent MapStreamDeleted(ResolvedEvent evt, int sequence, Activity activity) + static StreamDeletedOriginalEvent MapStreamDeleted(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId) => new( evt.OriginalEvent.Created, MapSystemDetails(evt.OriginalEvent), MapPosition(evt), sequence, - new(activity.TraceId, activity.SpanId) + new(activity.TraceId, activity.SpanId), + replicationMessageId ); static EventDetails MapDetails(RecordedEvent evt, bool isJson) => diff --git a/src/Kurrent.Replicator.Http/HttpTransform.cs b/src/Kurrent.Replicator.Http/HttpTransform.cs index ce0aedd6..30c3030b 100644 --- a/src/Kurrent.Replicator.Http/HttpTransform.cs +++ b/src/Kurrent.Replicator.Http/HttpTransform.cs @@ -29,7 +29,7 @@ public async ValueTask Transform(OriginalEvent originalEvent, } if (response.StatusCode == HttpStatusCode.NoContent) { - return new IgnoredEvent(originalEvent.EventDetails, originalEvent.LogPosition, originalEvent.SequenceNumber); + return new IgnoredEvent(originalEvent.EventDetails, originalEvent.LogPosition, originalEvent.SequenceNumber, originalEvent.ReplicationMessageId); } var httpResponse = (await JsonSerializer.DeserializeAsync( @@ -45,10 +45,11 @@ originalEvent.EventDetails with { Encoding.UTF8.GetBytes(httpResponse.Payload), httpResponse.Metadata == null ? originalEvent.Metadata : Encoding.UTF8.GetBytes(httpResponse.Metadata), originalEvent.LogPosition, - originalEvent.SequenceNumber + originalEvent.SequenceNumber, + originalEvent.ReplicationMessageId ); } catch (OperationCanceledException) { - return new NoEvent(originalEvent.EventDetails, originalEvent.LogPosition, originalEvent.SequenceNumber); + return new NoEvent(originalEvent.EventDetails, originalEvent.LogPosition, originalEvent.SequenceNumber, originalEvent.ReplicationMessageId); } } diff --git a/src/Kurrent.Replicator.Js/JsTransform.cs b/src/Kurrent.Replicator.Js/JsTransform.cs index 7f266389..92f0dd32 100644 --- a/src/Kurrent.Replicator.Js/JsTransform.cs +++ b/src/Kurrent.Replicator.Js/JsTransform.cs @@ -77,13 +77,14 @@ public ValueTask Transform(OriginalEvent original, Cancellati ); BaseProposedEvent evt = result == null - ? new IgnoredEvent(original.EventDetails, original.LogPosition, original.SequenceNumber) + ? new IgnoredEvent(original.EventDetails, original.LogPosition, original.SequenceNumber, original.ReplicationMessageId) : new ProposedEvent( original.EventDetails with { Stream = result.Stream, EventType = result.EventType }, result.Data, result.Meta, original.LogPosition, - original.SequenceNumber + original.SequenceNumber, + original.ReplicationMessageId ); return new(evt); diff --git a/src/Kurrent.Replicator.Kafka/KafkaWriter.cs b/src/Kurrent.Replicator.Kafka/KafkaWriter.cs index 592ac936..0e49cff3 100644 --- a/src/Kurrent.Replicator.Kafka/KafkaWriter.cs +++ b/src/Kurrent.Replicator.Kafka/KafkaWriter.cs @@ -40,15 +40,19 @@ public Task WriteEvent(BaseProposedEvent proposedEvent, CancellationToken async Task Append(ProposedEvent p) { var (topic, partitionKey) = _route(p); - _debug?.Invoke( - "Kafka: Write event with id {Id} of type {Type} to {Stream} with original position {Position}", - [ - proposedEvent.EventDetails.EventId, - proposedEvent.EventDetails.EventType, - topic, - proposedEvent.SourceLogPosition.EventPosition - ] - ); + if (ReplicationDebugOptions.DebugPartitionSequences && _debug != null) { + _debug( + "Kafka: Write event with id {Id} of type {Type} to {Stream} with original position {Position}, ReplicationMessageId={ReplicationMessageId}, PartitionKey={PartitionKey}", + [ + proposedEvent.EventDetails.EventId, + proposedEvent.EventDetails.EventType, + topic, + proposedEvent.SourceLogPosition.EventPosition, + proposedEvent.ReplicationMessageId, + partitionKey + ] + ); + } var message = new Message { Key = partitionKey, diff --git a/src/Kurrent.Replicator.KurrentDb/GrpcEventReader.cs b/src/Kurrent.Replicator.KurrentDb/GrpcEventReader.cs index 77c7adb7..ebb235f2 100644 --- a/src/Kurrent.Replicator.KurrentDb/GrpcEventReader.cs +++ b/src/Kurrent.Replicator.KurrentDb/GrpcEventReader.cs @@ -62,30 +62,55 @@ public async Task ReadEvents(LogPosition fromLogPosition, Func new( evt.Event.Created, MapDetails(evt.Event), MapPosition(evt), sequence, - new(activity.TraceId, activity.SpanId) + new(activity.TraceId, activity.SpanId), + replicationMessageId ); - static OriginalEvent Map(ResolvedEvent evt, int sequence, Activity activity) + static OriginalEvent Map(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId) => new( evt.OriginalEvent.Created, MapDetails(evt.OriginalEvent), @@ -122,10 +148,11 @@ static OriginalEvent Map(ResolvedEvent evt, int sequence, Activity activity) evt.OriginalEvent.Metadata.ToArray(), MapPosition(evt), sequence, - new(activity.TraceId, activity.SpanId) + new(activity.TraceId, activity.SpanId), + replicationMessageId ); - static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence, Activity activity) { + static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId) { var streamMeta = JsonSerializer.Deserialize( evt.Event.Data.Span, MetaSerialization.StreamMetadataJsonSerializerOptions @@ -149,17 +176,19 @@ static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence, ), MapPosition(evt), sequence, - new(activity.TraceId, activity.SpanId) + new(activity.TraceId, activity.SpanId), + replicationMessageId ); } - static StreamDeletedOriginalEvent MapStreamDeleted(ResolvedEvent evt, int sequence, Activity activity) + static StreamDeletedOriginalEvent MapStreamDeleted(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId) => new( evt.OriginalEvent.Created, MapSystemDetails(evt.OriginalEvent), MapPosition(evt), sequence, - new(activity.TraceId, activity.SpanId) + new(activity.TraceId, activity.SpanId), + replicationMessageId ); static EventDetails MapDetails(EventRecord evt) => diff --git a/src/Kurrent.Replicator.Mongo/MongoCheckpointStore.cs b/src/Kurrent.Replicator.Mongo/MongoCheckpointStore.cs index 83e87539..0ed78eb7 100644 --- a/src/Kurrent.Replicator.Mongo/MongoCheckpointStore.cs +++ b/src/Kurrent.Replicator.Mongo/MongoCheckpointStore.cs @@ -59,6 +59,18 @@ public async ValueTask LoadCheckpoint(CancellationToken cancellatio } public async ValueTask StoreCheckpoint(LogPosition logPosition, CancellationToken cancellationToken) { + // Ensure checkpoint never moves backwards within a run. + if (_lastPosition != null && logPosition.EventPosition < _lastPosition.EventPosition) { + Log.Error( + "Attempt to move checkpoint backwards. Current={Current}, New={New}", + _lastPosition, + logPosition + ); + + // Ignore the backwards update to maintain monotonicity. + return; + } + _lastPosition = logPosition; Interlocked.Increment(ref _counter); diff --git a/src/Kurrent.Replicator.Shared/Contracts/OriginalEvent.cs b/src/Kurrent.Replicator.Shared/Contracts/OriginalEvent.cs index 8e4c074e..a79ecfd3 100644 --- a/src/Kurrent.Replicator.Shared/Contracts/OriginalEvent.cs +++ b/src/Kurrent.Replicator.Shared/Contracts/OriginalEvent.cs @@ -7,8 +7,16 @@ public abstract record BaseOriginalEvent( EventDetails EventDetails, LogPosition LogPosition, long SequenceNumber, - TracingMetadata TracingMetadata - ); + TracingMetadata TracingMetadata, + Guid ReplicationMessageId + ) { + /// + /// Indicates whether this event is a metadata/system event (e.g. $>, $@) as opposed to a domain event. + /// Computed on demand from the so that existing record constructors + /// do not need to change. + /// + public bool IsMetadata => EventDetails.EventType.StartsWith("$"); +} public record OriginalEvent( DateTimeOffset Created, @@ -17,8 +25,9 @@ public record OriginalEvent( byte[]? Metadata, LogPosition LogPosition, long SequenceNumber, - TracingMetadata TracingMetadata - ) : BaseOriginalEvent(Created, EventDetails, LogPosition, SequenceNumber, TracingMetadata); + TracingMetadata TracingMetadata, + Guid ReplicationMessageId + ) : BaseOriginalEvent(Created, EventDetails, LogPosition, SequenceNumber, TracingMetadata, ReplicationMessageId); public record StreamMetadataOriginalEvent( DateTimeOffset Created, @@ -26,22 +35,25 @@ public record StreamMetadataOriginalEvent( StreamMetadata Data, LogPosition LogPosition, long SequenceNumber, - TracingMetadata TracingMetadata - ) : BaseOriginalEvent(Created, EventDetails, LogPosition, SequenceNumber, TracingMetadata); + TracingMetadata TracingMetadata, + Guid ReplicationMessageId + ) : BaseOriginalEvent(Created, EventDetails, LogPosition, SequenceNumber, TracingMetadata, ReplicationMessageId); public record StreamDeletedOriginalEvent( DateTimeOffset Created, EventDetails EventDetails, LogPosition LogPosition, long SequenceNumber, - TracingMetadata TracingMetadata - ) : BaseOriginalEvent(Created, EventDetails, LogPosition, SequenceNumber, TracingMetadata); + TracingMetadata TracingMetadata, + Guid ReplicationMessageId + ) : BaseOriginalEvent(Created, EventDetails, LogPosition, SequenceNumber, TracingMetadata, ReplicationMessageId); public record IgnoredOriginalEvent( DateTimeOffset Created, EventDetails EventDetails, LogPosition LogPosition, long SequenceNumber, - TracingMetadata TracingMetadata + TracingMetadata TracingMetadata, + Guid ReplicationMessageId ) - : BaseOriginalEvent(Created, EventDetails, LogPosition, SequenceNumber, TracingMetadata); + : BaseOriginalEvent(Created, EventDetails, LogPosition, SequenceNumber, TracingMetadata, ReplicationMessageId); diff --git a/src/Kurrent.Replicator.Shared/Contracts/ProposedEvent.cs b/src/Kurrent.Replicator.Shared/Contracts/ProposedEvent.cs index a3d99e40..2efda243 100644 --- a/src/Kurrent.Replicator.Shared/Contracts/ProposedEvent.cs +++ b/src/Kurrent.Replicator.Shared/Contracts/ProposedEvent.cs @@ -2,28 +2,37 @@ namespace Kurrent.Replicator.Shared.Contracts; -public abstract record BaseProposedEvent(EventDetails EventDetails, LogPosition SourceLogPosition, long SequenceNumber); +public abstract record BaseProposedEvent(EventDetails EventDetails, LogPosition SourceLogPosition, long SequenceNumber, Guid ReplicationMessageId) { + /// + /// Indicates whether this proposed event originated from a metadata/system event (e.g. $>, $@) + /// rather than a domain event. Computed from the so that + /// existing record constructors do not need to change. + /// + public bool IsMetadata => EventDetails.EventType.StartsWith("$"); +} public record ProposedEvent( EventDetails EventDetails, byte[] Data, byte[]? Metadata, LogPosition SourceLogPosition, - long SequenceNumber - ) : BaseProposedEvent(EventDetails, SourceLogPosition, SequenceNumber); + long SequenceNumber, + Guid ReplicationMessageId + ) : BaseProposedEvent(EventDetails, SourceLogPosition, SequenceNumber, ReplicationMessageId); public record ProposedMetaEvent( EventDetails EventDetails, StreamMetadata Data, LogPosition SourceLogPosition, - long SequenceNumber - ) : BaseProposedEvent(EventDetails, SourceLogPosition, SequenceNumber); + long SequenceNumber, + Guid ReplicationMessageId + ) : BaseProposedEvent(EventDetails, SourceLogPosition, SequenceNumber, ReplicationMessageId); -public record ProposedDeleteStream(EventDetails EventDetails, LogPosition SourceLogPosition, long SequenceNumber) - : BaseProposedEvent(EventDetails, SourceLogPosition, SequenceNumber); +public record ProposedDeleteStream(EventDetails EventDetails, LogPosition SourceLogPosition, long SequenceNumber, Guid ReplicationMessageId) + : BaseProposedEvent(EventDetails, SourceLogPosition, SequenceNumber, ReplicationMessageId); -public record IgnoredEvent(EventDetails EventDetails, LogPosition SourceLogPosition, long SequenceNumber) - : BaseProposedEvent(EventDetails, SourceLogPosition, SequenceNumber); +public record IgnoredEvent(EventDetails EventDetails, LogPosition SourceLogPosition, long SequenceNumber, Guid ReplicationMessageId) + : BaseProposedEvent(EventDetails, SourceLogPosition, SequenceNumber, ReplicationMessageId); -public record NoEvent(EventDetails EventDetails, LogPosition SourceLogPosition, long SequenceNumber) - : BaseProposedEvent(EventDetails, SourceLogPosition, SequenceNumber); +public record NoEvent(EventDetails EventDetails, LogPosition SourceLogPosition, long SequenceNumber, Guid ReplicationMessageId) + : BaseProposedEvent(EventDetails, SourceLogPosition, SequenceNumber, ReplicationMessageId); diff --git a/src/Kurrent.Replicator.Shared/Observe/ReplicationDebugOptions.cs b/src/Kurrent.Replicator.Shared/Observe/ReplicationDebugOptions.cs new file mode 100644 index 00000000..8a54112b --- /dev/null +++ b/src/Kurrent.Replicator.Shared/Observe/ReplicationDebugOptions.cs @@ -0,0 +1,14 @@ +namespace Kurrent.Replicator.Shared.Observe; + +/// +/// Holds runtime debug switches that control high-verbosity diagnostic logging +/// across the replicator components. +/// +public static class ReplicationDebugOptions { + /// + /// When enabled, partitions, readers and writers will emit detailed + /// per-event sequencing logs (buffer dumps, sequence transitions, etc.). + /// Intended for local troubleshooting only. + /// + public static bool DebugPartitionSequences { get; set; } +} diff --git a/src/Kurrent.Replicator.Shared/Observe/ReplicationRun.cs b/src/Kurrent.Replicator.Shared/Observe/ReplicationRun.cs new file mode 100644 index 00000000..2cb5c81c --- /dev/null +++ b/src/Kurrent.Replicator.Shared/Observe/ReplicationRun.cs @@ -0,0 +1,20 @@ +using System; + +namespace Kurrent.Replicator.Shared.Observe; + +/// +/// Holds contextual information about the current replication run. +/// A new RunId is generated at the start of each Replicator execution +/// and is used to correlate logs across components. +/// +public static class ReplicationRun { + /// + /// Identifier of the current replication run. + /// + public static Guid RunId { get; private set; } + + /// + /// Starts a new replication run by generating a fresh RunId. + /// + public static void StartNew() => RunId = Guid.NewGuid(); +} diff --git a/src/Kurrent.Replicator.Shared/Pipeline/Transforms.cs b/src/Kurrent.Replicator.Shared/Pipeline/Transforms.cs index 4b3ff68b..3cfbed5f 100644 --- a/src/Kurrent.Replicator.Shared/Pipeline/Transforms.cs +++ b/src/Kurrent.Replicator.Shared/Pipeline/Transforms.cs @@ -13,7 +13,8 @@ public static ValueTask DefaultWithExtraMeta(OriginalEvent or originalEvent.Data, AddMeta(), originalEvent.LogPosition, - originalEvent.SequenceNumber + originalEvent.SequenceNumber, + originalEvent.ReplicationMessageId ); return new(proposed); diff --git a/src/Kurrent.Replicator/FileCheckpointStore.cs b/src/Kurrent.Replicator/FileCheckpointStore.cs index f63e1d32..170348b1 100644 --- a/src/Kurrent.Replicator/FileCheckpointStore.cs +++ b/src/Kurrent.Replicator/FileCheckpointStore.cs @@ -1,5 +1,6 @@ using Kurrent.Replicator.Shared; using Kurrent.Replicator.Shared.Logging; +using Kurrent.Replicator.Shared.Observe; namespace Kurrent.Replicator; @@ -33,29 +34,57 @@ public ValueTask HasStoredCheckpoint(CancellationToken cancellationToken) public async ValueTask LoadCheckpoint(CancellationToken cancellationToken) { if (_lastPosition != null) { - Log.Info("Starting from a previously known checkpoint {LastKnown}", _lastPosition); + Log.Info( + "[Run {RunId}] Loading checkpoint {Checkpoint}. Source={Source}", + ReplicationRun.RunId, + _lastPosition, + "InMemory" + ); return _lastPosition; } if (!File.Exists(_fileName)) { - Log.Info("No checkpoint file found, starting from the beginning"); + Log.Info( + "[Run {RunId}] Loading checkpoint {Checkpoint}. Source={Source}", + ReplicationRun.RunId, + LogPosition.Start, + "Default" + ); return LogPosition.Start; } var content = await File.ReadAllTextAsync(_fileName, cancellationToken).ConfigureAwait(false); var numbers = content.Split(',').Select(x => Convert.ToInt64(x)).ToArray(); + var checkpoint = new LogPosition(numbers[0], (ulong)numbers[1]); - Log.Info("Loaded the checkpoint from file: {Checkpoint}", numbers[1]); + Log.Info( + "[Run {RunId}] Loading checkpoint {Checkpoint}. Source={Source}", + ReplicationRun.RunId, + checkpoint, + "File" + ); - return new LogPosition(numbers[0], (ulong)numbers[1]); + return checkpoint; } int _counter; LogPosition? _lastPosition; public async ValueTask StoreCheckpoint(LogPosition logPosition, CancellationToken cancellationToken) { + // Ensure checkpoint never moves backwards within a run. + if (_lastPosition != null && logPosition.EventPosition < _lastPosition.EventPosition) { + Log.Error( + "Attempt to move checkpoint backwards. Current={Current}, New={New}", + _lastPosition, + logPosition + ); + + // Ignore the backwards update to maintain monotonicity. + return; + } + _lastPosition = logPosition; Interlocked.Increment(ref _counter); @@ -70,6 +99,30 @@ public async ValueTask StoreCheckpoint(LogPosition logPosition, CancellationToke public async ValueTask Flush(CancellationToken cancellationToken) { if (_lastPosition == null) return; - await File.WriteAllTextAsync(_fileName, $"{_lastPosition.EventNumber},{_lastPosition.EventPosition}", cancellationToken).ConfigureAwait(false); + LogPosition? previousCheckpoint = null; + + if (File.Exists(_fileName)) { + var existingContent = await File.ReadAllTextAsync(_fileName, cancellationToken).ConfigureAwait(false); + + if (!string.IsNullOrWhiteSpace(existingContent)) { + var parts = existingContent.Split(',').Select(x => Convert.ToInt64(x)).ToArray(); + + if (parts.Length == 2) + previousCheckpoint = new LogPosition(parts[0], (ulong)parts[1]); + } + } + + Log.Debug( + "Flushing checkpoint. Run={RunId}, Previous={PreviousCheckpoint}, New={NewCheckpoint}", + ReplicationRun.RunId, + previousCheckpoint, + _lastPosition + ); + + await File.WriteAllTextAsync( + _fileName, + $"{_lastPosition.EventNumber},{_lastPosition.EventPosition}", + cancellationToken + ).ConfigureAwait(false); } } diff --git a/src/Kurrent.Replicator/Partitioning/HashPartitioner.cs b/src/Kurrent.Replicator/Partitioning/HashPartitioner.cs index e639648c..eeb69ae7 100644 --- a/src/Kurrent.Replicator/Partitioning/HashPartitioner.cs +++ b/src/Kurrent.Replicator/Partitioning/HashPartitioner.cs @@ -10,12 +10,12 @@ public class HashPartitioner : Supervisor, IPartitioner { readonly int _partitionCount; readonly PartitionChannel[] _partitions; - public HashPartitioner(int partitionCount, IHashGenerator hashGenerator) { + public HashPartitioner(int partitionCount, IHashGenerator hashGenerator, bool ignoreMetadataEventsForPartitioning = false) { _partitionCount = partitionCount; _hashGenerator = hashGenerator; _partitions = Enumerable.Range(0, partitionCount) - .Select(index => new PartitionChannel(index)) + .Select(index => new PartitionChannel(index, ignoreMetadataEventsForPartitioning)) .ToArray(); } diff --git a/src/Kurrent.Replicator/Partitioning/PartitionChannel.cs b/src/Kurrent.Replicator/Partitioning/PartitionChannel.cs index 8f2de178..48d90344 100644 --- a/src/Kurrent.Replicator/Partitioning/PartitionChannel.cs +++ b/src/Kurrent.Replicator/Partitioning/PartitionChannel.cs @@ -1,5 +1,9 @@ +using System.Collections.Generic; +using System.Linq; using System.Threading.Channels; +using Kurrent.Replicator.Shared.Contracts; using Kurrent.Replicator.Shared.Logging; +using Kurrent.Replicator.Shared.Observe; using GreenPipes; using GreenPipes.Agents; using Kurrent.Replicator.Sink; @@ -8,15 +12,30 @@ namespace Kurrent.Replicator.Partitioning; public class PartitionChannel : Agent { readonly int _index; + readonly bool _ignoreMetadataEventsForPartitioning; readonly Task _reader; readonly ChannelWriter _writer; static readonly ILog Log = LogProvider.GetCurrentClassLogger(); long _writeSequence; + + // Tracks the last successfully written event for this partition so we can + // log the boundary between "good" and "bad" ordering on sequence errors. + BaseProposedEvent? _lastWrittenEvent; - public PartitionChannel(int index) { + // Best-effort in-memory buffer snapshot of the most recent events routed + // through this partition. This does not affect sequencing logic and is + // only used for diagnostic logging when a sequence error occurs. + readonly Queue _recentEvents = new(); + + // To avoid logging unbounded state, keep only a small sliding window of + // recent events per partition. + const int RecentEventsLimit = 32; + + public PartitionChannel(int index, bool ignoreMetadataEventsForPartitioning) { _index = index; + _ignoreMetadataEventsForPartitioning = ignoreMetadataEventsForPartitioning; var channel = Channel.CreateBounded(1); _reader = Task.Run(() => Reader(channel.Reader)); _writer = channel.Writer; @@ -33,13 +52,130 @@ async Task Reader(ChannelReader reader) { if (context == null || pipe == null) throw new InvalidCastException("Wrong context type, expected SinkContext"); - Log.Warn("debug 0 === {proposedSequence} {writeSequence}", context.ProposedEvent.SequenceNumber, _writeSequence); + var proposedEvent = context.ProposedEvent; + // Use the global commit position for ordering instead of SequenceNumber, + // which is per-stream and can reset to 0 for new streams causing false + // sequence errors. + var proposedSequence = (long)proposedEvent.SourceLogPosition.EventPosition; + + // Maintain a sliding window of recent events for buffer dumps + // on sequence errors. This is best-effort diagnostics only. + TrackRecentEvent(proposedEvent); + + if (_ignoreMetadataEventsForPartitioning && proposedEvent.IsMetadata) { + if (ReplicationDebugOptions.DebugPartitionSequences && Log.IsDebugEnabled()) { + Log.Debug( + "Skipping sequence check for metadata event. Partition={PartitionId}, Key={Key}, EventType={EventType}, EventId={EventId}, Position=C:{EventNumber}/P:{EventPosition}, ReplicationMessageId={ReplicationMessageId}", + _index, + proposedEvent.EventDetails.Stream, + proposedEvent.EventDetails.EventType, + proposedEvent.EventDetails.EventId, + proposedEvent.SourceLogPosition.EventNumber, + proposedEvent.SourceLogPosition.EventPosition, + proposedEvent.ReplicationMessageId + ); + } + + await pipe.Send(context); + + // Do not advance _writeSequence when metadata events are ignored + // for partitioning; they should not affect domain event ordering. + continue; + } + + if (proposedSequence < _writeSequence) + Log.Warn( + "Wrong sequence for {EventType}. Partition={PartitionId}, EventId={EventId}, Position=C:{EventNumber}/P:{EventPosition}, ProposedSequence={ProposedSequence}, WriteSequence={WriteSequence}, Stream={Stream}, IsMetadata={IsMetadata}, ReplicationMessageId={ReplicationMessageId}", + proposedEvent.EventDetails.EventType, + _index, + proposedEvent.EventDetails.EventId, + proposedEvent.SourceLogPosition.EventNumber, + proposedEvent.SourceLogPosition.EventPosition, + proposedSequence, + _writeSequence, + proposedEvent.EventDetails.Stream, + proposedEvent.IsMetadata, + proposedEvent.ReplicationMessageId + ); + + if (proposedSequence < _writeSequence && ReplicationDebugOptions.DebugPartitionSequences && Log.IsDebugEnabled()) { + // 5.1. Log the current buffer content (best-effort dump of + // recent events in this partition). + Log.Debug( + "Partition buffer dump on sequence error. Partition={PartitionId}, Key={Key}, Count={Count}, Items={Items}", + _index, + proposedEvent.EventDetails.Stream, + _recentEvents.Count, + _recentEvents + .Select(x => new { + x.SequenceNumber, + x.EventDetails.EventType, + x.EventDetails.EventId, + x.IsMetadata, + Position = $"C:{x.SourceLogPosition.EventNumber}/P:{x.SourceLogPosition.EventPosition}", + x.ReplicationMessageId + }) + .ToArray() + ); + + // 5.2. Log previous event that advanced writeSequence so + // we can see the exact boundary between "good" and "bad" + // ordering. + if (_lastWrittenEvent != null) { + Log.Debug( + "Last written before error. Partition={PartitionId}, Key={Key}, EventType={EventType}, EventId={EventId}, Position=C:{EventNumber}/P:{EventPosition}, WriteSequence={WriteSequence}, IsMetadata={IsMetadata}, ReplicationMessageId={ReplicationMessageId}", + _index, + _lastWrittenEvent.EventDetails.Stream, + _lastWrittenEvent.EventDetails.EventType, + _lastWrittenEvent.EventDetails.EventId, + _lastWrittenEvent.SourceLogPosition.EventNumber, + _lastWrittenEvent.SourceLogPosition.EventPosition, + _writeSequence, + _lastWrittenEvent.IsMetadata, + _lastWrittenEvent.ReplicationMessageId + ); + } + } + + if (ReplicationDebugOptions.DebugPartitionSequences && Log.IsDebugEnabled()) { + Log.Debug( + "Sequence transition in partition {PartitionId}: {PreviousSequence} -> {NewSequence} (delta={Delta}), EventType={EventType}, Stream={Stream}, Position=C:{EventNumber}/P:{EventPosition}, IsMetadata={IsMetadata}, ReplicationMessageId={ReplicationMessageId}", + _index, + _writeSequence, + proposedSequence, + proposedSequence - _writeSequence, + proposedEvent.EventDetails.EventType, + proposedEvent.EventDetails.Stream, + proposedEvent.SourceLogPosition.EventNumber, + proposedEvent.SourceLogPosition.EventPosition, + proposedEvent.IsMetadata, + proposedEvent.ReplicationMessageId + ); + } - if (context.ProposedEvent.SequenceNumber < _writeSequence) - Log.Warn("Wrong sequence for {Type}", context.ProposedEvent.EventDetails.EventType); - _writeSequence = context.ProposedEvent.SequenceNumber; + _writeSequence = proposedSequence; await pipe.Send(context); + + // Track the last successfully written event per partition so + // we can log it when a future sequence error happens. + _lastWrittenEvent = proposedEvent; + + if (ReplicationDebugOptions.DebugPartitionSequences && Log.IsDebugEnabled()) { + Log.Debug( + "Seq OK. Partition={PartitionId}, Key={Key}, EventType={EventType}, EventId={EventId}, Position=C:{EventNumber}/P:{EventPosition}, ProposedSequence={ProposedSequence}, WriteSequence={WriteSequence}, IsMetadata={IsMetadata}, ReplicationMessageId={ReplicationMessageId}", + _index, + proposedEvent.EventDetails.Stream, + proposedEvent.EventDetails.EventType, + proposedEvent.EventDetails.EventId, + proposedEvent.SourceLogPosition.EventNumber, + proposedEvent.SourceLogPosition.EventPosition, + proposedSequence, + _writeSequence, + proposedEvent.IsMetadata, + proposedEvent.ReplicationMessageId + ); + } } catch (OperationCanceledException) { if (!IsStopping) throw; @@ -47,6 +183,14 @@ async Task Reader(ChannelReader reader) { } } + void TrackRecentEvent(BaseProposedEvent proposedEvent) { + _recentEvents.Enqueue(proposedEvent); + + while (_recentEvents.Count > RecentEventsLimit) { + _recentEvents.Dequeue(); + } + } + protected override async Task StopAgent(StopContext context) { await _reader.ConfigureAwait(false); await base.StopAgent(context).ConfigureAwait(false); diff --git a/src/Kurrent.Replicator/Partitioning/ValuePartitioner.cs b/src/Kurrent.Replicator/Partitioning/ValuePartitioner.cs index f7c1134c..c5d21327 100644 --- a/src/Kurrent.Replicator/Partitioning/ValuePartitioner.cs +++ b/src/Kurrent.Replicator/Partitioning/ValuePartitioner.cs @@ -30,7 +30,7 @@ async Task Send(string key, T context, IPipe next) where T : class, PipeCo var partitionKey = key.StartsWith("$") ? "$system" : key; if (!_partitions.ContainsKey(partitionKey)) { Log.Info("Adding new partition {Partition}", partitionKey); - _partitions[partitionKey] = new PartitionChannel(_partitionsCount++); + _partitions[partitionKey] = new PartitionChannel(_partitionsCount++, ignoreMetadataEventsForPartitioning: false); } await _partitions[partitionKey].Send(context, next); } diff --git a/src/Kurrent.Replicator/Prepare/PrepareContext.cs b/src/Kurrent.Replicator/Prepare/PrepareContext.cs index 1cfd3fb7..c356b368 100644 --- a/src/Kurrent.Replicator/Prepare/PrepareContext.cs +++ b/src/Kurrent.Replicator/Prepare/PrepareContext.cs @@ -16,7 +16,8 @@ public void IgnoreEvent() OriginalEvent.EventDetails, OriginalEvent.LogPosition, OriginalEvent.SequenceNumber, - OriginalEvent.TracingMetadata + OriginalEvent.TracingMetadata, + OriginalEvent.ReplicationMessageId ); public EventDetails EventDetails => OriginalEvent.EventDetails; diff --git a/src/Kurrent.Replicator/Prepare/TransformFilter.cs b/src/Kurrent.Replicator/Prepare/TransformFilter.cs index bcfd0185..e78155b4 100644 --- a/src/Kurrent.Replicator/Prepare/TransformFilter.cs +++ b/src/Kurrent.Replicator/Prepare/TransformFilter.cs @@ -63,19 +63,22 @@ static BaseProposedEvent TransformMeta(BaseOriginalEvent originalEvent) new ProposedDeleteStream( deleted.EventDetails, deleted.LogPosition, - deleted.SequenceNumber + deleted.SequenceNumber, + deleted.ReplicationMessageId ), StreamMetadataOriginalEvent meta => new ProposedMetaEvent( meta.EventDetails, meta.Data, meta.LogPosition, - meta.SequenceNumber + meta.SequenceNumber, + meta.ReplicationMessageId ), IgnoredOriginalEvent ignored => new IgnoredEvent( ignored.EventDetails, ignored.LogPosition, - ignored.SequenceNumber + ignored.SequenceNumber, + ignored.ReplicationMessageId ), _ => throw new InvalidOperationException("Unknown original event type") }; diff --git a/src/Kurrent.Replicator/Replicator.cs b/src/Kurrent.Replicator/Replicator.cs index d812b82b..e8b2e011 100644 --- a/src/Kurrent.Replicator/Replicator.cs +++ b/src/Kurrent.Replicator/Replicator.cs @@ -22,6 +22,8 @@ public static async Task Replicate( ReplicatorOptions replicatorOptions, CancellationToken stoppingToken ) { + ReplicationRun.StartNew(); + ReplicationMetrics.SetCapacity(preparePipeOptions.BufferSize, sinkPipeOptions.BufferSize); var cts = new CancellationTokenSource(); @@ -132,13 +134,23 @@ CancellationToken stoppingToken return; void Stop() { - Log.Info("Replicator stopping..."); + Log.Info("Replicator stopping... [Run {RunId}].", ReplicationRun.RunId); stopping = true; writerCts.Cancel(); } async Task Flush() { - Log.Info("Storing the last known checkpoint"); + // The authoritative checkpoint is maintained by the checkpoint store + // (updated from the sink pipeline via StoreCheckpoint). Here we only + // trigger persistence of whatever the store currently holds in + // memory, without trying to compute or override that value using + // the reader's last position. + + Log.Info( + "Flushing checkpoint store. Run={RunId}", + ReplicationRun.RunId + ); + await checkpointStore.Flush(CancellationToken.None).ConfigureAwait(false); } diff --git a/src/Kurrent.Replicator/Sink/SinkPipe.cs b/src/Kurrent.Replicator/Sink/SinkPipe.cs index 24de2108..445a9126 100644 --- a/src/Kurrent.Replicator/Sink/SinkPipe.cs +++ b/src/Kurrent.Replicator/Sink/SinkPipe.cs @@ -25,7 +25,7 @@ public SinkPipe(IEventWriter writer, SinkPipeOptions options, ICheckpointStore c ? x => KeyProvider.ByStreamName(x.ProposedEvent) : GetJsPartitioner(); - cfg.UsePartitioner(new HashPartitioner(options.PartitionCount, new Murmur3UnsafeHashGenerator()), keyProvider); + cfg.UsePartitioner(new HashPartitioner(options.PartitionCount, new Murmur3UnsafeHashGenerator(), options.IgnoreMetadataEventsForPartitioning), keyProvider); } else if (useJsPartitioner) { cfg.UsePartitioner(new ValuePartitioner(), GetJsPartitioner()); diff --git a/src/Kurrent.Replicator/Sink/SinkPipelineOptions.cs b/src/Kurrent.Replicator/Sink/SinkPipelineOptions.cs index a2e1cc55..a346c6fa 100644 --- a/src/Kurrent.Replicator/Sink/SinkPipelineOptions.cs +++ b/src/Kurrent.Replicator/Sink/SinkPipelineOptions.cs @@ -3,5 +3,6 @@ namespace Kurrent.Replicator.Sink; public record SinkPipeOptions( int PartitionCount = 1, int BufferSize = 1000, - string? Partitioner = null + string? Partitioner = null, + bool IgnoreMetadataEventsForPartitioning = false ); \ No newline at end of file diff --git a/src/replicator/Settings/ReplicatorSettings.cs b/src/replicator/Settings/ReplicatorSettings.cs index 9a02e1bd..8d3b042d 100644 --- a/src/replicator/Settings/ReplicatorSettings.cs +++ b/src/replicator/Settings/ReplicatorSettings.cs @@ -25,10 +25,15 @@ public record Checkpoint { } public record SinkSettings : EsdbSettings { - public int PartitionCount { get; init; } = 1; - public string Router { get; init; } - public string Partitioner { get; init; } - public int BufferSize { get; init; } = 1000; + public int PartitionCount { get; init; } = 1; + public string Router { get; init; } + public string Partitioner { get; init; } + public int BufferSize { get; init; } = 1000; + /// + /// When enabled, metadata/system events (e.g. $>, $@) are not used for partition sequencing checks. + /// This is a diagnostic flag to help investigate ordering issues involving metadata events. + /// + public bool IgnoreMetadataEventsForPartitioning { get; init; } = false; } public record TransformSettings { @@ -54,6 +59,12 @@ public record Replicator { public Checkpoint Checkpoint { get; init; } = new(); public TransformSettings Transform { get; init; } = new(); public Filter[] Filters { get; init; } + /// + /// Enables high-verbosity sequencing diagnostics (per-event sequence logs, + /// partition buffer dumps, etc.). Intended for local or non-production + /// environments only, as it can generate a large volume of logs. + /// + public bool DebugPartitionSequences { get; init; } = false; } public static class ConfigExtensions { diff --git a/src/replicator/Startup.cs b/src/replicator/Startup.cs index cddef399..2dd6bf2f 100644 --- a/src/replicator/Startup.cs +++ b/src/replicator/Startup.cs @@ -6,6 +6,7 @@ using Kurrent.Replicator.Mongo; using Kurrent.Replicator.Prepare; using Kurrent.Replicator.Shared; +using Kurrent.Replicator.Shared.Observe; using Kurrent.Replicator.Sink; using Prometheus; using replicator.HttpApi; @@ -21,6 +22,15 @@ public static void ConfigureServices(WebApplicationBuilder builder) { var replicatorOptions = builder.Configuration.GetAs(); + if (replicatorOptions.DebugPartitionSequences) { + // Enable high-verbosity partition/sequence diagnostics for this run. + ReplicationDebugOptions.DebugPartitionSequences = true; + + var log = Kurrent.Replicator.Shared.Logging.LogProvider.GetCurrentClassLogger(); + log.Log(Kurrent.Replicator.Shared.Logging.LogLevel.Info, + () => "DebugPartitionSequences is ENABLED. High-verbosity sequencing logs may produce large volumes of output and should not be used in production."); + } + var services = builder.Services; services.AddSingleton(); @@ -52,13 +62,20 @@ public static void ConfigureServices(WebApplicationBuilder builder) { ) ); - services.AddSingleton( - new SinkPipeOptions( + services.AddSingleton(sp => { + if (replicatorOptions.Sink.IgnoreMetadataEventsForPartitioning) { + var log = Kurrent.Replicator.Shared.Logging.LogProvider.GetCurrentClassLogger(); + log.Log(Kurrent.Replicator.Shared.Logging.LogLevel.Info, + () => "IgnoreMetadataEventsForPartitioning is ENABLED. Metadata events will bypass partition sequencing checks."); + } + + return new SinkPipeOptions( replicatorOptions.Sink.PartitionCount, replicatorOptions.Sink.BufferSize, - FunctionLoader.LoadFile(replicatorOptions.Sink.Partitioner, "Partitioner") - ) - ); + FunctionLoader.LoadFile(replicatorOptions.Sink.Partitioner, "Partitioner"), + replicatorOptions.Sink.IgnoreMetadataEventsForPartitioning + ); + }); services.AddSingleton( new ReplicatorOptions( diff --git a/src/replicator/replicator.csproj b/src/replicator/replicator.csproj index c381ad0c..c4b0cb03 100644 --- a/src/replicator/replicator.csproj +++ b/src/replicator/replicator.csproj @@ -26,12 +26,12 @@ - - - - - - + + + + + + diff --git a/test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs b/test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs index 586d846b..e055c7e3 100644 --- a/test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs +++ b/test/Kurrent.Replicator.Kafka.Tests/KafkaWriterTests.cs @@ -36,7 +36,8 @@ public async Task Should_map_metadata_to_headers_for_json_events() { "data"u8.ToArray(), metadataBytes, new(0L, 0UL), - 0L + 0L, + Guid.NewGuid() ); var result = await writer.WriteEvent(proposedEvent, CancellationToken.None); @@ -74,7 +75,8 @@ public async Task Should_not_set_headers_for_non_json_content_type() { "data"u8.ToArray(), metadata, new(0L, 0UL), - 0L + 0L, + Guid.NewGuid() ); await writer.WriteEvent(proposedEvent, CancellationToken.None); @@ -92,7 +94,8 @@ public async Task Should_not_set_headers_when_metadata_is_null_or_empty() { "data"u8.ToArray(), null, new(0L, 0UL), - 0L + 0L, + Guid.NewGuid() ); await writer.WriteEvent(withNullMetadata, CancellationToken.None); @@ -105,7 +108,8 @@ public async Task Should_not_set_headers_when_metadata_is_null_or_empty() { "data"u8.ToArray(), Array.Empty(), new(0L, 0UL), - 0L + 0L, + Guid.NewGuid() ); await writer.WriteEvent(withEmptyMetadata, CancellationToken.None); @@ -125,7 +129,8 @@ public async Task Should_not_set_headers_when_metadata_is_malformed_json() { "data"u8.ToArray(), badMetadata, new(0L, 0UL), - 0L + 0L, + Guid.NewGuid() ); await writer.WriteEvent(proposedEvent, CancellationToken.None);