Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Kurrent.Replicator.slnx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<Solution>
<Folder Name="/test/">
<Project Path="test\Kurrent.Replicator.Kafka.Tests\Kurrent.Replicator.Kafka.Tests.csproj" />
<Project Path="test\Kurrent.Replicator.Tests\Kurrent.Replicator.Tests.csproj" />
</Folder>
<Project Path="src\Kurrent.Replicator.EventStore\Kurrent.Replicator.EventStore.csproj" />
Expand Down
63 changes: 47 additions & 16 deletions src/Kurrent.Replicator.EventStore/TcpEventReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,33 +79,60 @@ public async Task ReadEvents(LogPosition fromLogPosition, Func<BaseOriginalEvent
.ConfigureAwait(false);

foreach (var sliceEvent in slice?.Events ?? Enumerable.Empty<ResolvedEvent>()) {
var replicationMessageId = Guid.NewGuid();

var eventType = sliceEvent.Event.EventType;
var isMetadata = eventType.StartsWith("$");

if (sliceEvent.Event.EventType.StartsWith('$') &&
sliceEvent.Event.EventType != Predefined.MetadataEventType) {
await next(MapIgnored(sliceEvent, sequence++, activity)).ConfigureAwait(false);
await next(MapIgnored(sliceEvent, sequence++, activity, replicationMessageId)).ConfigureAwait(false);

continue;
}

if (Log.IsDebugEnabled())
if (Log.IsDebugEnabled()) {
Log.Debug(
"TCP: Read event with id {Id} of type {Type} from {Stream} at {Position}",
"TCP: Read event with id {Id} of type {Type} from {Stream} at {Position}, ReplicationMessageId={ReplicationMessageId}",
sliceEvent.Event.EventId,
sliceEvent.Event.EventType,
eventType,
sliceEvent.OriginalStreamId,
sliceEvent.OriginalPosition
sliceEvent.OriginalPosition,
replicationMessageId
);

if (ReplicationDebugOptions.DebugPartitionSequences) {
Log.Debug(
"Reader sequence. Position={Position}, EventType={EventType}, EventId={EventId}, Stream={Stream}, ReplicationMessageId={ReplicationMessageId}",
sliceEvent.OriginalPosition,
eventType,
sliceEvent.Event.EventId,
sliceEvent.OriginalStreamId,
replicationMessageId
);

Log.Debug(
"Event classification. Type={EventType}, IsMetadata={IsMetadata}, Stream={Stream}, Position={Position}, ReplicationMessageId={ReplicationMessageId}",
eventType,
isMetadata,
sliceEvent.OriginalStreamId,
sliceEvent.OriginalPosition,
replicationMessageId
);
}
}

if (sliceEvent.Event.EventType == Predefined.MetadataEventType) {
if (sliceEvent.Event.EventStreamId.StartsWith('$')) continue;

if (Encoding.UTF8.GetString(sliceEvent.Event.Data) == StreamDeletedBody) {
if (Log.IsDebugEnabled())
Log.Debug("Stream deletion {Stream}", sliceEvent.Event.EventStreamId);

await next(MapStreamDeleted(sliceEvent, sequence++, activity));
await next(MapStreamDeleted(sliceEvent, sequence++, activity, replicationMessageId));
}
else {
var meta = MapMetadata(sliceEvent, sequence++, activity);
var meta = MapMetadata(sliceEvent, sequence++, activity, replicationMessageId);

if (Log.IsDebugEnabled())
Log.Debug("Stream meta {Stream}: {Meta}", sliceEvent.Event.EventStreamId, meta);
Expand All @@ -114,7 +141,7 @@ public async Task ReadEvents(LogPosition fromLogPosition, Func<BaseOriginalEvent
}
}
else if (sliceEvent.Event.EventType[0] != '$') {
var originalEvent = Map(sliceEvent, sequence++, activity);
var originalEvent = Map(sliceEvent, sequence++, activity, replicationMessageId);
await next(originalEvent).ConfigureAwait(false);
}
}
Expand All @@ -139,27 +166,29 @@ public async Task ReadEvents(LogPosition fromLogPosition, Func<BaseOriginalEvent

public ValueTask<bool> Filter(BaseOriginalEvent originalEvent) => _filter.Filter(originalEvent);

static IgnoredOriginalEvent MapIgnored(ResolvedEvent evt, int sequence, Activity activity)
static IgnoredOriginalEvent MapIgnored(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId)
=> new(
evt.OriginalEvent.Created,
MapDetails(evt.OriginalEvent, evt.OriginalEvent.IsJson),
MapPosition(evt),
sequence,
new(activity.TraceId, activity.SpanId)
new(activity.TraceId, activity.SpanId),
replicationMessageId
);

static OriginalEvent Map(ResolvedEvent evt, int sequence, Activity activity)
static OriginalEvent Map(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId)
=> new(
evt.OriginalEvent.Created,
MapDetails(evt.OriginalEvent, evt.OriginalEvent.IsJson),
evt.OriginalEvent.Data,
evt.OriginalEvent.Metadata,
MapPosition(evt),
sequence,
new(activity.TraceId, activity.SpanId)
new(activity.TraceId, activity.SpanId),
replicationMessageId
);

static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence, Activity activity) {
static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId) {
var streamMeta = StreamMetadata.FromJsonBytes(evt.OriginalEvent.Data);

return new(
Expand All @@ -180,17 +209,19 @@ static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence,
),
MapPosition(evt),
sequence,
new(activity.TraceId, activity.SpanId)
new(activity.TraceId, activity.SpanId),
replicationMessageId
);
}

static StreamDeletedOriginalEvent MapStreamDeleted(ResolvedEvent evt, int sequence, Activity activity)
static StreamDeletedOriginalEvent MapStreamDeleted(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId)
=> new(
evt.OriginalEvent.Created,
MapSystemDetails(evt.OriginalEvent),
MapPosition(evt),
sequence,
new(activity.TraceId, activity.SpanId)
new(activity.TraceId, activity.SpanId),
replicationMessageId
);

static EventDetails MapDetails(RecordedEvent evt, bool isJson) =>
Expand Down
7 changes: 4 additions & 3 deletions src/Kurrent.Replicator.Http/HttpTransform.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public async ValueTask<BaseProposedEvent> Transform(OriginalEvent originalEvent,
}

if (response.StatusCode == HttpStatusCode.NoContent) {
return new IgnoredEvent(originalEvent.EventDetails, originalEvent.LogPosition, originalEvent.SequenceNumber);
return new IgnoredEvent(originalEvent.EventDetails, originalEvent.LogPosition, originalEvent.SequenceNumber, originalEvent.ReplicationMessageId);
}

var httpResponse = (await JsonSerializer.DeserializeAsync<HttpEvent>(
Expand All @@ -45,10 +45,11 @@ originalEvent.EventDetails with {
Encoding.UTF8.GetBytes(httpResponse.Payload),
httpResponse.Metadata == null ? originalEvent.Metadata : Encoding.UTF8.GetBytes(httpResponse.Metadata),
originalEvent.LogPosition,
originalEvent.SequenceNumber
originalEvent.SequenceNumber,
originalEvent.ReplicationMessageId
);
} catch (OperationCanceledException) {
return new NoEvent(originalEvent.EventDetails, originalEvent.LogPosition, originalEvent.SequenceNumber);
return new NoEvent(originalEvent.EventDetails, originalEvent.LogPosition, originalEvent.SequenceNumber, originalEvent.ReplicationMessageId);
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/Kurrent.Replicator.Js/JsTransform.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,14 @@ public ValueTask<BaseProposedEvent> Transform(OriginalEvent original, Cancellati
);

BaseProposedEvent evt = result == null
? new IgnoredEvent(original.EventDetails, original.LogPosition, original.SequenceNumber)
? new IgnoredEvent(original.EventDetails, original.LogPosition, original.SequenceNumber, original.ReplicationMessageId)
: new ProposedEvent(
original.EventDetails with { Stream = result.Stream, EventType = result.EventType },
result.Data,
result.Meta,
original.LogPosition,
original.SequenceNumber
original.SequenceNumber,
original.ReplicationMessageId
);

return new(evt);
Expand Down
39 changes: 39 additions & 0 deletions src/Kurrent.Replicator.Kafka/KafkaHeadersBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Kurrent, Inc and/or licensed to Kurrent, Inc under one or more agreements.
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).

using System.Text;
using System.Text.Json;
using Kurrent.Replicator.Shared.Contracts;
using Kurrent.Replicator.Shared.Logging;

namespace Kurrent.Replicator.Kafka;

public static class KafkaHeadersBuilder {
static readonly ILog Log = LogProvider.GetCurrentClassLogger();

public static Headers? BuildHeaders(string contentType, byte[]? metadata) {
if (contentType != ContentTypes.Json || metadata is not { Length: > 0 })
return null;

try {
using var doc = JsonDocument.Parse(metadata);
var root = doc.RootElement;
if (root.ValueKind != JsonValueKind.Object)
return null;

var headers = new Headers();
foreach (var prop in root.EnumerateObject()) {
var valueBytes = prop.Value.ValueKind switch {
JsonValueKind.String => Encoding.UTF8.GetBytes(prop.Value.GetString() ?? string.Empty),
_ => Encoding.UTF8.GetBytes(prop.Value.GetRawText())
};
headers.Add(prop.Name, valueBytes);
}

return headers.Count > 0 ? headers : null;
} catch (JsonException e) {
Log.Warn("Malformed json, skipping metadata to header mapping", e);
return null;
}
}
}
30 changes: 19 additions & 11 deletions src/Kurrent.Replicator.Kafka/KafkaWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,30 @@ public Task<long> WriteEvent(BaseProposedEvent proposedEvent, CancellationToken
async Task<long> Append(ProposedEvent p) {
var (topic, partitionKey) = _route(p);

_debug?.Invoke(
"Kafka: Write event with id {Id} of type {Type} to {Stream} with original position {Position}",
[
proposedEvent.EventDetails.EventId,
proposedEvent.EventDetails.EventType,
topic,
proposedEvent.SourceLogPosition.EventPosition
]
);
if (ReplicationDebugOptions.DebugPartitionSequences && _debug != null) {
_debug(
"Kafka: Write event with id {Id} of type {Type} to {Stream} with original position {Position}, ReplicationMessageId={ReplicationMessageId}, PartitionKey={PartitionKey}",
[
proposedEvent.EventDetails.EventId,
proposedEvent.EventDetails.EventType,
topic,
proposedEvent.SourceLogPosition.EventPosition,
proposedEvent.ReplicationMessageId,
partitionKey
]
);
}

// TODO: Map meta to headers, but only for JSON
var message = new Message<string, byte[]> {
Key = partitionKey,
Value = p.Data
};


// Map metadata to Kafka headers when the payload is JSON
var headers = KafkaHeadersBuilder.BuildHeaders(p.EventDetails.ContentType, p.Metadata);
if (headers is { Count: > 0 })
message.Headers = headers;

var result = await _producer.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false);

return result.Offset.Value;
Expand Down
59 changes: 44 additions & 15 deletions src/Kurrent.Replicator.KurrentDb/GrpcEventReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,30 +62,55 @@ public async Task ReadEvents(LogPosition fromLogPosition, Func<BaseOriginalEvent

var evt = enumerator.Current;
lastPosition = (long)(evt.OriginalPosition?.CommitPosition ?? 0);
var replicationMessageId = Guid.NewGuid();

var eventType = evt.Event.EventType;
var isMetadata = eventType.StartsWith("$");

_debugLog?.Debug(
"gRPC: Read event with id {Id} of type {Type} from {Stream} at {Position}",
"gRPC: Read event with id {Id} of type {Type} from {Stream} at {Position}, ReplicationMessageId={ReplicationMessageId}",
evt.Event.EventId,
evt.Event.EventType,
eventType,
evt.OriginalStreamId,
evt.OriginalPosition
evt.OriginalPosition,
replicationMessageId
);

if (ReplicationDebugOptions.DebugPartitionSequences) {
_debugLog?.Debug(
"Reader sequence. Position={Position}, EventType={EventType}, EventId={EventId}, Stream={Stream}, ReplicationMessageId={ReplicationMessageId}",
evt.OriginalPosition,
eventType,
evt.Event.EventId,
evt.OriginalStreamId,
replicationMessageId
);

_debugLog?.Debug(
"Event classification. Type={EventType}, IsMetadata={IsMetadata}, Stream={Stream}, Position={Position}, ReplicationMessageId={ReplicationMessageId}",
eventType,
isMetadata,
evt.OriginalStreamId,
evt.OriginalPosition,
replicationMessageId
);
}

BaseOriginalEvent originalEvent;

if (evt.Event.EventType == Predefined.MetadataEventType) {
if (Encoding.UTF8.GetString(evt.Event.Data.Span) == StreamDeletedBody) {
originalEvent = MapStreamDeleted(evt, sequence++, activity);
originalEvent = MapStreamDeleted(evt, sequence++, activity, replicationMessageId);
}
else {
originalEvent = MapMetadata(evt, sequence++, activity);
originalEvent = MapMetadata(evt, sequence++, activity, replicationMessageId);
}
}
else if (evt.Event.EventType[0] != '$') {
originalEvent = Map(evt, sequence++, activity);
originalEvent = Map(evt, sequence++, activity, replicationMessageId);
}
else {
await next(MapIgnored(evt, sequence++, activity)).ConfigureAwait(false);
await next(MapIgnored(evt, sequence++, activity, replicationMessageId)).ConfigureAwait(false);

continue;
}
Expand All @@ -105,27 +130,29 @@ public async Task ReadEvents(LogPosition fromLogPosition, Func<BaseOriginalEvent
return (long?)events[0].OriginalPosition?.CommitPosition;
}

static IgnoredOriginalEvent MapIgnored(ResolvedEvent evt, int sequence, Activity activity)
static IgnoredOriginalEvent MapIgnored(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId)
=> new(
evt.Event.Created,
MapDetails(evt.Event),
MapPosition(evt),
sequence,
new(activity.TraceId, activity.SpanId)
new(activity.TraceId, activity.SpanId),
replicationMessageId
);

static OriginalEvent Map(ResolvedEvent evt, int sequence, Activity activity)
static OriginalEvent Map(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId)
=> new(
evt.OriginalEvent.Created,
MapDetails(evt.OriginalEvent),
evt.OriginalEvent.Data.ToArray(),
evt.OriginalEvent.Metadata.ToArray(),
MapPosition(evt),
sequence,
new(activity.TraceId, activity.SpanId)
new(activity.TraceId, activity.SpanId),
replicationMessageId
);

static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence, Activity activity) {
static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId) {
var streamMeta = JsonSerializer.Deserialize<StreamMetadata>(
evt.Event.Data.Span,
MetaSerialization.StreamMetadataJsonSerializerOptions
Expand All @@ -149,17 +176,19 @@ static StreamMetadataOriginalEvent MapMetadata(ResolvedEvent evt, int sequence,
),
MapPosition(evt),
sequence,
new(activity.TraceId, activity.SpanId)
new(activity.TraceId, activity.SpanId),
replicationMessageId
);
}

static StreamDeletedOriginalEvent MapStreamDeleted(ResolvedEvent evt, int sequence, Activity activity)
static StreamDeletedOriginalEvent MapStreamDeleted(ResolvedEvent evt, int sequence, Activity activity, Guid replicationMessageId)
=> new(
evt.OriginalEvent.Created,
MapSystemDetails(evt.OriginalEvent),
MapPosition(evt),
sequence,
new(activity.TraceId, activity.SpanId)
new(activity.TraceId, activity.SpanId),
replicationMessageId
);

static EventDetails MapDetails(EventRecord evt) =>
Expand Down
Loading