Draft
93 commits
9973fb2
chore: add persub limit option to pubsub config
thlorenz Feb 6, 2026
93cf550
feat: initial pool impl using pubsub client trait
thlorenz Feb 6, 2026
0c21aa8
chore: extract pubsub connection to separate module
thlorenz Feb 6, 2026
43d9b7a
test: add mock PubsubConnection and make pool generic
thlorenz Feb 6, 2026
bb2b31d
chore: account sub tests for pool
thlorenz Feb 7, 2026
f220952
test: add comprehensive tests for pubsub pool account and program sub…
thlorenz Feb 7, 2026
50a86bd
chore: fix overkill and incomplete error conversion
thlorenz Feb 7, 2026
4512806
chore: prevent multi connection creation race condition
thlorenz Feb 7, 2026
29158ee
chore: reconnect pubsub pool on recovery
thlorenz Feb 7, 2026
ec5073a
refactor: extract subscribe logic to helper method
thlorenz Feb 7, 2026
a37d798
chore: harden pubsub reconnect to ensure all existing subs are closed
thlorenz Feb 9, 2026
5c4a206
feat: add pubsub_client_connections_gauge metric
thlorenz Feb 9, 2026
5c98860
fix: fmt
thlorenz Feb 9, 2026
0179218
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz Feb 9, 2026
58089c8
tmp: dial down max subs per connection
thlorenz Feb 10, 2026
6bb8e0d
chore: fix unsub bug
thlorenz Feb 10, 2026
02f3dd8
chore: subscriptions fn returns hashset
thlorenz Feb 11, 2026
b8ba3e7
chore: subs union
thlorenz Feb 11, 2026
72ce021
chore: introducing union/intersection
thlorenz Feb 12, 2026
4dd13c0
chore: laser client has access to shared subscriptions
thlorenz Feb 12, 2026
4d823e7
chore: all clients return subs (instead option)
thlorenz Feb 12, 2026
06a4f67
chore: optimize set intersection method for submux
thlorenz Feb 12, 2026
c3d5329
feat: reconciler considers union vs. intersection of subscriptions
thlorenz Feb 12, 2026
230328f
chore: test reconciler
thlorenz Feb 12, 2026
5e1668a
chore: move previously existint reconciler tests to same module
thlorenz Feb 12, 2026
4047bdc
chore: fix bug in reconciler logic
thlorenz Feb 12, 2026
940db38
chore: no more reconciliation outside reconciler
thlorenz Feb 12, 2026
cc75e15
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz Feb 12, 2026
35e31a8
chore: fmt + lint
thlorenz Feb 12, 2026
5253d66
chore: remove subscription_count method
thlorenz Feb 12, 2026
bf4a759
Merge branch 'thlorenz/websocket-pool-conections' into thlorenz/pool+…
thlorenz Feb 12, 2026
41f2040
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz Feb 13, 2026
b8df74e
Merge branch 'master' into thlorenz/pool+better-reconciler
thlorenz Feb 13, 2026
0e3810d
chore: fmt
thlorenz Feb 13, 2026
46f1a1c
Merge branch 'thlorenz/websocket-pool-conections' into thlorenz/pool+…
thlorenz Feb 13, 2026
cdee786
fix: lint
thlorenz Feb 13, 2026
2d280a5
fix: fmt
thlorenz Feb 13, 2026
7ec483a
chore: fix coderabbits
thlorenz Feb 13, 2026
d401b8c
chore: fix import
thlorenz Feb 13, 2026
d7bd7a9
chore: remove read/write lock with potential race condition
thlorenz Feb 13, 2026
db8480d
ci: attempt to fix protoc discovery
thlorenz Feb 13, 2026
c41ddb4
tmp: remove non-problemeatic workflows for quicker triaging
thlorenz Feb 13, 2026
2afd9a1
ci: bust cache
thlorenz Feb 13, 2026
486e44d
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz Feb 13, 2026
ec54854
Merge branch 'master' into thlorenz/pool+better-reconciler
thlorenz Feb 13, 2026
e3b3f04
Revert "tmp: remove non-problemeatic workflows for quicker triaging"
thlorenz Feb 13, 2026
0aeb168
Merge branch 'thlorenz/websocket-pool-conections' into thlorenz/pool+…
thlorenz Feb 13, 2026
a0a8fd8
chore: abstract stream factory to later test stream management
thlorenz Feb 13, 2026
5d58665
chore: make StreamFactory proper generic to avoid perf overhead
thlorenz Feb 16, 2026
00ff5c8
refactor: extract stream management into StreamManager
thlorenz Feb 16, 2026
ea5f62c
chore: prep new account subscribe
thlorenz Feb 16, 2026
c99b2fc
chore: initial tests
thlorenz Feb 16, 2026
64eff4e
chore: pass initial set of tests
thlorenz Feb 16, 2026
cca4244
chore: add remaining tests (not passing)
thlorenz Feb 16, 2026
2df5ea6
feat: initial impl to get all tests to pass
thlorenz Feb 16, 2026
2ea0e9c
chore: minor cleanup
thlorenz Feb 16, 2026
bf48eee
chore: minor cleanup and clarifications
thlorenz Feb 17, 2026
b2990a6
chore: prep to include handle in subscribe return
thlorenz Feb 17, 2026
613768e
chore: stream handle test integration
thlorenz Feb 17, 2026
67ba602
Merge branch 'master' into thlorenz/pool+better-reconciler+grpc-gener…
thlorenz Feb 17, 2026
da4c6e7
chore: operate on full streams with handles
thlorenz Feb 17, 2026
38f5a12
chore: remove unnecessary phantom data
thlorenz Feb 19, 2026
93bb3ac
chore: current subs updated via write to channel
thlorenz Feb 19, 2026
fabc155
chore: adding retry logic to sub write
thlorenz Feb 19, 2026
a0c6116
chore: bubble errors from account_subscribe
thlorenz Feb 19, 2026
c8fd9da
chore: minor clarification
thlorenz Feb 19, 2026
26a6f51
chore: program subs reuse stream via handle write
thlorenz Feb 19, 2026
558a1ae
chore: support account sub from_slot in stream manager
thlorenz Feb 19, 2026
e7e9188
chore: initial stream manager integration with some problems
thlorenz Feb 19, 2026
84bfa2b
chore: single subscriptions source in stream manager
thlorenz Feb 19, 2026
ceeaafd
chore: separate handles from streams to allow polling streammap
thlorenz Feb 19, 2026
45d7261
chore: add next_update tests
thlorenz Feb 19, 2026
22414b3
chore: fmt
thlorenz Feb 19, 2026
e940542
chore: remove dead_code
thlorenz Feb 19, 2026
6f95bd6
Merge branch 'master' into thlorenz/grpc-generational
thlorenz Feb 19, 2026
7fe1a68
chore: testing error propagation
thlorenz Feb 20, 2026
2fd5a65
chore: add test that stream ended is handled correctly
thlorenz Feb 20, 2026
724b2db
Merge branch 'master' into thlorenz/grpc-generational
thlorenz Feb 20, 2026
67520ef
chore: fmt
thlorenz Feb 20, 2026
dadba1c
chore: fix two coderabbits
thlorenz Feb 20, 2026
2e184f0
chore: optimize add program sub
thlorenz Feb 20, 2026
0a9dfd7
chore: obsolete activation slot
thlorenz Feb 20, 2026
1531973
chore: use async stream sub for current stream sub
thlorenz Feb 20, 2026
d720957
chore: move handle write to stream factory
thlorenz Feb 20, 2026
13b5b7a
chore: fix optimize race condition
thlorenz Feb 20, 2026
40a754e
chore: provide from_slot when optimizing
thlorenz Feb 20, 2026
7f5547c
chore: keep overflow in current + assert subs in other streams
thlorenz Feb 20, 2026
79161a2
chore: fmt
thlorenz Feb 20, 2026
8e6bf09
chore: fix loss of program subs on failure
thlorenz Feb 20, 2026
af3936e
chore: remove current subs if subscription failed
thlorenz Feb 20, 2026
7704488
chore: add note about optimize stream inconsistency in case of failure
thlorenz Feb 20, 2026
6c1e8d5
chore: retry all writes to handles
thlorenz Feb 20, 2026
4827309
chore: refactor out stream_factory
thlorenz Feb 20, 2026
945 changes: 0 additions & 945 deletions magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs

This file was deleted.

Large diffs are not rendered by default.

@@ -0,0 +1,140 @@
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use helius_laserstream::{grpc::SubscribeRequest, LaserstreamError};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

use super::{LaserResult, StreamFactory};
use crate::remote_account_provider::{
    chain_laser_actor::{LaserStreamWithHandle, StreamHandle},
    RemoteAccountProviderResult,
};

/// A test mock that captures subscription requests and allows driving
/// streams programmatically.
#[derive(Clone)]
pub struct MockStreamFactory {
    /// Every `SubscribeRequest` passed to `subscribe()` is recorded
    /// here so tests can assert on filter contents, commitment levels,
    /// etc.
    captured_requests: Arc<Mutex<Vec<SubscribeRequest>>>,

    /// Requests sent through a `MockStreamHandle::write()` call are
    /// recorded here so tests can verify handle-driven updates.
    handle_requests: Arc<Mutex<Vec<SubscribeRequest>>>,

    /// A sender that the test uses to push `LaserResult` items into
    /// the streams returned by `subscribe()`.
    /// Each call to `subscribe()` creates a new mpsc channel; the rx
    /// side becomes the returned stream, and the tx side is stored
    /// here so the test can drive updates.
    stream_senders: Arc<Mutex<Vec<Arc<mpsc::UnboundedSender<LaserResult>>>>>,
}

impl MockStreamFactory {
    /// Create a new mock stream factory
    pub fn new() -> Self {
        Self {
            captured_requests: Arc::new(Mutex::new(Vec::new())),
            handle_requests: Arc::new(Mutex::new(Vec::new())),
            stream_senders: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Get the captured subscription requests (from `subscribe()`)
    pub fn captured_requests(&self) -> Vec<SubscribeRequest> {
        self.captured_requests.lock().unwrap().clone()
    }

    /// Get the requests sent through stream handles (from
    /// `handle.write()`)
    pub fn handle_requests(&self) -> Vec<SubscribeRequest> {
        self.handle_requests.lock().unwrap().clone()
    }

    /// Push an error update to a specific stream
    pub fn push_error_to_stream(&self, idx: usize, error: LaserstreamError) {
        let senders = self.stream_senders.lock().unwrap();
        if let Some(sender) = senders.get(idx) {
            let _ = sender.send(Err(error));
        }
    }

    /// Push an update to a specific stream by index
    pub fn push_update_to_stream(&self, idx: usize, update: LaserResult) {
        let senders = self.stream_senders.lock().unwrap();
        if let Some(sender) = senders.get(idx) {
            let _ = sender.send(update);
        }
    }

    /// Get the number of active streams
    pub fn active_stream_count(&self) -> usize {
        self.stream_senders.lock().unwrap().len()
    }

    /// Close a specific stream by index
    pub fn close_stream(&self, idx: usize) {
        let mut senders = self.stream_senders.lock().unwrap();
        if idx < senders.len() {
            senders.remove(idx);
        }
    }
}

impl Default for MockStreamFactory {
    fn default() -> Self {
        Self::new()
    }
}

/// Mock handle that records write requests into the shared
/// `handle_requests` vec on the factory.
#[derive(Clone)]
pub struct MockStreamHandle {
    handle_requests: Arc<Mutex<Vec<SubscribeRequest>>>,
}

#[async_trait]
impl StreamHandle for MockStreamHandle {
    async fn write(
        &self,
        request: SubscribeRequest,
    ) -> Result<(), LaserstreamError> {
        self.handle_requests.lock().unwrap().push(request);
        Ok(())
    }
}

#[async_trait]
impl StreamFactory<MockStreamHandle> for MockStreamFactory {
    async fn subscribe(
        &self,
        request: SubscribeRequest,
    ) -> RemoteAccountProviderResult<LaserStreamWithHandle<MockStreamHandle>>
    {
        // Record the initial subscribe request
        self.captured_requests.lock().unwrap().push(request.clone());

        // Create a channel for driving LaserResult items into the
        // stream
        let (stream_tx, stream_rx) = mpsc::unbounded_channel::<LaserResult>();
        let stream = Box::pin(UnboundedReceiverStream::new(stream_rx));

        let stream_tx = Arc::new(stream_tx);
        self.stream_senders.lock().unwrap().push(stream_tx);

        // The handle shares the factory's handle_requests vec so
        // every write is visible to tests immediately.
        let handle = MockStreamHandle {
            handle_requests: Arc::clone(&self.handle_requests),
        };

        // Write the actual request to the handle (mirroring
        // production behaviour of sending it over the network).
        handle.write(request).await.unwrap();

        Ok(LaserStreamWithHandle { stream, handle })
    }
}
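The core trick in `MockStreamFactory` — the handle sharing the factory's `handle_requests` vec via `Arc<Mutex<…>>` so every write is immediately visible to test assertions — can be sketched in isolation. The names below (`Factory`, `Handle`, `Request`) are illustrative stand-ins, not types from the codebase:

```rust
use std::sync::{Arc, Mutex};

// Stand-in for SubscribeRequest, just to show the sharing pattern.
#[derive(Clone, Debug, PartialEq)]
struct Request(String);

#[derive(Clone)]
struct Factory {
    captured: Arc<Mutex<Vec<Request>>>,
}

#[derive(Clone)]
struct Handle {
    // Shares the factory's vec, so writes are visible to tests immediately.
    captured: Arc<Mutex<Vec<Request>>>,
}

impl Factory {
    fn new() -> Self {
        Self { captured: Arc::new(Mutex::new(Vec::new())) }
    }
    // Each handle gets a clone of the same Arc, not a fresh vec.
    fn subscribe(&self) -> Handle {
        Handle { captured: Arc::clone(&self.captured) }
    }
    fn captured(&self) -> Vec<Request> {
        self.captured.lock().unwrap().clone()
    }
}

impl Handle {
    fn write(&self, req: Request) {
        self.captured.lock().unwrap().push(req);
    }
}

fn main() {
    let factory = Factory::new();
    let handle = factory.subscribe();
    handle.write(Request("accounts".into()));
    // The write done through the handle shows up on the factory side.
    assert_eq!(factory.captured(), vec![Request("accounts".into())]);
}
```

Cloning the `Arc` rather than the vec is what makes the mock useful: the test holds the factory, production code holds the handle, and both observe the same history of writes.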
@@ -0,0 +1,72 @@
use std::{collections::HashSet, pin::Pin, sync::Arc};

use futures_util::Stream;
use helius_laserstream::{
    grpc::{SubscribeRequest, SubscribeUpdate},
    LaserstreamError,
};
use parking_lot::RwLock;
use solana_pubkey::Pubkey;
use tokio::time::Duration;

pub use self::{
    actor::{ChainLaserActor, Slots},
    stream_factory::{
        LaserStreamWithHandle, StreamFactory, StreamFactoryImpl, StreamHandle,
        StreamHandleImpl,
    },
    stream_manager::{StreamManager, StreamManagerConfig, StreamUpdateSource},
};
use crate::remote_account_provider::{
    RemoteAccountProviderError, RemoteAccountProviderResult,
};

pub type SharedSubscriptions = Arc<RwLock<HashSet<Pubkey>>>;

mod actor;
#[cfg(test)]
mod mock;
mod stream_factory;
mod stream_manager;

/// Retry a `handle.write(request)` call with linear backoff.
///
/// Makes an initial attempt plus up to `MAX_RETRIES` (5) retries,
/// sleeping 50 ms × retry-number between attempts. Once all retries
/// are exhausted, returns a `GrpcSubscriptionUpdateFailed` error
/// wrapping the last failure.
pub(crate) async fn write_with_retry<S: StreamHandle>(
    handle: &S,
    task: &str,
    request: SubscribeRequest,
) -> RemoteAccountProviderResult<()> {
    const MAX_RETRIES: usize = 5;
    let mut retries = MAX_RETRIES;
    let initial_retries = retries;

    loop {
        match handle.write(request.clone()).await {
            Ok(()) => return Ok(()),
            Err(err) => {
                if retries > 0 {
                    retries -= 1;
                    let backoff_ms = 50u64 * (initial_retries - retries) as u64;
                    tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
                    continue;
                }
                return Err(
                    RemoteAccountProviderError::GrpcSubscriptionUpdateFailed(
                        task.to_string(),
                        MAX_RETRIES,
                        format!("{err} ({err:?})"),
                    ),
                );
            }
        }
    }
}

/// Result of a laser stream operation
pub type LaserResult = Result<SubscribeUpdate, LaserstreamError>;

/// A laser stream of subscription updates
pub type LaserStream = Pin<Box<dyn Stream<Item = LaserResult> + Send>>;
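The backoff arithmetic in `write_with_retry` (50 ms × attempt number, i.e. 50, 100, …, 250 ms across the five retries) can be checked with a small synchronous sketch that reproduces just the delay computation; `backoff_schedule` is a helper invented for illustration:

```rust
// Reproduces the delay computation from `write_with_retry`: after the
// n-th failed attempt (n = 1..=max_retries) the helper sleeps 50 ms * n
// before retrying.
fn backoff_schedule(max_retries: usize) -> Vec<u64> {
    let mut retries = max_retries;
    let initial_retries = retries;
    let mut delays = Vec::new();
    while retries > 0 {
        retries -= 1;
        delays.push(50u64 * (initial_retries - retries) as u64);
    }
    delays
}

fn main() {
    let delays = backoff_schedule(5);
    assert_eq!(delays, vec![50, 100, 150, 200, 250]);
    // Worst case, the helper adds 750 ms of sleeping before giving up.
    assert_eq!(delays.iter().sum::<u64>(), 750);
}
```

So with `MAX_RETRIES = 5` a persistently failing handle costs at most six write attempts and 750 ms of accumulated backoff before the error is surfaced.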
@@ -0,0 +1,101 @@
use async_trait::async_trait;
use helius_laserstream::{
    grpc::SubscribeRequest, LaserstreamError,
    StreamHandle as HeliusStreamHandle,
};

use super::LaserStream;
use crate::remote_account_provider::RemoteAccountProviderResult;

/// A trait abstracting over the [HeliusStreamHandle].
/// This is needed because we cannot construct the helius handle ourselves:
/// [helius_laserstream::StreamHandle::write_tx] is private and there is no
/// public constructor.
#[async_trait]
pub trait StreamHandle {
    /// Send a new subscription request to update the active subscription.
    async fn write(
        &self,
        request: SubscribeRequest,
    ) -> Result<(), LaserstreamError>;
}

pub struct StreamHandleImpl {
    pub handle: HeliusStreamHandle,
}

#[async_trait]
impl StreamHandle for StreamHandleImpl {
    async fn write(
        &self,
        request: SubscribeRequest,
    ) -> Result<(), LaserstreamError> {
        // This async operation gets forwarded to the underlying subscription
        // sender of the laser client and completes after the given item has
        // been fully processed into the sink, including flushing.
        // The assumption is that at that point it has been processed on the
        // receiver end and the subscription is updated.
        // See: https://github.com/helius-labs/laserstream-sdk/blob/v0.2.2/rust/src/client.rs#L196-L201
        self.handle.write(request).await
    }
}

/// Abstraction over stream creation for testability
#[async_trait]
pub trait StreamFactory<S: StreamHandle>: Send + Sync + 'static {
    /// Create a stream for the given subscription request.
    async fn subscribe(
        &self,
        request: SubscribeRequest,
    ) -> RemoteAccountProviderResult<LaserStreamWithHandle<S>>;
}

pub struct LaserStreamWithHandle<S: StreamHandle> {
    pub(crate) stream: LaserStream,
    pub(crate) handle: S,
}

/// Production stream factory that wraps helius client subscribe
pub struct StreamFactoryImpl {
    config: helius_laserstream::LaserstreamConfig,
}

impl StreamFactoryImpl {
    pub fn new(config: helius_laserstream::LaserstreamConfig) -> Self {
        Self { config }
    }
}

#[async_trait]
impl StreamFactory<StreamHandleImpl> for StreamFactoryImpl {
    /// This implementation creates the underlying gRPC stream with the
    /// request (which returns immediately) and then writes the same request
    /// to the handle, awaiting the write so the request has been sent over
    /// the network before this method returns.
    async fn subscribe(
        &self,
        request: SubscribeRequest,
    ) -> RemoteAccountProviderResult<LaserStreamWithHandle<StreamHandleImpl>>
    {
        // NOTE: this call returns immediately, yielding subscription errors
        // and account updates via the stream; thus the subscription has not
        // been received upstream yet.
        // NOTE: we must pass the same request here, otherwise there is a
        // potential race condition where our `write` below completes before
        // this call and the request is overwritten with an empty one.
        // Inside helius_laserstream::client::subscribe the request is cloned
        // for that reason, to allow re-sending it during reconnect attempts.
        // Given our requests contain at most about 2_000 pubkeys, an extra
        // clone here is a small price to pay to avoid this race condition.
        let (stream, handle) = helius_laserstream::client::subscribe(
            self.config.clone(),
            request.clone(),
        );
        let handle = StreamHandleImpl { handle };
        // Write to the handle and await it, which guarantees at least that
        // the request has been sent over the network, even though there is
        // no guarantee it has been processed or that the subscription is
        // active yet.
        super::write_with_retry(&handle, "subscribe", request).await?;
        Ok(LaserStreamWithHandle {
            stream: Box::pin(stream),
            handle,
        })
    }
}
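The race that the NOTE in `subscribe` warns about can be modeled with a toy "current request" cell. Everything below is illustrative (`Client`, `Request`); the real state lives inside `helius_laserstream`. The point is that `write` last-writer-wins over the stored request, so writing a *different* (e.g. empty) request could clobber the one the reconnect logic re-sends, while writing the *same* request is idempotent and therefore safe regardless of ordering:

```rust
use std::sync::{Arc, Mutex};

// Toy model of the shared "current request" that reconnect logic re-sends.
#[derive(Clone, Debug, PartialEq)]
struct Request(Vec<&'static str>);

struct Client {
    current: Arc<Mutex<Request>>,
}

impl Client {
    fn subscribe(initial: Request) -> Self {
        Self { current: Arc::new(Mutex::new(initial)) }
    }
    // Last writer wins: `write` replaces the stored request wholesale.
    fn write(&self, req: Request) {
        *self.current.lock().unwrap() = req;
    }
    fn current(&self) -> Request {
        self.current.lock().unwrap().clone()
    }
}

fn main() {
    let subs = Request(vec!["pubkey-a", "pubkey-b"]);
    let client = Client::subscribe(subs.clone());

    // Writing the SAME request is idempotent -- ordering no longer matters.
    client.write(subs.clone());
    assert_eq!(client.current(), subs);

    // Writing an empty request instead would clobber the subscriptions,
    // which is exactly the race the production code avoids.
    client.write(Request(vec![]));
    assert_eq!(client.current(), Request(vec![]));
}
```

This is why `StreamFactoryImpl::subscribe` pays for one extra `request.clone()`: passing the identical request to both the initial `subscribe` and the follow-up `write` makes their relative completion order irrelevant.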