From a8c7b64231f7b1abbf2f59979a07702270c75c6c Mon Sep 17 00:00:00 2001
From: ziggie
Date: Thu, 25 Sep 2025 14:33:30 +0200
Subject: [PATCH 1/3] graphdb: add wrapper to SetSourceNode

This makes sure our source node is also part of the cache. Currently
the cache makes no use of the source node, but adding it to the cache
seems the right approach here.
---
 graph/db/graph.go | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/graph/db/graph.go b/graph/db/graph.go
index 08e287238cf..e725c33c8fa 100644
--- a/graph/db/graph.go
+++ b/graph/db/graph.go
@@ -295,6 +295,25 @@ func (c *ChannelGraph) AddNode(ctx context.Context,
 	return nil
 }
 
+// SetSourceNode sets the source node in the graph database and also adds it
+// to the graph cache.
+func (c *ChannelGraph) SetSourceNode(ctx context.Context,
+	node *models.Node) error {
+
+	err := c.V1Store.SetSourceNode(ctx, node)
+	if err != nil {
+		return err
+	}
+
+	if c.graphCache != nil {
+		c.graphCache.AddNodeFeatures(
+			node.PubKeyBytes, node.Features,
+		)
+	}
+
+	return nil
+}
+
 // DeleteNode starts a new database transaction to remove a vertex/node
 // from the database according to the node's public key.
 func (c *ChannelGraph) DeleteNode(ctx context.Context,

From d4c2ae9cfd65f491a708ae9c77ace60ccfe93819 Mon Sep 17 00:00:00 2001
From: ziggie
Date: Fri, 26 Sep 2025 13:10:11 +0200
Subject: [PATCH 2/3] gossiper: handle premature NodeAnnouncements

During some tests it was revealed that there might be a race condition
where we receive a node announcement without having received the
channel announcement first. This is especially likely for private
channels, because sending the channel announcement and sending the node
announcement happen at the same time by default (3 blocks by default).

Now we cache node announcements that arrive before any channel for the
node is known and replay them as soon as a channel announcement
involving the node is received.
---
 discovery/gossiper.go      | 127 +++++++++++++++++++++++++++++++++--
 discovery/gossiper_test.go | 134 ++++++++++++++++++++++++++++++-------
 2 files changed, 233 insertions(+), 28 deletions(-)

diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index bebc1976043..7e79d59d64f 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -56,6 +56,10 @@ const (
 	// updates that we'll hold onto.
 	maxPrematureUpdates = 100
 
+	// maxPrematureNodeAnnouncements tracks the max amount of premature node
+	// announcements that we'll hold onto.
+	maxPrematureNodeAnnouncements = 100
+
 	// maxFutureMessages tracks the max amount of future messages that
 	// we'll hold onto.
 	maxFutureMessages = 1000
@@ -502,6 +506,11 @@ type AuthenticatedGossiper struct {
 	// ChannelAnnouncement for the channel is received.
 	prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
 
+	// prematureNodeAnnouncements stores NodeAnnouncements for which we
+	// don't yet know any associated channel. They'll be replayed once
+	// one of the node's channels is added to the graph.
+	prematureNodeAnnouncements *lru.Cache[route.Vertex, *cachedNetworkMsg]
+
 	// banman tracks our peer's ban status.
 	banman *banman
 
@@ -585,6 +594,9 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
 		prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
 			maxPrematureUpdates,
 		),
+		prematureNodeAnnouncements: lru.NewCache[route.Vertex, *cachedNetworkMsg]( //nolint: ll
+			maxPrematureNodeAnnouncements,
+		),
 		channelMtx: multimutex.NewMutex[uint64](),
 		recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
 			maxRejectedUpdates,
@@ -2132,6 +2144,78 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
 	return true
 }
 
+// cachePrematureNodeAnnouncement stores the passed node announcement for later
+// processing. We do this when we haven't yet seen any channels for the node and
+// therefore can't verify whether the announcement should be accepted.
+func (d *AuthenticatedGossiper) cachePrematureNodeAnnouncement(
+	nodeID route.Vertex, nMsg *networkMsg) {
+
+	pMsg := &processedNetworkMsg{msg: nMsg}
+
+	earlyMsgs, err := d.prematureNodeAnnouncements.Get(nodeID)
+	switch {
+	case errors.Is(err, cache.ErrElementNotFound):
+		_, _ = d.prematureNodeAnnouncements.Put(
+			nodeID, &cachedNetworkMsg{
+				msgs: []*processedNetworkMsg{pMsg},
+			},
+		)
+
+	default:
+		msgs := earlyMsgs.msgs
+		msgs = append(msgs, pMsg)
+		_, _ = d.prematureNodeAnnouncements.Put(
+			nodeID, &cachedNetworkMsg{
+				msgs: msgs,
+			},
+		)
+	}
+
+	log.Debugf("Caching NodeAnnouncement for node=%x until a channel "+
+		"is known", nodeID)
+}
+
+// replayPrematureNodeAnnouncements re-queues any cached node announcements for
+// the provided vertices so they can be processed now that a channel exists.
+func (d *AuthenticatedGossiper) replayPrematureNodeAnnouncements(
+	vertices ...route.Vertex) {
+
+	for _, vertex := range vertices {
+		earlyMsgs, err := d.prematureNodeAnnouncements.Get(vertex)
+		if errors.Is(err, cache.ErrElementNotFound) {
+			continue
+		}
+		if err != nil {
+			log.Warnf("Unable to load cached node announcement "+
+				"for %x: %v", vertex, err)
+			continue
+		}
+
+		for _, cached := range earlyMsgs.msgs {
+			if cached.processed {
+				continue
+			}
+
+			cached.processed = true
+
+			nMsg := cached.msg
+			d.wg.Add(1)
+			go func(vertex route.Vertex, replay *networkMsg) {
+				defer d.wg.Done()
+
+				log.Debugf("Replaying cached NodeAnnouncement "+
+					"for node=%x", vertex)
+
+				select {
+				case d.networkMsgs <- replay:
+				case <-d.quit:
+					replay.err <- ErrGossiperShuttingDown
+				}
+			}(vertex, nMsg)
+		}
+	}
+}
+
 // processNetworkAnnouncement processes a new network relate authenticated
 // channel or node announcement or announcements proofs. If the announcement
 // didn't affect the internal state due to either being out of date, invalid,
@@ -2490,6 +2574,25 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(ctx context.Context,
 	// We'll quickly ask the router if it already has a newer update for
 	// this node so we can skip validating signatures if not required.
 	if d.cfg.Graph.IsStaleNode(ctx, nodeAnn.NodeID, timestamp) {
+		if nMsg.isRemote {
+			_, err := d.cfg.Graph.FetchNode(ctx, nodeAnn.NodeID)
+			switch {
+			case errors.Is(err, graphdb.ErrGraphNodeNotFound):
+				vertex := route.Vertex(nodeAnn.NodeID)
+				d.cachePrematureNodeAnnouncement(vertex, nMsg)
+
+				return nil, false
+
+			case err != nil:
+				log.Errorf("Unable to fetch node %x: %v",
+					nodeAnn.NodeID, err)
+
+				nMsg.err <- err
+
+				return nil, false
+			}
+		}
+
 		log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
 		nMsg.err <- nil
 		return nil, true
@@ -2499,12 +2602,21 @@
 		log.Debugf("Adding node: %x got error: %v",
 			nodeAnn.NodeID, err)
 
-		if !graph.IsError(
-			err,
-			graph.ErrOutdated,
-			graph.ErrIgnored,
-		) {
+		switch {
+		// If the node is ignored because we don't have any of its
+		// channels, we'll cache it for later processing.
+		case graph.IsError(err, graph.ErrIgnored):
+			if nMsg.isRemote {
+				vertex := route.Vertex(nodeAnn.NodeID)
+				d.cachePrematureNodeAnnouncement(vertex, nMsg)
+				return nil, false
+			}
+
+		case graph.IsError(err, graph.ErrOutdated):
+			// No need to log, we'll just return the error below.
+
+		default:
 			log.Error(err)
 		}
 
@@ -2914,6 +3026,11 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
 		channelUpdates = append(channelUpdates, chanMsgs.msgs...)
 	}
 
+	d.replayPrematureNodeAnnouncements(
+		route.Vertex(edge.NodeKey1Bytes),
+		route.Vertex(edge.NodeKey2Bytes),
+	)
+
 	// Launch a new goroutine to handle each ChannelUpdate, this is to
 	// ensure we don't block here, as we can handle only one announcement
 	// at a time.
diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go
index 63369f4d662..d911deda89b 100644
--- a/discovery/gossiper_test.go
+++ b/discovery/gossiper_test.go
@@ -116,6 +116,32 @@ func (r *mockGraphSource) AddNode(_ context.Context, node *models.Node,
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
+	for i := range r.nodes {
+		existing := &r.nodes[i]
+		if existing.PubKeyBytes == node.PubKeyBytes {
+			return nil
+		}
+	}
+
+	hasChannel := false
+	for _, info := range r.infos {
+		if info.NodeKey1Bytes == node.PubKeyBytes ||
+			info.NodeKey2Bytes == node.PubKeyBytes {
+
+			hasChannel = true
+			break
+		}
+	}
+
+	if !hasChannel {
+		return graph.NewErrf(
+			graph.ErrIgnored,
+			"Ignoring node announcement for node not found in "+
+				"channel graph (%x)",
+			node.PubKeyBytes[:],
+		)
+	}
+
 	r.nodes = append(r.nodes, *node)
 	return nil
 }
@@ -2637,10 +2663,21 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 	case <-time.After(2 * trickleDelay):
 	}
 
-	err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+	// We haven't processed the channel announcement yet, so the node
+	// announcement should be cached and replayed as soon as the channel
+	// announcement is processed.
+	errRemoteNodeAnn := tCtx.gossiper.ProcessRemoteAnnouncement(
 		ctx, batch.nodeAnn2, remotePeer,
 	)
-	require.NoError(t, err, "unable to process node ann")
+	select {
+	case err := <-errRemoteNodeAnn:
+		require.NoError(t, err, "node announcement returned error "+
+			"before channel was known")
+		t.Fatal("node announcement was processed before channel " +
+			"was known")
+	case <-time.After(2 * trickleDelay):
+	}
+
 	select {
 	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
@@ -2720,6 +2757,16 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 		t.Fatalf("remote update was not processed")
 	}
 
+	// The remote node announcement should now also be replayed, as we now
+	// have the necessary edge entry in the graph.
+	select {
+	case err := <-errRemoteNodeAnn:
+		require.NoError(t, err, "cached node announcement should "+
+			"succeed once channel exists")
+	case <-time.After(time.Second):
+		t.Fatal("cached node announcement wasn't replayed")
+	}
+
 	// Check that the ChannelEdgePolicy was added to the graph.
 	chanInfo, e1, e2, err = tCtx.router.GetChannelByID(
 		batch.chanUpdAnn1.ShortChannelID,
@@ -2735,6 +2782,25 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 		t.Fatalf("e2 was nil")
 	}
 
+	// We also make sure the node announcement was added to the graph.
+	remoteNode, err := tCtx.router.FetchNode(
+		ctx,
+		batch.nodeAnn2.NodeID,
+	)
+	require.NoError(t, err, "unable to get remote node from router")
+	if remoteNode == nil {
+		t.Fatalf("remoteNode was nil")
+	}
+
+	localNode, err := tCtx.router.FetchNode(
+		ctx,
+		batch.nodeAnn1.NodeID,
+	)
+	require.NoError(t, err, "unable to get local node from router")
+	if localNode == nil {
+		t.Fatalf("localNode was nil")
+	}
+
 	// Pretending that we receive local channel announcement from funding
 	// manager, thereby kick off the announcement exchange process.
 	err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn)
@@ -2911,6 +2977,20 @@ func TestExtraDataNodeAnnouncementValidation(t *testing.T) {
 	}
 	timestamp := testTimestamp
 
+	// We first need to create a channel announcement so that the node
+	// announcement can be processed.
+	remoteChanAnn, err := tCtx.createRemoteChannelAnnouncement(0)
+	require.NoError(t, err, "unable to create chan ann")
+
+	select {
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, remoteChanAnn, remotePeer,
+	):
+		require.NoError(t, err, "unable to process remote chan ann")
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote chan ann")
+	}
+
 	// We'll create a node announcement that includes a set of opaque data
 	// which we don't know of, but will store anyway in order to ensure
 	// upgrades can flow smoothly in the future.
@@ -3096,15 +3176,21 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 	require.NoError(t, err, "unable to parse pubkey")
 	remotePeer := &mockPeer{remoteKey, nil, nil, atomic.Bool{}}
 
-	// Process the remote node announcement.
-	select {
-	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+	// Process the remote node announcement. Since we don't yet know any of
+	// the node's channels, the announcement should be cached and not
+	// resolved immediately.
+	errChan := tCtx.gossiper.ProcessRemoteAnnouncement(
 		ctx, batch.nodeAnn2, remotePeer,
-	):
-	case <-time.After(2 * time.Second):
-		t.Fatal("did not process remote announcement")
+	)
+
+	select {
+	case err = <-errChan:
+		require.NoError(t, err, "unable to process announcement")
+		t.Fatal("node announcement was processed before a channel " +
+			"was known")
+
+	case <-time.After(100 * time.Millisecond):
 	}
-	require.NoError(t, err, "unable to process announcement")
 
 	// Since no channels or node announcements were already in the graph,
 	// the node announcement should be ignored, and not forwarded.
@@ -3134,18 +3220,9 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 	}
 	require.NoError(t, err, "unable to process announcement")
 
-	// Now process the node announcement again.
-	select {
-	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
-		ctx, batch.nodeAnn2, remotePeer,
-	):
-	case <-time.After(2 * time.Second):
-		t.Fatal("did not process remote announcement")
-	}
-	require.NoError(t, err, "unable to process announcement")
-
-	// This time the node announcement should be forwarded. The same should
-	// the channel announcement and update be.
+	// At this point the cached node announcement should also be replayed,
+	// causing all three announcements (channel ann, channel update, node
+	// ann) to be broadcast.
 	for i := 0; i < 3; i++ {
 		select {
 		case <-tCtx.broadcastedMessage:
@@ -3154,8 +3231,17 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 		}
 	}
 
-	// Processing the same node announcement again should be ignored, as it
-	// is stale.
+	select {
+	case err = <-errChan:
+		require.NoError(t, err, "cached node announcement should "+
+			"succeed")
+
+	case <-time.After(2 * time.Second):
+		t.Fatal("cached node announcement wasn't replayed")
+	}
+
+	// Now process the node announcement again, which should now be
+	// treated as stale.
 	select {
 	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
 		ctx, batch.nodeAnn2, remotePeer,
@@ -3165,6 +3251,8 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 	}
 	require.NoError(t, err, "unable to process announcement")
 
+	// Processing the same node announcement again should be ignored, as it
+	// is stale.
 	select {
 	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")

From 58e6b6bfef9af9c571ba166bbf6b398aacf4ffcd Mon Sep 17 00:00:00 2001
From: ziggie
Date: Fri, 26 Sep 2025 14:25:23 +0200
Subject: [PATCH 3/3] lncfg: add comment to tlv legacy option

---
 lncfg/protocol_legacy_on.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/lncfg/protocol_legacy_on.go b/lncfg/protocol_legacy_on.go
index 9b8fd4ebdf7..89e5b5630c1 100644
--- a/lncfg/protocol_legacy_on.go
+++ b/lncfg/protocol_legacy_on.go
@@ -12,6 +12,10 @@ type LegacyProtocol struct {
 	// LegacyOnionFormat if set to true, then we won't signal
 	// TLVOnionPayloadOptional. As a result, nodes that include us in the
 	// route won't use the new modern onion framing.
+	//
+	// NOTE: LND will still be able to decode TLV onion payloads if they
+	// reach us, because the decoder supports both formats, although we
+	// explicitly gossip that we don't support it via this config.
 	LegacyOnionFormat bool `long:"onion" description:"force node to not advertise the new modern TLV onion format"`
 
 	// CommitmentTweak guards if we should use the old legacy commitment