Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 122 additions & 5 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ const (
// updates that we'll hold onto.
maxPrematureUpdates = 100

// maxPrematureNodeAnnouncements tracks the max amount of premature node
// announcements that we'll hold onto.
maxPrematureNodeAnnouncements = 100

// maxFutureMessages tracks the max amount of future messages that
// we'll hold onto.
maxFutureMessages = 1000
Expand Down Expand Up @@ -502,6 +506,11 @@ type AuthenticatedGossiper struct {
// ChannelAnnouncement for the channel is received.
prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]

// prematureNodeAnnouncements stores NodeAnnouncements for which we
// don't yet know any associated channel. They'll be replayed once
// one of the node's channels is added to the graph.
prematureNodeAnnouncements *lru.Cache[route.Vertex, *cachedNetworkMsg]

// banman tracks our peer's ban status.
banman *banman

Expand Down Expand Up @@ -585,6 +594,9 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: ll
maxPrematureUpdates,
),
prematureNodeAnnouncements: lru.NewCache[route.Vertex, *cachedNetworkMsg]( //nolint: ll
maxPrematureNodeAnnouncements,
),
channelMtx: multimutex.NewMutex[uint64](),
recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
maxRejectedUpdates,
Expand Down Expand Up @@ -2132,6 +2144,78 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
return true
}

// cachePrematureNodeAnnouncement holds onto a NodeAnnouncement for a node we
// have no channels for yet. Without a known channel we cannot decide whether
// the announcement is relevant, so it is cached and replayed later via
// replayPrematureNodeAnnouncements once a channel for the node appears.
func (d *AuthenticatedGossiper) cachePrematureNodeAnnouncement(
	nodeID route.Vertex, nMsg *networkMsg) {

	entry := &processedNetworkMsg{msg: nMsg}

	// Start with a fresh single-element batch, and extend the existing
	// batch instead if the node already has cached announcements.
	batch := []*processedNetworkMsg{entry}

	cached, err := d.prematureNodeAnnouncements.Get(nodeID)
	if !errors.Is(err, cache.ErrElementNotFound) {
		batch = append(cached.msgs, entry)
	}

	// Overwrite the cache entry with the updated batch. The Put result is
	// intentionally ignored; eviction of old entries is acceptable here.
	_, _ = d.prematureNodeAnnouncements.Put(
		nodeID, &cachedNetworkMsg{
			msgs: batch,
		},
	)

	log.Debugf("Caching NodeAnnouncement for node=%x until a channel "+
		"is known", nodeID)
}

// replayPrematureNodeAnnouncements feeds any cached node announcements for the
// given vertices back into the gossiper's message pipeline. It is invoked once
// a channel for a node has been added to the graph, at which point the cached
// announcements can be validated for real.
func (d *AuthenticatedGossiper) replayPrematureNodeAnnouncements(
	vertices ...route.Vertex) {

	for _, node := range vertices {
		cached, err := d.prematureNodeAnnouncements.Get(node)
		switch {
		// Nothing was cached for this node, move on.
		case errors.Is(err, cache.ErrElementNotFound):
			continue

		// Any other lookup failure is logged and skipped.
		case err != nil:
			log.Warnf("Unable to load cached node announcement "+
				"for %x: %v", node, err)
			continue
		}

		for _, pending := range cached.msgs {
			// Skip announcements that were already replayed.
			if pending.processed {
				continue
			}

			// Mark as handled up front so a second replay pass
			// won't queue the same message twice.
			pending.processed = true

			// Re-queue asynchronously so we never block the
			// caller; honor shutdown via the quit channel.
			d.wg.Add(1)
			go func(node route.Vertex, replay *networkMsg) {
				defer d.wg.Done()

				log.Debugf("Replaying cached NodeAnnouncement "+
					"for node=%x", node)

				select {
				case d.networkMsgs <- replay:
				case <-d.quit:
					replay.err <- ErrGossiperShuttingDown
				}
			}(node, pending.msg)
		}
	}
}

// processNetworkAnnouncement processes a new network relate authenticated
// channel or node announcement or announcements proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid,
Expand Down Expand Up @@ -2490,6 +2574,25 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(ctx context.Context,
// We'll quickly ask the router if it already has a newer update for
// this node so we can skip validating signatures if not required.
if d.cfg.Graph.IsStaleNode(ctx, nodeAnn.NodeID, timestamp) {
if nMsg.isRemote {
_, err := d.cfg.Graph.FetchNode(ctx, nodeAnn.NodeID)
switch {
case errors.Is(err, graphdb.ErrGraphNodeNotFound):
vertex := route.Vertex(nodeAnn.NodeID)
d.cachePrematureNodeAnnouncement(vertex, nMsg)

return nil, false

case err != nil:
log.Errorf("Unable to fetch node %x: %v",
nodeAnn.NodeID, err)

nMsg.err <- err

return nil, false
}
}

log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
nMsg.err <- nil
return nil, true
Expand All @@ -2499,12 +2602,21 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(ctx context.Context,
log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
err)

if !graph.IsError(
err,
graph.ErrOutdated,
graph.ErrIgnored,
) {
switch {
// If the node is ignored because we don't have any of its
// channels, we'll cache it for later processing.
case graph.IsError(err, graph.ErrIgnored):
if nMsg.isRemote {
vertex := route.Vertex(nodeAnn.NodeID)
d.cachePrematureNodeAnnouncement(vertex, nMsg)

return nil, false
}

case graph.IsError(err, graph.ErrOutdated):
// No need to log, we'll just return the error below.

default:
log.Error(err)
}

Expand Down Expand Up @@ -2914,6 +3026,11 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(ctx context.Context,
channelUpdates = append(channelUpdates, chanMsgs.msgs...)
}

d.replayPrematureNodeAnnouncements(
route.Vertex(edge.NodeKey1Bytes),
route.Vertex(edge.NodeKey2Bytes),
)

// Launch a new goroutine to handle each ChannelUpdate, this is to
// ensure we don't block here, as we can handle only one announcement
// at a time.
Expand Down
134 changes: 111 additions & 23 deletions discovery/gossiper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,32 @@ func (r *mockGraphSource) AddNode(_ context.Context, node *models.Node,
r.mu.Lock()
defer r.mu.Unlock()

for i := range r.nodes {
existing := &r.nodes[i]
if existing.PubKeyBytes == node.PubKeyBytes {
return nil
}
}

hasChannel := false
for _, info := range r.infos {
if info.NodeKey1Bytes == node.PubKeyBytes ||
info.NodeKey2Bytes == node.PubKeyBytes {

hasChannel = true
break
}
}

if !hasChannel {
return graph.NewErrf(
graph.ErrIgnored,
"Ignoring node announcement for node not found in "+
"channel graph (%x)",
node.PubKeyBytes[:],
)
}

r.nodes = append(r.nodes, *node)
return nil
}
Expand Down Expand Up @@ -2637,10 +2663,21 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
case <-time.After(2 * trickleDelay):
}

err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
// We don't have processed the channel announcement yet, so the node
// announcement should be cached and replayed as soon as the channel
// announcement is processed.
errRemoteNodeAnn := tCtx.gossiper.ProcessRemoteAnnouncement(
ctx, batch.nodeAnn2, remotePeer,
)
require.NoError(t, err, "unable to process node ann")
select {
case err := <-errRemoteNodeAnn:
require.NoError(t, err, "node announcement returned error "+
"before channel was known")
t.Fatal("node announcement was processed before channel " +
"was known")
case <-time.After(2 * trickleDelay):
}

select {
case <-tCtx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
Expand Down Expand Up @@ -2720,6 +2757,16 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
t.Fatalf("remote update was not processed")
}

// The remote node announcement should now also be replayed, as we now
// have the necessary edge entry in the graph.
select {
case err := <-errRemoteNodeAnn:
require.NoError(t, err, "cached node announcement should "+
"succeed once channel exists")
case <-time.After(time.Second):
t.Fatal("cached node announcement wasn't replayed")
}

// Check that the ChannelEdgePolicy was added to the graph.
chanInfo, e1, e2, err = tCtx.router.GetChannelByID(
batch.chanUpdAnn1.ShortChannelID,
Expand All @@ -2735,6 +2782,25 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
t.Fatalf("e2 was nil")
}

// We also make sure the node announcement was added to the graph.
remoteNode, err := tCtx.router.FetchNode(
ctx,
batch.nodeAnn2.NodeID,
)
require.NoError(t, err, "unable to get remote node from router")
if remoteNode == nil {
t.Fatalf("remoteNode was nil")
}

localNode, err := tCtx.router.FetchNode(
ctx,
batch.nodeAnn1.NodeID,
)
require.NoError(t, err, "unable to get local node from router")
if localNode == nil {
t.Fatalf("localNode was nil")
}

// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
err = <-tCtx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn)
Expand Down Expand Up @@ -2911,6 +2977,20 @@ func TestExtraDataNodeAnnouncementValidation(t *testing.T) {
}
timestamp := testTimestamp

// We first need to create a channel announcement so that the node
// announcement can be processed.
remoteChanAnn, err := tCtx.createRemoteChannelAnnouncement(0)
require.NoError(t, err, "unable to create chan ann")

select {
case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
ctx, remoteChanAnn, remotePeer,
):
require.NoError(t, err, "unable to process remote chan ann")
case <-time.After(2 * time.Second):
t.Fatal("did not process remote chan ann")
}

// We'll create a node announcement that includes a set of opaque data
// which we don't know of, but will store anyway in order to ensure
// upgrades can flow smoothly in the future.
Expand Down Expand Up @@ -3096,15 +3176,21 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
require.NoError(t, err, "unable to parse pubkey")
remotePeer := &mockPeer{remoteKey, nil, nil, atomic.Bool{}}

// Process the remote node announcement.
select {
case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
// Process the remote node announcement. Since we don't yet know any of
// the node's channels, the announcement should be cached and not
// resolved immediately.
errChan := tCtx.gossiper.ProcessRemoteAnnouncement(
ctx, batch.nodeAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
)

select {
case err = <-errChan:
require.NoError(t, err, "unable to process announcement")
t.Fatal("node announcement was processed before a channel " +
"was known")

case <-time.After(100 * time.Millisecond):
}
require.NoError(t, err, "unable to process announcement")

// Since no channels or node announcements were already in the graph,
// the node announcement should be ignored, and not forwarded.
Expand Down Expand Up @@ -3134,18 +3220,9 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
}
require.NoError(t, err, "unable to process announcement")

// Now process the node announcement again.
select {
case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
ctx, batch.nodeAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
require.NoError(t, err, "unable to process announcement")

// This time the node announcement should be forwarded. The same should
// the channel announcement and update be.
// At this point the cached node announcement should also be replayed,
// causing all three announcements (channel ann, channel update, node
// ann) to be broadcast.
for i := 0; i < 3; i++ {
select {
case <-tCtx.broadcastedMessage:
Expand All @@ -3154,8 +3231,17 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
}
}

// Processing the same node announcement again should be ignored, as it
// is stale.
select {
case err = <-errChan:
require.NoError(t, err, "cached node announcement should "+
"succeed")

case <-time.After(2 * time.Second):
t.Fatal("cached node announcement wasn't replayed")
}

// Now process the node announcement again which should be treated as
// stale.
select {
case err = <-tCtx.gossiper.ProcessRemoteAnnouncement(
ctx, batch.nodeAnn2, remotePeer,
Expand All @@ -3165,6 +3251,8 @@ func TestNodeAnnouncementNoChannels(t *testing.T) {
}
require.NoError(t, err, "unable to process announcement")

// Processing the same node announcement again should be ignored, as it
// is stale.
select {
case <-tCtx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
Expand Down
Loading
Loading