Skip to content

Conversation

@bukk530
Copy link

@bukk530 bukk530 commented Dec 8, 2025

Why

Implements the TODO to propagate event metadata to Kafka headers when the event payload is JSON, enabling downstream consumers to access metadata without parsing the value body.

What’s changed

  • KafkaWriter
    • Build Kafka message headers from metadata only when ContentTypes.Json and metadata is present.
    • Delegates to KafkaHeadersBuilder.BuildHeaders.
    • No behavior change for non‑JSON content.
  • KafkaHeadersBuilder
    • Parses metadata JSON and maps only top‑level properties to headers.
    • Header value encoding:
      • String → raw UTF‑8 bytes of the string
      • Non‑string JSON → UTF‑8 bytes of compact GetRawText()
    • Safely ignores malformed JSON, non‑object roots, or empty objects; logs a warning on malformed JSON.
  • Tests (NUnit)
    • KafkaHeaderBuilderTests: verifies mapping rules and edge cases.
    • KafkaWriterHeadersTests: integration‑style test with a fake IProducer<string, byte[]> to assert headers on produced messages.
    • Tests use Assert.That syntax.
    • Added CapturingProducer test double implementing required IProducer members.
    • Initialize metrics in [OneTimeSetUp] via ReplicationMetrics.Configure(Metrics.CreateUsing()); to avoid NREs during Metrics.Measure.

Notes

  • Closes: TODO “Map meta to headers, but only for JSON”.
  • No breaking changes; non‑JSON events remain unaffected.
  • I've introduced NUnit for the new tests.
  • This PR was created with the help of automated software development methods (AI).

How to test

  • Run: dotnet test test/Kurrent.Replicator.Kafka.Tests/Kurrent.Replicator.Kafka.Tests.csproj
  • Expected: all tests pass (header mapping for JSON; ignored otherwise).

Manual testing

  • The feature was manually tested as per below screenshots

Before:

Screenshot From 2025-12-08 16-50-08

After:

Screenshot From 2025-12-08 17-43-00

The manual test was run with the following transform:

function transform(original) {
    const originalMeta = original.Meta || {};

    const meta = {
        ...originalMeta,
        stream: original.Stream,
        eventType: original.EventType,
        createdAt: original.Created,
        replicatedAt: new Date().toISOString()
    };

    return {
        Stream: original.Stream,
        EventType: original.EventType,
        Data: original.Data,
        Meta: meta,
    };
}

@CLAassistant
Copy link

CLAassistant commented Dec 8, 2025

CLA assistant check
All committers have signed the CLA.

@bukk530 bukk530 marked this pull request as draft December 8, 2025 16:35
@bukk530 bukk530 marked this pull request as ready for review December 8, 2025 17:45
@bukk530 bukk530 force-pushed the feature/add-metadata-to-header-kafka branch from 050a8de to 2ec4550 Compare December 11, 2025 20:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants