Skip to content

Spec 0034: failed-delivery observability for confirmation-based producers (#4179)#4180

Merged
iancooper merged 66 commits into
masterfrom
issue-4179-failed-delivery-context
Jun 16, 2026
Merged

Spec 0034: failed-delivery observability for confirmation-based producers (#4179)#4180
iancooper merged 66 commits into
masterfrom
issue-4179-failed-delivery-context

Conversation

@iancooper

@iancooper iancooper commented Jun 10, 2026

Copy link
Copy Markdown
Member

Summary

Implements Spec 0034 — making failed message delivery from confirmation-based producers (Kafka, RabbitMQ) observable, addressing issue #4179. This PR carries the full workflow: requirements → design (ADR 0063) → tasks → implementation (the production code is included; see "Production changes" below).

Today, when OnMessagePublished(success: false, ...) fires, the failure is silently dropped: nothing logged, no telemetry, the circuit breaker is never tripped for confirmation-based producers, and Kafka discards the failed message id and the ProduceException reason/code. This PR closes those gaps without changing the deliberate "don't bubble, let the Sweeper retry" design.

Production changes (~511 LOC across 11 src/ files)

  • ISupportPublishConfirmation / PublishConfirmationResultOnMessagePublished is enriched to Action<PublishConfirmationResult> (success, message id, wire topic, publish ActivityContext). The only interface contract that changes, affecting just the two ISupportPublishConfirmation producers.
  • OutboxProducerMediator — the confirmation callback emits a standalone settle span first (linked to the publish span), logs a Warning on failure, and trips the circuit breaker on the per-message wire topic (parity with the non-confirmation !sent path). Observability is isolated in try/catch (NFR-4); the span is disposed in finally (NFR-2).
  • BrighterTracer.CreateConfirmationSpan (+ IAmABrighterTracer) — creates the short-lived settle span carrying an ActivityLink to the original publish span; degrades to no-link when the context is absent.
  • Kafka (KafkaMessageProducer / KafkaMessagePublisher) — logs the previously-swallowed ProduceException (reason + code) at Warning and propagates the failed message id through PublishResults.
  • RabbitMQ (RmqMessageProducer async + sync) — confirmation carries the id, wire topic and publish link on ack/nack.
  • InMemoryMessageProducer — opt-in async confirmation pump (used in tests and early/local development).
  • InMemoryOutboxCircuitBreakerConcurrentDictionary + TryRemove to address the documented concurrency race.

Requirements highlights

  • FR-1 Warning-level log (not Error — message stays in Outbox for the Sweeper) with message id + wire topic.
  • FR-2 Standalone short-lived OTel Activity carrying an ActivityLink to the original publish span (graceful degradation to no-Link when the context isn't reachable).
  • FR-3 Trip the circuit breaker for the per-message wire topic (message.Header.Topic) — exact parity with the non-confirmation !sent path, including reply/rewritten topics.
  • FR-6/FR-8 Kafka: log the currently-swallowed ProduceException (reason + code); propagate the failed message id through PublishResults.
  • FR-9 RabbitMQ already carries the id on nack — verify-only for the id path.
  • FR-10 (superseded) Originally proposed an optional RequestContext? on the producer Send methods; ADR 0063 supersedes this — see below.

ADR 0063 — the design decision

The open C-7/C-8 question (how the wire topic + original trace context reach the OnMessagePublished callback) is resolved as mechanism (a): extend the contract, refined:

  • Enrich OnMessagePublished to Action<PublishConfirmationResult> (success, message id, wire topic, publish ActivityContext) — the only interface contract that changes, affecting just the two ISupportPublishConfirmation producers (Kafka, RMQ).
  • No Send-signature change. The producer self-sources the publish span from Activity.Current (set to S1 by CreateProducerSpan, AsyncLocal so race-free), captured synchronously inside Send/SendAsync. This meets FR-10's intent with a strictly smaller blast radius (the ~10 non-confirmation gateways are untouched) — so FR-10/AC-14 are marked superseded in requirements.md.
  • Rejected alternatives (b) correlation map and (c) header propagation are documented with rationale.

Review trajectory

  • Requirements: 3 adversarial rounds, 7 → 4 → 2 findings ≥ threshold, all resolved before approval.
  • Design (ADR 0063): 4 adversarial rounds — Critical (AC-2 capture linchpin) → High (concurrency race on shared producer.Span, fixed by sourcing Activity.Current) → PASS → PASS. Findings in review-design.md.

Closes #4179.

🤖 Generated with Claude Code

Add Spec 0034 capturing requirements for surfacing failed message
delivery from confirmation-based producers (Kafka/RMQ): Warning log,
standalone OTel span linked to the original publish span, and circuit
breaker trip on confirmation failure — addressing issue #4179.

Requirements approved after three adversarial review rounds (7 -> 4 -> 2
findings). Breaker/log topic uses the per-message wire topic
(message.Header.Topic) for exact parity with the non-confirmation path;
the trace-context/wire-topic plumbing mechanism (extend OnMessagePublished
contract vs correlation map vs header propagation) is deferred to ADR 0063.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@iancooper iancooper self-assigned this Jun 10, 2026
@iancooper iancooper marked this pull request as draft June 10, 2026 22:30
@iancooper iancooper added 2 - In Progress feature request .NET Pull requests that update .net code V10.X labels Jun 10, 2026
@claude

claude Bot commented Jun 10, 2026

Copy link
Copy Markdown

Review — PR #4180 (Spec 0034: failed-delivery observability)

Scope check: This PR is specification artifacts only — no production code (requirements.md, review-requirements.md, README.md, and the spec-state dotfiles). I reviewed it as a requirements document: internal consistency, accuracy of its (many) code references, and readiness to proceed to the ADR phase. This is the right shape for the spec workflow described in CLAUDE.md (Requirements → ADR → review → tasks → implement).

✅ Strengths

  • Every code reference I spot-checked is accurate. Verified against the current tree:
    • ISupportPublishConfirmation.OnMessagePublished is event Action<bool, string> — matches the glossary and the C-8 "delegate can't carry the topic" argument.
    • Configure(Async)PublisherCallbackMaybe genuinely handle only success == true (the false branch is silently dropped) — the core premise (FR-1/2/3).
    • The non-confirmation path really is if(!sent) TripTopic(message.Header.Topic) — so FR-3/C-8's "trip the wire topic, not Publication.Topic" is grounded in real parity, not an assumption.
    • Kafka PublishMessageAsync catches ProduceException<string, byte[]> with no exception variable bound, logs nothing, and writes MESSAGE_ID to deliveryResult.Headers while leaving Message.Headers empty — exactly as FR-6/FR-8/C-3 describe (including the subtle "read report-level Headers, not Message.Headers" caveat).
    • PublishResults NotPersisted branch fires OnMessagePublished?.Invoke(false, string.Empty) — matches FR-8.
    • RMQ RmqMessageProducer.cs:480 already does OnMessagePublished?.Invoke(false, messageId) — matches FR-9 (verify-only).
    • IAmAnOutboxCircuitBreaker.TripTopic(RoutingKey topic) at :44 is non-nullable — matches the corrected C-8 wording.
  • The hard architectural problem is identified honestly. C-6/C-7/C-8 correctly establish that the callback is wired once per producer at construction, captures an empty RequestContext, and captures neither the per-message wire topic nor the per-publish span — so both must be plumbed. That's the crux the ADR has to solve, and the spec doesn't hand-wave it.
  • Good requirement hygiene: every FR maps to ≥1 AC, NFRs are concrete (NFR-2's (i)/(ii)/(iii) replace the prior vague "cheap" wording), degradation paths are specified (AC-2b no-Link fallback), and OOS items keep the unrelated <string,string> dead-catch fix out of scope.

⚠️ Main issue — the committed review artifact is stale relative to the approved requirements

review-requirements.md is the round-3 review: verdict NEEDS WORK, "2 findings ≥ threshold (60)". But .requirements-approved is present and requirements.md has folded in all five of that round's findings:

Finding Status in requirements.md
1. C-6 contradicts C-8 wire-topic decision (82) ✅ C-6 rewritten ("captured producer yields only the static Publication.Topic…")
2. NFR-4 orphan (no AC) (66) ✅ AC-15 added
3. C-8 mis-attributes RoutingKey? to :44 (55) ✅ reworded to "interface overload takes RoutingKey"
4. FR-10 overstates what Send param delivers (50) ✅ note added (param only makes it available inside the producer)
5. AC-10 "background threads" (30) ✅ reworded to "gateway-dependent threads"

So the spec is actually in better shape than the review doc it ships with claims. Anyone reading review-requirements.md in isolation sees "NEEDS WORK / 2 unresolved findings" against an approved spec. Recommendation: either commit the (presumably clean) round-4 review that justified approval, or add a one-line header to review-requirements.md noting "all findings below resolved in requirements.md; superseded — see .requirements-approved." Right now the approval evidence and the newest committed review disagree.

🔸 Minor

  • Absolute line numbers will drift. The doc embeds many file.cs:NNN references (OutboxProducerMediator.cs:998/:1168/:741, KafkaMessagePublisher.cs:52-62, etc.). They're correct today, but the moment implementation lands they'll be off. Consider anchoring to method names (which the doc mostly already does) and treating line numbers as illustrative.
  • For the ADR phase (not a defect here): the async callback is registered as OnMessagePublished += async delegate(...) against an Action<bool,string> — i.e. async void. That makes NFR-4 / AC-15 (error isolation) genuinely load-bearing: an unobserved exception on that path can tear down the thread-pool thread (Kafka invokes the callback via Task.Run). Worth calling out explicitly in the design so the try/catch boundary isn't treated as optional.

Verdict

As a requirements spec this is thorough, accurate, and well-grounded in the actual code — clearly the product of the multi-round adversarial process the repo mandates. The only thing I'd fix before merge is the stale review-requirements.md vs. approved-state mismatch so the artifact set is self-consistent. Everything substantive is ready to carry into ADR 0063, where the C-7 mechanism (a/b/c) is the real open decision.

🤖 Generated with Claude Code

iancooper and others added 2 commits June 11, 2026 10:30
ADR 0063 designs observability + circuit-breaker behaviour for failed
publishes reported asynchronously via OnMessagePublished(success:false).

Decision (mechanism (a), extend the contract — single ADR):
- Enrich the ISupportPublishConfirmation.OnMessagePublished contract to
  Action<PublishConfirmationResult> carrying success, message id, wire
  topic (message.Header.Topic) and the original publish ActivityContext.
- Producer self-sources the publish span from Activity.Current (set to S1
  by CreateProducerSpan, AsyncLocal so race-free), captured synchronously
  inside Send/SendAsync — NO Send-signature change, so the ~10
  non-confirmation gateways are untouched (meets FR-10's intent with a
  strictly smaller blast radius).
- Mediator false-branch: Warning log (FR-1), standalone OTel span linked
  to S1 (FR-2), TripTopic(wire topic) (FR-3), no mark-dispatched / no
  bubble (FR-4), error-isolated (NFR-4). Kafka FR-6/FR-8; RMQ verify+enrich.

Reviewed adversarially over three rounds (Critical -> High -> PASS);
findings in specs/0034-failed-delivery-context/review-design.md.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Jun 11, 2026

Copy link
Copy Markdown

Review — Spec 0034: failed-delivery observability (PR 4180)

This is a spec/documentation-only PR (no production code), so I reviewed it for internal consistency, requirements/design alignment, and the soundness of the design's load-bearing assumptions. Overall a strong, careful spec — the ADR in particular is excellent: it uses an RDD information-holder (PublishConfirmationResult record) over a positional tuple, accounts honestly for the source/binary break, and genuinely weighs alternatives (b)/(c). I verified the two linchpin codebase claims and both hold:

  • ISupportPublishConfirmation.OnMessagePublished is event Action<bool, string> (ISupportPublishConfirmation.cs:42)
  • CreateProducerSpan sets Activity.Current = activity (BrighterTracer.cs:700, method starts :641) — this is what makes the whole Activity.Current-sourced design viable.

A few things to reconcile before moving to /spec:tasks:

1. PR description undersells the actual scope (please update)

The body says Specification artifacts only and the What this PR contains list stops at requirements.md / review-requirements.md / README / markers. But the diff also includes the full design phase: docs/adr/0063-failed-delivery-context.md (232 new lines), review-design.md, and .adr-list. This PR is requirements and ADR design + design review, not just requirements. The description should say so — a reviewer skimming the summary would not expect to be approving a design ADR.

2. Requirements / design drift: FR-10 / AC-14 / AC-2 (resolve before tasks)

The ADR's Decision deliberately abandons FR-10's Send-signature change, sourcing the publish span from Activity.Current instead (no Send signature change). Good call. But requirements.md was not reconciled to match:

  • AC-14 (requirements.md:157) still mandates a test that the new optional RequestContext? parameter ... is passed to Send/SendAsync. Under the chosen design that parameter does not exist, so AC-14 is unsatisfiable as written.
  • AC-2 still carries the precondition the publish RequestContext carrying S1's context was flowed to the producer's Send/SendAsync (FR-10) — no longer how the context reaches the producer.

When /spec:tasks runs, AC-14 will generate a /test-first task for a contract the design explicitly decided not to build. Recommend marking FR-10/AC-14 superseded by the ADR's Activity.Current mechanism (and re-wording AC-2's Given) in requirements.md so tasks derive from a consistent source. The ADR's Relationship to FR-10 section already explains the supersession well — it just needs to land back in the requirements doc.

3. Approval marker vs the committed review file disagree

.requirements-approved exists (requirements approved), and the PR body says the round-3 findings were all resolved — and the committed requirements.md does resolve them (C-6 reworded per the recommendation; AC-15 added for NFR-4). But the committed review-requirements.md still reads Verdict: NEEDS WORK with 2 findings at or above threshold and no resolution note. A reader opening that artifact is told the requirements still need work, contradicting the approval marker. Either add a resolved-in-commit note to that review file (matching how review-design.md records its post-round-3 edits), or it reads as a live blocker.

4. Design risk for the implementation phase (not a spec blocker)

The mechanism's correctness rests on a narrow, implicit, ordering- and config-dependent invariant: Activity.Current == S1 at the first line of Send/SendAsync, before any await, and the OutboxProducer Polly pipeline starting no Activity in between. The ADR documents this honestly (Negative + Risks + the AsyncLocal race-free reasoning that correctly rejects the shared producer.Span). Two things to carry into tasks:

  • The pipeline is user-overridable; a user who adds an activity-emitting resilience strategy would silently get a wrong-or-null captured context. Ensure the degradation contract is wrong-to-no-link, not wrong-to-wrong-link — AC-2b covers the null case, but a mis-set Activity.Current (someone else's span) would link to the wrong span, which AC-2 forbids. The producer cannot distinguish S1 from an interloper by value alone, so at minimum pin the invariant with a test that fails if an intervening activity is started.
  • The review-design.md first nit (RMQ-async: capture must precede the EnsureBrokerAsync await, not sit next to AddPendingConfirmation) is exactly the mistake an implementer will make. Good that it's documented — make it an explicit task note so it is not lost between phases.

Minor

  • PROMPT.md (resume/working state) is committed in the diff. Per CLAUDE.md that is its intended use, so this is fine — just noting it is working-state, not a deliverable.

None of the above blocks the spec direction — items 2 and 3 are the ones I would reconcile before /spec:tasks so the task breakdown derives from a self-consistent set of artifacts.

Generated with Claude Code

Per review feedback, store a named internal PendingConfirmation record
(MessageId, Topic, Context) keyed by delivery tag rather than widening
Dictionary<ulong, string> into a loose tuple. Keeps the delivery-tag map
focused — it is the only bridge between RabbitMQ's delivery-tag-based,
async, batched-confirm protocol and Brighter's message identity +
telemetry context. The record is internal (no public-API obligation).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Jun 11, 2026

Copy link
Copy Markdown

Review — Spec 0034 / ADR 0063 (failed-delivery observability)

This PR is specification + ADR only (no production code): a new requirements doc, ADR 0063, two adversarial-review records, and tracking dotfiles. So I reviewed it as a design artifact — primarily by verifying the ADR's codebase claims against the live source, since the whole design rests on a handful of them.

✅ Codebase claims verified accurate

I spot-checked ~12 of the specific line/behaviour references the ADR depends on; every one matches master:

  • CreateProducerSpan really does Activity.Current = activity after StartActivity (BrighterTracer.cs:700) — the linchpin of the capture mechanism. ✓
  • ISupportPublishConfirmation.OnMessagePublished is event Action<bool, string> (:42). ✓
  • Both mediator callback configurers handle only the success == true branch; false is silently dropped (OutboxProducerMediator.cs:741,765). ✓
  • Non-confirmation path trips TripTopic(message.Header.Topic) at :998; private TripTopic(RoutingKey?) has the RoutingKey.IsNullOrEmpty guard (:1168). ✓
  • Kafka catch is unbound catch (ProduceException<string, byte[]>) and writes MESSAGE_ID to deliveryResult.Headers while leaving Message.Headers = [] (KafkaMessagePublisher.cs:52,58-60). ✓
  • PublishResults reads the id only on Persisted, hardcodes (false, string.Empty) on failure, marshals via Task.Run (KafkaMessageProducer.cs:364-384). ✓
  • RMQ _pendingConfirmations is Dictionary<ulong,string> and already raises OnMessagePublished(false, messageId) on nack (RmqMessageProducer.cs:60,480). ✓
  • CreateSpan<TRequest> already accepts and forwards ActivityLink[]? links (:106). ✓

The accuracy and FR→AC→C traceability here are excellent — this is a high-quality spec.

Design soundness

The central decision — source the publish span from Activity.Current (an AsyncLocal, hence race-free under concurrent same-topic dispatch) rather than the shared mutable producer.Span field — is correct, and the rejection of producer.Span/requestContext.Span is well-argued. The Kafka feasibility holds: PublishResults(report.Status, report.Headers) is invoked from a closure created inside Send/SendAsync (KafkaMessageProducer.cs:260,333), so a captured ActivityContext local is in closure scope and can populate the enriched result. Avoiding the all-gateways Send-signature ripple while still meeting FR-10's intent is a genuine improvement over the literal requirement.

Issues / suggestions (for the design→tasks transition)

  1. review-requirements.md verdict is stale and contradicts the approved state. The committed file is round 3, Verdict: NEEDS WORK, with 2 findings ≥ threshold (C-6 contradiction, score 82; NFR-4 orphan AC, score 66). But the committed requirements.md has already resolved both — C-6 is reworded to "the captured producer yields only the static Publication.Topic… not reachable… must be plumbed," and AC-15 (NFR-4) now exists. Yet .requirements-approved is present and the PR says "approved." A reader opening the review file sees NEEDS WORK. review-design.md handles exactly this situation well, with a "Post-round-3 edits" resolution note — recommend giving review-requirements.md the same treatment (a resolution note or a round-4 PASS) so the artifact reflects reality.

  2. The "capture before any await" invariant is the design's main fragility — consider hardening it at implementation, not just documenting it. It's a same-thread ordering contract that must be re-honoured at four raise sites (Kafka sync/async, RMQ sync/async), and the ADR already flags the RMQ-async EnsureBrokerAsync await hazard. A single shared capture helper (or a defensive guard) would prevent the invariant being silently re-broken by a future edit, since AC-2/AC-10 only catch a regression when a tracing listener is actually active in the test. Worth making a first-class task.

  3. Make the Kafka context channel explicit in the ADR. The ADR says the context rides "the Kafka delivery-handler closure" without citing that the closure is co-located with the capture site (:260,333). Since this is the part of the design that's least obvious (PublishResults' own signature is just (status, headers)), adding the line references would close the only spot where an implementer might reasonably go looking for a per-message correlation store that doesn't exist.

  4. Action<bool,string>Action<PublishConfirmationResult> is binary-breaking — confirm it fits Brighter's versioning policy. The ADR is admirably honest that this breaks ISupportPublishConfirmation implementors (source + binary) and says it must be in release notes. Worth an explicit check during tasks that this is acceptable in the target release line (minor vs major) and that no in-repo samples/test doubles raise the old delegate shape.

  5. Minor — Activity.Current leftover after the async loop. CreateProducerSpan assigns Activity.Current manually (not via using), so disposal doesn't auto-restore the prior value; the ADR notes the async path "relies on the next iteration's overwrite rather than nulling." The capture is safe because it reads at the top of Send while Current == S1 — but a one-line note confirming the capture is unaffected by that leftover would pre-empt the question.

None of these block the design phase; (1) is the only one I'd fix before calling the artifacts final. Nice work overall.

🤖 Generated with Claude Code

iancooper and others added 3 commits June 11, 2026 14:35
ADR 0063 passes a second clean adversarial round after the PendingConfirmation
record refinement. Folded the two cosmetic nits: RMQ removal touch-point now
cites RemovePendingConfirmations/RemoveConfirmationsLocked (:419/:447), and the
resilience-extension path includes the Extensions/ segment.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…req review

Per PR #4180 wider-review feedback:
- FR-10/AC-14 superseded: ADR 0063 adopts C-7 mechanism (a) refined to source the
  publish span from Activity.Current (no Send-signature parameter). Banners added so
  /spec:tasks does not derive a RequestContext?-parameter test from a contract that
  was deliberately not built. AC-2/AC-2b/FR-2 example reworded to the Activity.Current
  capture (observable assertions unchanged).
- review-requirements.md: added a RESOLVED/SUPERSEDED banner — the committed file was
  the round-3 NEEDS WORK record, contradicting .requirements-approved; all its findings
  were folded into requirements.md before approval.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Jun 11, 2026

Copy link
Copy Markdown

Review — Spec 0034 / ADR 0063 (design-phase, docs-only)

Reviewed the full diff and independently verified the load-bearing codebase facts the design rests on. This is a specification + ADR PR with no production code, which is the correct posture for this point in the Requirements → Design → Review → Tasks → Implement workflow in CLAUDE.md. Overall this is a careful, well-grounded design that went through four adversarial rounds, and the honesty about the binary-break is commendable.

✅ Facts I re-verified (all accurate)

  • CreateProducerSpan sets Activity.Current = activity — confirmed at BrighterTracer.cs:700 (the linchpin of the capture mechanism).
  • OnMessagePublished is event Action<bool, string>ISupportPublishConfirmation.cs:42.
  • Only Kafka + RMQ.Sync + RMQ.Async implement ISupportPublishConfirmation; InMemoryMessageProducer has a separate OnMessagePublished (Action<bool, Id>, success-only) and is not affected — so the "blast radius = 2 producers" claim holds.
  • Kafka's delivery-report closure report => PublishResults(report.Status, report.Headers) is created inside Send/SendWithDelayAsync (KafkaMessageProducer.cs:260, 333), so the captured ActivityContext can ride that closure local — the carrying mechanism in the ADR is feasible.
  • PublishResults hardcodes OnMessagePublished?.Invoke(false, string.Empty) (:382) and reads MESSAGE_ID only on Persisted (:366) — FR-8 gap confirmed.
  • The async !sent path trips TripTopic(message.Header.Topic) (DispatchAsync, ~:998) — the wire-topic parity argument (C-8) checks out.

⚠️ Highest implementation risk — the Activity.Current capture invariant

The design correctly identifies this as its main fragility (Negative consequences), but I want to underline it because it is the part most likely to regress silently:

  1. It is a same-thread ordering invariant spread across 4 raise sites (Kafka sync/async, RMQ sync/async). "Capture Activity.Current?.Context as the first action before any await" must be re-honoured at each. The RMQ-async await EnsureBrokerAsync (:167) precedes per-message tracking, so the capture has to sit at the very top of the method — easy to get wrong on a later edit. A single shared capture helper plus a test that fails if an intervening activity is started before capture (as the carry-forward note proposes) should be a hard requirement in tasks.md, not optional.

  2. The resilience-pipeline assumption is user-overridable. The ADR's "Activity.Current stays == S1" holds for the Brighter-default AddRetry pipeline (verified: no telemetry/activity strategy), but the OutboxProducer pipeline is registry-keyed and a user can register an activity-emitting strategy, which would make the captured context a child span rather than S1 — a wrong link, which AC-2 forbids (vs. the tolerated missing link of AC-2b). Worth a sentence on how this is detected/bounded, or an explicit "we accept silent wrong-linking if a user injects an activity strategy" statement.

  3. Trade-off vs. the superseded FR-10. Choosing Activity.Current over an explicit RequestContext?/ActivityContext? Send parameter trades robustness for a much smaller blast radius. That's a defensible call (and the explicit param wouldn't have helped anyway — RequestContext.Span holds the parent, not S1), but it does mean the design's correctness now depends on an ambient, ordering-sensitive read rather than an explicit data flow. The ADR documents this; just flagging that the chosen mechanism is the less defensive of the two and leans entirely on tests to hold the line.

Minor / for the tasks phase

  • AC-2 testability: as the carry-forward notes catch, AC-2/AC-10 only detect a regression when an ActivitySource listener is active. The test fixture must register a listener and a fake ISupportPublishConfirmation producer that can raise (false, …)InMemoryMessageProducer can't serve (not a confirmation producer, no failure path). Make the fake an explicit task deliverable.
  • Stale XML doc: ISupportPublishConfirmation's comment says Guid => what was the id of the published message but the param is string. Since this interface's contract is being rewritten anyway, the new PublishConfirmationResult XML docs are a natural place to correct that legacy wording.
  • Kafka empty-id + topic: when MESSAGE_ID is genuinely absent (AC-8), the id is empty but the breaker still needs the wire topic. Confirm the enriched raise sources Topic from the in-scope message in the closure (not from the report headers), so FR-3 still trips correctly on an empty-id failure.
  • PROMPT.md is transient working-state; it's intentionally tracked per CLAUDE.md, but it's the only non-spec file here and carries a lot of session detail. Fine to keep, just noting it's the one file a reviewer might not expect in a feature PR.

Test coverage

N/A at this phase (no code). The ADR has pre-identified the non-trivial fixtures (fake confirmation producer, listener-active AC-2 tests, the binary-break migration of ~10 in-repo subscriber sites) — those should land as explicit TEST tasks under /test-first per the mandatory TDD workflow.

Verdict: Solid, well-verified design. No blocking issues for a design-phase PR. The Activity.Current capture invariant is the thing to pin hardest with tests before implementation.

🤖 Generated with Claude Code

iancooper and others added 2 commits June 11, 2026 15:55
…ducer

PO-authorised scope expansion: make InMemoryMessageProducer a third
ISupportPublishConfirmation implementer so the full failed-delivery flow is
testable in-process with a production provider rather than a test-only fake
(consistent with Brighter treating in-memory providers as real providers).

- Opt-in async-confirm switch (default off = today's synchronous (true,id));
  on = Channel + worker pump raising the enriched callback off-thread
  (exercises the cross-thread NFR-3 path like Kafka's Task.Run).
- Failure-injection hook drives the false path end-to-end in-process
  (FR-1/2/3/5, AC-5/10/15).
- Same Activity.Current capture-before-enqueue invariant; worker lifecycle
  drained on dispose; NFR-3/NFR-4 apply to the pump.
- Orphaned Action<bool,Id> event (no external subscribers) becomes the
  enriched Action<PublishConfirmationResult>.
- PublishConfirmationResult.MessageId retyped string -> Id (Id.Empty for
  FR-5; implicit string<->Id keeps Kafka/RMQ passing strings).
- OOS-3/NFR-5 reconciled: the InMemory switch toggles provider confirm
  timing/outcome for dev+tests, not the always-on observability behaviour.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Jun 11, 2026

Copy link
Copy Markdown

Review — Spec 0034 / ADR 0063 (failed-delivery observability)

🤖 Automated review by Claude. This is a spec + design-only PR (requirements + ADR 0063, no production code), so I focused on (a) whether the ADR's load-bearing code claims are actually true, and (b) the soundness of the design and its fit with the project's guardrails.

Verdict: strong, unusually well-grounded design. A few process/scope items worth resolving before /spec:approve design.


✅ What's excellent

I spot-checked the ADR's code citations against the live tree. Every load-bearing claim verified, line numbers included:

  • CreateProducerSpan does set Activity.Current = S1 unconditionally at BrighterTracer.cs:700 (and if StartActivity returns null, Current becomes null → the FR-2 degrade-to-no-link path is genuinely correct, not a wrong link).
  • ISupportPublishConfirmation.OnMessagePublished is Action<bool, string> (:42); callback wired once-per-producer in the ctor (:162); success-only handlers at :741/:768; TripTopic(message.Header.Topic) at :998, batch at :933, guarded private overload at :1168-1171.
  • Kafka unbound catch (ProduceException<string, byte[]>) (:52), MESSAGE_IDdeliveryResult.Headers (:60), PublishResults hardcoding (false, string.Empty) via Task.Run (:381-383); the dead <string,string> catches (:262,:336) confirmed as OOS-2.
  • RMQ Dictionary<ulong,string> _pendingConfirmations (:60), nack raise at :480, EnsureBrokerAsync await preceding tracking (:167).
  • AddBrighterDefault is a bare AddRetry with no telemetry/activity strategy (:57-67) — so the "pipeline doesn't perturb Activity.Current" claim holds.

The Activity.Current-as-capture-source decision is the right call over the rejected producer.Span (shared, races under concurrent same-topic dispatch) and requestContext.Span (parent span). The alternatives analysis (correlation map / header propagation) is honest about trade-offs. RDD framing of PublishConfirmationResult as an information holder, and PendingConfirmation as an internal record, is clean.


🟡 Concerns to resolve before approval

1. The Activity.Current capture invariant is the design's single point of fragility — and it's spread across 5 raise sites. Capture must be the first statement inside Send/SendAsync/the pump, before any await or child-activity, at Kafka-sync, Kafka-async, RMQ-sync, RMQ-async, and InMemory. RMQ-async is already a trap: SendWithDelayAsync awaits EnsureBrokerAsync at :167 before tracking, so capture has to hoist above it. This is correctly flagged in PROMPT.md carry-forward note #2, but I'd make the shared capture helper + a regression test that fails if an intervening activity starts before capture a hard task-phase requirement, not a "consider." AC-2/AC-10 only catch this when a tracing listener is active.

2. The InMemory scope expansion landed after the last recorded adversarial round. review-design.md is Round 4 / PASS dated 2026-06-11, but its findings only cover the PendingConfirmation refinement — they do not adversarially review the InMemory confirmation-capable producer (Channel + worker pump + failure-injection hook + dispose draining). PROMPT.md:57 itself says this expansion "materially grows the ADR → re-review (R5) recommended before approve." Right now the PASS verdict predates the expansion it is meant to bless. I'd run that R5 round before /spec:approve design.

3. The InMemory capability has no backing requirement (traceability gap). ADR 0063 introduces a whole new production capability (opt-in async-confirm pump, failure injection, lifecycle draining) on InMemoryMessageProducer, but there's no corresponding FR in requirements.mdPROMPT.md:57 acknowledges this is "pending." An ADR is design for requirements; a new capability with no FR breaks the spec→design→test traceability the workflow depends on, and the ADR has to spend a paragraph arguing it doesn't violate OOS-3/NFR-5. Add the FR (and its ACs) first, or descope the pump to the minimum needed and note the testability rationale there.

4. Guardrail tension — CLAUDE.md "Do NOT change defaults or make changes beyond what was explicitly requested." Issue #4179 is "make Kafka/RMQ confirmation failures observable." Turning a previously-simple in-memory provider into a third ISupportPublishConfirmation implementer with a background pump is a real complexity increase justified only by test enablement. The ADR is candid about this (Negative consequence "InMemory scope increase"), and it's PO-authorised — but given the explicit guardrail, the leaner option (a focused test double, or a minimal sync failure-injection path without the full Channel/worker pump) deserves to be weighed as a documented alternative, not just accepted.


🟢 Minor

  • Binary-break release-line check. Action<bool,string>Action<PublishConfirmationResult> is correctly called out as source+binary-breaking for ISupportPublishConfirmation implementors, and PROMPT.md note Support async versions of Send and Publish #3 lists ~10 in-repo test subscriber sites to migrate. Just confirm this break is slotted into a major/minor that permits it before tasks begin.
  • PROMPT.md in the diff. Committing the rolling resume-state file is sanctioned by your process (CLAUDE.md), but it adds churn/noise to the PR and leaks working notes ("uncommitted", "next session") into history. Optional: keep it out of feature PRs.

Net: the requirements and ADR are genuinely high quality and the code grounding is better than most implementation PRs I see. The blockers are process/scope around the InMemory expansion (items 2–4), not the core mechanism, which is sound. Nice work.

Tidy requirements & ADR 0063 for a fresh reader, and adopt symmetric
success/failure confirmation-span tracing (PO decision 2026-06-11).

Clarity pass:
- Remove FR-10 (the optional RequestContext?-on-Send mechanism) and all
  mechanism prose from C-7/C-8/ACs in requirements; the spec now states
  the need (publish context reachable at callback), not the how.
- ADR: delete the "Relationship to FR-10" self-debate; move the rejected
  Send-parameter approach into Alternatives Considered alongside the
  correlation-map and header-propagation options.

Symmetric decision:
- FR-2 now emits the standalone linked confirmation span on BOTH success
  and failure branches. On success it parents the MarkDispatched DB span,
  fixing today's orphaned/null-parent Outbox span (C-6). Warning log and
  breaker trip stay failure-only; success delivery semantics frozen.
- OOS-4 narrowed to delivery semantics; AC-13 rewritten; NFR-2/AC-12
  generalized; ground-truth item documents the orphan defect.
- ADR Decision, architecture diagram, pseudocode, components, consequences,
  and critical files updated to match.

Refresh PROMPT.md resume state (resolved decisions, R5 still owed).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

codescene-delta-analysis[bot]

This comment was marked as outdated.

InMemoryMessageProducer's channel-based drain does not compile on netstandard2.0,
which lacks System.Threading.Channels in-box. Add the backfill package reference
(already in central package management) scoped to the netstandard2.0 target.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Jun 15, 2026

Copy link
Copy Markdown

Code Review — PR #4180: Failed-delivery observability for confirmation-based producers

Reviewed the production changes (~511 LOC across 11 src/ files) against CLAUDE.md and the project conventions. Overall this is a high-quality, well-scoped PR: the design is documented end-to-end (requirements → ADR 0063 → tasks → review), the change is restricted to exactly the gap described in #4179, error isolation is taken seriously, and test coverage is broad (Core, InMemory, Kafka, RMQ async + sync). The "don't bubble, let the Sweeper retry" semantics are correctly preserved.

Below are observations — mostly minor — that may be worth a look before merge.

Strengths

  • Race-free context capture. Sourcing the publish span from Activity.Current?.Context (AsyncLocal) captured synchronously at send time, rather than from the shared mutable producer.Span, is the right call and is well justified in the ADR. Capturing the ActivityContext value (a struct valid after the span ends) sidesteps the thread-affinity problem cleanly.
  • Error isolation (NFR-4). The observability span is built in its own try/catch, the dispatch/trip work in a second try/catch, and the span is disposed in finally. The explicit local guard around the async void delegate (rather than relying on the resilience pipeline to absorb throws) is correct — an unobserved exception there would be process-terminating.
  • Wire-topic parity. Tripping the breaker on message.Header.Topic (the wire topic carried in PublishConfirmationResult.Topic) rather than Publication.Topic gives exact parity with the non-confirmation !sent path and correctly handles reply/rewritten topics.
  • Kafka Id.Empty degradation. In PublishResults, a Persisted report with a missing id degrades to Id.Empty on the success branch rather than falling through to failure — avoiding a spurious breaker trip on a topic the broker said it persisted. Nicely reasoned and commented.
  • API shape. Choosing an immutable record (PublishConfirmationResult) over a widening positional tuple delegate is the right RDD call and keeps the contract extensible. XML docs on the type and members are present per the documentation guidelines.

Observations / possible issues

  1. Activity.Current is not restored after CreateConfirmationSpan on the RMQ path. CreateConfirmationSpan sets Activity.Current = null then = activity, and the callback disposes the span in finally but never restores the prior ambient. For Kafka/InMemory this is harmless (the callback runs under Task.Run, whose ExecutionContext is discarded). But RMQ invokes OnMessagePublished synchronously from its broker ack/nack handler (OnPublishFailed/OnPublishSucceeded), and the failure branch has no await before TripTopic, so it runs to completion on that long-lived dispatch thread — leaving Activity.Current pointing at a disposed activity. Subsequent tracing started on that same thread could mis-parent. The ADR's remark acknowledges this is "only safe from a context whose ambient activity is owned by the caller"; worth confirming the RMQ dispatch thread genuinely qualifies, or restoring the previous Activity.Current in the finally.

  2. InMemoryOutboxCircuitBreaker.CoolDown decrement is still a non-atomic read-modify-write. The PR correctly fixes the enumeration-vs-mutation race by switching to ConcurrentDictionary + TryRemove. However _trippedTopics[key] -= 1 is read-modify-write, and a concurrent TripTopic (_trippedTopics[topic] = CooldownCount) can be lost/overwritten. For an in-memory "simple implementation" this is likely acceptable, but if strict correctness under concurrent trip+cooldown is intended, an AddOrUpdate/TryUpdate loop would close it.

  3. Success-branch confirmation span is Dispose()d, not ended via EndSpan. The span is disposed directly in finally, so on success its status stays Unset (vs. Ok) and the end time isn't stamped via _timeProvider the way EndSpan does. The failure branch sets Error inside CreateConfirmationSpan, so it's fine. Minor consistency nit with the rest of the tracer.

  4. Sync InMemory path drops the publish context. Enqueue raises PublishConfirmationResult(..., PublishSpanContext: null) even when an Activity is active, so the inline (non-deferred) confirmation span won't link, while the deferred path (EnqueueDeferred/DrainAsync) does capture it. This preserves "today's behavior" for the default mode and is defensible, but the asymmetry is easy to trip over later — a one-line capture of Activity.Current?.Context would make the two paths consistent.

  5. Minor readability: in ConfigureAsyncPublisherCallbackMaybe, the is ISupportPublishConfirmation producerSync pattern variable is named producerSync inside the async registration method, which reads as the wrong polarity.

  6. Log level asymmetry: ConfirmationObservabilityError logs at Error while ConfirmationDispatchError logs at Warning. Both are isolated/non-fatal; the split is defensible (a tracing fault is an unexpected bug vs. a benign sweeper-retry), but worth a deliberate confirmation.

Test coverage

Coverage is thorough and the test names follow the When_..._should_... convention. The negative/edge paths are well covered: empty-id observation, concurrent same-topic trips, observability-throws isolation, dispatch-throws isolation, no-dispatch/no-bubble, async-confirmation pump start/drain/dispose, Kafka synthetic NotPersisted, and RMQ id/topic/context carriage on both ack and nack. The documented caveat in DrainAsync — that DisposeAsync only drains a success-path async void callback up to its first await — is an important subtlety to have called out explicitly.

Nice work overall; the items above are refinements rather than blockers.

🤖 Generated with Claude Code

@claude

claude Bot commented Jun 15, 2026

Copy link
Copy Markdown

Code Review — Spec 0034 (failed-delivery observability)

Reviewed the production changes across the mediator, tracer, Kafka/RMQ/InMemory producers, and the circuit breaker, plus the test suite. Overall this is high-quality, carefully-scoped work: the blast radius is genuinely small (only the two ISupportPublishConfirmation producers change contract), the deliberate "don't bubble, let the Sweeper retry" semantics are preserved, the concurrency-sensitive paths are documented at the point of subtlety, and the breaker is tripped on the wire topic — exact parity with the non-confirmation !sent path (OutboxProducerMediator.cs:1109). Test coverage is excellent: success/failure, empty-id, no-bubble, linked-span, concurrent-trip, observability-isolation, and the InMemory pump's start/drain/dispose races each have a dedicated test.

A few observations, mostly minor:

1. Confirmation span is Dispose()d directly instead of via _tracer.EndSpan(...) — bypasses the TimeProvider (worth fixing)

OutboxProducerMediator.cs:808 and :888 end the span with confirmationSpan?.Dispose(), whereas every other span in this file is closed with _tracer?.EndSpan(span) (:340, :409, :660). Two consequences:

  • End time uses wall-clock now, not the tracer's _timeProvider. CreateConfirmationSpan starts the span with _timeProvider.GetUtcNow() (BrighterTracer.cs:735) but Activity.Dispose() stamps the end with the real clock. Under a controlled/fake TimeProvider (as tests use) the start and end come from different clocks, yielding a meaningless — possibly negative — duration.
  • Success spans never get ActivityStatusCode.Ok. EndSpan sets Ok only when status is Unset, and leaves the Error already set for failures — so it's safe for both branches. Calling it would make success spans consistent with the rest of the codebase.

Recommend _tracer?.EndSpan(confirmationSpan) in both finally blocks instead of Dispose().

2. InMemoryOutboxCircuitBreaker.CoolDown read-modify-write is non-atomic vs. concurrent TripTopic

The move to ConcurrentDictionary + TryRemove correctly fixes the removal race. But CoolDown still does _trippedTopics[key] -= 1 (InMemoryOutboxCircuitBreaker.cs:60), a get-then-set that is not atomic. TripTopic's indexer assignment (:72) is individually atomic, but if it lands between CoolDown's read and write, the fresh trip is clobbered with oldValue - 1 and the topic can cool/evict faster than intended. The new When_concurrent_same_topic_confirmations_fail_should_not_lose_trips test exercises concurrent TripTopic but not TripTopic racing CoolDown. Low impact (CoolDown is typically driven by the single sweeper), but if hardening is intended, AddOrUpdate would make the decrement atomic.

3. Minor: Kafka PublishResults async Task.Run fire-and-forget

KafkaMessageProducer.cs:387 / :406 raise the confirmation via Task.Run(() => OnMessagePublished?.Invoke(...)) with the returned task unobserved. The subscriber (the mediator callback) is itself fully try/caught, so an escaping exception is contained in practice — but unlike the InMemory pump (which tracks _raiseTasks for a deterministic dispose drain) there's no handle here, so a faulting raise is silently swallowed at this layer. Acceptable for a fire-and-forget confirmation, just noting the asymmetry.

Things I specifically checked and liked

  • The Persisted-but-missing-id branch deliberately degrades to Id.Empty rather than falling through to the failure path — avoids tripping the breaker on a topic the broker just confirmed (KafkaMessageProducer.cs:374-385). Good defensive call, well commented.
  • CreateConfirmationSpan clearing Activity.Current to force a true root + link (rather than nesting) is correct, and the AsyncLocal-flow gotcha that makes parentId: null insufficient is documented (BrighterTracer.cs:753-761).
  • Publish context captured synchronously before any closure/await in all four producers (Kafka :260/:336, RMQ :153, InMemory :221) — race-free linking as designed.
  • The async void confirmation delegate's local try/catch is the right safety boundary, and the rationale (process-terminating unobserved exception on the broker thread) is spelled out at OutboxProducerMediator.cs:797.

Nice work — none of the above blocks merge in my view; item 1 is the only one I'd actively recommend changing.

🤖 Generated with Claude Code

iancooper and others added 3 commits June 15, 2026 12:34
…Ok status

Both confirmation callbacks closed the span with raw Activity.Dispose(), which stamps
the end from the wall clock (not the tracer's TimeProvider) and never sets Ok on
success spans. Use _tracer.EndSpan() like every other span in the mediator.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…er concurrency

CoolDown did a non-atomic read-modify-write (_trippedTopics[key] -= 1) plus a second
indexer read for the eviction check. A concurrent TripTopic could be clobbered by the
stale write, and a concurrent CoolDown removing the key first made the second read throw
KeyNotFoundException. Use AddOrUpdate for an atomic decrement and a value-conditional
remove keyed on the computed value, so a fresh trip is never lost and the read-after-
remove fault is gone.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The two PublishResults raises used a fire-and-forget Task.Run whose task was discarded,
so a faulting subscriber became an unobserved Task exception. Route both through a
RaisePublishConfirmation helper that wraps the invoke in try/catch and logs at Warning,
keeping the fault contained on the broker thread.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Jun 15, 2026

Copy link
Copy Markdown

Code Review — Spec 0034: failed-delivery observability for confirmation-based producers

Reviewed the production changes across the mediator, tracer, circuit breaker, and the Kafka / RMQ / InMemory producers, plus the supporting tests. This is a careful, well-documented PR — the inline comments explaining why (threading constraints, NFR references, parity with the non-confirmation path) are genuinely excellent and made the review tractable. Overall it looks solid; none of the points below are blocking.

Strengths

  • Closes the gaps without changing the "don't bubble, let the Sweeper retry" design. The failure path logs at Warning (not Error), trips the breaker, and leaves the message un-dispatched — exactly as the requirements state.
  • Wire-topic parity. Tripping on result.Topic (== message.Header.Topic) mirrors the non-confirmation if (!sent) TripTopic(message.Header.Topic) path in DispatchAsync, including the reply/rewritten-topic case via GetProducerLookupTopic.
  • Observability isolation. Span creation is wrapped in its own try/catch (NFR-4) and the MarkDispatched/trip work in a second try/catch, with the span ended in finally (NFR-2). The async void callback can no longer crash the producer/broker thread.
  • Concurrency fix in InMemoryOutboxCircuitBreaker. Moving to ConcurrentDictionary with an atomic AddOrUpdate compare-and-swap decrement and a conditional ICollection.Remove is the right fix — the old plain-Dictionary read-modify-write plus second-indexer-read could both corrupt state and throw KeyNotFoundException under contention. I confirmed the < 0 eviction threshold is unchanged from the base, so this is a pure race fix with no behavioral change (respects the change-scope guardrail).
  • Kafka: the previously-swallowed ProduceException is now logged at Warning with reason+code, and the failed message id propagates via the synthetic NotPersisted DeliveryResult carrying MESSAGE_ID. The PublishResults guard that degrades a Persisted-but-id-less report to Id.Empty rather than falling through to the failure path is a nice defensive touch.
  • Test coverage is strong and behavior-focused — warning-with-id-and-topic, no-dispatch/no-bubble, trip-on-wire-topic, empty-id, linked span, tracer-clock/status, concurrent-same-topic trips, observability-throws-isolation, dispatch-throws-isolation, plus InMemory pump/drain and per-transport carry-context tests.

Points worth a look (minor / non-blocking)

  1. Activity.Current is left dangling after EndSpan for confirmation spans. CreateConfirmationSpan sets Activity.Current = activity but, unlike CreateSpan, never records the PreviousActivity custom property — so EndSpan's restore branch (previousActivity is not null && Activity.Current == span) is skipped, and after disposal Activity.Current still points at the now-disposed span. The XML doc documents this as a caller constraint, and in practice it's likely benign: the Kafka path runs under Task.Run (the thread pool resets ExecutionContext between work items) and RMQ raises on its dedicated ack/nack handler; it's also self-healing for the next confirmation because CreateConfirmationSpan clears Activity.Current first. Worth a one-line comment confirming a disposed activity can't leak into unrelated reused-thread work, since the safety argument lives entirely in the threading model rather than the code.

  2. Copy-paste variable name in the async callback. In ConfigureAsyncPublisherCallbackMaybe, the ISupportPublishConfirmation cast is named producerSync even though the producer is async (IAmAMessageProducerAsync). Harmless, but slightly confusing next to the genuinely-sync sibling method.

  3. InMemory success-path drain caveat. The DrainAsync note that DisposeAsync only drains an async-void success callback up to its first await (so a success-path MarkDispatched is not guaranteed flushed on dispose) is an important, subtle contract. It's well documented in-code — just flagging that any future test relying on dispose-flushed dispatch would be a trap.

  4. Workflow artifacts in the diff. PROMPT.md, specs/.current-spec, and the .design-approved/.requirements-approved/.tasks-approved marker files are included. CLAUDE.md describes PROMPT.md as temporary state — just confirm these are intended to be version-controlled for this repo's spec workflow rather than accidentally committed.

Verification note

I reviewed statically and reasoned through the threading/concurrency paths; I did not run the full build or transport test suites here (Kafka/RMQ integration tests need broker containers). Recommend confirming the Core + InMemory unit suites and the new TestCorrelator-based Kafka log assertions are green in CI.

Nice work on a tricky, observability-sensitive change.

🤖 Generated with Claude Code

…ename async cast

Document in CreateConfirmationSpan that the forced-root span self-clears Activity.Current
to its null parent on EndSpan (so it cannot dangle at the disposed span) and that the
threading model prevents any leak into reused-thread work. Rename the async callback's
ISupportPublishConfirmation cast from producerSync to confirmingProducer to match the
async producer. No behavioural change.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Jun 15, 2026

Copy link
Copy Markdown

Code Review — Spec 0034: failed-delivery observability (#4180)

Reviewed the production changes across the core library, Kafka, RMQ (sync + async), and the in-memory producer, plus a representative sample of the test suite. This is a high-quality PR: the full requirements to ADR to adversarial-review to TDD trail is present, the concurrency/tracing invariants are documented inline at the point they matter, and test coverage is genuinely strong (distinct wire-vs-publication-topic assertions, broker-backed Kafka/RMQ tests, and concurrency races driven by Barrier). The change is also well-scoped — only the two ISupportPublishConfirmation producers contract changes, and the ~10 non-confirmation gateways are untouched.

A few observations, none of them blockers:

1. Kafka: persisted-with-missing-id now degrades to Success=true, Id.Empty (behavior change). KafkaMessageProducer.PublishResults previously fell through to the failure callback when a Persisted report had no MESSAGE_ID; it now raises PublishConfirmationResult(true, Id.Empty, ...). Avoiding a false circuit-breaker trip for a message the broker says it persisted is the right call, and the comment explains it well. The downstream consequence is worth confirming, though: the mediator success branch will call MarkDispatchedAsync(Id.Empty, ...), which matches no Outbox row, so the message is never marked dispatched and the Sweeper re-delivers it indefinitely (re-persisting a duplicate each cycle). In practice Brighter always stamps MESSAGE_ID, so this is a pathological edge — but since the code deliberately handles the id-missing case, consider whether silent infinite re-delivery is the degradation you want, or whether a one-line Warning log when a persisted report lacks an id would make that state diagnosable.

2. ConfirmationObservabilityError logged at Error (minor, self-identified). NFR-1 puts confirmation-handling logs at Warning (nothing is lost, recoverable). The observability-fault catch in OutboxProducerMediator logs at LogLevel.Error. Defensible as a distinct event class (a tracing fault, not a delivery failure) — your own review-code.md flags it as a nit. Relatedly, KafkaMessageProducer.Log.ErrorRaisingPublishConfirmation is named Error but declared LogLevel.Warning; worth aligning the name with the level.

3. InMemoryMessageProducer.Dispose() is sync-over-async. Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); is safe for the default path (no worker started, so the drain is a no-op Task.WhenAll) and for test usage. If UseAsyncPublishConfirmation = true and a caller disposes synchronously on a thread carrying a single-threaded SynchronizationContext, the blocking GetResult() could deadlock against a continuation. Since this is explicitly a test/local-dev affordance the risk is low — but a sentence in the Dispose XML doc noting prefer DisposeAsync when the async pump is enabled would set expectations.

4. CreateConfirmationSpan Activity.Current contract is sound but sharp. The method clears Activity.Current (forced root) and intentionally does not restore it. This is correct for Kafka/RMQ (confirmations run on a pooled Task.Run or broker ack/nack thread) and is recovered in the sync Dispatch loop by Activity.Current = parentSpan after Send returns. The XML doc warns about the inline-on-a-meaningful-thread case well. The only thing I would note is that the safety of the inline InMemory path depends on that parentSpan reset being there — a future caller invoking the tracer inline without an equivalent reset would silently lose the ambient span. The doc covers it; just noting the guarantee is by-convention rather than enforced.

Things verified correct:

  • Publish-context captured synchronously as the first action before any await/closure in all four producers (race-free via AsyncLocal).
  • Bulk-dispatch guard change (and not ISupportPublishConfirmation removed) is correctly scoped — only InMemoryMessageProducer is both bulk and confirmation-capable (GCP/Azure are bulk but not confirmation; Kafka/RMQ are confirmation but not bulk), and the inner producer is not ISupportPublishConfirmation && sent guard correctly defers MarkDispatched to the callback while preserving !sent => TripTopic.
  • InMemoryOutboxCircuitBreaker Dictionary to ConcurrentDictionary with AddOrUpdate/conditional ICollection.Remove correctly fixes the cooldown read-modify-write race (no more KeyNotFoundException on concurrent CoolDown).
  • Two-stage DisposeAsync drain keys off _pumpStarted rather than a _worker snapshot, closing the first-enqueue race; _raiseTasks is only mutated by the single-reader worker and read after the await barrier.
  • NFR-4 isolation: observability and dispatch are in separate try/catch, so a throwing tracer/breaker/logger cannot escape the async-void callback or skip the breaker trip.

Nice work — the design rationale embedded in the comments made this a pleasant review. The points above are refinements, not gates.

🤖 Generated with Claude Code

iancooper and others added 2 commits June 15, 2026 21:03
…ed-without-id

Per PR review: lower ConfirmationObservabilityError to Warning (nothing is lost — the message
is safely in the Outbox for Sweeper retry) and rename it ConfirmationObservabilityFault so the
name no longer implies the level; likewise rename Kafka's ErrorRaisingPublishConfirmation
(already Warning) to PublishConfirmationRaiseFault. Add a Warning (PersistedReportMissingId)
when a Kafka Persisted report carries no message id, so the degraded Id.Empty path — which
leaves the message un-dispatched for the Sweeper rather than marked sent — is diagnosable
rather than silent.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…cContext

Sync Dispose blocked the async drain with GetAwaiter().GetResult(), which can deadlock when
called from a thread carrying a single-threaded synchronization context while the async pump
is enabled. Pump the drain on a dedicated single-threaded BrighterAsyncContext instead (the
idiom already used by ChannelAsync.Dispose), and document preferring DisposeAsync.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Jun 15, 2026

Copy link
Copy Markdown

Code Review — Spec 0034: failed-delivery observability for confirmation-based producers

Thorough, well-documented PR. The change is genuinely useful (failed confirmations were silently dropped) and the design stays faithful to the deliberate "don't bubble, let the Sweeper retry" model. The inline comments explaining the why (AsyncLocal capture invariant, async-void safety, Activity.Current reset) are excellent and make the tricky parts reviewable. Below are observations, mostly minor — nothing I'd consider a hard blocker.

Strengths

  • Circuit breaker fix is real, not cosmetic. The original CoolDown() mutated a plain Dictionary while iterating .Keys and re-read the indexer after a possible Remove — a latent InvalidOperationException/KeyNotFoundException. Moving to ConcurrentDictionary + AddOrUpdate CAS + conditional KVP-Remove is correct and preserves the decrement/eviction semantics exactly (< 0 evicts).
  • Observability is correctly isolated. Span creation, dispatch, and EndSpan are split across separate try/catch/finally blocks so a tracing fault can't destabilise the producer thread (NFR-4), and the span is always ended in finally (NFR-2).
  • async-void safety. Both callbacks wrap the dispatch body in a local try/catch rather than relying on ExecuteWithResiliencePipeline* to absorb a throwing breaker/logger — the right call for a delegate that runs on the broker/threadpool thread.
  • Kafka persisted-without-id degrades to Id.Empty + a Warning rather than falling through to the failure path (which would have tripped the breaker for a topic the broker just confirmed). Nice catch.
  • Test coverage is strong — concurrency, isolation (throwing tracer/breaker doubles), span-linking, and per-transport id/topic/context propagation are all exercised.

Observations / suggestions

  1. InMemory success path drops the topic (minor consistency). In both Enqueue and DrainAsync, the success result is raised as new PublishConfirmationResult(true, message.Id, null, ...)Topic is null — whereas the failure path passes message.Header.Topic. Kafka and RMQ carry the wire topic on both ack and nack. As a result a successful InMemory confirmation span is named " settle" (empty topic) and omits the messaging.destination tag, so the test double doesn't quite mirror the real producers it emulates. The mediator only uses Topic for tripping on failure, so no functional bug — but passing message.Header.Topic on success too would make the double faithful and costs nothing.

  2. Dispose() is no longer a no-op for every InMemory consumer. It now runs BrighterAsyncContext.Run(async () => await DisposeAsync()). On the default (sync-confirmation) path _pumpStarted == 0, so the drain skips the worker and Task.WhenAll runs over an empty list — functionally fine — but it still spins up a single-threaded async context on every dispose, including the many existing tests that construct an InMemoryMessageProducer and never use the async pump. Worth flagging in the PR description as an intended behavioural change to a widely-used test type (per the repo's "Change Scope" guidance).

  3. BulkDispatchAsync guard removal — please confirm it's covered by a test. Dropping and not ISupportPublishConfirmation from if (producer is IAmABulkMessageProducerAsync bulkMessageProducer) means a producer that is both bulk and confirming (Kafka) now enters the bulk branch instead of falling to the else that throws InvalidOperationException("No async bulk message producer defined."). The inner if (producer is not ISupportPublishConfirmation && sent) still defers MarkDispatched to the confirmation callback, so this reads as correct — and arguably fixes a latent bug — but it changes a real code path for confirming bulk producers. A focused test ("bulk dispatch of a confirming producer sends the batch but does not MarkDispatched inline") would lock the intent in.

  4. CreateConfirmationSpan mutates Activity.Current and never restores it. Well-documented and safe for the intended callers (callback threads whose ExecutionContext is reset), but a sharp edge for any future inline caller. The XML remark covers it — just a "tread carefully later" note, not a change request.

  5. Nit: the DrainAsync comment correctly warns that the success raise Task only awaits to the first await of the async-void mediator callback, so DisposeAsync does not guarantee a success-path MarkDispatched has flushed. Good that it's documented; just ensure no future test relies on post-dispose dispatched state on the success path (the failure path has no pre-TripTopic await, so it does drain fully).

Build / verification

Reviewed statically (couldn't run dotnet build in this environment). System.Threading.Channels is correctly added under the netstandard2.0 ItemGroup and its version is centrally pinned in Directory.Packages.props (10.0.7), so central package management is satisfied.

Overall: clean, careful, well-tested work. Items 1–3 are the ones I'd ask the author to look at, and only #3 (a confirming test for the bulk-path change) feels worth gating on for safety.

🤖 Generated with Claude Code

…firming test

Responds to the code review on PR #4180:

- Lock the BulkDispatchAsync guard change with a focused core test: a producer
  that is both bulk and confirming (InMemoryMessageProducer) now enters the bulk
  branch and defers MarkDispatched to its confirmation rather than throwing
  "No async bulk message producer defined.". Success fact proves dispatch via
  confirmation (pre-change it threw, leaving the batch outstanding); failure fact
  proves no inline MarkDispatched on a nacked confirmation and a wire-topic trip.
- InMemoryMessageProducer success confirmations now carry message.Header.Topic on
  both the sync (Enqueue) and async (DrainAsync) paths, mirroring Kafka/RMQ which
  carry the wire topic on ack and nack.
- InMemoryMessageProducer.Dispose short-circuits when the async pump never started,
  keeping the default sync-confirmation path a true no-op instead of spinning up a
  BrighterAsyncContext on every dispose.

Full core suite green: 775 passed, 0 failed, 7 skipped (net9.0).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@iancooper

Copy link
Copy Markdown
Member Author

Thanks for the thorough review — addressed in 2233fa5.

#3 — bulk-confirming guard (the gating item). Added tests/Paramore.Brighter.Core.Tests/Confirmation/When_bulk_dispatching_a_confirming_producer.cs. InMemoryMessageProducer is both IAmABulkMessageProducerAsync and ISupportPublishConfirmation, so it drives the real OutboxProducerMediator.BulkDispatchAsync path with no broker. Two facts, both deterministic via the DisposeAsync drain + InMemoryOutbox state:

  • Success: bulk-clearing a confirming producer dispatches via the confirmation callback. Before the guard change this threw InvalidOperationException("No async bulk message producer defined."), leaving the batch outstanding — so this locks the change in.
  • Failure (the inline-skip safety property): with a nacking confirmation the message is not dispatched (despite sent == true, the inline MarkDispatched is correctly skipped) and the breaker trips on the wire topic. The trip also proves the bulk branch was entered — the pre-change throw would have sent and tripped nothing.

#1 — InMemory success path drops the topic. Fixed — success confirmations now carry message.Header.Topic on both the sync (Enqueue) and async (DrainAsync) paths, mirroring Kafka/RMQ which carry the wire topic on ack and nack.

#2Dispose() no longer a no-op. Rather than just document it, Dispose() now short-circuits when _pumpStarted == 0, keeping the default sync-confirmation path a true no-op instead of spinning up a BrighterAsyncContext on every dispose.

#4 / #5 are notes only and already covered by the existing XML remark on CreateConfirmationSpan and the DrainAsync flush comment — no change.

Full core suite green: 775 passed, 0 failed, 7 skipped (net9.0).

codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Jun 15, 2026

Copy link
Copy Markdown

Code Review — Spec 0034: failed-delivery observability for confirmation-based producers

I reviewed the production changes across the mediator, tracer, circuit breaker, and all four producer implementations (Kafka, RMQ sync/async, InMemory), plus the test suite. This is a high-quality PR: the design is sound, the concurrency reasoning is careful and well-documented, the change is appropriately scoped (the only contract change is OnMessagePublishedAction<PublishConfirmationResult>, touching just the two confirmation producers), and it preserves the deliberate "don't bubble, let the Sweeper retry" semantics. Test coverage is excellent — every behavior (warning log, no-dispatch/no-bubble, trip-on-wire-topic, empty-id, linked span, tracer clock/status, dispatch nesting, isolation-on-throw, concurrent same-topic trips) has a dedicated test across Core/InMemory/Kafka/RMQ.

Verified correct

  • Callback wiring is once-per-mediator (ConfigureCallbacks is called from the constructor at OutboxProducerMediator.cs:162, not per-send), so the OnMessagePublished += subscriptions don't accumulate — no handler-multiplication leak.
  • Wire-topic parity: the confirmation failure path trips result.Topic (= message.Header.Topic), exactly matching the non-confirmation if(!sent) TripTopic(message.Header.Topic) at OutboxProducerMediator.cs:1115.
  • Circuit breaker atomicity: the ConcurrentDictionary + AddOrUpdate / conditional ICollection.Remove(KeyValuePair) is a correct compare-and-swap; the eviction threshold (< 0) is unchanged from the original, and the prior code's enumerate-while-removing hazard is gone.
  • Capture-before-await invariant holds in all four producers — Activity.Current?.Context is read synchronously before any closure/await.
  • RMQ raises the callback outside the state lock (the collection is removed-and-returned under lock, then invoked), avoiding reentrancy/deadlock.

Minor observations (non-blocking)

  1. Subscriber-isolation asymmetry between Kafka and RMQ. Kafka's RaisePublishConfirmation (KafkaMessageProducer.cs:415) wraps OnMessagePublished?.Invoke in Task.Run + try/catch, so a faulting subscriber is contained and logged. RMQ (both sync OnPublishSucceeded/OnPublishFailed and async) invokes the event directly on the broker confirmation thread with no try/catch. The mediator's own delegate is internally exception-safe so this is fine today, but a future second subscriber that throws synchronously would propagate into RMQ's confirmation handler. Consider mirroring Kafka's guarded-invoke for consistency.

  2. Kafka "persisted but no message id" path emits slightly contradictory logs. When a Persisted report carries no MESSAGE_ID, the producer logs PersistedReportMissingId (Warning) and raises (Success: true, Id.Empty); the mediator success branch then logs SentMessage with an empty id and calls MarkDispatchedAsync(Id.Empty, …) (a no-op match). The degradation is correct and documented, but an operator would see a Warning plus a "Sent message: Id:" line for the same event. A one-line note on SentMessage or skipping it on Id.Empty would read cleaner. Truly an edge case (Brighter always stamps MESSAGE_ID).

  3. InMemory DisposeAsync spin-wait. while ((worker = _worker) is null) await Task.Yield(); is correct and bounded (the _worker publish is the statement immediately after the CAS flip), just worth a comment that it's a yield-spin rather than a one-shot wait. The "drain only reaches the first await of the async-void success callback" caveat in DrainAsync is already well documented — good to have called it out so tests don't rely on a flushed MarkDispatched.

Process note

Per CLAUDE.md, production code in a spec PR should arrive via the TDD//test-first workflow with approval gates. The PR description and review-code.md indicate the full requirements→design→tasks→review trajectory was followed and the tests are behavior-first and present, so this looks consistent with the mandated workflow.

Overall: well-engineered, thoroughly tested, and the self-conducted adversarial reviews clearly paid off. The observations above are polish, not blockers.

🤖 Generated with Claude Code

codescene-delta-analysis[bot]

This comment was marked as outdated.

@claude

claude Bot commented Jun 16, 2026

Copy link
Copy Markdown

Code Review — Spec 0034: failed-delivery observability for confirmation-based producers

Reviewed the full diff (production + tests + spec docs). High-quality, carefully-reasoned PR. The blast radius is well-contained (only the two ISupportPublishConfirmation producers see a contract change), the concurrency reasoning is documented inline at the call sites, and test coverage is genuinely thorough — concurrency races, fault isolation, throwing test doubles, and empty-id degradation are all exercised. Feedback below, ordered by importance. Nothing is a blocker; most are questions/consistency notes.

Strengths

  • Circuit-breaker race fix is correct and behavior-preserving. InMemoryOutboxCircuitBreaker keeps the original decrement + "< 0" eviction threshold, just made atomic via AddOrUpdate + conditional ICollection.Remove. Eviction semantics are unchanged from the Dictionary version.
  • Observability is correctly isolated: span creation in its own try/catch, dispatch/trip in a second try/catch, span disposed in finally via _tracer.EndSpan. The async-void callback safety is called out explicitly — exactly the failure mode that crashes a producer thread.
  • TripTopic(result.Topic) keeps parity with the non-confirmation !sent path (wire topic, not publication topic); TripTopic safely no-ops on null/empty (OutboxProducerMediator.cs:1285).
  • Activity.Current captured synchronously inside Send/SendAsync before any closure/await (Kafka :260/:336, RMQ async :153, RMQ sync :135) — right call given AsyncLocal flow; avoids the shared producer.Span race the design review flagged.

Questions / consistency notes

  1. RMQ does not isolate a faulting subscriber the way Kafka does. Kafka wraps the raise in RaisePublishConfirmation (Task.Run + try/catch + fault log) so a throwing subscriber cannot escape into Confluents delivery-report handler. RMQ OnPublishSucceeded/OnPublishFailed (RMQ.Async/RmqMessageProducer.cs:481-501, RMQ.Sync:238-251) invoke OnMessagePublished?.Invoke(...) directly inside the foreach, unguarded. Brighters own mediator handler is internally guarded so this is safe today, but a third-party subscriber that throws would propagate into the RabbitMQ clients event dispatch. Worth mirroring Kafkas isolation for defense-in-depth (or noting why it is intentionally different). If it matches pre-existing RMQ behavior it is not a regression — just flagging the inconsistency now the two producers are meant to behave alike.

  2. "Persisted but no id" raises a SUCCESS confirmation that never marks dispatched. In KafkaMessageProducer.PublishResults (:372-393), a Persisted report with a missing MESSAGE_ID degrades to Id.Empty and raises Success=true. The mediator success branch then logs SentMessage and calls MarkDispatched(Id.Empty, ...), which matches no Outbox row — so the message silently stays for the sweeper while being reported as a successful send. The PersistedReportMissingId warning mitigates diagnosability and the comment says it "should not happen", so this is acceptable — but the semantics (a success confirmation that does not actually dispatch) are slightly surprising vs. the failure path which at least trips the breaker. Confirm this is the intended trade-off.

  3. Kafka catch clauses are typed ProduceException of (string, string) (KafkaMessageProducer.cs:265, 342) while the underlying producer is (string, byte[]) (the publisher catches ProduceException of (string, byte[]) at KafkaMessagePublisher.cs:56). Those (string,string) catches look like they can never match. This is pre-existing, not introduced here, but the new synthetic-NotPersisted flow sits right beside it — worth confirming the async throw path is fully covered by the publishers catch and the producer-level clauses are not dead code masking a real handler gap.

  4. DisposeAsync does not flush the success-path MarkDispatched for the in-memory async pump. The DrainAsync NOTE (InMemoryMessageProducer.cs:263-268) is admirably honest: because the mediator success callback is async-void and returns at its first await (MarkDispatchedAsync), await Task.WhenAll(_raiseTasks) only drains up to that first await. Since this producer is explicitly a test/local-dev affordance this is fine, but a test that disposes then asserts a success-path row was marked dispatched would be racy. The note prevents that footgun — just confirm no current test relies on it.

Minor

  • Confirmation callbacks are registered once via ConfigureCallbacks from the constructor (:162), so the construction-time RequestContext is shared across callbacks — correctly handled by CreateCopy() + per-callback Span assignment. Per-send trace linkage correctly comes from result.PublishSpanContext.
  • Doc-comment typo in ISupportPublishConfirmation: "If this producer support a callback" -> "supports".

Tests

Coverage is strong and naming follows the repo convention (When_...should...). The throwing/gating doubles and the concurrent-same-topic / cooldown-atomicity tests are the right adversarial cases. When_observability_throws_should_isolate_and_still_trip pins the NFR-4 contract directly.

Note: I could not run dotnet build / the suite in this review environment (sandbox restrictions), so the above is static review only. Please ensure CI is green, particularly the Kafka/RMQ integration tests that depend on live brokers.

Overall: well-structured, well-documented, and faithful to the "do not bubble, let the sweeper retry" design. The items above are clarifications and consistency tweaks rather than defects.

🤖 Generated with Claude Code

PublishFailurePredicate is init-only on InMemoryMessageProducer (deliberate,
so it can't be flipped at runtime), which broke the inline assignment in
the bulk-dispatch test. Move the confirmation-failure case into its own
fixture whose constructor seeds the always-nack predicate via the object
initializer, preserving the init-only contract.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@codescene-delta-analysis codescene-delta-analysis Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gates Failed
Prevent hotspot decline (1 hotspot with Overall Code Complexity)
Enforce critical code health rules (1 file with Bumpy Road Ahead)
Enforce advisory code health rules (1 file with Overall Code Complexity)

Our agent can fix these. Install it.

Gates Passed
1 Quality Gates Passed

Reason for failure
Prevent hotspot decline Violations Code Health Impact
OutboxProducerMediator.cs 1 rule in this hotspot 6.97 → 6.56 Suppress
Enforce critical code health rules Violations Code Health Impact
KafkaMessageProducer.cs 1 critical rule 8.75 → 8.61 Suppress
Enforce advisory code health rules Violations Code Health Impact
OutboxProducerMediator.cs 1 advisory rule 6.97 → 6.56 Suppress

See analysis details in CodeScene

Quality Gate Profile: Clean Code Collective
Install CodeScene MCP: safeguard and uplift AI-generated code. Catch issues early with our IDE extension and CLI tool.

@claude

claude Bot commented Jun 16, 2026

Copy link
Copy Markdown

Code Review — PR #4180 (Spec 0034: failed-delivery observability)

Reviewed the ~624 LOC of production changes across the 12 src/ files against CLAUDE.md / .agent_instructions. This is a carefully-scoped, well-tested change that closes the silent-failure gap on confirmation-based producers without altering the deliberate "don't bubble, let the Sweeper retry" design. The inline comments documenting the why (NFR-2/3/4, the Activity.Current linchpin, the CAS/CoW reasoning) are excellent and made the review tractable. Below are findings, mostly minor.

Strengths

  • Failure isolation is solid. The confirmation callbacks wrap observability and dispatch in independent try/catch blocks with the span disposed in finally. The async void / broker-thread crash hazard is correctly identified and contained locally rather than relying on the resilience pipeline.
  • Circuit-breaker race fix is correct. InMemoryOutboxCircuitBreaker.CoolDown now uses AddOrUpdate for an atomic decrement plus the value-conditional ICollection<KVP>.Remove for compare-and-remove — this genuinely closes the read-modify-write race against TripTopic, and avoids the KeyNotFoundException the old indexer re-read could throw.
  • Wire-topic parity. Tripping on result.Topic (== message.Header.Topic) matches the non-confirmation !sent path exactly, including reply/rewritten topics. TripTopic no-ops safely on null/empty.
  • Strong test coverage — 49 test files including concurrency, isolation (throwing tracer/breaker), empty-id, span-linking, and the Kafka synthetic-NotPersisted path. System.Threading.Channels is centrally versioned in Directory.Packages.props.

Minor findings

1. Asymmetric subscriber-throw guarding (RMQ vs Kafka). KafkaMessageProducer.RaisePublishConfirmation wraps OnMessagePublished?.Invoke in a try/catch so a faulting subscriber cannot escalate to an unobserved-task fault. The RMQ producers (OnPublishSucceeded/OnPublishFailed, both sync and async) invoke OnMessagePublished directly with no guard. The mediator's own callback is self-contained, so this is low risk today, but a second subscriber that throws would propagate into RabbitMQ.Client's ack/nack handler (sync) or fault the returned Task (async). Consider mirroring Kafka's guard for symmetry/defense-in-depth.

2. Misleading success log on the degraded Kafka Id.Empty path. When Kafka reports Persisted but the delivery report carries no message id, the success branch still calls Log.SentMessage (Information, "Sent message: Id:") with an empty id, then MarkDispatchedAsync(Id.Empty) matches no Outbox row and the message stays un-dispatched for Sweeper retry. PersistedReportMissingId (Warning) already records the degraded state, but the concurrent Information "Sent message" reads as a clean success. Consider suppressing/altering SentMessage when Id.IsNullOrEmpty(result.MessageId).

3. CreateConfirmationSpan mutates Activity.Current and never restores it. This is well-documented and safe for the broker-thread callbacks (Kafka Task.Run, RMQ dispatch thread) and within the sync Dispatch loop (which resets Activity.Current = parentSpan after each Send). The one path that runs the callback inline on the caller's thread is InMemoryMessageProducer's default (sync-confirmation) Send/Enqueue — there the only thing preserving the caller's ambient span is that mediator reset. It holds, but it is a subtle coupling. Worth an explicit test asserting Activity.Current is unchanged for the caller after a confirmation flows through the mediator on the inline path, so a future refactor of that reset cannot silently regress it.

4. Two trip paths now coexist in bulk dispatch. Removing and not ISupportPublishConfirmation from the bulk branch is necessary (a producer that is both IAmABulkMessageProducerAsync and ISupportPublishConfirmation, like the updated InMemoryMessageProducer, would otherwise hit the throw). Note that a future bulk+confirmation producer could trip the breaker twice for one failure — once via !sent and once via the failure confirmation callback. Not an issue for the current producers; worth a comment near the !sent trip so it is not a surprise later.

Questions / nits

  • InMemoryMessageProducer._raiseTasks grows unbounded for the lifetime of the producer in async-confirmation mode (each raise is appended, never trimmed). Fine for the test/local-dev intent, but worth a one-line note that this provider is not meant for long-lived high-volume use in that mode.
  • The success-branch MarkDispatched re-parenting via a CreateCopy() context whose .Span is the confirmation span is a nice touch and correctly avoids mutating the shared construction-time context — good catch on the thread-keyed/ignore-null setter.

Overall: clean, conservative, and faithful to the spec/ADR trajectory. The findings above are polish, not blockers.

🤖 Generated with Claude Code

@iancooper iancooper merged commit 6548bc0 into master Jun 16, 2026
26 of 29 checks passed
@iancooper iancooper deleted the issue-4179-failed-delivery-context branch June 16, 2026 12:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

2 - In Progress feature request .NET Pull requests that update .net code V10.X

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Provide more context for failed delivery using a callback

1 participant