From 3f84abab8d48d36b49f4d02052bf20055bb3c589 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 7 May 2026 14:03:23 -0700 Subject: [PATCH 01/10] graph/db: fix zero ChannelID in zombie ChannelEdgeInfo When FetchChannelEdgesByID hits a zombie edge, it constructs the partial ChannelEdgeInfo it returns alongside ErrZombieEdge with a hard-coded ChannelID of zero instead of the actual channel ID that was looked up. Callers such as the gossiper's processZombieUpdate receive this struct and may use the ChannelID field; returning zero is incorrect and could mask bugs in downstream code. Pass the looked-up chanID through to NewV1Channel / NewV2Channel so the zombie ChannelEdgeInfo carries the correct ChannelID. --- graph/db/sql_store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index f4824d75e1b..fcf4b700cef 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -2547,12 +2547,12 @@ func (s *SQLStore) FetchChannelEdgesByID(ctx context.Context, switch v { case gossipV1: edge, err = models.NewV1Channel( - 0, chainhash.Hash{}, node1, + chanID, chainhash.Hash{}, node1, node2, &models.ChannelV1Fields{}, ) case gossipV2: edge, err = models.NewV2Channel( - 0, chainhash.Hash{}, node1, + chanID, chainhash.Hash{}, node1, node2, &models.ChannelV2Fields{}, ) } From 34f1ee9abe241c8cb78d9c3dd263421d399d1495 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 7 Apr 2026 08:03:55 +0545 Subject: [PATCH 02/10] sqldb: add preferred-node and preferred-channel mapping tables Add two precomputed mapping tables that track the "best" gossip version for each unique node (pub_key) and channel (SCID): - graph_preferred_nodes: pub_key -> node_id - graph_preferred_channels: scid -> channel_id Priority for nodes: v2 announced > v1 announced > v2 shell > v1 shell. Priority for channels: v2 with policies > v1 with policies > v2 > v1. These tables enable simple indexed-join queries for cross-version traversal (ForEachNode, ForEachChannel, ForEachNodeDirectedChannel) without expensive per-row COALESCE subqueries. The tables are populated from existing data during the migration and maintained by upsert/delete queries on every write path (added in the next commit). --- sqldb/migrations.go | 5 + sqldb/sqlc/graph.sql.go | 281 ++++++++++++++++++ .../000016_graph_preferred_lookups.down.sql | 2 + .../000016_graph_preferred_lookups.up.sql | 83 ++++++ sqldb/sqlc/models.go | 10 + sqldb/sqlc/querier.go | 8 + sqldb/sqlc/queries/graph.sql | 105 +++++++ 7 files changed, 494 insertions(+) create mode 100644 sqldb/sqlc/migrations/000016_graph_preferred_lookups.down.sql create mode 100644 sqldb/sqlc/migrations/000016_graph_preferred_lookups.up.sql diff --git a/sqldb/migrations.go b/sqldb/migrations.go index 241e5c0d683..afee7220294 100644 --- a/sqldb/migrations.go +++ b/sqldb/migrations.go @@ -136,6 +136,11 @@ var ( Version: 18, SchemaVersion: 15, }, + { + Name: "000016_graph_preferred_lookups", + Version: 19, + SchemaVersion: 16, + }, }, migrationAdditions...) // ErrMigrationMismatch is returned when a migrated record does not diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 703afd8f458..61a769c24d6 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -4217,6 +4217,236 @@ func (q *Queries) ListNodesPaginated(ctx context.Context, arg ListNodesPaginated return items, nil } +const listPreferredChannelsPaginated = `-- name: ListPreferredChannelsPaginated :many +SELECT + c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature, c.signature, c.funding_pk_script, c.merkle_root_hash, + + -- Join node pubkeys + n1.pub_key AS node1_pubkey, + n2.pub_key AS node2_pubkey, + + -- Node 1 policy + cp1.id AS policy_1_id, + cp1.node_id AS policy_1_node_id, + cp1.version AS policy_1_version, + cp1.timelock AS policy_1_timelock, + cp1.fee_ppm AS policy_1_fee_ppm, + cp1.base_fee_msat AS policy_1_base_fee_msat, + cp1.min_htlc_msat AS policy_1_min_htlc_msat, + cp1.max_htlc_msat AS policy_1_max_htlc_msat, + cp1.last_update AS policy_1_last_update, + cp1.disabled AS policy_1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.message_flags AS policy1_message_flags, + cp1.channel_flags AS policy1_channel_flags, + cp1.block_height AS policy1_block_height, + cp1.disable_flags AS policy1_disable_flags, + cp1.signature AS policy_1_signature, + + -- Node 2 policy + cp2.id AS policy_2_id, + cp2.node_id AS policy_2_node_id, + cp2.version AS policy_2_version, + cp2.timelock AS policy_2_timelock, + cp2.fee_ppm AS policy_2_fee_ppm, + cp2.base_fee_msat AS policy_2_base_fee_msat, + cp2.min_htlc_msat AS policy_2_min_htlc_msat, + cp2.max_htlc_msat AS policy_2_max_htlc_msat, + cp2.last_update AS policy_2_last_update, + cp2.disabled AS policy_2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.message_flags AS policy2_message_flags, + cp2.channel_flags AS policy2_channel_flags, + cp2.signature AS policy_2_signature, + cp2.block_height AS policy_2_block_height, + cp2.disable_flags AS policy_2_disable_flags + +FROM graph_preferred_channels pc +JOIN graph_channels c ON c.id = pc.channel_id +JOIN graph_nodes n1 ON c.node_id_1 = n1.id +JOIN graph_nodes n2 ON c.node_id_2 = n2.id +LEFT JOIN graph_channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version +LEFT JOIN graph_channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE pc.scid > $1 +ORDER BY pc.scid +LIMIT $2 +` + +type ListPreferredChannelsPaginatedParams struct { + Scid []byte + Limit int32 +} + +type ListPreferredChannelsPaginatedRow struct { + GraphChannel GraphChannel + Node1Pubkey []byte + Node2Pubkey []byte + Policy1ID sql.NullInt64 + Policy1NodeID sql.NullInt64 + Policy1Version sql.NullInt16 + Policy1Timelock sql.NullInt32 + Policy1FeePpm sql.NullInt64 + Policy1BaseFeeMsat sql.NullInt64 + Policy1MinHtlcMsat sql.NullInt64 + Policy1MaxHtlcMsat sql.NullInt64 + Policy1LastUpdate sql.NullInt64 + Policy1Disabled sql.NullBool + Policy1InboundBaseFeeMsat sql.NullInt64 + Policy1InboundFeeRateMilliMsat sql.NullInt64 + Policy1MessageFlags sql.NullInt16 + Policy1ChannelFlags sql.NullInt16 + Policy1BlockHeight sql.NullInt64 + Policy1DisableFlags sql.NullInt16 + Policy1Signature []byte + Policy2ID sql.NullInt64 + Policy2NodeID sql.NullInt64 + Policy2Version sql.NullInt16 + Policy2Timelock sql.NullInt32 + Policy2FeePpm sql.NullInt64 + Policy2BaseFeeMsat sql.NullInt64 + Policy2MinHtlcMsat sql.NullInt64 + Policy2MaxHtlcMsat sql.NullInt64 + Policy2LastUpdate sql.NullInt64 + Policy2Disabled sql.NullBool + Policy2InboundBaseFeeMsat sql.NullInt64 + Policy2InboundFeeRateMilliMsat sql.NullInt64 + Policy2MessageFlags sql.NullInt16 + Policy2ChannelFlags sql.NullInt16 + Policy2Signature []byte + Policy2BlockHeight sql.NullInt64 + Policy2DisableFlags sql.NullInt16 +} + +func (q *Queries) ListPreferredChannelsPaginated(ctx context.Context, arg ListPreferredChannelsPaginatedParams) ([]ListPreferredChannelsPaginatedRow, error) { + rows, err := q.db.QueryContext(ctx, listPreferredChannelsPaginated, arg.Scid, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListPreferredChannelsPaginatedRow + for rows.Next() { + var i ListPreferredChannelsPaginatedRow + if err := rows.Scan( + &i.GraphChannel.ID, + &i.GraphChannel.Version, + &i.GraphChannel.Scid, + &i.GraphChannel.NodeID1, + &i.GraphChannel.NodeID2, + &i.GraphChannel.Outpoint, + &i.GraphChannel.Capacity, + &i.GraphChannel.BitcoinKey1, + &i.GraphChannel.BitcoinKey2, + &i.GraphChannel.Node1Signature, + &i.GraphChannel.Node2Signature, + &i.GraphChannel.Bitcoin1Signature, + &i.GraphChannel.Bitcoin2Signature, + &i.GraphChannel.Signature, + &i.GraphChannel.FundingPkScript, + &i.GraphChannel.MerkleRootHash, + &i.Node1Pubkey, + &i.Node2Pubkey, + &i.Policy1ID, + &i.Policy1NodeID, + &i.Policy1Version, + &i.Policy1Timelock, + &i.Policy1FeePpm, + &i.Policy1BaseFeeMsat, + &i.Policy1MinHtlcMsat, + &i.Policy1MaxHtlcMsat, + &i.Policy1LastUpdate, + &i.Policy1Disabled, + &i.Policy1InboundBaseFeeMsat, + &i.Policy1InboundFeeRateMilliMsat, + &i.Policy1MessageFlags, + &i.Policy1ChannelFlags, + &i.Policy1BlockHeight, + &i.Policy1DisableFlags, + &i.Policy1Signature, + &i.Policy2ID, + &i.Policy2NodeID, + &i.Policy2Version, + &i.Policy2Timelock, + &i.Policy2FeePpm, + &i.Policy2BaseFeeMsat, + &i.Policy2MinHtlcMsat, + &i.Policy2MaxHtlcMsat, + &i.Policy2LastUpdate, + &i.Policy2Disabled, + &i.Policy2InboundBaseFeeMsat, + &i.Policy2InboundFeeRateMilliMsat, + &i.Policy2MessageFlags, + &i.Policy2ChannelFlags, + &i.Policy2Signature, + &i.Policy2BlockHeight, + &i.Policy2DisableFlags, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listPreferredNodesPaginated = `-- name: ListPreferredNodesPaginated :many +SELECT n.id, n.version, n.pub_key, n.alias, n.last_update, n.color, n.signature, n.block_height +FROM graph_preferred_nodes pn +JOIN graph_nodes n ON n.id = pn.node_id +WHERE pn.pub_key > $1 +ORDER BY pn.pub_key +LIMIT $2 +` + +type ListPreferredNodesPaginatedParams struct { + PubKey []byte + Limit int32 +} + +type ListPreferredNodesPaginatedRow struct { + GraphNode GraphNode +} + +func (q *Queries) ListPreferredNodesPaginated(ctx context.Context, arg ListPreferredNodesPaginatedParams) ([]ListPreferredNodesPaginatedRow, error) { + rows, err := q.db.QueryContext(ctx, listPreferredNodesPaginated, arg.PubKey, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListPreferredNodesPaginatedRow + for rows.Next() { + var i ListPreferredNodesPaginatedRow + if err := rows.Scan( + &i.GraphNode.ID, + &i.GraphNode.Version, + &i.GraphNode.PubKey, + &i.GraphNode.Alias, + &i.GraphNode.LastUpdate, + &i.GraphNode.Color, + &i.GraphNode.Signature, + &i.GraphNode.BlockHeight, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const nodeExists = `-- name: NodeExists :one SELECT EXISTS ( SELECT 1 @@ -4497,6 +4727,57 @@ func (q *Queries) UpsertNodeExtraType(ctx context.Context, arg UpsertNodeExtraTy return err } +const upsertPreferredChannel = `-- name: UpsertPreferredChannel :exec +/* ───────────────────────────────────────────── + graph_preferred_channels table queries + ───────────────────────────────────────────── +*/ + +INSERT INTO graph_preferred_channels (scid, channel_id) +SELECT c.scid, c.id +FROM graph_channels c +WHERE c.scid = $1 +ORDER BY + EXISTS ( + SELECT 1 FROM graph_channel_policies p + WHERE p.channel_id = c.id AND p.version = c.version + ) DESC, + c.version DESC +LIMIT 1 +ON CONFLICT (scid) DO UPDATE SET channel_id = EXCLUDED.channel_id +` + +// Recompute the preferred channel for a given SCID and upsert the result. +// Priority: v2 with policies > v1 with policies > v2 bare > v1 bare. +func (q *Queries) UpsertPreferredChannel(ctx context.Context, scid []byte) error { + _, err := q.db.ExecContext(ctx, upsertPreferredChannel, scid) + return err +} + +const upsertPreferredNode = `-- name: UpsertPreferredNode :exec +/* ───────────────────────────────────────────── + graph_preferred_nodes table queries + ───────────────────────────────────────────── +*/ + +INSERT INTO graph_preferred_nodes (pub_key, node_id) +SELECT n.pub_key, n.id +FROM graph_nodes n +WHERE n.pub_key = $1 +ORDER BY + (COALESCE(length(n.signature), 0) > 0) DESC, + n.version DESC +LIMIT 1 +ON CONFLICT (pub_key) DO UPDATE SET node_id = EXCLUDED.node_id +` + +// Recompute the preferred node for a given pub_key and upsert the result. +// Priority: v2 announced > v1 announced > v2 shell > v1 shell. +func (q *Queries) UpsertPreferredNode(ctx context.Context, pubKey []byte) error { + _, err := q.db.ExecContext(ctx, upsertPreferredNode, pubKey) + return err +} + const upsertPruneLogEntry = `-- name: UpsertPruneLogEntry :exec /* ───────────────────────────���───────────────── graph_prune_log table queries diff --git a/sqldb/sqlc/migrations/000016_graph_preferred_lookups.down.sql b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.down.sql new file mode 100644 index 00000000000..2cfc0cd5af6 --- /dev/null +++ b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS graph_preferred_channels; +DROP TABLE IF EXISTS graph_preferred_nodes; diff --git a/sqldb/sqlc/migrations/000016_graph_preferred_lookups.up.sql b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.up.sql new file mode 100644 index 00000000000..547e4971e2c --- /dev/null +++ b/sqldb/sqlc/migrations/000016_graph_preferred_lookups.up.sql @@ -0,0 +1,83 @@ +-- Preferred-node mapping: one row per unique pub_key pointing at the "best" +-- node row across gossip versions. Priority: v2 announced > v1 announced > +-- v2 shell > v1 shell. +CREATE TABLE IF NOT EXISTS graph_preferred_nodes ( + pub_key BLOB PRIMARY KEY, + node_id BIGINT NOT NULL REFERENCES graph_nodes(id) ON DELETE CASCADE +); + +-- Index on node_id so cascade deletes from graph_nodes can locate the +-- referencing rows without a sequential scan. +CREATE INDEX IF NOT EXISTS graph_preferred_nodes_node_id_idx + ON graph_preferred_nodes (node_id); + +-- Preferred-channel mapping: one row per unique SCID pointing at the "best" +-- channel row across gossip versions. Priority: v2 with policies > +-- v1 with policies > v2 bare > v1 bare. +CREATE TABLE IF NOT EXISTS graph_preferred_channels ( + scid BLOB PRIMARY KEY, + channel_id BIGINT NOT NULL REFERENCES graph_channels(id) ON DELETE CASCADE +); + +-- Index on channel_id so cascade deletes from graph_channels can locate +-- the referencing rows without a sequential scan. +CREATE INDEX IF NOT EXISTS graph_preferred_channels_channel_id_idx + ON graph_preferred_channels (channel_id); + +-- Populate graph_preferred_nodes from the graph_nodes rows that already +-- existed before this migration. The inner query ranks every node row within +-- each pub_key group. Announced nodes, identified by a non-empty signature, +-- outrank shell nodes, and higher gossip versions win within the same +-- announced/shell class. The outer INSERT keeps only rn = 1, leaving exactly +-- one preferred node_id per pub_key. +-- +-- The conflict clause makes this population step idempotent if the migration +-- is retried after the tables were created and partially populated. +INSERT INTO graph_preferred_nodes (pub_key, node_id) +SELECT sub.pub_key, sub.node_id +FROM ( + SELECT + n.pub_key, + n.id AS node_id, + ROW_NUMBER() OVER ( + PARTITION BY n.pub_key + ORDER BY + (COALESCE(length(n.signature), 0) > 0) DESC, + n.version DESC + ) AS rn + FROM graph_nodes n +) sub +WHERE sub.rn = 1 +ON CONFLICT (pub_key) DO UPDATE SET node_id = EXCLUDED.node_id +WHERE graph_preferred_nodes.node_id <> EXCLUDED.node_id; + +-- Populate graph_preferred_channels from the graph_channels rows that already +-- existed before this migration. The inner query ranks every channel row +-- within each SCID group. A channel version with at least one policy row +-- outranks a bare channel version, and higher gossip versions win within the +-- same policy/bare class. The outer INSERT keeps only rn = 1, leaving exactly +-- one preferred channel_id per SCID. +-- +-- The conflict clause makes this population step idempotent if the migration +-- is retried after the tables were created and partially populated. +INSERT INTO graph_preferred_channels (scid, channel_id) +SELECT sub.scid, sub.channel_id +FROM ( + SELECT + c.scid, + c.id AS channel_id, + ROW_NUMBER() OVER ( + PARTITION BY c.scid + ORDER BY + EXISTS ( + SELECT 1 FROM graph_channel_policies p + WHERE p.channel_id = c.id + AND p.version = c.version + ) DESC, + c.version DESC + ) AS rn + FROM graph_channels c +) sub +WHERE sub.rn = 1 +ON CONFLICT (scid) DO UPDATE SET channel_id = EXCLUDED.channel_id +WHERE graph_preferred_channels.channel_id <> EXCLUDED.channel_id; diff --git a/sqldb/sqlc/models.go b/sqldb/sqlc/models.go index ef9aa9006f9..6df9fae7e35 100644 --- a/sqldb/sqlc/models.go +++ b/sqldb/sqlc/models.go @@ -123,6 +123,16 @@ type GraphNodeFeature struct { FeatureBit int32 } +type GraphPreferredChannel struct { + Scid []byte + ChannelID int64 +} + +type GraphPreferredNode struct { + PubKey []byte + NodeID int64 +} + type GraphPruneLog struct { BlockHeight int64 BlockHash []byte diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 9b95a669917..6b8e0ad2181 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -231,6 +231,8 @@ type Querier interface { ListChannelsWithPoliciesPaginated(ctx context.Context, arg ListChannelsWithPoliciesPaginatedParams) ([]ListChannelsWithPoliciesPaginatedRow, error) ListNodeIDsAndPubKeys(ctx context.Context, arg ListNodeIDsAndPubKeysParams) ([]ListNodeIDsAndPubKeysRow, error) ListNodesPaginated(ctx context.Context, arg ListNodesPaginatedParams) ([]GraphNode, error) + ListPreferredChannelsPaginated(ctx context.Context, arg ListPreferredChannelsPaginatedParams) ([]ListPreferredChannelsPaginatedRow, error) + ListPreferredNodesPaginated(ctx context.Context, arg ListPreferredNodesPaginatedParams) ([]ListPreferredNodesPaginatedRow, error) NextInvoiceSettleIndex(ctx context.Context) (int64, error) NodeExists(ctx context.Context, arg NodeExistsParams) (bool, error) OnAMPSubInvoiceCanceled(ctx context.Context, arg OnAMPSubInvoiceCanceledParams) error @@ -255,6 +257,12 @@ type Querier interface { UpsertNode(ctx context.Context, arg UpsertNodeParams) (int64, error) UpsertNodeAddress(ctx context.Context, arg UpsertNodeAddressParams) error UpsertNodeExtraType(ctx context.Context, arg UpsertNodeExtraTypeParams) error + // Recompute the preferred channel for a given SCID and upsert the result. + // Priority: v2 with policies > v1 with policies > v2 bare > v1 bare. + UpsertPreferredChannel(ctx context.Context, scid []byte) error + // Recompute the preferred node for a given pub_key and upsert the result. + // Priority: v2 announced > v1 announced > v2 shell > v1 shell. + UpsertPreferredNode(ctx context.Context, pubKey []byte) error UpsertPruneLogEntry(ctx context.Context, arg UpsertPruneLogEntryParams) error // We use a separate upsert for our own node since we want to be less strict // about the last_update field. For our own node, we always want to diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index a7683d1fbd9..fef054d0c39 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -80,6 +80,14 @@ WHERE version = $1 AND id > $2 ORDER BY id LIMIT $3; +-- name: ListPreferredNodesPaginated :many +SELECT sqlc.embed(n) +FROM graph_preferred_nodes pn +JOIN graph_nodes n ON n.id = pn.node_id +WHERE pn.pub_key > $1 +ORDER BY pn.pub_key +LIMIT $2; + -- name: ListNodeIDsAndPubKeys :many SELECT id, pub_key FROM graph_nodes @@ -978,6 +986,64 @@ WHERE c.version = $1 AND c.id > $2 ORDER BY c.id LIMIT $3; +-- name: ListPreferredChannelsPaginated :many +SELECT + sqlc.embed(c), + + -- Join node pubkeys + n1.pub_key AS node1_pubkey, + n2.pub_key AS node2_pubkey, + + -- Node 1 policy + cp1.id AS policy_1_id, + cp1.node_id AS policy_1_node_id, + cp1.version AS policy_1_version, + cp1.timelock AS policy_1_timelock, + cp1.fee_ppm AS policy_1_fee_ppm, + cp1.base_fee_msat AS policy_1_base_fee_msat, + cp1.min_htlc_msat AS policy_1_min_htlc_msat, + cp1.max_htlc_msat AS policy_1_max_htlc_msat, + cp1.last_update AS policy_1_last_update, + cp1.disabled AS policy_1_disabled, + cp1.inbound_base_fee_msat AS policy1_inbound_base_fee_msat, + cp1.inbound_fee_rate_milli_msat AS policy1_inbound_fee_rate_milli_msat, + cp1.message_flags AS policy1_message_flags, + cp1.channel_flags AS policy1_channel_flags, + cp1.block_height AS policy1_block_height, + cp1.disable_flags AS policy1_disable_flags, + cp1.signature AS policy_1_signature, + + -- Node 2 policy + cp2.id AS policy_2_id, + cp2.node_id AS policy_2_node_id, + cp2.version AS policy_2_version, + cp2.timelock AS policy_2_timelock, + cp2.fee_ppm AS policy_2_fee_ppm, + cp2.base_fee_msat AS policy_2_base_fee_msat, + cp2.min_htlc_msat AS policy_2_min_htlc_msat, + cp2.max_htlc_msat AS policy_2_max_htlc_msat, + cp2.last_update AS policy_2_last_update, + cp2.disabled AS policy_2_disabled, + cp2.inbound_base_fee_msat AS policy2_inbound_base_fee_msat, + cp2.inbound_fee_rate_milli_msat AS policy2_inbound_fee_rate_milli_msat, + cp2.message_flags AS policy2_message_flags, + cp2.channel_flags AS policy2_channel_flags, + cp2.signature AS policy_2_signature, + cp2.block_height AS policy_2_block_height, + cp2.disable_flags AS policy_2_disable_flags + +FROM graph_preferred_channels pc +JOIN graph_channels c ON c.id = pc.channel_id +JOIN graph_nodes n1 ON c.node_id_1 = n1.id +JOIN graph_nodes n2 ON c.node_id_2 = n2.id +LEFT JOIN graph_channel_policies cp1 + ON cp1.channel_id = c.id AND cp1.node_id = c.node_id_1 AND cp1.version = c.version +LEFT JOIN graph_channel_policies cp2 + ON cp2.channel_id = c.id AND cp2.node_id = c.node_id_2 AND cp2.version = c.version +WHERE pc.scid > $1 +ORDER BY pc.scid +LIMIT $2; + -- name: ListChannelsWithPoliciesForCachePaginated :many SELECT c.id as id, @@ -1435,3 +1501,42 @@ ON CONFLICT (channel_id, node_id, version) channel_flags = EXCLUDED.channel_flags, signature = EXCLUDED.signature RETURNING id; + +/* ───────────────────────────────────────────── + graph_preferred_nodes table queries + ───────────────────────────────────────────── +*/ + +-- name: UpsertPreferredNode :exec +-- Recompute the preferred node for a given pub_key and upsert the result. +-- Priority: v2 announced > v1 announced > v2 shell > v1 shell. +INSERT INTO graph_preferred_nodes (pub_key, node_id) +SELECT n.pub_key, n.id +FROM graph_nodes n +WHERE n.pub_key = $1 +ORDER BY + (COALESCE(length(n.signature), 0) > 0) DESC, + n.version DESC +LIMIT 1 +ON CONFLICT (pub_key) DO UPDATE SET node_id = EXCLUDED.node_id; + +/* ───────────────────────────────────────────── + graph_preferred_channels table queries + ───────────────────────────────────────────── +*/ + +-- name: UpsertPreferredChannel :exec +-- Recompute the preferred channel for a given SCID and upsert the result. +-- Priority: v2 with policies > v1 with policies > v2 bare > v1 bare. +INSERT INTO graph_preferred_channels (scid, channel_id) +SELECT c.scid, c.id +FROM graph_channels c +WHERE c.scid = $1 +ORDER BY + EXISTS ( + SELECT 1 FROM graph_channel_policies p + WHERE p.channel_id = c.id AND p.version = c.version + ) DESC, + c.version DESC +LIMIT 1 +ON CONFLICT (scid) DO UPDATE SET channel_id = EXCLUDED.channel_id; From f6f8444c91e861f3a5ed0a362bb7978f2671ea3f Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 7 May 2026 13:17:13 -0700 Subject: [PATCH 03/10] graph/db: add preferred channel fetch methods Add version-agnostic channel fetch helpers that choose the preferred gossip-version row for a logical channel: the highest version with policy data, falling back to the highest bare version. Implement the SQL path inside a single read transaction instead of calling SQLStore methods recursively, preserve zombie edge info for SCID lookups, and keep KV behavior as v1-only delegation. Add coverage for preferred selection, version listing, missing channels, missing outpoints, and zombie edge info. --- graph/db/graph_test.go | 181 +++++++++++++++++++++ graph/db/interfaces.go | 31 ++++ graph/db/kv_store.go | 79 +++++++++ graph/db/sql_store.go | 354 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 645 insertions(+) diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 6521a849e51..da2aabce378 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -6794,3 +6794,184 @@ func TestUpdateRangeValidateForVersion(t *testing.T) { }) } } + +// TestPreferredChannelAndGetVersions tests the four new Store methods: +// FetchChannelEdgesByIDPreferred, FetchChannelEdgesByOutpointPreferred, +// GetVersionsBySCID, and GetVersionsByOutpoint. +func TestPreferredChannelAndGetVersions(t *testing.T) { + t.Parallel() + ctx := t.Context() + + graph := MakeTestGraph(t) + store := graph.db + + node1Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + node2Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + node1V1 := createNode(t, lnwire.GossipVersion1, node1Priv) + node2V1 := createNode(t, lnwire.GossipVersion1, node2Priv) + + require.NoError(t, graph.AddNode(ctx, node1V1)) + require.NoError(t, graph.AddNode(ctx, node2V1)) + + // Create and add a v1 channel edge. + edgeInfo, scid := createEdge( + lnwire.GossipVersion1, 100, 1, 0, 1, node1V1, node2V1, + ) + require.NoError(t, graph.AddChannelEdge(ctx, edgeInfo)) + + chanID := scid.ToUint64() + op := edgeInfo.ChannelPoint + + // FetchChannelEdgesByIDPreferred should return the v1 channel. + info, _, _, err := store.FetchChannelEdgesByIDPreferred( + ctx, chanID, + ) + require.NoError(t, err) + require.Equal(t, chanID, info.ChannelID) + + // FetchChannelEdgesByOutpointPreferred should also return it. + info, _, _, err = store.FetchChannelEdgesByOutpointPreferred( + ctx, &op, + ) + require.NoError(t, err) + require.Equal(t, chanID, info.ChannelID) + + // Querying a non-existent channel should return an error. + _, _, _, err = store.FetchChannelEdgesByIDPreferred(ctx, 999999) + require.ErrorIs(t, err, ErrEdgeNotFound) + + // GetVersionsBySCID should report v1. + versions, err := store.GetVersionsBySCID(ctx, chanID) + require.NoError(t, err) + require.Equal(t, []lnwire.GossipVersion{ + lnwire.GossipVersion1, + }, versions) + + // GetVersionsByOutpoint should also report v1. + versions, err = store.GetVersionsByOutpoint(ctx, &op) + require.NoError(t, err) + require.Equal(t, []lnwire.GossipVersion{ + lnwire.GossipVersion1, + }, versions) + + // GetVersions for a non-existent SCID should return empty. + versions, err = store.GetVersionsBySCID(ctx, 999999) + require.NoError(t, err) + require.Empty(t, versions) + + unknownOutpoint := wire.OutPoint{ + Hash: chainhash.Hash{1}, + Index: 99, + } + versions, err = store.GetVersionsByOutpoint(ctx, &unknownOutpoint) + require.NoError(t, err) + require.Empty(t, versions) + + zombieChanID := uint64(888888) + err = store.MarkEdgeZombie( + ctx, lnwire.GossipVersion1, zombieChanID, + node1V1.PubKeyBytes, node2V1.PubKeyBytes, + ) + require.NoError(t, err) + + info, _, _, err = store.FetchChannelEdgesByIDPreferred( + ctx, zombieChanID, + ) + require.ErrorIs(t, err, ErrZombieEdge) + require.NotNil(t, info) + require.Equal(t, route.Vertex(node1V1.PubKeyBytes), info.NodeKey1Bytes) + require.Equal(t, route.Vertex(node2V1.PubKeyBytes), info.NodeKey2Bytes) + + if !isSQLDB { + return + } + + node1V2 := createNode(t, lnwire.GossipVersion2, node1Priv) + node2V2 := createNode(t, lnwire.GossipVersion2, node2Priv) + require.NoError(t, graph.AddNode(ctx, node1V2)) + require.NoError(t, graph.AddNode(ctx, node2V2)) + + // Add a duplicate v1/v2 channel and verify preferred lookup chooses + // the v2 edge while GetVersions reports both versions. + dupV1, dupSCID := createEdge( + lnwire.GossipVersion1, 101, 1, 0, 2, node1V1, node2V1, + ) + dupV2, _ := createEdge( + lnwire.GossipVersion2, 101, 1, 0, 2, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, dupV1)) + require.NoError(t, graph.AddChannelEdge(ctx, dupV2)) + + dupChanID := dupSCID.ToUint64() + dupOutpoint := dupV1.ChannelPoint + + info, _, _, err = store.FetchChannelEdgesByIDPreferred( + ctx, dupChanID, + ) + require.NoError(t, err) + require.Equal(t, dupChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion2, info.Version) + + info, _, _, err = store.FetchChannelEdgesByOutpointPreferred( + ctx, &dupOutpoint, + ) + require.NoError(t, err) + require.Equal(t, dupChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion2, info.Version) + versions, err = store.GetVersionsBySCID(ctx, dupChanID) + require.NoError(t, err) + require.Equal(t, []lnwire.GossipVersion{ + lnwire.GossipVersion1, + lnwire.GossipVersion2, + }, versions) + + versions, err = store.GetVersionsByOutpoint(ctx, &dupOutpoint) + require.NoError(t, err) + require.Equal(t, []lnwire.GossipVersion{ + lnwire.GossipVersion1, + lnwire.GossipVersion2, + }, versions) + + // Add another duplicate v1/v2 channel where only the v1 version has a + // policy. Preferred lookup should return the lower version with usable + // policy data instead of the higher version shell. + policyPrefV1, policyPrefSCID := createEdge( + lnwire.GossipVersion1, 102, 1, 0, 3, node1V1, node2V1, + ) + policyPrefV2, _ := createEdge( + lnwire.GossipVersion2, 102, 1, 0, 3, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV2)) + + policyOnlyV1 := newEdgePolicy( + lnwire.GossipVersion1, policyPrefV1.ChannelID, 1000, true, + ) + policyOnlyV1.ToNode = node2V1.PubKeyBytes + policyOnlyV1.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, policyOnlyV1)) + + policyPrefChanID := policyPrefSCID.ToUint64() + policyPrefOutpoint := policyPrefV1.ChannelPoint + + info, p1, p2, err := store.FetchChannelEdgesByIDPreferred( + ctx, policyPrefChanID, + ) + require.NoError(t, err) + require.Equal(t, policyPrefChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion1, info.Version) + require.NotNil(t, p1) + require.Nil(t, p2) + + info, p1, p2, err = store.FetchChannelEdgesByOutpointPreferred( + ctx, &policyPrefOutpoint, + ) + require.NoError(t, err) + require.Equal(t, policyPrefChanID, info.ChannelID) + require.Equal(t, lnwire.GossipVersion1, info.Version) + require.NotNil(t, p1) + require.Nil(t, p2) +} diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go index 0230300290f..367f2805510 100644 --- a/graph/db/interfaces.go +++ b/graph/db/interfaces.go @@ -324,6 +324,37 @@ type Store interface { //nolint:interfacebloat *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) + // FetchChannelEdgesByIDPreferred behaves like FetchChannelEdgesByID + // but is version-agnostic: if the channel exists under multiple gossip + // versions it returns the preferred record. Preferred means the highest + // version with policies, falling back to the highest bare version. + FetchChannelEdgesByIDPreferred(ctx context.Context, + chanID uint64) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) + + // FetchChannelEdgesByOutpointPreferred behaves like + // FetchChannelEdgesByOutpoint but is version-agnostic: if the channel + // exists under multiple gossip versions it returns the preferred + // record. Preferred means the highest version with policies, falling + // back to the highest bare version. + FetchChannelEdgesByOutpointPreferred(ctx context.Context, + op *wire.OutPoint) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) + + // GetVersionsBySCID returns the list of gossip versions for which a + // channel with the given SCID exists in the database, ordered + // ascending. + GetVersionsBySCID(ctx context.Context, + chanID uint64) ([]lnwire.GossipVersion, error) + + // GetVersionsByOutpoint returns the list of gossip versions for which + // a channel with the given funding outpoint exists in the database, + // ordered ascending. + GetVersionsByOutpoint(ctx context.Context, + op *wire.OutPoint) ([]lnwire.GossipVersion, error) + // ChannelView returns the verifiable edge information for each active // channel within the known channel graph for the given gossip version. // The set of UTXO's (along with their scripts) returned are the ones diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index ed6d0e07e53..3afb5204c64 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -4198,6 +4198,85 @@ func (c *KVStore) FetchChannelEdgesByID(_ context.Context, return edgeInfo, policy1, policy2, nil } +// FetchChannelEdgesByIDPreferred looks up the channel by ID. The KV store +// only supports gossip v1, so this simply delegates to the versioned fetch. +// +// NOTE: part of the Store interface. +func (c *KVStore) FetchChannelEdgesByIDPreferred(ctx context.Context, + chanID uint64) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + return c.FetchChannelEdgesByID(ctx, lnwire.GossipVersion1, chanID) +} + +// FetchChannelEdgesByOutpointPreferred looks up the channel by funding +// outpoint. The KV store only supports gossip v1, so this simply delegates to +// the versioned fetch. +// +// NOTE: part of the Store interface. +func (c *KVStore) FetchChannelEdgesByOutpointPreferred( + ctx context.Context, op *wire.OutPoint) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + return c.FetchChannelEdgesByOutpoint( + ctx, lnwire.GossipVersion1, op, + ) +} + +// GetVersionsBySCID returns the gossip versions for which a channel with the +// given SCID exists. The KV store only supports gossip v1, so at most one +// version is returned. +// +// NOTE: part of the Store interface. +func (c *KVStore) GetVersionsBySCID(ctx context.Context, + chanID uint64) ([]lnwire.GossipVersion, error) { + + _, _, _, err := c.FetchChannelEdgesByID( + ctx, lnwire.GossipVersion1, chanID, + ) + switch { + case errors.Is(err, ErrEdgeNotFound): + return nil, nil + + case errors.Is(err, ErrZombieEdge): + return nil, nil + + case err != nil: + return nil, err + + default: + return []lnwire.GossipVersion{lnwire.GossipVersion1}, nil + } +} + +// GetVersionsByOutpoint returns the gossip versions for which a channel with +// the given funding outpoint exists. The KV store only supports gossip v1, so +// at most one version is returned. +// +// NOTE: part of the Store interface. +func (c *KVStore) GetVersionsByOutpoint(ctx context.Context, + op *wire.OutPoint) ([]lnwire.GossipVersion, error) { + + _, _, _, err := c.FetchChannelEdgesByOutpoint( + ctx, lnwire.GossipVersion1, op, + ) + switch { + case errors.Is(err, ErrEdgeNotFound): + return nil, nil + + case errors.Is(err, ErrZombieEdge): + return nil, nil + + case err != nil: + return nil, err + + default: + return []lnwire.GossipVersion{lnwire.GossipVersion1}, nil + } +} + // IsPublicNode is a helper method that determines whether the node with the // given public key is seen as a public node in the graph from the graph's // source node's point of view. diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index fcf4b700cef..3dea3f092cf 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -2684,6 +2684,360 @@ func (s *SQLStore) FetchChannelEdgesByOutpoint(ctx context.Context, return edge, policy1, policy2, nil } +var ( + preferredGossipVersionsDescending = []lnwire.GossipVersion{ + gossipV2, gossipV1, + } + preferredGossipVersionsAscending = []lnwire.GossipVersion{ + gossipV1, gossipV2, + } +) + +// FetchChannelEdgesByIDPreferred tries each known gossip version from highest +// to lowest and returns the first result that has at least one policy. If no +// version has policies, the highest version found is returned. This prevents a +// v2 channel with no policies from hiding a v1 channel that has valid policy +// data. +// +// If no live edge is found across versions but at least one version reports +// the channel as a zombie, ErrZombieEdge is returned with the zombie edge info +// populated so callers can resurrect it. +// +// NOTE: part of the Store interface. +func (s *SQLStore) FetchChannelEdgesByIDPreferred(ctx context.Context, + chanID uint64) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + var ( + bestInfo *models.ChannelEdgeInfo + bestP1 *models.ChannelEdgePolicy + bestP2 *models.ChannelEdgePolicy + bestZombie *models.ChannelEdgeInfo + chanIDB = channelIDToBytes(chanID) + buildLiveEdge = func(ctx context.Context, db SQLQueries, + row sqlc.GetChannelBySCIDWithPoliciesRow) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + node1, node2, err := buildNodeVertices( + row.GraphNode.PubKey, row.GraphNode_2.PubKey, + ) + if err != nil { + return nil, nil, nil, err + } + + edge, err := getAndBuildEdgeInfo( + ctx, s.cfg, db, row.GraphChannel, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel info: %w", err) + } + + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "extract channel policies: %w", err) + } + + policy1, policy2, err := getAndBuildChanPolicies( + ctx, s.cfg.QueryCfg, db, dbPol1, dbPol2, + edge.ChannelID, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel policies: %w", err) + } + + return edge, policy1, policy2, nil + } + ) + + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + for _, v := range preferredGossipVersionsDescending { + row, err := db.GetChannelBySCIDWithPolicies( + ctx, sqlc.GetChannelBySCIDWithPoliciesParams{ + Scid: chanIDB, + Version: int16(v), + }, + ) + if errors.Is(err, sql.ErrNoRows) { + zombie, err := db.GetZombieChannel( + ctx, sqlc.GetZombieChannelParams{ + Scid: chanIDB, + Version: int16(v), + }, + ) + if errors.Is(err, sql.ErrNoRows) { + continue + } + if err != nil { + return fmt.Errorf("unable to check if "+ + "channel is zombie: %w", err) + } + + if bestZombie == nil { + var err error + bestZombie, err = buildZombieEdge( + v, chanID, zombie.NodeKey1, + zombie.NodeKey2, + ) + if err != nil { + return err + } + } + + continue + } + if err != nil { + return fmt.Errorf("unable to fetch channel: %w", + err) + } + + info, p1, p2, err := buildLiveEdge(ctx, db, row) + if err != nil { + return err + } + + if p1 != nil || p2 != nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + + return nil + } + + if bestInfo == nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + } + } + + if bestInfo != nil { + return nil + } + + if bestZombie != nil { + return ErrZombieEdge + } + + return ErrEdgeNotFound + }, sqldb.NoOpReset) + if errors.Is(err, ErrZombieEdge) { + return bestZombie, nil, nil, ErrZombieEdge + } + if err != nil { + return nil, nil, nil, fmt.Errorf("could not fetch preferred "+ + "channel: %w", err) + } + + return bestInfo, bestP1, bestP2, nil +} + +// FetchChannelEdgesByOutpointPreferred tries each known gossip version from +// highest to lowest and returns the first result that has at least one policy. +// If no version has policies, the highest version found is returned. This +// prevents a v2 channel with no policies from hiding a v1 channel that has +// valid policy data. +// +// NOTE: part of the Store interface. +func (s *SQLStore) FetchChannelEdgesByOutpointPreferred( + ctx context.Context, op *wire.OutPoint) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + var ( + bestInfo *models.ChannelEdgeInfo + bestP1 *models.ChannelEdgePolicy + bestP2 *models.ChannelEdgePolicy + buildLiveEdge = func(ctx context.Context, db SQLQueries, + row sqlc.GetChannelByOutpointWithPoliciesRow) ( + *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy, error) { + + node1, node2, err := buildNodeVertices( + row.Node1Pubkey, row.Node2Pubkey, + ) + if err != nil { + return nil, nil, nil, err + } + + edge, err := getAndBuildEdgeInfo( + ctx, s.cfg, db, row.GraphChannel, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel info: %w", err) + } + + dbPol1, dbPol2, err := extractChannelPolicies(row) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "extract channel policies: %w", err) + } + + policy1, policy2, err := getAndBuildChanPolicies( + ctx, s.cfg.QueryCfg, db, dbPol1, dbPol2, + edge.ChannelID, node1, node2, + ) + if err != nil { + return nil, nil, nil, fmt.Errorf("unable to "+ + "build channel policies: %w", err) + } + + return edge, policy1, policy2, nil + } + ) + + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + for _, v := range preferredGossipVersionsDescending { + params := sqlc.GetChannelByOutpointWithPoliciesParams{ + Outpoint: op.String(), + Version: int16(v), + } + row, err := db.GetChannelByOutpointWithPolicies( + ctx, params, + ) + if errors.Is(err, sql.ErrNoRows) { + continue + } + if err != nil { + return fmt.Errorf("unable to fetch channel: %w", + err) + } + + info, p1, p2, err := buildLiveEdge(ctx, db, row) + if err != nil { + return err + } + + if p1 != nil || p2 != nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + + return nil + } + + if bestInfo == nil { + bestInfo, bestP1, bestP2 = info, p1, p2 + } + } + + if bestInfo != nil { + return nil + } + + return ErrEdgeNotFound + }, sqldb.NoOpReset) + if err != nil { + return nil, nil, nil, fmt.Errorf("could not fetch preferred "+ + "channel: %w", err) + } + + return bestInfo, bestP1, bestP2, nil +} + +// GetVersionsBySCID returns the gossip versions for which a channel with the +// given SCID exists in the database. +// +// NOTE: part of the Store interface. +func (s *SQLStore) GetVersionsBySCID(ctx context.Context, + chanID uint64) ([]lnwire.GossipVersion, error) { + + var versions []lnwire.GossipVersion + chanIDB := channelIDToBytes(chanID) + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + for _, v := range preferredGossipVersionsAscending { + _, err := db.GetChannelBySCID( + ctx, sqlc.GetChannelBySCIDParams{ + Scid: chanIDB, + Version: int16(v), + }, + ) + if errors.Is(err, sql.ErrNoRows) { + continue + } + if err != nil { + return fmt.Errorf("unable to fetch channel: %w", + err) + } + + versions = append(versions, v) + } + + return nil + }, sqldb.NoOpReset) + if err != nil { + return nil, err + } + + return versions, nil +} + +// GetVersionsByOutpoint returns the gossip versions for which a channel with +// the given funding outpoint exists in the database. +// +// NOTE: part of the Store interface. +func (s *SQLStore) GetVersionsByOutpoint(ctx context.Context, + op *wire.OutPoint) ([]lnwire.GossipVersion, error) { + + var versions []lnwire.GossipVersion + err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { + for _, v := range preferredGossipVersionsAscending { + params := sqlc.GetChannelByOutpointWithPoliciesParams{ + Outpoint: op.String(), + Version: int16(v), + } + _, err := db.GetChannelByOutpointWithPolicies( + ctx, params, + ) + if errors.Is(err, sql.ErrNoRows) { + continue + } + if err != nil { + return fmt.Errorf("unable to fetch channel: %w", + err) + } + + versions = append(versions, v) + } + + return nil + }, sqldb.NoOpReset) + if err != nil { + return nil, err + } + + return versions, nil +} + +func buildZombieEdge(v lnwire.GossipVersion, chanID uint64, nodeKey1, + nodeKey2 []byte) (*models.ChannelEdgeInfo, error) { + + node1, err := route.NewVertexFromBytes(nodeKey1) + if err != nil { + return nil, err + } + node2, err := route.NewVertexFromBytes(nodeKey2) + if err != nil { + return nil, err + } + + switch v { + case gossipV1: + return models.NewV1Channel( + chanID, chainhash.Hash{}, node1, node2, + &models.ChannelV1Fields{}, + ) + + case gossipV2: + return models.NewV2Channel( + chanID, chainhash.Hash{}, node1, node2, + &models.ChannelV2Fields{}, + ) + + default: + return nil, fmt.Errorf("unsupported gossip version: %d", v) + } +} + // HasV1ChannelEdge returns true if the database knows of a channel edge // with the passed channel ID, and false otherwise. If an edge with that ID // is found within the graph, then two time stamps representing the last time From 822e05a4c19226497c9b9fbdcb7be68a5a04033b Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 7 Apr 2026 08:35:10 +0545 Subject: [PATCH 04/10] graph/db: wire preferred-table maintenance into write paths Add UpsertPreferredNode and UpsertPreferredChannel calls to every Store write path so the preferred mapping tables stay consistent: - upsertSourceNode, upsertNode, maybeCreateShellNode, DeleteNode - insertChannel, updateChanEdgePolicy, DeleteChannelEdges - pruneGraphNodes CASCADE deletes on the underlying graph_nodes and graph_channels tables automatically clean up preferred entries when a version is removed. --- graph/db/sql_store.go | 101 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 97 insertions(+), 4 deletions(-) diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 3dea3f092cf..4ffd14bac47 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -55,6 +55,7 @@ type SQLQueries interface { GetNodesByBlockHeightRange(ctx context.Context, arg sqlc.GetNodesByBlockHeightRangeParams) ([]sqlc.GraphNode, error) GetPublicNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetPublicNodesByLastUpdateRangeParams) ([]sqlc.GraphNode, error) ListNodesPaginated(ctx context.Context, arg sqlc.ListNodesPaginatedParams) ([]sqlc.GraphNode, error) + UpsertPreferredNode(ctx context.Context, pubKey []byte) error ListNodeIDsAndPubKeys(ctx context.Context, arg sqlc.ListNodeIDsAndPubKeysParams) ([]sqlc.ListNodeIDsAndPubKeysRow, error) IsPublicV1Node(ctx context.Context, pubKey []byte) (bool, error) IsPublicV2Node(ctx context.Context, pubKey []byte) (bool, error) @@ -105,6 +106,7 @@ type SQLQueries interface { ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error) ListChannelsForNodeIDs(ctx context.Context, arg sqlc.ListChannelsForNodeIDsParams) ([]sqlc.ListChannelsForNodeIDsRow, error) ListChannelsWithPoliciesPaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesPaginatedParams) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, error) + UpsertPreferredChannel(ctx context.Context, scid []byte) error ListChannelsWithPoliciesForCachePaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesForCachePaginatedParams) ([]sqlc.ListChannelsWithPoliciesForCachePaginatedRow, error) ListChannelsPaginated(ctx context.Context, arg sqlc.ListChannelsPaginatedParams) ([]sqlc.ListChannelsPaginatedRow, error) ListChannelsPaginatedV2(ctx context.Context, arg sqlc.ListChannelsPaginatedV2Params) ([]sqlc.ListChannelsPaginatedV2Row, error) @@ -439,7 +441,13 @@ func (s *SQLStore) DeleteNode(ctx context.Context, v lnwire.GossipVersion, return fmt.Errorf("deleted %d rows, expected 1", rows) } - return err + // Recompute the preferred mapping. If another version of + // this node still exists, UpsertPreferredNode will point + // the mapping at it. If no version remains, the + // INSERT...SELECT is a no-op and the CASCADE on the FK + // already removed the mapping row when the node was + // deleted above. + return db.UpsertPreferredNode(ctx, pubKey[:]) }, sqldb.NoOpReset) if err != nil { return fmt.Errorf("unable to delete node: %w", err) @@ -2458,7 +2466,25 @@ func (s *SQLStore) DeleteChannelEdges(ctx context.Context, } } - return s.deleteChannels(ctx, db, chanIDsToDelete) + err = s.deleteChannels(ctx, db, chanIDsToDelete) + if err != nil { + return err + } + + // The CASCADE on graph_preferred_channels will have + // removed the mapping row for any deleted channel. If + // another version of the same SCID still exists, we + // need to re-insert the mapping. + for _, chanID := range chanIDs { + scidBytes := channelIDToBytes(chanID) + err = db.UpsertPreferredChannel(ctx, scidBytes) + if err != nil { + return fmt.Errorf("recalc preferred "+ + "channel(%d): %w", chanID, err) + } + } + + return nil }, func() { edges = nil @@ -3697,6 +3723,10 @@ func (s *SQLStore) PruneGraph(ctx context.Context, return err } + // Delete all matched channels. GetChannelsByOutpoints + // returns every version for a given outpoint, so all + // versions are deleted and the CASCADE on + // graph_preferred_channels handles cleanup. err = s.deleteChannels(ctx, db, chansToDelete) if err != nil { return fmt.Errorf("unable to delete channels: %w", err) @@ -4004,9 +4034,22 @@ func (s *SQLStore) pruneGraphNodes(ctx context.Context, "nodes: %w", err) } + // Recalc preferred node mappings for all affected pub_keys. The + // CASCADE may have removed some entries; if another version of the + // node still exists, UpsertPreferredNode will re-insert the mapping. + // If no version remains, the upsert is a no-op (the INSERT ... SELECT + // returns no rows). Note that nodeKeys may contain duplicates if a + // node existed in multiple gossip versions and was pruned in all of + // them; UpsertPreferredNode is idempotent, so this is harmless. prunedNodes := make([]route.Vertex, len(nodeKeys)) - for i, nodeKey := range nodeKeys { - pub, err := route.NewVertexFromBytes(nodeKey) + for i, key := range nodeKeys { + err = db.UpsertPreferredNode(ctx, key) + if err != nil { + return nil, fmt.Errorf("recalc preferred "+ + "node: %w", err) + } + + pub, err := route.NewVertexFromBytes(key) if err != nil { return nil, fmt.Errorf("unable to parse pubkey "+ "from bytes: %w", err) @@ -4081,6 +4124,10 @@ func (s *SQLStore) DisconnectBlockAtHeight(ctx context.Context, removedChans = channelEdges + // Delete all matched channels. GetChannelsBySCIDRange + // returns every version for a given SCID, so all versions + // are deleted and the CASCADE on + // graph_preferred_channels handles cleanup. err = s.deleteChannels(ctx, db, chanIDsToDelete) if err != nil { return fmt.Errorf("unable to delete channels: %w", err) @@ -4663,6 +4710,15 @@ func updateChanEdgePolicy(ctx context.Context, tx SQLQueries, "policy extra TLVs: %w", err) } + // Adding a policy may change which version is preferred for this + // SCID (a version with policies outranks one without). + scidBytes := channelIDToBytes(edge.ChannelID) + err = tx.UpsertPreferredChannel(ctx, scidBytes) + if err != nil { + return node1Pub, node2Pub, false, fmt.Errorf("upserting "+ + "preferred channel(%d): %w", edge.ChannelID, err) + } + return node1Pub, node2Pub, isNode1, nil } @@ -5016,6 +5072,13 @@ func upsertSourceNode(ctx context.Context, db SQLQueries, node.PubKeyBytes, err) } + // Recompute the preferred node mapping for this pub_key. + err = db.UpsertPreferredNode(ctx, node.PubKeyBytes[:]) + if err != nil { + return 0, fmt.Errorf("upserting preferred node(%x): %w", + node.PubKeyBytes, err) + } + // We can exit here if we don't have the announcement yet. if !node.HaveAnnouncement() { return nodeID, nil @@ -5054,6 +5117,14 @@ func upsertNode(ctx context.Context, db SQLQueries, // We can exit here if we don't have the announcement yet. if !node.HaveAnnouncement() { + // Even a shell node may become the preferred entry for + // this pub_key (e.g. first v2 shell for a key). + err = db.UpsertPreferredNode(ctx, node.PubKeyBytes[:]) + if err != nil { + return 0, fmt.Errorf("upserting preferred "+ + "node(%x): %w", node.PubKeyBytes, err) + } + return nodeID, nil } @@ -5063,6 +5134,13 @@ func upsertNode(ctx context.Context, db SQLQueries, return 0, err } + // Recompute the preferred node mapping for this pub_key. + err = db.UpsertPreferredNode(ctx, node.PubKeyBytes[:]) + if err != nil { + return 0, fmt.Errorf("upserting preferred node(%x): %w", + node.PubKeyBytes, err) + } + return nodeID, nil } @@ -5534,6 +5612,14 @@ func insertChannel(ctx context.Context, db SQLQueries, } } + // Recompute the preferred channel mapping for this SCID. + scidBytes := channelIDToBytes(edge.ChannelID) + err = db.UpsertPreferredChannel(ctx, scidBytes) + if err != nil { + return fmt.Errorf("upserting preferred channel(%d): %w", + edge.ChannelID, err) + } + return nil } @@ -5567,6 +5653,13 @@ func maybeCreateShellNode(ctx context.Context, db SQLQueries, return 0, fmt.Errorf("unable to create shell node: %w", err) } + // Recompute the preferred node mapping for this pub_key. + err = db.UpsertPreferredNode(ctx, pubKey[:]) + if err != nil { + return 0, fmt.Errorf("upserting preferred node(%x): %w", + pubKey[:], err) + } + return id, nil } From 066a6184dc92c157ddc5f0c385107929cbaaf8e3 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 7 Apr 2026 11:06:17 +0545 Subject: [PATCH 05/10] graph/db: make ForEachNode and ForEachChannel cross-version Drop the GossipVersion parameter from ForEachNode and ForEachChannel. Both methods now iterate across all gossip versions, yielding one result per unique pub_key or SCID using the preferred mapping tables for pagination. Wire ChannelGraph.FetchChannelEdgesByID and FetchChannelEdgesByOutpoint through the PreferHighest variants. Add GetVersionsBySCID and GetVersionsByOutpoint wrappers to ChannelGraph. --- graph/db/benchmark_test.go | 17 +-- graph/db/graph.go | 51 +++++-- graph/db/graph_test.go | 286 ++++++++++++++++++++++++++++++++++- graph/db/interfaces.go | 33 ++-- graph/db/kv_store.go | 15 +- graph/db/sql_store.go | 159 ++++++++++++------- sqldb/sqlc/graph.sql.go | 46 ------ sqldb/sqlc/querier.go | 1 - sqldb/sqlc/queries/graph.sql | 7 - 9 files changed, 446 insertions(+), 169 deletions(-) diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go index d0764446f7f..bcadb8c2cf0 100644 --- a/graph/db/benchmark_test.go +++ b/graph/db/benchmark_test.go @@ -372,7 +372,7 @@ func TestPopulateDBs(t *testing.T) { numPolicies = 0 ) err := graph.ForEachChannel( - ctx, lnwire.GossipVersion1, + ctx, func(info *models.ChannelEdgeInfo, policy, policy2 *models.ChannelEdgePolicy) error { @@ -500,7 +500,7 @@ func syncGraph(t *testing.T, src, dest *ChannelGraph) { } var wgChans sync.WaitGroup - err = src.ForEachChannel(ctx, lnwire.GossipVersion1, + err = src.ForEachChannel(ctx, func(info *models.ChannelEdgeInfo, policy1, policy2 *models.ChannelEdgePolicy) error { @@ -624,7 +624,7 @@ func BenchmarkGraphReadMethods(b *testing.B) { name: "ForEachNode", fn: func(b testing.TB, store Store) { err := store.ForEachNode( - ctx, lnwire.GossipVersion1, + ctx, func(_ *models.Node) error { // Increment the counter to // ensure the callback is doing @@ -640,12 +640,11 @@ func BenchmarkGraphReadMethods(b *testing.B) { { name: "ForEachChannel", fn: func(b testing.TB, store Store) { - //nolint:ll - err := store.ForEachChannel( - ctx, lnwire.GossipVersion1, + err := store.ForEachChannel(ctx, func(_ *models.ChannelEdgeInfo, + _, _ *models.ChannelEdgePolicy, - _ *models.ChannelEdgePolicy) error { + ) error { // Increment the counter to // ensure the callback is doing @@ -996,7 +995,7 @@ func BenchmarkFindOptimalSQLQueryConfig(b *testing.B) { ) err := store.ForEachNode( - ctx, lnwire.GossipVersion1, + ctx, func(_ *models.Node) error { numNodes++ @@ -1007,7 +1006,7 @@ func BenchmarkFindOptimalSQLQueryConfig(b *testing.B) { //nolint:ll err = store.ForEachChannel( - ctx, lnwire.GossipVersion1, + ctx, func(_ *models.ChannelEdgeInfo, _, _ *models.ChannelEdgePolicy) error { diff --git a/graph/db/graph.go b/graph/db/graph.go index 9e72cad2a33..169f48bb14f 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -679,13 +679,14 @@ func (c *ChannelGraph) HasV1Node(ctx context.Context, return c.db.HasV1Node(ctx, nodePub) } -// ForEachChannel iterates through all channel edges stored within the graph. +// ForEachChannel iterates through all channel edges stored within the graph +// across all gossip versions. func (c *ChannelGraph) ForEachChannel(ctx context.Context, - v lnwire.GossipVersion, cb func(*models.ChannelEdgeInfo, - *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, + cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy) error, reset func()) error { - return c.db.ForEachChannel(ctx, v, cb, reset) + return c.db.ForEachChannel(ctx, cb, reset) } // DisabledChannelIDs returns the channel ids of disabled channels. @@ -817,26 +818,39 @@ func (c *ChannelGraph) FetchChanInfos(ctx context.Context, } // FetchChannelEdgesByOutpoint attempts to lookup directed edges by funding -// outpoint. +// outpoint, returning the highest available gossip version. func (c *ChannelGraph) FetchChannelEdgesByOutpoint(ctx context.Context, op *wire.OutPoint) ( *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { - return c.db.FetchChannelEdgesByOutpoint( - ctx, lnwire.GossipVersion1, op, - ) + return c.db.FetchChannelEdgesByOutpointPreferred(ctx, op) } -// FetchChannelEdgesByID attempts to lookup directed edges by channel ID. +// FetchChannelEdgesByID attempts to lookup directed edges by channel ID, +// returning the highest available gossip version. func (c *ChannelGraph) FetchChannelEdgesByID(ctx context.Context, chanID uint64) ( *models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy, error) { - return c.db.FetchChannelEdgesByID( - ctx, lnwire.GossipVersion1, chanID, - ) + return c.db.FetchChannelEdgesByIDPreferred(ctx, chanID) +} + +// GetVersionsBySCID returns the list of gossip versions for which a channel +// with the given SCID exists in the database. +func (c *ChannelGraph) GetVersionsBySCID(ctx context.Context, + chanID uint64) ([]lnwire.GossipVersion, error) { + + return c.db.GetVersionsBySCID(ctx, chanID) +} + +// GetVersionsByOutpoint returns the list of gossip versions for which a channel +// with the given funding outpoint exists in the database. +func (c *ChannelGraph) GetVersionsByOutpoint(ctx context.Context, + op *wire.OutPoint) ([]lnwire.GossipVersion, error) { + + return c.db.GetVersionsByOutpoint(ctx, op) } // PutClosedScid stores a SCID for a closed channel in the database. @@ -927,11 +941,14 @@ func (c *VersionedGraph) ForEachNodeCached(ctx context.Context, return c.ChannelGraph.ForEachNodeCached(ctx, c.v, withAddrs, cb, reset) } -// ForEachNode iterates through all stored vertices/nodes in the graph. +// ForEachNode iterates through all stored vertices/nodes in the graph across +// all gossip versions, returning the preferred version for each pub_key. Note +// that this intentionally ignores c.v — cross-version iteration is the desired +// behaviour for callers that enumerate graph topology. func (c *VersionedGraph) ForEachNode(ctx context.Context, cb func(*models.Node) error, reset func()) error { - return c.db.ForEachNode(ctx, c.v, cb, reset) + return c.db.ForEachNode(ctx, cb, reset) } // NumZombies returns the current number of zombie channels in the graph. @@ -1107,12 +1124,14 @@ func (c *VersionedGraph) ForEachNodeChannel(ctx context.Context, return c.db.ForEachNodeChannel(ctx, c.v, nodePub, cb, reset) } -// ForEachChannel iterates through all channel edges stored within the graph. +// ForEachChannel iterates through all channel edges stored within the graph +// across all gossip versions, returning the preferred version for each SCID. +// See ForEachNode for the rationale on ignoring c.v. func (c *VersionedGraph) ForEachChannel(ctx context.Context, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, reset func()) error { - return c.db.ForEachChannel(ctx, c.v, cb, reset) + return c.db.ForEachChannel(ctx, cb, reset) } // ForEachNodeCacheable iterates through all stored vertices/nodes in the graph. diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index da2aabce378..a111e230f55 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -1824,7 +1824,7 @@ func TestGraphTraversal(t *testing.T) { // Iterate through all the known channels within the graph DB, once // again if the map is empty that indicates that all edges have // properly been reached. - err = graph.ForEachChannel(ctx, lnwire.GossipVersion1, + err = graph.ForEachChannel(ctx, func(ei *models.ChannelEdgeInfo, _ *models.ChannelEdgePolicy, _ *models.ChannelEdgePolicy) error { @@ -2179,7 +2179,7 @@ func assertPruneTip(t *testing.T, graph *ChannelGraph, func assertNumChans(t *testing.T, graph *ChannelGraph, n int) { numChans := 0 err := graph.ForEachChannel( - t.Context(), lnwire.GossipVersion1, + t.Context(), func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error { @@ -6975,3 +6975,285 @@ func TestPreferredChannelAndGetVersions(t *testing.T) { require.NotNil(t, p1) require.Nil(t, p2) } + +// TestDeleteNodePreferredRecomputation verifies that deleting one gossip +// version of a dual-version node correctly recomputes the preferred-node +// mapping so the surviving version remains visible via ForEachNode. +func TestDeleteNodePreferredRecomputation(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("preferred lookup requires SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t) + store := graph.db + + // Create a node with both v1 and v2 announcements. + priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + nodeV1 := createNode(t, lnwire.GossipVersion1, priv) + nodeV2 := createNode(t, lnwire.GossipVersion2, priv) + + require.NoError(t, graph.AddNode(ctx, nodeV1)) + require.NoError(t, graph.AddNode(ctx, nodeV2)) + + // ForEachNode should return the node (v2 preferred). + var count int + err = store.ForEachNode(ctx, func(n *models.Node) error { + if n.PubKeyBytes == nodeV1.PubKeyBytes { + require.Equal(t, lnwire.GossipVersion2, n.Version) + count++ + } + + return nil + }, func() { count = 0 }) + require.NoError(t, err) + require.Equal(t, 1, count, "node should be visible before delete") + + // Delete the v2 version. + require.NoError(t, store.DeleteNode( + ctx, lnwire.GossipVersion2, nodeV1.PubKeyBytes, + )) + + // The node should still be visible via ForEachNode, now as v1. + count = 0 + err = store.ForEachNode(ctx, func(n *models.Node) error { + if n.PubKeyBytes == nodeV1.PubKeyBytes { + require.Equal(t, lnwire.GossipVersion1, n.Version) + count++ + } + + return nil + }, func() { count = 0 }) + require.NoError(t, err) + require.Equal(t, 1, count, + "node should still be visible after deleting one version") + + // Delete the remaining v1 version. + require.NoError(t, store.DeleteNode( + ctx, lnwire.GossipVersion1, nodeV1.PubKeyBytes, + )) + + // The node should now be gone. + count = 0 + err = store.ForEachNode(ctx, func(n *models.Node) error { + if n.PubKeyBytes == nodeV1.PubKeyBytes { + count++ + } + + return nil + }, func() { count = 0 }) + require.NoError(t, err) + require.Equal(t, 0, count, + "node should be gone after deleting all versions") +} + +// TestPreferredForEachNode verifies that SQLStore.ForEachNode returns one +// node per pubkey, preferring the highest announced version and otherwise +// falling back to the highest-version shell node. +func TestPreferredForEachNode(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("preferred lookup requires SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t) + store := graph.db + + v1Only := createTestVertex(t, lnwire.GossipVersion1) + v1Only.Alias = fn.Some("v1-only") + require.NoError(t, graph.AddNode(ctx, v1Only)) + + bothPriv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + bothV1 := createNode(t, lnwire.GossipVersion1, bothPriv) + bothV1.Alias = fn.Some("both-v1") + require.NoError(t, graph.AddNode(ctx, bothV1)) + + bothV2 := createNode(t, lnwire.GossipVersion2, bothPriv) + bothV2.Alias = fn.Some("both-v2") + require.NoError(t, graph.AddNode(ctx, bothV2)) + + shellPriv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + shellPub, err := route.NewVertexFromBytes( + shellPriv.PubKey().SerializeCompressed(), + ) + require.NoError(t, err) + + require.NoError(t, graph.AddNode( + ctx, models.NewShellNode(lnwire.GossipVersion1, shellPub), + )) + require.NoError(t, graph.AddNode( + ctx, models.NewShellNode(lnwire.GossipVersion2, shellPub), + )) + + var nodeCount int + nodesByPub := make(map[route.Vertex]*models.Node) + err = store.ForEachNode(ctx, func(node *models.Node) error { + nodesByPub[node.PubKeyBytes] = node + nodeCount++ + + return nil + }, func() { + clear(nodesByPub) + nodeCount = 0 + }) + require.NoError(t, err) + require.Len(t, nodesByPub, 3) + require.Equal(t, 3, nodeCount, "unexpected duplicate nodes") + + gotV1Only := nodesByPub[v1Only.PubKeyBytes] + require.NotNil(t, gotV1Only) + require.Equal(t, lnwire.GossipVersion1, gotV1Only.Version) + require.Equal(t, "v1-only", gotV1Only.Alias.UnwrapOr("")) + require.True(t, gotV1Only.HaveAnnouncement()) + + gotBoth := nodesByPub[bothV1.PubKeyBytes] + require.NotNil(t, gotBoth) + require.Equal(t, lnwire.GossipVersion2, gotBoth.Version) + require.Equal(t, "both-v2", gotBoth.Alias.UnwrapOr("")) + require.True(t, gotBoth.HaveAnnouncement()) + + gotShell := nodesByPub[shellPub] + require.NotNil(t, gotShell) + require.Equal(t, lnwire.GossipVersion2, gotShell.Version) + require.False(t, gotShell.HaveAnnouncement()) +} + +// TestPreferredForEachChannel verifies that SQLStore.ForEachChannel returns +// one channel per SCID, preferring a higher-version channel when both versions +// have policies, preserving lower-version policy data when the higher version +// has none, and otherwise falling back to the highest-version no-policy +// channel. +func TestPreferredForEachChannel(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("preferred lookup requires SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t) + store := graph.db + + node1Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + node2Priv, err := btcec.NewPrivateKey() + require.NoError(t, err) + + node1V1 := createNode(t, lnwire.GossipVersion1, node1Priv) + node1V2 := createNode(t, lnwire.GossipVersion2, node1Priv) + node2V1 := createNode(t, lnwire.GossipVersion1, node2Priv) + node2V2 := createNode(t, lnwire.GossipVersion2, node2Priv) + + require.NoError(t, graph.AddNode(ctx, node1V1)) + require.NoError(t, graph.AddNode(ctx, node1V2)) + require.NoError(t, graph.AddNode(ctx, node2V1)) + require.NoError(t, graph.AddNode(ctx, node2V2)) + + v1Only, _ := createEdge( + lnwire.GossipVersion1, 200, 0, 0, 1, node1V1, node2V1, + ) + require.NoError(t, graph.AddChannelEdge(ctx, v1Only)) + + policyPrefV1, _ := createEdge( + lnwire.GossipVersion1, 201, 0, 0, 2, node1V1, node2V1, + ) + policyPrefV2, _ := createEdge( + lnwire.GossipVersion2, 201, 0, 0, 2, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, policyPrefV2)) + + policyOnlyV1 := newEdgePolicy( + lnwire.GossipVersion1, policyPrefV1.ChannelID, 1000, true, + ) + policyOnlyV1.ToNode = node2V1.PubKeyBytes + policyOnlyV1.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, policyOnlyV1)) + + versionPrefV1, _ := createEdge( + lnwire.GossipVersion1, 202, 0, 0, 3, node1V1, node2V1, + ) + versionPrefV2, _ := createEdge( + lnwire.GossipVersion2, 202, 0, 0, 3, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, versionPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, versionPrefV2)) + + versionPolicyV1 := newEdgePolicy( + lnwire.GossipVersion1, versionPrefV1.ChannelID, 1001, true, + ) + versionPolicyV1.ToNode = node2V1.PubKeyBytes + versionPolicyV1.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, versionPolicyV1)) + + versionPolicyV2 := newEdgePolicy( + lnwire.GossipVersion2, versionPrefV2.ChannelID, 1002, true, + ) + versionPolicyV2.ToNode = node2V2.PubKeyBytes + versionPolicyV2.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, versionPolicyV2)) + + shellPrefV1, _ := createEdge( + lnwire.GossipVersion1, 203, 0, 0, 4, node1V1, node2V1, + ) + shellPrefV2, _ := createEdge( + lnwire.GossipVersion2, 203, 0, 0, 4, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, shellPrefV1)) + require.NoError(t, graph.AddChannelEdge(ctx, shellPrefV2)) + + type channelResult struct { + info *models.ChannelEdgeInfo + p1 *models.ChannelEdgePolicy + p2 *models.ChannelEdgePolicy + } + var chanCount int + channelsByID := make(map[uint64]channelResult) + err = store.ForEachChannel(ctx, func(info *models.ChannelEdgeInfo, + p1, p2 *models.ChannelEdgePolicy) error { + + channelsByID[info.ChannelID] = channelResult{ + info: info, + p1: p1, + p2: p2, + } + chanCount++ + + return nil + }, func() { + clear(channelsByID) + chanCount = 0 + }) + require.NoError(t, err) + require.Len(t, channelsByID, 4) + require.Equal(t, 4, chanCount, "unexpected duplicate channels") + + gotV1Only := channelsByID[v1Only.ChannelID] + require.Equal(t, lnwire.GossipVersion1, gotV1Only.info.Version) + require.Nil(t, gotV1Only.p1) + require.Nil(t, gotV1Only.p2) + + gotPolicyPref := channelsByID[policyPrefV1.ChannelID] + require.Equal(t, lnwire.GossipVersion1, gotPolicyPref.info.Version) + require.NotNil(t, gotPolicyPref.p1) + require.Nil(t, gotPolicyPref.p2) + + gotVersionPref := channelsByID[versionPrefV1.ChannelID] + require.Equal(t, lnwire.GossipVersion2, gotVersionPref.info.Version) + require.NotNil(t, gotVersionPref.p1) + + gotShellPref := channelsByID[shellPrefV1.ChannelID] + require.Equal(t, lnwire.GossipVersion2, gotShellPref.info.Version) + require.Nil(t, gotShellPref.p1) + require.Nil(t, gotShellPref.p2) +} diff --git a/graph/db/interfaces.go b/graph/db/interfaces.go index 367f2805510..1a172d9eb9b 100644 --- a/graph/db/interfaces.go +++ b/graph/db/interfaces.go @@ -95,11 +95,11 @@ type Store interface { //nolint:interfacebloat chans map[uint64]*DirectedChannel) error, reset func()) error - // ForEachNode iterates through all the stored vertices/nodes in the - // graph, executing the passed callback with each node encountered. If - // the callback returns an error, then the transaction is aborted and - // the iteration stops early. - ForEachNode(ctx context.Context, v lnwire.GossipVersion, + // ForEachNode iterates through all nodes in the graph across all + // gossip versions, yielding each unique node exactly once. The + // callback receives the best available Node (highest advertised + // version preferred, falling back to shell nodes). + ForEachNode(ctx context.Context, cb func(*models.Node) error, reset func()) error // ForEachNodeCacheable iterates through all the stored vertices/nodes @@ -162,21 +162,16 @@ type Store interface { //nolint:interfacebloat GraphSession(ctx context.Context, cb func(graph NodeTraverser) error, reset func()) error - // ForEachChannel iterates through all the channel edges stored within - // the graph and invokes the passed callback for each edge. The callback - // takes two edges as since this is a directed graph, both the in/out - // edges are visited. If the callback returns an error, then the - // transaction is aborted and the iteration stops early. - // - // NOTE: If an edge can't be found, or wasn't advertised, then a nil - // pointer for that particular channel edge routing policy will be - // passed into the callback. - // - // TODO(elle): add a cross-version iteration API and make this iterate - // over all versions. - ForEachChannel(ctx context.Context, v lnwire.GossipVersion, + // ForEachChannel iterates through all channel edges stored within the + // graph across all gossip versions, yielding each unique channel + // exactly once. The callback receives the edge info and both + // directional policies. When both versions are present, v2 is + // preferred. Nil pointers are passed for policies that haven't been + // advertised. + ForEachChannel(ctx context.Context, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error, reset func()) error + *models.ChannelEdgePolicy) error, + reset func()) error // ForEachChannelCacheable iterates through all the channel edges stored // within the graph and invokes the passed callback for each edge. The diff --git a/graph/db/kv_store.go b/graph/db/kv_store.go index 3afb5204c64..138c619d8f2 100644 --- a/graph/db/kv_store.go +++ b/graph/db/kv_store.go @@ -408,13 +408,10 @@ func (c *KVStore) AddrsForNode(ctx context.Context, v lnwire.GossipVersion, // NOTE: If an edge can't be found, or wasn't advertised, then a nil pointer // for that particular channel edge routing policy will be passed into the // callback. -func (c *KVStore) ForEachChannel(_ context.Context, v lnwire.GossipVersion, +func (c *KVStore) ForEachChannel(_ context.Context, cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error, reset func()) error { - - if v != lnwire.GossipVersion1 { - return ErrVersionNotSupportedForKVDB - } + *models.ChannelEdgePolicy) error, + reset func()) error { return forEachChannel(c.db, cb, reset) } @@ -842,13 +839,9 @@ func (c *KVStore) DisabledChannelIDs( // early. // // NOTE: this is part of the Store interface. -func (c *KVStore) ForEachNode(_ context.Context, v lnwire.GossipVersion, +func (c *KVStore) ForEachNode(_ context.Context, cb func(*models.Node) error, reset func()) error { - if v != lnwire.GossipVersion1 { - return ErrVersionNotSupportedForKVDB - } - return forEachNode(c.db, func(tx kvdb.RTx, node *models.Node) error { diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 4ffd14bac47..4b452b928d5 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -54,7 +54,7 @@ type SQLQueries interface { GetNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetNodesByLastUpdateRangeParams) ([]sqlc.GraphNode, error) GetNodesByBlockHeightRange(ctx context.Context, arg sqlc.GetNodesByBlockHeightRangeParams) ([]sqlc.GraphNode, error) GetPublicNodesByLastUpdateRange(ctx context.Context, arg sqlc.GetPublicNodesByLastUpdateRangeParams) ([]sqlc.GraphNode, error) - ListNodesPaginated(ctx context.Context, arg sqlc.ListNodesPaginatedParams) ([]sqlc.GraphNode, error) + ListPreferredNodesPaginated(ctx context.Context, arg sqlc.ListPreferredNodesPaginatedParams) ([]sqlc.ListPreferredNodesPaginatedRow, error) UpsertPreferredNode(ctx context.Context, pubKey []byte) error ListNodeIDsAndPubKeys(ctx context.Context, arg sqlc.ListNodeIDsAndPubKeysParams) ([]sqlc.ListNodeIDsAndPubKeysRow, error) IsPublicV1Node(ctx context.Context, pubKey []byte) (bool, error) @@ -106,6 +106,7 @@ type SQLQueries interface { ListChannelsByNodeID(ctx context.Context, arg sqlc.ListChannelsByNodeIDParams) ([]sqlc.ListChannelsByNodeIDRow, error) ListChannelsForNodeIDs(ctx context.Context, arg sqlc.ListChannelsForNodeIDsParams) ([]sqlc.ListChannelsForNodeIDsRow, error) ListChannelsWithPoliciesPaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesPaginatedParams) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, error) + ListPreferredChannelsPaginated(ctx context.Context, arg sqlc.ListPreferredChannelsPaginatedParams) ([]sqlc.ListPreferredChannelsPaginatedRow, error) UpsertPreferredChannel(ctx context.Context, scid []byte) error ListChannelsWithPoliciesForCachePaginated(ctx context.Context, arg sqlc.ListChannelsWithPoliciesForCachePaginatedParams) ([]sqlc.ListChannelsWithPoliciesForCachePaginatedRow, error) ListChannelsPaginated(ctx context.Context, arg sqlc.ListChannelsPaginatedParams) ([]sqlc.ListChannelsPaginatedRow, error) @@ -1156,17 +1157,12 @@ func (s *SQLStore) ForEachSourceNodeChannel(ctx context.Context, // early. // // NOTE: part of the Store interface. -func (s *SQLStore) ForEachNode(ctx context.Context, v lnwire.GossipVersion, +func (s *SQLStore) ForEachNode(ctx context.Context, cb func(node *models.Node) error, reset func()) error { return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - return forEachNodePaginated( - ctx, s.cfg.QueryCfg, db, - v, func(_ context.Context, _ int64, - node *models.Node) error { - - return cb(node) - }, + return forEachPreferredNodePaginated( + ctx, s.cfg.QueryCfg, db, cb, ) }, reset) } @@ -2034,16 +2030,14 @@ func (s *SQLStore) ForEachChannelCacheable(ctx context.Context, // // NOTE: part of the Store interface. func (s *SQLStore) ForEachChannel(ctx context.Context, - v lnwire.GossipVersion, cb func(*models.ChannelEdgeInfo, - *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error, + cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, + *models.ChannelEdgePolicy) error, reset func()) error { - if !isKnownGossipVersion(v) { - return fmt.Errorf("unsupported gossip version: %d", v) - } - return s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error { - return forEachChannelWithPolicies(ctx, db, s.cfg, v, cb) + return forEachPreferredChannelWithPolicies( + ctx, db, s.cfg, cb, + ) }, reset) } @@ -4319,7 +4313,6 @@ func (s *sqlNodeTraverser) ForEachNodeDirectedChannel( ctx, s.db, lnwire.GossipVersion1, nodePub, cb, ) } - // FetchNodeFeatures returns the features of the given node. If the node is // unknown, assume no additional features are supported. // @@ -6431,6 +6424,54 @@ func extractChannelPolicies(row any) (*sqlc.GraphChannelPolicy, return policy1, policy2, nil + case sqlc.ListPreferredChannelsPaginatedRow: + if r.Policy1ID.Valid { + policy1 = &sqlc.GraphChannelPolicy{ + ID: r.Policy1ID.Int64, + Version: r.Policy1Version.Int16, + ChannelID: r.GraphChannel.ID, + NodeID: r.Policy1NodeID.Int64, + Timelock: r.Policy1Timelock.Int32, + FeePpm: r.Policy1FeePpm.Int64, + BaseFeeMsat: r.Policy1BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy1MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy1MaxHtlcMsat, + LastUpdate: r.Policy1LastUpdate, + InboundBaseFeeMsat: r.Policy1InboundBaseFeeMsat, + InboundFeeRateMilliMsat: r.Policy1InboundFeeRateMilliMsat, + Disabled: r.Policy1Disabled, + MessageFlags: r.Policy1MessageFlags, + ChannelFlags: r.Policy1ChannelFlags, + Signature: r.Policy1Signature, + BlockHeight: r.Policy1BlockHeight, + DisableFlags: r.Policy1DisableFlags, + } + } + if r.Policy2ID.Valid { + policy2 = &sqlc.GraphChannelPolicy{ + ID: r.Policy2ID.Int64, + Version: r.Policy2Version.Int16, + ChannelID: r.GraphChannel.ID, + NodeID: r.Policy2NodeID.Int64, + Timelock: r.Policy2Timelock.Int32, + FeePpm: r.Policy2FeePpm.Int64, + BaseFeeMsat: r.Policy2BaseFeeMsat.Int64, + MinHtlcMsat: r.Policy2MinHtlcMsat.Int64, + MaxHtlcMsat: r.Policy2MaxHtlcMsat, + LastUpdate: r.Policy2LastUpdate, + InboundBaseFeeMsat: r.Policy2InboundBaseFeeMsat, + InboundFeeRateMilliMsat: r.Policy2InboundFeeRateMilliMsat, + Disabled: r.Policy2Disabled, + MessageFlags: r.Policy2MessageFlags, + ChannelFlags: r.Policy2ChannelFlags, + Signature: r.Policy2Signature, + BlockHeight: r.Policy2BlockHeight, + DisableFlags: r.Policy2DisableFlags, + } + } + + return policy1, policy2, nil + case sqlc.ListChannelsWithPoliciesPaginatedRow: if r.Policy1ID.Valid { policy1 = &sqlc.GraphChannelPolicy{ @@ -7000,32 +7041,32 @@ func batchLoadChannelPolicyExtrasHelper(ctx context.Context, ) } -// forEachNodePaginated executes a paginated query to process each node in the -// graph. It uses the provided SQLQueries interface to fetch nodes in batches -// and applies the provided processNode function to each node. -func forEachNodePaginated(ctx context.Context, cfg *sqldb.QueryConfig, - db SQLQueries, protocol lnwire.GossipVersion, - processNode func(context.Context, int64, - *models.Node) error) error { +// forEachPreferredNodePaginated executes a paginated query that yields one +// preferred node per pubkey across all gossip versions. +func forEachPreferredNodePaginated(ctx context.Context, cfg *sqldb.QueryConfig, + db SQLQueries, processNode func(*models.Node) error) error { - pageQueryFunc := func(ctx context.Context, lastID int64, - limit int32) ([]sqlc.GraphNode, error) { + pageQueryFunc := func(ctx context.Context, cursor []byte, + limit int32) ([]sqlc.ListPreferredNodesPaginatedRow, error) { - return db.ListNodesPaginated( - ctx, sqlc.ListNodesPaginatedParams{ - Version: int16(protocol), - ID: lastID, - Limit: limit, + return db.ListPreferredNodesPaginated( + ctx, sqlc.ListPreferredNodesPaginatedParams{ + PubKey: cursor, + Limit: limit, }, ) } - extractPageCursor := func(node sqlc.GraphNode) int64 { - return node.ID + extractPageCursor := func( + row sqlc.ListPreferredNodesPaginatedRow) []byte { + + return row.GraphNode.PubKey } - collectFunc := func(node sqlc.GraphNode) (int64, error) { - return node.ID, nil + collectFunc := func( + row sqlc.ListPreferredNodesPaginatedRow) (int64, error) { + + return row.GraphNode.ID, nil } batchQueryFunc := func(ctx context.Context, @@ -7034,29 +7075,32 @@ func forEachNodePaginated(ctx context.Context, cfg *sqldb.QueryConfig, return batchLoadNodeData(ctx, cfg, db, nodeIDs) } - processItem := func(ctx context.Context, dbNode sqlc.GraphNode, + processItem := func(_ context.Context, + row sqlc.ListPreferredNodesPaginatedRow, batchData *batchNodeData) error { + dbNode := row.GraphNode node, err := buildNodeWithBatchData(dbNode, batchData) if err != nil { - return fmt.Errorf("unable to build "+ - "node(id=%d): %w", dbNode.ID, err) + return fmt.Errorf("unable to build node(id=%d): %w", + dbNode.ID, err) } - return processNode(ctx, dbNode.ID, node) + return processNode(node) } return sqldb.ExecuteCollectAndBatchWithSharedDataQuery( - ctx, cfg, int64(-1), pageQueryFunc, extractPageCursor, + ctx, cfg, []byte{}, pageQueryFunc, extractPageCursor, collectFunc, batchQueryFunc, processItem, ) } -// forEachChannelWithPolicies executes a paginated query to process each channel -// with policies in the graph. -func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, - cfg *SQLStoreConfig, v lnwire.GossipVersion, - processChannel func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, +// forEachPreferredChannelWithPolicies executes a paginated query that yields +// one preferred channel per SCID across all gossip versions. +func forEachPreferredChannelWithPolicies(ctx context.Context, + db SQLQueries, cfg *SQLStoreConfig, + processChannel func(*models.ChannelEdgeInfo, + *models.ChannelEdgePolicy, *models.ChannelEdgePolicy) error) error { type channelBatchIDs struct { @@ -7064,33 +7108,32 @@ func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, policyIDs []int64 } - pageQueryFunc := func(ctx context.Context, lastID int64, - limit int32) ([]sqlc.ListChannelsWithPoliciesPaginatedRow, + pageQueryFunc := func(ctx context.Context, cursor []byte, + limit int32) ([]sqlc.ListPreferredChannelsPaginatedRow, error) { - return db.ListChannelsWithPoliciesPaginated( - ctx, sqlc.ListChannelsWithPoliciesPaginatedParams{ - Version: int16(v), - ID: lastID, - Limit: limit, + return db.ListPreferredChannelsPaginated( + ctx, sqlc.ListPreferredChannelsPaginatedParams{ + Scid: cursor, + Limit: limit, }, ) } extractPageCursor := func( - row sqlc.ListChannelsWithPoliciesPaginatedRow) int64 { + row sqlc.ListPreferredChannelsPaginatedRow) []byte { - return row.GraphChannel.ID + return row.GraphChannel.Scid } - collectFunc := func(row sqlc.ListChannelsWithPoliciesPaginatedRow) ( + collectFunc := func( + row sqlc.ListPreferredChannelsPaginatedRow) ( channelBatchIDs, error) { ids := channelBatchIDs{ channelID: row.GraphChannel.ID, } - // Extract policy IDs from the row. dbPol1, dbPol2, err := extractChannelPolicies(row) if err != nil { return ids, err @@ -7126,7 +7169,7 @@ func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, } processItem := func(ctx context.Context, - row sqlc.ListChannelsWithPoliciesPaginatedRow, + row sqlc.ListPreferredChannelsPaginatedRow, batchData *batchChannelData) error { node1, node2, err := buildNodeVertices( @@ -7161,7 +7204,7 @@ func forEachChannelWithPolicies(ctx context.Context, db SQLQueries, } return sqldb.ExecuteCollectAndBatchWithSharedDataQuery( - ctx, cfg.QueryCfg, int64(-1), pageQueryFunc, extractPageCursor, + ctx, cfg.QueryCfg, []byte{}, pageQueryFunc, extractPageCursor, collectFunc, batchDataFunc, processItem, ) } diff --git a/sqldb/sqlc/graph.sql.go b/sqldb/sqlc/graph.sql.go index 61a769c24d6..48d0f3284e6 100644 --- a/sqldb/sqlc/graph.sql.go +++ b/sqldb/sqlc/graph.sql.go @@ -4171,52 +4171,6 @@ func (q *Queries) ListNodeIDsAndPubKeys(ctx context.Context, arg ListNodeIDsAndP return items, nil } -const listNodesPaginated = `-- name: ListNodesPaginated :many -SELECT id, version, pub_key, alias, last_update, color, signature, block_height -FROM graph_nodes -WHERE version = $1 AND id > $2 -ORDER BY id -LIMIT $3 -` - -type ListNodesPaginatedParams struct { - Version int16 - ID int64 - Limit int32 -} - -func (q *Queries) ListNodesPaginated(ctx context.Context, arg ListNodesPaginatedParams) ([]GraphNode, error) { - rows, err := q.db.QueryContext(ctx, listNodesPaginated, arg.Version, arg.ID, arg.Limit) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GraphNode - for rows.Next() { - var i GraphNode - if err := rows.Scan( - &i.ID, - &i.Version, - &i.PubKey, - &i.Alias, - &i.LastUpdate, - &i.Color, - &i.Signature, - &i.BlockHeight, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const listPreferredChannelsPaginated = `-- name: ListPreferredChannelsPaginated :many SELECT c.id, c.version, c.scid, c.node_id_1, c.node_id_2, c.outpoint, c.capacity, c.bitcoin_key_1, c.bitcoin_key_2, c.node_1_signature, c.node_2_signature, c.bitcoin_1_signature, c.bitcoin_2_signature, c.signature, c.funding_pk_script, c.merkle_root_hash, diff --git a/sqldb/sqlc/querier.go b/sqldb/sqlc/querier.go index 6b8e0ad2181..a49cb6d6b18 100644 --- a/sqldb/sqlc/querier.go +++ b/sqldb/sqlc/querier.go @@ -230,7 +230,6 @@ type Querier interface { ListChannelsWithPoliciesForCachePaginated(ctx context.Context, arg ListChannelsWithPoliciesForCachePaginatedParams) ([]ListChannelsWithPoliciesForCachePaginatedRow, error) ListChannelsWithPoliciesPaginated(ctx context.Context, arg ListChannelsWithPoliciesPaginatedParams) ([]ListChannelsWithPoliciesPaginatedRow, error) ListNodeIDsAndPubKeys(ctx context.Context, arg ListNodeIDsAndPubKeysParams) ([]ListNodeIDsAndPubKeysRow, error) - ListNodesPaginated(ctx context.Context, arg ListNodesPaginatedParams) ([]GraphNode, error) ListPreferredChannelsPaginated(ctx context.Context, arg ListPreferredChannelsPaginatedParams) ([]ListPreferredChannelsPaginatedRow, error) ListPreferredNodesPaginated(ctx context.Context, arg ListPreferredNodesPaginatedParams) ([]ListPreferredNodesPaginatedRow, error) NextInvoiceSettleIndex(ctx context.Context) (int64, error) diff --git a/sqldb/sqlc/queries/graph.sql b/sqldb/sqlc/queries/graph.sql index fef054d0c39..9d55e2fecde 100644 --- a/sqldb/sqlc/queries/graph.sql +++ b/sqldb/sqlc/queries/graph.sql @@ -73,13 +73,6 @@ FROM graph_nodes WHERE pub_key = $1 AND version = $2; --- name: ListNodesPaginated :many -SELECT * -FROM graph_nodes -WHERE version = $1 AND id > $2 -ORDER BY id -LIMIT $3; - -- name: ListPreferredNodesPaginated :many SELECT sqlc.embed(n) FROM graph_preferred_nodes pn From a09a61573242e811764d5044fb383afdd9517abb Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 7 Apr 2026 11:09:27 +0545 Subject: [PATCH 06/10] graph/db: implement cross-version node traversal Update ForEachNodeDirectedChannel and FetchNodeFeatures to work across gossip versions. ForEachNodeDirectedChannel now checks for a preferHighestNodeDirectedChanneler interface on the store (implemented by SQLStore) to stream cross-version directed channels in a single paginated query against the preferred-channel mapping table. The fallback path iterates v2 then v1, buffering results to deduplicate by SCID. FetchNodeFeatures tries v2 first, falling back to v1 when v2 features are empty. The cache population comment is updated to note that v1-then-v2 iteration order naturally gives v2 precedence via key collision overwrite. --- graph/db/graph.go | 41 +++++++++++++---- graph/db/graph_test.go | 102 +++++++++++++++++++++++++++++++++++++++++ graph/db/sql_store.go | 1 + 3 files changed, 134 insertions(+), 10 deletions(-) diff --git a/graph/db/graph.go b/graph/db/graph.go index 169f48bb14f..c2bc1626ca2 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -238,12 +238,21 @@ func (c *ChannelGraph) populateCache(ctx context.Context) error { for _, v := range []lnwire.GossipVersion{ gossipV1, gossipV2, } { - // TODO(elle): If we have both v1 and v2 entries for the same - // node/channel, prefer v2 when merging. + // We iterate v1 first, then v2. AddNodeFeatures and AddChannel + // overwrite on key collision, so v2 data takes precedence when + // both versions exist. For features specifically we + // additionally skip empty v2 entries so they don't shadow a + // non-empty v1 feature set; this matches the no-cache + // FetchNodeFeatures fallback rule that a non-empty + // lower-version vector wins over an empty higher-version one. err := c.db.ForEachNodeCacheable(ctx, v, func(node route.Vertex, features *lnwire.FeatureVector) error { + if v == gossipV2 && features.IsEmpty() { + return nil + } + cache.AddNodeFeatures(node, features) return nil @@ -299,9 +308,9 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(ctx context.Context, return c.cache.graphCache.ForEachChannel(node, cb) } - // TODO(elle): once the no-cache path needs to support - // pathfinding across gossip versions, this should iterate - // across all versions rather than defaulting to v1. + // Without the in-memory cache, traversal stays version-scoped. Both + // SQL and KV stores support v1 adjacency reads here; cross-version + // merging is reserved for the cache-backed path above. return c.db.ForEachNodeDirectedChannel( ctx, gossipV1, node, cb, reset, ) @@ -320,7 +329,22 @@ func (c *ChannelGraph) FetchNodeFeatures(ctx context.Context, return c.cache.graphCache.GetFeatures(node), nil } - return c.db.FetchNodeFeatures(ctx, lnwire.GossipVersion1, node) + // Try v2 first, fall back to v1 if the v2 features are empty. + for _, v := range []lnwire.GossipVersion{gossipV2, gossipV1} { + features, err := c.db.FetchNodeFeatures(ctx, v, node) + if errors.Is(err, ErrVersionNotSupportedForKVDB) { + continue + } + if err != nil { + return nil, err + } + + if !features.IsEmpty() { + return features, nil + } + } + + return lnwire.EmptyFeatureVector(), nil } // GraphSession will provide the call-back with access to a NodeTraverser @@ -986,7 +1010,7 @@ func (c *VersionedGraph) ChannelView(ctx context.Context) ([]EdgePoint, // for performing queries against the channel graph. If the graph cache is // enabled, the callback receives the VersionedGraph directly (which implements // NodeTraverser using the cache). Otherwise a read-only database session is -// used. +// used, and that session remains v1-only for now. func (c *VersionedGraph) GraphSession(ctx context.Context, cb func(graph NodeTraverser) error, reset func()) error { @@ -994,9 +1018,6 @@ func (c *VersionedGraph) GraphSession(ctx context.Context, return cb(c) } - // TODO(elle): the underlying GraphSession currently creates a - // NodeTraverser that is hardcoded to GossipVersion1. This needs to be - // updated to pass the version through for v2 support. return c.db.GraphSession(ctx, cb, reset) } diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index a111e230f55..8621acaaf11 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -7051,6 +7051,108 @@ func TestDeleteNodePreferredRecomputation(t *testing.T) { "node should be gone after deleting all versions") } +// TestPreferredNodeTraversal verifies that ChannelGraph's +// ForEachNodeDirectedChannel and FetchNodeFeatures correctly prefer v2 over v1 +// when the graph cache is disabled (exercising the no-cache code paths). +func TestPreferredNodeTraversal(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("preferred lookup requires SQL backend") + } + + ctx := t.Context() + + // Disable the cache so we exercise the no-cache code paths in + // ChannelGraph.ForEachNodeDirectedChannel and FetchNodeFeatures. + graph := MakeTestGraph(t, WithUseGraphCache(false)) + + // --- FetchNodeFeatures --- + + // Create a v1-only node and verify its features are returned. + privV1, err := btcec.NewPrivateKey() + require.NoError(t, err) + + nodeV1 := createNode(t, lnwire.GossipVersion1, privV1) + require.NoError(t, graph.AddNode(ctx, nodeV1)) + + features, err := graph.FetchNodeFeatures(ctx, nodeV1.PubKeyBytes) + require.NoError(t, err) + require.False(t, features.IsEmpty(), + "v1-only node should have features") + + // Create a v2-only node and verify its features are returned + // (exercises the v2 fallback). + privV2, err := btcec.NewPrivateKey() + require.NoError(t, err) + + nodeV2 := createNode(t, lnwire.GossipVersion2, privV2) + require.NoError(t, graph.AddNode(ctx, nodeV2)) + + features, err = graph.FetchNodeFeatures(ctx, nodeV2.PubKeyBytes) + require.NoError(t, err) + require.False(t, features.IsEmpty(), + "v2-only node should have features") + + // Create a node with both v1 and v2 announcements. + privBoth, err := btcec.NewPrivateKey() + require.NoError(t, err) + + nodeBothV1 := createNode(t, lnwire.GossipVersion1, privBoth) + v1Features := lnwire.NewFeatureVector( + lnwire.NewRawFeatureVector(lnwire.GossipQueriesRequired), + lnwire.Features, + ) + nodeBothV1.Features = v1Features + require.NoError(t, graph.AddNode(ctx, nodeBothV1)) + + nodeBothV2 := createNode(t, lnwire.GossipVersion2, privBoth) + v2Features := lnwire.NewFeatureVector( + lnwire.NewRawFeatureVector(lnwire.TLVOnionPayloadRequired), + lnwire.Features, + ) + nodeBothV2.Features = v2Features + require.NoError(t, graph.AddNode(ctx, nodeBothV2)) + + features, err = graph.FetchNodeFeatures( + ctx, nodeBothV1.PubKeyBytes, + ) + require.NoError(t, err) + require.Equal(t, v2Features, features) + require.NotEqual(t, v1Features, features) + + // --- ForEachNodeDirectedChannel --- + + // Add a v1 channel between nodeV1 and nodeBothV1. + edge, _ := createEdge( + lnwire.GossipVersion1, 100, 0, 0, 0, + nodeV1, nodeBothV1, + ) + require.NoError(t, graph.AddChannelEdge(ctx, edge)) + + pol := newEdgePolicy( + lnwire.GossipVersion1, edge.ChannelID, 1000, true, + ) + pol.ToNode = nodeBothV1.PubKeyBytes + pol.SigBytes = testSig.Serialize() + require.NoError(t, graph.UpdateEdgePolicy(ctx, pol)) + + // ForEachNodeDirectedChannel should find the channel. + var foundChannels int + err = graph.ForEachNodeDirectedChannel( + ctx, nodeV1.PubKeyBytes, + func(_ *DirectedChannel) error { + foundChannels++ + return nil + }, func() { + foundChannels = 0 + }, + ) + require.NoError(t, err) + require.Equal(t, 1, foundChannels, + "expected 1 channel for v1 node") +} + // TestPreferredForEachNode verifies that SQLStore.ForEachNode returns one // node per pubkey, preferring the highest announced version and otherwise // falling back to the highest-version shell node. diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index 4b452b928d5..a0d51bba369 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -4313,6 +4313,7 @@ func (s *sqlNodeTraverser) ForEachNodeDirectedChannel( ctx, s.db, lnwire.GossipVersion1, nodePub, cb, ) } + // FetchNodeFeatures returns the features of the given node. If the node is // unknown, assume no additional features are supported. // From 9ec310b7f19bee23a058c5c0b53e889e1c594100 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 7 May 2026 14:43:57 -0700 Subject: [PATCH 07/10] graph/db: move ForEachNode/ForEachChannel off VersionedGraph MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit VersionedGraph wraps ChannelGraph for callers that want to operate against a specific gossip version. After the cross-version refactor, the ForEachNode and ForEachChannel methods on VersionedGraph silently ignored c.v and iterated across all versions — a surprising behaviour for a wrapper whose whole purpose is version-scoping. Move the cross-version iteration to ChannelGraph (where it belongs) and drop the foot-gun overrides from VersionedGraph. *VersionedGraph continues to expose these via the embedded *ChannelGraph, but now they are explicitly methods of the version-agnostic type. Add ChannelGraph.ForEachNode mirroring ChannelGraph.ForEachChannel, and update the only non-embedded callsite (rpcserver describeGraph) to take ChannelGraph directly. The two remaining test helpers also switch to *ChannelGraph since they only ever wanted a node count. --- graph/db/benchmark_test.go | 3 +-- graph/db/graph.go | 28 ++++++++-------------------- graph/db/graph_test.go | 3 +-- rpcserver.go | 9 ++++----- 4 files changed, 14 insertions(+), 29 deletions(-) diff --git a/graph/db/benchmark_test.go b/graph/db/benchmark_test.go index bcadb8c2cf0..2076ccd1ba5 100644 --- a/graph/db/benchmark_test.go +++ b/graph/db/benchmark_test.go @@ -348,8 +348,7 @@ func TestPopulateDBs(t *testing.T) { // graph. countNodes := func(graph *ChannelGraph) int { numNodes := 0 - v1Graph := NewVersionedGraph(graph, lnwire.GossipVersion1) - err := v1Graph.ForEachNode( + err := graph.ForEachNode( ctx, func(node *models.Node) error { numNodes++ diff --git a/graph/db/graph.go b/graph/db/graph.go index c2bc1626ca2..63ffdac07d5 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -713,6 +713,14 @@ func (c *ChannelGraph) ForEachChannel(ctx context.Context, return c.db.ForEachChannel(ctx, cb, reset) } +// ForEachNode iterates through all stored vertices/nodes in the graph across +// all gossip versions. +func (c *ChannelGraph) ForEachNode(ctx context.Context, + cb func(*models.Node) error, reset func()) error { + + return c.db.ForEachNode(ctx, cb, reset) +} + // DisabledChannelIDs returns the channel ids of disabled channels. func (c *ChannelGraph) DisabledChannelIDs(ctx context.Context, v lnwire.GossipVersion) ( @@ -965,16 +973,6 @@ func (c *VersionedGraph) ForEachNodeCached(ctx context.Context, return c.ChannelGraph.ForEachNodeCached(ctx, c.v, withAddrs, cb, reset) } -// ForEachNode iterates through all stored vertices/nodes in the graph across -// all gossip versions, returning the preferred version for each pub_key. Note -// that this intentionally ignores c.v — cross-version iteration is the desired -// behaviour for callers that enumerate graph topology. -func (c *VersionedGraph) ForEachNode(ctx context.Context, - cb func(*models.Node) error, reset func()) error { - - return c.db.ForEachNode(ctx, cb, reset) -} - // NumZombies returns the current number of zombie channels in the graph. func (c *VersionedGraph) NumZombies(ctx context.Context) (uint64, error) { return c.db.NumZombies(ctx, c.v) @@ -1145,16 +1143,6 @@ func (c *VersionedGraph) ForEachNodeChannel(ctx context.Context, return c.db.ForEachNodeChannel(ctx, c.v, nodePub, cb, reset) } -// ForEachChannel iterates through all channel edges stored within the graph -// across all gossip versions, returning the preferred version for each SCID. -// See ForEachNode for the rationale on ignoring c.v. -func (c *VersionedGraph) ForEachChannel(ctx context.Context, - cb func(*models.ChannelEdgeInfo, *models.ChannelEdgePolicy, - *models.ChannelEdgePolicy) error, reset func()) error { - - return c.db.ForEachChannel(ctx, cb, reset) -} - // ForEachNodeCacheable iterates through all stored vertices/nodes in the graph. func (c *VersionedGraph) ForEachNodeCacheable(ctx context.Context, cb func(route.Vertex, *lnwire.FeatureVector) error, diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 8621acaaf11..0332be06b9b 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -2196,8 +2196,7 @@ func assertNumChans(t *testing.T, graph *ChannelGraph, n int) { func assertNumNodes(t *testing.T, graph *ChannelGraph, n int) { numNodes := 0 - v1Graph := NewVersionedGraph(graph, lnwire.GossipVersion1) - err := v1Graph.ForEachNode(t.Context(), func(_ *models.Node) error { + err := graph.ForEachNode(t.Context(), func(_ *models.Node) error { numNodes++ return nil diff --git a/rpcserver.go b/rpcserver.go index d4a9ce51bb5..4d43180fe8c 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -7000,11 +7000,10 @@ func (r *rpcServer) DescribeGraph(ctx context.Context, } } - // Obtain the pointer to the V1 channel graph. This will provide a - // consistent view of the graph due to bolt db's transactional model. - // - // TODO(elle): switch to a cross-version graph view when available. - graph := r.server.v1Graph + // Obtain the pointer to the cross-version channel graph. This will + // provide a consistent view of the graph due to bolt db's + // transactional model. + graph := r.server.graphDB // First iterate through all the known nodes (connected or unconnected // within the graph), collating their current state into the RPC From 890b5e9f7d16dfad34fd3934d9ad316404dcef82 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 7 May 2026 15:04:12 -0700 Subject: [PATCH 08/10] graph/db: thread version through ForEachNodeCached channel lookup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SQLStore.ForEachNodeCached takes a gossip version and uses it correctly when paginating through nodes, but the inner ListChannelsForNodeIDs call hardcoded GossipVersion1 instead of forwarding the requested version. Calling ForEachNodeCached(ctx, GossipVersion2, ...) therefore returned v2 nodes paired with v1 channels — silently inconsistent data. The bug was latent because every existing caller happens to request v1, but it would surface as soon as any v2-scoped caller appears. Add a regression test that creates a v2-only channel between two nodes that exist under both versions and asserts that ForEachNodeCached(ctx, GossipVersion2, ...) reports the channel for each endpoint. --- graph/db/graph_test.go | 62 ++++++++++++++++++++++++++++++++++++++++++ graph/db/sql_store.go | 2 +- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index 0332be06b9b..adc9bf0aa8b 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -7358,3 +7358,65 @@ func TestPreferredForEachChannel(t *testing.T) { require.Nil(t, gotShellPref.p1) require.Nil(t, gotShellPref.p2) } + +// TestForEachNodeCachedHonoursVersion verifies that +// SQLStore.ForEachNodeCached uses the requested gossip version when looking +// up channels for the iterated nodes, rather than silently falling back to +// v1 channels for v2 nodes. +func TestForEachNodeCachedHonoursVersion(t *testing.T) { + t.Parallel() + + if !isSQLDB { + t.Skip("test only meaningful for SQL backend") + } + + ctx := t.Context() + graph := MakeTestGraph(t, WithUseGraphCache(false)) + + // Create two nodes that exist under both v1 and v2. + priv1, err := btcec.NewPrivateKey() + require.NoError(t, err) + priv2, err := btcec.NewPrivateKey() + require.NoError(t, err) + + require.NoError(t, graph.AddNode( + ctx, createNode(t, lnwire.GossipVersion1, priv1), + )) + require.NoError(t, graph.AddNode( + ctx, createNode(t, lnwire.GossipVersion1, priv2), + )) + node1V2 := createNode(t, lnwire.GossipVersion2, priv1) + node2V2 := createNode(t, lnwire.GossipVersion2, priv2) + require.NoError(t, graph.AddNode(ctx, node1V2)) + require.NoError(t, graph.AddNode(ctx, node2V2)) + + // Add a channel only under v2 between the two nodes. If + // ForEachNodeCached honours its version parameter, callers asking for + // v2 nodes must see this channel; if it instead hardcodes v1 in the + // channel lookup, the channel will be missing. + edgeV2, _ := createEdge( + lnwire.GossipVersion2, 100, 1, 0, 1, node1V2, node2V2, + ) + require.NoError(t, graph.AddChannelEdge(ctx, edgeV2)) + + store := graph.db + chansSeen := 0 + err = store.ForEachNodeCached( + ctx, lnwire.GossipVersion2, false, + func(_ context.Context, n route.Vertex, _ []net.Addr, + chans map[uint64]*DirectedChannel) error { + + if n == node1V2.PubKeyBytes || + n == node2V2.PubKeyBytes { + + chansSeen += len(chans) + } + + return nil + }, func() { chansSeen = 0 }, + ) + require.NoError(t, err) + require.Equal(t, 2, chansSeen, + "v2 ForEachNodeCached should report the v2 channel for "+ + "each endpoint") +} diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go index a0d51bba369..961c98ab394 100644 --- a/graph/db/sql_store.go +++ b/graph/db/sql_store.go @@ -1786,7 +1786,7 @@ func (s *SQLStore) ForEachNodeCached(ctx context.Context, // page. allChannels, err := db.ListChannelsForNodeIDs( ctx, sqlc.ListChannelsForNodeIDsParams{ - Version: int16(lnwire.GossipVersion1), + Version: int16(v), Node1Ids: nodeIDs, Node2Ids: nodeIDs, }, From 00d6ce979d1ed78d52670e89b6248e32b1d23d6b Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 7 May 2026 15:12:16 -0700 Subject: [PATCH 09/10] graph/db: simplify no-cache fallback paths to v1-only The in-memory graph cache is only ever disabled on the bbolt KV backend in production, so the no-cache fallback paths in ChannelGraph.ForEachNodeDirectedChannel, ChannelGraph.FetchNodeFeatures and VersionedGraph.GraphSession will only run against a v1-only store. The v2-then-v1 loop in FetchNodeFeatures is therefore dead code in production, and the comments framing v1 as a temporary choice are misleading. Collapse FetchNodeFeatures to a direct v1 call, drop the "version-scoped"/"for now" comment language, and trim the TestPreferredNodeTraversal cases that were exercising the now-removed v2 fallback. Rename the test to TestNoCacheNodeTraversal to reflect what it actually covers (the v1 KV path). --- graph/db/graph.go | 26 +++---------- graph/db/graph_test.go | 84 +++++++++++------------------------------- 2 files changed, 28 insertions(+), 82 deletions(-) diff --git a/graph/db/graph.go b/graph/db/graph.go index 63ffdac07d5..a19487b75f5 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -308,9 +308,8 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(ctx context.Context, return c.cache.graphCache.ForEachChannel(node, cb) } - // Without the in-memory cache, traversal stays version-scoped. Both - // SQL and KV stores support v1 adjacency reads here; cross-version - // merging is reserved for the cache-backed path above. + // The no-cache path only runs against the KV backend, which is + // v1-only. return c.db.ForEachNodeDirectedChannel( ctx, gossipV1, node, cb, reset, ) @@ -329,22 +328,9 @@ func (c *ChannelGraph) FetchNodeFeatures(ctx context.Context, return c.cache.graphCache.GetFeatures(node), nil } - // Try v2 first, fall back to v1 if the v2 features are empty. - for _, v := range []lnwire.GossipVersion{gossipV2, gossipV1} { - features, err := c.db.FetchNodeFeatures(ctx, v, node) - if errors.Is(err, ErrVersionNotSupportedForKVDB) { - continue - } - if err != nil { - return nil, err - } - - if !features.IsEmpty() { - return features, nil - } - } - - return lnwire.EmptyFeatureVector(), nil + // The no-cache path only runs against the KV backend, which is + // v1-only. + return c.db.FetchNodeFeatures(ctx, gossipV1, node) } // GraphSession will provide the call-back with access to a NodeTraverser @@ -1008,7 +994,7 @@ func (c *VersionedGraph) ChannelView(ctx context.Context) ([]EdgePoint, // for performing queries against the channel graph. If the graph cache is // enabled, the callback receives the VersionedGraph directly (which implements // NodeTraverser using the cache). Otherwise a read-only database session is -// used, and that session remains v1-only for now. +// used; the no-cache path only runs against the KV backend, which is v1-only. func (c *VersionedGraph) GraphSession(ctx context.Context, cb func(graph NodeTraverser) error, reset func()) error { diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index adc9bf0aa8b..900c7211a7d 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -7050,98 +7050,58 @@ func TestDeleteNodePreferredRecomputation(t *testing.T) { "node should be gone after deleting all versions") } -// TestPreferredNodeTraversal verifies that ChannelGraph's -// ForEachNodeDirectedChannel and FetchNodeFeatures correctly prefer v2 over v1 -// when the graph cache is disabled (exercising the no-cache code paths). -func TestPreferredNodeTraversal(t *testing.T) { +// TestNoCacheNodeTraversal verifies that the no-cache fallback paths of +// ChannelGraph.FetchNodeFeatures and ChannelGraph.ForEachNodeDirectedChannel +// return v1 data correctly. The no-cache path is only ever reached on the KV +// backend in production (which is v1-only), so cross-version semantics are +// the cache path's job, not this one's. +func TestNoCacheNodeTraversal(t *testing.T) { t.Parallel() if !isSQLDB { - t.Skip("preferred lookup requires SQL backend") + t.Skip("test only meaningful for SQL backend") } ctx := t.Context() - // Disable the cache so we exercise the no-cache code paths in - // ChannelGraph.ForEachNodeDirectedChannel and FetchNodeFeatures. + // Disable the cache so we exercise the no-cache code paths. graph := MakeTestGraph(t, WithUseGraphCache(false)) - // --- FetchNodeFeatures --- - - // Create a v1-only node and verify its features are returned. - privV1, err := btcec.NewPrivateKey() - require.NoError(t, err) - - nodeV1 := createNode(t, lnwire.GossipVersion1, privV1) - require.NoError(t, graph.AddNode(ctx, nodeV1)) - - features, err := graph.FetchNodeFeatures(ctx, nodeV1.PubKeyBytes) + priv1, err := btcec.NewPrivateKey() require.NoError(t, err) - require.False(t, features.IsEmpty(), - "v1-only node should have features") - - // Create a v2-only node and verify its features are returned - // (exercises the v2 fallback). - privV2, err := btcec.NewPrivateKey() + priv2, err := btcec.NewPrivateKey() require.NoError(t, err) - nodeV2 := createNode(t, lnwire.GossipVersion2, privV2) - require.NoError(t, graph.AddNode(ctx, nodeV2)) + node1 := createNode(t, lnwire.GossipVersion1, priv1) + node2 := createNode(t, lnwire.GossipVersion1, priv2) + require.NoError(t, graph.AddNode(ctx, node1)) + require.NoError(t, graph.AddNode(ctx, node2)) - features, err = graph.FetchNodeFeatures(ctx, nodeV2.PubKeyBytes) + // FetchNodeFeatures should return the node's v1 features. + features, err := graph.FetchNodeFeatures(ctx, node1.PubKeyBytes) require.NoError(t, err) require.False(t, features.IsEmpty(), - "v2-only node should have features") + "v1 node should have features") - // Create a node with both v1 and v2 announcements. - privBoth, err := btcec.NewPrivateKey() - require.NoError(t, err) - - nodeBothV1 := createNode(t, lnwire.GossipVersion1, privBoth) - v1Features := lnwire.NewFeatureVector( - lnwire.NewRawFeatureVector(lnwire.GossipQueriesRequired), - lnwire.Features, - ) - nodeBothV1.Features = v1Features - require.NoError(t, graph.AddNode(ctx, nodeBothV1)) - - nodeBothV2 := createNode(t, lnwire.GossipVersion2, privBoth) - v2Features := lnwire.NewFeatureVector( - lnwire.NewRawFeatureVector(lnwire.TLVOnionPayloadRequired), - lnwire.Features, - ) - nodeBothV2.Features = v2Features - require.NoError(t, graph.AddNode(ctx, nodeBothV2)) - - features, err = graph.FetchNodeFeatures( - ctx, nodeBothV1.PubKeyBytes, - ) - require.NoError(t, err) - require.Equal(t, v2Features, features) - require.NotEqual(t, v1Features, features) - - // --- ForEachNodeDirectedChannel --- - - // Add a v1 channel between nodeV1 and nodeBothV1. + // Add a v1 channel and verify ForEachNodeDirectedChannel sees it. edge, _ := createEdge( - lnwire.GossipVersion1, 100, 0, 0, 0, - nodeV1, nodeBothV1, + lnwire.GossipVersion1, 100, 0, 0, 0, node1, node2, ) require.NoError(t, graph.AddChannelEdge(ctx, edge)) pol := newEdgePolicy( lnwire.GossipVersion1, edge.ChannelID, 1000, true, ) - pol.ToNode = nodeBothV1.PubKeyBytes + pol.ToNode = node2.PubKeyBytes pol.SigBytes = testSig.Serialize() require.NoError(t, graph.UpdateEdgePolicy(ctx, pol)) - // ForEachNodeDirectedChannel should find the channel. var foundChannels int err = graph.ForEachNodeDirectedChannel( - ctx, nodeV1.PubKeyBytes, + ctx, node1.PubKeyBytes, func(_ *DirectedChannel) error { foundChannels++ + return nil }, func() { foundChannels = 0 From 26ef5e2f3f465381b2ef9a91df20934871d5f595 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 7 Apr 2026 11:09:38 +0545 Subject: [PATCH 10/10] docs: add release note for cross-version graph Store --- docs/release-notes/release-notes-0.21.0.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/release-notes/release-notes-0.21.0.md b/docs/release-notes/release-notes-0.21.0.md index ad6147ca3a4..b3ce3357ca1 100644 --- a/docs/release-notes/release-notes-0.21.0.md +++ b/docs/release-notes/release-notes-0.21.0.md @@ -409,6 +409,12 @@ fallback](https://github.com/lightningnetwork/lnd/pull/10717) so that gossip channel filtering and zombie edge lookups use the correct gossip version instead of hardcoding v1. +* Make the [graph `Store` interface + cross-version](https://github.com/lightningnetwork/lnd/pull/10714) so that + `ForEachNode`, `ForEachChannel`, and `ForEachNodeDirectedChannel` work across + gossip v1 and v2. Add `Preferred` fetch helpers and `GetVersions` queries + so callers can retrieve channels without knowing which gossip version + announced them. * Updated waiting proof persistence for gossip upgrades by introducing typed waiting proof keys and payloads, with a DB migration to rewrite legacy waiting proof records to the new key/value format