diff --git a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs deleted file mode 100644 index 913c0b326..000000000 --- a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs +++ /dev/null @@ -1,945 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - fmt, - pin::Pin, - sync::{ - atomic::{AtomicU16, AtomicU64, Ordering}, - Arc, - }, - time::Duration, -}; - -use futures_util::{Stream, StreamExt}; -use helius_laserstream::{ - client, - grpc::{ - subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, - SubscribeRequestFilterAccounts, SubscribeRequestFilterSlots, - SubscribeUpdate, - }, - ChannelOptions, LaserstreamConfig, LaserstreamError, -}; -use magicblock_core::logger::log_trace_debug; -use magicblock_metrics::metrics::{ - inc_account_subscription_account_updates_count, - inc_account_subscription_activations_count, - inc_per_program_account_updates_count, - inc_program_subscription_account_updates_count, -}; -use parking_lot::RwLock; -use solana_account::Account; -use solana_commitment_config::CommitmentLevel as SolanaCommitmentLevel; -use solana_pubkey::Pubkey; -use solana_sdk_ids::sysvar::clock; -use tokio::sync::{mpsc, oneshot}; -use tokio_stream::StreamMap; -use tonic::Code; -use tracing::*; - -use super::{ - chain_rpc_client::{ChainRpcClient, ChainRpcClientImpl}, - chain_slot::ChainSlot, -}; -use crate::remote_account_provider::{ - pubsub_common::{ - ChainPubsubActorMessage, MESSAGE_CHANNEL_SIZE, - SUBSCRIPTION_UPDATE_CHANNEL_SIZE, - }, - RemoteAccountProviderError, RemoteAccountProviderResult, - SubscriptionUpdate, -}; - -type LaserResult = Result; -type LaserStreamUpdate = (usize, LaserResult); -type LaserStream = Pin + Send>>; - -const PER_STREAM_SUBSCRIPTION_LIMIT: usize = 1_000; -const SUBSCRIPTION_ACTIVATION_INTERVAL_MILLIS: u64 = 10_000; -const SLOTS_BETWEEN_ACTIVATIONS: u64 = - SUBSCRIPTION_ACTIVATION_INTERVAL_MILLIS / 400; -const MAX_SLOTS_BACKFILL: u64 = 400; - -pub type SharedSubscriptions = Arc>>; - -// ----------------- -// Slots -// ----------------- -/// Shared slot tracking for activation lookback and chain slot synchronization. -#[derive(Debug)] -pub struct Slots { - /// The current slot on chain, shared with RemoteAccountProvider. - /// Updated via `update()` when slot updates are received from GRPC. - /// Metrics are automatically captured on updates. - pub chain_slot: ChainSlot, - /// The last slot at which activation happened (used for backfilling). - pub last_activation_slot: AtomicU64, - /// Whether this GRPC endpoint supports backfilling subscription updates. - pub supports_backfill: bool, -} - -// ----------------- -// AccountUpdateSource -// ----------------- -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum AccountUpdateSource { - Account, - Program, -} - -impl fmt::Display for AccountUpdateSource { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Account => write!(f, "account"), - Self::Program => write!(f, "program"), - } - } -} - -// ----------------- -// ChainLaserActor -// ----------------- -/// ChainLaserActor manages gRPC subscriptions to Helius Laser or Triton endpoints. -/// -/// ## Subscription Lifecycle -/// -/// 1. **Requested**: User calls `subscribe(pubkey)`. Pubkey is added to `subscriptions` set. -/// 2. **Queued**: Every [SUBSCRIPTION_ACTIVATION_INTERVAL_MILLIS], `update_active_subscriptions()` creates new streams. -/// 3. 
**Active**: Subscriptions are sent to Helius/Triton via gRPC streams in `active_subscriptions`. -/// 4. **Updates**: Account updates flow back via the streams and are forwarded to the consumer. -/// -/// ## Stream Management -/// -/// - Subscriptions are grouped into chunks of up to 1,000 per stream (Helius limit). -/// - Each chunk gets its own gRPC stream (`StreamMap`). -/// - When subscriptions change, ALL streams are dropped and recreated. -/// - This simplifies reasoning but loses in-flight updates during the transition. -/// -/// ## Reconnection Behavior -/// -/// - If a stream ends unexpectedly, `signal_connection_issue()` is called. -/// - The actor sends an abort signal to the submux, which triggers reconnection. -/// - The actor itself doesn't attempt to reconnect; it relies on external recovery. -pub struct ChainLaserActor { - /// Configuration used to create the laser client - laser_client_config: LaserstreamConfig, - /// Requested subscriptions, some may not be active yet. - /// Shared with ChainLaserClientImpl for sync access to - /// subscription_count and subscriptions_union. - subscriptions: SharedSubscriptions, - /// Pubkeys of currently active subscriptions - active_subscription_pubkeys: HashSet, - /// Subscriptions that have been activated via the helius provider - active_subscriptions: StreamMap, - /// Active streams for program subscriptions - program_subscriptions: Option<(HashSet, LaserStream)>, - /// Receives subscribe/unsubscribe messages to this actor - messages_receiver: mpsc::Receiver, - /// Sends updates for any account subscription that is received via - /// the Laser client subscription mechanism - subscription_updates_sender: mpsc::Sender, - /// The commitment level to use for subscriptions - commitment: CommitmentLevel, - /// Channel used to signal connection issues to the submux - abort_sender: mpsc::Sender<()>, - /// Slot tracking for chain slot synchronization and activation lookback - slots: Slots, - /// Unique client ID including the gRPC provider name for this actor instance used in logs - /// and metrics - client_id: String, - /// RPC client for diagnostics (e.g., fetching slot when falling behind) - rpc_client: ChainRpcClientImpl, -} - -impl ChainLaserActor { - pub fn new_from_url( - pubsub_url: &str, - client_id: &str, - api_key: &str, - commitment: SolanaCommitmentLevel, - abort_sender: mpsc::Sender<()>, - slots: Slots, - rpc_client: ChainRpcClientImpl, - ) -> ( - Self, - mpsc::Sender, - mpsc::Receiver, - SharedSubscriptions, - ) { - let channel_options = ChannelOptions { - connect_timeout_secs: Some(5), - http2_keep_alive_interval_secs: Some(15), - tcp_keepalive_secs: Some(30), - ..Default::default() - }; - let laser_client_config = LaserstreamConfig { - api_key: api_key.to_string(), - endpoint: pubsub_url.to_string(), - max_reconnect_attempts: Some(4), - channel_options, - replay: true, - }; - Self::new( - client_id, - laser_client_config, - commitment, - abort_sender, - slots, - rpc_client, - ) - } - - pub fn new( - client_id: &str, - laser_client_config: LaserstreamConfig, - commitment: SolanaCommitmentLevel, - abort_sender: mpsc::Sender<()>, - slots: Slots, - rpc_client: ChainRpcClientImpl, - ) -> ( - Self, - mpsc::Sender, - mpsc::Receiver, - SharedSubscriptions, - ) { - let (subscription_updates_sender, subscription_updates_receiver) = - mpsc::channel(SUBSCRIPTION_UPDATE_CHANNEL_SIZE); - let (messages_sender, messages_receiver) = - mpsc::channel(MESSAGE_CHANNEL_SIZE); - let commitment = grpc_commitment_from_solana(commitment); - - let 
subscriptions: SharedSubscriptions = Default::default(); - let shared_subscriptions = Arc::clone(&subscriptions); - - let me = Self { - laser_client_config, - messages_receiver, - subscriptions, - active_subscriptions: Default::default(), - active_subscription_pubkeys: Default::default(), - program_subscriptions: Default::default(), - subscription_updates_sender, - commitment, - abort_sender, - slots, - client_id: client_id.to_string(), - rpc_client, - }; - - ( - me, - messages_sender, - subscription_updates_receiver, - shared_subscriptions, - ) - } - - #[allow(dead_code)] - #[instrument(skip(self), fields(client_id = %self.client_id))] - fn shutdown(&mut self) { - info!("Shutting down laser actor"); - self.subscriptions.write().clear(); - self.active_subscriptions.clear(); - self.active_subscription_pubkeys.clear(); - } - - #[instrument(skip(self), fields(client_id = %self.client_id))] - pub async fn run(mut self) { - let mut activate_subs_interval = - tokio::time::interval(std::time::Duration::from_millis( - SUBSCRIPTION_ACTIVATION_INTERVAL_MILLIS, - )); - - loop { - tokio::select! { - // Actor messages - msg = self.messages_receiver.recv() => { - match msg { - Some(msg) => { - let is_shutdown = self.handle_msg(msg); - if is_shutdown { - break; - } - } - None => { - break; - } - } - } - // Account subscription updates - update = self.active_subscriptions.next(), if !self.active_subscriptions.is_empty() => { - match update { - Some(update) => { - self.handle_account_update(update).await; - } - None => { - debug!("Account subscription stream ended"); - Self::signal_connection_issue( - &self.subscriptions, - &mut self.active_subscriptions, - &mut self.active_subscription_pubkeys, - &mut self.program_subscriptions, - &self.abort_sender, - &self.client_id, - ) - .await; - } - } - }, - // Program subscription updates - update = async { - match &mut self.program_subscriptions { - Some((_, stream)) => stream.next().await, - None => std::future::pending().await, - } - }, if self.program_subscriptions.is_some() => { - match update { - Some(update) => { - self.handle_program_update(update).await; - } - None => { - debug!("Program subscription stream ended"); - Self::signal_connection_issue( - &self.subscriptions, - &mut self.active_subscriptions, - &mut self.active_subscription_pubkeys, - &mut self.program_subscriptions, - &self.abort_sender, - &self.client_id, - ) - .await; - } - } - }, - // Activate pending subscriptions - _ = activate_subs_interval.tick() => { - self.update_active_subscriptions(); - }, - - } - } - } - - fn handle_msg(&mut self, msg: ChainPubsubActorMessage) -> bool { - use ChainPubsubActorMessage::*; - match msg { - AccountSubscribe { - pubkey, response, .. - } => { - self.add_sub(pubkey, response); - false - } - AccountUnsubscribe { pubkey, response } => { - self.remove_sub(&pubkey, response); - false - } - ProgramSubscribe { pubkey, response } => { - let commitment = self.commitment; - let laser_client_config = self.laser_client_config.clone(); - self.add_program_sub(pubkey, commitment, laser_client_config); - let _ = response.send(Ok(())).inspect_err(|_| { - warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response"); - }); - false - } - Reconnect { response } => { - // We cannot do much more here to _reconnect_ since we will do so once we activate - // subscriptions again and that method does not return any error information. - // Subscriptions were already cleared when the connection issue was signaled. 
- let _ = response.send(Ok(())).inspect_err(|_| { - warn!( - client_id = self.client_id, - "Failed to send reconnect response" - ); - }); - false - } - Shutdown { response } => { - info!(client_id = self.client_id, "Received Shutdown message"); - Self::clear_subscriptions( - &self.subscriptions, - &mut self.active_subscriptions, - &mut self.active_subscription_pubkeys, - &mut self.program_subscriptions, - ); - let _ = response.send(Ok(())).inspect_err(|_| { - warn!( - client_id = self.client_id, - "Failed to send shutdown response" - ); - }); - true - } - } - } - - /// Tracks subscriptions, but does not yet activate them. - fn add_sub( - &mut self, - pubkey: Pubkey, - sub_response: oneshot::Sender>, - ) { - let inserted = { - // Fast path: check with read lock first - let already_subscribed = { - let subs = self.subscriptions.read(); - subs.contains(&pubkey) - }; - - if already_subscribed { - false - } else { - // Write lock only when we need to modify - let mut subs = self.subscriptions.write(); - subs.insert(pubkey); - true - } - }; - if !inserted { - trace!(pubkey = %pubkey, "Already subscribed to account"); - sub_response.send(Ok(())).unwrap_or_else(|_| { - warn!(pubkey = %pubkey, "Failed to send already subscribed response"); - }); - } else { - if self.active_subscriptions.is_empty() { - self.update_active_subscriptions(); - } - sub_response.send(Ok(())).unwrap_or_else(|_| { - warn!(pubkey = %pubkey, "Failed to send subscribe response"); - }) - } - } - - /// Removes a subscription, but does not yet deactivate it. - fn remove_sub( - &mut self, - pubkey: &Pubkey, - unsub_response: oneshot::Sender>, - ) { - let removed = self.subscriptions.write().remove(pubkey); - match removed { - true => { - trace!(pubkey = %pubkey, "Unsubscribed from account"); - unsub_response.send(Ok(())).unwrap_or_else(|_| { - warn!(pubkey = %pubkey, "Failed to send unsubscribe response"); - }); - } - false => { - unsub_response - .send(Err( - RemoteAccountProviderError::AccountSubscriptionDoesNotExist( - pubkey.to_string(), - ), - )) - .unwrap_or_else(|_| { - warn!(pubkey = %pubkey, "Failed to send unsubscribe response"); - }); - } - } - } - - fn update_active_subscriptions(&mut self) { - // Copy subscriptions and release the read lock immediately - let new_pubkeys: HashSet = { - let subs = self.subscriptions.read(); - // Check if the active subscriptions match what we already have - if subs.eq(&self.active_subscription_pubkeys) { - trace!( - count = subs.len(), - "Active subscriptions already up to date" - ); - return; - } - subs.iter().copied().collect() - }; - - inc_account_subscription_activations_count(&self.client_id); - - let mut new_subs: StreamMap = StreamMap::new(); - - // Re-create streams for all subscriptions - let sub_refs = new_pubkeys.iter().collect::>(); - - let chunks = sub_refs - .chunks(PER_STREAM_SUBSCRIPTION_LIMIT) - .map(|chunk| chunk.to_vec()) - .collect::>(); - - let (chain_slot, from_slot) = self - .determine_from_slot() - .map(|(cs, fs)| (Some(cs), Some(fs))) - .unwrap_or((None, None)); - - if tracing::enabled!(tracing::Level::TRACE) { - trace!( - account_count = new_pubkeys.len(), - chain_slot, - from_slot, - stream_count = chunks.len(), - "Activating account subscriptions" - ); - } - - for (idx, chunk) in chunks.into_iter().enumerate() { - let stream = Self::create_accounts_and_slot_stream( - &chunk, - &self.commitment, - &self.laser_client_config, - idx, - from_slot, - ); - new_subs.insert(idx, Box::pin(stream)); - } - - // Drop current active subscriptions by reassignig to new ones 
- self.active_subscriptions = new_subs; - self.active_subscription_pubkeys = new_pubkeys; - } - - /// Determines the from_slot for backfilling subscription updates. - /// - /// Returns `Some((chain_slot, from_slot))` if backfilling is supported and we have a valid chain slot, - /// otherwise returns `None`. - fn determine_from_slot(&self) -> Option<(u64, u64)> { - if !self.slots.supports_backfill { - return None; - } - - let chain_slot = self.slots.chain_slot.load(); - if chain_slot == 0 { - // If we didn't get a chain slot update yet we cannot backfill - return None; - } - - // Get last activation slot and update to current chain slot - let last_activation_slot = self - .slots - .last_activation_slot - .swap(chain_slot, Ordering::Relaxed); - - // when this is called the first time make the best effort to find a reasonable - // slot to backfill from. - let from_slot = if last_activation_slot == 0 { - chain_slot.saturating_sub(SLOTS_BETWEEN_ACTIVATIONS + 1) - } else { - // Limit how far back we go in order to avoid data loss errors - let target_slot = last_activation_slot.saturating_sub(1); - let delta = chain_slot.saturating_sub(target_slot); - if delta < MAX_SLOTS_BACKFILL { - target_slot - } else { - chain_slot.saturating_sub(MAX_SLOTS_BACKFILL) - } - }; - Some((chain_slot, from_slot)) - } - - /// Helper to create a dedicated stream for a number of accounts. - /// It includes a slot subscription for chain slot synchronization. - /// This is not 100% cleanly separated but avoids creating another connection - /// just for slot updates. - /// NOTE: no slot update subscription will be created until the first - /// accounts subscription is created. - fn create_accounts_and_slot_stream( - pubkeys: &[&Pubkey], - commitment: &CommitmentLevel, - laser_client_config: &LaserstreamConfig, - idx: usize, - from_slot: Option, - ) -> impl Stream { - let mut accounts = HashMap::new(); - accounts.insert( - format!("account_subs: {idx}"), - SubscribeRequestFilterAccounts { - account: pubkeys.iter().map(|pk| pk.to_string()).collect(), - ..Default::default() - }, - ); - - // Subscribe to slot updates for chain_slot synchronization - let mut slots = HashMap::new(); - slots.insert( - "slot_updates".to_string(), - SubscribeRequestFilterSlots { - filter_by_commitment: Some(true), - ..Default::default() - }, - ); - - let request = SubscribeRequest { - accounts, - slots, - commitment: Some((*commitment).into()), - // NOTE: triton does not support backfilling and we could not verify this with - // helius due to being rate limited. 
- from_slot, - ..Default::default() - }; - client::subscribe(laser_client_config.clone(), request).0 - } - - fn add_program_sub( - &mut self, - program_id: Pubkey, - commitment: CommitmentLevel, - laser_client_config: LaserstreamConfig, - ) { - if self - .program_subscriptions - .as_ref() - .map(|(subscribed_programs, _)| { - subscribed_programs.contains(&program_id) - }) - .unwrap_or(false) - { - trace!(program_id = %program_id, "Program subscription already exists"); - return; - } - - let mut subscribed_programs = self - .program_subscriptions - .as_ref() - .map(|x| x.0.iter().cloned().collect::>()) - .unwrap_or_default(); - - subscribed_programs.insert(program_id); - - let mut accounts = HashMap::new(); - accounts.insert( - format!("program_sub: {program_id}"), - SubscribeRequestFilterAccounts { - owner: subscribed_programs - .iter() - .map(|pk| pk.to_string()) - .collect(), - ..Default::default() - }, - ); - let request = SubscribeRequest { - accounts, - commitment: Some(commitment.into()), - ..Default::default() - }; - let stream = client::subscribe(laser_client_config.clone(), request).0; - self.program_subscriptions = - Some((subscribed_programs, Box::pin(stream))); - } - - /// Handles an update from one of the account data streams. - #[instrument(skip(self), fields(client_id = %self.client_id, stream_index = %idx))] - async fn handle_account_update( - &mut self, - (idx, result): LaserStreamUpdate, - ) { - match result { - Ok(subscribe_update) => { - self.process_subscription_update( - subscribe_update, - AccountUpdateSource::Account, - ) - .await; - } - Err(err) => { - self.handle_stream_error(&err, "account update").await; - } - } - } - - /// Handles an update from the program subscriptions stream. - #[instrument(skip(self), fields(client_id = %self.client_id))] - async fn handle_program_update(&mut self, result: LaserResult) { - match result { - Ok(subscribe_update) => { - self.process_subscription_update( - subscribe_update, - AccountUpdateSource::Program, - ) - .await; - } - Err(err) => { - self.handle_stream_error(&err, "program subscription").await; - } - } - } - - /// Common error handling for stream errors. Detects "fallen behind" errors - /// and spawns diagnostics to compare our last known slot with the actual - /// chain slot via RPC. - async fn handle_stream_error( - &mut self, - err: &LaserstreamError, - source: &str, - ) { - if is_fallen_behind_error(err) { - self.spawn_fallen_behind_diagnostics(source); - } - - error!(error = ?err, slots = ?self.slots, "Error in {} stream", source); - Self::signal_connection_issue( - &self.subscriptions, - &mut self.active_subscriptions, - &mut self.active_subscription_pubkeys, - &mut self.program_subscriptions, - &self.abort_sender, - &self.client_id, - ) - .await; - } - - /// Spawns an async task to fetch the current chain slot via RPC and log - /// how far behind we were when the "fallen behind" error occurred. - /// It also updates the current chain slot in our `chain_slot` tracker to - /// the fetched slot if it is higher than our last known slot. 
- fn spawn_fallen_behind_diagnostics(&self, source: &str) { - let chain_slot = self.slots.chain_slot.clone(); - let last_chain_slot = chain_slot.load(); - let rpc_client = self.rpc_client.clone(); - let client_id = self.client_id.clone(); - let source = source.to_string(); - - const TIMEOUT_SECS: u64 = 5; - // At 2.5 slots per sec when we factor by 5 we allow - // double the lag that would be caused by the max timeout alone - const MAX_ALLOWED_LAG_SLOTS: u64 = TIMEOUT_SECS * 5; - - tokio::spawn(async move { - let rpc_result = tokio::time::timeout( - Duration::from_secs(TIMEOUT_SECS), - rpc_client.get_slot(), - ) - .await; - - match rpc_result { - Ok(Ok(rpc_chain_slot)) => { - let slot_lag = - rpc_chain_slot.saturating_sub(last_chain_slot); - chain_slot.update(rpc_chain_slot); - if slot_lag > MAX_ALLOWED_LAG_SLOTS { - warn!( - %client_id, - last_chain_slot, - rpc_chain_slot, - slot_lag, - source, - "gRPC reportedly fell behind (DataLoss) due to chain_slot lagging" - ); - } - } - Ok(Err(rpc_err)) => { - debug!( - %client_id, - last_chain_slot, - error = ?rpc_err, - source, - "Failed to fetch RPC slot for DataLoss diagnostics" - ); - } - Err(_timeout) => { - debug!( - %client_id, - last_chain_slot, - source, - "Timeout fetching RPC slot for DataLoss diagnostics" - ); - } - } - }); - } - - fn clear_subscriptions( - subscriptions: &SharedSubscriptions, - active_subscriptions: &mut StreamMap, - active_subscription_pubkeys: &mut HashSet, - program_subscriptions: &mut Option<(HashSet, LaserStream)>, - ) { - subscriptions.write().clear(); - active_subscriptions.clear(); - active_subscription_pubkeys.clear(); - *program_subscriptions = None; - } - - /// Signals a connection issue by clearing all subscriptions and - /// sending a message on the abort channel. - /// NOTE: the laser client should handle reconnects internally, but - /// we add this as a backup in case it is unable to do so - #[instrument(skip(subscriptions, active_subscriptions, active_subscription_pubkeys, program_subscriptions, abort_sender), fields(client_id = %client_id))] - async fn signal_connection_issue( - subscriptions: &SharedSubscriptions, - active_subscriptions: &mut StreamMap, - active_subscription_pubkeys: &mut HashSet, - program_subscriptions: &mut Option<(HashSet, LaserStream)>, - abort_sender: &mpsc::Sender<()>, - client_id: &str, - ) { - static SIGNAL_CONNECTION_COUNT: AtomicU16 = AtomicU16::new(0); - log_trace_debug( - "Signaling connection issue", - "Signaled connection issue", - &client_id, - &RemoteAccountProviderError::ConnectionDisrupted, - 100, - &SIGNAL_CONNECTION_COUNT, - ); - - // Clear all subscriptions - Self::clear_subscriptions( - subscriptions, - active_subscriptions, - active_subscription_pubkeys, - program_subscriptions, - ); - - // Use try_send to avoid blocking and naturally coalesce signals - let _ = abort_sender.try_send(()).inspect_err(|err| { - // Channel full is expected when reconnect is already in progress - if !matches!(err, mpsc::error::TrySendError::Full(_)) { - error!(error = ?err, "Failed to signal connection issue"); - } - }); - } - - /// Processes a subscription update from either account or program streams. - /// We verified via a script that we get an update with Some(Account) when it is - /// closed. In that case lamports == 0 and owner is the system program. - /// Thus an update of `None` is not expected and can be ignored. 
- /// See: https://gist.github.com/thlorenz/d3d1a380678a030b3e833f8f979319ae - #[instrument( - skip(self, update), - fields( - client_id = %self.client_id, - pubkey = tracing::field::Empty, - slot = tracing::field::Empty, - source = %source, - ) - )] - async fn process_subscription_update( - &mut self, - update: SubscribeUpdate, - source: AccountUpdateSource, - ) { - let Some(update_oneof) = update.update_oneof else { - return; - }; - - // Handle slot updates - update chain_slot to max of current and received - if let UpdateOneof::Slot(slot_update) = &update_oneof { - self.slots.chain_slot.update(slot_update.slot); - return; - } - - let UpdateOneof::Account(acc) = update_oneof else { - return; - }; - - let (Some(account), slot) = (acc.account, acc.slot) else { - return; - }; - - let Ok(pubkey) = Pubkey::try_from(account.pubkey) else { - error!("Failed to parse pubkey"); - return; - }; - - tracing::Span::current() - .record("pubkey", tracing::field::display(pubkey)); - - let log_trace = if tracing::enabled!(tracing::Level::TRACE) { - if pubkey.eq(&clock::ID) { - static TRACE_CLOCK_COUNT: AtomicU64 = AtomicU64::new(0); - TRACE_CLOCK_COUNT - .fetch_add(1, Ordering::Relaxed) - .is_multiple_of(100) - } else { - true - } - } else { - false - }; - - tracing::Span::current().record("slot", slot); - - if log_trace { - trace!("Received subscription update"); - } - - let Ok(owner) = Pubkey::try_from(account.owner) else { - error!(pubkey = %pubkey, "Failed to parse owner pubkey"); - return; - }; - - if matches!(source, AccountUpdateSource::Program) { - inc_per_program_account_updates_count( - &self.client_id, - &owner.to_string(), - ); - } - - if !self.subscriptions.read().contains(&pubkey) { - return; - } - - let account = Account { - lamports: account.lamports, - data: account.data, - owner, - executable: account.executable, - rent_epoch: account.rent_epoch, - }; - let subscription_update = SubscriptionUpdate { - pubkey, - slot, - account: Some(account), - }; - - if pubkey != clock::ID { - match source { - AccountUpdateSource::Account => { - inc_account_subscription_account_updates_count( - &self.client_id, - ); - } - AccountUpdateSource::Program => { - inc_program_subscription_account_updates_count( - &self.client_id, - ); - } - } - } - - self.subscription_updates_sender - .send(subscription_update) - .await - .unwrap_or_else(|_| { - error!(pubkey = %pubkey, "Failed to send subscription update"); - }); - } -} - -// ----------------- -// Helpers -// ----------------- -fn grpc_commitment_from_solana( - commitment: SolanaCommitmentLevel, -) -> CommitmentLevel { - use SolanaCommitmentLevel::*; - match commitment { - Finalized => CommitmentLevel::Finalized, - Confirmed => CommitmentLevel::Confirmed, - Processed => CommitmentLevel::Processed, - } -} - -/// Detects if a LaserstreamError indicates the client has fallen behind the -/// stream and cannot catch up. This occurs when the client cannot consume -/// messages fast enough and falls more than 500 slots behind. 
-fn is_fallen_behind_error(err: &LaserstreamError) -> bool { - match err { - LaserstreamError::Status(status) => { - status.code() == Code::DataLoss - && status - .message() - .to_ascii_lowercase() - .contains("fallen behind") - } - _ => false, - } -} diff --git a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs new file mode 100644 index 000000000..676e41bc7 --- /dev/null +++ b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs @@ -0,0 +1,716 @@ +use std::{ + fmt, + sync::{ + atomic::{AtomicU16, AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; + +use helius_laserstream::{ + grpc::{subscribe_update::UpdateOneof, CommitmentLevel, SubscribeUpdate}, + LaserstreamConfig, LaserstreamError, +}; +use magicblock_core::logger::log_trace_debug; +use magicblock_metrics::metrics::{ + inc_account_subscription_account_updates_count, + inc_per_program_account_updates_count, + inc_program_subscription_account_updates_count, +}; +use solana_account::Account; +use solana_commitment_config::CommitmentLevel as SolanaCommitmentLevel; +use solana_pubkey::Pubkey; +use solana_sdk_ids::sysvar::clock; +use tokio::sync::{mpsc, oneshot}; +use tonic::Code; +use tracing::*; + +use super::{ + LaserResult, SharedSubscriptions, StreamFactory, StreamHandle, + StreamManager, StreamManagerConfig, StreamUpdateSource, +}; +use crate::remote_account_provider::{ + chain_rpc_client::{ChainRpcClient, ChainRpcClientImpl}, + chain_slot::ChainSlot, + pubsub_common::{ + ChainPubsubActorMessage, MESSAGE_CHANNEL_SIZE, + SUBSCRIPTION_UPDATE_CHANNEL_SIZE, + }, + RemoteAccountProviderError, RemoteAccountProviderResult, + SubscriptionUpdate, +}; + +// ----------------- +// Slots +// ----------------- +/// Shared slot tracking for chain slot synchronization. +#[derive(Debug)] +pub struct Slots { + /// The current slot on chain, shared with RemoteAccountProvider. + /// Updated via `update()` when slot updates are received from GRPC. + /// Metrics are automatically captured on updates. + pub chain_slot: ChainSlot, + /// Whether this GRPC endpoint supports backfilling subscription updates. + pub supports_backfill: bool, +} + +// ----------------- +// AccountUpdateSource +// ----------------- +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum AccountUpdateSource { + Account, + Program, +} + +impl fmt::Display for AccountUpdateSource { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Account => write!(f, "account"), + Self::Program => write!(f, "program"), + } + } +} + +// ----------------- +// ChainLaserActor +// ----------------- +/// ChainLaserActor manages gRPC subscriptions to Helius Laser +/// or Triton endpoints. +/// +/// ## Subscription Lifecycle +/// +/// 1. **Requested**: User calls `subscribe(pubkey)`. +/// 2. **Active**: The pubkey is immediately forwarded to the +/// [StreamManager] which handles stream creation/chunking. +/// 3. **Updates**: Account updates flow back via the streams +/// and are forwarded to the consumer. +/// +/// ## Stream Management +/// +/// Stream creation, chunking, promotion, and optimization are +/// fully delegated to [StreamManager]. +/// +/// ## Reconnection Behavior +/// +/// - If a stream ends unexpectedly, `signal_connection_issue()` +/// is called. +/// - The actor sends an abort signal to the submux, which +/// triggers reconnection. +/// - The actor itself doesn't attempt to reconnect; it relies +/// on external recovery. 
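For orientation, a minimal wiring sketch of the lifecycle described above (illustrative only, not part of the patch); the endpoint, API key, and the `slots`/`rpc_client` values are assumed placeholders:

    let (abort_tx, _abort_rx) = tokio::sync::mpsc::channel(1);
    let (actor, msg_tx, mut updates_rx, _shared_subs) = ChainLaserActor::new_from_url(
        "https://laserstream.example.com", // assumed endpoint
        "laser-main",                      // client id used in logs and metrics
        "api-key",                         // assumed API key
        solana_commitment_config::CommitmentLevel::Confirmed,
        abort_tx,   // mpsc::Sender<()> the actor signals on connection issues
        slots,      // Slots { chain_slot, supports_backfill }
        rpc_client, // ChainRpcClientImpl used for fallen-behind diagnostics
    );
    tokio::spawn(actor.run());
    // Subscriptions are requested by sending ChainPubsubActorMessage values on
    // `msg_tx`; resulting SubscriptionUpdate values arrive on `updates_rx`.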
+pub struct ChainLaserActor> { + /// Manager for creating and polling laser streams + stream_manager: StreamManager, + /// Receives subscribe/unsubscribe messages to this actor + messages_receiver: mpsc::Receiver, + /// Sends updates for any account subscription that is + /// received via the Laser client subscription mechanism + subscription_updates_sender: mpsc::Sender, + /// The commitment level to use for subscriptions + commitment: CommitmentLevel, + /// Channel used to signal connection issues to the submux + abort_sender: mpsc::Sender<()>, + /// Slot tracking for chain slot synchronization and + /// activation lookback + slots: Slots, + /// Unique client ID including the gRPC provider name for + /// this actor instance used in logs and metrics + client_id: String, + /// RPC client for diagnostics (e.g., fetching slot when + /// falling behind) + rpc_client: ChainRpcClientImpl, +} + +impl ChainLaserActor { + pub fn new_from_url( + pubsub_url: &str, + client_id: &str, + api_key: &str, + commitment: SolanaCommitmentLevel, + abort_sender: mpsc::Sender<()>, + slots: Slots, + rpc_client: ChainRpcClientImpl, + ) -> ( + Self, + mpsc::Sender, + mpsc::Receiver, + SharedSubscriptions, + ) { + let channel_options = helius_laserstream::ChannelOptions { + connect_timeout_secs: Some(5), + http2_keep_alive_interval_secs: Some(15), + tcp_keepalive_secs: Some(30), + ..Default::default() + }; + let laser_client_config = LaserstreamConfig { + api_key: api_key.to_string(), + endpoint: pubsub_url.to_string(), + max_reconnect_attempts: Some(4), + channel_options, + replay: true, + }; + Self::new( + client_id, + laser_client_config, + commitment, + abort_sender, + slots, + rpc_client, + ) + } + + pub fn new( + client_id: &str, + laser_client_config: LaserstreamConfig, + commitment: SolanaCommitmentLevel, + abort_sender: mpsc::Sender<()>, + slots: Slots, + rpc_client: ChainRpcClientImpl, + ) -> ( + Self, + mpsc::Sender, + mpsc::Receiver, + SharedSubscriptions, + ) { + let stream_factory = super::StreamFactoryImpl::new(laser_client_config); + Self::with_stream_factory( + client_id, + stream_factory, + commitment, + abort_sender, + slots, + rpc_client, + ) + } +} + +impl> ChainLaserActor { + /// Create actor with a custom stream factory (for testing) + pub fn with_stream_factory( + client_id: &str, + stream_factory: S, + commitment: SolanaCommitmentLevel, + abort_sender: mpsc::Sender<()>, + slots: Slots, + rpc_client: ChainRpcClientImpl, + ) -> ( + Self, + mpsc::Sender, + mpsc::Receiver, + SharedSubscriptions, + ) { + let (subscription_updates_sender, subscription_updates_receiver) = + mpsc::channel(SUBSCRIPTION_UPDATE_CHANNEL_SIZE); + let (messages_sender, messages_receiver) = + mpsc::channel(MESSAGE_CHANNEL_SIZE); + let commitment = grpc_commitment_from_solana(commitment); + + let chain_slot = if slots.supports_backfill { + Some(slots.chain_slot.clone()) + } else { + None + }; + let stream_manager = StreamManager::new( + StreamManagerConfig::default(), + stream_factory, + chain_slot, + client_id.to_string(), + ); + let shared_subscriptions = Arc::clone(stream_manager.subscriptions()); + + let me = Self { + stream_manager, + messages_receiver, + subscription_updates_sender, + commitment, + abort_sender, + slots, + client_id: client_id.to_string(), + rpc_client, + }; + + ( + me, + messages_sender, + subscription_updates_receiver, + shared_subscriptions, + ) + } + + #[allow(dead_code)] + #[instrument(skip(self), fields(client_id = %self.client_id))] + fn shutdown(&mut self) { + info!("Shutting down laser 
actor"); + Self::clear_subscriptions(&mut self.stream_manager); + } + + #[instrument(skip(self), fields(client_id = %self.client_id))] + pub async fn run(mut self) { + loop { + tokio::select! { + msg = self.messages_receiver.recv() => { + match msg { + Some(msg) => { + if self.handle_msg(msg).await { + break; + } + } + None => break, + } + }, + update = self.stream_manager.next_update(), if self.stream_manager.has_any_subscriptions() => { + match update { + Some((src, result)) => { + self.handle_stream_result( + src, result, + ).await; + } + None => { + debug!( + "Subscription stream ended" + ); + Self::signal_connection_issue( + &mut self.stream_manager, + &self.abort_sender, + &self.client_id, + ) + .await; + } + } + }, + } + } + } + + async fn handle_msg(&mut self, msg: ChainPubsubActorMessage) -> bool { + use ChainPubsubActorMessage::*; + match msg { + AccountSubscribe { + pubkey, response, .. + } => { + self.add_sub(pubkey, response).await; + false + } + AccountUnsubscribe { pubkey, response } => { + self.remove_sub(&pubkey, response); + false + } + ProgramSubscribe { pubkey, response } => { + let result = self + .stream_manager + .add_program_subscription(pubkey, &self.commitment) + .await; + let _ = response.send(result).inspect_err(|_| { + warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response"); + }); + false + } + Reconnect { response } => { + // We cannot do much more here to _reconnect_ since we will do so once we create + // subscriptions again. + // Subscriptions were already cleared when the connection issue was signaled. + let _ = response.send(Ok(())).inspect_err(|_| { + warn!( + client_id = self.client_id, + "Failed to send reconnect response" + ); + }); + false + } + Shutdown { response } => { + info!(client_id = self.client_id, "Received Shutdown message"); + Self::clear_subscriptions(&mut self.stream_manager); + let _ = response.send(Ok(())).inspect_err(|_| { + warn!( + client_id = self.client_id, + "Failed to send shutdown response" + ); + }); + true + } + } + } + + /// Subscribes to the given pubkey immediately by forwarding + /// to the stream manager. + async fn add_sub( + &mut self, + pubkey: Pubkey, + sub_response: oneshot::Sender>, + ) { + if self.stream_manager.is_subscribed(&pubkey) { + debug!( + pubkey = %pubkey, + "Already subscribed to account" + ); + sub_response.send(Ok(())).unwrap_or_else(|_| { + warn!(pubkey = %pubkey, "Failed to send already subscribed response"); + }); + return; + } + + let from_slot = self.compute_from_slot(); + let result = self + .stream_manager + .account_subscribe(&[pubkey], &self.commitment, from_slot) + .await; + + let response = match result { + Ok(()) => Ok(()), + Err(e) => { + error!( + pubkey = %pubkey, + error = ?e, + "Failed to subscribe to account" + ); + Err(e) + } + }; + sub_response.send(response).unwrap_or_else(|_| { + warn!( + pubkey = %pubkey, + "Failed to send subscribe response" + ); + }); + } + + /// Removes a subscription and forwards to the stream manager. 
+ fn remove_sub( + &mut self, + pubkey: &Pubkey, + unsub_response: oneshot::Sender>, + ) { + if self.stream_manager.is_subscribed(pubkey) { + self.stream_manager.account_unsubscribe(&[*pubkey]); + trace!( + pubkey = %pubkey, + "Unsubscribed from account" + ); + unsub_response.send(Ok(())).unwrap_or_else(|_| { + warn!(pubkey = %pubkey, "Failed to send unsubscribe response"); + }); + } else { + unsub_response + .send(Err( + RemoteAccountProviderError::AccountSubscriptionDoesNotExist( + pubkey.to_string(), + ), + )) + .unwrap_or_else(|_| { + warn!(pubkey = %pubkey, "Failed to send unsubscribe response"); + }); + } + } + + /// Computes a `from_slot` for backfilling based on the + /// current chain slot. Returns `None` if backfilling is not + /// supported or the slot is still 0. + fn compute_from_slot(&self) -> Option { + if !self.slots.supports_backfill { + return None; + } + Some(self.slots.chain_slot.compute_from_slot()) + } + + /// Handles an update from any subscription stream. + #[instrument(skip(self), fields(client_id = %self.client_id))] + async fn handle_stream_result( + &mut self, + src: StreamUpdateSource, + result: LaserResult, + ) { + let update_source = match src { + StreamUpdateSource::Account => AccountUpdateSource::Account, + StreamUpdateSource::Program => AccountUpdateSource::Program, + }; + match result { + Ok(subscribe_update) => { + self.process_subscription_update( + subscribe_update, + update_source, + ) + .await; + } + Err(err) => { + let label = match src { + StreamUpdateSource::Account => "account update", + StreamUpdateSource::Program => "program subscription", + }; + self.handle_stream_error(&err, label).await; + } + } + } + + /// Common error handling for stream errors. Detects "fallen + /// behind" errors and spawns diagnostics to compare our last + /// known slot with the actual chain slot via RPC. + async fn handle_stream_error( + &mut self, + err: &LaserstreamError, + source: &str, + ) { + if is_fallen_behind_error(err) { + self.spawn_fallen_behind_diagnostics(source); + } + + error!( + error = ?err, + slots = ?self.slots, + "Error in {} stream", + source, + ); + Self::signal_connection_issue( + &mut self.stream_manager, + &self.abort_sender, + &self.client_id, + ) + .await; + } + + /// Spawns an async task to fetch the current chain slot via RPC and log + /// how far behind we were when the "fallen behind" error occurred. + /// It also updates the current chain slot in our `chain_slot` tracker to + /// the fetched slot if it is higher than our last known slot. 
+ fn spawn_fallen_behind_diagnostics(&self, source: &str) { + let chain_slot = self.slots.chain_slot.clone(); + let last_chain_slot = chain_slot.load(); + let rpc_client = self.rpc_client.clone(); + let client_id = self.client_id.clone(); + let source = source.to_string(); + + const TIMEOUT_SECS: u64 = 5; + // At 2.5 slots per sec when we factor by 5 we allow + // double the lag that would be caused by the max timeout alone + const MAX_ALLOWED_LAG_SLOTS: u64 = TIMEOUT_SECS * 5; + + tokio::spawn(async move { + let rpc_result = tokio::time::timeout( + Duration::from_secs(TIMEOUT_SECS), + rpc_client.get_slot(), + ) + .await; + + match rpc_result { + Ok(Ok(rpc_chain_slot)) => { + let slot_lag = + rpc_chain_slot.saturating_sub(last_chain_slot); + chain_slot.update(rpc_chain_slot); + if slot_lag > MAX_ALLOWED_LAG_SLOTS { + warn!( + %client_id, + last_chain_slot, + rpc_chain_slot, + slot_lag, + source, + "gRPC reportedly fell behind (DataLoss) due to chain_slot lagging" + ); + } + } + Ok(Err(rpc_err)) => { + debug!( + %client_id, + last_chain_slot, + error = ?rpc_err, + source, + "Failed to fetch RPC slot for DataLoss diagnostics" + ); + } + Err(_timeout) => { + debug!( + %client_id, + last_chain_slot, + source, + "Timeout fetching RPC slot for DataLoss diagnostics" + ); + } + } + }); + } + + fn clear_subscriptions(stream_manager: &mut StreamManager) { + stream_manager.clear_account_subscriptions(); + stream_manager.clear_program_subscriptions(); + } + + /// Signals a connection issue by clearing all subscriptions + /// and sending a message on the abort channel. + /// NOTE: the laser client should handle reconnects + /// internally, but we add this as a backup in case it is + /// unable to do so + #[instrument(skip(stream_manager, abort_sender), fields(client_id = %client_id))] + async fn signal_connection_issue( + stream_manager: &mut StreamManager, + abort_sender: &mpsc::Sender<()>, + client_id: &str, + ) { + static SIGNAL_CONNECTION_COUNT: AtomicU16 = AtomicU16::new(0); + log_trace_debug( + "Signaling connection issue", + "Signaled connection issue", + &client_id, + &RemoteAccountProviderError::ConnectionDisrupted, + 100, + &SIGNAL_CONNECTION_COUNT, + ); + + Self::clear_subscriptions(stream_manager); + + // Use try_send to avoid blocking and naturally + // coalesce signals + let _ = abort_sender.try_send(()).inspect_err(|err| { + if !matches!(err, mpsc::error::TrySendError::Full(_)) { + error!( + error = ?err, + "Failed to signal connection issue" + ); + } + }); + } + + /// Processes a subscription update from either account or program streams. + /// We verified via a script that we get an update with Some(Account) when it is + /// closed. In that case lamports == 0 and owner is the system program. + /// Thus an update of `None` is not expected and can be ignored. 
+ /// See: https://gist.github.com/thlorenz/d3d1a380678a030b3e833f8f979319ae + #[instrument( + skip(self, update), + fields( + client_id = %self.client_id, + pubkey = tracing::field::Empty, + slot = tracing::field::Empty, + source = %source, + ) + )] + async fn process_subscription_update( + &mut self, + update: SubscribeUpdate, + source: AccountUpdateSource, + ) { + let Some(update_oneof) = update.update_oneof else { + return; + }; + + // Handle slot updates - update chain_slot to max of current and received + if let UpdateOneof::Slot(slot_update) = &update_oneof { + self.slots.chain_slot.update(slot_update.slot); + return; + } + + let UpdateOneof::Account(acc) = update_oneof else { + return; + }; + + let (Some(account), slot) = (acc.account, acc.slot) else { + return; + }; + + let Ok(pubkey) = Pubkey::try_from(account.pubkey) else { + error!("Failed to parse pubkey"); + return; + }; + + tracing::Span::current() + .record("pubkey", tracing::field::display(pubkey)); + + let log_trace = if tracing::enabled!(tracing::Level::TRACE) { + if pubkey.eq(&clock::ID) { + static TRACE_CLOCK_COUNT: AtomicU64 = AtomicU64::new(0); + TRACE_CLOCK_COUNT + .fetch_add(1, Ordering::Relaxed) + .is_multiple_of(100) + } else { + true + } + } else { + false + }; + + tracing::Span::current().record("slot", slot); + + if log_trace { + trace!("Received subscription update"); + } + + let Ok(owner) = Pubkey::try_from(account.owner) else { + error!(pubkey = %pubkey, "Failed to parse owner pubkey"); + return; + }; + + if matches!(source, AccountUpdateSource::Program) { + inc_per_program_account_updates_count( + &self.client_id, + &owner.to_string(), + ); + } + + if !self.stream_manager.is_subscribed(&pubkey) { + return; + } + + let account = Account { + lamports: account.lamports, + data: account.data, + owner, + executable: account.executable, + rent_epoch: account.rent_epoch, + }; + let subscription_update = SubscriptionUpdate { + pubkey, + slot, + account: Some(account), + }; + + if pubkey != clock::ID { + match source { + AccountUpdateSource::Account => { + inc_account_subscription_account_updates_count( + &self.client_id, + ); + } + AccountUpdateSource::Program => { + inc_program_subscription_account_updates_count( + &self.client_id, + ); + } + } + } + + self.subscription_updates_sender + .send(subscription_update) + .await + .unwrap_or_else(|_| { + error!(pubkey = %pubkey, "Failed to send subscription update"); + }); + } +} + +// ----------------- +// Helpers +// ----------------- +fn grpc_commitment_from_solana( + commitment: SolanaCommitmentLevel, +) -> CommitmentLevel { + use SolanaCommitmentLevel::*; + match commitment { + Finalized => CommitmentLevel::Finalized, + Confirmed => CommitmentLevel::Confirmed, + Processed => CommitmentLevel::Processed, + } +} + +/// Detects if a LaserstreamError indicates the client has fallen behind the +/// stream and cannot catch up. This occurs when the client cannot consume +/// messages fast enough and falls more than 500 slots behind. 
+fn is_fallen_behind_error(err: &LaserstreamError) -> bool { + match err { + LaserstreamError::Status(status) => { + status.code() == Code::DataLoss + && status + .message() + .to_ascii_lowercase() + .contains("fallen behind") + } + _ => false, + } +} diff --git a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mock.rs b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mock.rs new file mode 100644 index 000000000..04eb66fa0 --- /dev/null +++ b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mock.rs @@ -0,0 +1,140 @@ +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use helius_laserstream::{grpc::SubscribeRequest, LaserstreamError}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use super::{LaserResult, StreamFactory}; +use crate::remote_account_provider::{ + chain_laser_actor::{LaserStreamWithHandle, StreamHandle}, + RemoteAccountProviderResult, +}; + +/// A test mock that captures subscription requests and allows driving +/// streams programmatically. +#[derive(Clone)] +pub struct MockStreamFactory { + /// Every `SubscribeRequest` passed to `subscribe()` is recorded + /// here so tests can assert on filter contents, commitment levels, + /// etc. + captured_requests: Arc>>, + + /// Requests sent through a `MockStreamHandle::write()` call are + /// recorded here so tests can verify handle-driven updates. + handle_requests: Arc>>, + + /// A sender that the test uses to push `LaserResult` items into + /// the streams returned by `subscribe()`. + /// Each call to `subscribe()` creates a new mpsc channel; the rx + /// side becomes the returned stream, and the tx side is stored + /// here so the test can drive updates. + stream_senders: Arc>>>>, +} + +impl MockStreamFactory { + /// Create a new mock stream factory + pub fn new() -> Self { + Self { + captured_requests: Arc::new(Mutex::new(Vec::new())), + handle_requests: Arc::new(Mutex::new(Vec::new())), + stream_senders: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Get the captured subscription requests (from `subscribe()`) + pub fn captured_requests(&self) -> Vec { + self.captured_requests.lock().unwrap().clone() + } + + /// Get the requests sent through stream handles (from + /// `handle.write()`) + pub fn handle_requests(&self) -> Vec { + self.handle_requests.lock().unwrap().clone() + } + + /// Push an error update to a specific stream + pub fn push_error_to_stream(&self, idx: usize, error: LaserstreamError) { + let senders = self.stream_senders.lock().unwrap(); + if let Some(sender) = senders.get(idx) { + let _ = sender.send(Err(error)); + } + } + + /// Push an update to a specific stream by index + pub fn push_update_to_stream(&self, idx: usize, update: LaserResult) { + let senders = self.stream_senders.lock().unwrap(); + if let Some(sender) = senders.get(idx) { + let _ = sender.send(update); + } + } + + /// Get the number of active streams + pub fn active_stream_count(&self) -> usize { + self.stream_senders.lock().unwrap().len() + } + + /// Close a specific stream by index + pub fn close_stream(&self, idx: usize) { + let mut senders = self.stream_senders.lock().unwrap(); + if idx < senders.len() { + senders.remove(idx); + } + } +} + +impl Default for MockStreamFactory { + fn default() -> Self { + Self::new() + } +} + +/// Mock handle that records write requests and drains them into the +/// shared `handle_requests` vec on the factory. 
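A hedged sketch (illustrative, not part of the patch) of driving a test through this factory using only the accessors defined above; `SubscribeUpdate::default()` is assumed to be an acceptable stand-in for a real update payload:

    let factory = MockStreamFactory::new();
    // Hand a clone to ChainLaserActor::with_stream_factory(..) or StreamManager::new(..).
    let for_actor = factory.clone();
    // ... after the code under test has subscribed:
    assert_eq!(factory.active_stream_count(), 1);
    assert_eq!(factory.captured_requests().len(), 1);
    // Drive a fake update (or an error) into the first stream:
    factory.push_update_to_stream(
        0,
        Ok(helius_laserstream::grpc::SubscribeUpdate::default()),
    );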
+#[derive(Clone)] +pub struct MockStreamHandle { + handle_requests: Arc>>, +} + +#[async_trait] +impl StreamHandle for MockStreamHandle { + async fn write( + &self, + request: SubscribeRequest, + ) -> Result<(), LaserstreamError> { + self.handle_requests.lock().unwrap().push(request); + Ok(()) + } +} + +#[async_trait] +impl StreamFactory for MockStreamFactory { + async fn subscribe( + &self, + request: SubscribeRequest, + ) -> RemoteAccountProviderResult> + { + // Record the initial subscribe request + self.captured_requests.lock().unwrap().push(request.clone()); + + // Create a channel for driving LaserResult items into the + // stream + let (stream_tx, stream_rx) = mpsc::unbounded_channel::(); + let stream = Box::pin(UnboundedReceiverStream::new(stream_rx)); + + let stream_tx = Arc::new(stream_tx); + self.stream_senders.lock().unwrap().push(stream_tx); + + // The handle shares the factory's handle_requests vec so + // every write is visible to tests immediately. + let handle = MockStreamHandle { + handle_requests: Arc::clone(&self.handle_requests), + }; + + // Write the actual request to the handle (mirroring + // production behaviour of sending it over the network). + handle.write(request).await.unwrap(); + + Ok(LaserStreamWithHandle { stream, handle }) + } +} diff --git a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mod.rs b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mod.rs new file mode 100644 index 000000000..4d6f0b2d6 --- /dev/null +++ b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mod.rs @@ -0,0 +1,72 @@ +use std::{collections::HashSet, pin::Pin, sync::Arc}; + +use futures_util::Stream; +use helius_laserstream::{ + grpc::{SubscribeRequest, SubscribeUpdate}, + LaserstreamError, +}; +use parking_lot::RwLock; +use solana_pubkey::Pubkey; +use tokio::time::Duration; + +pub use self::{ + actor::{ChainLaserActor, Slots}, + stream_factory::{ + LaserStreamWithHandle, StreamFactory, StreamFactoryImpl, StreamHandle, + StreamHandleImpl, + }, + stream_manager::{StreamManager, StreamManagerConfig, StreamUpdateSource}, +}; +use crate::remote_account_provider::{ + RemoteAccountProviderError, RemoteAccountProviderResult, +}; + +pub type SharedSubscriptions = Arc>>; + +mod actor; +#[cfg(test)] +mod mock; +mod stream_factory; +mod stream_manager; + +/// Retry a `handle.write(request)` call with linear backoff. +/// +/// Tries up to `MAX_RETRIES` (5) times with 50 ms × attempt +/// backoff. Returns the original error after all retries are +/// exhausted. 
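Given the constants in the function below, the sleeps between attempts grow linearly — 50 ms, 100 ms, 150 ms, 200 ms, 250 ms — before `GrpcSubscriptionUpdateFailed` is returned. A hedged call-site sketch (the `handle` and `request` values are assumed to exist in the caller):

    // `handle` implements StreamHandle; `request` is the SubscribeRequest to (re)send.
    write_with_retry(&handle, "account_subscribe", request).await?;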
+pub(crate) async fn write_with_retry( + handle: &S, + task: &str, + request: SubscribeRequest, +) -> RemoteAccountProviderResult<()> { + const MAX_RETRIES: usize = 5; + let mut retries = MAX_RETRIES; + let initial_retries = retries; + + loop { + match handle.write(request.clone()).await { + Ok(()) => return Ok(()), + Err(err) => { + if retries > 0 { + retries -= 1; + let backoff_ms = 50u64 * (initial_retries - retries) as u64; + tokio::time::sleep(Duration::from_millis(backoff_ms)).await; + continue; + } + return Err( + RemoteAccountProviderError::GrpcSubscriptionUpdateFailed( + task.to_string(), + MAX_RETRIES, + format!("{err} ({err:?})"), + ), + ); + } + } + } +} + +/// Result of a laser stream operation +pub type LaserResult = Result; + +/// A laser stream of subscription updates +pub type LaserStream = Pin + Send>>; diff --git a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_factory.rs b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_factory.rs new file mode 100644 index 000000000..04b89b555 --- /dev/null +++ b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_factory.rs @@ -0,0 +1,101 @@ +use async_trait::async_trait; +use helius_laserstream::{ + grpc::SubscribeRequest, LaserstreamError, + StreamHandle as HeliusStreamHandle, +}; + +use super::LaserStream; +use crate::remote_account_provider::RemoteAccountProviderResult; + +/// A trait to represent the [HeliusStreamHandle]. +/// This is needed since we cannot create the helius one since +/// [helius_laserstream::StreamHandle::write_tx] is private and there is no constructor. +#[async_trait] +pub trait StreamHandle { + /// Send a new subscription request to update the active subscription. + async fn write( + &self, + request: SubscribeRequest, + ) -> Result<(), LaserstreamError>; +} + +pub struct StreamHandleImpl { + pub handle: HeliusStreamHandle, +} + +#[async_trait] +impl StreamHandle for StreamHandleImpl { + async fn write( + &self, + request: SubscribeRequest, + ) -> Result<(), LaserstreamError> { + // This async operation gets forwarded to the underlying subscription sender of the laser + // client and completes after the given item has been fully processed into the sink, + // including flushing. + // The assumption is that at that point it has been processed on the receiver end and the + // subscription is updated. + // See: https://github.com/helius-labs/laserstream-sdk/blob/v0.2.2/rust/src/client.rs#L196-L201 + self.handle.write(request).await + } +} + +/// Abstraction over stream creation for testability +#[async_trait] +pub trait StreamFactory: Send + Sync + 'static { + /// Create a stream for the given subscription request. + async fn subscribe( + &self, + request: SubscribeRequest, + ) -> RemoteAccountProviderResult>; +} + +pub struct LaserStreamWithHandle { + pub(crate) stream: LaserStream, + pub(crate) handle: S, +} + +/// Production stream factory that wraps helius client subscribe +pub struct StreamFactoryImpl { + config: helius_laserstream::LaserstreamConfig, +} + +impl StreamFactoryImpl { + pub fn new(config: helius_laserstream::LaserstreamConfig) -> Self { + Self { config } + } +} + +#[async_trait] +impl StreamFactory for StreamFactoryImpl { + /// This implementation creates the underlying gRPC stream with the request (which + /// returns immediately) and then writes the same request to the handle so it is sent + /// over the network before returning asynchronously. 
+ async fn subscribe( + &self, + request: SubscribeRequest, + ) -> RemoteAccountProviderResult> + { + // NOTE: this call returns immediately yielding subscription errors and account updates + // via the stream, thus the subscription has not been received yet upstream + // NOTE: we need to use the same request as otherwise there is a potential race condition + // where our `write` below completes before this call and the request is overwritten + // with an empty one. + // Inside helius_laserstream::client::subscribe the request is cloned for that reason to + // be able to attempt the request multiple times during reconnect attempts. + // Given our requests contain a max of about 2_000 pubkeys, an extra clone here is a small + // price to pay to avoid this race condition. + let (stream, handle) = helius_laserstream::client::subscribe( + self.config.clone(), + request.clone(), + ); + let handle = StreamHandleImpl { handle }; + // Write to the handle and await it which at least guarantees that it has + // been sent over the network, even though there is still no guarantee it has been + // processed and that the subscription became active immediately + super::write_with_retry(&handle, "subscribe", request).await?; + Ok(LaserStreamWithHandle { + stream: Box::pin(stream), + handle, + }) + } +} diff --git a/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs new file mode 100644 index 000000000..5abc9e7cf --- /dev/null +++ b/magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs @@ -0,0 +1,1712 @@ +use std::collections::{HashMap, HashSet}; + +use helius_laserstream::grpc::{ + CommitmentLevel, SubscribeRequest, SubscribeRequestFilterAccounts, + SubscribeRequestFilterSlots, +}; +use magicblock_metrics::metrics; +use solana_pubkey::Pubkey; +use tokio_stream::StreamMap; + +use super::{ + write_with_retry, LaserResult, LaserStream, LaserStreamWithHandle, + SharedSubscriptions, StreamFactory, +}; +use crate::remote_account_provider::{ + chain_laser_actor::StreamHandle, chain_slot::ChainSlot, + RemoteAccountProviderResult, +}; + +/// Identifies whether a stream update came from an account or +/// program subscription stream. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum StreamUpdateSource { + Account, + Program, +} + +/// Identifies a stream within the [StreamMap]. +/// +/// Each variant maps to a stream category. The `usize` index +/// corresponds to the position within the respective `Vec` of +/// handles. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +enum StreamKey { + CurrentNew, + UnoptimizedOld(usize), + OptimizedOld(usize), + Program, +} + +impl StreamKey { + fn source(&self) -> StreamUpdateSource { + match self { + StreamKey::Program => StreamUpdateSource::Program, + _ => StreamUpdateSource::Account, + } + } +} + +/// Configuration for the generational stream manager. +pub struct StreamManagerConfig { + /// Max subscriptions per optimized old stream chunk. + pub max_subs_in_old_optimized: usize, + /// Max unoptimized old streams before optimization is triggered. + pub max_old_unoptimized: usize, + /// Max subscriptions in the current-new stream before it is + /// promoted to an unoptimized old stream. 
+ pub max_subs_in_new: usize, +} + +impl Default for StreamManagerConfig { + fn default() -> Self { + Self { + max_subs_in_old_optimized: 2000, + max_old_unoptimized: 10, + max_subs_in_new: 200, + } + } +} + +/// Manages the creation and lifecycle of GRPC laser streams. +/// +/// Account subscriptions follow a generational approach: +/// - New subscriptions go into the *current-new* stream. +/// - When the current-new stream exceeds [StreamManagerConfig::max_subs_in_new] it is +/// promoted to the [Self::unoptimized_old_handles] vec and a fresh current-new stream is created. +/// - When [Self::unoptimized_old_handles] exceed [StreamManagerConfig::max_old_unoptimized], +/// optimization is triggered which rebuilds all streams from the +/// `subscriptions` set into [StreamManager::optimized_old_handles] chunked by +/// [StreamManagerConfig::max_subs_in_old_optimized]. +/// +/// Unsubscribe only removes from the [Self::subscriptions] HashSet — it +/// never touches streams. Updates for unsubscribed pubkeys are +/// ignored at the actor level. +/// Unsubscribed accounts are dropped as part of optimization. +/// +/// Streams are stored in a persistent [StreamMap] keyed by +/// [StreamKey]. The map is only updated when stream topology +/// changes (subscribe, promote, optimize, clear). The +/// corresponding handles are stored separately for use in +/// [Self::update_subscriptions]. +#[allow(unused)] +pub struct StreamManager> { + /// Configures limits for stream management + config: StreamManagerConfig, + /// The factory used to create streams + stream_factory: SF, + /// The canonical set of currently active account subscriptions. + /// These include subscriptions maintained across the different set + /// of streams. + subscriptions: SharedSubscriptions, + /// Pubkeys that are part of the current-new stream's filter. + current_new_subs: HashSet, + + // -- Handles (needed for update_subscriptions) -- + /// Handle for the current-new stream. + current_new_handle: Option, + /// Handles for unoptimized old streams. + unoptimized_old_handles: Vec, + /// Handles for optimized old streams. + optimized_old_handles: Vec, + /// Handle + pubkey set for program subscriptions. + program_sub: Option<(HashSet, S)>, + + // -- All streams live here. -- + /// Streams separated from the handles in order to allow using them + /// inside a StreamMap + /// They are addressed via the StreamKey which includes an index for + /// [Self::unoptimized_old_handles] and [Self::optimized_old_handles]. + /// The key index matches the index of the corresponding vec. + /// Persistent stream map polled by [Self::next_update]. + /// Updated only when stream topology changes. + stream_map: StreamMap, + + /// Optional chain slot tracker for computing `from_slot` + /// during optimization. When set, optimized streams will + /// request backfill from a lookback window before the + /// current chain slot so that no updates are missed while + /// the old streams are being replaced. + /// For [Self::account_subscribe], `from_slot` is provided by + /// the caller (actor). + chain_slot: Option, + + /// Client identifier used as a label for stream metrics. 
+ client_id: String, +} + +#[allow(unused)] +impl> StreamManager { + pub fn new( + config: StreamManagerConfig, + stream_factory: SF, + chain_slot: Option, + client_id: String, + ) -> Self { + Self { + config, + stream_factory, + subscriptions: Default::default(), + current_new_subs: HashSet::new(), + current_new_handle: None, + unoptimized_old_handles: Vec::new(), + optimized_old_handles: Vec::new(), + program_sub: None, + stream_map: StreamMap::new(), + chain_slot, + client_id, + } + } + + // --------------------- + // Account subscription + // --------------------- + + /// Subscribe to account updates for the given pubkeys. + /// + /// Each pubkey is added to [Self::subscriptions] and to the + /// current-new stream. If the current-new stream exceeds + /// [StreamManagerConfig::max_subs_in_new] it is promoted and + /// a fresh one is created. If unoptimized old handles exceed + /// [StreamManagerConfig::max_old_unoptimized], optimization + /// is triggered. + pub async fn account_subscribe( + &mut self, + pubkeys: &[Pubkey], + commitment: &CommitmentLevel, + from_slot: Option, + ) -> RemoteAccountProviderResult<()> { + // Filter out pubkeys already in subscriptions. + let new_pks: Vec = { + let subs = self.subscriptions.read(); + pubkeys + .iter() + .filter(|pk| !subs.contains(pk)) + .copied() + .collect() + }; + + if new_pks.is_empty() { + return Ok(()); + } + + // Update the current-new stream with the full + // current_new_subs filter (either create new if doesn't + // exist, or update existing via write). + // We tentatively add new_pks to current_new_subs so the + // request includes them, but only persist into subscriptions + // after the stream update succeeds. + for pk in &new_pks { + self.current_new_subs.insert(*pk); + } + + let result = if let Some(handle) = &self.current_new_handle { + let request = Self::build_account_request( + &self.current_new_subs.iter().collect::>(), + commitment, + from_slot, + ); + write_with_retry(handle, "account_subscribe", request).await + } else { + let pks: Vec = + self.current_new_subs.iter().copied().collect(); + let pk_refs: Vec<&Pubkey> = pks.iter().collect(); + self.insert_current_new_stream(&pk_refs, commitment, from_slot) + .await + }; + + // Revert tentative current_new_subs additions if the stream update failed + if result.is_err() { + for pk in &new_pks { + self.current_new_subs.remove(pk); + } + result?; + } + + // Update active subscriptions with new pubkeys only after stream update + { + let mut subs = self.subscriptions.write(); + for pk in &new_pks { + subs.insert(*pk); + } + } + + // Promote if current-new exceeds threshold. + // The entire current-new stream (including any overflow) is + // moved to unoptimized old. A fresh, empty current-new will + // be created on the next subscribe call. + if self.current_new_subs.len() > self.config.max_subs_in_new { + // Move current-new to unoptimized old. + if let Some(stream) = self.stream_map.remove(&StreamKey::CurrentNew) + { + let idx = self.unoptimized_old_handles.len(); + self.stream_map + .insert(StreamKey::UnoptimizedOld(idx), stream); + } + if let Some(handle) = self.current_new_handle.take() { + self.unoptimized_old_handles.push(handle); + } + self.current_new_subs.clear(); + + // If unoptimized old handles exceed the limit, + // optimize. + if self.unoptimized_old_handles.len() + > self.config.max_old_unoptimized + { + self.optimize(commitment).await?; + } + + self.update_stream_metrics(); + } + + Ok(()) + } + + /// Unsubscribe the given pubkeys. 
+ /// + /// Removes them from the `subscriptions` HashSet only — streams + /// are never modified. Updates for these pubkeys will be ignored + /// by the actor. + pub fn account_unsubscribe(&mut self, pubkeys: &[Pubkey]) { + let mut subs = self.subscriptions.write(); + for pk in pubkeys { + subs.remove(pk); + } + } + + /// Clears all account subscriptions and drops all account + /// streams. + pub fn clear_account_subscriptions(&mut self) { + self.subscriptions.write().clear(); + self.current_new_subs.clear(); + self.current_new_handle = None; + self.stream_map.remove(&StreamKey::CurrentNew); + for i in 0..self.unoptimized_old_handles.len() { + self.stream_map.remove(&StreamKey::UnoptimizedOld(i)); + } + self.unoptimized_old_handles.clear(); + for i in 0..self.optimized_old_handles.len() { + self.stream_map.remove(&StreamKey::OptimizedOld(i)); + } + self.optimized_old_handles.clear(); + self.update_stream_metrics(); + } + + /// Returns `true` if any account stream exists. + pub fn has_account_subscriptions(&self) -> bool { + self.current_new_handle.is_some() + || !self.unoptimized_old_handles.is_empty() + || !self.optimized_old_handles.is_empty() + } + + /// Polls all streams in the [StreamMap], returning the next + /// available update tagged with its source. + /// Returns `None` when the map is empty. + pub async fn next_update( + &mut self, + ) -> Option<(StreamUpdateSource, LaserResult)> { + use tokio_stream::StreamExt; + let (key, result) = self.stream_map.next().await?; + Some((key.source(), result)) + } + + /// Returns `true` if any stream (account or program) exists. + pub fn has_any_subscriptions(&self) -> bool { + !self.stream_map.is_empty() + } + + /// Computes a `from_slot` for backfilling based on the + /// current chain slot. Returns `None` if no chain slot + /// tracker is available + fn compute_from_slot(&self) -> Option { + self.chain_slot.as_ref().map(ChainSlot::compute_from_slot) + } + + /// Emits the current optimized/unoptimized stream counts as + /// metrics. + fn update_stream_metrics(&self) { + metrics::set_grpc_optimized_streams_gauge( + &self.client_id, + self.optimized_old_handles.len(), + ); + metrics::set_grpc_unoptimized_streams_gauge( + &self.client_id, + self.unoptimized_old_handles.len(), + ); + } + + /// Rebuild all account streams from `subscriptions`. + /// + /// 1. Chunk `subscriptions` into groups of + /// `max_subs_in_old_optimized`. + /// 2. Create a new stream for each chunk → + /// `optimized_old_handles`. + /// 3. Clear `unoptimized_old_handles`. + /// 4. Reset the current-new stream (empty filter). + /// + /// NOTE: the caller is expected to clear all subscriptions if optimize + /// and thus the method invoking it ([Self::account_subscribe]) returns an error (the actor does so). + /// Otherwise streams may end up in an inconsistent state if a subscription attempt + /// fails. + pub async fn optimize( + &mut self, + commitment: &CommitmentLevel, + ) -> RemoteAccountProviderResult<()> { + // Remove all account streams from the map but keep them + // alive until the new optimized streams are created to + // avoid a gap without any active streams (race condition). 
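+        // The removed streams are bound to the `_prev_*` locals below so they
+        // keep delivering updates until the end of this function; together with
+        // the `from_slot` lookback (when a chain slot tracker is configured)
+        // this covers the switch-over window.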
+ let _prev_current_new = self.stream_map.remove(&StreamKey::CurrentNew); + let _prev_unoptimized: Vec<_> = (0..self.unoptimized_old_handles.len()) + .filter_map(|i| { + self.stream_map.remove(&StreamKey::UnoptimizedOld(i)) + }) + .collect(); + let _prev_optimized: Vec<_> = (0..self.optimized_old_handles.len()) + .filter_map(|i| self.stream_map.remove(&StreamKey::OptimizedOld(i))) + .collect(); + + // Collect all active subscriptions and chunk them. + let all_pks: Vec = + self.subscriptions.read().iter().copied().collect(); + + // Build optimized old streams from chunks. + let from_slot = self.compute_from_slot(); + self.optimized_old_handles = Vec::new(); + for (i, chunk) in all_pks + .chunks(self.config.max_subs_in_old_optimized) + .enumerate() + { + let refs: Vec<&Pubkey> = chunk.iter().collect(); + let LaserStreamWithHandle { stream, handle } = self + .stream_factory + .subscribe(Self::build_account_request( + &refs, commitment, from_slot, + )) + .await?; + self.stream_map.insert(StreamKey::OptimizedOld(i), stream); + self.optimized_old_handles.push(handle); + } + + // Clear unoptimized old handles. + self.unoptimized_old_handles.clear(); + + // Reset the current-new stream. + self.current_new_subs.clear(); + self.current_new_handle = None; + + self.update_stream_metrics(); + + // Old streams are dropped here when _prev_* go out of scope, + // after the new optimized streams are already active. + Ok(()) + } + + /// Returns `true` if the pubkey is in the active + /// `subscriptions` set. + pub fn is_subscribed(&self, pubkey: &Pubkey) -> bool { + self.subscriptions.read().contains(pubkey) + } + + // --------------------------------------------------------- + // Accessors — internal state inspection + // --------------------------------------------------------- + + /// Returns a reference to the shared subscriptions. + pub fn subscriptions(&self) -> &SharedSubscriptions { + &self.subscriptions + } + + /// Returns the number of pubkeys in the current-new stream's + /// filter. + fn current_new_sub_count(&self) -> usize { + self.current_new_subs.len() + } + + /// Returns a reference to the current-new stream's pubkey + /// set. + fn current_new_subs(&self) -> &HashSet { + &self.current_new_subs + } + + /// Returns the number of unoptimized old streams. + fn unoptimized_old_stream_count(&self) -> usize { + self.unoptimized_old_handles.len() + } + + /// Returns the number of optimized old streams. + fn optimized_old_stream_count(&self) -> usize { + self.optimized_old_handles.len() + } + + /// Returns the total number of account streams across all + /// generations. + fn account_stream_count(&self) -> usize { + let current = usize::from(self.current_new_handle.is_some()); + self.optimized_old_handles.len() + + self.unoptimized_old_handles.len() + + current + } + + // --------------------------------------------------------- + // Internal helpers + // --------------------------------------------------------- + + /// Build a `SubscribeRequest` for the given account pubkeys. + /// Includes a slot subscription for chain slot + /// synchronisation. 
+ fn build_account_request( + pubkeys: &[&Pubkey], + commitment: &CommitmentLevel, + from_slot: Option, + ) -> SubscribeRequest { + let mut accounts = HashMap::new(); + accounts.insert( + "account_subs".to_string(), + SubscribeRequestFilterAccounts { + account: pubkeys.iter().map(|pk| pk.to_string()).collect(), + ..Default::default() + }, + ); + + let mut slots = HashMap::new(); + slots.insert( + "slot_updates".to_string(), + SubscribeRequestFilterSlots { + filter_by_commitment: Some(true), + ..Default::default() + }, + ); + + SubscribeRequest { + accounts, + slots, + commitment: Some((*commitment).into()), + from_slot, + ..Default::default() + } + } + + /// Create an account stream via the factory and insert it + /// as the current-new stream in the [StreamMap]. + async fn insert_current_new_stream( + &mut self, + pubkeys: &[&Pubkey], + commitment: &CommitmentLevel, + from_slot: Option, + ) -> RemoteAccountProviderResult<()> { + let request = + Self::build_account_request(pubkeys, commitment, from_slot); + let LaserStreamWithHandle { stream, handle } = + self.stream_factory.subscribe(request).await?; + self.stream_map.insert(StreamKey::CurrentNew, stream); + self.current_new_handle = Some(handle); + Ok(()) + } + + /// Adds a program subscription. If the program is already + /// subscribed, this is a no-op. Otherwise, updates the + /// program stream to include all subscribed programs. + pub async fn add_program_subscription( + &mut self, + program_id: Pubkey, + commitment: &CommitmentLevel, + ) -> RemoteAccountProviderResult<()> { + if self + .program_sub + .as_ref() + .is_some_and(|(subs, _)| subs.contains(&program_id)) + { + return Ok(()); + } + + if let Some((mut subscribed_programs, handle)) = self.program_sub.take() + { + subscribed_programs.insert(program_id); + let request = + Self::build_program_request(&subscribed_programs, commitment); + match write_with_retry(&handle, "program_subscribe", request).await + { + Ok(()) => { + self.program_sub = Some((subscribed_programs, handle)); + } + Err(e) => { + self.program_sub = Some((subscribed_programs, handle)); + return Err(e); + } + } + } else { + let mut subscribed_programs = HashSet::new(); + subscribed_programs.insert(program_id); + let LaserStreamWithHandle { stream, handle } = self + .create_program_stream(&subscribed_programs, commitment) + .await?; + self.stream_map.insert(StreamKey::Program, stream); + self.program_sub = Some((subscribed_programs, handle)); + } + + Ok(()) + } + + /// Returns whether there are active program subscriptions. + pub fn has_program_subscriptions(&self) -> bool { + self.program_sub.is_some() + } + + /// Clears all program subscriptions. + pub fn clear_program_subscriptions(&mut self) { + self.stream_map.remove(&StreamKey::Program); + self.program_sub = None; + } + + /// Build a `SubscribeRequest` for the given program IDs. + fn build_program_request( + program_ids: &HashSet, + commitment: &CommitmentLevel, + ) -> SubscribeRequest { + let mut accounts = HashMap::new(); + accounts.insert( + "program_sub".to_string(), + SubscribeRequestFilterAccounts { + owner: program_ids.iter().map(|pk| pk.to_string()).collect(), + ..Default::default() + }, + ); + + SubscribeRequest { + accounts, + commitment: Some((*commitment).into()), + ..Default::default() + } + } + + /// Creates a subscription stream for program updates. 
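+    // Unlike account streams, all subscribed programs share a single stream
+    // (keyed `StreamKey::Program`) whose filter matches on account `owner`
+    // rather than on individual pubkeys.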
+ async fn create_program_stream( + &mut self, + program_ids: &HashSet, + commitment: &CommitmentLevel, + ) -> RemoteAccountProviderResult> { + let request = Self::build_program_request(program_ids, commitment); + self.stream_factory.subscribe(request).await + } +} + +#[cfg(test)] +mod tests { + use helius_laserstream::grpc::CommitmentLevel; + use solana_pubkey::Pubkey; + + use super::*; + use crate::remote_account_provider::chain_laser_actor::mock::{ + MockStreamFactory, MockStreamHandle, + }; + + // ----------------- + // Helpers + // ----------------- + fn test_config() -> StreamManagerConfig { + StreamManagerConfig { + max_subs_in_old_optimized: 10, + max_old_unoptimized: 3, + max_subs_in_new: 5, + } + } + + fn create_manager() -> ( + StreamManager, + MockStreamFactory, + ) { + let factory = MockStreamFactory::new(); + let manager = StreamManager::new( + test_config(), + factory.clone(), + None, + "test".to_string(), + ); + (manager, factory) + } + + fn make_pubkeys(n: usize) -> Vec { + (0..n).map(|_| Pubkey::new_unique()).collect() + } + + /// Collect all account pubkey strings from a captured + /// `SubscribeRequest`'s account filters. + fn account_pubkeys_from_request(req: &SubscribeRequest) -> HashSet { + req.accounts + .values() + .flat_map(|f| f.account.iter().cloned()) + .collect() + } + + /// Assert that `subscriptions()` contains exactly `expected` + /// (order-independent, exact count). + fn assert_subscriptions_eq( + mgr: &StreamManager, + expected: &[Pubkey], + ) { + let subs = mgr.subscriptions().read(); + assert_eq!( + subs.len(), + expected.len(), + "expected {} subscriptions, got {}", + expected.len(), + subs.len(), + ); + for pk in expected { + assert!(subs.contains(pk), "subscription set missing pubkey {pk}",); + } + } + + /// Assert that a `SubscribeRequest` filter contains exactly the + /// given pubkeys (order-independent, exact count). + fn assert_request_has_exact_pubkeys( + req: &SubscribeRequest, + expected: &[Pubkey], + ) { + let filter = account_pubkeys_from_request(req); + assert_eq!( + filter.len(), + expected.len(), + "expected {} pubkeys in filter, got {}", + expected.len(), + filter.len(), + ); + for pk in expected { + assert!( + filter.contains(&pk.to_string()), + "request filter missing pubkey {pk}", + ); + } + } + + // --------------------------------------------------------- + // Additional helpers + // --------------------------------------------------------- + + const COMMITMENT: CommitmentLevel = CommitmentLevel::Processed; + + /// Subscribe `n` pubkeys one-at-a-time, returning the created + /// pubkeys. + async fn subscribe_n( + mgr: &mut StreamManager, + n: usize, + ) -> Vec { + let pks = make_pubkeys(n); + mgr.account_subscribe(&pks, &COMMITMENT, None) + .await + .unwrap(); + pks + } + + /// Subscribe pubkeys in batches of `batch` until `total` pubkeys + /// have been subscribed. Returns all created pubkeys. + async fn subscribe_in_batches( + mgr: &mut StreamManager, + total: usize, + batch: usize, + ) -> Vec { + let mut all = Vec::new(); + let mut remaining = total; + while remaining > 0 { + let n = remaining.min(batch); + let pks = make_pubkeys(n); + mgr.account_subscribe(&pks, &COMMITMENT, None) + .await + .unwrap(); + all.extend(pks); + remaining -= n; + } + all + } + + /// Returns the union of all account pubkey strings across all + /// captured requests from `start_idx` onward. 
+ fn all_filter_pubkeys_from( + factory: &MockStreamFactory, + start_idx: usize, + ) -> HashSet { + factory + .captured_requests() + .iter() + .skip(start_idx) + .flat_map(account_pubkeys_from_request) + .collect() + } + + // ------------------------------------------------------------- + // 1. Subscription Tracking + // ------------------------------------------------------------- + + #[tokio::test] + async fn test_subscribe_single_pubkey_adds_to_subscriptions() { + let (mut mgr, factory) = create_manager(); + let pk = Pubkey::new_unique(); + + mgr.account_subscribe(&[pk], &COMMITMENT, None) + .await + .unwrap(); + + assert_subscriptions_eq(&mgr, &[pk]); + + // The subscribe call writes the actual request via the handle + let handle_reqs = factory.handle_requests(); + assert_eq!(handle_reqs.len(), 1); + assert_request_has_exact_pubkeys(&handle_reqs[0], &[pk]); + } + + #[tokio::test] + async fn test_subscribe_multiple_pubkeys_at_once() { + let (mut mgr, factory) = create_manager(); + let pks = make_pubkeys(5); + + mgr.account_subscribe(&pks, &COMMITMENT, None) + .await + .unwrap(); + + assert_subscriptions_eq(&mgr, &pks); + + // The subscribe call writes the actual request via the handle + let handle_reqs = factory.handle_requests(); + assert_eq!(handle_reqs.len(), 1); + assert_request_has_exact_pubkeys(&handle_reqs[0], &pks); + } + + #[tokio::test] + async fn test_subscribe_duplicate_pubkey_is_noop() { + let (mut mgr, factory) = create_manager(); + let pk = Pubkey::new_unique(); + + mgr.account_subscribe(&[pk], &COMMITMENT, None) + .await + .unwrap(); + let calls_after_first = factory.captured_requests().len(); + + mgr.account_subscribe(&[pk], &COMMITMENT, None) + .await + .unwrap(); + + assert_subscriptions_eq(&mgr, &[pk]); + assert_eq!(factory.captured_requests().len(), calls_after_first); + } + + #[tokio::test] + async fn test_subscribe_incremental_calls_accumulate() { + let (mut mgr, factory) = create_manager(); + let pks = make_pubkeys(3); + + mgr.account_subscribe(&[pks[0]], &COMMITMENT, None) + .await + .unwrap(); + mgr.account_subscribe(&[pks[1]], &COMMITMENT, None) + .await + .unwrap(); + mgr.account_subscribe(&[pks[2]], &COMMITMENT, None) + .await + .unwrap(); + + assert_subscriptions_eq(&mgr, &pks); + + // All pubkey additions go through handle.write() which + // accumulates (first via subscribe, then via + // update_subscriptions) + let handle_reqs = factory.handle_requests(); + assert!(!handle_reqs.is_empty()); + let last_handle_req = handle_reqs.last().unwrap(); + assert_request_has_exact_pubkeys(last_handle_req, &pks); + } + + // ------------------------------------------------------------- + // 2. Current-New Stream Lifecycle + // ------------------------------------------------------------- + + #[tokio::test] + async fn test_new_stream_created_on_first_subscribe() { + let (mut mgr, factory) = create_manager(); + assert_eq!(mgr.account_stream_count(), 0); + + subscribe_n(&mut mgr, 1).await; + + assert_eq!(mgr.account_stream_count(), 1); + assert_eq!(factory.active_stream_count(), 1); + } + + #[tokio::test] + async fn test_current_new_stream_stays_below_threshold() { + let (mut mgr, _factory) = create_manager(); + // MAX_NEW - 1 = 4 + subscribe_in_batches(&mut mgr, 4, 2).await; + + assert_eq!(mgr.account_stream_count(), 1); + assert_eq!(mgr.unoptimized_old_stream_count(), 0); + } + + #[tokio::test] + async fn test_current_new_stream_promoted_at_threshold() { + let (mut mgr, factory) = create_manager(); + // Subscribe MAX_NEW (5) pubkeys first. 
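+        // (Exactly at the threshold nothing is promoted: promotion only
+        // triggers once current_new_subs.len() exceeds max_subs_in_new.)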
+ let first_five = make_pubkeys(5); + mgr.account_subscribe(&first_five, &COMMITMENT, None) + .await + .unwrap(); + assert_eq!(mgr.unoptimized_old_stream_count(), 0); + + // Subscribe the 6th pubkey → triggers promotion. + let sixth = Pubkey::new_unique(); + mgr.account_subscribe(&[sixth], &COMMITMENT, None) + .await + .unwrap(); + + assert_eq!(mgr.unoptimized_old_stream_count(), 1); + // After promotion current-new starts empty (all pubkeys + // including the 6th moved into unoptimized old). + assert_eq!(mgr.current_new_sub_count(), 0); + + // The promoted stream was last updated (via handle.write) + // with all 6 pubkeys. + let handle_reqs = factory.handle_requests(); + let promoted_req = handle_reqs.last().unwrap(); + let all_pks: Vec = first_five + .iter() + .copied() + .chain(std::iter::once(sixth)) + .collect(); + assert_request_has_exact_pubkeys(promoted_req, &all_pks); + } + + #[tokio::test] + async fn test_multiple_promotions_accumulate_unoptimized() { + let (mut mgr, factory) = create_manager(); + // First promotion: subscribe 6 pubkeys (exceeds MAX_NEW=5). + let pks1 = subscribe_n(&mut mgr, 6).await; + assert_eq!(mgr.unoptimized_old_stream_count(), 1); + assert_eq!(mgr.current_new_sub_count(), 0); + + // Second promotion: subscribe 6 more to exceed again. + let pks2 = subscribe_n(&mut mgr, 6).await; + assert_eq!(mgr.unoptimized_old_stream_count(), 2); + assert_eq!(mgr.current_new_sub_count(), 0); + + // Each promoted stream was created via subscribe() with + // its batch of 6 pubkeys. + let captured = factory.captured_requests(); + assert_eq!(captured.len(), 2); + assert_request_has_exact_pubkeys(&captured[0], &pks1); + assert_request_has_exact_pubkeys(&captured[1], &pks2); + } + + // ------------------------------------------------------------- + // 3. Optimization Trigger via MAX_OLD_UNOPTIMIZED + // ------------------------------------------------------------- + + #[tokio::test] + async fn test_optimization_triggered_when_unoptimized_exceeds_max() { + let (mut mgr, _factory) = create_manager(); + // MAX_OLD_UNOPTIMIZED = 3. We need 4 promotions. + // Each promotion needs > MAX_NEW (5) pubkeys in current-new. + // Subscribe 6 four times → 4 promotions. + for _ in 0..3 { + subscribe_n(&mut mgr, 6).await; + } + assert_eq!(mgr.unoptimized_old_stream_count(), 3); + + // 4th promotion triggers optimization. + subscribe_n(&mut mgr, 6).await; + + // After optimization: unoptimized should be empty. + assert_eq!(mgr.unoptimized_old_stream_count(), 0); + // Optimized old streams should exist. + let total_subs = mgr.subscriptions().read().len(); + let expected_optimized = total_subs.div_ceil(10); // ceil(total / MAX_OLD_OPTIMIZED) + assert_eq!(mgr.optimized_old_stream_count(), expected_optimized,); + } + + #[tokio::test] + async fn test_optimization_not_triggered_below_max_unoptimized() { + let (mut mgr, _factory) = create_manager(); + // Exactly MAX_OLD_UNOPTIMIZED (3) promotions. + for _ in 0..3 { + subscribe_n(&mut mgr, 6).await; + } + assert_eq!(mgr.unoptimized_old_stream_count(), 3); + assert_eq!(mgr.optimized_old_stream_count(), 0); + } + + // ------------------------------------------------------------- + // 4. 
Manual / Interval-Driven Optimization + // ------------------------------------------------------------- + + #[tokio::test] + async fn test_optimize_creates_correct_number_of_optimized_streams() { + let (mut mgr, _factory) = create_manager(); + subscribe_n(&mut mgr, 25).await; + + mgr.optimize(&COMMITMENT).await.unwrap(); + + // ceil(25 / 10) = 3 + assert_eq!(mgr.optimized_old_stream_count(), 3); + } + + #[tokio::test] + async fn test_optimize_clears_unoptimized_old_streams() { + let (mut mgr, _factory) = create_manager(); + // Create several unoptimized old streams. + for _ in 0..3 { + subscribe_n(&mut mgr, 6).await; + } + assert!(mgr.unoptimized_old_stream_count() > 0); + + mgr.optimize(&COMMITMENT).await.unwrap(); + + assert_eq!(mgr.unoptimized_old_stream_count(), 0); + assert!(mgr.optimized_old_stream_count() > 0); + } + + #[tokio::test] + async fn test_optimize_resets_current_new_stream() { + let (mut mgr, _factory) = create_manager(); + subscribe_n(&mut mgr, 8).await; + + mgr.optimize(&COMMITMENT).await.unwrap(); + + assert_eq!(mgr.current_new_sub_count(), 0); + } + + #[tokio::test] + async fn test_optimize_excludes_unsubscribed_pubkeys() { + let (mut mgr, factory) = create_manager(); + let pks = subscribe_n(&mut mgr, 15).await; + + // Unsubscribe 5 of them. + let to_unsub: Vec = pks[0..5].to_vec(); + mgr.account_unsubscribe(&to_unsub); + + let reqs_before = factory.captured_requests().len(); + mgr.optimize(&COMMITMENT).await.unwrap(); + + // Optimized streams should only contain the 10 remaining + // pubkeys. + let remaining: HashSet = + pks[5..].iter().map(|pk| pk.to_string()).collect(); + let filter_pks = all_filter_pubkeys_from(&factory, reqs_before); + assert_eq!(filter_pks.len(), 10); + for pk in &to_unsub { + assert!( + !filter_pks.contains(&pk.to_string()), + "unsubscribed pubkey {pk} found in optimized filter", + ); + } + for pk_str in &remaining { + assert!( + filter_pks.contains(pk_str), + "expected pubkey {pk_str} missing from optimized filter", + ); + } + } + + #[tokio::test] + async fn test_optimize_with_zero_subscriptions() { + let (mut mgr, _factory) = create_manager(); + let pks = subscribe_n(&mut mgr, 5).await; + mgr.account_unsubscribe(&pks); + + mgr.optimize(&COMMITMENT).await.unwrap(); + + assert_eq!(mgr.optimized_old_stream_count(), 0); + assert_eq!(mgr.unoptimized_old_stream_count(), 0); + } + + #[tokio::test] + async fn test_optimize_idempotent() { + let (mut mgr, _factory) = create_manager(); + subscribe_n(&mut mgr, 15).await; + + mgr.optimize(&COMMITMENT).await.unwrap(); + let count_after_first = mgr.optimized_old_stream_count(); + + mgr.optimize(&COMMITMENT).await.unwrap(); + assert_eq!(mgr.optimized_old_stream_count(), count_after_first,); + } + + // ------------------------------------------------------------- + // 5. Behavior During Optimization + // ------------------------------------------------------------- + + #[tokio::test] + async fn test_subscribe_during_optimization_goes_to_current_new() { + let (mut mgr, _factory) = create_manager(); + subscribe_n(&mut mgr, 20).await; + + mgr.optimize(&COMMITMENT).await.unwrap(); + + // Subscribe a new pubkey after optimization. + let new_pk = Pubkey::new_unique(); + mgr.account_subscribe(&[new_pk], &COMMITMENT, None) + .await + .unwrap(); + + assert!(mgr.subscriptions().read().contains(&new_pk)); + assert!(mgr.current_new_subs().contains(&new_pk)); + } + + #[tokio::test] + async fn test_no_double_optimization_trigger() { + let (mut mgr, _factory) = create_manager(); + // Fill up to MAX_OLD_UNOPTIMIZED. 
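+        // Each batch of 6 exceeds max_subs_in_new (5), so every iteration
+        // promotes one stream into the unoptimized-old generation.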
+ for _ in 0..3 { + subscribe_n(&mut mgr, 6).await; + } + assert_eq!(mgr.unoptimized_old_stream_count(), 3); + + // 4th promotion triggers optimization. + subscribe_n(&mut mgr, 6).await; + assert_eq!(mgr.unoptimized_old_stream_count(), 0); + let optimized_after_first = mgr.optimized_old_stream_count(); + + // Now subscribe enough to exceed MAX_SUBS_IN_NEW again, + // causing a promotion. Since optimization just ran, it should + // NOT trigger again immediately. + subscribe_n(&mut mgr, 6).await; + // Unoptimized grows by 1 but no second optimization. + assert!(mgr.unoptimized_old_stream_count() <= 1); + assert_eq!(mgr.optimized_old_stream_count(), optimized_after_first,); + } + + // ------------------------------------------------------------- + // 6. Unsubscribe + // ------------------------------------------------------------- + + #[tokio::test] + async fn test_unsubscribe_removes_from_subscriptions_set() { + let (mut mgr, _factory) = create_manager(); + let pks = make_pubkeys(3); + mgr.account_subscribe(&pks, &COMMITMENT, None) + .await + .unwrap(); + + mgr.account_unsubscribe(&[pks[1]]); + + assert_subscriptions_eq(&mgr, &[pks[0], pks[2]]); + } + + #[test] + fn test_unsubscribe_nonexistent_pubkey_is_noop() { + let (mut mgr, _factory) = create_manager(); + let random = Pubkey::new_unique(); + + mgr.account_unsubscribe(&[random]); + + assert!(mgr.subscriptions().read().is_empty()); + } + + #[tokio::test] + async fn test_unsubscribe_already_unsubscribed_pubkey() { + let (mut mgr, _factory) = create_manager(); + let pk = Pubkey::new_unique(); + mgr.account_subscribe(&[pk], &COMMITMENT, None) + .await + .unwrap(); + + mgr.account_unsubscribe(&[pk]); + mgr.account_unsubscribe(&[pk]); + + assert!(mgr.subscriptions().read().is_empty()); + } + + #[tokio::test] + async fn test_unsubscribe_does_not_modify_streams() { + let (mut mgr, factory) = create_manager(); + let pks = make_pubkeys(4); + mgr.account_subscribe(&pks, &COMMITMENT, None) + .await + .unwrap(); + let calls_before = factory.captured_requests().len(); + + mgr.account_unsubscribe(&pks[0..2]); + + // No new factory calls after unsubscribe. + assert_eq!(factory.captured_requests().len(), calls_before); + // Current-new subs still contain all 4 (streams not updated). + for pk in &pks { + assert!(mgr.current_new_subs().contains(pk)); + } + } + + #[tokio::test] + async fn test_unsubscribe_all_then_optimize_clears_streams() { + let (mut mgr, _factory) = create_manager(); + // Subscribe 8 pubkeys (creates current-new + 1 unoptimized). + let pks = subscribe_n(&mut mgr, 8).await; + mgr.account_unsubscribe(&pks); + + mgr.optimize(&COMMITMENT).await.unwrap(); + + assert_eq!(mgr.optimized_old_stream_count(), 0); + assert_eq!(mgr.unoptimized_old_stream_count(), 0); + } + + #[tokio::test] + async fn test_unsubscribe_batch() { + let (mut mgr, factory) = create_manager(); + let pks = make_pubkeys(5); + mgr.account_subscribe(&pks, &COMMITMENT, None) + .await + .unwrap(); + let calls_before = factory.captured_requests().len(); + + mgr.account_unsubscribe(&[pks[0], pks[2], pks[4]]); + + assert_subscriptions_eq(&mgr, &[pks[1], pks[3]]); + assert_eq!(factory.captured_requests().len(), calls_before); + } + + // ------------------------------------------------------------- + // 7. 
Subscription Membership Check + // ------------------------------------------------------------- + + #[tokio::test] + async fn test_is_subscribed_returns_true_for_active() { + let (mut mgr, _factory) = create_manager(); + let pk = Pubkey::new_unique(); + mgr.account_subscribe(&[pk], &COMMITMENT, None) + .await + .unwrap(); + + assert!(mgr.is_subscribed(&pk)); + } + + #[tokio::test] + async fn test_is_subscribed_returns_false_after_unsubscribe() { + let (mut mgr, _factory) = create_manager(); + let pk = Pubkey::new_unique(); + mgr.account_subscribe(&[pk], &COMMITMENT, None) + .await + .unwrap(); + mgr.account_unsubscribe(&[pk]); + + assert!(!mgr.is_subscribed(&pk)); + } + + #[test] + fn test_is_subscribed_returns_false_for_never_subscribed() { + let (mgr, _factory) = create_manager(); + let random = Pubkey::new_unique(); + + assert!(!mgr.is_subscribed(&random)); + } + + // ------------------------------------------------------------- + // 8. Stream Count Across Generations + // ------------------------------------------------------------- + + #[tokio::test] + async fn test_account_stream_count_includes_all_generations() { + let (mut mgr, _factory) = create_manager(); + // Create optimized old streams. + subscribe_n(&mut mgr, 15).await; + mgr.optimize(&COMMITMENT).await.unwrap(); + + // Create an unoptimized old stream via promotion. + subscribe_n(&mut mgr, 6).await; + + // After promotion current-new is empty (no handle), so + // the count is optimized + unoptimized only. + let count = mgr.account_stream_count(); + assert!(count > 0); + assert_eq!( + count, + mgr.optimized_old_stream_count() + + mgr.unoptimized_old_stream_count(), + ); + } + + #[test] + fn test_account_stream_count_zero_when_no_subscriptions() { + let (mgr, _factory) = create_manager(); + assert_eq!(mgr.account_stream_count(), 0); + } + + #[tokio::test] + async fn test_account_stream_count_after_optimize_drops_unoptimized() { + let (mut mgr, _factory) = create_manager(); + // Create unoptimized old streams. + for _ in 0..2 { + subscribe_n(&mut mgr, 6).await; + } + assert!(mgr.unoptimized_old_stream_count() > 0); + + mgr.optimize(&COMMITMENT).await.unwrap(); + + assert_eq!(mgr.unoptimized_old_stream_count(), 0); + // Only optimized old streams remain (current-new is empty + // after optimize). + assert_eq!( + mgr.account_stream_count(), + mgr.optimized_old_stream_count(), + ); + } + + // ------------------------------------------------------------- + // 9. Edge Cases and Stress + // ------------------------------------------------------------- + + #[tokio::test] + async fn test_subscribe_exactly_at_max_subs_in_new_no_promotion() { + let (mut mgr, _factory) = create_manager(); + // Exactly MAX_NEW (5) pubkeys — should NOT promote. 
+ subscribe_n(&mut mgr, 5).await; + + assert_eq!(mgr.unoptimized_old_stream_count(), 0); + assert_eq!(mgr.account_stream_count(), 1); + } + + #[tokio::test] + async fn test_single_pubkey_optimization() { + let (mut mgr, _factory) = create_manager(); + subscribe_n(&mut mgr, 1).await; + + mgr.optimize(&COMMITMENT).await.unwrap(); + + assert_eq!(mgr.optimized_old_stream_count(), 1); + assert_eq!(mgr.current_new_sub_count(), 0); + } + + #[tokio::test] + async fn test_subscribe_max_old_optimized_plus_one() { + let (mut mgr, _factory) = create_manager(); + // MAX_OLD_OPTIMIZED + 1 = 11 + subscribe_n(&mut mgr, 11).await; + + mgr.optimize(&COMMITMENT).await.unwrap(); + + assert_eq!(mgr.optimized_old_stream_count(), 2); + } + + #[tokio::test] + async fn test_large_scale_subscribe_and_optimize() { + let (mut mgr, factory) = create_manager(); + let pks = subscribe_n(&mut mgr, 50).await; + + let reqs_before = factory.captured_requests().len(); + mgr.optimize(&COMMITMENT).await.unwrap(); + + // ceil(50 / 10) = 5 + assert_eq!(mgr.optimized_old_stream_count(), 5); + assert_eq!(mgr.subscriptions().read().len(), 50); + assert_eq!(mgr.current_new_sub_count(), 0); + + // Verify the union of all optimized stream filters equals all + // 50 pubkeys. + let filter_pks = all_filter_pubkeys_from(&factory, reqs_before); + assert_eq!(filter_pks.len(), 50); + for pk in &pks { + assert!(filter_pks.contains(&pk.to_string())); + } + } + + #[tokio::test] + async fn test_interleaved_subscribe_unsubscribe_then_optimize() { + let (mut mgr, factory) = create_manager(); + let pks = subscribe_n(&mut mgr, 20).await; + // Unsubscribe 8 scattered. + let unsub1: Vec = + pks.iter().step_by(2).take(8).copied().collect(); + mgr.account_unsubscribe(&unsub1); + + // Subscribe 5 new ones. + let new_pks = subscribe_n(&mut mgr, 5).await; + // Unsubscribe 2 of the new ones. + mgr.account_unsubscribe(&new_pks[0..2]); + + let expected_count = 20 - 8 + 5 - 2; + assert_eq!(mgr.subscriptions().read().len(), expected_count); + + let reqs_before = factory.captured_requests().len(); + mgr.optimize(&COMMITMENT).await.unwrap(); + + let filter_pks = all_filter_pubkeys_from(&factory, reqs_before); + assert_eq!(filter_pks.len(), expected_count); + // Verify unsubscribed pubkeys are absent. + for pk in &unsub1 { + assert!(!filter_pks.contains(&pk.to_string())); + } + for pk in &new_pks[0..2] { + assert!(!filter_pks.contains(&pk.to_string())); + } + } + + #[tokio::test] + async fn test_rapid_subscribe_unsubscribe_same_pubkey() { + let (mut mgr, _factory) = create_manager(); + let pk = Pubkey::new_unique(); + + mgr.account_subscribe(&[pk], &COMMITMENT, None) + .await + .unwrap(); + mgr.account_unsubscribe(&[pk]); + mgr.account_subscribe(&[pk], &COMMITMENT, None) + .await + .unwrap(); + + assert!(mgr.subscriptions().read().contains(&pk)); + assert!(mgr.current_new_subs().contains(&pk)); + } + + // ------------------------------------------------------------- + // 10. 
Stream Factory Interaction Verification + // ------------------------------------------------------------- + + #[tokio::test] + async fn test_factory_called_with_correct_commitment() { + let (mut mgr, factory) = create_manager(); + let commitment = CommitmentLevel::Finalized; + let pk = Pubkey::new_unique(); + + mgr.account_subscribe(&[pk], &commitment, None) + .await + .unwrap(); + + // The subscribe call writes the actual request via the handle + let handle_reqs = factory.handle_requests(); + assert_eq!(handle_reqs.len(), 1); + assert_eq!( + handle_reqs[0].commitment, + Some(i32::from(CommitmentLevel::Finalized)), + ); + } + + #[tokio::test] + async fn test_factory_called_with_slot_filter() { + let (mut mgr, factory) = create_manager(); + subscribe_n(&mut mgr, 1).await; + + // The subscribe call writes the actual request via the handle + let handle_reqs = factory.handle_requests(); + assert!(!handle_reqs[0].slots.is_empty()); + } + + #[tokio::test] + async fn test_optimize_factory_calls_contain_chunked_pubkeys() { + let (mut mgr, factory) = create_manager(); + subscribe_n(&mut mgr, 15).await; + + let reqs_before = factory.captured_requests().len(); + mgr.optimize(&COMMITMENT).await.unwrap(); + + let optimize_reqs: Vec<_> = factory + .captured_requests() + .into_iter() + .skip(reqs_before) + .collect(); + assert_eq!(optimize_reqs.len(), 2); + + let first_pks = account_pubkeys_from_request(&optimize_reqs[0]); + let second_pks = account_pubkeys_from_request(&optimize_reqs[1]); + assert_eq!(first_pks.len(), 10); + assert_eq!(second_pks.len(), 5); + + // No overlap. + assert!(first_pks.is_disjoint(&second_pks)); + } + + #[tokio::test] + async fn test_factory_not_called_on_unsubscribe() { + let (mut mgr, factory) = create_manager(); + subscribe_n(&mut mgr, 5).await; + let calls_before = factory.captured_requests().len(); + + let pks: Vec = + mgr.subscriptions().read().iter().take(3).copied().collect(); + mgr.account_unsubscribe(&pks); + + assert_eq!(factory.captured_requests().len(), calls_before); + } + + // ------------------------------------------------------------- + // 11. from_slot Support + // ------------------------------------------------------------- + + #[tokio::test] + async fn test_from_slot_set_on_subscribe_request() { + let (mut mgr, factory) = create_manager(); + let pk = Pubkey::new_unique(); + + mgr.account_subscribe(&[pk], &COMMITMENT, Some(42)) + .await + .unwrap(); + + // The subscribe call writes the actual request via the handle + let handle_reqs = factory.handle_requests(); + assert_eq!(handle_reqs.len(), 1); + assert_eq!(handle_reqs[0].from_slot, Some(42)); + } + + #[tokio::test] + async fn test_from_slot_none_when_not_provided() { + let (mut mgr, factory) = create_manager(); + let pk = Pubkey::new_unique(); + + mgr.account_subscribe(&[pk], &COMMITMENT, None) + .await + .unwrap(); + + let reqs = factory.captured_requests(); + assert_eq!(reqs.len(), 1); + assert_eq!(reqs[0].from_slot, None); + } + + #[tokio::test] + async fn test_from_slot_forwarded_to_handle_write() { + let (mut mgr, factory) = create_manager(); + let pks = make_pubkeys(2); + + // First call creates the stream via subscribe (writes via + // handle internally). + mgr.account_subscribe(&[pks[0]], &COMMITMENT, Some(100)) + .await + .unwrap(); + // Second call updates via handle.write(). 
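+        // The stream already exists, so no new factory subscribe happens; the
+        // full current-new filter (both pubkeys) is re-written to the existing
+        // handle together with the new from_slot.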
+ mgr.account_subscribe(&[pks[1]], &COMMITMENT, Some(200)) + .await + .unwrap(); + + let handle_reqs = factory.handle_requests(); + // First write with pks[0] and from_slot=100, second write with + // pks[1] and from_slot=200 + assert_eq!(handle_reqs.len(), 2); + assert_eq!(handle_reqs[0].from_slot, Some(100)); + assert_eq!(handle_reqs[1].from_slot, Some(200)); + } + + #[tokio::test] + async fn test_optimize_sets_from_slot_none_without_chain_slot() { + let (mut mgr, factory) = create_manager(); + mgr.account_subscribe(&make_pubkeys(5), &COMMITMENT, Some(42)) + .await + .unwrap(); + + let reqs_before = factory.captured_requests().len(); + mgr.optimize(&COMMITMENT).await.unwrap(); + + let optimize_reqs: Vec<_> = factory + .captured_requests() + .into_iter() + .skip(reqs_before) + .collect(); + assert!(!optimize_reqs.is_empty()); + for req in &optimize_reqs { + assert_eq!( + req.from_slot, None, + "optimized streams should have from_slot=None \ + when no chain_slot is set", + ); + } + } + + #[tokio::test] + async fn test_optimize_uses_from_slot_with_chain_slot() { + use std::sync::{atomic::AtomicU64, Arc}; + + use crate::remote_account_provider::chain_slot::ChainSlot; + + let current_slot: u64 = 1000; + let chain_slot = ChainSlot::new(Arc::new(AtomicU64::new(current_slot))); + let factory = MockStreamFactory::new(); + let mut mgr = StreamManager::new( + test_config(), + factory.clone(), + Some(chain_slot), + "test".to_string(), + ); + + mgr.account_subscribe(&make_pubkeys(5), &COMMITMENT, Some(42)) + .await + .unwrap(); + + let reqs_before = factory.captured_requests().len(); + mgr.optimize(&COMMITMENT).await.unwrap(); + + let optimize_reqs: Vec<_> = factory + .captured_requests() + .into_iter() + .skip(reqs_before) + .collect(); + assert!(!optimize_reqs.is_empty()); + let expected_from_slot = + current_slot - ChainSlot::MAX_SLOTS_SUB_ACTIVATION; + for req in &optimize_reqs { + assert_eq!( + req.from_slot, + Some(expected_from_slot), + "optimized streams should backfill from \ + chain_slot - MAX_SLOTS_SUB_ACTIVATION", + ); + } + } + + // --------------------------------------------------------- + // 12. 
next_update Stream Updates + // --------------------------------------------------------- + + #[tokio::test] + async fn test_next_update_receives_account_updates() { + use std::time::Duration; + + use helius_laserstream::grpc::SubscribeUpdate; + + let (mut mgr, factory) = create_manager(); + subscribe_n(&mut mgr, 2).await; + + factory.push_update_to_stream(0, Ok(SubscribeUpdate::default())); + + let result = + tokio::time::timeout(Duration::from_millis(100), mgr.next_update()) + .await + .expect("next_update timed out"); + + let (source, update) = result.expect("stream ended"); + assert_eq!(source, StreamUpdateSource::Account); + assert!(update.is_ok()); + } + + #[tokio::test] + async fn test_next_update_receives_program_updates() { + use std::time::Duration; + + use helius_laserstream::grpc::SubscribeUpdate; + + let (mut mgr, factory) = create_manager(); + let program_id = Pubkey::new_unique(); + mgr.add_program_subscription(program_id, &COMMITMENT) + .await + .unwrap(); + + factory.push_update_to_stream(0, Ok(SubscribeUpdate::default())); + + let result = + tokio::time::timeout(Duration::from_millis(100), mgr.next_update()) + .await + .expect("next_update timed out"); + + let (source, update) = result.expect("stream ended"); + assert_eq!(source, StreamUpdateSource::Program); + assert!(update.is_ok()); + } + + #[tokio::test] + async fn test_next_update_receives_mixed_account_and_program() { + use std::time::Duration; + + use helius_laserstream::grpc::SubscribeUpdate; + + let (mut mgr, factory) = create_manager(); + + // Account stream → index 0 + subscribe_n(&mut mgr, 2).await; + // Program stream → index 1 + let program_id = Pubkey::new_unique(); + mgr.add_program_subscription(program_id, &COMMITMENT) + .await + .unwrap(); + + factory.push_update_to_stream(0, Ok(SubscribeUpdate::default())); + factory.push_update_to_stream(1, Ok(SubscribeUpdate::default())); + + let mut sources = Vec::new(); + for _ in 0..2 { + let result = tokio::time::timeout( + Duration::from_millis(100), + mgr.next_update(), + ) + .await + .expect("next_update timed out"); + + let (source, update) = result.expect("stream ended"); + assert!(update.is_ok()); + sources.push(source); + } + + assert!( + sources.contains(&StreamUpdateSource::Account), + "expected an Account update", + ); + assert!( + sources.contains(&StreamUpdateSource::Program), + "expected a Program update", + ); + } + + #[tokio::test] + async fn test_next_update_propagates_account_errors() { + use std::time::Duration; + + use helius_laserstream::LaserstreamError; + use tonic::Code; + + let (mut mgr, factory) = create_manager(); + subscribe_n(&mut mgr, 2).await; + + let status = tonic::Status::new(Code::Internal, "test error"); + let error = LaserstreamError::Status(status); + + factory.push_error_to_stream(0, error); + + let result = + tokio::time::timeout(Duration::from_millis(100), mgr.next_update()) + .await + .expect("next_update timed out"); + + let (source, update) = result.expect("stream ended"); + assert_eq!(source, StreamUpdateSource::Account); + assert!(update.is_err()); + } + + #[tokio::test] + async fn test_next_update_propagates_program_errors() { + use std::time::Duration; + + use helius_laserstream::LaserstreamError; + use tonic::Code; + + let (mut mgr, factory) = create_manager(); + let program_id = Pubkey::new_unique(); + mgr.add_program_subscription(program_id, &COMMITMENT) + .await + .unwrap(); + + let status = tonic::Status::new(Code::Internal, "program stream error"); + let error = LaserstreamError::Status(status); + + // Program 
stream is at index 0 when only program stream exists + factory.push_error_to_stream(0, error); + + let result = + tokio::time::timeout(Duration::from_millis(100), mgr.next_update()) + .await + .expect("next_update timed out"); + + let (source, update) = result.expect("stream ended"); + assert_eq!(source, StreamUpdateSource::Program); + assert!(update.is_err()); + } + + #[tokio::test] + async fn test_stream_closure_propagates_as_stream_end() { + use std::time::Duration; + + let (mut mgr, factory) = create_manager(); + subscribe_n(&mut mgr, 2).await; + + // Close the account stream (index 0) by removing its sender + factory.close_stream(0); + + // next_update should return None (stream ended) since the + // underlying channel was closed + let result = + tokio::time::timeout(Duration::from_millis(100), mgr.next_update()) + .await + .expect("next_update timed out"); + + // When a stream is closed, next_update returns None + assert!(result.is_none()); + } +} diff --git a/magicblock-chainlink/src/remote_account_provider/chain_slot.rs b/magicblock-chainlink/src/remote_account_provider/chain_slot.rs index d4d6e6b76..a3d5eb552 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_slot.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_slot.rs @@ -32,4 +32,16 @@ impl ChainSlot { pub fn load(&self) -> u64 { self.slot.load(Ordering::Relaxed) } + + /// The maximum amount of slots we expect to pass from the time + /// a subscription is requested until the point when it is + /// activated. ~10 secs + pub const MAX_SLOTS_SUB_ACTIVATION: u64 = 25; + + /// Computes a `from_slot` for backfilling based on the current + /// chain slot. + pub fn compute_from_slot(&self) -> u64 { + let current = self.load(); + current.saturating_sub(Self::MAX_SLOTS_SUB_ACTIVATION) + } } diff --git a/magicblock-chainlink/src/remote_account_provider/chain_updates_client.rs b/magicblock-chainlink/src/remote_account_provider/chain_updates_client.rs index 4dad375a4..0b922d791 100644 --- a/magicblock-chainlink/src/remote_account_provider/chain_updates_client.rs +++ b/magicblock-chainlink/src/remote_account_provider/chain_updates_client.rs @@ -70,7 +70,6 @@ impl ChainUpdatesClient { let slots = Slots { chain_slot: ChainSlot::new(chain_slot), - last_activation_slot: AtomicU64::new(0), supports_backfill: *supports_backfill, }; Ok(ChainUpdatesClient::Laser( diff --git a/magicblock-chainlink/src/remote_account_provider/errors.rs b/magicblock-chainlink/src/remote_account_provider/errors.rs index 15d7415e2..4284ef2c6 100644 --- a/magicblock-chainlink/src/remote_account_provider/errors.rs +++ b/magicblock-chainlink/src/remote_account_provider/errors.rs @@ -114,6 +114,11 @@ pub enum RemoteAccountProviderError { "The LoaderV4 program {0} account state deserialization failed: {1}" )] LoaderV4StateDeserializationFailed(Pubkey, String), + + #[error( + "Failed to update gRPC subscription to {0} after {1} retries: {2}" + )] + GrpcSubscriptionUpdateFailed(String, usize, String), } impl From for RemoteAccountProviderError diff --git a/magicblock-metrics/src/metrics/mod.rs b/magicblock-metrics/src/metrics/mod.rs index a4d7c1025..d340c28f9 100644 --- a/magicblock-metrics/src/metrics/mod.rs +++ b/magicblock-metrics/src/metrics/mod.rs @@ -486,6 +486,29 @@ lazy_static::lazy_static! 
{ ), &["client_id"], ).unwrap(); + + // ----------------- + // GRPC Streams + // ----------------- + static ref GRPC_OPTIMIZED_STREAMS_GAUGE: IntGaugeVec = + IntGaugeVec::new( + Opts::new( + "grpc_optimized_streams_gauge", + "Number of optimized GRPC streams", + ), + &["client_id"], + ) + .unwrap(); + + static ref GRPC_UNOPTIMIZED_STREAMS_GAUGE: IntGaugeVec = + IntGaugeVec::new( + Opts::new( + "grpc_unoptimized_streams_gauge", + "Number of unoptimized GRPC streams", + ), + &["client_id"], + ) + .unwrap(); } pub(crate) fn register() { @@ -564,6 +587,8 @@ pub(crate) fn register() { register!(PUBSUB_CLIENT_RESUBSCRIBE_DELAY_MILLISECONDS_GAUGE); register!(PUBSUB_CLIENT_RESUBSCRIBED_GAUGE); register!(PUBSUB_CLIENT_CONNECTIONS_GAUGE); + register!(GRPC_OPTIMIZED_STREAMS_GAUGE); + register!(GRPC_UNOPTIMIZED_STREAMS_GAUGE); }); } @@ -875,3 +900,15 @@ pub fn set_pubsub_client_connections_count(client_id: &str, count: usize) { .with_label_values(&[client_id]) .set(count as i64); } + +pub fn set_grpc_optimized_streams_gauge(client_id: &str, count: usize) { + GRPC_OPTIMIZED_STREAMS_GAUGE + .with_label_values(&[client_id]) + .set(count as i64); +} + +pub fn set_grpc_unoptimized_streams_gauge(client_id: &str, count: usize) { + GRPC_UNOPTIMIZED_STREAMS_GAUGE + .with_label_values(&[client_id]) + .set(count as i64); +}
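
As a reading aid, the sketch below shows how the pieces introduced in this diff are wired together. It is not part of the patch; the import paths, the re-exports from `chain_laser_actor`, and the elided generic parameters of `StreamManager` are assumptions based on the code above.

use std::sync::{atomic::AtomicU64, Arc};

use helius_laserstream::{grpc::CommitmentLevel, LaserstreamConfig};
use solana_pubkey::Pubkey;

use crate::remote_account_provider::{
    chain_laser_actor::{
        StreamFactoryImpl, StreamHandleImpl, StreamManager,
        StreamManagerConfig, StreamUpdateSource,
    },
    chain_slot::ChainSlot,
    RemoteAccountProviderResult,
};

/// Sketch: drive a StreamManager end to end with the production factory.
async fn stream_manager_sketch(
    laser_config: LaserstreamConfig,
    pubkeys: &[Pubkey],
    program_id: Pubkey,
) -> RemoteAccountProviderResult<()> {
    // Chain slot tracker (0 as a placeholder); optimize() uses it to compute
    // a backfill from_slot.
    let chain_slot = ChainSlot::new(Arc::new(AtomicU64::new(0)));
    // Compute the lookback before handing the tracker to the manager.
    let from_slot = Some(chain_slot.compute_from_slot());

    let factory = StreamFactoryImpl::new(laser_config);
    // Generic parameters assumed.
    let mut manager: StreamManager<StreamHandleImpl, StreamFactoryImpl> =
        StreamManager::new(
            StreamManagerConfig::default(),
            factory,
            Some(chain_slot),
            "sketch-client".to_string(),
        );

    // New pubkeys land in the current-new stream; promotion to unoptimized-old
    // and chunked optimization happen automatically once the limits are hit.
    manager
        .account_subscribe(pubkeys, &CommitmentLevel::Processed, from_slot)
        .await?;
    manager
        .add_program_subscription(program_id, &CommitmentLevel::Processed)
        .await?;

    // Drain updates; the source tells us whether an update came from an
    // account stream or the program stream.
    while let Some((source, update)) = manager.next_update().await {
        match source {
            StreamUpdateSource::Account => { /* forward account update */ }
            StreamUpdateSource::Program => { /* forward program update */ }
        }
        let _ = update; // LaserResult from the underlying stream
    }
    Ok(())
}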