Add support for d14n and SubscribeAll mechanism #65

Open
Maelkum wants to merge 31 commits into main from maelkum/d14n-support

Conversation

@Maelkum
@Maelkum Maelkum commented Mar 4, 2026

This PR does two major things:

  1. Adds support for the notification server to use the new v4 gRPC API, and
  2. Uses the new SubscribeAll endpoint to stream envelopes.

CLI usage remains largely unchanged. As with xdbg, pass --d14n to indicate that the gRPC node the server connects to is a decentralized xmtpd node and that the new v4 API must be used.
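
A minimal sketch of how a boolean option like --d14n could pick the backend behind one interface. The interface name SubscriberClient comes from the bot summary further down; the `Name` method, the `Options` struct, and the two client types are hypothetical and only illustrate the selection, not the PR's actual code.

```go
package main

import "fmt"

// SubscriberClient borrows the name used in the PR summary; the single
// method here is an assumption made for illustration.
type SubscriberClient interface {
	Name() string
}

// v3Client and v4Client are hypothetical stand-ins for the two backends.
type v3Client struct{}
type v4Client struct{}

func (v3Client) Name() string { return "v3 MessageApi" }
func (v4Client) Name() string { return "v4 ReplicationApi" }

// Options is a hypothetical subset of the server options.
type Options struct {
	D14N bool // set by --d14n
}

// newSubscriberClient returns the v4 backend only when D14N is set,
// so the default path stays on v3.
func newSubscriberClient(opts Options) SubscriberClient {
	if opts.D14N {
		return v4Client{}
	}
	return v3Client{}
}

func main() {
	fmt.Println(newSubscriberClient(Options{D14N: false}).Name())
	fmt.Println(newSubscriberClient(Options{D14N: true}).Name())
}
```

Keeping the flag check in one constructor means a client refresh can reuse the same function and cannot drift to the wrong backend.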

This PR aims to stay backwards compatible with the existing v3 code.

Some implementation differences exist simply because the two kinds of XMTP nodes behave differently; these are addressed in comments wherever a clarification seemed useful.

This PR also updates protos, which includes some changes that are not relevant to this exact PR (e.g. device sync). If needed, I can split that out into a separate PR for a more focused review.

Test coverage for the new code is slim. It was tested locally to confirm that messages are processed and filtered correctly, though a proper target to send payloads to is still missing. Whichever service receives the notification payloads will need to implement handling for the new payload.

Note

Add v4 (d14n) subscription support via unified SubscriberClient interface

  • Introduces SubscriberClient, SubscriberStream, and EnvelopesWrapped abstractions in pkg/xmtp/ to unify v3 and v4 XMTP envelope streams behind a single API.
  • Listener now selects between v3 (MessageApiClient) and v4 (ReplicationApiClient) backends at startup based on the new --d14n / XTMP_D14N option; refreshClient also respects this flag.
  • Adds processV4Envelope in pkg/xmtp/process.go and parseV4Envelope in pkg/xmtp/envelope.go to decode originator envelopes and route group/welcome messages through the existing delivery pipeline.
  • Extends interfaces.SendRequest with MessageV4 and accessor methods (Empty(), GetTopic(), GetMessage()); updates APNs and FCM delivery to use these accessors so both v3 and v4 messages are accepted.
  • Renames installations.DefaultInstallationServiceService and subscriptions.SubscriptionsServiceService with updated constructors; updates topics.GetMessageType to accept a topic string instead of a *v1.Envelope.
  • Risk: envelopes channel type changes from chan *v1.Envelope to chan any, requiring type assertions in all workers; any unhandled type will silently no-op.
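
The risk noted in the last bullet can be illustrated with a short sketch. `EnvelopeV3` and `EnvelopeV4` are hypothetical stand-ins for the real v3 and v4 envelope types, and `dispatch` is an invented helper; the point is only that a type switch over a `chan any` should have a `default` branch that reports the unexpected type rather than doing nothing.

```go
package main

import "fmt"

// EnvelopeV3 and EnvelopeV4 are hypothetical stand-ins for the v3
// *v1.Envelope and the v4 originator envelope used in the PR.
type EnvelopeV3 struct{ ContentTopic string }
type EnvelopeV4 struct{ Topic []byte }

// dispatch reports which branch handled the value, so an unexpected
// type is surfaced instead of being dropped silently.
func dispatch(v any) string {
	switch e := v.(type) {
	case *EnvelopeV3:
		return "v3:" + e.ContentTopic
	case *EnvelopeV4:
		return "v4:" + string(e.Topic)
	default:
		return fmt.Sprintf("unhandled:%T", v)
	}
}

func main() {
	envelopes := make(chan any, 3)
	envelopes <- &EnvelopeV3{ContentTopic: "topic-a"}
	envelopes <- &EnvelopeV4{Topic: []byte("topic-b")}
	envelopes <- 42 // a type no worker knows about
	close(envelopes)

	for v := range envelopes {
		fmt.Println(dispatch(v))
	}
}
```

In a real worker the `default` branch would more likely log a warning or increment a metric; returning a string here just keeps the sketch easy to check.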
📊 Macroscope summarized 81f28fd. 21 files reviewed, 10 issues evaluated, 0 issues filtered, 2 comments posted

🗂️ Filtered Issues

Summary by CodeRabbit

  • New Features

    • XMTP v4 streaming support alongside v3, new typed topics (group, welcome, identity, key packages, payer-related), new subscription APIs (subscribe topics / all envelopes), and a decentralization (d14n) option.
  • Bug Fixes

    • Stronger envelope parsing, safer delivery guards (skip empty requests), improved error wrapping and logging, and more robust delivery/retry flows.
  • Chores

    • Dependency upgrades, tooling/script improvements, test DB helper enhancements, and .gitignore update.

@Maelkum Maelkum self-assigned this Mar 4, 2026
@Maelkum Maelkum requested a review from a team as a code owner March 4, 2026 22:20
@coderabbitai
coderabbitai bot commented Mar 4, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

Adds XMTP v4 envelope support and a SubscriberClient abstraction, refactors installation/subscription types to Service/NewService, converts the listener to an envelope-driven v3/v4 pipeline, extends SendRequest for v4 envelopes, updates test DB helpers to accept *testing.T, and bumps dependencies and generated proto/Swift outputs.

Changes

Cohort / File(s) | Summary
Service refactor: pkg/installations/installations.go, pkg/installations/installations_test.go, pkg/subscriptions/subscriptions.go, pkg/subscriptions/subscriptions_test.go | Renamed concrete service types to Service and constructors to NewService; adjusted method receivers and updated tests to use new constructors and test DB helper signature.
XMTP client & transport: pkg/xmtp/client.go, pkg/xmtp/client_v3.go, pkg/xmtp/client_v4.go, pkg/xmtp/grpc.go | Introduced SubscriberClient/SubscriberStream abstraction, v3/v4 adapters, EnvelopesWrapped, client options, and centralized gRPC connection/credentials helpers; removed legacy bootstrap logic.
Listener & processing pipeline: pkg/xmtp/listener.go, pkg/xmtp/process.go, pkg/xmtp/envelope.go, pkg/xmtp/message.go, pkg/xmtp/utils.go, pkg/xmtp/utils_test.go, pkg/xmtp/listener_test.go | Reworked listener to envelope-based flow: startEnvelopeListener, worker dispatch for V3/V4, parseV4Envelope, buildSendRequestV4/idempotency, deliver guard and client refresh with locking; updated utilities and tests to new signatures.
Topic system: pkg/topics/topic.go, pkg/topics/topics.go | Added typed Topic and TopicKind enum, ParseTopic, serialization and IsReserved; changed GetMessageType to accept a raw topic string.
SendRequest & delivery: pkg/interfaces/interfaces.go, pkg/delivery/apns.go, pkg/delivery/fcm.go, pkg/xmtp/message.go | Extended SendRequest with MessageV4 and helper accessors (Empty, GetMessage); updated delivery code to use accessors and MessageContext.Topic.
Tests & test helpers: test/helpers.go, pkg/.../*_test.go | Test DB helpers now accept *testing.T and resolve DSN from env; tests updated to pass t and to new service constructors.
Config & tooling: .gitignore, dev/gen-proto, pkg/options/options.go | Ignored cmd/server/server; dev/gen-proto accepts optional XMTP proto source argument; added D14N flag to XmtpOptions.
Deps & small fixes: go.mod, pkg/db/db.go | Dependency version bumps across many modules; added fmt-wrapped error context in DB migration.
Generated Swift proto: swift/device_sync/*.pb.swift, swift/xmtpv4/message_api/*.pb.swift, swift/xmtpv4/message_api/*.connect.swift | Renamed archive/backup types and fields in device_sync; added v4 subscription messages and client methods (SubscribeAllEnvelopes, SubscribeTopics) in generated Swift code.
Server main updates: cmd/server/main.go | Updated server wiring to use NewService constructors and adjusted error logging to use logger.Fatal in places.

Sequence Diagram(s)

sequenceDiagram
    participant XMTP as XMTP Server
    participant Client as SubscriberClient
    participant Listener as Listener
    participant SubSvc as Subscriptions Service
    participant InstSvc as Installations Service
    participant Delivery as DeliveryService

    XMTP->>Client: stream envelopes (v3 or v4)
    Client->>Listener: Subscribe / Receive -> enqueue envelope
    Listener->>Listener: dispatch envelope type (v3/v4)
    alt v3
        Listener->>SubSvc: GetSubscriptions(topic, periods)
        SubSvc-->>Listener: subscriptions
    else v4
        Listener->>Listener: parseV4Envelope -> topic
        Listener->>SubSvc: GetSubscriptions(parsedTopic, periods)
        SubSvc-->>Listener: subscriptions
    end
    Listener->>InstSvc: GetInstallations(installationIDs)
    InstSvc-->>Listener: installations
    Listener->>Delivery: deliver(sendRequest)
    Delivery-->>Listener: result / error

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • codabrink
  • neekolas
  • lourou

Poem

🐇
I hopped through streams both old and new,
Wrapped envelopes and topics too.
Services renamed, tests given cheer,
V4 arrived — I did a rear! 🥕

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name | Status | Explanation | Resolution
Docstring Coverage | ⚠️ Warning | Docstring coverage is 3.53% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.
Title check | ✅ Passed | The title 'Add support for d14n and SubscribeAll mechanism' directly captures the two main features introduced in this pull request: d14n (decentralized XMTP support) and SubscribeAll mechanism for envelope streaming.

@macroscopeapp
macroscopeapp bot commented Mar 4, 2026

Approvability

Verdict: Needs human review

2 blocking correctness issues found. This PR introduces significant new functionality (V4/D14N API support) with new runtime behavior controlled by a --d14n flag, new message parsing logic, and changes to the notification delivery pipeline. Multiple unresolved review comments identify potential issues including a security vulnerability in dependencies, possible logic errors in ShouldPush handling, and sensitive data logging concerns.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 11

🧹 Nitpick comments (4)
dev/gen-proto (1)

18-28: Fail fast on invalid input before deleting pkg/proto.

Line 26 removes generated code before validating user input. If Line 18 receives a bad local path, the script can leave the repo in a partially generated state. Recommend preflight checks for argument count and local path existence first.

Suggested hardening patch
+# Usage guard: allow at most one optional argument
+if [ "$#" -gt 1 ]; then
+    echo "Usage: ./dev/gen-proto [/path/to/local/xmtp/proto]" >&2
+    exit 1
+fi
+
 # Use a local proto repo path if provided, otherwise pull from the Buf Schema Registry
 # Usage: ./dev/gen-proto [/path/to/local/xmtp/proto]
 if [ -n "${1:-}" ]; then
     XMTP_PROTO_SOURCE="$1"
+    if [[ "$XMTP_PROTO_SOURCE" != buf.build/* ]] && [ ! -d "$XMTP_PROTO_SOURCE" ]; then
+        echo "Local proto repo path not found: $XMTP_PROTO_SOURCE" >&2
+        exit 1
+    fi
     echo "Using local proto repo: $XMTP_PROTO_SOURCE"
 else
     XMTP_PROTO_SOURCE="buf.build/xmtp/proto"
     echo "Using BSR: $XMTP_PROTO_SOURCE"
 fi
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@dev/gen-proto` around lines 18 - 28, The script currently removes ./pkg/proto
(rm -rf ./pkg/proto) before validating the input path in the XMTP_PROTO_SOURCE
logic; change the flow to validate arguments and, if a local path was provided,
verify it exists and is a readable directory (or a valid git remote reference)
before deleting or regenerating proto output; specifically, perform preflight
checks on $1 (ensure non-empty and test -d "$1" or appropriate git/url
validation) and only proceed to rm -rf ./pkg/proto and run buf generate
"$XMTP_PROTO_SOURCE" after the checks pass, leaving the default BSR behavior
unchanged when no argument is given.
pkg/installations/installations.go (1)

102-106: Avoid pointer churn in result transformation.

At Lines 104-105, transformResult returns *interfaces.Installation and is immediately dereferenced. Returning by value is simpler and avoids unnecessary allocation pressure.

♻️ Suggested patch
 	out := make([]interfaces.Installation, len(results))
 	for i := range results {
-		t := transformResult(results[i])
-		out[i] = *t
+		out[i] = transformResult(results[i])
 	}
@@
-func transformResult(m db.DeviceDeliveryMechanism) *interfaces.Installation {
-	return &interfaces.Installation{
+func transformResult(m db.DeviceDeliveryMechanism) interfaces.Installation {
+	return interfaces.Installation{
 		Id: m.InstallationId,
 		DeliveryMechanism: interfaces.DeliveryMechanism{
 			Kind:      m.Kind,
 			Token:     m.Token,
 			UpdatedAt: m.UpdatedAt,
 		},
 	}
 }

Also applies to: 111-119

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/installations/installations.go` around lines 102 - 106, transformResult
currently returns *interfaces.Installation but callers immediately dereference
it (e.g., the loop building out := make([]interfaces.Installation, len(results))
where t := transformResult(results[i]) and out[i] = *t); change transformResult
to return interfaces.Installation by value and update its signature and return
sites accordingly, then update call sites (including the similar block at the
other loop around lines 111-119) to assign the returned value directly (out[i] =
transformResult(results[i])) to avoid unnecessary pointer allocation and
dereference churn.
pkg/topics/topic.go (1)

19-36: Minor: Inconsistent naming convention in String() output.

TopicKindWelcomeMessagesV1 returns "welcome_message_v1" (singular) while TopicKindGroupMessagesV1 returns "group_messages_v1" (plural). Consider aligning these for consistency.

♻️ Suggested fix
 	case TopicKindWelcomeMessagesV1:
-		return "welcome_message_v1"
+		return "welcome_messages_v1"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/topics/topic.go` around lines 19 - 36, The String() implementation for
TopicKind has an inconsistent naming: TopicKindWelcomeMessagesV1 returns
"welcome_message_v1" (singular) while TopicKindGroupMessagesV1 returns
"group_messages_v1" (plural); update the return value for
TopicKindWelcomeMessagesV1 inside the TopicKind.String() method to
"welcome_messages_v1" to match the plural convention used by
TopicKindGroupMessagesV1 and keep other case returns unchanged.
pkg/xmtp/listener.go (1)

169-170: Add a defensive default for worker count.

If opts.NumWorkers is zero/negative, no workers start and envelope processing can stall. A local fallback to 50 keeps behavior safe.

♻️ Suggested fallback
 func (l *Listener) startMessageWorkers() {
-	for i := 0; i < l.opts.NumWorkers; i++ {
+	numWorkers := l.opts.NumWorkers
+	if numWorkers <= 0 {
+		numWorkers = 50
+	}
+	for i := 0; i < numWorkers; i++ {
 		go func() {
 			for msg := range l.envelopes {

As per coding guidelines: pkg/xmtp/**/*.go: "XMTP listener should be implemented as a single instance with a worker pool (default 50 workers, channel capacity 100) for processing messages".
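
The fallback and the guideline values quoted above can be put together in a self-contained sketch. The names `workerCount`, `process`, and the two constants are hypothetical; only the numbers 50 and 100 come from the quoted guideline.

```go
package main

import (
	"fmt"
	"sync"
)

const (
	defaultWorkers  = 50  // default worker count from the quoted guideline
	channelCapacity = 100 // channel capacity from the quoted guideline
)

// workerCount falls back to the default when the configured value
// would otherwise start no workers at all.
func workerCount(configured int) int {
	if configured <= 0 {
		return defaultWorkers
	}
	return configured
}

// process pushes items through a worker pool and returns how many
// items the workers handled.
func process(configured int, items []string) int {
	queue := make(chan string, channelCapacity)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		handled int
	)
	for i := 0; i < workerCount(configured); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range queue {
				mu.Lock()
				handled++
				mu.Unlock()
			}
		}()
	}
	for _, it := range items {
		queue <- it
	}
	close(queue)
	wg.Wait()
	return handled
}

func main() {
	// With a configured value of 0 the pool still drains every item.
	fmt.Println(workerCount(0), process(0, []string{"a", "b", "c"}))
}
```

Without the fallback, a zero worker count would leave the producer blocked once the buffered channel fills, which is the stall the comment describes.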

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/xmtp/listener.go` around lines 169 - 170, The worker-start loop uses
l.opts.NumWorkers directly so if NumWorkers <= 0 no workers are started; modify
the listener initialization to compute a local workerCount (e.g., workerCount :=
l.opts.NumWorkers; if workerCount <= 0 { workerCount = 50 }) and use that
variable in the for loop that launches goroutines (the block starting with `for
i := 0; i < l.opts.NumWorkers; i++ { go func() { ... } }`), and ensure the
worker pool/channel creation follows the project guideline (default 50 workers,
channel capacity 100).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@go.mod`:
- Line 73: The go.mod currently pins go.opentelemetry.io/otel/sdk at v1.39.0
which is vulnerable; update the dependency to v1.40.0 or later by changing the
module version for go.opentelemetry.io/otel/sdk to v1.40.0+ and then run your Go
module commands (e.g., go get go.opentelemetry.io/otel/sdk@v1.40.0 && go mod
tidy or go get -u ./... followed by go mod tidy) to refresh go.sum and ensure
the patched release is used; target the module name
"go.opentelemetry.io/otel/sdk" when locating the entry in go.mod.

In `@pkg/db/db.go`:
- Line 45: Fix the typo in the error message returned by the fmt.Errorf call
that currently uses "coult not initialize migrator: %w"; change it to "could not
initialize migrator: %w" so wrapped errors log correctly and read clearly
(update the fmt.Errorf invocation that constructs this message).

In `@pkg/installations/installations_test.go`:
- Around line 192-196: The test Test_Get currently has its cleanup call
commented out; re-enable the deferred cleanup by uncommenting the defer
cleanup() so resources from test.CreateTestDb(t) are released after the test
finishes—locate the Test_Get function and the captured cleanup variable (from
test.CreateTestDb) alongside createService and restore the line defer cleanup()
immediately after db creation.

In `@pkg/interfaces/interfaces.go`:
- Around line 79-88: GetMessage in the SendRequest type dereferences r.MessageV4
without checking for nil which can panic; update SendRequest.GetMessage to first
check r.Message and if nil then check r.MessageV4 for nil before accessing
UnsignedOriginatorEnvelope, returning nil (or an empty []byte) when both are nil
to avoid a panic. Specifically, modify the GetMessage method to guard the
r.MessageV4.UnsignedOriginatorEnvelope access and return a safe zero value when
MessageV4 is nil.
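
A sketch of the guarded accessor described in this finding, using trimmed-down hypothetical types. The field names `Message`, `MessageV4`, and `UnsignedOriginatorEnvelope` follow the comment; the struct shapes and the byte-slice payloads are assumptions, since the real types come from the generated proto packages.

```go
package main

import "fmt"

// EnvelopeV3 and EnvelopeV4 are hypothetical reductions of the proto
// types referenced in the review comment.
type EnvelopeV3 struct{ Message []byte }
type EnvelopeV4 struct{ UnsignedOriginatorEnvelope []byte }

// SendRequest carries either a v3 or a v4 message, or neither.
type SendRequest struct {
	Message   *EnvelopeV3
	MessageV4 *EnvelopeV4
}

// GetMessage returns the payload of whichever message is set, or nil
// when neither is, instead of dereferencing a nil pointer.
func (r SendRequest) GetMessage() []byte {
	if r.Message != nil {
		return r.Message.Message
	}
	if r.MessageV4 != nil {
		return r.MessageV4.UnsignedOriginatorEnvelope
	}
	return nil
}

// Empty reports whether the request carries no message at all.
func (r SendRequest) Empty() bool {
	return r.Message == nil && r.MessageV4 == nil
}

func main() {
	empty := SendRequest{}
	v4 := SendRequest{MessageV4: &EnvelopeV4{UnsignedOriginatorEnvelope: []byte("payload")}}
	fmt.Println(empty.Empty(), len(empty.GetMessage()), string(v4.GetMessage()))
}
```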

In `@pkg/options/options.go`:
- Line 28: The struct tag for the D14N flag uses the wrong environment variable
prefix; update the env tag on the D14N field (symbol: D14N) to use the XMTP_
prefix (env:"XMTP_D14N") so it matches other XMTP options and enables env-based
configuration to work correctly; locate the D14N field in options.go and replace
env:"XTMP_D14N" with env:"XMTP_D14N".

In `@pkg/xmtp/envelope.go`:
- Around line 92-104: The TODO should be resolved by confirming and documenting
whether groupMessage.GetV1().GetData() (the serialized MLS message) is the
correct input for computing sender_hmac, not the unsigned envelope payload:
update the messageV4Info construction (fields idempotencyData and HmacInputs) to
either (a) keep using groupMessage.GetV1().GetData() after validating against
the XMTP v4 spec and add a clarifying comment explaining why V1().GetData() is
used for HMAC, or (b) if the spec indicates the unsigned envelope payload is
correct, switch both idempotencyData and HmacInputs to reference that payload;
make the change near the messageV4Info assignment and reference
groupMessage.GetV1().GetData() and interfaces.MessageContext in your comment so
future readers understand the decision.

In `@pkg/xmtp/grpc.go`:
- Around line 31-33: The TLS client config created in the credentials.NewTLS
call (return credentials.NewTLS(&tls.Config{InsecureSkipVerify: false,})) must
explicitly set a minimum TLS version; update the tls.Config used in
pkg/xmtp/grpc.go to include MinVersion: tls.VersionTLS12 (or tls.VersionTLS13 if
your environment requires it) so the configuration clearly enforces the minimum
protocol; ensure the tls package is imported if not already and verify
compatible servers if choosing TLS 1.3.

In `@pkg/xmtp/listener.go`:
- Around line 149-162: The code resets sleepTime only when msg != nil but still
dereferences msg.V3 and msg.V4 unconditionally, causing a nil deref if Receive()
returns (nil, nil); update the message handling in the listener loop (the
variable msg returned by Receive()) to check for nil immediately and skip
processing (e.g., continue) when msg == nil, preserving the existing reset of
sleepTime and only ranging over msg.V3 and msg.V4 when msg is non-nil so
enqueuing to l.envelopes happens safely.
- Around line 54-58: The Listener currently discards the gRPC connection
returned by newConn, leaking resources; modify the Listener struct to hold a
*grpc.ClientConn (or whatever connection type newConn returns), assign conn to
that field after newConn in refreshClient() and wherever a new connection is
created (e.g., startEnvelopeListener flows), and before creating/assigning a new
connection always close the existing one (check for nil and call conn.Close()).
Also add closure logic in Stop() to close any stored connection and set it to
nil so repeated refreshClient() calls (from startEnvelopeListener()) do not
accumulate open sockets/goroutines; reference Listener, newConn,
refreshClient(), startEnvelopeListener(), and Stop().
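
One possible shape for the connection handling suggested above, sketched with `io.Closer` in place of `*grpc.ClientConn` (which also has a `Close() error` method) so it runs without a gRPC dependency. The `setConn` helper, the reduced `Listener`, and `fakeConn` are hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// Listener is a hypothetical reduction: conn stands in for the gRPC
// client connection that the real listener would hold.
type Listener struct {
	mu   sync.Mutex
	conn io.Closer
}

// setConn stores the new connection and closes the one it replaces,
// so repeated refreshes do not accumulate open connections.
func (l *Listener) setConn(next io.Closer) error {
	l.mu.Lock()
	prev := l.conn
	l.conn = next
	l.mu.Unlock()
	if prev != nil {
		return prev.Close()
	}
	return nil
}

// Stop closes whatever connection is still held and clears the field.
func (l *Listener) Stop() error { return l.setConn(nil) }

// fakeConn records whether Close was called.
type fakeConn struct{ closed bool }

func (f *fakeConn) Close() error { f.closed = true; return nil }

func main() {
	a, b := &fakeConn{}, &fakeConn{}
	l := &Listener{}
	_ = l.setConn(a)
	_ = l.setConn(b) // a is closed here
	_ = l.Stop()     // b is closed here
	fmt.Println(a.closed, b.closed)
}
```

Closing the previous connection outside the lock keeps a slow `Close` from blocking other callers that only need to read the current connection.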

In `@pkg/xmtp/message.go`:
- Around line 29-30: getContext currently dereferences env without checking for
nil; add a nil guard at the start of getContext (the function that reads
env.ContentTopic) and return an appropriate zero value (e.g., nil or an empty
interfaces.MessageContext) when env is nil to avoid a panic; ensure all
subsequent logic that uses env.ContentTopic only runs after the nil check.

In `@pkg/xmtp/process.go`:
- Around line 111-113: The Info-level log currently emits the entire SendRequest
object (l.logger.Info("delivering notification", zap.Any("send_request", req))),
which risks leaking sensitive data and is too verbose; change this to log only
non-sensitive identifiers or metadata (e.g., request ID, recipient ID, message
size) or switch to Debug level for full payloads. Update the call that
references l.logger.Info and the req (SendRequest) so it either constructs a
redacted/sanitized map of safe fields or uses l.logger.Debug for
zap.Any("send_request", req), ensuring no sensitive installation/message content
is logged at Info.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 60d25f56-ae46-4fe3-ba54-63938f3eaee8

📥 Commits

Reviewing files that changed from the base of the PR and between ddc8661 and 81f28fd.

⛔ Files ignored due to path filters (6)
  • go.sum is excluded by !**/*.sum
  • pkg/proto/device_sync/content.pb.go is excluded by !**/*.pb.go
  • pkg/proto/device_sync/device_sync.pb.go is excluded by !**/*.pb.go
  • pkg/proto/mls/database/task.pb.go is excluded by !**/*.pb.go
  • pkg/proto/xmtpv4/message_api/message_api.pb.go is excluded by !**/*.pb.go
  • pkg/proto/xmtpv4/message_api/message_api_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (31)
  • .gitignore
  • cmd/server/main.go
  • dev/gen-proto
  • go.mod
  • pkg/db/db.go
  • pkg/delivery/apns.go
  • pkg/delivery/fcm.go
  • pkg/installations/installations.go
  • pkg/installations/installations_test.go
  • pkg/interfaces/interfaces.go
  • pkg/options/options.go
  • pkg/subscriptions/subscriptions.go
  • pkg/subscriptions/subscriptions_test.go
  • pkg/topics/topic.go
  • pkg/topics/topics.go
  • pkg/xmtp/client.go
  • pkg/xmtp/client_v3.go
  • pkg/xmtp/client_v4.go
  • pkg/xmtp/envelope.go
  • pkg/xmtp/grpc.go
  • pkg/xmtp/listener.go
  • pkg/xmtp/listener_test.go
  • pkg/xmtp/message.go
  • pkg/xmtp/process.go
  • pkg/xmtp/utils.go
  • pkg/xmtp/utils_test.go
  • swift/device_sync/content.pb.swift
  • swift/device_sync/device_sync.pb.swift
  • swift/xmtpv4/message_api/message_api.connect.swift
  • swift/xmtpv4/message_api/message_api.pb.swift
  • test/helpers.go

go.opentelemetry.io/otel/trace v1.35.0 // indirect
go.opentelemetry.io/otel v1.39.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/sdk v1.39.0 // indirect

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

GHSA-9h8m-3fm2-qjrq GO-2026-4394 go.opentelemetry.io/otel/sdk

💡 Result:

GHSA-9h8m-3fm2-qjrq / GO-2026-4394 (CVE-2026-24051) affects go.opentelemetry.io/otel/sdk on macOS/Darwin: the SDK’s resource detection (sdk/resource/host_id.go) executes the ioreg command via the system search PATH, allowing local PATH hijacking that can lead to arbitrary code execution in the application’s context. [1][2]

  • Affected versions: >= 1.21.0, < 1.40.0 [1][2]
  • Fixed version: 1.40.0 (patch commit d45961b...) [1][2]
  • Remediation: upgrade go.opentelemetry.io/otel/sdk to v1.40.0+. [1][2]

Sources:
[1] GitHub Advisory (GHSA-9h8m-3fm2-qjrq / CVE-2026-24051) (github.com)
[2] OSV entry for GO-2026-4394 (osv.dev)

Citations:


Upgrade to go.opentelemetry.io/otel/sdk v1.40.0 or later to fix vulnerability GHSA-9h8m-3fm2-qjrq.

Version v1.39.0 is affected by CVE-2026-24051, a PATH hijacking vulnerability in the SDK's resource detection (sdk/resource/host_id.go) on macOS/Darwin that can lead to arbitrary code execution. The first patched release is v1.40.0.

🧰 Tools
🪛 OSV Scanner (2.3.3)

[HIGH] 73-73: go.opentelemetry.io/otel/sdk 1.39.0: OpenTelemetry Go SDK Vulnerable to Arbitrary Code Execution via PATH Hijacking in go.opentelemetry.io/otel/sdk

(GO-2026-4394)


[HIGH] 73-73: go.opentelemetry.io/otel/sdk 1.39.0: OpenTelemetry Go SDK Vulnerable to Arbitrary Code Execution via PATH Hijacking

(GHSA-9h8m-3fm2-qjrq)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@go.mod` at line 73, The go.mod currently pins go.opentelemetry.io/otel/sdk at
v1.39.0 which is vulnerable; update the dependency to v1.40.0 or later by
changing the module version for go.opentelemetry.io/otel/sdk to v1.40.0+ and
then run your Go module commands (e.g., go get
go.opentelemetry.io/otel/sdk@v1.40.0 && go mod tidy or go get -u ./... followed
by go mod tidy) to refresh go.sum and ensure the patched release is used; target
the module name "go.opentelemetry.io/otel/sdk" when locating the entry in
go.mod.

Comment on lines +31 to +33
return credentials.NewTLS(&tls.Config{
InsecureSkipVerify: false,
})

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, locate and examine the grpc.go file
find . -name "grpc.go" -path "*/xmtp/*" | head -20

Repository: xmtp/example-notification-server-go

Length of output: 97


🏁 Script executed:

# Get the specific lines mentioned in the review
sed -n '25,40p' pkg/xmtp/grpc.go

Repository: xmtp/example-notification-server-go

Length of output: 286


🌐 Web query:

recommended minimum TLS version Go grpc-go client connections best practices 2024 2025

💡 Result:

Recommended minimum TLS version for Go grpc-go client connections (best practice 2024–2025): set TLS 1.2 as the minimum, and prefer TLS 1.3 when you control both ends.

  • Why TLS 1.2 minimum? gRPC uses HTTP/2, and the HTTP/2 spec requires TLS 1.2 or higher when running “HTTP/2 over TLS”. [1]
  • What does Go do by default? In modern Go, crypto/tls uses TLS 1.2 as the default minimum if you don’t set tls.Config.MinVersion. Still, many teams set it explicitly for policy/audit clarity. [2]
  • How to enforce in grpc-go: use credentials.NewTLS so you can pass a full tls.Config (including MinVersion). [3]

Example (client):

creds := credentials.NewTLS(&tls.Config{
    MinVersion: tls.VersionTLS12, // or tls.VersionTLS13 if you can require it
})
conn, err := grpc.Dial(target, grpc.WithTransportCredentials(creds))

Sources: [1] RFC 7540 (HTTP/2) TLS requirements, [2] Go crypto/tls docs for MinVersion default, [3] grpc-go credentials package docs (NewTLS).


🌐 Web query:

Go tls.VersionTLS13 client configuration gRPC compatibility requirements

💡 Result:

To force TLS 1.3 for a gRPC-Go client, set tls.Config.MinVersion = tls.VersionTLS13 when creating transport creds (e.g., via credentials.NewTLS). If you do this, the server (or TLS-terminating proxy/LB in front of it) must support TLS 1.3, otherwise the TLS handshake will fail. [2]

gRPC over TLS requires HTTP/2 negotiation via ALPN:

  • HTTP/2 over TLS uses the ALPN protocol ID h2, and HTTP/2 over TLS MUST use TLS negotiation with ALPN. [3]
  • grpc-go’s credentials.NewTLS implementation automatically appends h2 to tls.Config.NextProtos, so you generally don’t need to set NextProtos yourself. [1]

Defaults & what changes when you set TLS 1.3:

  • If you don’t set a MinVersion, grpc-go defaults MinVersion to TLS 1.2 (to satisfy HTTP/2 requirements). [1]
  • Go’s crypto/tls.Config defaults are Min TLS 1.2 and Max TLS 1.3 (unless you override them). [2]

Minimal client example (TLS 1.3 only):

creds := credentials.NewTLS(&tls.Config{
    MinVersion: tls.VersionTLS13,
    // RootCAs / ServerName / Certificates as needed
})
conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(creds))

Sources: [1] grpc-go credentials.NewTLS defaults / ALPN h2 handling, [2] Go crypto/tls.Config MinVersion/MaxVersion defaults, [3] RFC 9113 HTTP/2 ALPN + TLS requirements.


Set an explicit minimum TLS version.

At Line 31, the client TLS config does not set MinVersion. While Go defaults to TLS 1.2 (which satisfies HTTP/2 requirements), explicitly configuring this is a best practice for security policy clarity and auditability.

Set MinVersion: tls.VersionTLS12 to match Go's secure default explicitly. If your environment requires TLS 1.3 only, use tls.VersionTLS13, but verify all connected servers support TLS 1.3 to avoid handshake failures.

🔐 Suggested patch (TLS 1.2)
 	if useTls {
 		return credentials.NewTLS(&tls.Config{
+			MinVersion:         tls.VersionTLS12,
 			InsecureSkipVerify: false,
 		})
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-		return credentials.NewTLS(&tls.Config{
-			InsecureSkipVerify: false,
-		})
+	if useTls {
+		return credentials.NewTLS(&tls.Config{
+			MinVersion:         tls.VersionTLS12,
+			InsecureSkipVerify: false,
+		})
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/xmtp/grpc.go` around lines 31 - 33, The TLS client config created in the
credentials.NewTLS call (return
credentials.NewTLS(&tls.Config{InsecureSkipVerify: false,})) must explicitly set
a minimum TLS version; update the tls.Config used in pkg/xmtp/grpc.go to include
MinVersion: tls.VersionTLS12 (or tls.VersionTLS13 if your environment requires
it) so the configuration clearly enforces the minimum protocol; ensure the tls
package is imported if not already and verify compatible servers if choosing TLS
1.3.

Comment on lines +111 to +113
l.logger.Info("delivering notification",
zap.Any("send_request", req),
)

⚠️ Potential issue | 🟠 Major

Avoid logging full SendRequest at Info level.

Line 112 logs the entire request object, which is likely to include sensitive installation/message data and is too verbose for hot-path Info logs.

🔧 Suggested log-scope reduction
-		l.logger.Info("delivering notification",
-			zap.Any("send_request", req),
-		)
+		l.logger.Info("delivering notification",
+			zap.String("topic", req.GetTopic()),
+			zap.String("message_type", string(req.MessageContext.MessageType)),
+			zap.String("installation_id", req.Installation.Id),
+		)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/xmtp/process.go` around lines 111 - 113, The Info-level log currently
emits the entire SendRequest object (l.logger.Info("delivering notification",
zap.Any("send_request", req))), which risks leaking sensitive data and is too
verbose; change this to log only non-sensitive identifiers or metadata (e.g.,
request ID, recipient ID, message size) or switch to Debug level for full
payloads. Update the call that references l.logger.Info and the req
(SendRequest) so it either constructs a redacted/sanitized map of safe fields or
uses l.logger.Debug for zap.Any("send_request", req), ensuring no sensitive
installation/message content is logged at Info.
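
A minimal sketch of the "log only safe fields" idea, kept to the standard library so it stands alone. The `sendRequest` type, its field names, and the sample values are hypothetical stand-ins, not the real `SendRequest`; in the listener the returned fields would be passed to the logger as individual `zap.String` entries.

```go
package main

import "fmt"

// sendRequest is a hypothetical, trimmed-down stand-in for the real SendRequest.
type sendRequest struct {
	Topic          string
	MessageType    string
	InstallationID string
	Payload        []byte // sensitive: must never reach Info logs
}

// safeLogFields returns only stable, non-sensitive identifiers plus the
// payload size, so the log line stays useful without exposing content.
func safeLogFields(req sendRequest) map[string]string {
	return map[string]string{
		"topic":           req.Topic,
		"message_type":    req.MessageType,
		"installation_id": req.InstallationID,
		"message_size":    fmt.Sprint(len(req.Payload)),
	}
}

func main() {
	req := sendRequest{
		Topic:          "example-topic",
		MessageType:    "example-type",
		InstallationID: "inst-1",
		Payload:        []byte("secret"),
	}
	fmt.Println(safeLogFields(req))
}
```

Building the field set in one helper keeps the allow-list in a single place, so a new sensitive field on the request cannot leak by default.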

Comment on lines +84 to +87
// TODO: Potentially this should be the internal V1.GetData()
// Right now the HmacInputs and IdempotencyKey are the GetData() bytes,
// while 'Message' is the unsigned originator envelope.
return r.MessageV4.UnsignedOriginatorEnvelope
Author

I'm unsure what the best equivalent is for the Message when it comes to v1 envelope.

Is it the unsigned originator envelope as a whole, or the deeply nested Data field we get from the group message?

Right now here we use the former, and for HmacInputs and IdempotencyKeys the latter. I have a hunch only one of those is correct.

Collaborator

From the client's perspective, I think they are going to have the easiest time just taking the full serialized OriginatorEnvelope.

This is different from the current behavior, but I think it will make the client's job easier since it is the same payload they would get from the streaming or query APIs. Somehow we also need to tell the client what type of envelope is coming in the push payload, so they can process v4 and v3 envelopes differently.

Author

Noted.

Regarding types - that's why I put them into separate fields in the outgoing message. The receiver only needs to check whether it got message or message_v4. If it got the former, it works as it has so far; if it got message_v4, it's a new envelope.

Now, I have two things I'm unsure of.


The idempotency key

To me it seems like it's a deduplication thing, so as long as we use the same source for this each time, all is well. Is my understanding correct?

HMAC

If I understood it correctly, SenderHMAC comes with the message. We have keys in the DB, and if all of that data is present - we recalculate the hash of the message and verify if the signature matches the key we have. If it does => skip sending the notification as it comes from the poster itself (don't notify me on messages I send).

So, to correctly verify this, we need to know what data the SenderHMAC refers to, is that correct? Is it still the original V1 group message that we unpacked from the client envelope?


If my assumptions are correct, we can just keep the code as is, use the unpacked V1 data for both of these, but send the originator envelope as the payload to the notification receiver.

Author

Actually - one thing I'll need to check is - the payload difference will be evident to the HTTP delivery mechanism. I will need to check handling for APNs and FCM - they might need additional info to know that the payload is different.

Collaborator

OK. I like the two-fields approach. Makes sense to let the client decide how to handle.

Idempotency key just needs to be something that is unique per message from the network. Mostly used for custom or HTTP delivery services to decide how to dedupe.

SenderHMAC is unique to group messages. Your understanding sounds correct. To correctly verify you can generate some phony payloads and an HMAC key.

My understanding of what gets signed in v4 might be out of date. Your best bet is to dig into libxmtp and see what payload gets signed for the SenderHMAC today.

Author

@Maelkum Maelkum Mar 12, 2026

I went with the option to use V1 Data for both idempotency key and HMAC Input.

By checking the code I saw that the HMAC does refer to the original V1 data, and not any surrounding envelopes (which makes sense when observing the locality of fields).

Regarding the message payloads sent to the receiving party - I went with the mentioned option (message vs message_v4) for HTTP delivery, which makes it easy for the client to know what it is receiving.

For APNs and FCM - I went with the current approach where we only send the message payload - the current approach also doesn't even send the full V1 envelope, just the data. I felt this made sense because:

  1. It's backwards compatible. Both APNs and FCM seem to have an expected layout for the incoming data, so changing it would break the current implementation.
  2. Client doesn't have to know/change anything to handle V4. If we wanted to communicate that we are sending a V4 originator envelope to the recipient, we would have to change the data format, or, perhaps - add some HTTP headers (if handling that is supported - not sure how notifications get processed on the other side of the pipeline).
  3. Since the previous approach was already discarding data, this seemed fine.
  4. HTTP Delivery is there if one wants to get more detailed message info.

Also - in order to avoid double unmarshalling of the envelope - I added a private field to the message context with messageData. That way when we need to send it - it's there and ready.

NOTE: Idempotency data, HMAC Input and messageData are separate byte-slice fields. They all refer to the same thing (message data), but keeping them separate seemed safer: if one of them ever changes, it won't accidentally impact the other fields. A little redundant, but it seems safer.
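
For the HTTP delivery path, the receiver-side check on message versus message_v4 could look roughly like this. Only the two field names come from this thread; the JSON shape, the `pushBody` type, and `envelopeVersion` are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// pushBody is a hypothetical shape for the HTTP delivery payload. Only the
// field names message and message_v4 come from the discussion.
type pushBody struct {
	Message   json.RawMessage `json:"message,omitempty"`
	MessageV4 json.RawMessage `json:"message_v4,omitempty"`
}

// present treats a missing field and an explicit JSON null the same way.
func present(m json.RawMessage) bool {
	return len(m) > 0 && string(m) != "null"
}

// envelopeVersion reports which envelope kind a payload carries.
func envelopeVersion(raw []byte) (string, error) {
	var body pushBody
	if err := json.Unmarshal(raw, &body); err != nil {
		return "", err
	}
	switch {
	case present(body.MessageV4):
		return "v4", nil
	case present(body.Message):
		return "v3", nil
	default:
		return "", errors.New("payload has neither message nor message_v4")
	}
}

func main() {
	for _, raw := range []string{`{"message":{"a":1}}`, `{"message_v4":{"a":1}}`} {
		v, err := envelopeVersion([]byte(raw))
		fmt.Println(v, err)
	}
}
```

The APNs and FCM paths have no equivalent of this check in the approach described here, since they carry only the inner data.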

Collaborator

Appreciate the work to maintain backwards compatibility with clients. Have you talked to @insipx about whether this will work with clients processing d14n payloads?

We would need to be able to process these messages without knowing their originator_id, sequence_id, or dependencies, since the client would have all those fields stripped out. The client would take the d14n message and treat it as a V3 message, which could lead to problems in the client.

Author

Oh, I did not think about that. That actually makes a lot of sense; perhaps striving for compatibility was a fool's errand just because the entire concept of messages was so different. Will check this.

@macroscopeapp

macroscopeapp bot commented Mar 4, 2026

Add v4 envelope processing and d14n backend support to the XMTP listener

  • Introduces a SubscriberClient abstraction in pkg/xmtp/client.go that unifies v3 and v4 gRPC streaming behind a common Subscribe/Receive interface, selectable via the new --d14n CLI flag.
  • Replaces the v3-only startMessageListener with startEnvelopeListener in pkg/xmtp/listener.go, routing both v3 *v1.Envelope and v4 *OriginatorEnvelope through a single chan any to worker goroutines.
  • Adds processV4Envelope in pkg/xmtp/process.go to parse, classify, and deliver v4 envelopes, including idempotency key generation and cursor-based subscription.
  • Extends SendRequest in pkg/interfaces/interfaces.go with a MessageV4 field and Empty()/GetTopic()/GetMessage() helpers; updates APNs and FCM delivery to use these helpers.
  • Adds structured topic parsing via ParseTopic in pkg/topics/topic.go and renames installations/subscriptions service types from DefaultInstallationService/SubscriptionsService to Service.
  • Risk: the --d14n flag defaults to false (v3); enabling it switches to v4 streams with cursor state, which changes reconnect behavior and requires v4-compatible infrastructure.

Macroscope summarized b34a88d.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
pkg/xmtp/listener.go (1)

30-31: ⚠️ Potential issue | 🟠 Major

Close superseded gRPC connections on refresh.

newConn() returns a *grpc.ClientConn, but this file only keeps the wrapped SubscriberClient. Every reconnect replaces l.client without closing the old transport, and Stop() has no way to release the initial connection either. Under stream errors, that will accumulate leaked sockets/goroutines.

Suggested fix
 import (
 	"context"
 	"crypto/sha1"
 	"encoding/hex"
 	"errors"
 	"fmt"
 	"io"
 	"strings"
 	"time"
 
 	"go.uber.org/zap"
+	"google.golang.org/grpc"
@@
 type Listener struct {
 	logger     *zap.Logger
 	ctx        context.Context
 	cancelFunc func()
 	opts       options.XmtpOptions
 
+	conn      *grpc.ClientConn
 	client    SubscriberClient
 	envelopes chan any
@@
 	listener := &Listener{
 		ctx:              ctx,
 		cancelFunc:       cancel,
 		logger:           logger.Named("xmtp-listener"),
 		opts:             opts,
+		conn:             conn,
 		client:           client,
 		envelopes:        make(chan any, 100),
@@
 func (l *Listener) Stop() {
 	l.cancelFunc()
+	if l.conn != nil {
+		_ = l.conn.Close()
+		l.conn = nil
+	}
 }
@@
 func (l *Listener) refreshClient() error {
 	conn, err := newConn(l.opts.GrpcAddress, l.opts.UseTls, l.clientVersion, l.appVersion)
 	if err != nil {
 		return fmt.Errorf("could not refresh GRPC client: %w", err)
 	}
 
+	if l.conn != nil {
+		_ = l.conn.Close()
+	}
+	l.conn = conn
 	l.client = newSubscriberClient(conn, UseV3Client(!l.opts.D14N))
 
 	return nil
 }

Also applies to: 54-62, 243-250

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/xmtp/listener.go` around lines 30 - 31, The listener currently only
stores the wrapped SubscriberClient and never closes the underlying
*grpc.ClientConn from newConn(), leaking sockets/goroutines on reconnects;
modify the listener struct to also store the active *grpc.ClientConn (e.g., add
a conn *grpc.ClientConn field), update the code paths that call newConn() (where
l.client is replaced) to close any previous l.conn before assigning the new
connection and creating the SubscriberClient, and ensure Stop() also closes
l.conn and drains/cleans resources; add necessary synchronization (mutex) around
replacing/closing l.conn and l.client if these happen on different goroutines to
avoid races.
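
The swap-and-close pattern from this comment, reduced to a standalone sketch. `io.Closer` stands in for `*grpc.ClientConn` (which has a `Close() error` method) so the example needs no external modules; `connHolder`, `swap`, `stop`, and `fakeConn` are hypothetical names.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// connHolder owns the active connection. io.Closer stands in for
// *grpc.ClientConn to keep this stdlib-only.
type connHolder struct {
	mu   sync.Mutex
	conn io.Closer
}

// swap installs next and closes whatever connection it replaces.
func (h *connHolder) swap(next io.Closer) error {
	h.mu.Lock()
	old := h.conn
	h.conn = next
	h.mu.Unlock()

	// Close outside the lock so a slow Close cannot block other callers.
	if old != nil {
		return old.Close()
	}
	return nil
}

// stop closes the active connection, if any; calling it again is a no-op.
func (h *connHolder) stop() error {
	return h.swap(nil)
}

// fakeConn records whether Close was called.
type fakeConn struct{ closed bool }

func (f *fakeConn) Close() error { f.closed = true; return nil }

func main() {
	first, second := &fakeConn{}, &fakeConn{}
	h := &connHolder{}
	_ = h.swap(first)
	_ = h.swap(second) // reconnect: first is closed here
	_ = h.stop()       // shutdown: second is closed here
	fmt.Println(first.closed, second.closed)
}
```

In the listener, the wrapped SubscriberClient would be replaced under the same lock so that the client and its connection never get out of step.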
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/xmtp/listener.go`:
- Around line 107-119: The retry loop after l.client.Subscribe must exit when
the listener context is canceled: after Subscribe returns an error (and
similarly in the later retry block around lines 136-146), check l.ctx.Err() or
context.Canceled and break/return so the goroutine stops retrying, closes
l.envelopes, and lets streamLoop/Stop() complete; update the error handling in
the Subscribe retry block and the refreshClient/sleep backoff path to detect
l.ctx cancellation and return early instead of continuing reconnect attempts.

ℹ️ Review info
📥 Commits

Reviewing files that changed from the base of the PR and between 99e7d43 and b34a88d.

📒 Files selected for processing (1)
  • pkg/xmtp/listener.go

@macroscopeapp

macroscopeapp bot commented Mar 10, 2026

  • Refactored topic access pattern by removing SendRequest.GetTopic method and updating all consumers to access MessageContext.Topic directly [8ee7a4b]
  • Modified Listener.deliver method to attempt delivery through all matching delivery services instead of returning after the first successful delivery [8ee7a4b]
  • Added connection lifecycle management to Listener struct with mutex protection for thread-safe operations [8ee7a4b]
  • Improved Listener.startEnvelopeListener method to handle channel cleanup and context cancellation properly [8ee7a4b]
  • Added early return in Listener.processV4Envelope method to skip subscription lookup and delivery when context.ShouldPush is explicitly false [8ee7a4b]
  • Corrected typos in error messages and comments [8ee7a4b]
  • Updated test setup in Test_Get to properly capture and defer cleanup function [8ee7a4b]

Changes since #65 opened

  • Changed Listener.processV3Envelope method to always return nil instead of propagating delivery errors [56d1b46]
  • Removed cursor parameter from SubscriberClient.Subscribe interface method and updated clientWrapper.Subscribe implementation to accept only context [ce14918]
  • Updated Listener.startEnvelopeListener to remove cursor parameter and call Subscribe without cursor argument [ce14918]
  • Introduced MessagePayloadV4 field to MessageContext and modified SendRequest.GetMessagePayload() to return V3 message bytes unchanged or V4 payload bytes from MessageContext.MessagePayloadV4, with xmtp.parseV4Envelope now populating MessagePayloadV4 from V1 data bytes [8cd7c65]
  • Updated delivery.ApnsDelivery.Send and delivery.FcmDelivery.Send to call req.GetMessagePayload() instead of req.GetMessage() when building notifications [8cd7c65]
  • Refactored xmtp.Listener envelope channel from chan any to chan genericEnvelope with a new wrapper type and helper methods [8cd7c65]
  • Added Migration.md documentation describing V3/V4 node support, D14N enablement via --d14n flag, and payload format differences, and updated README.md with reference to migration guide and new flag documentation [8cd7c65]
  • Added TODO comment in subscriptions.Service.Subscribe regarding input validation [8cd7c65]
  • Added support for welcome messages in V4 envelope parsing [7ce66aa]
  • Extracted group message handling into dedicated helper function [7ce66aa]
  • Added import alias for MLS API V1 protocol buffer types [7ce66aa]

Macroscope summarized 8ee7a4b.

case TopicKindGroupMessagesV1:
return "group_messages_v1"
case TopicKindWelcomeMessagesV1:
return "welcome_message_v1"

🟡 Medium topics/topic.go:24

TopicKindWelcomeMessagesV1.String() returns "welcome_message_v1" (singular) but the constant name and all other topic kinds use plural forms. If this string is used for topic routing or matching against external systems expecting "welcome_messages_v1", the mismatch will cause lookup failures.

Suggested change
-		return "welcome_message_v1"
+		return "welcome_messages_v1"
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file pkg/topics/topic.go around line 24:

`TopicKindWelcomeMessagesV1.String()` returns `"welcome_message_v1"` (singular) but the constant name and all other topic kinds use plural forms. If this string is used for topic routing or matching against external systems expecting `"welcome_messages_v1"`, the mismatch will cause lookup failures.

Evidence trail:
pkg/topics/topic.go lines 12-32 (at REVIEWED_COMMIT): Shows `TopicKindWelcomeMessagesV1` constant (plural) returns `"welcome_message_v1"` (singular) while all other constants have matching plural forms in their String() output.
pkg/subscriptions/subscriptions.go lines 138-151: Shows `GetSubscriptions` does exact string match via `WHERE topic = ?`.
pkg/xmtp/process.go line 79: Shows `info.context.Topic` (from `topic.String()`) is used to lookup subscriptions.
pkg/xmtp/envelope.go line 98: Shows `topic.String()` is assigned to `MessageContext.Topic`.
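
One way to keep this from regressing is a table that pins each kind to the exact string the lookup side matches on. The sketch below assumes the plural form is the one other systems expect; the constant values, the `"unknown"` default, and `checkKindStrings` are hypothetical and only mirror the two kinds quoted above.

```go
package main

import "fmt"

// TopicKind mirrors the two kinds quoted in the review; the values are assumed.
type TopicKind int

const (
	TopicKindGroupMessagesV1 TopicKind = iota
	TopicKindWelcomeMessagesV1
)

func (k TopicKind) String() string {
	switch k {
	case TopicKindGroupMessagesV1:
		return "group_messages_v1"
	case TopicKindWelcomeMessagesV1:
		return "welcome_messages_v1" // plural, matching the constant name
	default:
		return "unknown"
	}
}

// wantKindStrings pins each kind to the string used for exact-match lookups.
var wantKindStrings = map[TopicKind]string{
	TopicKindGroupMessagesV1:   "group_messages_v1",
	TopicKindWelcomeMessagesV1: "welcome_messages_v1",
}

// checkKindStrings fails on the first kind whose String() drifts from the table.
func checkKindStrings() error {
	for kind, want := range wantKindStrings {
		if got := kind.String(); got != want {
			return fmt.Errorf("kind %d: got %q, want %q", int(kind), got, want)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkKindStrings())
}
```

Because subscriptions are looked up by exact string match, a check like this belongs in a unit test next to the String() method.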

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/delivery/fcm.go (1)

57-62: ⚠️ Potential issue | 🟠 Major

Clarify v4 push payload compatibility or add versioning.

The payload structure has changed: req.GetMessage() now returns the serialized UnsignedOriginatorEnvelope (which contains blockchain metadata like originator node ID, sequence ID, payer envelope bytes, and fees) instead of the inner encrypted MLS message. This is published under the legacy encryptedMessage field, which is semantically misleading for v4.

Two unresolved TODOs in the codebase (pkg/interfaces/interfaces.go:76-78 and pkg/xmtp/envelope.go) indicate ongoing uncertainty about this design choice. Either add a version field to the payload structure to allow clients to distinguish between v3 and v4 formats, or provide documentation confirming that all recipient clients have been updated to parse the outer envelope from this field.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/delivery/fcm.go` around lines 57 - 62, The payload now encodes the
serialized UnsignedOriginatorEnvelope (req.GetMessage()) but is still published
under the misleading "encryptedMessage" field in the map created in
pkg/delivery/fcm.go (see variables message, topic and the data map), so update
the payload to be explicitly versioned or renamed: add a "version" (e.g. "v3" vs
"v4") field to the data map or rename "encryptedMessage" to something like
"envelope" and set "version":"v4"; update any related sending code to include
this new field and ensure consumers parse based on the new version field (or
rename) so clients can distinguish v3/v4 formats and avoid misinterpreting
req.GetMessage() contents.
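
A rough sketch of the versioned data map suggested above. `encryptedMessage` is the legacy field named in this comment; the `version` and `topic` keys, the base64 encoding, and `buildPushData` itself are assumptions for illustration.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// buildPushData builds the string map sent as push data. "encryptedMessage"
// is the legacy field named in the review; "version" and "topic" are hypothetical.
func buildPushData(topic string, payload []byte, isV4 bool) map[string]string {
	version := "v3"
	if isV4 {
		version = "v4"
	}
	return map[string]string{
		"topic":            topic,
		"encryptedMessage": base64.StdEncoding.EncodeToString(payload),
		"version":          version,
	}
}

func main() {
	fmt.Println(buildPushData("example-topic", []byte("payload"), true))
}
```

Keeping the legacy key and adding a version next to it lets older clients keep reading the field while newer ones branch on the version.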
♻️ Duplicate comments (2)
pkg/xmtp/process.go (1)

46-49: ⚠️ Potential issue | 🟠 Major

Redact hot-path notification logs.

These zap.Any(...) calls can dump HMAC inputs, envelope bytes, installation metadata, and delivery tokens into application logs. Keep Info logs to stable identifiers like topic, message_type, and installation_id, or move full payloads to Debug with explicit redaction.

Also applies to: 114-116

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/xmtp/process.go` around lines 46 - 49, Log call is currently leaking
sensitive payloads via zap.Any("message_context", request.MessageContext) and
similar fields; change Info-level logging to only include stable identifiers
(e.g., topic, message_type, installation_id) and move full payloads to
Debug-level with explicit redaction. Specifically, update the call to
l.logger.Info in process.go (and the duplicated block around lines 114-116) to
remove zap.Any("message_context", request.MessageContext) and any direct
payload/HMAC fields, add zap.String/zap.Bool for safe identifiers (topic,
message_type, installation_id, subscription_has_hmac_key), and if you still need
message_context or envelope bytes, log them at Debug with a redaction helper
that masks HMACs/tokens before passing to zap.Debug. Ensure the
Subscription.HmacKey presence check remains but do not log the raw key or full
structures at Info level.
pkg/xmtp/message.go (1)

29-30: ⚠️ Potential issue | 🟡 Minor

Guard env before reading ContentTopic.

getContext(nil) still panics here. A small nil guard would make bad envelope inputs fail closed instead of crashing the worker.

Also applies to: 48-48

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/xmtp/message.go` around lines 29 - 30, getContext reads env.ContentTopic
without checking env for nil, causing a panic for bad inputs; add a nil guard at
the start of getContext (check if env == nil) and return a safe
zero-value/closed-failure MessageContext (or an error-wrapping context) instead
of dereferencing env; update the code path that calls
topics.GetMessageType(env.ContentTopic) to only run after the nil check. Apply
the same nil-guard pattern to any other functions in this file that read
env.ContentTopic so all envelope reads fail closed rather than panicking.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/xmtp/client.go`:
- Around line 22-24: SubscriberStream.Receive currently returns only
(*EnvelopesWrapped, error) and drops per-node/sequence cursor metadata
(EnvelopesWrapped), causing resubscribes to replay already-processed envelopes;
change the stream to carry cursor progress by returning the updated cursor with
each Receive call (e.g., Receive() -> (/*envelopes*/ *EnvelopesWrapped,
/*cursor*/ CursorType, error)) or, if altering the interface is undesirable,
update the v4 stream adapter in pkg/xmtp/listener.go to advance/attach the
latest cursor to EnvelopesWrapped before handing batches to the listener so
Subscribe()’s cursor map gets updated on each batch; locate and modify
SubscriberStream.Receive, EnvelopesWrapped, and the v4 stream adapter code paths
that transform incoming batches to ensure the latest per-node/sequence cursor is
preserved and returned to the caller.
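
Tracking progress per originator node could be sketched as follows. The `cursor` type, the `uint32`/`uint64` widths, and `advance` are hypothetical; they only illustrate keeping the highest sequence ID per node so a resubscribe can resume instead of replaying.

```go
package main

import "fmt"

// cursor maps an originator node ID to the highest sequence ID seen from it.
type cursor map[uint32]uint64

// advance records seq for node. It returns false when the envelope is not
// newer than what the cursor already holds, which indicates a replay.
func (c cursor) advance(node uint32, seq uint64) bool {
	if last, ok := c[node]; ok && seq <= last {
		return false
	}
	c[node] = seq
	return true
}

func main() {
	c := cursor{}
	fmt.Println(c.advance(1, 5)) // first envelope from node 1
	fmt.Println(c.advance(1, 5)) // same envelope again after a resubscribe
	fmt.Println(c)
}
```

Advancing the cursor as each batch is handed to the listener is what makes the value passed to the next Subscribe call meaningful.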

ℹ️ Review info
📥 Commits

Reviewing files that changed from the base of the PR and between b34a88d and 8ee7a4b.

📒 Files selected for processing (9)
  • pkg/db/db.go
  • pkg/delivery/apns.go
  • pkg/delivery/fcm.go
  • pkg/installations/installations_test.go
  • pkg/interfaces/interfaces.go
  • pkg/xmtp/client.go
  • pkg/xmtp/listener.go
  • pkg/xmtp/message.go
  • pkg/xmtp/process.go
🚧 Files skipped from review as they are similar to previous changes (3)
  • pkg/delivery/apns.go
  • pkg/installations/installations_test.go
  • pkg/interfaces/interfaces.go

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
pkg/xmtp/listener.go (1)

127-134: ⚠️ Potential issue | 🟠 Major

Make the reconnect backoff cancelable and bounded.

Lines 129-131 and 156-157 still use time.Sleep(sleepTime) and double the delay forever. That keeps shutdown waiting on backoff instead of letting the deferred close(l.envelopes) run promptly, and a longer outage can push retries into hours or days.

Suggested retry-loop hardening
 const STARTING_SLEEP_TIME = 100 * time.Millisecond
+const MAX_SLEEP_TIME = 30 * time.Second
@@
-			time.Sleep(sleepTime)
-
-			sleepTime = sleepTime * 2
+			select {
+			case <-time.After(sleepTime):
+			case <-l.ctx.Done():
+				return
+			}
+
+			sleepTime *= 2
+			if sleepTime > MAX_SLEEP_TIME {
+				sleepTime = MAX_SLEEP_TIME
+			}
 			if err = l.refreshClient(); err != nil {
 				l.logger.Error("error refreshing client", zap.Error(err))
 			}
@@
-					time.Sleep(sleepTime)
-					sleepTime = sleepTime * 2
+					select {
+					case <-time.After(sleepTime):
+					case <-l.ctx.Done():
+						return
+					}
+					sleepTime *= 2
+					if sleepTime > MAX_SLEEP_TIME {
+						sleepTime = MAX_SLEEP_TIME
+					}
 					if err = l.refreshClient(); err != nil {
 						l.logger.Error("error refreshing client", zap.Error(err))
 					}

Based on learnings: Both API server and XMTP listener services should be started as goroutines from the main entry point and shut down gracefully on SIGINT/SIGTERM signals.

Also applies to: 155-160

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/xmtp/listener.go` around lines 127 - 134, The reconnect backoff must be
cancelable and bounded: replace direct time.Sleep(sleepTime) usage in the
listener retry loop with a context-aware wait (select on ctx.Done() and
time.After or a timer) so shutdown signals can interrupt sleeps and let deferred
close(l.envelopes) run; introduce a maxBackoff constant and cap sleepTime (e.g.,
sleepTime = min(sleepTime*2, maxBackoff)) and reset sleepTime to the initial
value on a successful connection; ensure the loop uses an injected or existing
context (ctx or l.ctx) and that calls around l.refreshClient() and the select
return/exit when ctx.Done() is signaled.
pkg/xmtp/process.go (1)

45-49: ⚠️ Potential issue | 🟠 Major

Redact the delivery-path logs.

Lines 45-49 and 107-116 pass whole context/request structs to zap.Any. That will reflect exported HmacInputs, SenderHmac, idempotency bytes, installation data, and raw envelopes, so these logs are both sensitive and noisy on the hot path.

Suggested log-scope reduction
-		if !shouldDeliver(request.MessageContext, request.Subscription) {
-			l.logger.Info("skipping delivery of request",
-				zap.Any("message_context", request.MessageContext),
-				zap.Bool("subscription_has_hmac_key", request.Subscription.HmacKey != nil),
-			)
+		if !shouldDeliver(request.MessageContext, request.Subscription) {
+			l.logger.Debug("skipping delivery of request",
+				zap.String("topic", request.MessageContext.Topic),
+				zap.String("message_type", string(request.MessageContext.MessageType)),
+				zap.Bool("subscription_has_hmac_key", request.Subscription.HmacKey != nil),
+			)
 			continue
 		}
@@
-		if !shouldDeliver(info.context, req.Subscription) {
-			l.logger.Debug("skipping delivery",
-				zap.Any("message_context", *info),
-				zap.Bool("subscription_has_hmac_key", req.Subscription.HmacKey != nil),
-			)
+		if !shouldDeliver(info.context, req.Subscription) {
+			l.logger.Debug("skipping delivery",
+				zap.String("topic", req.MessageContext.Topic),
+				zap.String("message_type", string(req.MessageContext.MessageType)),
+				zap.Bool("subscription_has_hmac_key", req.Subscription.HmacKey != nil),
+			)
 			continue
 		}
 
-		l.logger.Info("delivering notification",
-			zap.Any("send_request", req),
-		)
+		l.logger.Info("delivering notification",
+			zap.String("topic", req.MessageContext.Topic),
+			zap.String("message_type", string(req.MessageContext.MessageType)),
+			zap.String("delivery_mechanism", string(req.Installation.DeliveryMechanism.Kind)),
+		)

Also applies to: 107-116

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/xmtp/process.go` around lines 45 - 49, The logger.Info call guarding
shouldDeliver(request.MessageContext, request.Subscription) passes
zap.Any("message_context", request.MessageContext), and the later calls around
lines 107 - 116 pass zap.Any("message_context", *info) and
zap.Any("send_request", req). These leak sensitive and noisy fields (HmacInputs,
SenderHmac, idempotency, installation, raw envelopes); update all three logger
calls to only emit non-sensitive, minimal fields, e.g. the topic, the message
type, a boolean for HMAC presence (request.Subscription.HmacKey != nil), and
the delivery mechanism kind, and remove zap.Any of the full structs (or use a
dedicated redact/summary helper that returns a sanitized struct with only safe
fields) so sensitive fields like HmacInputs, SenderHmac, idempotency bytes,
installation data, and raw envelopes are never logged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@pkg/xmtp/process.go`:
- Around line 71-75: The early-return treating ShouldPush as an explicit opt-out
is wrong because presence may be lost by using GetShouldPush(); update parsing
in envelope.go so presence is preserved (set info.context.ShouldPush to a *bool
only when the proto field is present) and stop relying on GetShouldPush();
ensure the code that checks push uses the nil-check pattern
(info.context.ShouldPush != nil && !*info.context.ShouldPush) so unset fields
default to allowing push and only an explicit false suppresses notifications.

---

Duplicate comments:
In `@pkg/xmtp/listener.go`:
- Around line 127-134: The reconnect backoff must be cancelable and bounded:
replace direct time.Sleep(sleepTime) usage in the listener retry loop with a
context-aware wait (select on ctx.Done() and time.After or a timer) so shutdown
signals can interrupt sleeps and let deferred close(l.envelopes) run; introduce
a maxBackoff constant and cap sleepTime (e.g., sleepTime = min(sleepTime*2,
maxBackoff)) and reset sleepTime to the initial value on a successful
connection; ensure the loop uses an injected or existing context (ctx or l.ctx)
and that calls around l.refreshClient() and the select return/exit when
ctx.Done() is signaled.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 45a2da9a-8e9a-4e0e-b5bc-3985122fea75

📥 Commits

Reviewing files that changed from the base of the PR and between 8ee7a4b and ce14918.

📒 Files selected for processing (3)
  • pkg/xmtp/client.go
  • pkg/xmtp/listener.go
  • pkg/xmtp/process.go

Comment on lines +71 to +75
	if info.context.ShouldPush != nil && !*info.context.ShouldPush {
		// No need to do any expensive queries.
		return nil

	}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n -C3 'GetShouldPush\(|ShouldPush|should_push' pkg/xmtp/envelope.go pkg/proto

Repository: xmtp/example-notification-server-go

Length of output: 6539


Don't treat defaulted should_push as an explicit opt-out.

When GroupMessage_V1.ShouldPush is absent, GetShouldPush() returns false (the proto3 default), so the distinction between "field not set" and "explicitly set to false" is lost. Lines 71-73 treat a false ShouldPush as an explicit opt-out and skip the notification, but an unset field should default to allowing push. The fast-path early return is only correct if presence information is preserved during parsing in envelope.go (lines 88-97).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/xmtp/process.go` around lines 71 - 75, The early-return treating
ShouldPush as an explicit opt-out is wrong because presence may be lost by using
GetShouldPush(); update parsing in envelope.go so presence is preserved (set
info.context.ShouldPush to a *bool only when the proto field is present) and
stop relying on GetShouldPush(); ensure the code that checks push uses the
nil-check pattern (info.context.ShouldPush != nil && !*info.context.ShouldPush)
so unset fields default to allowing push and only an explicit false suppresses
notifications.
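
To make the presence issue concrete, here is a minimal sketch. It assumes the field is declared `optional` in the proto, in which case the generated Go struct carries a `*bool`; `groupMessageV1` and `suppressPush` are hypothetical stand-ins written for this example, not the generated types from this repository.

```go
package main

import "fmt"

// groupMessageV1 mimics the shape generated for a proto3 `optional bool`
// field: a nil pointer means "not set". Hypothetical stand-in type.
type groupMessageV1 struct {
	ShouldPush *bool
}

// GetShouldPush mirrors a generated getter: it returns the zero value when
// the field is unset, which hides the difference between unset and false.
func (m *groupMessageV1) GetShouldPush() bool {
	if m != nil && m.ShouldPush != nil {
		return *m.ShouldPush
	}
	return false
}

// suppressPush reports whether push was explicitly disabled.
// An unset field (nil) does not suppress the notification.
func suppressPush(shouldPush *bool) bool {
	return shouldPush != nil && !*shouldPush
}

func main() {
	off, on := false, true
	cases := map[string]*groupMessageV1{
		"unset": {},
		"false": {ShouldPush: &off},
		"true":  {ShouldPush: &on},
	}
	for _, name := range []string{"unset", "false", "true"} {
		m := cases[name]
		fmt.Printf("%-5s getter=%v suppress=%v\n",
			name, m.GetShouldPush(), suppressPush(m.ShouldPush))
	}
}
```

The getter reports `false` for both the unset and the explicit-false case, while the pointer check suppresses only the explicit false. Copying the pointer through parsing, rather than the getter's result, is what keeps that distinction.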

@Maelkum
Author

Maelkum commented Mar 10, 2026

Update for a number of changes made today:

  • topic handling unified - top-level field for topic originally added for v4 message is now uniformly used for v1 also
  • remove cursor support
  • update shutdown and cleanup
    • keep latest/ongoing grpc connection in order to close it when needed
    • add locking to prevent errors from concurrent use (e.g. a cancellation signal arriving while the subscription goroutine is running)
  • short circuit if message should not be pushed - don't do any expensive queries
  • continue trying to deliver message if any single delivery mechanism fails

TODOs:

  • check idempotency and HMAC parameters - question here
  • versioning - API clients should know if delivered payload is v1 or v4 (only HTTP delivery mechanism knows this for sure at the moment)
  • topic handling - topic names are different for v1 and v4, and right now we (probably) do exact matching on topic name

Comment on lines +112 to +133
func parseV4WelcomeMessage(welcomeMessage *apiv1.WelcomeMessageInput, topic string, originatorNs int64) (*messageV4Info, bool, error) {
	if welcomeMessage == nil {
		// Should not happen as topic kind told us what it was
		return nil, false, errors.New("welcome message missing")
	}

	messageData := welcomeMessage.GetV1().GetData()

	// We use the V1 Data as inputs for both idempotency key and HMAC input, equivalent to the old behavior.
	out := messageV4Info{
		context: interfaces.MessageContext{
			// Essentially maintaining compatibility with the previous classification.
			// TODO: Check - might include V4 variants?
			MessageType:      topics.V3Welcome,
			Topic:            topic,
			MessagePayloadV4: messageData,
		},
		originatorNs:    originatorNs,
		idempotencyData: &messageData,
	}

	return &out, true, nil

🟡 Medium xmtp/envelope.go:112

parseV4WelcomeMessage does not validate that welcomeMessage.GetV1() returns a non-nil value. When the Version oneof contains WelcomePointer_ instead of V1_, GetV1() returns nil and GetData() returns nil, causing the function to succeed with messageData set to nil and resulting in silent data loss.

-	messageData := welcomeMessage.GetV1().GetData()
+	v1 := welcomeMessage.GetV1()
+	if v1 == nil {
+		return nil, false, errors.New("welcome message v1 missing")
+	}
+	messageData := v1.GetData()

Evidence trail:
- pkg/xmtp/envelope.go:112-133 (REVIEWED_COMMIT) - `parseV4WelcomeMessage` function that calls `welcomeMessage.GetV1().GetData()` without validating the result
- pkg/proto/mls/api/v1/mls.pb.go:170-171 - Shows `WelcomeMessageInput` has Version oneof with `V1_` and `WelcomePointer_` variants
- pkg/proto/mls/api/v1/mls.pb.go:214-221 - `GetV1()` implementation returns nil when Version is not `WelcomeMessageInput_V1_`
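
The failure mode can be reproduced with a simplified model of a oneof. In the sketch below, `welcomeInput`, `welcomeV1`, `welcomePointer`, and `welcomeData` are hypothetical stand-ins; real generated code uses wrapper types for each oneof variant, but the nil-safe getter chain behaves the same way.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified, hypothetical model of a message with a Version oneof.
type welcomeV1 struct{ Data []byte }

// GetData is nil-safe, like generated getters.
func (v *welcomeV1) GetData() []byte {
	if v != nil {
		return v.Data
	}
	return nil
}

type welcomePointer struct{ Destination string }

type welcomeInput struct {
	Version any // holds *welcomeV1 or *welcomePointer
}

// GetV1 returns nil when the oneof holds a different variant.
func (w *welcomeInput) GetV1() *welcomeV1 {
	if w == nil {
		return nil
	}
	if v, ok := w.Version.(*welcomeV1); ok {
		return v
	}
	return nil
}

// welcomeData rejects inputs whose oneof is not the V1 variant instead of
// silently returning nil data.
func welcomeData(w *welcomeInput) ([]byte, error) {
	if w == nil {
		return nil, errors.New("welcome message missing")
	}
	v1 := w.GetV1()
	if v1 == nil {
		return nil, errors.New("welcome message v1 missing")
	}
	return v1.GetData(), nil
}

func main() {
	ptr := &welcomeInput{Version: &welcomePointer{Destination: "elsewhere"}}
	// The unchecked chain succeeds with nil data.
	fmt.Println("unchecked is nil:", ptr.GetV1().GetData() == nil)
	// The checked helper surfaces the problem as an error.
	_, err := welcomeData(ptr)
	fmt.Println("checked error:", err)
}
```

Whether a pointer-variant welcome should be an error or handled through a separate path is a design decision for the PR; the sketch only shows that the unchecked chain cannot tell the two variants apart.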

}

-func CreateTestDb() (*bun.DB, func()) {
+func CreateTestDb(t *testing.T) (*bun.DB, func()) {
Collaborator

Now that we are passing in t to this function, might as well use t.Cleanup rather than returning a cleanup function
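
A rough sketch of that shape follows. To keep it runnable outside `go test`, the helper accepts a small interface that `*testing.T` satisfies. `cleanupRegistrar`, `fakeDB`, `createTestDB`, and `recorder` are hypothetical names, and the real helper would open and close a `*bun.DB` instead.

```go
package main

import (
	"fmt"
	"testing"
)

// cleanupRegistrar is the subset of *testing.T this sketch needs.
type cleanupRegistrar interface {
	Cleanup(func())
}

// Compile-time check that *testing.T satisfies the interface.
var _ cleanupRegistrar = (*testing.T)(nil)

// fakeDB stands in for the real database handle.
type fakeDB struct{ closed bool }

// createTestDB registers its own teardown instead of returning a cleanup func,
// so callers cannot forget to invoke it.
func createTestDB(t cleanupRegistrar) *fakeDB {
	db := &fakeDB{}
	t.Cleanup(func() { db.closed = true })
	return db
}

// recorder imitates how the testing package stores and runs cleanups
// (last registered, first run) so the sketch can run from main.
type recorder struct{ fns []func() }

func (r *recorder) Cleanup(f func()) { r.fns = append(r.fns, f) }

func (r *recorder) runCleanups() {
	for i := len(r.fns) - 1; i >= 0; i-- {
		r.fns[i]()
	}
}

func main() {
	r := &recorder{}
	db := createTestDB(r)
	fmt.Println("closed before cleanup:", db.closed)
	r.runCleanups()
	fmt.Println("closed after cleanup:", db.closed)
}
```

In an actual test the call would simply be `db := createTestDB(t)`, with teardown running automatically when the test and its subtests finish.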

Development

Successfully merging this pull request may close these issues.

Notification Server needs to handle empty return when no filters specified

2 participants