From ea4bbeaf5d6b5a927c6f1c821ff781e1eb1bc703 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Mon, 16 Feb 2026 13:14:52 +0100 Subject: [PATCH 01/14] brontide: add context-aware Dial variant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add DialContext alongside existing Dial. The new function accepts a context.Context and a context-aware dialer (DialContextFunc). The TCP connect phase respects context cancellation natively via the dialer. If the context is canceled during the Brontide handshake, a background goroutine closes the underlying connection, causing handshake reads/writes to fail immediately. The existing Dial function is left unchanged — non-persistent paths still use it. --- brontide/conn.go | 103 ++++++++++++++++++++++++++++ brontide/conn_test.go | 152 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 255 insertions(+) create mode 100644 brontide/conn_test.go diff --git a/brontide/conn.go b/brontide/conn.go index bf30e2659ee..03088fcbe2a 100644 --- a/brontide/conn.go +++ b/brontide/conn.go @@ -2,6 +2,7 @@ package brontide import ( "bytes" + "context" "io" "math" "net" @@ -107,6 +108,108 @@ func Dial(local keychain.SingleKeyECDH, netAddr *lnwire.NetAddress, return b, nil } +// DialContextFunc is a function that establishes a network connection +// with context cancellation support. +type DialContextFunc func(ctx context.Context, net, addr string) ( + net.Conn, error) + +// DialContext attempts to establish an encrypted+authenticated connection +// with the remote peer located at netAddr which has the remote public key +// embedded. Unlike Dial, DialContext accepts a context for cancellation +// and a context-aware dialer. The TCP connect phase respects context +// cancellation natively via the dialer. If the context is canceled during +// the Brontide handshake, the underlying connection is closed, causing +// handshake reads/writes to fail with an error. 
+func DialContext(ctx context.Context, local keychain.SingleKeyECDH, + netAddr *lnwire.NetAddress, dialer DialContextFunc) (*Conn, error) { + + ipAddr := netAddr.Address.String() + conn, err := dialer(ctx, "tcp", ipAddr) + if err != nil { + return nil, err + } + + b := &Conn{ + conn: conn, + noise: NewBrontideMachine(true, local, netAddr.IdentityKey), + } + + // Spawn a goroutine that closes the underlying connection if the + // context is canceled during the handshake. This causes any blocking + // Read/Write on the connection to return immediately. We signal done + // before returning on success to prevent the goroutine from closing a + // successfully established connection. + done := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + conn.Close() + case <-done: + } + }() + + // closeAndReturn is a helper to close the connection on handshake + // failure. It signals the cancel goroutine first to avoid a + // double-close race. + closeAndReturn := func(err error) (*Conn, error) { + close(done) + b.conn.Close() + return nil, err + } + + // Initiate the handshake by sending the first act to the receiver. + actOne, err := b.noise.GenActOne() + if err != nil { + return closeAndReturn(err) + } + if _, err := conn.Write(actOne[:]); err != nil { + return closeAndReturn(err) + } + + // We'll ensure that we get ActTwo from the remote peer in a timely + // manner. If they don't respond within handshakeReadTimeout, then + // we'll kill the connection. + err = conn.SetReadDeadline(time.Now().Add(handshakeReadTimeout)) + if err != nil { + return closeAndReturn(err) + } + + // If the first act was successful (we know that address is actually + // remotePub), then read the second act after which we'll be able to + // send our static public key to the remote peer with strong forward + // secrecy. 
+ var actTwo [ActTwoSize]byte + if _, err := io.ReadFull(conn, actTwo[:]); err != nil { + return closeAndReturn(err) + } + if err := b.noise.RecvActTwo(actTwo); err != nil { + return closeAndReturn(err) + } + + // Finally, complete the handshake by sending over our encrypted static + // key and execute the final ECDH operation. + actThree, err := b.noise.GenActThree() + if err != nil { + return closeAndReturn(err) + } + if _, err := conn.Write(actThree[:]); err != nil { + return closeAndReturn(err) + } + + // We'll reset the deadline as it's no longer critical beyond the + // initial handshake. + err = conn.SetReadDeadline(time.Time{}) + if err != nil { + return closeAndReturn(err) + } + + // Signal the cancel goroutine before returning the connection to + // prevent it from closing a successfully established connection. + close(done) + + return b, nil +} + // ReadNextMessage uses the connection in a message-oriented manner, instructing // it to read the next _full_ message with the brontide stream. This function // will block until the read of the header and body succeeds. diff --git a/brontide/conn_test.go b/brontide/conn_test.go new file mode 100644 index 00000000000..1c9100c0bab --- /dev/null +++ b/brontide/conn_test.go @@ -0,0 +1,152 @@ +package brontide + +import ( + "context" + "net" + "testing" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" +) + +// TestDialContextHappyPath verifies that DialContext establishes an +// encrypted connection when the context is not canceled. +func TestDialContextHappyPath(t *testing.T) { + t.Parallel() + + listener, netAddr, err := makeListener() + require.NoError(t, err) + t.Cleanup(func() { listener.Close() }) + + remotePriv, err := btcec.NewPrivateKey() + require.NoError(t, err) + remoteKeyECDH := &keychain.PrivKeyECDH{PrivKey: remotePriv} + + // Accept connections in a goroutine. 
+ acceptCh := make(chan net.Conn, 1) + acceptErrCh := make(chan error, 1) + go func() { + conn, err := listener.Accept() + if err != nil { + acceptErrCh <- err + return + } + acceptCh <- conn + }() + + ctx := t.Context() + dialer := func(ctx context.Context, network, + addr string) (net.Conn, error) { + + var d net.Dialer + return d.DialContext(ctx, network, addr) + } + + conn, err := DialContext(ctx, remoteKeyECDH, netAddr, dialer) + require.NoError(t, err) + require.NotNil(t, conn) + t.Cleanup(func() { conn.Close() }) + + // Wait for listener to accept. + select { + case accepted := <-acceptCh: + t.Cleanup(func() { accepted.Close() }) + + case err := <-acceptErrCh: + require.NoError(t, err) + + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for listener accept") + } +} + +// TestDialContextCanceledBeforeDial verifies that DialContext returns an error +// when the context is already canceled before the TCP dial begins. +func TestDialContextCanceledBeforeDial(t *testing.T) { + t.Parallel() + + remotePriv, err := btcec.NewPrivateKey() + require.NoError(t, err) + remoteKeyECDH := &keychain.PrivKeyECDH{PrivKey: remotePriv} + + // Create a listener just to get a valid address. + listener, netAddr, err := makeListener() + require.NoError(t, err) + t.Cleanup(func() { listener.Close() }) + + // Cancel the context before dialing. + ctx, cancel := context.WithCancel(t.Context()) + cancel() + + dialer := func(ctx context.Context, network, + addr string) (net.Conn, error) { + + var d net.Dialer + return d.DialContext(ctx, network, addr) + } + + conn, err := DialContext(ctx, remoteKeyECDH, netAddr, dialer) + require.ErrorContains(t, err, "operation was canceled") + require.Nil(t, conn) +} + +// TestDialContextCanceledDuringHandshake verifies that DialContext returns an +// error when the context is canceled after the TCP connection is established +// but during the Brontide handshake. 
+func TestDialContextCanceledDuringHandshake(t *testing.T) { + t.Parallel() + + remotePriv, err := btcec.NewPrivateKey() + require.NoError(t, err) + remoteKeyECDH := &keychain.PrivKeyECDH{PrivKey: remotePriv} + + // Use a raw TCP listener that accepts but never completes the + // handshake. This causes the dialer to block on the handshake read, + // giving us time to cancel the context. + rawListener, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { rawListener.Close() }) + + // Accept connections but never send handshake data. + go func() { + conn, err := rawListener.Accept() + if err != nil { + return + } + defer conn.Close() + + // Hold the connection open until the test ends. The context + // cancel will close the dialer side. + <-t.Context().Done() + }() + + serverPub, err := btcec.NewPrivateKey() + require.NoError(t, err) + + netAddr := &lnwire.NetAddress{ + IdentityKey: serverPub.PubKey(), + Address: rawListener.Addr().(*net.TCPAddr), + } + + ctx, cancel := context.WithCancel(t.Context()) + dialer := func(ctx context.Context, network, + addr string) (net.Conn, error) { + + var d net.Dialer + return d.DialContext(ctx, network, addr) + } + + // Cancel the context after a short delay to allow TCP connect to + // succeed but interrupt the handshake. + go func() { + time.Sleep(100 * time.Millisecond) + cancel() + }() + + conn, err := DialContext(ctx, remoteKeyECDH, netAddr, dialer) + require.ErrorContains(t, err, "use of closed network connection") + require.Nil(t, conn) +} From 1274b79728cbfdf7aac82bc6f07e7a9c6e683882 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Mon, 16 Feb 2026 13:17:55 +0100 Subject: [PATCH 02/14] server: extract nextPeerBackoff as standalone function Refactor nextPeerBackoff from a server method that reads s.persistentPeersBackoff into a pure function peerBackoff that takes the current backoff, start time, and configured bounds as arguments. The server method becomes a thin wrapper. 
This prepares the function for reuse by the connection worker without server access. --- server.go | 42 +++++++++++++++----- server_test.go | 102 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+), 9 deletions(-) diff --git a/server.go b/server.go index 91b6624631c..6dcfd8dd128 100644 --- a/server.go +++ b/server.go @@ -3893,36 +3893,53 @@ func (s *server) nextPeerBackoff(pubStr string, return s.cfg.MinBackoff } + return peerBackoff( + backoff, startTime, s.cfg.MinBackoff, s.cfg.MaxBackoff, + defaultStableConnDuration, + ) +} + +// peerBackoff computes the next reconnection backoff for a persistent peer +// given the current backoff, the time the peer was last started, and the +// configured bounds. A zero startTime indicates the peer failed to start. +// Connections that lasted longer than stableConnDuration get their backoff +// reduced; short-lived connections double it. +func peerBackoff(currentBackoff time.Duration, startTime time.Time, + minBackoff, maxBackoff, + stableConnDuration time.Duration) time.Duration { + // If the peer failed to start properly, we'll just use the previous // backoff to compute the subsequent randomized exponential backoff // duration. This will roughly double on average. if startTime.IsZero() { - return computeNextBackoff(backoff, s.cfg.MaxBackoff) + return computeNextBackoff(currentBackoff, maxBackoff) } // The peer succeeded in starting. If the connection didn't last long // enough to be considered stable, we'll continue to back off retries // with this peer. 
connDuration := time.Since(startTime) - if connDuration < defaultStableConnDuration { - return computeNextBackoff(backoff, s.cfg.MaxBackoff) + if connDuration < stableConnDuration { + return computeNextBackoff(currentBackoff, maxBackoff) } - // The peer succeed in starting and this was stable peer, so we'll + // The peer succeeded in starting and this was a stable peer, so we'll // reduce the timeout duration by the length of the connection after // applying randomized exponential backoff. We'll only apply this in the // case that: - // reb(curBackoff) - connDuration > cfg.MinBackoff - relaxedBackoff := computeNextBackoff(backoff, s.cfg.MaxBackoff) - connDuration - if relaxedBackoff > s.cfg.MinBackoff { + // reb(curBackoff) - connDuration > minBackoff + relaxedBackoff := computeNextBackoff( + currentBackoff, maxBackoff, + ) - connDuration + if relaxedBackoff > minBackoff { return relaxedBackoff } - // Lastly, if reb(currBackoff) - connDuration <= cfg.MinBackoff, meaning + // Lastly, if reb(currBackoff) - connDuration <= minBackoff, meaning // the stable connection lasted much longer than our previous backoff. // To reward such good behavior, we'll reconnect after the default // timeout. - return s.cfg.MinBackoff + return minBackoff } // shouldDropLocalConnection determines if our local connection to a remote peer @@ -5193,6 +5210,13 @@ func computeNextBackoff(currBackoff, maxBackoff time.Duration) time.Duration { // avoid the nodes entering connection cycles. margin := nextBackoff / 10 + // If the margin is too small (0), skip randomization and just return + // the backoff, otherwise the rand.Int call panics. This can happen when + // nextBackoff < 10 time units. 
+ if margin == 0 { + return nextBackoff + } + var wiggle big.Int wiggle.SetUint64(uint64(margin)) if _, err := rand.Int(rand.Reader, &wiggle); err != nil { diff --git a/server_test.go b/server_test.go index 0cb3643184f..643a7318ae9 100644 --- a/server_test.go +++ b/server_test.go @@ -139,3 +139,105 @@ func TestNodeAnnouncementTimestampComparison(t *testing.T) { }) } } + +// TestPeerBackoff tests the pure peerBackoff function with table-driven cases +// covering zero backoff, short-lived connections (doubles), stable connections +// (reduces), and capping at max. +func TestPeerBackoff(t *testing.T) { + t.Parallel() + + const ( + minBackoff = 1 * time.Second + maxBackoff = 1 * time.Hour + stableConnDuration = 10 * time.Minute + ) + + tests := []struct { + name string + currentBackoff time.Duration + startTime time.Time + assertBackoff func(t *testing.T, result time.Duration) + }{ + { + // Peer never started: backoff should roughly double + // (with randomization). + name: "zero start time doubles backoff", + currentBackoff: 10 * time.Second, + startTime: time.Time{}, + assertBackoff: func(t *testing.T, result time.Duration) { + // computeNextBackoff doubles with ±5% + // wiggle, so result should be roughly 20s. + require.Greater(t, result, + 15*time.Second, + "backoff too low for failed start") + require.Less(t, result, + 25*time.Second, + "backoff too high for failed start") + }, + }, + { + // Short-lived connection (< stableConnDuration): + // backoff should roughly double. + name: "short lived connection doubles backoff", + currentBackoff: 10 * time.Second, + startTime: time.Now().Add(-5 * time.Minute), + assertBackoff: func(t *testing.T, result time.Duration) { + require.Greater(t, result, + 15*time.Second, + "backoff too low for short conn") + require.Less(t, result, + 25*time.Second, + "backoff too high for short conn") + }, + }, + { + // Stable connection (> stableConnDuration) with + // large backoff: should reduce. 
reb(30m) ≈ 60m, + // minus 20m conn = ~40m, which is > minBackoff. + name: "stable connection reduces backoff", + currentBackoff: 30 * time.Minute, + startTime: time.Now().Add(-20 * time.Minute), + assertBackoff: func(t *testing.T, result time.Duration) { + require.Greater(t, result, minBackoff, + "should be above min") + require.Less(t, result, 50*time.Minute, + "should be reduced from doubled") + }, + }, + { + // Stable connection that lasted much longer than + // backoff: should return minBackoff. + // reb(1s) ≈ 2s, minus 1h conn → negative → min. + name: "long stable connection resets to min", + currentBackoff: minBackoff, + startTime: time.Now().Add(-1 * time.Hour), + assertBackoff: func(t *testing.T, result time.Duration) { + require.Equal(t, minBackoff, result) + }, + }, + { + // Backoff at max: doubling should cap at max. + name: "backoff caps at max", + currentBackoff: maxBackoff, + startTime: time.Time{}, + assertBackoff: func(t *testing.T, result time.Duration) { + // After capping + wiggle. 
+ require.LessOrEqual(t, result, + maxBackoff+maxBackoff/10) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + result := peerBackoff( + tc.currentBackoff, tc.startTime, minBackoff, + maxBackoff, stableConnDuration, + ) + + tc.assertBackoff(t, result) + }) + } +} From 9a6c7b3795fc648712ce89690b9bb5720ffec726 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Mon, 16 Feb 2026 13:21:22 +0100 Subject: [PATCH 03/14] server: define connWorker types and command protocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Define the vocabulary for per-peer connection workers in a new file conn_worker.go: - connWorkerCmd enum: cmdConnect, cmdUpdateAddrs, cmdStandDown, cmdStop - connWorkerMsg struct: carries command with addresses and backoff - connWorkerCfg struct: injected dependencies (dialer, callbacks, timing) - connWorker struct: pubKeyStr, perm, cmdChan, backoff, addrs, cfg No constructor or methods yet — compile-only commit establishing the types that the run loop (next commit) will use. --- conn_worker.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 conn_worker.go diff --git a/conn_worker.go b/conn_worker.go new file mode 100644 index 00000000000..be940be4be1 --- /dev/null +++ b/conn_worker.go @@ -0,0 +1,107 @@ +package lnd + +import ( + "context" + "net" + "time" + + "github.com/lightningnetwork/lnd/lnwire" +) + +// connWorkerCmd enumerates the commands that the server can send to a +// connection worker. +type connWorkerCmd uint8 + +const ( + // cmdConnect instructs the worker to begin dialing the provided + // addresses after waiting for the specified backoff duration. + cmdConnect connWorkerCmd = iota + + // cmdUpdateAddrs replaces the worker's address list. If a dial loop is + // active, it is restarted with the new addresses. 
+ cmdUpdateAddrs + + // cmdStandDown cancels any in-progress dial and returns the worker to + // an idle state. Used when an inbound connection makes the outbound + // attempt unnecessary. + cmdStandDown + + // cmdStop terminates the worker goroutine permanently. + cmdStop +) + +// connWorkerMsg carries a command and its associated data from the server to a +// connection worker. +type connWorkerMsg struct { + // cmd is the command to execute. + cmd connWorkerCmd + + // addrs is the list of addresses to dial, used by cmdConnect and + // cmdUpdateAddrs. + addrs []*lnwire.NetAddress + + // backoff is the duration to wait before the first dial attempt. Only + // meaningful for cmdConnect. + backoff time.Duration +} + +// connWorkerCfg bundles the dependencies injected into a connection worker at +// creation time. All fields are set once and never modified. +type connWorkerCfg struct { + // dialContext establishes a Brontide connection to the given address. + // The context controls cancellation of the TCP dial and handshake. + dialContext func(ctx context.Context, + addr *lnwire.NetAddress) (net.Conn, error) + + // onConnection is called when a dial succeeds. The worker passes the + // raw connection and the address that was dialed. + onConnection func(conn net.Conn, addr *lnwire.NetAddress) + + // minBackoff is the floor for the exponential backoff. + minBackoff time.Duration + + // maxBackoff is the ceiling for the exponential backoff. + maxBackoff time.Duration + + // stableConnDuration is the minimum connection lifetime before backoff + // reduction is applied. + stableConnDuration time.Duration + + // staggerDelay is the pause between dialing successive addresses for + // the same peer. + staggerDelay time.Duration + + // quit is the server-wide shutdown signal. + quit chan struct{} +} + +// connWorker manages the dial/retry/backoff lifecycle for a single persistent +// peer. The server communicates with it exclusively through the cmdChan +// channel. 
A worker's existence in the server's persistentWorkers map means the +// peer is persistent; its removal means the peer has been disconnected or +// pruned. +type connWorker struct { + // pubKeyStr identifies the remote peer (compressed pubkey as raw + // string). + pubKeyStr string + + // perm indicates whether the user explicitly requested this connection + // via ConnectToPeer with perm=true. Non-perm workers are removed when + // the peer's last channel closes. + perm bool + + // cmdChan carries commands from the server to the worker's run loop. + // Buffered with capacity 1 so the server never blocks under normal + // operation. + cmdChan chan connWorkerMsg + + // backoff is the current reconnection delay, maintained locally by the + // worker across dial rounds. + backoff time.Duration + + // addrs is the current set of addresses to dial. + addrs []*lnwire.NetAddress + + // cfg holds the injected dependencies. + cfg connWorkerCfg +} From 48899c7fe40aa77947b327a77df30ca5d0a7ee60 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Mon, 16 Feb 2026 13:28:23 +0100 Subject: [PATCH 04/14] server: implement connWorker run loop Add the runtime for per-peer connection workers. A worker sits idle until told to connect, then dials each known address in sequence with a stagger delay, backing off exponentially on failure. Commands that arrive mid-dial cancel the in-progress attempt immediately via context cancellation and are processed without goroutine leaks. This gives the server a single point of control per peer: sending cmdConnect replaces whatever the worker is doing, cmdStandDown returns it to idle (used when an inbound connection arrives), and cmdStop tears it down permanently. 
--- conn_worker.go | 260 +++++++++++++++- conn_worker_test.go | 702 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 961 insertions(+), 1 deletion(-) create mode 100644 conn_worker_test.go diff --git a/conn_worker.go b/conn_worker.go index be940be4be1..9bf44d397ca 100644 --- a/conn_worker.go +++ b/conn_worker.go @@ -41,7 +41,9 @@ type connWorkerMsg struct { addrs []*lnwire.NetAddress // backoff is the duration to wait before the first dial attempt. Only - // meaningful for cmdConnect. + // meaningful for cmdConnect. A zero value means dial immediately and + // resets any prior exponential backoff — this is intentional for fresh + // connect requests (e.g. ConnectToPeer, startup). backoff time.Duration } @@ -105,3 +107,259 @@ type connWorker struct { // cfg holds the injected dependencies. cfg connWorkerCfg } + +// newConnWorker creates a connection worker for the given persistent peer. The +// worker is idle until it receives a command on cmdChan. The caller must start +// the worker via go w.Run(). +func newConnWorker(pubKeyStr string, perm bool, cfg connWorkerCfg) *connWorker { + return &connWorker{ + pubKeyStr: pubKeyStr, + perm: perm, + cmdChan: make(chan connWorkerMsg, 1), + backoff: cfg.minBackoff, + cfg: cfg, + } +} + +// Run is the main event loop for the connection worker. It waits for commands +// on cmdChan and dispatches them. cmdConnect starts a dial loop; cmdStandDown +// cancels an active dial and returns to idle; cmdStop exits the goroutine. Run +// returns when cmdStop is received or the server's quit channel is closed. +// +// NOTE: This method MUST be run as a goroutine. +func (w *connWorker) Run() { + for { + select { + case msg := <-w.cmdChan: + switch msg.cmd { + case cmdConnect: + w.addrs = msg.addrs + w.backoff = msg.backoff + if !w.dialLoop() { + return + } + + case cmdUpdateAddrs: + w.addrs = msg.addrs + + case cmdStandDown: + // Already idle, nothing to cancel. 
+ + case cmdStop: + return + } + + case <-w.cfg.quit: + return + } + } +} + +// dialLoop runs successive dial rounds until a connection succeeds, a +// preempting command arrives, or the worker is stopped. Before the first round +// (and between subsequent rounds on failure), the worker waits for the current +// backoff duration. Returns true if the caller should continue the run loop +// (idle), false if the worker should exit (false) or be idled (true). +func (w *connWorker) dialLoop() bool { + for { + // Wait for the backoff before dialing. A zero backoff skips the + // wait entirely (used for immediate connects at startup). + if w.backoff > 0 { + timer := time.NewTimer(w.backoff) + + select { + case msg := <-w.cmdChan: + timer.Stop() + + switch msg.cmd { + case cmdConnect: + w.addrs = msg.addrs + w.backoff = msg.backoff + continue + + case cmdUpdateAddrs: + w.addrs = msg.addrs + + // TODO: Should we reset the backoff + // timer here? Currently we restart the + // full wait with the same duration. If + // the addresses changed, dialing sooner + // may be preferable. + continue + + case cmdStandDown: + return true + + case cmdStop: + return false + } + + case <-timer.C: + + case <-w.cfg.quit: + timer.Stop() + + return false + } + } + + ctx, cancel := context.WithCancel(context.Background()) + result := w.tryAllAddresses(ctx, cancel) + cancel() + + switch result { + case dialSuccess: + return true + + case dialRestart: + continue + + case dialStop: + return false + + case dialStandDown: + return true + + case dialFailed: + // All addresses failed. Increase backoff for the next + // round. If the current backoff is zero (first + // attempt), start from minBackoff. + if w.backoff < w.cfg.minBackoff { + w.backoff = w.cfg.minBackoff + } else { + w.backoff = computeNextBackoff( + w.backoff, w.cfg.maxBackoff, + ) + } + } + } +} + +// dialResult enumerates the outcomes of a dial round. 
+type dialResult uint8 + +const ( + // dialSuccess means a connection was established and delivered. + dialSuccess dialResult = iota + + // dialFailed means all addresses were tried without success. + dialFailed + + // dialRestart means a new cmdConnect or cmdUpdateAddrs arrived during + // the round, and the caller should restart. + dialRestart + + // dialStandDown means a cmdStandDown arrived and the worker should + // return to idle. + dialStandDown + + // dialStop means cmdStop or quit was received and the worker should + // exit. + dialStop +) + +// dialOutcome carries the result of a single dial attempt back from the +// goroutine to the select loop. +type dialOutcome struct { + conn net.Conn + err error +} + +// tryAllAddresses iterates over the worker's addresses, dialing each one. +// Between addresses it waits for the stagger delay, checking for preempting +// commands. Each dial runs in a goroutine so that the worker can respond to +// commands and quit signals during in-progress dials. The provided cancel +// function is called when a preempting command aborts the round. +func (w *connWorker) tryAllAddresses(ctx context.Context, + cancel context.CancelFunc) dialResult { + + for i, addr := range w.addrs { + // Stagger between addresses (skip for the first one). + if i > 0 { + timer := time.NewTimer(w.cfg.staggerDelay) + select { + case msg := <-w.cmdChan: + timer.Stop() + cancel() + + return w.handleMidDial(msg) + + case <-timer.C: + + case <-w.cfg.quit: + timer.Stop() + cancel() + + return dialStop + } + } + + srvrLog.Debugf("Dialing persistent peer %x addr=%v", + w.pubKeyStr, addr) + + // Run the dial in a goroutine so we can select on commands and + // quit concurrently. 
+ resultCh := make(chan dialOutcome, 1) + go func() { + conn, err := w.cfg.dialContext(ctx, addr) + resultCh <- dialOutcome{conn: conn, err: err} + }() + + select { + case outcome := <-resultCh: + if outcome.err != nil { + srvrLog.Debugf("Failed to dial %v for peer "+ + "%x: %v", addr, w.pubKeyStr, + outcome.err) + continue + } + + // Dial succeeded. Deliver the connection. + w.cfg.onConnection(outcome.conn, addr) + + return dialSuccess + + case msg := <-w.cmdChan: + cancel() + // Wait for the dial goroutine to finish so we don't + // leak it. + <-resultCh + + return w.handleMidDial(msg) + + case <-w.cfg.quit: + cancel() + <-resultCh + + return dialStop + } + } + + return dialFailed +} + +// handleMidDial processes a command that arrived during a stagger wait or +// between dial attempts, returning the appropriate dialResult. +func (w *connWorker) handleMidDial(msg connWorkerMsg) dialResult { + switch msg.cmd { + case cmdConnect: + w.addrs = msg.addrs + w.backoff = msg.backoff + + return dialRestart + + case cmdUpdateAddrs: + w.addrs = msg.addrs + + return dialRestart + + case cmdStandDown: + return dialStandDown + + case cmdStop: + return dialStop + + default: + return dialFailed + } +} diff --git a/conn_worker_test.go b/conn_worker_test.go new file mode 100644 index 00000000000..90db3afbbce --- /dev/null +++ b/conn_worker_test.go @@ -0,0 +1,702 @@ +package lnd + +import ( + "context" + "errors" + "math/rand/v2" + "net" + "sync" + "testing" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" +) + +// testAddr creates a test lnwire.NetAddress with the given IP string. +func testAddr(t *testing.T, ip string, + pub *btcec.PublicKey) *lnwire.NetAddress { + + t.Helper() + + return &lnwire.NetAddress{ + IdentityKey: pub, + Address: &net.TCPAddr{ + IP: net.ParseIP(ip), + Port: 9735, + }, + } +} + +// dialCall records the address and timestamp of a mock dial attempt. 
+type dialCall struct { + addr *lnwire.NetAddress + time time.Time +} + +// mockDialer is a controllable dialer for testing connWorker. Each dial blocks +// until the test sends a result on resultCh, allowing precise control over +// timing. +type mockDialer struct { + t *testing.T + mu sync.Mutex + calls []dialCall + resultCh chan error +} + +func newMockDialer(t *testing.T) *mockDialer { + return &mockDialer{ + t: t, + resultCh: make(chan error, 1), + } +} + +func (d *mockDialer) dial(ctx context.Context, addr *lnwire.NetAddress) ( + net.Conn, error) { + + d.mu.Lock() + d.calls = append(d.calls, dialCall{addr: addr, time: time.Now()}) + d.mu.Unlock() + + select { + case err := <-d.resultCh: + if err != nil { + return nil, err + } + + // Return a pipe conn as a stand-in for a real connection. + client, server := net.Pipe() + d.t.Cleanup(func() { server.Close() }) + + return client, nil + + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (d *mockDialer) getCallCount() int { + d.mu.Lock() + defer d.mu.Unlock() + + return len(d.calls) +} + +func (d *mockDialer) getCalls() []dialCall { + d.mu.Lock() + defer d.mu.Unlock() + + out := make([]dialCall, len(d.calls)) + copy(out, d.calls) + + return out +} + +// testWorkerHarness bundles a connWorker and its test dependencies. 
+type testWorkerHarness struct { + worker *connWorker + dialer *mockDialer + connCh chan net.Conn + quit chan struct{} + pub *btcec.PublicKey + stopped chan struct{} +} + +func newTestWorkerHarness(t *testing.T) *testWorkerHarness { + t.Helper() + + priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + pub := priv.PubKey() + + d := newMockDialer(t) + connCh := make(chan net.Conn, 1) + quit := make(chan struct{}) + + cfg := connWorkerCfg{ + dialContext: d.dial, + onConnection: func(conn net.Conn, addr *lnwire.NetAddress) { + connCh <- conn + }, + minBackoff: 100 * time.Millisecond, + maxBackoff: 2 * time.Second, + stableConnDuration: 10 * time.Minute, + staggerDelay: 50 * time.Millisecond, + quit: quit, + } + + pubStr := string(pub.SerializeCompressed()) + w := newConnWorker(pubStr, false, cfg) + stopped := make(chan struct{}) + + return &testWorkerHarness{ + worker: w, + dialer: d, + connCh: connCh, + quit: quit, + pub: pub, + stopped: stopped, + } +} + +// start launches the worker goroutine and returns a channel that closes when +// Run exits. +func (h *testWorkerHarness) start() { + go func() { + h.worker.Run() + close(h.stopped) + }() +} + +// sendCmd sends a command to the worker, failing if it blocks. +func (h *testWorkerHarness) sendCmd(t *testing.T, msg connWorkerMsg) { + t.Helper() + + select { + case h.worker.cmdChan <- msg: + case <-time.After(2 * time.Second): + t.Fatal("timed out sending command to worker") + } +} + +// waitStopped waits for the worker goroutine to exit. +func (h *testWorkerHarness) waitStopped(t *testing.T) { + t.Helper() + + select { + case <-h.stopped: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for worker to stop") + } +} + +// TestConnWorkerDialsAllAddresses verifies that the worker dials each address +// in order with the stagger delay between them. 
+func TestConnWorkerDialsAllAddresses(t *testing.T) { + t.Parallel() + + h := newTestWorkerHarness(t) + h.start() + + addrs := []*lnwire.NetAddress{ + testAddr(t, "1.1.1.1", h.pub), + testAddr(t, "2.2.2.2", h.pub), + testAddr(t, "3.3.3.3", h.pub), + } + + now := time.Now() + h.sendCmd(t, connWorkerMsg{ + cmd: cmdConnect, + addrs: addrs, + }) + + // First address: fail it immediately. + h.dialer.resultCh <- errors.New("refused") + + // Second address: fail it too. + h.dialer.resultCh <- errors.New("refused") + + // Third address: succeed. + h.dialer.resultCh <- nil + + // Expect the connection to be delivered. + select { + case conn := <-h.connCh: + require.NotNil(t, conn) + conn.Close() + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for connection") + } + + // TODO: Assert that total time should be at least 2x the stagger delay + // due to the two failures. + t.Logf("total time to dial all last address: %v", time.Since(now)) + + // Verify all 3 addresses were dialed in order. + calls := h.dialer.getCalls() + require.Len(t, calls, 3) + require.Equal(t, "1.1.1.1:9735", calls[0].addr.Address.String()) + require.Equal(t, "2.2.2.2:9735", calls[1].addr.Address.String()) + require.Equal(t, "3.3.3.3:9735", calls[2].addr.Address.String()) + + // Stop the worker. + h.sendCmd(t, connWorkerMsg{cmd: cmdStop}) + h.waitStopped(t) +} + +// TestConnWorkerStandDownCancelsDial verifies that cmdStandDown cancels an +// in-progress dial and returns the worker to idle. +func TestConnWorkerStandDownCancelsDial(t *testing.T) { + t.Parallel() + + h := newTestWorkerHarness(t) + h.start() + + addrs := []*lnwire.NetAddress{ + testAddr(t, "1.1.1.1", h.pub), + } + + h.sendCmd(t, connWorkerMsg{ + cmd: cmdConnect, + addrs: addrs, + }) + + // Wait for dial to start. + require.Eventually( + t, + func() bool { + return h.dialer.getCallCount() >= 1 + }, + 2*time.Second, + 10*time.Millisecond, + ) + + // Send stand down — the dial should be canceled via context. 
+ h.sendCmd(t, connWorkerMsg{cmd: cmdStandDown}) + + // The worker should be idle, not stopped. Send another connect to + // verify it's alive. + h.sendCmd(t, connWorkerMsg{ + cmd: cmdConnect, + addrs: addrs, + }) + + // Wait for the second dial to start before sending a result. + require.Eventually( + t, + func() bool { + return h.dialer.getCallCount() >= 2 + }, + 2*time.Second, + 10*time.Millisecond, + ) + + // Let the second dial succeed. + h.dialer.resultCh <- nil + select { + case conn := <-h.connCh: + require.NotNil(t, conn) + conn.Close() + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for connection after stand down") + } + + h.sendCmd(t, connWorkerMsg{cmd: cmdStop}) + h.waitStopped(t) +} + +// TestConnWorkerCmdStopTerminates verifies that cmdStop exits the worker +// goroutine from any state. +func TestConnWorkerCmdStopTerminates(t *testing.T) { + t.Parallel() + + h := newTestWorkerHarness(t) + h.start() + + // Stop immediately without any connect command. + h.sendCmd(t, connWorkerMsg{cmd: cmdStop}) + h.waitStopped(t) +} + +// TestConnWorkerCmdStopDuringDial verifies that cmdStop terminates the worker +// even when a dial is in progress. +func TestConnWorkerCmdStopDuringDial(t *testing.T) { + t.Parallel() + + h := newTestWorkerHarness(t) + h.start() + + h.sendCmd( + t, connWorkerMsg{ + cmd: cmdConnect, + addrs: []*lnwire.NetAddress{ + testAddr(t, "1.1.1.1", h.pub), + testAddr(t, "2.2.2.2", h.pub), + }, + }, + ) + + // Fail first dial so the worker moves to stagger wait. + h.dialer.resultCh <- errors.New("refused") + + // During stagger wait, send stop. + h.sendCmd(t, connWorkerMsg{cmd: cmdStop}) + h.waitStopped(t) +} + +// TestConnWorkerUpdateAddrsDuringDial verifies that cmdUpdateAddrs during a +// stagger wait restarts the round with new addresses. +func TestConnWorkerUpdateAddrsDuringDial(t *testing.T) { + t.Parallel() + + h := newTestWorkerHarness(t) + + // Use a longer stagger so we can intercept it. 
+ h.worker.cfg.staggerDelay = 500 * time.Millisecond
+ h.start()
+
+ addrs1 := []*lnwire.NetAddress{
+ testAddr(t, "1.1.1.1", h.pub),
+ testAddr(t, "2.2.2.2", h.pub),
+ }
+
+ h.sendCmd(t, connWorkerMsg{
+ cmd: cmdConnect,
+ addrs: addrs1,
+ })
+
+ // Fail first dial so worker enters stagger wait.
+ h.dialer.resultCh <- errors.New("refused")
+
+ // During stagger wait, update addresses. The following sleep is needed
+ // to enter stagger wait, otherwise we will update the addrs immediately
+ // and then attempt a new round of dials. It's not strictly needed, but
+ // makes sure to test another path.
+ time.Sleep(50 * time.Millisecond)
+ newAddrs := []*lnwire.NetAddress{
+ testAddr(t, "9.9.9.9", h.pub),
+ }
+ h.sendCmd(t, connWorkerMsg{
+ cmd: cmdUpdateAddrs,
+ addrs: newAddrs,
+ })
+
+ // The worker should restart and dial the new address.
+ h.dialer.resultCh <- nil
+ select {
+ case conn := <-h.connCh:
+ require.NotNil(t, conn)
+ conn.Close()
+ case <-time.After(2 * time.Second):
+ t.Fatal("timed out waiting for connection")
+ }
+
+ // Verify the new address was dialed.
+ calls := h.dialer.getCalls()
+ lastCall := calls[len(calls)-1]
+ require.Equal(t, "9.9.9.9:9735", lastCall.addr.Address.String())
+
+ h.sendCmd(t, connWorkerMsg{cmd: cmdStop})
+ h.waitStopped(t)
+}
+
+// TestConnWorkerBackoffDoublesOnFailure verifies that the worker retries with
+// increasing delays after failed dials. With minBackoff=100ms the expected
+// gaps are ~100ms, ~200ms, ~400ms (each doubled, ±5% jitter). We assert each
+// gap is at least half the nominal value and that gaps grow monotonically.
+func TestConnWorkerBackoffDoublesOnFailure(t *testing.T) {
+ t.Parallel()
+
+ h := newTestWorkerHarness(t)
+ h.worker.cfg.staggerDelay = 0
+ h.start()
+
+ addrs := []*lnwire.NetAddress{
+ testAddr(t, "1.1.1.1", h.pub),
+ }
+
+ // Connect with zero backoff — first dial is immediate. 
+ h.sendCmd(t, connWorkerMsg{ + cmd: cmdConnect, + addrs: addrs, + }) + + // Fail 3 rounds so backoff increases: 0 → 100ms → ~200ms → ~400ms. + for i := range 3 { + require.Eventually( + t, + func() bool { + return h.dialer.getCallCount() > i + }, + 5*time.Second, + 10*time.Millisecond, + ) + h.dialer.resultCh <- errors.New("refused") + } + + // Wait for the 4th dial to start, then succeed. + require.Eventually( + t, + func() bool { + return h.dialer.getCallCount() >= 4 + }, + 5*time.Second, + 10*time.Millisecond, + ) + h.dialer.resultCh <- nil + + select { + case conn := <-h.connCh: + conn.Close() + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for connection") + } + + // Verify the backoff gaps between consecutive dials. The sequence + // of waits before each dial is: 0, 100ms, ~200ms, ~400ms. + calls := h.dialer.getCalls() + require.Len(t, calls, 4) + + for i := 1; i < len(calls); i++ { + gap := calls[i].time.Sub(calls[i-1].time) + t.Logf("gap[%d→%d] = %v", i-1, i, gap) + } + + // Gap 1→2 should be ~100ms (minBackoff). Allow ≥50ms. + gap1 := calls[1].time.Sub(calls[0].time) + require.GreaterOrEqual(t, gap1, 50*time.Millisecond, + "first backoff too short") + + // Gap 2→3 should be ~200ms. Assert ≥100ms and > gap1. + gap2 := calls[2].time.Sub(calls[1].time) + require.GreaterOrEqual(t, gap2, 100*time.Millisecond, + "second backoff too short") + require.Greater(t, gap2, gap1, + "backoff should increase") + + // Gap 3→4 should be ~400ms. Assert ≥200ms and > gap2. + gap3 := calls[3].time.Sub(calls[2].time) + require.GreaterOrEqual(t, gap3, 200*time.Millisecond, + "third backoff too short") + require.Greater(t, gap3, gap2, + "backoff should keep increasing") + + h.sendCmd(t, connWorkerMsg{cmd: cmdStop}) + h.waitStopped(t) +} + +// TestConnWorkerIdleAfterSuccess verifies that after a successful dial, the +// worker returns to idle and waits for the next command. 
+func TestConnWorkerIdleAfterSuccess(t *testing.T) { + t.Parallel() + + h := newTestWorkerHarness(t) + h.start() + + addrs := []*lnwire.NetAddress{ + testAddr(t, "1.1.1.1", h.pub), + } + + // First connect: succeed immediately. + h.sendCmd(t, connWorkerMsg{ + cmd: cmdConnect, + addrs: addrs, + }) + h.dialer.resultCh <- nil + + select { + case conn := <-h.connCh: + conn.Close() + case <-time.After(2 * time.Second): + t.Fatal("timed out") + } + + // Worker should be idle. No more dials should happen for a while. + startCount := h.dialer.getCallCount() + time.Sleep(200 * time.Millisecond) + require.Equal(t, startCount, h.dialer.getCallCount()) + + // A second connect should work. + h.sendCmd(t, connWorkerMsg{ + cmd: cmdConnect, + addrs: addrs, + }) + h.dialer.resultCh <- nil + select { + case conn := <-h.connCh: + conn.Close() + case <-time.After(2 * time.Second): + t.Fatal("timed out on second connect") + } + require.Greater(t, h.dialer.getCallCount(), startCount) + + h.sendCmd(t, connWorkerMsg{cmd: cmdStop}) + h.waitStopped(t) +} + +// TestConnWorkerQuitTerminates verifies that closing the quit channel +// terminates the worker from some state. +func TestConnWorkerQuitTerminates(t *testing.T) { + t.Parallel() + + h := newTestWorkerHarness(t) + h.start() + + // Start a dial. + h.sendCmd( + t, connWorkerMsg{ + cmd: cmdConnect, + addrs: []*lnwire.NetAddress{ + testAddr(t, "1.1.1.1", h.pub), + }, + }, + ) + + // Wait for dial to begin. + require.Eventually( + t, + func() bool { + return h.dialer.getCallCount() >= 1 + }, + 2*time.Second, + 10*time.Millisecond, + ) + + // Close quit channel. + close(h.quit) + h.waitStopped(t) +} + +// TestConnWorkerConnectWithBackoff verifies that when cmdConnect includes a +// non-zero backoff, the worker waits before dialing. 
+func TestConnWorkerConnectWithBackoff(t *testing.T) { + t.Parallel() + + h := newTestWorkerHarness(t) + h.start() + startTime := time.Now() + + addrs := []*lnwire.NetAddress{ + testAddr(t, "1.1.1.1", h.pub), + } + + // Send connect with a 200ms backoff. + h.sendCmd( + t, connWorkerMsg{ + cmd: cmdConnect, + addrs: addrs, + backoff: 200 * time.Millisecond, + }, + ) + + // No dial should happen during the backoff window. + time.Sleep(100 * time.Millisecond) + require.Equal( + t, 0, h.dialer.getCallCount(), "should not dial during backoff", + ) + + // After the backoff elapses, the dial should start. + require.Eventually( + t, + func() bool { + return h.dialer.getCallCount() >= 1 + }, + 2*time.Second, + 10*time.Millisecond, + ) + + // Log the elapsed time to verify it's at least the backoff duration. + timeDial := h.dialer.getCalls()[0].time.Sub(startTime) + require.GreaterOrEqual( + t, timeDial, 200*time.Millisecond, + "dial should happen after backoff", + ) + + // Succeed on the dial. + h.dialer.resultCh <- nil + select { + case conn := <-h.connCh: + conn.Close() + case <-time.After(2 * time.Second): + t.Fatal("timed out") + } + + h.sendCmd(t, connWorkerMsg{cmd: cmdStop}) + h.waitStopped(t) +} + +// TestConnWorkerRapidCycling bombards the worker with a random stream of +// commands to verify it never deadlocks or gets stuck regardless of ordering. +func TestConnWorkerRapidCycling(t *testing.T) { + t.Parallel() + + priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + pub := priv.PubKey() + + quit := make(chan struct{}) + cfg := connWorkerCfg{ + // Dialer that always fails immediately — no channel + // coordination needed. This lets the worker churn through + // dial attempts at full speed. 
+ dialContext: func(ctx context.Context, + addr *lnwire.NetAddress) (net.Conn, error) { + + return nil, errors.New("refused") + }, + onConnection: func(net.Conn, *lnwire.NetAddress) {}, + minBackoff: time.Millisecond, + maxBackoff: 5 * time.Millisecond, + stableConnDuration: 10 * time.Minute, + staggerDelay: time.Millisecond, + quit: quit, + } + + pubStr := string(pub.SerializeCompressed()) + w := newConnWorker(pubStr, false, cfg) + stopped := make(chan struct{}) + go func() { + w.Run() + close(stopped) + }() + + addrs := []*lnwire.NetAddress{ + testAddr(t, "1.1.1.1", pub), + testAddr(t, "2.2.2.2", pub), + testAddr(t, "3.3.3.3", pub), + } + + send := func(msg connWorkerMsg) { + t.Helper() + select { + case w.cmdChan <- msg: + case <-time.After(5 * time.Second): + t.Fatal("timed out sending command — worker stuck") + } + } + + // Bombard with random commands. The worker must handle any sequence + // without deadlocking — redundant stand-downs while idle, back-to-back + // connects, addr updates with no active dial, etc. + for range 200 { + switch rand.IntN(5) { + case 0: + send(connWorkerMsg{ + cmd: cmdConnect, + addrs: addrs[:1+rand.IntN(len(addrs))], + }) + + case 1: + send(connWorkerMsg{ + cmd: cmdConnect, + addrs: addrs[:1+rand.IntN(len(addrs))], + backoff: time.Duration( + rand.IntN(3), + ) * time.Millisecond, + }) + + case 2: + send(connWorkerMsg{ + cmd: cmdUpdateAddrs, + addrs: addrs[:1+rand.IntN(len(addrs))], + }) + + case 3: + send(connWorkerMsg{cmd: cmdStandDown}) + + case 4: + // Double stand-down — tests the idle no-op path. + send(connWorkerMsg{cmd: cmdStandDown}) + send(connWorkerMsg{cmd: cmdStandDown}) + } + } + + // The worker must always terminate on cmdStop. 
+ send(connWorkerMsg{cmd: cmdStop}) + select { + case <-stopped: + case <-time.After(10 * time.Second): + t.Fatal("worker did not terminate — likely stuck") + } +} From 619b7bb82eadc580a3b74b6615ad37976f4650f8 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Mon, 16 Feb 2026 13:32:05 +0100 Subject: [PATCH 05/14] server: introduce persistentWorkers map and server helpers Introduce the server-side plumbing that connects the connWorker goroutines to the rest of the server. Each persistent peer gets an entry in the new persistentWorkers map; getOrCreateWorker lazily creates workers and starts their Run loop under the server WaitGroup, while stopWorker and sendWorkerCmd give callers a uniform way to control them. Wire cmdStandDown into InboundPeerConnected and OutboundPeerConnected so that workers yield when the connection they are dialing for is already being handled. Both old (connmgr) and new (worker) paths run in parallel during the transition. --- server.go | 102 +++++++++++++++++++++++++++++++++++++++++++++++-- server_test.go | 102 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 201 insertions(+), 3 deletions(-) diff --git a/server.go b/server.go index 6dcfd8dd128..2e04924e16b 100644 --- a/server.go +++ b/server.go @@ -291,6 +291,11 @@ type server struct { persistentConnReqs map[string][]*connmgr.ConnReq persistentRetryCancels map[string]chan struct{} + // persistentWorkers maps each persistent peer's pubkey to its + // connection worker. A worker's existence means the peer is persistent; + // its backoff, addresses, and dial state live inside the worker. + persistentWorkers map[string]*connWorker + // peerErrors keeps a set of peer error buffers for peers that have // disconnected from us. This allows us to track historic peer errors // over connections. 
The string of the peer's compressed pubkey is used @@ -716,6 +721,7 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, persistentConnReqs: make(map[string][]*connmgr.ConnReq), persistentPeerAddrs: make(map[string][]*lnwire.NetAddress), persistentRetryCancels: make(map[string]chan struct{}), + persistentWorkers: make(map[string]*connWorker), peerErrors: make(map[string]*queue.CircularBuffer), ignorePeerTermination: make(map[*peer.Brontide]struct{}), scheduledPeerConnection: make(map[string]func()), @@ -3710,6 +3716,94 @@ func (s *server) bannedPersistentPeerConnection(remotePub string) { } } +// getOrCreateWorker returns the existing connection worker for the +// given peer, or creates and starts a new one. The caller must hold +// the server mutex. +func (s *server) getOrCreateWorker(pubStr string, perm bool, + addrs []*lnwire.NetAddress) *connWorker { + + if w, ok := s.persistentWorkers[pubStr]; ok { + // Upgrade to perm if requested. + if perm && !w.perm { + w.perm = true + } + return w + } + + cfg := connWorkerCfg{ + dialContext: func(ctx context.Context, + addr *lnwire.NetAddress) (net.Conn, error) { + + dialer := func(ctx context.Context, network, + ipAddr string) (net.Conn, error) { + + return s.cfg.net.Dial( + network, ipAddr, tor.DefaultConnTimeout, + ) + } + + return brontide.DialContext( + ctx, s.identityECDH, addr, dialer, + ) + }, + onConnection: func(conn net.Conn, + addr *lnwire.NetAddress) { + + s.OutboundPeerConnected(nil, conn) + }, + minBackoff: s.cfg.MinBackoff, + maxBackoff: s.cfg.MaxBackoff, + stableConnDuration: defaultStableConnDuration, + staggerDelay: multiAddrConnectionStagger, + quit: s.quit, + } + + w := newConnWorker(pubStr, perm, cfg) + w.addrs = addrs + s.persistentWorkers[pubStr] = w + + s.wg.Go(w.Run) + + return w +} + +// stopWorker sends cmdStop to the worker for the given peer and removes it from +// the map. The caller must hold the server mutex. No-op if no worker exists. 
+func (s *server) stopWorker(pubStr string) { + w, ok := s.persistentWorkers[pubStr] + if !ok { + return + } + + select { + case w.cmdChan <- connWorkerMsg{cmd: cmdStop}: + default: + srvrLog.Warnf("Worker cmdChan full for peer %x, "+ + "stop is not processed", pubStr) + } + + delete(s.persistentWorkers, pubStr) +} + +// sendWorkerCmd sends a command to the worker for the given peer. Returns false +// if no worker exists. The caller must hold the server mutex (at least RLock). +func (s *server) sendWorkerCmd(pubStr string, msg connWorkerMsg) bool { + + w, ok := s.persistentWorkers[pubStr] + if !ok { + return false + } + + select { + case w.cmdChan <- msg: + default: + srvrLog.Warnf("Worker cmdChan full for peer %x, "+ + "command %v is not processed", pubStr, msg.cmd) + } + + return true +} + // BroadcastMessage sends a request to the server to broadcast a set of // messages to all peers other than the one specified by the `skips` parameter. // All messages sent via BroadcastMessage will be queued for lazy delivery to @@ -4013,6 +4107,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) { // We were unable to locate an existing connection with the // target peer, proceed to connect. 
s.cancelConnReqs(pubStr, nil) + s.sendWorkerCmd(pubStr, connWorkerMsg{cmd: cmdStandDown}) s.peerConnected(conn, nil, true) case nil: @@ -4043,6 +4138,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) { srvrLog.DebugS(ctx, "Disconnecting stale connection") s.cancelConnReqs(pubStr, nil) + s.sendWorkerCmd(pubStr, connWorkerMsg{cmd: cmdStandDown}) // Remove the current peer from the server's internal state and // signal that the peer termination watcher does not need to @@ -4082,9 +4178,9 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) "ignoring outbound connection from local=%v, remote=%v", p, conn.LocalAddr(), conn.RemoteAddr()) - if connReq != nil { - s.connMgr.Remove(connReq.ID()) - } + s.cancelConnReqs(pubStr, nil) + s.sendWorkerCmd(pubStr, connWorkerMsg{cmd: cmdStandDown}) + conn.Close() return } diff --git a/server_test.go b/server_test.go index 643a7318ae9..5f5d352aab7 100644 --- a/server_test.go +++ b/server_test.go @@ -4,9 +4,49 @@ import ( "testing" "time" + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/connmgr" + "github.com/lightningnetwork/lnd/lncfg" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/peer" "github.com/stretchr/testify/require" ) +// newTestServer creates a minimal server instance with only the fields needed +// for persistent connection management tests. 
+func newTestServer(t *testing.T) *server { + t.Helper() + + s := &server{ + cfg: &Config{ + MinBackoff: time.Second, + Dev: &lncfg.DevConfig{}, + }, + persistentPeers: make(map[string]bool), + persistentPeersBackoff: make(map[string]time.Duration), + persistentConnReqs: make(map[string][]*connmgr.ConnReq), + persistentPeerAddrs: make(map[string][]*lnwire.NetAddress), + persistentRetryCancels: make(map[string]chan struct{}), + persistentWorkers: make(map[string]*connWorker), + peersByPub: make(map[string]*peer.Brontide), + inboundPeers: make(map[string]*peer.Brontide), + outboundPeers: make(map[string]*peer.Brontide), + ignorePeerTermination: make(map[*peer.Brontide]struct{}), + scheduledPeerConnection: make(map[string]func()), + quit: make(chan struct{}), + } + + return s +} + +// generateTestPubKey creates a new random public key for testing. +func generateTestPubKey(t *testing.T) *btcec.PublicKey { + t.Helper() + priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + return priv.PubKey() +} + // TestNodeAnnouncementTimestampComparison tests the timestamp comparison // logic used in setSelfNode to ensure node announcements have strictly // increasing timestamps at second precision (as required by BOLT-07 and @@ -241,3 +281,65 @@ func TestPeerBackoff(t *testing.T) { }) } } + +// TestGetOrCreateWorker verifies that getOrCreateWorker creates a new worker on +// first call and returns the existing one on subsequent calls. +func TestGetOrCreateWorker(t *testing.T) { + t.Parallel() + + s := newTestServer(t) + + pubKey := generateTestPubKey(t) + pubStr := string(pubKey.SerializeCompressed()) + + // First call creates a new worker. + w := s.getOrCreateWorker(pubStr, false, nil) + require.NotNil(t, w) + + _, ok := s.persistentWorkers[pubStr] + require.True(t, ok, "worker should be in map") + + // Second call returns the same worker. + w2 := s.getOrCreateWorker(pubStr, false, nil) + require.Equal(t, w, w2, "should return same worker") + + // Upgrade to perm. 
+ w3 := s.getOrCreateWorker(pubStr, true, nil) + require.Equal(t, w, w3) + require.True(t, w.perm, "should be upgraded to perm") + + // Clean up. + s.stopWorker(pubStr) +} + +// TestStopWorker verifies that stopWorker removes the worker from the map and +// sends cmdStop. +func TestStopWorker(t *testing.T) { + t.Parallel() + + s := newTestServer(t) + + pubKey := generateTestPubKey(t) + pubStr := string(pubKey.SerializeCompressed()) + + // Create a worker. + s.getOrCreateWorker(pubStr, false, nil) + require.Contains(t, s.persistentWorkers, pubStr) + + // Stop it. + s.stopWorker(pubStr) + require.NotContains(t, s.persistentWorkers, pubStr) +} + +// TestSendWorkerCmdNoWorker verifies that sendWorkerCmd returns false when no +// worker exists for the peer. +func TestSendWorkerCmdNoWorker(t *testing.T) { + t.Parallel() + + s := newTestServer(t) + + ok := s.sendWorkerCmd("nonexistent", connWorkerMsg{ + cmd: cmdStandDown, + }) + require.False(t, ok) +} From ed9a72ae50f99d0dfd8b1510596d3fbbebd66d50 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Mon, 16 Feb 2026 13:33:37 +0100 Subject: [PATCH 06/14] server: migrate establishPersistentConnections to workers Replace the map writes (persistentPeers, persistentPeersBackoff, persistentPeerAddrs) and connectToPersistentPeer/delayInitialReconnect calls with getOrCreateWorker + cmdConnect. Startup stagger is preserved: first numInstantInitReconnect peers get cmdConnect immediately, the rest get a delayed send in a goroutine (random 0-30s). The old maps are no longer written from this function; other callers still use them during the transition period. 
--- server.go | 57 ++++++++++++++++++++++++++----------------------------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/server.go b/server.go index 2e04924e16b..579028fe2bd 100644 --- a/server.go +++ b/server.go @@ -3620,42 +3620,39 @@ func (s *server) establishPersistentConnections(ctx context.Context) error { // node announcements and attempt to reconnect to each node. var numOutboundConns int for pubStr, nodeAddr := range nodeAddrsMap { - // Add this peer to the set of peers we should maintain a - // persistent connection with. We set the value to false to - // indicate that we should not continue to reconnect if the - // number of channels returns to zero, since this peer has not - // been requested as perm by the user. - s.persistentPeers[pubStr] = false - if _, ok := s.persistentPeersBackoff[pubStr]; !ok { - s.persistentPeersBackoff[pubStr] = s.cfg.MinBackoff - } - + // Build the address list for this peer. + addrs := make([]*lnwire.NetAddress, 0, + len(nodeAddr.addresses)) for _, address := range nodeAddr.addresses { - // Create a wrapper address which couples the IP and - // the pubkey so the brontide authenticated connection - // can be established. - lnAddr := &lnwire.NetAddress{ + addrs = append(addrs, &lnwire.NetAddress{ IdentityKey: nodeAddr.pubKey, Address: address, - } - - s.persistentPeerAddrs[pubStr] = append( - s.persistentPeerAddrs[pubStr], lnAddr) + }) } - // We'll connect to the first 10 peers immediately, then - // randomly stagger any remaining connections if the - // stagger initial reconnect flag is set. This ensures - // that mobile nodes or nodes with a small number of - // channels obtain connectivity quickly, but larger - // nodes are able to disperse the costs of connecting to - // all peers at once. - if numOutboundConns < numInstantInitReconnect || - !s.cfg.StaggerInitialReconnect { + // Create the worker for this peer (not perm — it was + // discovered from the channel graph, not requested by + // the user). 
+ w := s.getOrCreateWorker(pubStr, false, addrs) - go s.connectToPersistentPeer(pubStr) - } else { - go s.delayInitialReconnect(pubStr) + // We'll connect to the first 10 peers immediately, + // then randomly stagger any remaining connections if + // the stagger initial reconnect flag is set. The + // worker handles the delay via its backoff wait, + // which also responds to commands during the wait. + var delay time.Duration + if numOutboundConns >= numInstantInitReconnect && + s.cfg.StaggerInitialReconnect { + + delay = time.Duration( + prand.Intn(maxInitReconnectDelay), + ) * time.Second + } + + w.cmdChan <- connWorkerMsg{ + cmd: cmdConnect, + addrs: addrs, + backoff: delay, } numOutboundConns++ From 6bdb36908b0867e42bf969db98fc3fe796963afe Mon Sep 17 00:00:00 2001 From: bitromortac Date: Mon, 16 Feb 2026 13:35:14 +0100 Subject: [PATCH 07/14] server: migrate gossip address updates to workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the gossip handler receives a node announcement with new addresses, we need to update the dial targets for any active connection worker. Previously this required coordinating three server maps: checking persistentPeers for membership, writing the new addresses to persistentPeerAddrs, and inspecting persistentConnReqs to decide whether to trigger a reconnect via connectToPersistentPeer. Replace all of that with a single cmdUpdateAddrs to the worker. The worker updates its address list internally and, if it is mid-dial, restarts the round with the new addresses. If no worker exists for the peer (not persistent), sendWorkerCmd returns false and the update is silently ignored — preserving the same filtering the old persistentPeers map check provided. 
--- server.go | 36 ++++++++++-------------------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/server.go b/server.go index 579028fe2bd..ba5d331347b 100644 --- a/server.go +++ b/server.go @@ -474,15 +474,6 @@ func (s *server) updatePersistentPeerAddrs() error { SerializeCompressed(), ) - // We only care about updates from - // our persistentPeers. - s.mu.RLock() - _, ok := s.persistentPeers[pubKeyStr] - s.mu.RUnlock() - if !ok { - continue - } - addrs := make([]*lnwire.NetAddress, 0, len(update.Addresses)) @@ -496,25 +487,18 @@ func (s *server) updatePersistentPeerAddrs() error { ) } + // Send the updated addresses to the + // worker. If no worker exists for + // this peer (not persistent), this + // is a no-op. s.mu.Lock() - - // Update the stored addresses for this - // to peer to reflect the new set. - s.persistentPeerAddrs[pubKeyStr] = addrs - - // If there are no outstanding - // connection requests for this peer - // then our work is done since we are - // not currently trying to connect to - // them. - if len(s.persistentConnReqs[pubKeyStr]) == 0 { - s.mu.Unlock() - continue - } - + s.sendWorkerCmd(pubKeyStr, + connWorkerMsg{ + cmd: cmdUpdateAddrs, + addrs: addrs, + }, + ) s.mu.Unlock() - - s.connectToPersistentPeer(pubKeyStr) } } } From 0066c0bb950f831ba87d810e93336c4f4723f9de Mon Sep 17 00:00:00 2001 From: bitromortac Date: Mon, 16 Feb 2026 13:36:47 +0100 Subject: [PATCH 08/14] server: migrate peerTerminationWatcher to workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a persistent peer disconnects, peerTerminationWatcher must schedule a reconnection after an appropriate backoff. Previously this involved writing the computed backoff to persistentPeersBackoff, merging advertised addresses into persistentPeerAddrs, creating a retry cancel channel in persistentRetryCancels, and spawning an untracked goroutine that slept for the backoff before calling connectToPersistentPeer. 
Replace all of that with a single cmdConnect carrying the address list and backoff duration. The worker handles the timed wait internally in dialLoop, where it also responds to preempting commands (standDown, stop, address updates) — something the old time.After goroutine could not do. The persistentPeers membership check becomes a persistentWorkers lookup. The backoff is computed via the pure peerBackoff function reading the worker's current backoff, avoiding the old server map. --- server.go | 85 ++++++++++++++++++++----------------------------------- 1 file changed, 30 insertions(+), 55 deletions(-) diff --git a/server.go b/server.go index ba5d331347b..a8cc397a32b 100644 --- a/server.go +++ b/server.go @@ -4777,13 +4777,13 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) { // in question. s.removePeerUnsafe(ctx, p) - // Next, check to see if this is a persistent peer or not. - if _, ok := s.persistentPeers[pubStr]; !ok { + // Check whether this is a persistent peer via the worker map. + if _, ok := s.persistentWorkers[pubStr]; !ok { return } // Get the last address that we used to connect to the peer. - addrs := []net.Addr{ + rawAddrs := []net.Addr{ p.NetAddress().Address, } @@ -4793,7 +4793,7 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) { switch { // We found advertised addresses, so use them. case err == nil: - addrs = advertisedAddrs + rawAddrs = advertisedAddrs // The peer doesn't have an advertised address. case err == errNoAdvertisedAddr: @@ -4826,62 +4826,37 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) { "address for peer", err) } - // Make an easy lookup map so that we can check if an address - // is already in the address list that we have stored for this peer. 
- existingAddrs := make(map[string]bool) - for _, addr := range s.persistentPeerAddrs[pubStr] { - existingAddrs[addr.String()] = true - } - - // Add any missing addresses for this peer to persistentPeerAddr. - for _, addr := range addrs { - if existingAddrs[addr.String()] { - continue - } - - s.persistentPeerAddrs[pubStr] = append( - s.persistentPeerAddrs[pubStr], - &lnwire.NetAddress{ - IdentityKey: p.IdentityKey(), - Address: addr, - ChainNet: p.NetAddress().ChainNet, - }, - ) + // Convert raw addresses to lnwire.NetAddress. + lnAddrs := make([]*lnwire.NetAddress, 0, len(rawAddrs)) + for _, addr := range rawAddrs { + lnAddrs = append(lnAddrs, &lnwire.NetAddress{ + IdentityKey: p.IdentityKey(), + Address: addr, + ChainNet: p.NetAddress().ChainNet, + }) } - // Record the computed backoff in the backoff map. - backoff := s.nextPeerBackoff(pubStr, p.StartTime()) - s.persistentPeersBackoff[pubStr] = backoff - - // Initialize a retry canceller for this peer if one does not - // exist. - cancelChan, ok := s.persistentRetryCancels[pubStr] - if !ok { - cancelChan = make(chan struct{}) - s.persistentRetryCancels[pubStr] = cancelChan + // Compute the backoff using the pure function so we don't + // depend on the old map. If the worker doesn't exist yet (e.g., for + // an inbound peer that just opened a channel), use zero backoff which + // will result in immediate reconnection. + var currentBackoff time.Duration + if w := s.persistentWorkers[pubStr]; w != nil { + currentBackoff = w.backoff } + backoff := peerBackoff( + currentBackoff, p.StartTime(), s.cfg.MinBackoff, + s.cfg.MaxBackoff, defaultStableConnDuration, + ) - // We choose not to wait group this go routine since the Connect - // call can stall for arbitrarily long if we shutdown while an - // outbound connection attempt is being made. 
- go func() { - srvrLog.DebugS(ctx, "Scheduling connection "+ - "re-establishment to persistent peer", - "reconnecting_in", backoff) - - select { - case <-time.After(backoff): - case <-cancelChan: - return - case <-s.quit: - return - } - - srvrLog.DebugS(ctx, "Attempting to re-establish persistent "+ - "connection") + srvrLog.DebugS(ctx, "Scheduling connection re-establishment "+ + "to persistent peer", "reconnecting_in", backoff) - s.connectToPersistentPeer(pubStr) - }() + s.sendWorkerCmd(pubStr, connWorkerMsg{ + cmd: cmdConnect, + addrs: lnAddrs, + backoff: backoff, + }) } // connectToPersistentPeer uses all the stored addresses for a peer to attempt From d31e09ee4152075f56336bef285d50c9762029ca Mon Sep 17 00:00:00 2001 From: bitromortac Date: Mon, 16 Feb 2026 13:40:16 +0100 Subject: [PATCH 09/14] server: migrate ConnectToPeer perm path to workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a user calls ConnectToPeer with perm=true, the server must create a persistent connection that survives disconnects. Previously this wrote to persistentPeers, initialized persistentPeersBackoff, appended a ConnReq to persistentConnReqs, and launched a connmgr goroutine — all under the server lock. Repeated calls accumulated duplicate ConnReqs for the same peer. Replace with getOrCreateWorker followed by a single cmdConnect. The worker is idempotent: sending cmdConnect to an already-dialing worker cancels the current attempt and restarts with the new addresses, so repeated calls converge on one active dial rather than accumulating. Add TestConnectToPeerAccumulation to verify that 10 rapid perm calls produce exactly one worker. 
--- server.go | 34 ++++++---------------- server_test.go | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 25 deletions(-) diff --git a/server.go b/server.go index a8cc397a32b..5d7d96c1c80 100644 --- a/server.go +++ b/server.go @@ -5056,39 +5056,23 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, // Peer was not found, continue to pursue connection with peer. - // If there's already a pending connection request for this pubkey, - // then we ignore this request to ensure we don't create a redundant - // connection. - if reqs, ok := s.persistentConnReqs[targetPub]; ok { - srvrLog.Warnf("Already have %d persistent connection "+ - "requests for %v, connecting anyway.", len(reqs), addr) - } - // If there's not already a pending or active connection to this node, // then instruct the connection manager to attempt to establish a // persistent connection to the peer. srvrLog.Debugf("Connecting to %v", addr) if perm { - connReq := &connmgr.ConnReq{ - Addr: addr, - Permanent: true, - } - - // Since the user requested a permanent connection, we'll set - // the entry to true which will tell the server to continue - // reconnecting even if the number of channels with this peer is - // zero. - s.persistentPeers[targetPub] = true - if _, ok := s.persistentPeersBackoff[targetPub]; !ok { - s.persistentPeersBackoff[targetPub] = s.cfg.MinBackoff + // Create or retrieve the worker and send a connect + // command. Sending cmdConnect to an existing worker + // naturally replaces its current operation (cancel + + // restart). 
+ addrs := []*lnwire.NetAddress{addr} + w := s.getOrCreateWorker(targetPub, true, addrs) + w.cmdChan <- connWorkerMsg{ + cmd: cmdConnect, + addrs: addrs, } - s.persistentConnReqs[targetPub] = append( - s.persistentConnReqs[targetPub], connReq, - ) s.mu.Unlock() - go s.connMgr.Connect(connReq) - return nil } s.mu.Unlock() diff --git a/server_test.go b/server_test.go index 5f5d352aab7..8d2ed30fe8a 100644 --- a/server_test.go +++ b/server_test.go @@ -1,26 +1,67 @@ package lnd import ( + "errors" + "net" "testing" "time" "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/connmgr" + "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/peer" + "github.com/lightningnetwork/lnd/tor" "github.com/stretchr/testify/require" ) +// mockNet implements tor.Net for testing. All calls return errors +// since tests that use workers mock the dialer at a higher level. +type mockNet struct{} + +var _ tor.Net = (*mockNet)(nil) + +func (m *mockNet) Dial(network, addr string, + timeout time.Duration) (net.Conn, error) { + + return nil, errors.New("mock: no real dialing") +} + +func (m *mockNet) LookupHost(host string) ([]string, error) { + return nil, errors.New("mock: no DNS") +} + +func (m *mockNet) LookupSRV(service, proto, name string, + timeout time.Duration) (string, []*net.SRV, error) { + + return "", nil, errors.New("mock: no SRV") +} + +func (m *mockNet) ResolveTCPAddr(network, + address string) (*net.TCPAddr, error) { + + return nil, errors.New("mock: no resolve") +} + // newTestServer creates a minimal server instance with only the fields needed // for persistent connection management tests. func newTestServer(t *testing.T) *server { t.Helper() + // Generate a test identity key for the server. 
+ identityPriv, err := btcec.NewPrivateKey() + require.NoError(t, err) + s := &server{ cfg: &Config{ MinBackoff: time.Second, + MaxBackoff: time.Hour, Dev: &lncfg.DevConfig{}, + net: &mockNet{}, + }, + identityECDH: &keychain.PrivKeyECDH{ + PrivKey: identityPriv, }, persistentPeers: make(map[string]bool), persistentPeersBackoff: make(map[string]time.Duration), @@ -180,6 +221,42 @@ func TestNodeAnnouncementTimestampComparison(t *testing.T) { } } +// TestConnectToPeerAccumulation verifies that repeated ConnectToPeer calls +// with perm=true result in a single worker, not accumulated ConnReqs. +func TestConnectToPeerAccumulation(t *testing.T) { + t.Parallel() + + s := newTestServer(t) + + pubKey := generateTestPubKey(t) + addr := &lnwire.NetAddress{ + IdentityKey: pubKey, + Address: &net.TCPAddr{IP: net.ParseIP("1.2.3.4"), Port: 9735}, + } + + targetPub := string(pubKey.SerializeCompressed()) + + // Call ConnectToPeer 10 times with perm=true. With workers, + // each call reuses the same worker and sends cmdConnect. + for i := 0; i < 10; i++ { + err := s.ConnectToPeer(addr, true, time.Second) + require.NoError(t, err) + } + + s.mu.Lock() + w, ok := s.persistentWorkers[targetPub] + s.mu.Unlock() + + // Only one worker should exist, and it should be perm. + require.True(t, ok, "worker should exist") + require.True(t, w.perm, "worker should be perm") + + // Clean up workers. + s.mu.Lock() + s.stopWorker(targetPub) + s.mu.Unlock() +} + // TestPeerBackoff tests the pure peerBackoff function with table-driven cases // covering zero backoff, short-lived connections (doubles), stable connections // (reduces), and capping at max. 
From 5cf201b9a70271090fb1b824903fddf666e4deb1 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Mon, 16 Feb 2026 13:42:23 +0100 Subject: [PATCH 10/14] server: migrate remaining persistent-peer callers to workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wire the remaining server methods into the worker map so that all persistent-peer lifecycle operations flow through connWorker: DisconnectPeer now calls stopWorker alongside the existing ConnReq cancellation, ensuring the worker goroutine exits when a user explicitly disconnects a peer. prunePersistentPeerConnection and bannedPersistentPeerConnection check the worker's perm flag and stop non-perm workers when the last channel closes or the peer is banned, mirroring the existing persistentPeers map logic. The WatchNewChannel callback uses getOrCreateWorker to ensure a worker exists when a new channel is opened with a peer. Since the peer is already connected, no cmdConnect is sent — the worker starts idle and will handle reconnection if the peer disconnects later. createBootstrapIgnorePeers iterates persistentWorkers instead of persistentPeers so the autopilot ignores peers that have dedicated workers. --- server.go | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/server.go b/server.go index 5d7d96c1c80..277eecbbef0 100644 --- a/server.go +++ b/server.go @@ -1537,17 +1537,15 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, WatchNewChannel: func(channel *channeldb.OpenChannel, peerKey *btcec.PublicKey) error { - // First, we'll mark this new peer as a persistent peer - // for re-connection purposes. If the peer is not yet - // tracked or the user hasn't requested it to be perm, - // we'll set false to prevent the server from continuing - // to connect to this peer even if the number of - // channels with this peer is zero. 
+ // Ensure a worker exists for this peer so we + // maintain a persistent connection. The peer is + // already connected (we just opened a channel), + // so no cmdConnect is sent — the worker starts + // idle. If a worker already exists, this is a + // no-op. s.mu.Lock() pubStr := string(peerKey.SerializeCompressed()) - if _, ok := s.persistentPeers[pubStr]; !ok { - s.persistentPeers[pubStr] = false - } + s.getOrCreateWorker(pubStr, false, nil) s.mu.Unlock() // With that taken care of, we'll send this channel to @@ -2988,7 +2986,7 @@ func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} { // Ignore all persistent peers as they have a dedicated reconnecting // process. - for pubKeyStr := range s.persistentPeers { + for pubKeyStr := range s.persistentWorkers { var nID autopilot.NodeID copy(nID[:], []byte(pubKeyStr)) ignore[nID] = struct{}{} @@ -3665,6 +3663,12 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) { pubKeyStr := string(compressedPubKey[:]) s.mu.Lock() + + // Check the worker map: if the worker is non-perm, stop it. + if w, ok := s.persistentWorkers[pubKeyStr]; ok && !w.perm { + s.stopWorker(pubKeyStr) + } + if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm { delete(s.persistentPeers, pubKeyStr) delete(s.persistentPeersBackoff, pubKeyStr) @@ -3689,6 +3693,11 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) { // // NOTE: The server's write lock MUST be held when this is called. func (s *server) bannedPersistentPeerConnection(remotePub string) { + // Stop the worker if the peer is non-perm. 
+ if w, ok := s.persistentWorkers[remotePub]; ok && !w.perm { + s.stopWorker(remotePub) + } + if perm, ok := s.persistentPeers[remotePub]; ok && !perm { delete(s.persistentPeers, remotePub) delete(s.persistentPeersBackoff, remotePub) @@ -5140,6 +5149,7 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error { srvrLog.Infof("Disconnecting from %v", peer) s.cancelConnReqs(pubStr, nil) + s.stopWorker(pubStr) // If this peer was formerly a persistent connection, then we'll remove // them from this map so we don't attempt to re-connect after we From 069bf226a4851ea5d68bb5a6578a64198abe7d4d Mon Sep 17 00:00:00 2001 From: bitromortac Date: Mon, 16 Feb 2026 13:50:46 +0100 Subject: [PATCH 11/14] server: remove old persistent connection maps and dead code Delete the five persistent-connection maps (persistentPeers, persistentPeersBackoff, persistentPeerAddrs, persistentConnReqs, persistentRetryCancels) and all code that operated on them, now that per-peer connection workers handle the full dial/retry/backoff lifecycle. Removed functions: cancelConnReqs, connectToPersistentPeer, delayInitialReconnect, nextPeerBackoff (server method). Removed constants: UnassignedConnID, maxPersistentConnReqsPerPeer. Removed tests: TestConnectToPersistentPeerGoroutineRace, TestCancelConnReqsUnassignedID, TestOutboundPeerConnectedInboundCleanup (the bugs they reproduced are structurally impossible with workers). Cleaned up InboundPeerConnected and OutboundPeerConnected to remove cancelConnReqs calls and the persistentConnReqs existence check. 
--- server.go | 269 +++---------------------------------------------- server_test.go | 6 -- 2 files changed, 16 insertions(+), 259 deletions(-) diff --git a/server.go b/server.go index 277eecbbef0..32b85c2d804 100644 --- a/server.go +++ b/server.go @@ -281,16 +281,6 @@ type server struct { peerConnectedListeners map[string][]chan<- lnpeer.Peer peerDisconnectedListeners map[string][]chan<- struct{} - // TODO(yy): the Brontide.Start doesn't know this value, which means it - // will continue to send messages even if there are no active channels - // and the value below is false. Once it's pruned, all its connections - // will be closed, thus the Brontide.Start will return an error. - persistentPeers map[string]bool - persistentPeersBackoff map[string]time.Duration - persistentPeerAddrs map[string][]*lnwire.NetAddress - persistentConnReqs map[string][]*connmgr.ConnReq - persistentRetryCancels map[string]chan struct{} - // persistentWorkers maps each persistent peer's pubkey to its // connection worker. A worker's existence means the peer is persistent; // its backoff, addresses, and dial state live inside the worker. @@ -700,11 +690,6 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, torController: torController, - persistentPeers: make(map[string]bool), - persistentPeersBackoff: make(map[string]time.Duration), - persistentConnReqs: make(map[string][]*connmgr.ConnReq), - persistentPeerAddrs: make(map[string][]*lnwire.NetAddress), - persistentRetryCancels: make(map[string]chan struct{}), persistentWorkers: make(map[string]*connWorker), peerErrors: make(map[string]*queue.CircularBuffer), ignorePeerTermination: make(map[*peer.Brontide]struct{}), @@ -2967,7 +2952,7 @@ func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, e // - the node itself needs to be skipped as it doesn't make sense to connect // to itself. // - the peers that already have connections with, as in s.peersByPub. 
-// - the peers that we are attempting to connect, as in s.persistentPeers. +// - the peers that we are attempting to connect, as in s.persistentWorkers. func (s *server) createBootstrapIgnorePeers() map[autopilot.NodeID]struct{} { s.mu.RLock() defer s.mu.RUnlock() @@ -3643,19 +3628,6 @@ func (s *server) establishPersistentConnections(ctx context.Context) error { return nil } -// delayInitialReconnect will attempt a reconnection to the given peer after -// sampling a value for the delay between 0s and the maxInitReconnectDelay. -// -// NOTE: This method MUST be run as a goroutine. -func (s *server) delayInitialReconnect(pubStr string) { - delay := time.Duration(prand.Intn(maxInitReconnectDelay)) * time.Second - select { - case <-time.After(delay): - s.connectToPersistentPeer(pubStr) - case <-s.quit: - } -} - // prunePersistentPeerConnection removes all internal state related to // persistent connections to a peer within the server. This is used to avoid // persistent connection retries to peers we do not have any open channels with. @@ -3663,25 +3635,23 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) { pubKeyStr := string(compressedPubKey[:]) s.mu.Lock() + defer s.mu.Unlock() - // Check the worker map: if the worker is non-perm, stop it. - if w, ok := s.persistentWorkers[pubKeyStr]; ok && !w.perm { - s.stopWorker(pubKeyStr) + w, ok := s.persistentWorkers[pubKeyStr] + if !ok { + return } - if perm, ok := s.persistentPeers[pubKeyStr]; ok && !perm { - delete(s.persistentPeers, pubKeyStr) - delete(s.persistentPeersBackoff, pubKeyStr) - delete(s.persistentPeerAddrs, pubKeyStr) - s.cancelConnReqs(pubKeyStr, nil) - s.mu.Unlock() - - srvrLog.Infof("Pruned peer %x from persistent connections, "+ - "peer has no open channels", compressedPubKey) - + // Only stop non-perm workers. Perm workers persist even with + // zero channels. 
+ if w.perm { return } - s.mu.Unlock() + + s.stopWorker(pubKeyStr) + + srvrLog.Infof("Pruned peer %x from persistent connections, "+ + "peer has no open channels", compressedPubKey) } // bannedPersistentPeerConnection does not actually "ban" a persistent peer. It @@ -3693,17 +3663,9 @@ func (s *server) prunePersistentPeerConnection(compressedPubKey [33]byte) { // // NOTE: The server's write lock MUST be held when this is called. func (s *server) bannedPersistentPeerConnection(remotePub string) { - // Stop the worker if the peer is non-perm. if w, ok := s.persistentWorkers[remotePub]; ok && !w.perm { s.stopWorker(remotePub) } - - if perm, ok := s.persistentPeers[remotePub]; ok && !perm { - delete(s.persistentPeers, remotePub) - delete(s.persistentPeersBackoff, remotePub) - delete(s.persistentPeerAddrs, remotePub) - s.cancelConnReqs(remotePub, nil) - } } // getOrCreateWorker returns the existing connection worker for the @@ -3964,25 +3926,6 @@ func (s *server) findPeerByPubStr(pubStr string) (*peer.Brontide, error) { return peer, nil } -// nextPeerBackoff computes the next backoff duration for a peer's pubkey using -// exponential backoff. If no previous backoff was known, the default is -// returned. -func (s *server) nextPeerBackoff(pubStr string, - startTime time.Time) time.Duration { - - // Now, determine the appropriate backoff to use for the retry. - backoff, ok := s.persistentPeersBackoff[pubStr] - if !ok { - // If an existing backoff was unknown, use the default. - return s.cfg.MinBackoff - } - - return peerBackoff( - backoff, startTime, s.cfg.MinBackoff, s.cfg.MaxBackoff, - defaultStableConnDuration, - ) -} - // peerBackoff computes the next reconnection backoff for a persistent peer // given the current backoff, the time the peer was last started, and the // configured bounds. A zero startTime indicates the peer failed to start. 
@@ -4096,7 +4039,6 @@ func (s *server) InboundPeerConnected(conn net.Conn) { case ErrPeerNotConnected: // We were unable to locate an existing connection with the // target peer, proceed to connect. - s.cancelConnReqs(pubStr, nil) s.sendWorkerCmd(pubStr, connWorkerMsg{cmd: cmdStandDown}) s.peerConnected(conn, nil, true) @@ -4127,7 +4069,6 @@ func (s *server) InboundPeerConnected(conn net.Conn) { // disconnect our already connected peer. srvrLog.DebugS(ctx, "Disconnecting stale connection") - s.cancelConnReqs(pubStr, nil) s.sendWorkerCmd(pubStr, connWorkerMsg{cmd: cmdStandDown}) // Remove the current peer from the server's internal state and @@ -4161,36 +4102,25 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) s.mu.Lock() defer s.mu.Unlock() - // If we already have an inbound connection to this peer, then ignore - // this new connection. + // If we already have an inbound connection to this peer, then + // ignore this new connection. The worker is told to stand down + // so it stops dialing. if p, ok := s.inboundPeers[pubStr]; ok { srvrLog.Debugf("Already have inbound connection for %v, "+ "ignoring outbound connection from local=%v, remote=%v", p, conn.LocalAddr(), conn.RemoteAddr()) - s.cancelConnReqs(pubStr, nil) s.sendWorkerCmd(pubStr, connWorkerMsg{cmd: cmdStandDown}) conn.Close() return } - if _, ok := s.persistentConnReqs[pubStr]; !ok && connReq != nil { - srvrLog.Debugf("Ignoring canceled outbound connection") - s.connMgr.Remove(connReq.ID()) - conn.Close() - return - } // If we already have a valid connection that is scheduled to take // precedence once the prior peer has finished disconnecting, we'll // ignore this connection. 
if _, ok := s.scheduledPeerConnection[pubStr]; ok { srvrLog.Debugf("Ignoring connection, peer already scheduled") - - if connReq != nil { - s.connMgr.Remove(connReq.ID()) - } - conn.Close() return } @@ -4198,18 +4128,6 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) srvrLog.Infof("Established outbound connection to: %x@%v", pubStr, conn.RemoteAddr()) - if connReq != nil { - // A successful connection was returned by the connmgr. - // Immediately cancel all pending requests, excluding the - // outbound connection we just established. - ignore := connReq.ID() - s.cancelConnReqs(pubStr, &ignore) - } else { - // This was a successful connection made by some other - // subsystem. Remove all requests being managed by the connmgr. - s.cancelConnReqs(pubStr, nil) - } - // If we already have a connection with this peer, decide whether or not // we need to drop the stale connection. We forgo adding a default case // as we expect these to be the only error values returned from @@ -4263,57 +4181,6 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) } } -// UnassignedConnID is the default connection ID that a request can have before -// it actually is submitted to the connmgr. -// TODO(conner): move into connmgr package, or better, add connmgr method for -// generating atomic IDs -const UnassignedConnID uint64 = 0 - -// cancelConnReqs stops all persistent connection requests for a given pubkey. -// Any attempts initiated by the peerTerminationWatcher are canceled first. -// Afterwards, each connection request removed from the connmgr. The caller can -// optionally specify a connection ID to ignore, which prevents us from -// canceling a successful request. All persistent connreqs for the provided -// pubkey are discarded after the operationjw. 
-func (s *server) cancelConnReqs(pubStr string, skip *uint64) { - // First, cancel any lingering persistent retry attempts, which will - // prevent retries for any with backoffs that are still maturing. - if cancelChan, ok := s.persistentRetryCancels[pubStr]; ok { - close(cancelChan) - delete(s.persistentRetryCancels, pubStr) - } - - // Next, check to see if we have any outstanding persistent connection - // requests to this peer. If so, then we'll remove all of these - // connection requests, and also delete the entry from the map. - connReqs, ok := s.persistentConnReqs[pubStr] - if !ok { - return - } - - for _, connReq := range connReqs { - srvrLog.Tracef("Canceling %s:", connReqs) - - // Atomically capture the current request identifier. - connID := connReq.ID() - - // Skip any zero IDs, this indicates the request has not - // yet been schedule. - if connID == UnassignedConnID { - continue - } - - // Skip a particular connection ID if instructed. - if skip != nil && connID == *skip { - continue - } - - s.connMgr.Remove(connID) - } - - delete(s.persistentConnReqs, pubStr) -} - // handleCustomMessage dispatches an incoming custom peers message to // subscribers. func (s *server) handleCustomMessage(peer [33]byte, msg *lnwire.Custom) error { @@ -4868,103 +4735,6 @@ func (s *server) peerTerminationWatcher(p *peer.Brontide, ready chan struct{}) { }) } -// connectToPersistentPeer uses all the stored addresses for a peer to attempt -// to connect to the peer. It creates connection requests if there are -// currently none for a given address and it removes old connection requests -// if the associated address is no longer in the latest address list for the -// peer. -func (s *server) connectToPersistentPeer(pubKeyStr string) { - s.mu.Lock() - defer s.mu.Unlock() - - // Create an easy lookup map of the addresses we have stored for the - // peer. 
We will remove entries from this map if we have existing - // connection requests for the associated address and then any leftover - // entries will indicate which addresses we should create new - // connection requests for. - addrMap := make(map[string]*lnwire.NetAddress) - for _, addr := range s.persistentPeerAddrs[pubKeyStr] { - addrMap[addr.String()] = addr - } - - // Go through each of the existing connection requests and - // check if they correspond to the latest set of addresses. If - // there is a connection requests that does not use one of the latest - // advertised addresses then remove that connection request. - var updatedConnReqs []*connmgr.ConnReq - for _, connReq := range s.persistentConnReqs[pubKeyStr] { - lnAddr := connReq.Addr.(*lnwire.NetAddress).Address.String() - - switch _, ok := addrMap[lnAddr]; ok { - // If the existing connection request is using one of the - // latest advertised addresses for the peer then we add it to - // updatedConnReqs and remove the associated address from - // addrMap so that we don't recreate this connReq later on. - case true: - updatedConnReqs = append( - updatedConnReqs, connReq, - ) - delete(addrMap, lnAddr) - - // If the existing connection request is using an address that - // is not one of the latest advertised addresses for the peer - // then we remove the connecting request from the connection - // manager. - case false: - srvrLog.Info( - "Removing conn req:", connReq.Addr.String(), - ) - s.connMgr.Remove(connReq.ID()) - } - } - - s.persistentConnReqs[pubKeyStr] = updatedConnReqs - - cancelChan, ok := s.persistentRetryCancels[pubKeyStr] - if !ok { - cancelChan = make(chan struct{}) - s.persistentRetryCancels[pubKeyStr] = cancelChan - } - - // Any addresses left in addrMap are new ones that we have not made - // connection requests for. So create new connection requests for those. - // If there is more than one address in the address map, stagger the - // creation of the connection requests for those. 
- go func() { - ticker := time.NewTicker(multiAddrConnectionStagger) - defer ticker.Stop() - - for _, addr := range addrMap { - // Send the persistent connection request to the - // connection manager, saving the request itself so we - // can cancel/restart the process as needed. - connReq := &connmgr.ConnReq{ - Addr: addr, - Permanent: true, - } - - s.mu.Lock() - s.persistentConnReqs[pubKeyStr] = append( - s.persistentConnReqs[pubKeyStr], connReq, - ) - s.mu.Unlock() - - srvrLog.Debugf("Attempting persistent connection to "+ - "channel peer %v", addr) - - go s.connMgr.Connect(connReq) - - select { - case <-s.quit: - return - case <-cancelChan: - return - case <-ticker.C: - } - } - }() -} - // removePeerUnsafe removes the passed peer from the server's state of all // active peers. // @@ -5148,15 +4918,8 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error { srvrLog.Infof("Disconnecting from %v", peer) - s.cancelConnReqs(pubStr, nil) s.stopWorker(pubStr) - // If this peer was formerly a persistent connection, then we'll remove - // them from this map so we don't attempt to re-connect after we - // disconnect. - delete(s.persistentPeers, pubStr) - delete(s.persistentPeersBackoff, pubStr) - // Remove the peer by calling Disconnect. Previously this was done with // removePeerUnsafe, which bypassed the peerTerminationWatcher. 
// diff --git a/server_test.go b/server_test.go index 8d2ed30fe8a..0a83c4ce5f7 100644 --- a/server_test.go +++ b/server_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/btcsuite/btcd/btcec/v2" - "github.com/btcsuite/btcd/connmgr" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnwire" @@ -63,11 +62,6 @@ func newTestServer(t *testing.T) *server { identityECDH: &keychain.PrivKeyECDH{ PrivKey: identityPriv, }, - persistentPeers: make(map[string]bool), - persistentPeersBackoff: make(map[string]time.Duration), - persistentConnReqs: make(map[string][]*connmgr.ConnReq), - persistentPeerAddrs: make(map[string][]*lnwire.NetAddress), - persistentRetryCancels: make(map[string]chan struct{}), persistentWorkers: make(map[string]*connWorker), peersByPub: make(map[string]*peer.Brontide), inboundPeers: make(map[string]*peer.Brontide), From 2371b008d8f3a8b1d13368c9e4d8cef4f016bbab Mon Sep 17 00:00:00 2001 From: bitromortac Date: Tue, 10 Mar 2026 11:24:41 +0100 Subject: [PATCH 12/14] temp: add docs --- ...602181000-Persistent-Connection-Workers.md | 37 +++++++++++ zk/202602181001-connworker-run-loop.md | 38 +++++++++++ ...81002-connworker-object-context-diagram.md | 48 ++++++++++++++ ...81003-connworker-execution-flow-diagram.md | 63 +++++++++++++++++++ zk/202602181004-brontide-dial-context.md | 26 ++++++++ 5 files changed, 212 insertions(+) create mode 100644 zk/202602181000-Persistent-Connection-Workers.md create mode 100644 zk/202602181001-connworker-run-loop.md create mode 100644 zk/202602181002-connworker-object-context-diagram.md create mode 100644 zk/202602181003-connworker-execution-flow-diagram.md create mode 100644 zk/202602181004-brontide-dial-context.md diff --git a/zk/202602181000-Persistent-Connection-Workers.md b/zk/202602181000-Persistent-Connection-Workers.md new file mode 100644 index 00000000000..d682b949412 --- /dev/null +++ b/zk/202602181000-Persistent-Connection-Workers.md @@ -0,0 +1,37 @@ 
+# Persistent Connection Workers + +LND maintains persistent connections to peers it shares channels with. The +server must track which peers are persistent, manage reconnection backoff, store +known addresses, and coordinate dial attempts — all while responding to inbound +connections, gossip address updates, user-initiated connects, and channel +lifecycle events. + +The original design spread this state across five loosely coupled maps +(`persistentPeers`, `persistentPeersBackoff`, `persistentPeerAddrs`, +`persistentConnReqs`, `persistentRetryCancels`) and delegated dialing to the +connection manager. Multiple server methods mutated these maps from different +goroutines, creating race windows where connection requests could accumulate +without bound. + +The worker architecture consolidates the entire dial/retry/backoff lifecycle for +each persistent peer into a single [[202602181001-connworker-run-loop.md]] +goroutine that the server communicates with via a typed command channel. A +worker's existence in `persistentWorkers` is the sole indicator that a peer is +persistent; its removal means the peer was pruned or disconnected. + +The server interacts with workers through three helpers: `getOrCreateWorker` +(creates or returns the existing worker), `stopWorker` (sends stop and removes +from map), and `sendWorkerCmd` (delivers a command to an existing worker). The +command vocabulary is small: connect, update addresses, stand down, and stop. + +The structural relationships between these objects are visualized in +[[202602181002-connworker-object-context-diagram.md]]. The runtime execution +flow with error paths is shown in +[[202602181003-connworker-execution-flow-diagram.md]]. 
+ +Tags: #architecture #persistent-connections #peer-management + +## References +- Execution model: [[202602181001-connworker-run-loop.md]] +- Structural overview: [[202602181002-connworker-object-context-diagram.md]] +- Runtime flow: [[202602181003-connworker-execution-flow-diagram.md]] diff --git a/zk/202602181001-connworker-run-loop.md b/zk/202602181001-connworker-run-loop.md new file mode 100644 index 00000000000..7cbd8a1d219 --- /dev/null +++ b/zk/202602181001-connworker-run-loop.md @@ -0,0 +1,38 @@ +# Connection Worker Run Loop + +Each persistent peer gets a single `connWorker` goroutine that owns its +dial/retry/backoff lifecycle. The worker starts idle and transitions between two +states: idle (waiting for commands) and dialing (actively attempting +connections). + +In the idle state, the worker blocks on its command channel. A `cmdConnect` +message transitions it into the dial loop, carrying the target addresses and an +initial backoff duration. `cmdUpdateAddrs` replaces the address list without +starting a dial. `cmdStandDown` is a no-op when idle. `cmdStop` and the server's +quit channel terminate the goroutine. + +The dial loop first waits for the backoff duration (skipped when zero), then +iterates over addresses with a stagger delay between each. Every dial runs in a +child goroutine under a cancellable context — the +[[202602181004-brontide-dial-context.md]] makes the encrypted handshake itself +interruptible — so the worker can respond to preempting commands or shutdown +without leaking goroutines. When a command arrives mid-dial, the context is +canceled, the dial goroutine is drained, and the command is dispatched. + +On dial failure across all addresses, the backoff increases via exponential +backoff with randomized jitter. On success, the connection is delivered through +the `onConnection` callback and the worker returns to idle. 
+ +The backoff is computed by the pure `peerBackoff` function which considers +connection stability: short-lived connections double the backoff, stable +connections (exceeding `stableConnDuration`) reduce it, and very stable +connections reset to `minBackoff`. + +The buffered command channel (capacity 1) ensures the server never blocks when +sending commands under normal operation. + +Tags: #architecture #persistent-connections #concurrency + +## References +- Cancellable dial mechanism enabling mid-dial preemption: + [[202602181004-brontide-dial-context.md]] diff --git a/zk/202602181002-connworker-object-context-diagram.md b/zk/202602181002-connworker-object-context-diagram.md new file mode 100644 index 00000000000..ec8c1885a68 --- /dev/null +++ b/zk/202602181002-connworker-object-context-diagram.md @@ -0,0 +1,48 @@ +# Connection Worker Object Context + +Structural overview of how the server communicates with per-peer connection +workers and what each caller needs from the worker layer. 
+ +```mermaid +flowchart LR + subgraph triggers["Trigger Events"] + startup["Startup"] + gossip["Gossip addr update"] + disconnect["Peer disconnects"] + user["User: ConnectToPeer"] + inbound["Inbound connection"] + prune["Channel close / ban"] + end + + subgraph helpers["Server Helpers"] + create["getOrCreateWorker"] + send["sendWorkerCmd"] + stop["stopWorker"] + end + + subgraph worker["connWorker goroutine"] + cmdChan["cmdChan"] + loop["Run / dialLoop"] + dial["DialContext"] + end + + startup -->|cmdConnect| create + gossip -->|cmdUpdateAddrs| send + disconnect -->|cmdConnect + backoff| send + user -->|cmdConnect| create + inbound -->|cmdStandDown| send + prune -->|cmdStop| stop + + create --> cmdChan + send --> cmdChan + stop --> cmdChan + + cmdChan --> loop --> dial + dial -->|success| cb["OutboundPeerConnected"] +``` + +Tags: #diagram #architecture #persistent-connections + +## References +- Visualizes the trigger events and commands handled by: + [[202602181001-connworker-run-loop.md]] diff --git a/zk/202602181003-connworker-execution-flow-diagram.md b/zk/202602181003-connworker-execution-flow-diagram.md new file mode 100644 index 00000000000..8f99b733624 --- /dev/null +++ b/zk/202602181003-connworker-execution-flow-diagram.md @@ -0,0 +1,63 @@ +# Connection Worker Execution Flow + +Runtime flow of a single `connWorker` goroutine showing state transitions, +command handling at each stage, and error paths. 
+ +```mermaid +flowchart TD + Start["Run() starts"] --> Idle["IDLE: select cmdChan or quit"] + + Idle -->|cmdConnect| SetState["Update addrs + backoff"] + Idle -->|cmdUpdateAddrs| UpdateIdle["Update addrs only"] + UpdateIdle --> Idle + Idle -->|cmdStandDown| Idle + Idle -->|cmdStop| Exit["EXIT"] + Idle -->|quit| Exit + + SetState --> EnterLoop["dialLoop()"] + + EnterLoop --> CheckBackoff{"backoff > 0?"} + CheckBackoff -->|no| StartDial["tryAllAddresses()"] + CheckBackoff -->|yes| WaitTimer["Timer: select timer/cmd/quit"] + + WaitTimer -->|timer fires| StartDial + WaitTimer -->|cmdConnect| SetState + WaitTimer -->|cmdUpdateAddrs| EnterLoop + WaitTimer -->|cmdStandDown| Idle + WaitTimer -->|cmdStop or quit| Exit + + StartDial --> ForAddr["For each addr i=0..N"] + + ForAddr -->|"i > 0"| Stagger["Stagger delay: select timer/cmd/quit"] + ForAddr -->|"i == 0"| SpawnDial["Spawn dial goroutine"] + + Stagger -->|timer fires| SpawnDial + Stagger -->|cmd| CancelRound["cancel ctx, handleMidDial()"] + Stagger -->|quit| DrainExit["cancel ctx"] --> Exit + + SpawnDial --> DialSelect["select: result / cmd / quit"] + + DialSelect -->|"err != nil"| NextAddr{"More addrs?"} + DialSelect -->|"conn != nil"| Deliver["onConnection(conn)"] --> Idle + DialSelect -->|cmd| CancelDrain["cancel ctx, drain goroutine, handleMidDial()"] + DialSelect -->|quit| QuitDrain["cancel ctx, drain goroutine"] --> Exit + + NextAddr -->|yes| ForAddr + NextAddr -->|no| BumpBackoff["Increase backoff"] + BumpBackoff --> EnterLoop + + CancelRound -->|cmdConnect| SetState + CancelRound -->|cmdUpdateAddrs| EnterLoop + CancelRound -->|cmdStandDown| Idle + CancelRound -->|cmdStop| Exit + + CancelDrain -->|cmdConnect| SetState + CancelDrain -->|cmdUpdateAddrs| EnterLoop + CancelDrain -->|cmdStandDown| Idle + CancelDrain -->|cmdStop| Exit +``` + +Tags: #diagram #architecture #persistent-connections #concurrency + +## References +- Visualizes runtime of: [[202602181001-connworker-run-loop.md]] diff --git 
a/zk/202602181004-brontide-dial-context.md b/zk/202602181004-brontide-dial-context.md new file mode 100644 index 00000000000..16fe90dd248 --- /dev/null +++ b/zk/202602181004-brontide-dial-context.md @@ -0,0 +1,26 @@ +# Context-Cancellable Brontide Dial + +Establishing a Brontide connection has two phases: a TCP connection and a +multi-round Noise protocol handshake. The TCP phase honours context cancellation +natively — the OS-level connect call propagates it through the injected +context-aware dialer. + +The handshake phase is harder to interrupt. Its reads and writes are blocking +I/O with no built-in context support. The chosen approach treats connection +closure as a cancellation proxy: a watcher goroutine listens on the context's +done channel and closes the underlying TCP connection when signalled. This +immediately unblocks any in-progress handshake read or write, causing it to +return an error. + +A completion signal prevents the watcher from closing a connection that finished +its handshake successfully. The caller signals this channel before returning the +live connection, neutralising the watcher before it can act. + +This design enables the [[202602181001-connworker-run-loop.md]] to preempt +in-progress dials when commands arrive mid-handshake, without leaking goroutines +or leaving zombie connections open. + +Tags: #architecture #networking #brontide #concurrency + +## References +- Enables mid-dial preemption in: [[202602181001-connworker-run-loop.md]] From 299edeb972924046d9456519be9d7b200a16dc44 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Tue, 10 Mar 2026 12:01:54 +0100 Subject: [PATCH 13/14] temp+server: add test reproducing stopWorker orphaning on full cmdChan MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the worker's cmdChan buffer is full, stopWorker silently drops cmdStop yet unconditionally removes the worker from persistentWorkers. 
The test confirms the goroutine continues running after removal — unreachable by the server until shutdown. --- server_test.go | 90 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/server_test.go b/server_test.go index 0a83c4ce5f7..b59fedca69c 100644 --- a/server_test.go +++ b/server_test.go @@ -1,6 +1,7 @@ package lnd import ( + "context" "errors" "net" "testing" @@ -402,6 +403,95 @@ func TestStopWorker(t *testing.T) { require.NotContains(t, s.persistentWorkers, pubStr) } +// TestStopWorkerDropsCmdOnFullChannel demonstrates that stopWorker silently +// drops cmdStop when the worker's command channel is already full. The worker +// is removed from the map but its goroutine continues running — it becomes +// orphaned and can only be stopped by closing s.quit. +func TestStopWorkerDropsCmdOnFullChannel(t *testing.T) { + t.Parallel() + + s := newTestServer(t) + + pubKey := generateTestPubKey(t) + pubStr := string(pubKey.SerializeCompressed()) + + // Create a worker with a slow dial so it stays in the dial loop. + dialStarted := make(chan struct{}) + blockDial := make(chan struct{}) + s.getOrCreateWorker(pubStr, false, nil) + w := s.persistentWorkers[pubStr] + + // Override the dial function to block until we release it. + w.cfg.dialContext = func(ctx context.Context, + addr *lnwire.NetAddress) (net.Conn, error) { + + close(dialStarted) + <-blockDial + return nil, errors.New("blocked dial released") + } + + addr := &lnwire.NetAddress{ + IdentityKey: pubKey, + Address: &net.TCPAddr{IP: net.ParseIP("1.2.3.4"), Port: 9735}, + } + + // Send cmdConnect to start the worker dialing. This fills the buffer + // momentarily but the worker drains it and enters dialLoop. + w.cmdChan <- connWorkerMsg{ + cmd: cmdConnect, + addrs: []*lnwire.NetAddress{addr}, + } + + // Wait for the worker to be inside the dial goroutine. 
+ select { + case <-dialStarted: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for dial to start") + } + + // Now fill the command channel while the worker is blocked in dial. + // The worker is inside tryAllAddresses waiting on resultCh, so it + // hasn't drained cmdChan yet. We send a command to fill the buffer. + w.cmdChan <- connWorkerMsg{cmd: cmdStandDown} + + // Channel is now full (capacity 1). stopWorker should fail to deliver + // cmdStop. + s.stopWorker(pubStr) + + // The worker is removed from the map... + require.NotContains(t, s.persistentWorkers, pubStr) + + // ...but the goroutine is still alive. Release the dial so the worker + // can process commands again. + close(blockDial) + + // Give the worker time to process the cmdStandDown (which it received + // instead of cmdStop) and return to idle. + time.Sleep(100 * time.Millisecond) + + // The worker goroutine is still running in its idle select — it never + // received cmdStop. We can prove this by sending it another command. + // If the goroutine had exited, this send would block forever (no + // reader). + workerAlive := make(chan bool, 1) + go func() { + select { + case w.cmdChan <- connWorkerMsg{cmd: cmdStop}: + workerAlive <- true + case <-time.After(2 * time.Second): + workerAlive <- false + } + }() + + alive := <-workerAlive + require.True(t, alive, + "worker goroutine is still running despite being "+ + "removed from persistentWorkers — orphaned") + + // Clean up: close quit to stop the orphaned worker. + close(s.quit) +} + // TestSendWorkerCmdNoWorker verifies that sendWorkerCmd returns false when no // worker exists for the peer. 
func TestSendWorkerCmdNoWorker(t *testing.T) { From 3c3d0f23e1405a410d58c6b3c11be174a30cd238 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Tue, 10 Mar 2026 12:05:19 +0100 Subject: [PATCH 14/14] temp+server: add test reproducing ConnectToPeer deadlock on full cmdChan When ConnectToPeer (perm path) does a blocking send on w.cmdChan while holding s.mu, it deadlocks if the worker is inside onConnection which also acquires s.mu via OutboundPeerConnected. The test fills the channel while the worker is stalled in onConnection and confirms the blocking send cannot complete. --- server_test.go | 166 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) diff --git a/server_test.go b/server_test.go index b59fedca69c..3ed79176cb7 100644 --- a/server_test.go +++ b/server_test.go @@ -403,6 +403,172 @@ func TestStopWorker(t *testing.T) { require.NotContains(t, s.persistentWorkers, pubStr) } +// TestConnectToPeerDeadlockOnFullChannel demonstrates that a blocking channel +// send in ConnectToPeer (perm path) while holding s.mu deadlocks against the +// worker's onConnection callback which also needs s.mu. +// +// Deadlock diagram: +// +// Goroutine A (ConnectToPeer): holds s.mu → blocks on w.cmdChan +// Goroutine B (worker): holds cmdChan slot (in onConnection) → blocks on s.mu +// +// The test uses a timeout to detect the deadlock rather than hanging forever. +func TestConnectToPeerDeadlockOnFullChannel(t *testing.T) { + t.Parallel() + + s := newTestServer(t) + + pubKey := generateTestPubKey(t) + pubStr := string(pubKey.SerializeCompressed()) + + // onConnectionReached signals that the worker's onConnection callback + // has been entered. We use this to coordinate the deadlock scenario. + onConnectionReached := make(chan struct{}) + + // onConnectionBlock keeps the worker inside onConnection so we can + // attempt the competing s.mu acquisition. 
+ onConnectionBlock := make(chan struct{}) + + // Create the worker manually so we can control the callbacks. + cfg := connWorkerCfg{ + dialContext: func(ctx context.Context, + addr *lnwire.NetAddress) (net.Conn, error) { + + // Return a mock connection that succeeds immediately. + return &mockConn{}, nil + }, + onConnection: func(conn net.Conn, + addr *lnwire.NetAddress) { + + // Simulate what OutboundPeerConnected does: acquire + // s.mu. In the real code this is line 4102 of + // server.go. + close(onConnectionReached) + <-onConnectionBlock + + // In the real code, s.mu.Lock() would block here, + // but for this test we just simulate the blocking + // by waiting on onConnectionBlock above. The actual + // deadlock is: worker blocks on s.mu.Lock() inside + // OutboundPeerConnected, while ConnectToPeer holds + // s.mu and blocks on w.cmdChan. + }, + minBackoff: time.Second, + maxBackoff: time.Hour, + staggerDelay: time.Millisecond, + quit: s.quit, + } + + w := newConnWorker(pubStr, false, cfg) + s.persistentWorkers[pubStr] = w + go w.Run() + + addr := &lnwire.NetAddress{ + IdentityKey: pubKey, + Address: &net.TCPAddr{ + IP: net.ParseIP("1.2.3.4"), Port: 9735, + }, + } + + // Send cmdConnect so the worker starts dialing. The mock dialer + // succeeds immediately, so the worker will enter onConnection. + w.cmdChan <- connWorkerMsg{ + cmd: cmdConnect, + addrs: []*lnwire.NetAddress{addr}, + } + + // Wait for the worker to enter onConnection. + select { + case <-onConnectionReached: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for onConnection") + } + + // Now the worker is blocked inside onConnection (simulating the + // s.mu.Lock() call in OutboundPeerConnected). The worker's run loop + // is stalled — it cannot drain cmdChan. + // + // Simulate what ConnectToPeer does: hold s.mu and do a blocking + // send on w.cmdChan. 
With the worker stalled, cmdChan is empty + // (capacity 1, the earlier cmdConnect was already consumed), so the + // send should succeed... BUT if we first fill the channel, the + // blocking send will deadlock. + // + // To reproduce the exact deadlock, we need the channel to be full. + // In the real scenario, the channel could be full from a concurrent + // peerTerminationWatcher sending cmdConnect. Simulate that: + select { + case w.cmdChan <- connWorkerMsg{cmd: cmdStandDown}: + // Channel is now full. + default: + t.Fatal("expected channel to be empty at this point") + } + + // Now attempt the ConnectToPeer pattern: hold s.mu + blocking send. + // This MUST deadlock (or timeout) because: + // - w.cmdChan is full (we just filled it) + // - The worker can't drain it (blocked in onConnection) + // - In real code, the worker would be blocked on s.mu.Lock() + // which ConnectToPeer holds + deadlockDetected := make(chan struct{}) + go func() { + s.mu.Lock() + // This is the problematic line from server.go:4849. + // It blocks because cmdChan is full and the worker is + // stalled. + w.cmdChan <- connWorkerMsg{ + cmd: cmdConnect, + addrs: []*lnwire.NetAddress{addr}, + } + s.mu.Unlock() + + // If we get here, no deadlock occurred. + close(deadlockDetected) + }() + + // The blocking send should not complete within the timeout because + // the worker is stalled and can't drain the channel. + select { + case <-deadlockDetected: + t.Fatal("expected deadlock but send completed — " + + "test setup is wrong") + case <-time.After(2 * time.Second): + // Deadlock confirmed: the goroutine is stuck on the + // blocking channel send while holding s.mu. + } + + // Clean up: unblock the worker and release everything. + close(onConnectionBlock) + + // Drain the channel so the blocked goroutine can complete. + <-w.cmdChan + + // Wait for the deadlocked goroutine to finish. 
+ select { + case <-deadlockDetected: + case <-time.After(5 * time.Second): + t.Fatal("cleanup: goroutine still stuck") + } + + s.mu.Lock() + delete(s.persistentWorkers, pubStr) + s.mu.Unlock() + + close(s.quit) +} + +// mockConn is a minimal net.Conn for testing. +type mockConn struct{} + +func (m *mockConn) Read(b []byte) (int, error) { return 0, nil } +func (m *mockConn) Write(b []byte) (int, error) { return len(b), nil } +func (m *mockConn) Close() error { return nil } +func (m *mockConn) LocalAddr() net.Addr { return &net.TCPAddr{} } +func (m *mockConn) RemoteAddr() net.Addr { return &net.TCPAddr{} } +func (m *mockConn) SetDeadline(time.Time) error { return nil } +func (m *mockConn) SetReadDeadline(time.Time) error { return nil } +func (m *mockConn) SetWriteDeadline(time.Time) error { return nil } + // TestStopWorkerDropsCmdOnFullChannel demonstrates that stopWorker silently // drops cmdStop when the worker's command channel is already full. The worker // is removed from the map but its goroutine continues running — it becomes