diff --git a/discovery/gossiper.go b/discovery/gossiper.go index bebc1976043..7e79d59d64f 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -56,6 +56,10 @@ const ( // updates that we'll hold onto. maxPrematureUpdates = 100 + // maxPrematureNodeAnnouncements tracks the max amount of premature node + // announcements that we'll hold onto. + maxPrematureNodeAnnouncements = 100 + // maxFutureMessages tracks the max amount of future messages that // we'll hold onto. maxFutureMessages = 1000 @@ -502,6 +506,11 @@ type AuthenticatedGossiper struct { // ChannelAnnouncement for the channel is received. prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg] + // prematureNodeAnnouncements stores NodeAnnouncements for which we + // don't yet know any associated channel. They'll be replayed once + // one of the node's channels is added to the graph. + prematureNodeAnnouncements *lru.Cache[route.Vertex, *cachedNetworkMsg] + // banman tracks our peer's ban status. banman *banman @@ -585,6 +594,9 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll maxPrematureUpdates, ), + prematureNodeAnnouncements: lru.NewCache[route.Vertex, *cachedNetworkMsg]( //nolint: ll + maxPrematureNodeAnnouncements, + ), channelMtx: multimutex.NewMutex[uint64](), recentRejects: lru.NewCache[rejectCacheKey, *cachedReject]( maxRejectedUpdates, @@ -2132,6 +2144,78 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID, return true } +// cachePrematureNodeAnnouncement stores the passed node announcement for later +// processing. We do this when we haven't yet seen any channels for the node and +// therefore can't verify whether the announcement should be accepted. +func (d *AuthenticatedGossiper) cachePrematureNodeAnnouncement( + nodeID route.Vertex, nMsg *networkMsg) { + + pMsg := &processedNetworkMsg{msg: nMsg} + + earlyMsgs, err := d.prematureNodeAnnouncements.Get(nodeID) + switch { + case errors.Is(err, cache.ErrElementNotFound): + _, _ = d.prematureNodeAnnouncements.Put( + nodeID, &cachedNetworkMsg{ + msgs: []*processedNetworkMsg{pMsg}, + }, + ) + + default: + msgs := earlyMsgs.msgs + msgs = append(msgs, pMsg) + _, _ = d.prematureNodeAnnouncements.Put( + nodeID, &cachedNetworkMsg{ + msgs: msgs, + }, + ) + } + + log.Debugf("Caching NodeAnnouncement for node=%x until a channel "+ + "is known", nodeID) +} + +// replayPrematureNodeAnnouncements re-queues any cached node announcements for +// the provided vertices so they can be processed now that a channel exists. 
+func (d *AuthenticatedGossiper) replayPrematureNodeAnnouncements( + vertices ...route.Vertex) { + + for _, vertex := range vertices { + earlyMsgs, err := d.prematureNodeAnnouncements.Get(vertex) + if errors.Is(err, cache.ErrElementNotFound) { + continue + } + if err != nil { + log.Warnf("Unable to load cached node announcement "+ + "for %x: %v", vertex, err) + continue + } + + for _, cached := range earlyMsgs.msgs { + if cached.processed { + continue + } + + cached.processed = true + + nMsg := cached.msg + d.wg.Add(1) + go func(vertex route.Vertex, replay *networkMsg) { + defer d.wg.Done() + + log.Debugf("Replaying cached NodeAnnouncement "+ + "for node=%x", vertex) + + select { + case d.networkMsgs <- replay: + case <-d.quit: + replay.err <- ErrGossiperShuttingDown + } + }(vertex, nMsg) + } + } +} + // processNetworkAnnouncement processes a new network relate authenticated // channel or node announcement or announcements proofs. If the announcement // didn't affect the internal state due to either being out of date, invalid, @@ -2490,6 +2574,25 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(ctx context.Context, // We'll quickly ask the router if it already has a newer update for // this node so we can skip validating signatures if not required. if d.cfg.Graph.IsStaleNode(ctx, nodeAnn.NodeID, timestamp) { + if nMsg.isRemote { + _, err := d.cfg.Graph.FetchNode(ctx, nodeAnn.NodeID) + switch { + case errors.Is(err, graphdb.ErrGraphNodeNotFound): + vertex := route.Vertex(nodeAnn.NodeID) + d.cachePrematureNodeAnnouncement(vertex, nMsg) + + return nil, false + + case err != nil: + log.Errorf("Unable to fetch node %x: %v", + nodeAnn.NodeID, err) + + nMsg.err <- err + + return nil, false + } + } + log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID) nMsg.err <- nil return nil, true @@ -2499,12 +2602,21 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(ctx context.Context, log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID, err) - if !graph.IsError( - err, - graph.ErrOutdated, - graph.ErrIgnored, - ) { + switch { + // If the node is ignored because we don't have any of its + // channels, we'll cache it for later processing. + case graph.IsError(err, graph.ErrIgnored): + if nMsg.isRemote { + vertex := route.Vertex(nodeAnn.NodeID) + d.cachePrematureNodeAnnouncement(vertex, nMsg) + return nil, false + } + + case graph.IsError(err, graph.ErrOutdated): + // No need to log, we'll just return the error below. + + default: log.Error(err) } @@ -2914,6 +3026,11 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context, channelUpdates = append(channelUpdates, chanMsgs.msgs...) } + d.replayPrematureNodeAnnouncements( + route.Vertex(edge.NodeKey1Bytes), + route.Vertex(edge.NodeKey2Bytes), + ) + // Launch a new goroutine to handle each ChannelUpdate, this is to // ensure we don't block here, as we can handle only one announcement // at a time. 
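Reviewer note: the gossiper change above is built entirely on the LRU primitive from github.com/lightninglabs/neutrino/cache that this file already uses for prematureChannelUpdates. Below is a minimal, self-contained sketch of the same cache-then-replay pattern, handy for exercising the primitive in isolation. The names here (vertex, cachedMsgs, cacheAnnouncement, pendingAnns) are illustrative stand-ins, not identifiers from this PR.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/lightninglabs/neutrino/cache"
	"github.com/lightninglabs/neutrino/cache/lru"
)

// vertex stands in for route.Vertex: a 33-byte node public key.
type vertex [33]byte

// cachedMsgs plays the role of cachedNetworkMsg. Size returning 1 makes
// the LRU evict by entry count rather than by byte size, the same
// trade-off the gossiper's caches make.
type cachedMsgs struct {
	msgs []string
}

func (c *cachedMsgs) Size() (uint64, error) {
	return 1, nil
}

// cacheAnnouncement appends msg to the node's pending list, creating the
// entry on first sight, mirroring cachePrematureNodeAnnouncement.
func cacheAnnouncement(c *lru.Cache[vertex, *cachedMsgs], node vertex,
	msg string) {

	existing, err := c.Get(node)
	switch {
	case errors.Is(err, cache.ErrElementNotFound):
		_, _ = c.Put(node, &cachedMsgs{msgs: []string{msg}})

	case err == nil:
		existing.msgs = append(existing.msgs, msg)
		_, _ = c.Put(node, existing)
	}
}

func main() {
	pendingAnns := lru.NewCache[vertex, *cachedMsgs](100)

	var node vertex
	node[0] = 0x02

	cacheAnnouncement(pendingAnns, node, "node_announcement_v1")
	cacheAnnouncement(pendingAnns, node, "node_announcement_v2")

	// Replay: once one of the node's channels is known, drain the
	// pending entry and hand each message back for processing.
	cached, err := pendingAnns.Get(node)
	if err != nil {
		return
	}
	for _, msg := range cached.msgs {
		fmt.Println("replaying", msg)
	}
}
```

Note that the real code keeps the entry in the cache after replay: it relies on the per-message processed flag plus LRU eviction to bound memory, rather than deleting the key.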
diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go
index 63369f4d662..d911deda89b 100644
--- a/discovery/gossiper_test.go
+++ b/discovery/gossiper_test.go
@@ -116,6 +116,32 @@ func (r *mockGraphSource) AddNode(_ context.Context, node *models.Node,
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
+	for i := range r.nodes {
+		existing := &r.nodes[i]
+		if existing.PubKeyBytes == node.PubKeyBytes {
+			return nil
+		}
+	}
+
+	hasChannel := false
+	for _, info := range r.infos {
+		if info.NodeKey1Bytes == node.PubKeyBytes ||
+			info.NodeKey2Bytes == node.PubKeyBytes {
+
+			hasChannel = true
+			break
+		}
+	}
+
+	if !hasChannel {
+		return graph.NewErrf(
+			graph.ErrIgnored,
+			"Ignoring node announcement for node not found in "+
+				"channel graph (%x)",
+			node.PubKeyBytes[:],
+		)
+	}
+
 	r.nodes = append(r.nodes, *node)
 	return nil
 }
@@ -2637,10 +2663,21 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 	case <-time.After(2 * trickleDelay):
 	}
 
-	err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+	// We haven't processed the channel announcement yet, so the node
+	// announcement should be cached and replayed as soon as the channel
+	// announcement is processed.
+	errRemoteNodeAnn := tCtx.gossiper.ProcessRemoteAnnouncement(
 		ctx, batch.nodeAnn2, remotePeer,
 	)
-	require.NoError(t, err, "unable to process node ann")
+	select {
+	case err := <-errRemoteNodeAnn:
+		require.NoError(t, err, "node announcement returned error "+
+			"before channel was known")
+		t.Fatal("node announcement was processed before channel " +
+			"was known")
+	case <-time.After(2 * trickleDelay):
+	}
+
 	select {
 	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
@@ -2720,6 +2757,16 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 		t.Fatalf("remote update was not processed")
 	}
 
+	// The remote node announcement should now also be replayed, as we now
+	// have the necessary edge entry in the graph.
+	select {
+	case err := <-errRemoteNodeAnn:
+		require.NoError(t, err, "cached node announcement should "+
+			"succeed once channel exists")
+	case <-time.After(time.Second):
+		t.Fatal("cached node announcement wasn't replayed")
+	}
+
 	// Check that the ChannelEdgePolicy was added to the graph.
 	chanInfo, e1, e2, err = tCtx.router.GetChannelByID(
 		batch.chanUpdAnn1.ShortChannelID,
@@ -2735,6 +2782,25 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
 		t.Fatalf("e2 was nil")
 	}
 
+	// We also make sure the node announcement was added to the graph.
+	remoteNode, err := tCtx.router.FetchNode(
+		ctx,
+		batch.nodeAnn2.NodeID,
+	)
+	require.NoError(t, err, "unable to get remote node from router")
+	if remoteNode == nil {
+		t.Fatalf("remoteNode was nil")
+	}
+
+	localNode, err := tCtx.router.FetchNode(
+		ctx,
+		batch.nodeAnn1.NodeID,
+	)
+	require.NoError(t, err, "unable to get local node from router")
+	if localNode == nil {
+		t.Fatalf("localNode was nil")
+	}
+
 	// Pretending that we receive local channel announcement from funding
 	// manager, thereby kick off the announcement exchange process.
 	err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn)
@@ -2911,6 +2977,20 @@ func TestExtraDataNodeAnnouncementValidation(t *testing.T) {
 	}
 	timestamp := testTimestamp
 
+	// We first need to create a channel announcement so that the node
+	// announcement can be processed.
+	remoteChanAnn, err := tCtx.createRemoteChannelAnnouncement(0)
+	require.NoError(t, err, "unable to create chan ann")
+
+	select {
+	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+		ctx, remoteChanAnn, remotePeer,
+	):
+		require.NoError(t, err, "unable to process remote chan ann")
+	case <-time.After(2 * time.Second):
+		t.Fatal("did not process remote chan ann")
+	}
+
 	// We'll create a node announcement that includes a set of opaque data
 	// which we don't know of, but will store anyway in order to ensure
 	// upgrades can flow smoothly in the future.
@@ -3096,15 +3176,21 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 	require.NoError(t, err, "unable to parse pubkey")
 	remotePeer := &mockPeer{remoteKey, nil, nil, atomic.Bool{}}
 
-	// Process the remote node announcement.
-	select {
-	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
+	// Process the remote node announcement. Since we don't yet know any of
+	// the node's channels, the announcement should be cached and not
+	// resolved immediately.
+	errChan := tCtx.gossiper.ProcessRemoteAnnouncement(
 		ctx, batch.nodeAnn2, remotePeer,
-	):
-	case <-time.After(2 * time.Second):
-		t.Fatal("did not process remote announcement")
+	)
+
+	select {
+	case err = <-errChan:
+		require.NoError(t, err, "unable to process announcement")
+		t.Fatal("node announcement was processed before a channel " +
+			"was known")
+
+	case <-time.After(100 * time.Millisecond):
 	}
-	require.NoError(t, err, "unable to process announcement")
 
 	// Since no channels or node announcements were already in the graph,
 	// the node announcement should be ignored, and not forwarded.
@@ -3134,18 +3220,9 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 	}
 	require.NoError(t, err, "unable to process announcement")
 
-	// Now process the node announcement again.
-	select {
-	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
-		ctx, batch.nodeAnn2, remotePeer,
-	):
-	case <-time.After(2 * time.Second):
-		t.Fatal("did not process remote announcement")
-	}
-	require.NoError(t, err, "unable to process announcement")
-
-	// This time the node announcement should be forwarded. The same should
-	// the channel announcement and update be.
+	// At this point the cached node announcement should also be replayed,
+	// causing all three announcements (channel ann, channel update, node
+	// ann) to be broadcast.
	for i := 0; i < 3; i++ {
 		select {
 		case <-tCtx.broadcastedMessage:
@@ -3154,8 +3231,17 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
 		}
 	}
 
-	// Processing the same node announcement again should be ignored, as it
-	// is stale.
+	select {
+	case err = <-errChan:
+		require.NoError(t, err, "cached node announcement should "+
+			"succeed")
+
+	case <-time.After(2 * time.Second):
+		t.Fatal("cached node announcement wasn't replayed")
+	}
+
+	// Now process the node announcement again, which should be treated as
+	// stale.
 	select {
 	case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
 		ctx, batch.nodeAnn2, remotePeer,
 	):
 	case <-time.After(2 * time.Second):
 		t.Fatal("did not process remote announcement")
 	}
 	require.NoError(t, err, "unable to process announcement")
 
+	// Since it was stale, the announcement should have been ignored and
+	// not broadcast again.
 	select {
 	case <-tCtx.broadcastedMessage:
 		t.Fatal("node announcement was broadcast")
diff --git a/graph/db/graph.go b/graph/db/graph.go
index 08e287238cf..e725c33c8fa 100644
--- a/graph/db/graph.go
+++ b/graph/db/graph.go
@@ -295,6 +295,25 @@ func (c *ChannelGraph) AddNode(ctx context.Context,
 	return nil
 }
 
+// SetSourceNode sets the source node in the graph database and also updates
+// the graph cache with the node's features.
+func (c *ChannelGraph) SetSourceNode(ctx context.Context,
+	node *models.Node) error {
+
+	err := c.V1Store.SetSourceNode(ctx, node)
+	if err != nil {
+		return err
+	}
+
+	if c.graphCache != nil {
+		c.graphCache.AddNodeFeatures(
+			node.PubKeyBytes, node.Features,
+		)
+	}
+
+	return nil
+}
+
 // DeleteNode starts a new database transaction to remove a vertex/node
 // from the database according to the node's public key.
 func (c *ChannelGraph) DeleteNode(ctx context.Context,
diff --git a/lncfg/protocol_legacy_on.go b/lncfg/protocol_legacy_on.go
index 9b8fd4ebdf7..89e5b5630c1 100644
--- a/lncfg/protocol_legacy_on.go
+++ b/lncfg/protocol_legacy_on.go
@@ -12,6 +12,10 @@ type LegacyProtocol struct {
 	// LegacyOnionFormat if set to true, then we won't signal
 	// TLVOnionPayloadOptional. As a result, nodes that include us in the
 	// route won't use the new modern onion framing.
+	//
+	// NOTE: LND will still be able to decode TLV onion payloads if they
+	// reach us, since the decoder supports both formats, even though we
+	// explicitly signal via this config that we don't support them.
 	LegacyOnionFormat bool `long:"onion" description:"force node to not advertise the new modern TLV onion format"`
 
 	// CommitmentTweak guards if we should use the old legacy commitment
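Reviewer note on the graph/db/graph.go hunk: SetSourceNode uses a write-through ordering, persisting to the backing store first and only then mirroring into the optional in-memory cache, so a failed database write can never leave the cache ahead of the store. A minimal sketch of that ordering under hypothetical stand-in types (store, graphCache), not lnd's actual graphdb APIs:

```go
package main

import (
	"errors"
	"fmt"
)

// vertex stands in for a 33-byte node public key.
type vertex [33]byte

// store stands in for the persistent V1Store backend.
type store struct {
	nodes map[vertex][]string
}

func (s *store) setSourceNode(key vertex, features []string) error {
	if s.nodes == nil {
		return errors.New("store not initialized")
	}
	s.nodes[key] = features
	return nil
}

// graphCache stands in for the optional in-memory graph cache.
type graphCache struct {
	features map[vertex][]string
}

func (c *graphCache) addNodeFeatures(key vertex, features []string) {
	c.features[key] = features
}

// setSourceNode mirrors the ordering in the diff: the database write
// happens first, and the cache is only touched once that write has
// succeeded. A nil cache is skipped entirely, matching the
// `c.graphCache != nil` guard.
func setSourceNode(db *store, cache *graphCache, key vertex,
	features []string) error {

	if err := db.setSourceNode(key, features); err != nil {
		return err
	}

	if cache != nil {
		cache.addNodeFeatures(key, features)
	}

	return nil
}

func main() {
	db := &store{nodes: make(map[vertex][]string)}
	cache := &graphCache{features: make(map[vertex][]string)}

	var key vertex
	key[0] = 0x03

	err := setSourceNode(db, cache, key, []string{"tlv-onion"})
	if err != nil {
		fmt.Println("persist failed:", err)
		return
	}

	fmt.Println("cached features:", cache.features[key])
}
```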