diff --git a/forester/docs/v1_forester_flows.md b/forester/docs/v1_forester_flows.md new file mode 100644 index 0000000000..a3d9b2ddcb --- /dev/null +++ b/forester/docs/v1_forester_flows.md @@ -0,0 +1,164 @@ +# Forester V1 Flows (PR: v2 Nullify + Blockhash) + +## 1. Transaction Send Flow (Blockhash) + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ send_batched_transactions │ +└─────────────────────────────────────────────────────────────────────────────────┘ + + ┌──────────────────────────────────┐ + │ prepare_batch_prerequisites │ + │ - fetch queue items │ + │ - single RPC: blockhash + │ + │ priority_fee (same connection) │ + │ - PreparedBatchData: │ + │ recent_blockhash │ + │ last_valid_block_height │ + └──────────────┬───────────────────┘ + │ + ▼ + ┌──────────────────────────────────┐ + │ for each work_chunk (100 items) │ + └──────────────┬───────────────────┘ + │ + ┌────────────┴────────────┐ + │ elapsed > 30s? │ + │ YES → refresh blockhash│ + │ (pool.get_connection │ + │ → rpc.get_latest_ │ + │ blockhash) │ + │ NO → keep current │ + └────────────┬────────────┘ + │ + ▼ + ┌──────────────────────────────────┐ + │ build_signed_transaction_batch │ + │ (recent_blockhash, │ + │ last_valid_block_height) │ + │ → (txs, chunk_last_valid_ │ + │ block_height) │ + └──────────────┬───────────────────┘ + │ + ▼ + ┌──────────────────────────────────┐ + │ execute_transaction_chunk_sending │ + │ PreparedTransaction::legacy( │ + │ tx, chunk_last_valid_block_ │ + │ height) │ + │ - send + confirm │ + │ - blockhash expiry check via │ + │ last_valid_block_height │ + └──────────────────────────────────┘ + + No refetch-before-send. No re-sign. +``` + +## 2. 
State Nullify Instruction Flow (Legacy vs v2) + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ Registry: nullify instruction paths │ +└─────────────────────────────────────────────────────────────────────────────────┘ + + LEGACY (proof in ix data) v2 (proof in remaining_accounts) + ─────────────────────── ──────────────────────────────────── + + create_nullify_instruction() create_nullify_with_proof_accounts_instruction() + │ │ + │ ix data: [change_log, queue_idx, │ ix data: [change_log, queue_idx, + │ leaf_idx, proofs[16][32]] │ leaf_idx] (no proofs) + │ │ + │ remaining_accounts: standard │ remaining_accounts: 16 proof + │ (authority, merkle_tree, queue...) │ account pubkeys (key = node bytes) + │ │ + ▼ ▼ + process_nullify() nullify_2 instruction + (proofs from ix data) - validate: 1 change, 1 queue, 1 index + - validate: exactly 16 proof accounts + - extract_proof_nodes_from_remaining_accounts + - process_nullify(..., vec![proof_nodes]) + + Forester V1 uses nullify_2 only (create_nullify_2_instruction). +``` + +## 3. Forester V1 State Nullify Pairing Flow + +``` +┌─────────────────────────────────────────────────────────────────────────────────┐ +│ build_instruction_batches (state nullify path) │ +└─────────────────────────────────────────────────────────────────────────────────┘ + + fetch_proofs_and_create_instructions + │ + │ For each state item: + │ create_nullify_with_proof_accounts_instruction (v2) + │ → StateNullifyInstruction { instruction, proof_nodes, leaf_index } + │ + ▼ + ┌─────────────────────────────────────────────────────────────────────────────┐ + │ allow_pairing? 
│
 + │ state_nullify_count >= 2 AND should_attempt_pairing() │ + └─────────────────────────────────────────────────────────────────────────────┘ + │ + │ should_attempt_pairing checks: + │ - pair_candidates = n*(n-1)/2 <= 4950 (MAX_PAIR_CANDIDATES) + │ - state_nullify_count <= 100 (MAX_PAIRING_INSTRUCTIONS) + │ - remaining_blocks = last_valid - current > 25 (MIN_REMAINING_BLOCKS_FOR_PAIRING) + │ + ├── NO → each nullify → 1 tx (no pairing) + │ + └── YES → pair_state_nullify_batches + │ + │ For each pair (i,j): + │ - estimated_tx_size(ix_i, ix_j) <= 1200? (packet - safety margin) + │ - weight = 10000 + proof_overlap_count + │ + │ Max-cardinality matching (Edmonds' blossom, matching.rs) + │ - prioritize number of pairs + │ - then maximize proof overlap (fewer unique accounts) + │ + ▼ + Output: Vec<Vec<Instruction>> + - paired: [ix_a, ix_b] in one tx + - unpaired: [ix] in one tx + + Address updates: no pairing, chunked by batch_size only. +``` + +## 4. End-to-End Forester V1 State Tree Flow + +``` + Queue (state nullifier) Indexer (proofs) + │ │ + └──────────┬─────────────────┘ + │ + ▼ + prepare_batch_prerequisites + - queue items + - blockhash + last_valid_block_height + - priority_fee + │ + ▼ + for chunk in work_items.chunks(100): + refresh blockhash if 30s elapsed + │ + ▼ + build_signed_transaction_batch + │ + ├─ fetch_proofs_and_create_instructions + │ - state: v2 nullify ix (proof in remaining_accounts) + │ - address: update ix + │ + ├─ build_instruction_batches + │ - address: chunk by batch_size + │ - state nullify: pair if allow_pairing else 1-per-tx + │ + └─ create_smart_transaction per batch + │ + ▼ + execute_transaction_chunk_sending + - PreparedTransaction::legacy(tx, chunk_last_valid_block_height) + - send + confirm with blockhash expiry check +``` + diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index f52efa1b13..96abb8b01d 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -2993,6 +2993,7 @@ impl EpochManager { compute_unit_limit: 
Some(self.config.transaction_config.cu_limit), enable_priority_fees: self.config.transaction_config.enable_priority_fees, max_concurrent_sends: Some(self.config.transaction_config.max_concurrent_sends), + pairs_only: false, // overridden at runtime based on queue fullness }, queue_config: self.config.queue_config, retry_config: RetryConfig { diff --git a/forester/src/lib.rs b/forester/src/lib.rs index aebb2d4e9f..264b2066ea 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -10,6 +10,7 @@ pub mod forester_status; pub mod health_check; pub mod helius_priority_fee_types; pub mod logging; +pub(crate) mod matching; pub mod metrics; pub mod pagerduty; pub mod priority_fee; diff --git a/forester/src/matching.rs b/forester/src/matching.rs new file mode 100644 index 0000000000..d108bed7c4 --- /dev/null +++ b/forester/src/matching.rs @@ -0,0 +1,356 @@ +//! Maximum cardinality matching via Edmonds' Blossom algorithm. +//! +//! Finds the largest set of non-overlapping vertex pairs in a general +//! (non-bipartite) weighted graph. When multiple maximum-cardinality +//! matchings exist, the one with the highest total weight is preferred +//! thanks to the greedy initialization sorting edges by weight. + +use std::collections::VecDeque; + +/// Returned for unmatched vertices. +pub const SENTINEL: usize = usize::MAX; + +/// Builder for a maximum-cardinality matching on a general weighted graph. +/// +/// ```ignore +/// let mates = Matching::new(edges).max_cardinality().solve(); +/// ``` +pub struct Matching { + edges: Vec<(usize, usize, i32)>, +} + +impl Matching { + pub fn new(edges: Vec<(usize, usize, i32)>) -> Self { + Self { edges } + } + + /// Request maximum-cardinality mode (currently the only mode). + pub fn max_cardinality(self) -> Self { + self + } + + /// Solve and return `mates[v]` for every vertex `v`. + /// `mates[v] == SENTINEL` when `v` is unmatched. 
+ pub fn solve(self) -> Vec { + if self.edges.is_empty() { + return Vec::new(); + } + let n = self + .edges + .iter() + .flat_map(|&(u, v, _)| [u, v]) + .max() + .map(|m| m + 1) + .unwrap_or(0); + if n == 0 { + return Vec::new(); + } + edmonds_matching(n, &self.edges) + } +} + +// --------------------------------------------------------------------------- +// Edmonds' Blossom algorithm – O(V²·E) maximum-cardinality matching +// --------------------------------------------------------------------------- + +fn edmonds_matching(n: usize, edges: &[(usize, usize, i32)]) -> Vec { + let mut adj = vec![vec![]; n]; + for &(u, v, _) in edges { + adj[u].push(v); + adj[v].push(u); + } + + let mut mate = vec![SENTINEL; n]; + + // Greedy init: prefer higher-weight edges for a better starting point. + let mut sorted: Vec<_> = edges.to_vec(); + sorted.sort_by(|a, b| b.2.cmp(&a.2)); + for &(u, v, _) in &sorted { + if mate[u] == SENTINEL && mate[v] == SENTINEL { + mate[u] = v; + mate[v] = u; + } + } + + // Augment from every remaining free vertex. + for root in 0..n { + if mate[root] != SENTINEL { + continue; + } + try_augment(n, &adj, &mut mate, root); + } + + mate +} + +/// BFS from `root` looking for an augmenting path. Returns `true` if one was +/// found (and the matching has already been updated). +fn try_augment(n: usize, adj: &[Vec], mate: &mut [usize], root: usize) -> bool { + let mut base: Vec = (0..n).collect(); + let mut parent = vec![SENTINEL; n]; + let mut color = vec![0u8; n]; // 0 = unseen, 1 = outer, 2 = inner + let mut queue = VecDeque::new(); + + color[root] = 1; + queue.push_back(root); + + while let Some(v) = queue.pop_front() { + for &u in &adj[v] { + if base[v] == base[u] || color[u] == 2 { + continue; + } + if color[u] == 1 { + // Both outer → blossom. 
+ let lca = find_lca(&base, &parent, mate, root, v, u); + contract( + &mut base, + &mut parent, + &mut color, + mate, + &mut queue, + v, + u, + lca, + ); + } else if mate[u] == SENTINEL { + // Free vertex → augmenting path found. + parent[u] = v; + augment(mate, &parent, u); + return true; + } else { + // Matched, unseen vertex → extend tree. + parent[u] = v; + color[u] = 2; + let w = mate[u]; + color[w] = 1; + queue.push_back(w); + } + } + } + + false +} + +/// Walk from both endpoints towards the root to find the lowest common +/// ancestor in the alternating tree (respecting blossom bases). +fn find_lca( + base: &[usize], + parent: &[usize], + mate: &[usize], + root: usize, + a: usize, + b: usize, +) -> usize { + let n = base.len(); + let mut visited = vec![false; n]; + let mut a = base[a]; + let mut b = base[b]; + loop { + visited[a] = true; + if a == root { + break; + } + a = base[parent[mate[a]]]; + } + loop { + if visited[b] { + return b; + } + b = base[parent[mate[b]]]; + } +} + +/// Shrink the blossom defined by paths v→lca and u→lca. +#[allow(clippy::too_many_arguments)] +fn contract( + base: &mut [usize], + parent: &mut [usize], + color: &mut [u8], + mate: &[usize], + queue: &mut VecDeque, + v: usize, + u: usize, + lca: usize, +) { + let n = base.len(); + let mut blossom = vec![false; n]; + mark_path(base, parent, mate, &mut blossom, v, lca, u); + mark_path(base, parent, mate, &mut blossom, u, lca, v); + for i in 0..n { + if blossom[base[i]] { + base[i] = lca; + if color[i] != 1 { + color[i] = 1; + queue.push_back(i); + } + } + } +} + +/// Walk from `v` towards `lca`, marking blossom members and redirecting +/// parent pointers so that future augmentations can traverse the blossom. 
+fn mark_path( + base: &[usize], + parent: &mut [usize], + mate: &[usize], + blossom: &mut [bool], + mut v: usize, + lca: usize, + child: usize, +) { + let mut cur_child = child; + while base[v] != lca { + blossom[base[v]] = true; + blossom[base[mate[v]]] = true; + parent[v] = cur_child; + cur_child = mate[v]; + v = parent[mate[v]]; + } +} + +/// Flip matched / unmatched edges along the augmenting path ending at `u`. +fn augment(mate: &mut [usize], parent: &[usize], mut u: usize) { + while u != SENTINEL { + let v = parent[u]; + let prev = if v != SENTINEL { mate[v] } else { SENTINEL }; + mate[u] = v; + if v != SENTINEL { + mate[v] = u; + } + u = prev; + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn prioritizes_cardinality() { + // Path 0–1–2: one edge can be matched. With max_cardinality the + // algorithm must NOT leave node 1 unmatched just because (0,1) has + // higher weight. Either (0,1) or (1,2) is fine – 1 pair total. + let edges = vec![(0, 1, 10_100i32), (1, 2, 10_090)]; + let mates = Matching::new(edges).max_cardinality().solve(); + let pairs: Vec<_> = mates + .iter() + .enumerate() + .filter_map(|(i, &m)| { + if m != SENTINEL && m > i { + Some((i, m)) + } else { + None + } + }) + .collect(); + assert_eq!(pairs.len(), 1); + } + + #[test] + fn disconnected_components() { + let edges = vec![(0, 1, 10_010), (2, 3, 10_005)]; + let mates = Matching::new(edges).max_cardinality().solve(); + let matched = mates.iter().filter(|&&m| m != SENTINEL).count(); + assert_eq!(matched, 4); + } + + #[test] + fn empty_edges() { + let mates = Matching::new(vec![]).max_cardinality().solve(); + assert!(mates.is_empty()); + } + + #[test] + fn triangle() { + // Triangle: max matching = 1 pair. 
+ let edges = vec![(0, 1, 1), (1, 2, 1), (0, 2, 1)]; + let mates = Matching::new(edges).max_cardinality().solve(); + let matched = mates.iter().filter(|&&m| m != SENTINEL).count(); + assert_eq!(matched, 2); // 1 pair = 2 matched vertices + } + + #[test] + fn augmenting_path_needed() { + // 0–1–2–3: greedy might match (0,1) and leave (2,3) for the second + // pass. Either way, 2 pairs are achievable. + let edges = vec![(0, 1, 1), (1, 2, 1), (2, 3, 1)]; + let mates = Matching::new(edges).max_cardinality().solve(); + let pairs: Vec<_> = mates + .iter() + .enumerate() + .filter_map(|(i, &m)| { + if m != SENTINEL && m > i { + Some((i, m)) + } else { + None + } + }) + .collect(); + assert_eq!(pairs.len(), 2); + } + + #[test] + fn pentagon_blossom() { + // 5-cycle: max matching = 2 pairs. + let edges = vec![(0, 1, 1), (1, 2, 1), (2, 3, 1), (3, 4, 1), (4, 0, 1)]; + let mates = Matching::new(edges).max_cardinality().solve(); + let pairs: Vec<_> = mates + .iter() + .enumerate() + .filter_map(|(i, &m)| { + if m != SENTINEL && m > i { + Some((i, m)) + } else { + None + } + }) + .collect(); + assert_eq!(pairs.len(), 2); + } + + #[test] + fn complete_graph_k6() { + // K6: 6 vertices, max matching = 3 pairs. + let mut edges = Vec::new(); + for i in 0..6 { + for j in (i + 1)..6 { + edges.push((i, j, 1)); + } + } + let mates = Matching::new(edges).max_cardinality().solve(); + let pairs: Vec<_> = mates + .iter() + .enumerate() + .filter_map(|(i, &m)| { + if m != SENTINEL && m > i { + Some((i, m)) + } else { + None + } + }) + .collect(); + assert_eq!(pairs.len(), 3); + } + + #[test] + fn weight_tiebreaker() { + // 4 vertices, 2 possible perfect matchings: + // (0,1)+(2,3) total weight = 100+1 = 101 + // (0,2)+(1,3) total weight = 50+50 = 100 + // Greedy init prefers (0,1) first (weight 100), then (2,3). 
+ let edges = vec![(0, 1, 100), (0, 2, 50), (1, 3, 50), (2, 3, 1)]; + let mates = Matching::new(edges).max_cardinality().solve(); + let pairs: Vec<_> = mates + .iter() + .enumerate() + .filter_map(|(i, &m)| { + if m != SENTINEL && m > i { + Some((i, m)) + } else { + None + } + }) + .collect(); + assert_eq!(pairs.len(), 2); // perfect matching + } +} diff --git a/forester/src/processor/v1/config.rs b/forester/src/processor/v1/config.rs index f2ee05f353..153e005c96 100644 --- a/forester/src/processor/v1/config.rs +++ b/forester/src/processor/v1/config.rs @@ -21,6 +21,12 @@ pub struct SendBatchedTransactionsConfig { pub confirmation_max_attempts: usize, } +/// Pending-item threshold below which the forester only emits *paired* +/// state-nullify transactions, dropping unpaired singles. When the queue is +/// nearly empty there is no urgency, so we save a transaction by waiting for +/// the next cycle when the single can potentially be paired. +pub const PAIRS_ONLY_THRESHOLD: u64 = 4_000; + #[derive(Debug, Clone, Copy)] pub struct BuildTransactionBatchConfig { pub batch_size: u64, @@ -28,4 +34,8 @@ pub struct BuildTransactionBatchConfig { pub compute_unit_limit: Option, pub enable_priority_fees: bool, pub max_concurrent_sends: Option, + /// When `true`, only emit paired state-nullify transactions. + /// Unpaired singles are dropped and retried in the next cycle. + /// Computed at runtime: `pairs_only = total_pending < PAIRS_ONLY_THRESHOLD`. 
+ pub pairs_only: bool, } diff --git a/forester/src/processor/v1/helpers.rs b/forester/src/processor/v1/helpers.rs index d980b02e32..e465e260da 100644 --- a/forester/src/processor/v1/helpers.rs +++ b/forester/src/processor/v1/helpers.rs @@ -11,8 +11,8 @@ use forester_utils::{rpc_pool::SolanaRpcPool, utils::wait_for_indexer}; use light_client::{indexer::Indexer, rpc::Rpc}; use light_compressed_account::TreeType; use light_registry::account_compression_cpi::sdk::{ - create_nullify_instruction, create_update_address_merkle_tree_instruction, - CreateNullifyInstructionInputs, UpdateAddressMerkleTreeInstructionInputs, + create_nullify_2_instruction, create_update_address_merkle_tree_instruction, + CreateNullify2InstructionInputs, UpdateAddressMerkleTreeInstructionInputs, }; use solana_program::instruction::Instruction; use tokio::time::Instant; @@ -32,6 +32,20 @@ use crate::{ errors::ForesterError, }; +#[derive(Clone, Debug)] +pub enum PreparedV1Instruction { + AddressUpdate(Instruction), + StateNullify(StateNullifyInstruction), +} + +#[derive(Clone, Debug)] +pub struct StateNullifyInstruction { + pub instruction: Instruction, + pub proof_nodes: Vec<[u8; 32]>, + pub leaf_index: u64, + pub merkle_tree: Pubkey, +} + /// Work items should be of only one type and tree pub async fn fetch_proofs_and_create_instructions( authority: Pubkey, @@ -39,7 +53,7 @@ pub async fn fetch_proofs_and_create_instructions( pool: Arc>, epoch: u64, work_items: &[WorkItem], -) -> crate::Result<(Vec, Vec)> { +) -> crate::Result<(Vec, Vec)> { let mut proofs = Vec::new(); let mut instructions = vec![]; @@ -360,7 +374,7 @@ pub async fn fetch_proofs_and_create_instructions( }, epoch, ); - instructions.push(instruction); + instructions.push(PreparedV1Instruction::AddressUpdate(instruction)); } // Process state proofs and create instructions @@ -375,21 +389,34 @@ pub async fn fetch_proofs_and_create_instructions( for (item, proof) in state_items.iter().zip(state_proofs.into_iter()) { 
proofs.push(MerkleProofType::StateProof(proof.clone())); - let instruction = create_nullify_instruction( - CreateNullifyInstructionInputs { + let instruction = create_nullify_2_instruction( + CreateNullify2InstructionInputs { nullifier_queue: item.tree_account.queue, merkle_tree: item.tree_account.merkle_tree, - change_log_indices: vec![proof.root_seq % STATE_MERKLE_TREE_CHANGELOG], - leaves_queue_indices: vec![item.queue_item_data.index as u16], - indices: vec![proof.leaf_index], - proofs: vec![proof.proof.clone()], + change_log_index: proof.root_seq % STATE_MERKLE_TREE_CHANGELOG, + leaves_queue_index: item.queue_item_data.index as u16, + index: proof.leaf_index, + proof: proof + .proof + .clone() + .try_into() + .map_err(|_| ForesterError::General { + error: "Failed to convert state proof to fixed array".to_string(), + })?, authority, derivation, is_metadata_forester: false, }, epoch, ); - instructions.push(instruction); + instructions.push(PreparedV1Instruction::StateNullify( + StateNullifyInstruction { + instruction, + proof_nodes: proof.proof, + leaf_index: proof.leaf_index, + merkle_tree: item.tree_account.merkle_tree, + }, + )); } Ok((proofs, instructions)) diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index b5282bc47a..9eddc83889 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -27,7 +27,9 @@ use crate::{ errors::ForesterError, metrics::increment_transactions_failed, priority_fee::PriorityFeeConfig, - processor::v1::{config::SendBatchedTransactionsConfig, tx_builder::TransactionBuilder}, + processor::v1::{ + config, config::SendBatchedTransactionsConfig, tx_builder::TransactionBuilder, + }, queue_helpers::fetch_queue_item_data, smart_transaction::{ConfirmationConfig, PreparedTransaction, SmartTransactionError}, Result, @@ -39,6 +41,7 @@ struct PreparedBatchData { last_valid_block_height: u64, priority_fee: Option, timeout_deadline: 
Instant, + total_pending: u64, } #[derive(Clone)] @@ -97,22 +100,24 @@ pub async fn send_batched_transactions( } }; - let queue_item_data = { + let (queue_item_data, total_pending) = { let mut rpc = pool.get_connection().await.map_err(|e| { error!(tree = %tree_id_str, "Failed to get RPC for queue data: {:?}", e); ForesterError::RpcPool(e) })?; - fetch_queue_item_data(&mut *rpc, &tree_accounts.queue, queue_fetch_start_index) - .await - .map_err(|e| { - warn!(tree = %tree_id_str, "Failed to fetch queue item data: {:?}", e); - ForesterError::General { - error: format!("Fetch queue data failed for {}: {}", tree_id_str, e), - } - })? - .items + let result = + fetch_queue_item_data(&mut *rpc, &tree_accounts.queue, queue_fetch_start_index) + .await + .map_err(|e| { + warn!(tree = %tree_id_str, "Failed to fetch queue item data: {:?}", e); + ForesterError::General { + error: format!("Fetch queue data failed for {}: {}", tree_id_str, e), + } + })?; + (result.items, result.total_pending) }; if queue_item_data.is_empty() { @@ -315,6 +321,7 @@ async fn prepare_batch_prerequisites( last_valid_block_height, priority_fee, timeout_deadline, + total_pending, })) } diff --git a/forester/src/processor/v1/tx_builder.rs b/forester/src/processor/v1/tx_builder.rs index 463cc0b2bf..17b267a67e 100644 --- a/forester/src/processor/v1/tx_builder.rs +++ b/forester/src/processor/v1/tx_builder.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, time::Duration}; +use std::{collections::HashSet, sync::Arc, time::Duration}; use account_compression::processor::initialize_address_merkle_tree::Pubkey; use async_trait::async_trait; @@ -6,22 +6,46 @@ use forester_utils::rpc_pool::SolanaRpcPool; use light_client::rpc::Rpc; use solana_program::hash::Hash; use solana_sdk::{ + compute_budget::ComputeBudgetInstruction, signature::{Keypair, Signer}, transaction::Transaction, }; use tokio::sync::Mutex; -use tracing::{trace, warn}; +use tracing::{info, trace, warn}; use crate::{ epoch_manager::WorkItem, + 
matching::{Matching, SENTINEL}, processor::{ tx_cache::ProcessedHashCache, - v1::{config::BuildTransactionBatchConfig, helpers::fetch_proofs_and_create_instructions}, + v1::{ + config::BuildTransactionBatchConfig, + helpers::{ + fetch_proofs_and_create_instructions, PreparedV1Instruction, + StateNullifyInstruction, + }, + }, }, smart_transaction::{create_smart_transaction, CreateSmartTransactionConfig}, Result, }; +const MAX_PAIRING_INSTRUCTIONS: usize = 100; +const MAX_PAIR_CANDIDATES: usize = 4_950; +const MIN_REMAINING_BLOCKS_FOR_PAIRING: u64 = 25; + +/// Safety margin subtracted from the Solana packet size (1232 bytes) when +/// checking whether two instructions fit in a single transaction. This +/// accounts for any minor divergence between the size-check path and the +/// real `create_smart_transaction` path (e.g. signature encoding). +const TX_SIZE_SAFETY_MARGIN: usize = 32; + +/// Maximum legacy transaction size (Solana PACKET_DATA_SIZE). +const PACKET_DATA_SIZE: usize = 1232; + +/// Maximum allowed serialised transaction size for a paired batch. 
+const MAX_TRANSACTION_SIZE: usize = PACKET_DATA_SIZE - TX_SIZE_SAFETY_MARGIN; + #[async_trait] #[allow(clippy::too_many_arguments)] pub trait TransactionBuilder: Send + Sync { @@ -58,6 +82,52 @@ impl EpochManagerTransactions { processed_hash_cache: cache, } } + + async fn should_attempt_pairing( + &self, + last_valid_block_height: u64, + state_nullify_count: usize, + ) -> bool { + let pair_candidates = pairing_candidate_count(state_nullify_count); + if !pairing_precheck_passes(state_nullify_count, pair_candidates) { + warn!( + "Skipping nullify pairing due to candidate explosion: count={}, pair_candidates={}", + state_nullify_count, pair_candidates + ); + return false; + } + + let conn = match self.pool.get_connection().await { + Ok(conn) => conn, + Err(e) => { + warn!( + "Skipping nullify pairing because RPC connection unavailable for block-height check: {}", + e + ); + return false; + } + }; + let current_block_height = match conn.get_block_height().await { + Ok(height) => height, + Err(e) => { + warn!( + "Skipping nullify pairing because block-height check failed: {}", + e + ); + return false; + } + }; + let remaining_blocks = last_valid_block_height.saturating_sub(current_block_height); + if !remaining_blocks_allows_pairing(remaining_blocks) { + warn!( + "Skipping nullify pairing near blockhash expiry: remaining_blocks={}", + remaining_blocks + ); + return false; + } + + true + } } #[async_trait] @@ -91,7 +161,7 @@ impl TransactionBuilder for EpochManagerTransactions { }) .collect(); - // Add items with short timeout (30 seconds) for processing + // Add items with a short timeout (15 seconds) for processing. 
for item in &work_items { let hash_str = bs58::encode(&item.queue_item_data.hash).into_string(); cache.add_with_timeout(&hash_str, Duration::from_secs(15)); @@ -116,7 +186,7 @@ impl TransactionBuilder for EpochManagerTransactions { .collect::>(); let mut transactions = vec![]; - let all_instructions = match fetch_proofs_and_create_instructions( + let prepared_instructions = match fetch_proofs_and_create_instructions( payer.pubkey(), *derivation, self.pool.clone(), @@ -143,17 +213,47 @@ impl TransactionBuilder for EpochManagerTransactions { }; let batch_size = config.batch_size.max(1) as usize; + let state_nullify_count = prepared_instructions + .iter() + .filter(|ix| matches!(ix, PreparedV1Instruction::StateNullify(_))) + .count(); + let allow_pairing = if state_nullify_count >= 2 { + self.should_attempt_pairing(last_valid_block_height, state_nullify_count) + .await + } else { + false + }; + let instruction_batches = build_instruction_batches( + prepared_instructions, + batch_size, + allow_pairing, + config.pairs_only, + &payer.pubkey(), + priority_fee, + config.compute_unit_limit, + )?; - for instruction_chunk in all_instructions.chunks(batch_size) { + for instruction_chunk in instruction_batches { + let is_paired = instruction_chunk.len() >= 2; let (transaction, _) = create_smart_transaction(CreateSmartTransactionConfig { payer: payer.insecure_clone(), - instructions: instruction_chunk.to_vec(), + instructions: instruction_chunk, recent_blockhash: *recent_blockhash, compute_unit_price: priority_fee, compute_unit_limit: config.compute_unit_limit, last_valid_block_height, }) .await?; + if is_paired { + info!( + "Paired nullify_2 tx: sig={}, ixs=2", + transaction + .signatures + .first() + .map(|s| s.to_string()) + .unwrap_or_default() + ); + } transactions.push(transaction); } @@ -171,3 +271,721 @@ impl TransactionBuilder for EpochManagerTransactions { Ok((transactions, last_valid_block_height)) } } + +// 
--------------------------------------------------------------------------- +// Instruction batching with optional pairing +// --------------------------------------------------------------------------- + +#[allow(clippy::too_many_arguments)] +fn build_instruction_batches( + prepared_instructions: Vec, + batch_size: usize, + allow_pairing: bool, + pairs_only: bool, + payer: &Pubkey, + priority_fee: Option, + compute_unit_limit: Option, +) -> Result>> { + let mut address_instructions = Vec::new(); + let mut state_nullify_instructions = Vec::new(); + for prepared in prepared_instructions { + match prepared { + PreparedV1Instruction::AddressUpdate(ix) => address_instructions.push(ix), + PreparedV1Instruction::StateNullify(ix) => state_nullify_instructions.push(ix), + } + } + + let mut batches = Vec::new(); + for chunk in address_instructions.chunks(batch_size) { + batches.push(chunk.to_vec()); + } + + if state_nullify_instructions.is_empty() { + return Ok(batches); + } + + // Sort by leaf_index for better proof-node overlap between neighbours. + state_nullify_instructions.sort_by_key(|ix| ix.leaf_index); + + let paired_batches = if allow_pairing { + pair_state_nullify_batches( + state_nullify_instructions, + payer, + priority_fee, + compute_unit_limit, + pairs_only, + )? + } else if !pairs_only { + state_nullify_instructions + .into_iter() + .map(|ix| vec![ix.instruction]) + .collect() + } else { + Vec::new() + }; + batches.extend(paired_batches); + Ok(batches) +} + +fn pair_state_nullify_batches( + state_nullify_instructions: Vec, + payer: &Pubkey, + priority_fee: Option, + compute_unit_limit: Option, + pairs_only: bool, +) -> Result>> { + let n = state_nullify_instructions.len(); + if n < 2 { + if pairs_only { + return Ok(Vec::new()); + } + return Ok(state_nullify_instructions + .into_iter() + .map(|ix| vec![ix.instruction]) + .collect()); + } + + // Pre-compute compute budget instructions once for all pairs. 
+ let compute_budget_ixs = make_compute_budget_instructions(priority_fee, compute_unit_limit); + + // Pre-compute HashSets for O(1) overlap lookup. + let proof_sets: Vec> = state_nullify_instructions + .iter() + .map(|ix| ix.proof_nodes.iter().copied().collect()) + .collect(); + let leaf_indices: Vec = state_nullify_instructions + .iter() + .map(|ix| ix.leaf_index) + .collect(); + + let mut edges: Vec<(usize, usize, i32)> = Vec::new(); + for i in 0..n { + for j in (i + 1)..n { + if estimated_tx_size( + payer, + &compute_budget_ixs, + &[ + &state_nullify_instructions[i].instruction, + &state_nullify_instructions[j].instruction, + ], + ) > MAX_TRANSACTION_SIZE + { + continue; + } + let overlap = proof_sets[i].intersection(&proof_sets[j]).count() as i32; + // Prioritize pair count first, then maximize proof overlap. + let weight = 10_000 + overlap; + edges.push((i, j, weight)); + } + } + + if edges.is_empty() { + if pairs_only { + return Ok(Vec::new()); + } + return Ok(state_nullify_instructions + .into_iter() + .map(|ix| vec![ix.instruction]) + .collect()); + } + + let mates = Matching::new(edges).max_cardinality().solve(); + + // Move instructions into Options for zero-copy extraction. 
+ let mut instructions: Vec> = + state_nullify_instructions + .into_iter() + .map(|ix| Some(ix.instruction)) + .collect(); + + let mut used = vec![false; n]; + let mut paired_batches: Vec<(u64, Vec)> = Vec::new(); + + for i in 0..n { + if used[i] { + continue; + } + let mate = mates.get(i).copied().unwrap_or(SENTINEL); + if mate != SENTINEL && mate > i && mate < n { + used[i] = true; + used[mate] = true; + let (left, right) = if leaf_indices[i] <= leaf_indices[mate] { + (i, mate) + } else { + (mate, i) + }; + let min_leaf = leaf_indices[left]; + paired_batches.push(( + min_leaf, + vec![ + instructions[left].take().unwrap(), + instructions[right].take().unwrap(), + ], + )); + } + } + + let mut single_batches: Vec<(u64, Vec)> = Vec::new(); + if !pairs_only { + for (i, ix) in instructions.into_iter().enumerate() { + if let Some(ix) = ix { + single_batches.push((leaf_indices[i], vec![ix])); + } + } + } + + paired_batches.sort_by_key(|(leaf, _)| *leaf); + single_batches.sort_by_key(|(leaf, _)| *leaf); + paired_batches.extend(single_batches); + Ok(paired_batches.into_iter().map(|(_, batch)| batch).collect()) +} + +// --------------------------------------------------------------------------- +// Transaction-size estimation (zero-copy – no instruction cloning) +// --------------------------------------------------------------------------- + +/// Build the compute-budget instructions that `create_smart_transaction` would +/// prepend. Built once and reused across all pair checks. 
+fn make_compute_budget_instructions( + priority_fee: Option, + compute_unit_limit: Option, +) -> Vec { + let mut ixs = Vec::with_capacity(2); + if let Some(price) = priority_fee { + ixs.push(ComputeBudgetInstruction::set_compute_unit_price(price)); + } + if let Some(limit) = compute_unit_limit { + ixs.push(ComputeBudgetInstruction::set_compute_unit_limit(limit)); + } + ixs +} + +/// Estimate the Solana legacy-transaction wire-format size from instruction +/// references, without cloning instructions or constructing a Transaction. +fn estimated_tx_size( + payer: &Pubkey, + compute_budget_ixs: &[solana_program::instruction::Instruction], + main_ixs: &[&solana_program::instruction::Instruction], +) -> usize { + let mut keys = HashSet::new(); + keys.insert(*payer); + + let mut signer_keys = HashSet::new(); + signer_keys.insert(*payer); + + for ix in compute_budget_ixs { + keys.insert(ix.program_id); + for meta in &ix.accounts { + keys.insert(meta.pubkey); + if meta.is_signer { + signer_keys.insert(meta.pubkey); + } + } + } + for ix in main_ixs { + keys.insert(ix.program_id); + for meta in &ix.accounts { + keys.insert(meta.pubkey); + if meta.is_signer { + signer_keys.insert(meta.pubkey); + } + } + } + + let num_keys = keys.len(); + let num_sigs = signer_keys.len(); + + // signatures section: compact-u16(count) + count * 64 + let sigs = short_vec_len(num_sigs) + num_sigs * 64; + + // message header (3 bytes) + let header = 3; + + // account keys: compact-u16(count) + count * 32 + let key_bytes = short_vec_len(num_keys) + num_keys * 32; + + // recent_blockhash + let blockhash = 32; + + // instructions: compact-u16(count) + each instruction + let instruction_count = compute_budget_ixs.len() + main_ixs.len(); + let mut ixs = short_vec_len(instruction_count); + for ix in compute_budget_ixs { + ixs += 1; // program_id_index (u8) + ixs += short_vec_len(ix.accounts.len()) + ix.accounts.len(); + ixs += short_vec_len(ix.data.len()) + ix.data.len(); + } + for ix in main_ixs { + 
ixs += 1; + ixs += short_vec_len(ix.accounts.len()) + ix.accounts.len(); + ixs += short_vec_len(ix.data.len()) + ix.data.len(); + } + + sigs + header + key_bytes + blockhash + ixs +} + +/// Compute the Solana legacy-transaction wire-format size from a constructed +/// Transaction. Used in tests to verify `estimated_tx_size` correctness. +#[cfg(test)] +fn legacy_transaction_size(tx: &Transaction) -> usize { + let msg = &tx.message; + let num_sigs = msg.header.num_required_signatures as usize; + + let sigs = short_vec_len(num_sigs) + num_sigs * 64; + let header = 3; + let keys = short_vec_len(msg.account_keys.len()) + msg.account_keys.len() * 32; + let blockhash = 32; + + let mut ixs = short_vec_len(msg.instructions.len()); + for ix in &msg.instructions { + ixs += 1; + ixs += short_vec_len(ix.accounts.len()) + ix.accounts.len(); + ixs += short_vec_len(ix.data.len()) + ix.data.len(); + } + + sigs + header + keys + blockhash + ixs +} + +/// Length of a Solana ShortVec (compact-u16) encoding. 
+fn short_vec_len(val: usize) -> usize {
+    if val < 0x80 {
+        1
+    } else if val < 0x4000 {
+        2
+    } else {
+        3
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Guard helpers
+// ---------------------------------------------------------------------------
+
+fn pairing_candidate_count(n: usize) -> usize {
+    n.saturating_sub(1).saturating_mul(n) / 2
+}
+
+fn pairing_precheck_passes(state_nullify_count: usize, pair_candidates: usize) -> bool {
+    if state_nullify_count < 2 {
+        return false;
+    }
+    if state_nullify_count > MAX_PAIRING_INSTRUCTIONS {
+        return false;
+    }
+    pair_candidates <= MAX_PAIR_CANDIDATES
+}
+
+fn remaining_blocks_allows_pairing(remaining_blocks: u64) -> bool {
+    remaining_blocks > MIN_REMAINING_BLOCKS_FOR_PAIRING
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use solana_program::instruction::{AccountMeta, Instruction};
+    use solana_sdk::signature::Keypair;
+
+    use super::*;
+
+    // -- matching tests (verify our own Blossom impl) --
+
+    #[test]
+    fn max_matching_prioritizes_cardinality() {
+        let edges = vec![(0usize, 1usize, 10_100i32), (1usize, 2usize, 10_090i32)];
+        let mates = Matching::new(edges).max_cardinality().solve();
+        let pairs = mates
+            .iter()
+            .enumerate()
+            .filter_map(|(i, mate)| {
+                if *mate != SENTINEL && *mate > i {
+                    Some((i, *mate))
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+        assert_eq!(pairs.len(), 1);
+    }
+
+    #[test]
+    fn max_matching_handles_disconnected_graph() {
+        let edges = vec![(0usize, 1usize, 10_010i32), (2usize, 3usize, 10_005i32)];
+        let mates = Matching::new(edges).max_cardinality().solve();
+        let matched_vertices = mates.iter().filter(|mate| **mate != SENTINEL).count();
+        assert_eq!(matched_vertices, 4);
+    }
+
+    #[test]
+    fn max_matching_returns_unmatched_for_empty_edges() {
+        let mates =
Matching::new(vec![]).max_cardinality().solve(); + assert!(mates.is_empty()); + } + + // -- pairing helper tests -- + + #[test] + fn pairing_candidate_count_matches_combination_formula() { + assert_eq!(pairing_candidate_count(0), 0); + assert_eq!(pairing_candidate_count(1), 0); + assert_eq!(pairing_candidate_count(2), 1); + assert_eq!(pairing_candidate_count(3), 3); + assert_eq!(pairing_candidate_count(10), 45); + assert_eq!(pairing_candidate_count(100), 4950); + } + + #[test] + fn pairing_precheck_enforces_instruction_and_candidate_limits() { + assert!(!pairing_precheck_passes(1, pairing_candidate_count(1))); + assert!(pairing_precheck_passes(2, pairing_candidate_count(2))); + assert!(pairing_precheck_passes( + MAX_PAIRING_INSTRUCTIONS, + pairing_candidate_count(MAX_PAIRING_INSTRUCTIONS) + )); + assert!(!pairing_precheck_passes( + MAX_PAIRING_INSTRUCTIONS + 1, + pairing_candidate_count(MAX_PAIRING_INSTRUCTIONS + 1) + )); + assert!(!pairing_precheck_passes(90, MAX_PAIR_CANDIDATES + 1)); + } + + #[test] + fn remaining_blocks_guard_is_strictly_greater_than_threshold() { + assert!(!remaining_blocks_allows_pairing( + MIN_REMAINING_BLOCKS_FOR_PAIRING - 1 + )); + assert!(!remaining_blocks_allows_pairing( + MIN_REMAINING_BLOCKS_FOR_PAIRING + )); + assert!(remaining_blocks_allows_pairing( + MIN_REMAINING_BLOCKS_FOR_PAIRING + 1 + )); + } + + // -- transaction size tests -- + + #[test] + fn estimated_tx_size_matches_legacy_transaction_size() { + let payer = Keypair::new(); + let program_id = Pubkey::new_unique(); + let ix = Instruction { + program_id, + accounts: vec![AccountMeta::new(payer.pubkey(), true)], + data: vec![0u8; 100], + }; + let compute_budget_ixs = make_compute_budget_instructions(Some(1_000), Some(200_000)); + + // Estimate without constructing a transaction. + let estimated = estimated_tx_size(&payer.pubkey(), &compute_budget_ixs, &[&ix]); + + // Build the real transaction for comparison. 
+ let mut all_ixs = compute_budget_ixs; + all_ixs.push(ix); + let tx = Transaction::new_with_payer(&all_ixs, Some(&payer.pubkey())); + let actual = legacy_transaction_size(&tx); + + assert_eq!(estimated, actual); + } + + #[test] + fn estimated_tx_size_with_two_instructions() { + let payer = Keypair::new(); + let fx = TestFixture::new(&payer); + let proof: Vec<[u8; 32]> = (0..16).map(shared_proof).collect(); + let ix_a = fx.make_ix(10, proof.clone()); + let ix_b = fx.make_ix(11, proof); + let compute_budget_ixs = make_compute_budget_instructions(Some(1), Some(200_000)); + + let estimated = estimated_tx_size( + &payer.pubkey(), + &compute_budget_ixs, + &[&ix_a.instruction, &ix_b.instruction], + ); + + // Build the real transaction for comparison. + let mut all_ixs = compute_budget_ixs; + all_ixs.push(ix_a.instruction); + all_ixs.push(ix_b.instruction); + let tx = Transaction::new_with_payer(&all_ixs, Some(&payer.pubkey())); + let actual = legacy_transaction_size(&tx); + + assert_eq!(estimated, actual); + // Two nullify_2 instructions with 16 shared proof accounts should fit. + assert!( + estimated <= MAX_TRANSACTION_SIZE, + "estimated={estimated} > MAX_TRANSACTION_SIZE={MAX_TRANSACTION_SIZE}" + ); + } + + #[test] + fn legacy_transaction_size_is_consistent() { + let payer = Keypair::new(); + let ix = Instruction { + program_id: Pubkey::new_unique(), + accounts: vec![AccountMeta::new(payer.pubkey(), true)], + data: vec![0u8; 100], + }; + let tx = Transaction::new_with_payer(&[ix], Some(&payer.pubkey())); + let native_size = legacy_transaction_size(&tx); + // Sanity: a non-trivial tx should be > 200 bytes. + assert!(native_size > 200, "native_size = {native_size}"); + // And under the packet limit. 
+ assert!(native_size < PACKET_DATA_SIZE); + } + + // -- pair_state_nullify_batches integration tests -- + + /// Shared test fixtures that mimic real nullify_2 instructions: same + /// program_id, same queue, same merkle tree, differing only in proof + /// remaining-accounts and per-leaf instruction data. + struct TestFixture { + program_id: Pubkey, + merkle_tree: Pubkey, + // Base accounts shared by every nullify_2 instruction. + base_accounts: Vec, + } + + impl TestFixture { + fn new(payer: &Keypair) -> Self { + let program_id = Pubkey::new_unique(); + let queue = Pubkey::new_unique(); + let merkle_tree = Pubkey::new_unique(); + + // 8 base accounts: authority, forester_pda, registered_program, + // queue, merkle_tree, log_wrapper, cpi_authority, acc_compression + let base_accounts = vec![ + AccountMeta::new(payer.pubkey(), true), + AccountMeta::new(Pubkey::new_unique(), false), + AccountMeta::new_readonly(Pubkey::new_unique(), false), + AccountMeta::new(queue, false), + AccountMeta::new(merkle_tree, false), + AccountMeta::new_readonly(Pubkey::new_unique(), false), + AccountMeta::new_readonly(Pubkey::new_unique(), false), + AccountMeta::new_readonly(Pubkey::new_unique(), false), + ]; + + Self { + program_id, + merkle_tree, + base_accounts, + } + } + + fn make_ix(&self, leaf_index: u64, proof_nodes: Vec<[u8; 32]>) -> StateNullifyInstruction { + let mut accounts = self.base_accounts.clone(); + for node in &proof_nodes { + accounts.push(AccountMeta::new_readonly( + Pubkey::new_from_array(*node), + false, + )); + } + let instruction = Instruction { + program_id: self.program_id, + accounts, + data: vec![0u8; 27], // 8-byte discriminator + 19-byte scalar payload + }; + StateNullifyInstruction { + instruction, + proof_nodes, + leaf_index, + merkle_tree: self.merkle_tree, + } + } + } + + fn shared_proof(prefix: u8) -> [u8; 32] { + let mut node = [0u8; 32]; + node[0] = prefix; + node + } + + fn unique_proof(idx: u16) -> [u8; 32] { + let mut node = [0xFFu8; 32]; + node[0] 
= (idx >> 8) as u8; + node[1] = (idx & 0xFF) as u8; + node + } + + #[test] + fn pair_state_nullify_batches_pairs_overlapping_proofs() { + let payer = Keypair::new(); + let fx = TestFixture::new(&payer); + + // 4 instructions, each with exactly 16 proof nodes (realistic). + // ix0 and ix1 share 14/16 nodes (like adjacent leaves in a tree). + // ix2 and ix3 share 14/16 nodes (different subtree). + let shared_0_1: Vec<[u8; 32]> = (0..14).map(shared_proof).collect(); + let shared_2_3: Vec<[u8; 32]> = (100..114).map(shared_proof).collect(); + + let mut proof_0: Vec<[u8; 32]> = shared_0_1.clone(); + proof_0.extend((0..2).map(unique_proof)); + let mut proof_1: Vec<[u8; 32]> = shared_0_1; + proof_1.extend((10..12).map(unique_proof)); + let mut proof_2: Vec<[u8; 32]> = shared_2_3.clone(); + proof_2.extend((20..22).map(unique_proof)); + let mut proof_3: Vec<[u8; 32]> = shared_2_3; + proof_3.extend((40..42).map(unique_proof)); + + let ixs = vec![ + fx.make_ix(10, proof_0), + fx.make_ix(11, proof_1), + fx.make_ix(50, proof_2), + fx.make_ix(51, proof_3), + ]; + + let batches = + pair_state_nullify_batches(ixs, &payer.pubkey(), Some(1), Some(200_000), false) + .unwrap(); + + // All 4 should be paired into 2 batches. 
+ assert_eq!(batches.len(), 2, "expected 2 paired batches"); + assert_eq!(batches[0].len(), 2, "first batch should have 2 ixs"); + assert_eq!(batches[1].len(), 2, "second batch should have 2 ixs"); + } + + #[test] + fn pair_state_nullify_batches_single_instruction_no_pairs() { + let payer = Keypair::new(); + let fx = TestFixture::new(&payer); + + let proof: Vec<[u8; 32]> = (0..16).map(shared_proof).collect(); + let ixs = vec![fx.make_ix(42, proof)]; + + let batches = + pair_state_nullify_batches(ixs, &payer.pubkey(), Some(1), Some(200_000), false) + .unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].len(), 1); + } + + #[test] + fn pair_state_nullify_batches_sorted_by_leaf_index() { + let payer = Keypair::new(); + let fx = TestFixture::new(&payer); + + // Two instructions with identical proofs → will pair. + let proof: Vec<[u8; 32]> = (0..16).map(shared_proof).collect(); + let ixs = vec![fx.make_ix(999, proof.clone()), fx.make_ix(1, proof)]; + + let batches = + pair_state_nullify_batches(ixs, &payer.pubkey(), Some(1), Some(200_000), false) + .unwrap(); + + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].len(), 2); + } + + #[test] + fn pair_state_nullify_batches_no_edges_falls_back_to_singles() { + let payer = Keypair::new(); + let fx = TestFixture::new(&payer); + + // Create instructions with huge data that won't fit paired in one tx. + let make_big_ix = |leaf_index: u64| -> StateNullifyInstruction { + let proof_nodes: Vec<[u8; 32]> = (0..16) + .map(|i| unique_proof(leaf_index as u16 * 100 + i)) + .collect(); + let mut accounts = fx.base_accounts.clone(); + for node in &proof_nodes { + accounts.push(AccountMeta::new_readonly( + Pubkey::new_from_array(*node), + false, + )); + } + // Large data payload to force tx over size limit when paired. 
+ let instruction = Instruction { + program_id: fx.program_id, + accounts, + data: vec![0u8; 500], + }; + StateNullifyInstruction { + instruction, + proof_nodes, + leaf_index, + merkle_tree: fx.merkle_tree, + } + }; + + let ixs = vec![make_big_ix(1), make_big_ix(2)]; + let batches = + pair_state_nullify_batches(ixs, &payer.pubkey(), Some(1), Some(200_000), false) + .unwrap(); + + // Both should be singles since pairing exceeds tx size. + assert_eq!(batches.len(), 2); + assert_eq!(batches[0].len(), 1); + assert_eq!(batches[1].len(), 1); + } + + #[test] + fn build_instruction_batches_separates_address_and_state() { + let payer = Keypair::new(); + let fx = TestFixture::new(&payer); + + let addr_ix = Instruction { + program_id: Pubkey::new_unique(), + accounts: vec![AccountMeta::new(payer.pubkey(), true)], + data: vec![0u8; 50], + }; + + let proof: Vec<[u8; 32]> = (0..16).map(shared_proof).collect(); + let state_ix_0 = fx.make_ix(10, proof.clone()); + let state_ix_1 = fx.make_ix(11, proof); + + let prepared = vec![ + PreparedV1Instruction::AddressUpdate(addr_ix), + PreparedV1Instruction::StateNullify(state_ix_0), + PreparedV1Instruction::StateNullify(state_ix_1), + ]; + + let batches = build_instruction_batches( + prepared, + 2, + true, + false, + &payer.pubkey(), + Some(1), + Some(200_000), + ) + .unwrap(); + + // 1 address batch + 1 paired state batch. 
+ assert_eq!(batches.len(), 2); + assert_eq!(batches[0].len(), 1, "address batch should have 1 ix"); + assert_eq!(batches[1].len(), 2, "state batch should be paired"); + } + + #[test] + fn build_instruction_batches_no_pairing_when_disabled() { + let payer = Keypair::new(); + let fx = TestFixture::new(&payer); + + let proof: Vec<[u8; 32]> = (0..16).map(shared_proof).collect(); + let state_ix_0 = fx.make_ix(10, proof.clone()); + let state_ix_1 = fx.make_ix(11, proof); + + let prepared = vec![ + PreparedV1Instruction::StateNullify(state_ix_0), + PreparedV1Instruction::StateNullify(state_ix_1), + ]; + + let batches = build_instruction_batches( + prepared, + 2, + false, // pairing disabled + false, + &payer.pubkey(), + Some(1), + Some(200_000), + ) + .unwrap(); + + // Each state nullify should be a separate batch. + assert_eq!(batches.len(), 2); + assert_eq!(batches[0].len(), 1); + assert_eq!(batches[1].len(), 1); + } +} diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs index 38df9f70be..c619319812 100644 --- a/forester/src/smart_transaction.rs +++ b/forester/src/smart_transaction.rs @@ -173,7 +173,7 @@ pub fn collect_priority_fee_accounts(payer: Pubkey, instructions: &[Instruction] account_keys } -fn with_compute_budget_instructions( +pub(crate) fn with_compute_budget_instructions( mut instructions: Vec, compute_budget: ComputeBudgetConfig, ) -> Vec { diff --git a/program-tests/registry-test/tests/nullify_2_regression.rs b/program-tests/registry-test/tests/nullify_2_regression.rs new file mode 100644 index 0000000000..465248d340 --- /dev/null +++ b/program-tests/registry-test/tests/nullify_2_regression.rs @@ -0,0 +1,240 @@ +use account_compression::state::QueueAccount; +use forester_utils::account_zero_copy::{get_concurrent_merkle_tree, get_hash_set}; +use light_compressed_account::TreeType; +use light_hasher::Poseidon; +use light_program_test::{ + indexer::state_tree::StateMerkleTreeBundle, program_test::LightProgramTest, + 
utils::assert::assert_rpc_error, ProgramTestConfig, +}; +use light_registry::{ + account_compression_cpi::sdk::{ + create_nullify_2_instruction, create_nullify_instruction, CreateNullify2InstructionInputs, + CreateNullifyInstructionInputs, + }, + errors::RegistryError, +}; +use light_test_utils::{e2e_test_env::init_program_test_env, Rpc}; +use serial_test::serial; +use solana_sdk::{ + pubkey::Pubkey, + signature::{Keypair, Signer}, +}; + +/// Queue item data extracted from the nullifier queue. +struct QueueEntry { + queue_index: u16, + leaf_index: u64, + proof: Vec<[u8; 32]>, + change_log_index: u64, +} + +/// Shared test environment: creates a state tree, compresses SOL, and transfers +/// to populate the nullifier queue. +async fn setup_tree_with_nullifier_queue_entries( + num_entries: usize, +) -> (LightProgramTest, StateMerkleTreeBundle, Keypair) { + let mut rpc = LightProgramTest::new(ProgramTestConfig::default_with_batched_trees(true)) + .await + .unwrap(); + rpc.indexer = None; + let env = rpc.test_accounts.clone(); + let forester = Keypair::new(); + rpc.airdrop_lamports(&forester.pubkey(), 10_000_000_000) + .await + .unwrap(); + let merkle_tree_keypair = Keypair::new(); + let nullifier_queue_keypair = Keypair::new(); + let cpi_context_keypair = Keypair::new(); + + let (rpc, state_tree_bundle) = { + let mut e2e_env = init_program_test_env(rpc, &env, 50).await; + e2e_env.indexer.state_merkle_trees.clear(); + e2e_env.keypair_action_config.fee_assert = false; + e2e_env + .indexer + .add_state_merkle_tree( + &mut e2e_env.rpc, + &merkle_tree_keypair, + &nullifier_queue_keypair, + &cpi_context_keypair, + None, + Some(forester.pubkey()), + TreeType::StateV1, + ) + .await; + + for _ in 0..num_entries { + e2e_env + .compress_sol_deterministic(&forester, 1_000_000, None) + .await; + e2e_env + .transfer_sol_deterministic(&forester, &Pubkey::new_unique(), None) + .await + .unwrap(); + } + + (e2e_env.rpc, e2e_env.indexer.state_merkle_trees[0].clone()) + }; + + (rpc, 
state_tree_bundle, forester) +} + +/// Read pending queue entries from the nullifier queue. +async fn read_queue_entries( + rpc: &mut LightProgramTest, + state_tree_bundle: &StateMerkleTreeBundle, + max_entries: usize, +) -> Vec { + let nullifier_queue = unsafe { + get_hash_set::(rpc, state_tree_bundle.accounts.nullifier_queue).await + } + .unwrap(); + + let onchain_tree = + get_concurrent_merkle_tree::( + rpc, + state_tree_bundle.accounts.merkle_tree, + ) + .await + .unwrap(); + let change_log_index = onchain_tree.changelog_index() as u64; + + let mut entries = Vec::new(); + for i in 0..nullifier_queue.get_capacity() { + if entries.len() >= max_entries { + break; + } + let bucket = nullifier_queue.get_bucket(i).unwrap(); + if let Some(bucket) = bucket { + if bucket.sequence_number.is_none() { + let account_hash = bucket.value_bytes(); + let leaf_index = state_tree_bundle + .merkle_tree + .get_leaf_index(&account_hash) + .unwrap() as u64; + let proof = state_tree_bundle + .merkle_tree + .get_proof_of_leaf(leaf_index as usize, false) + .unwrap(); + + entries.push(QueueEntry { + queue_index: i as u16, + leaf_index, + proof, + change_log_index, + }); + } + } + } + entries +} + +#[serial] +#[tokio::test] +async fn test_nullify_2_validation_and_success() { + let (mut rpc, state_tree_bundle, forester) = setup_tree_with_nullifier_queue_entries(1).await; + let entries = read_queue_entries(&mut rpc, &state_tree_bundle, 1).await; + let entry = &entries[0]; + + let valid_ix = create_nullify_2_instruction( + CreateNullify2InstructionInputs { + authority: forester.pubkey(), + nullifier_queue: state_tree_bundle.accounts.nullifier_queue, + merkle_tree: state_tree_bundle.accounts.merkle_tree, + change_log_index: entry.change_log_index, + leaves_queue_index: entry.queue_index, + index: entry.leaf_index, + proof: entry.proof.clone().try_into().unwrap(), + derivation: forester.pubkey(), + is_metadata_forester: true, + }, + 0, + ); + + // Test: empty proof accounts → 
InvalidProofAccountsLength. + let mut empty_proof_accounts_ix = valid_ix.clone(); + empty_proof_accounts_ix + .accounts + .truncate(empty_proof_accounts_ix.accounts.len() - entry.proof.len()); + let result = rpc + .create_and_send_transaction(&[empty_proof_accounts_ix], &forester.pubkey(), &[&forester]) + .await; + assert_rpc_error(result, 0, RegistryError::InvalidProofAccountsLength.into()).unwrap(); + + // Test: success with valid instruction. + rpc.create_and_send_transaction(&[valid_ix], &forester.pubkey(), &[&forester]) + .await + .unwrap(); +} + +#[serial] +#[tokio::test] +async fn test_legacy_nullify_still_succeeds() { + let (mut rpc, state_tree_bundle, forester) = setup_tree_with_nullifier_queue_entries(1).await; + let entries = read_queue_entries(&mut rpc, &state_tree_bundle, 1).await; + let entry = &entries[0]; + + let legacy_ix = create_nullify_instruction( + CreateNullifyInstructionInputs { + authority: forester.pubkey(), + nullifier_queue: state_tree_bundle.accounts.nullifier_queue, + merkle_tree: state_tree_bundle.accounts.merkle_tree, + change_log_indices: vec![entry.change_log_index], + leaves_queue_indices: vec![entry.queue_index], + indices: vec![entry.leaf_index], + proofs: vec![entry.proof.clone()], + derivation: forester.pubkey(), + is_metadata_forester: true, + }, + 0, + ); + rpc.create_and_send_transaction(&[legacy_ix], &forester.pubkey(), &[&forester]) + .await + .unwrap(); +} + +#[serial] +#[tokio::test] +async fn test_paired_nullify_2_in_single_transaction() { + let (mut rpc, state_tree_bundle, forester) = setup_tree_with_nullifier_queue_entries(2).await; + let entries = read_queue_entries(&mut rpc, &state_tree_bundle, 2).await; + assert!( + entries.len() >= 2, + "need at least 2 queue entries, got {}", + entries.len() + ); + + let ix_0 = create_nullify_2_instruction( + CreateNullify2InstructionInputs { + authority: forester.pubkey(), + nullifier_queue: state_tree_bundle.accounts.nullifier_queue, + merkle_tree: 
state_tree_bundle.accounts.merkle_tree, + change_log_index: entries[0].change_log_index, + leaves_queue_index: entries[0].queue_index, + index: entries[0].leaf_index, + proof: entries[0].proof.clone().try_into().unwrap(), + derivation: forester.pubkey(), + is_metadata_forester: true, + }, + 0, + ); + let ix_1 = create_nullify_2_instruction( + CreateNullify2InstructionInputs { + authority: forester.pubkey(), + nullifier_queue: state_tree_bundle.accounts.nullifier_queue, + merkle_tree: state_tree_bundle.accounts.merkle_tree, + change_log_index: entries[1].change_log_index, + leaves_queue_index: entries[1].queue_index, + index: entries[1].leaf_index, + proof: entries[1].proof.clone().try_into().unwrap(), + derivation: forester.pubkey(), + is_metadata_forester: true, + }, + 0, + ); + + // Both nullify_2 instructions in a single transaction (the core pairing use case). + rpc.create_and_send_transaction(&[ix_0, ix_1], &forester.pubkey(), &[&forester]) + .await + .unwrap(); +} diff --git a/programs/registry/src/account_compression_cpi/nullify.rs b/programs/registry/src/account_compression_cpi/nullify.rs index 818e2b43a8..a4f03d8511 100644 --- a/programs/registry/src/account_compression_cpi/nullify.rs +++ b/programs/registry/src/account_compression_cpi/nullify.rs @@ -5,6 +5,8 @@ use anchor_lang::prelude::*; use crate::epoch::register_epoch::ForesterEpochPda; +pub(crate) const NULLIFY_2_PROOF_ACCOUNTS_LEN: usize = 16; + #[derive(Accounts)] pub struct NullifyLeaves<'info> { /// CHECK: only eligible foresters can nullify leaves. Is checked in ix. 
@@ -61,3 +63,12 @@ pub fn process_nullify( proofs, ) } + +pub(crate) fn extract_proof_nodes_from_remaining_accounts( + remaining_accounts: &[AccountInfo<'_>], +) -> Vec<[u8; 32]> { + remaining_accounts + .iter() + .map(|account_info| account_info.key().to_bytes()) + .collect() +} diff --git a/programs/registry/src/account_compression_cpi/sdk.rs b/programs/registry/src/account_compression_cpi/sdk.rs index f002c35499..7ee3e1a575 100644 --- a/programs/registry/src/account_compression_cpi/sdk.rs +++ b/programs/registry/src/account_compression_cpi/sdk.rs @@ -9,7 +9,7 @@ use light_batched_merkle_tree::{ initialize_state_tree::InitStateTreeAccountsInstructionData, }; use light_system_program::program::LightSystemProgram; -use solana_sdk::instruction::Instruction; +use solana_sdk::instruction::{AccountMeta, Instruction}; use crate::utils::{ get_cpi_authority_pda, get_forester_epoch_pda_from_authority, get_protocol_config_pda_address, @@ -26,6 +26,18 @@ pub struct CreateNullifyInstructionInputs { pub is_metadata_forester: bool, } +pub struct CreateNullify2InstructionInputs { + pub authority: Pubkey, + pub nullifier_queue: Pubkey, + pub merkle_tree: Pubkey, + pub change_log_index: u64, + pub leaves_queue_index: u16, + pub index: u64, + pub proof: [[u8; 32]; 16], + pub derivation: Pubkey, + pub is_metadata_forester: bool, +} + pub fn create_nullify_instruction( inputs: CreateNullifyInstructionInputs, epoch: u64, @@ -62,6 +74,49 @@ pub fn create_nullify_instruction( } } +pub fn create_nullify_2_instruction( + inputs: CreateNullify2InstructionInputs, + epoch: u64, +) -> Instruction { + let register_program_pda = get_registered_program_pda(&crate::ID); + let registered_forester_pda = if inputs.is_metadata_forester { + None + } else { + Some(get_forester_epoch_pda_from_authority(&inputs.derivation, epoch).0) + }; + let (cpi_authority, bump) = get_cpi_authority_pda(); + let instruction_data = crate::instruction::Nullify2 { + bump, + change_log_index: inputs.change_log_index, + 
leaves_queue_index: inputs.leaves_queue_index, + index: inputs.index, + }; + + let base_accounts = crate::accounts::NullifyLeaves { + authority: inputs.authority, + registered_forester_pda, + registered_program_pda: register_program_pda, + nullifier_queue: inputs.nullifier_queue, + merkle_tree: inputs.merkle_tree, + log_wrapper: NOOP_PUBKEY.into(), + cpi_authority, + account_compression_program: account_compression::ID, + }; + let mut accounts = base_accounts.to_account_metas(Some(true)); + for node in inputs.proof { + accounts.push(AccountMeta::new_readonly( + Pubkey::new_from_array(node), + false, + )); + } + + Instruction { + program_id: crate::ID, + accounts, + data: instruction_data.data(), + } +} + #[derive(Clone, Debug, PartialEq)] pub struct CreateMigrateStateInstructionInputs { pub authority: Pubkey, @@ -545,3 +600,85 @@ pub fn create_rollover_batch_address_tree_instruction( data: instruction_data.data(), } } + +#[cfg(test)] +mod tests { + use anchor_lang::Discriminator; + + use super::*; + + #[test] + fn create_nullify_instruction_uses_legacy_payload() { + let authority = Pubkey::new_unique(); + let derivation = Pubkey::new_unique(); + let nullifier_queue = Pubkey::new_unique(); + let merkle_tree = Pubkey::new_unique(); + let proof = (0..16) + .map(|i| { + let mut node = [0u8; 32]; + node[0] = i as u8; + node + }) + .collect::>(); + let ix = create_nullify_instruction( + CreateNullifyInstructionInputs { + authority, + nullifier_queue, + merkle_tree, + change_log_indices: vec![7], + leaves_queue_indices: vec![11], + indices: vec![42], + proofs: vec![proof], + derivation, + is_metadata_forester: false, + }, + 1, + ); + + assert_eq!(ix.program_id, crate::ID); + assert_eq!(ix.accounts.len(), 8); + assert_eq!(&ix.data[..8], crate::instruction::Nullify::DISCRIMINATOR); + assert_eq!(ix.data.len(), 559); + } + + #[test] + fn create_nullify_2_instruction_uses_minimal_payload_and_remaining_accounts() { + let authority = Pubkey::new_unique(); + let derivation = 
Pubkey::new_unique(); + let nullifier_queue = Pubkey::new_unique(); + let merkle_tree = Pubkey::new_unique(); + let proof = (0..16) + .map(|i| { + let mut node = [0u8; 32]; + node[0] = i as u8; + node + }) + .collect::>(); + let ix = create_nullify_2_instruction( + CreateNullify2InstructionInputs { + authority, + nullifier_queue, + merkle_tree, + change_log_index: 7, + leaves_queue_index: 11, + index: 42, + proof: proof.clone().try_into().unwrap(), + derivation, + is_metadata_forester: false, + }, + 1, + ); + + assert_eq!(ix.program_id, crate::ID); + assert_eq!(ix.accounts.len(), 8 + 16); + for (account_meta, node) in ix.accounts[8..].iter().zip(proof.iter()) { + assert_eq!(account_meta.pubkey, Pubkey::new_from_array(*node)); + assert!(!account_meta.is_signer); + assert!(!account_meta.is_writable); + } + + assert_eq!(&ix.data[..8], crate::instruction::Nullify2::DISCRIMINATOR); + // 8-byte discriminator + 19-byte scalar payload (u8 + u64 + u16 + u64). + assert_eq!(ix.data.len(), 27); + } +} diff --git a/programs/registry/src/errors.rs b/programs/registry/src/errors.rs index 7c445d2ca3..e0a176fb41 100644 --- a/programs/registry/src/errors.rs +++ b/programs/registry/src/errors.rs @@ -42,4 +42,6 @@ pub enum RegistryError { BorrowAccountDataFailed, #[msg("Failed to serialize instruction data")] SerializationFailed, + #[msg("Nullify2 proof accounts length is invalid")] + InvalidProofAccountsLength, } diff --git a/programs/registry/src/lib.rs b/programs/registry/src/lib.rs index a21b58cd4b..d4ce2f2aa0 100644 --- a/programs/registry/src/lib.rs +++ b/programs/registry/src/lib.rs @@ -11,6 +11,9 @@ use light_merkle_tree_metadata::merkle_tree::MerkleTreeMetadata; pub mod account_compression_cpi; pub mod errors; +use account_compression_cpi::nullify::{ + extract_proof_nodes_from_remaining_accounts, NULLIFY_2_PROOF_ACCOUNTS_LEN, +}; pub use account_compression_cpi::{ batch_append::*, batch_nullify::*, batch_update_address_tree::*, initialize_batched_address_tree::*, 
initialize_batched_state_tree::*, @@ -420,6 +423,37 @@ pub mod light_registry { ) } + pub fn nullify_2<'info>( + ctx: Context<'_, '_, '_, 'info, NullifyLeaves<'info>>, + bump: u8, + change_log_index: u64, + leaves_queue_index: u16, + index: u64, + ) -> Result<()> { + let metadata = ctx.accounts.merkle_tree.load()?.metadata; + check_forester( + &metadata, + ctx.accounts.authority.key(), + ctx.accounts.nullifier_queue.key(), + &mut ctx.accounts.registered_forester_pda, + DEFAULT_WORK_V1, + )?; + + if ctx.remaining_accounts.len() != NULLIFY_2_PROOF_ACCOUNTS_LEN { + return err!(RegistryError::InvalidProofAccountsLength); + } + let proof_nodes = extract_proof_nodes_from_remaining_accounts(ctx.remaining_accounts); + + process_nullify( + &ctx, + bump, + vec![change_log_index], + vec![leaves_queue_index], + vec![index], + vec![proof_nodes], + ) + } + #[allow(clippy::too_many_arguments)] pub fn update_address_merkle_tree( ctx: Context,