-
Notifications
You must be signed in to change notification settings - Fork 37
feat: implement StreamManager for gRPC subscription management #985
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
thlorenz
wants to merge
93
commits into
master
Choose a base branch
from
thlorenz/grpc-generational
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
93 commits
Select commit
Hold shift + click to select a range
9973fb2
chore: add persub limit option to pubsub config
thlorenz 93cf550
feat: initial pool impl using pubsub client trait
thlorenz 0c21aa8
chore: extract pubsub connection to separate module
thlorenz 43d9b7a
test: add mock PubsubConnection and make pool generic
thlorenz bb2b31d
chore: account sub tests for pool
thlorenz f220952
test: add comprehensive tests for pubsub pool account and program sub…
thlorenz 50a86bd
chore: fix overkill and incomplete error conversion
thlorenz 4512806
chore: prevent multi connection creation race condition
thlorenz 29158ee
chore: reconnect pubsub pool on recovery
thlorenz ec5073a
refactor: extract subscribe logic to helper method
thlorenz a37d798
chore: harden pubsub reconnect to ensure all existing subs are closed
thlorenz 5c4a206
feat: add pubsub_client_connections_gauge metric
thlorenz 5c98860
fix: fmt
thlorenz 0179218
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz 58089c8
tmp: dial down max subs per connection
thlorenz 6bb8e0d
chore: fix unsub bug
thlorenz 02f3dd8
chore: subscriptions fn returns hashset
thlorenz b8ba3e7
chore: subs union
thlorenz 72ce021
chore: introducing union/intersection
thlorenz 4dd13c0
chore: laser client has access to shared subscriptions
thlorenz 4d823e7
chore: all clients return subs (instead option)
thlorenz 06a4f67
chore: optimize set intersection method for submux
thlorenz c3d5329
feat: reconciler considers union vs. intersection of subscriptions
thlorenz 230328f
chore: test reconciler
thlorenz 5e1668a
chore: move previously existint reconciler tests to same module
thlorenz 4047bdc
chore: fix bug in reconciler logic
thlorenz 940db38
chore: no more reconciliation outside reconciler
thlorenz cc75e15
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz 35e31a8
chore: fmt + lint
thlorenz 5253d66
chore: remove subscription_count method
thlorenz bf4a759
Merge branch 'thlorenz/websocket-pool-conections' into thlorenz/pool+…
thlorenz 41f2040
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz b8df74e
Merge branch 'master' into thlorenz/pool+better-reconciler
thlorenz 0e3810d
chore: fmt
thlorenz 46f1a1c
Merge branch 'thlorenz/websocket-pool-conections' into thlorenz/pool+…
thlorenz cdee786
fix: lint
thlorenz 2d280a5
fix: fmt
thlorenz 7ec483a
chore: fix coderabbits
thlorenz d401b8c
chore: fix import
thlorenz d7bd7a9
chore: remove read/write lock with potential race condition
thlorenz db8480d
ci: attempt to fix protoc discovery
thlorenz c41ddb4
tmp: remove non-problemeatic workflows for quicker triaging
thlorenz 2afd9a1
ci: bust cache
thlorenz 486e44d
Merge branch 'master' into thlorenz/websocket-pool-conections
thlorenz ec54854
Merge branch 'master' into thlorenz/pool+better-reconciler
thlorenz e3b3f04
Revert "tmp: remove non-problemeatic workflows for quicker triaging"
thlorenz 0aeb168
Merge branch 'thlorenz/websocket-pool-conections' into thlorenz/pool+…
thlorenz a0a8fd8
chore: abstract stream factory to later test stream management
thlorenz 5d58665
chore: make StreamFactory proper generic to avoid perf overhead
thlorenz 00ff5c8
refactor: extract stream management into StreamManager
thlorenz ea5f62c
chore: prep new account subscribe
thlorenz c99b2fc
chore: initial tests
thlorenz 64eff4e
chore: pass initial set of tests
thlorenz cca4244
chore: add remaining tests (not passing)
thlorenz 2df5ea6
feat: initial impl to get all tests to pass
thlorenz 2ea0e9c
chore: minor cleanup
thlorenz bf48eee
chore: minor cleanup and clarifications
thlorenz b2990a6
chore: prep to include handle in subscribe return
thlorenz 613768e
chore: stream handle test integration
thlorenz 67ba602
Merge branch 'master' into thlorenz/pool+better-reconciler+grpc-gener…
thlorenz da4c6e7
chore: operate on full streams with handles
thlorenz 38f5a12
chore: remove unnecessary phantom data
thlorenz 93bb3ac
chore: current subs updated via write to channel
thlorenz fabc155
chore: adding retry logic to sub write
thlorenz a0c6116
chore: bubble errors from account_subscribe
thlorenz c8fd9da
chore: minor clarification
thlorenz 26a6f51
chore: program subs reuse stream via handle write
thlorenz 558a1ae
chore: support account sub from_slot in stream manager
thlorenz e7e9188
chore: initial stream manager integration with some problems
thlorenz 84bfa2b
chore: single subscriptions source in stream manager
thlorenz ceeaafd
chore: separate handles from streams to allow polling streammap
thlorenz 45d7261
chore: add next_update tests
thlorenz 22414b3
chore: fmt
thlorenz e940542
chore: remove dead_code
thlorenz 6f95bd6
Merge branch 'master' into thlorenz/grpc-generational
thlorenz 7fe1a68
chore: testing error propagation
thlorenz 2fd5a65
chore: add test that stream ended is handled correctly
thlorenz 724b2db
Merge branch 'master' into thlorenz/grpc-generational
thlorenz 67520ef
chore: fmt
thlorenz dadba1c
chore: fix two coderabbits
thlorenz 2e184f0
chore: optimize add program sub
thlorenz 0a9dfd7
chore: obsolete activation slot
thlorenz 1531973
chore: use async stream sub for current stream sub
thlorenz d720957
chore: move handle write to stream factory
thlorenz 13b5b7a
chore: fix optimize race condition
thlorenz 40a754e
chore: provide from_slot when optimizing
thlorenz 7f5547c
chore: keep overflow in current + assert subs in other streams
thlorenz 79161a2
chore: fmt
thlorenz 8e6bf09
chore: fix loss of program subs on failure
thlorenz af3936e
chore: remove current subs if subscription failed
thlorenz 7704488
chore: add note about optimize stream inconsistency in case of failure
thlorenz 6c1e8d5
chore: retry all writes to handles
thlorenz 4827309
chore: refactor out stream_factory
thlorenz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
945 changes: 0 additions & 945 deletions
945
magicblock-chainlink/src/remote_account_provider/chain_laser_actor.rs
This file was deleted.
Oops, something went wrong.
716 changes: 716 additions & 0 deletions
716
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs
Large diffs are not rendered by default.
Oops, something went wrong.
140 changes: 140 additions & 0 deletions
140
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mock.rs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,140 @@ | ||
| use std::sync::{Arc, Mutex}; | ||
|
|
||
| use async_trait::async_trait; | ||
| use helius_laserstream::{grpc::SubscribeRequest, LaserstreamError}; | ||
| use tokio::sync::mpsc; | ||
| use tokio_stream::wrappers::UnboundedReceiverStream; | ||
|
|
||
| use super::{LaserResult, StreamFactory}; | ||
| use crate::remote_account_provider::{ | ||
| chain_laser_actor::{LaserStreamWithHandle, StreamHandle}, | ||
| RemoteAccountProviderResult, | ||
| }; | ||
|
|
||
| /// A test mock that captures subscription requests and allows driving | ||
| /// streams programmatically. | ||
| #[derive(Clone)] | ||
| pub struct MockStreamFactory { | ||
| /// Every `SubscribeRequest` passed to `subscribe()` is recorded | ||
| /// here so tests can assert on filter contents, commitment levels, | ||
| /// etc. | ||
| captured_requests: Arc<Mutex<Vec<SubscribeRequest>>>, | ||
|
|
||
| /// Requests sent through a `MockStreamHandle::write()` call are | ||
| /// recorded here so tests can verify handle-driven updates. | ||
| handle_requests: Arc<Mutex<Vec<SubscribeRequest>>>, | ||
|
|
||
| /// A sender that the test uses to push `LaserResult` items into | ||
| /// the streams returned by `subscribe()`. | ||
| /// Each call to `subscribe()` creates a new mpsc channel; the rx | ||
| /// side becomes the returned stream, and the tx side is stored | ||
| /// here so the test can drive updates. | ||
| stream_senders: Arc<Mutex<Vec<Arc<mpsc::UnboundedSender<LaserResult>>>>>, | ||
| } | ||
|
|
||
| impl MockStreamFactory { | ||
| /// Create a new mock stream factory | ||
| pub fn new() -> Self { | ||
| Self { | ||
| captured_requests: Arc::new(Mutex::new(Vec::new())), | ||
| handle_requests: Arc::new(Mutex::new(Vec::new())), | ||
| stream_senders: Arc::new(Mutex::new(Vec::new())), | ||
| } | ||
| } | ||
|
|
||
| /// Get the captured subscription requests (from `subscribe()`) | ||
| pub fn captured_requests(&self) -> Vec<SubscribeRequest> { | ||
| self.captured_requests.lock().unwrap().clone() | ||
| } | ||
|
|
||
| /// Get the requests sent through stream handles (from | ||
| /// `handle.write()`) | ||
| pub fn handle_requests(&self) -> Vec<SubscribeRequest> { | ||
| self.handle_requests.lock().unwrap().clone() | ||
| } | ||
|
|
||
| /// Push an error update to a specific stream | ||
| pub fn push_error_to_stream(&self, idx: usize, error: LaserstreamError) { | ||
| let senders = self.stream_senders.lock().unwrap(); | ||
| if let Some(sender) = senders.get(idx) { | ||
| let _ = sender.send(Err(error)); | ||
| } | ||
| } | ||
|
|
||
| /// Push an update to a specific stream by index | ||
| pub fn push_update_to_stream(&self, idx: usize, update: LaserResult) { | ||
| let senders = self.stream_senders.lock().unwrap(); | ||
| if let Some(sender) = senders.get(idx) { | ||
| let _ = sender.send(update); | ||
| } | ||
| } | ||
|
|
||
| /// Get the number of active streams | ||
| pub fn active_stream_count(&self) -> usize { | ||
| self.stream_senders.lock().unwrap().len() | ||
| } | ||
|
|
||
| /// Close a specific stream by index | ||
| pub fn close_stream(&self, idx: usize) { | ||
| let mut senders = self.stream_senders.lock().unwrap(); | ||
| if idx < senders.len() { | ||
| senders.remove(idx); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Default for MockStreamFactory { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| /// Mock handle that records write requests and drains them into the | ||
| /// shared `handle_requests` vec on the factory. | ||
| #[derive(Clone)] | ||
| pub struct MockStreamHandle { | ||
| handle_requests: Arc<Mutex<Vec<SubscribeRequest>>>, | ||
| } | ||
|
|
||
| #[async_trait] | ||
| impl StreamHandle for MockStreamHandle { | ||
| async fn write( | ||
| &self, | ||
| request: SubscribeRequest, | ||
| ) -> Result<(), LaserstreamError> { | ||
| self.handle_requests.lock().unwrap().push(request); | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| #[async_trait] | ||
| impl StreamFactory<MockStreamHandle> for MockStreamFactory { | ||
| async fn subscribe( | ||
| &self, | ||
| request: SubscribeRequest, | ||
| ) -> RemoteAccountProviderResult<LaserStreamWithHandle<MockStreamHandle>> | ||
| { | ||
| // Record the initial subscribe request | ||
| self.captured_requests.lock().unwrap().push(request.clone()); | ||
|
|
||
| // Create a channel for driving LaserResult items into the | ||
| // stream | ||
| let (stream_tx, stream_rx) = mpsc::unbounded_channel::<LaserResult>(); | ||
| let stream = Box::pin(UnboundedReceiverStream::new(stream_rx)); | ||
|
|
||
| let stream_tx = Arc::new(stream_tx); | ||
| self.stream_senders.lock().unwrap().push(stream_tx); | ||
|
|
||
| // The handle shares the factory's handle_requests vec so | ||
| // every write is visible to tests immediately. | ||
| let handle = MockStreamHandle { | ||
| handle_requests: Arc::clone(&self.handle_requests), | ||
| }; | ||
|
|
||
| // Write the actual request to the handle (mirroring | ||
| // production behaviour of sending it over the network). | ||
| handle.write(request).await.unwrap(); | ||
|
|
||
| Ok(LaserStreamWithHandle { stream, handle }) | ||
| } | ||
| } | ||
72 changes: 72 additions & 0 deletions
72
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/mod.rs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| use std::{collections::HashSet, pin::Pin, sync::Arc}; | ||
|
|
||
| use futures_util::Stream; | ||
| use helius_laserstream::{ | ||
| grpc::{SubscribeRequest, SubscribeUpdate}, | ||
| LaserstreamError, | ||
| }; | ||
| use parking_lot::RwLock; | ||
| use solana_pubkey::Pubkey; | ||
| use tokio::time::Duration; | ||
|
|
||
| pub use self::{ | ||
| actor::{ChainLaserActor, Slots}, | ||
| stream_factory::{ | ||
| LaserStreamWithHandle, StreamFactory, StreamFactoryImpl, StreamHandle, | ||
| StreamHandleImpl, | ||
| }, | ||
| stream_manager::{StreamManager, StreamManagerConfig, StreamUpdateSource}, | ||
| }; | ||
| use crate::remote_account_provider::{ | ||
| RemoteAccountProviderError, RemoteAccountProviderResult, | ||
| }; | ||
|
|
||
| pub type SharedSubscriptions = Arc<RwLock<HashSet<Pubkey>>>; | ||
|
|
||
| mod actor; | ||
| #[cfg(test)] | ||
| mod mock; | ||
| mod stream_factory; | ||
| mod stream_manager; | ||
|
|
||
| /// Retry a `handle.write(request)` call with linear backoff. | ||
| /// | ||
| /// Tries up to `MAX_RETRIES` (5) times with 50 ms × attempt | ||
| /// backoff. Returns the original error after all retries are | ||
| /// exhausted. | ||
| pub(crate) async fn write_with_retry<S: StreamHandle>( | ||
| handle: &S, | ||
| task: &str, | ||
| request: SubscribeRequest, | ||
| ) -> RemoteAccountProviderResult<()> { | ||
| const MAX_RETRIES: usize = 5; | ||
| let mut retries = MAX_RETRIES; | ||
| let initial_retries = retries; | ||
|
|
||
| loop { | ||
| match handle.write(request.clone()).await { | ||
| Ok(()) => return Ok(()), | ||
| Err(err) => { | ||
| if retries > 0 { | ||
| retries -= 1; | ||
| let backoff_ms = 50u64 * (initial_retries - retries) as u64; | ||
| tokio::time::sleep(Duration::from_millis(backoff_ms)).await; | ||
| continue; | ||
| } | ||
| return Err( | ||
| RemoteAccountProviderError::GrpcSubscriptionUpdateFailed( | ||
| task.to_string(), | ||
| MAX_RETRIES, | ||
| format!("{err} ({err:?})"), | ||
| ), | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Result of a laser stream operation | ||
| pub type LaserResult = Result<SubscribeUpdate, LaserstreamError>; | ||
|
|
||
| /// A laser stream of subscription updates | ||
| pub type LaserStream = Pin<Box<dyn Stream<Item = LaserResult> + Send>>; |
101 changes: 101 additions & 0 deletions
101
magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_factory.rs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| use async_trait::async_trait; | ||
| use helius_laserstream::{ | ||
| grpc::SubscribeRequest, LaserstreamError, | ||
| StreamHandle as HeliusStreamHandle, | ||
| }; | ||
|
|
||
| use super::LaserStream; | ||
| use crate::remote_account_provider::RemoteAccountProviderResult; | ||
|
|
||
| /// A trait to represent the [HeliusStreamHandle]. | ||
| /// This is needed since we cannot create the helius one since | ||
| /// [helius_laserstream::StreamHandle::write_tx] is private and there is no constructor. | ||
| #[async_trait] | ||
| pub trait StreamHandle { | ||
| /// Send a new subscription request to update the active subscription. | ||
| async fn write( | ||
| &self, | ||
| request: SubscribeRequest, | ||
| ) -> Result<(), LaserstreamError>; | ||
| } | ||
|
|
||
| pub struct StreamHandleImpl { | ||
| pub handle: HeliusStreamHandle, | ||
| } | ||
|
|
||
| #[async_trait] | ||
| impl StreamHandle for StreamHandleImpl { | ||
| async fn write( | ||
| &self, | ||
| request: SubscribeRequest, | ||
| ) -> Result<(), LaserstreamError> { | ||
| // This async operation gets forwarded to the underlying subscription sender of the laser | ||
| // client and completes after the given item has been fully processed into the sink, | ||
| // including flushing. | ||
| // The assumption is that at that point it has been processed on the receiver end and the | ||
| // subscription is updated. | ||
| // See: https://github.com/helius-labs/laserstream-sdk/blob/v0.2.2/rust/src/client.rs#L196-L201 | ||
| self.handle.write(request).await | ||
| } | ||
| } | ||
|
|
||
| /// Abstraction over stream creation for testability | ||
| #[async_trait] | ||
| pub trait StreamFactory<S: StreamHandle>: Send + Sync + 'static { | ||
| /// Create a stream for the given subscription request. | ||
| async fn subscribe( | ||
| &self, | ||
| request: SubscribeRequest, | ||
| ) -> RemoteAccountProviderResult<LaserStreamWithHandle<S>>; | ||
| } | ||
|
|
||
| pub struct LaserStreamWithHandle<S: StreamHandle> { | ||
| pub(crate) stream: LaserStream, | ||
| pub(crate) handle: S, | ||
| } | ||
|
|
||
| /// Production stream factory that wraps helius client subscribe | ||
| pub struct StreamFactoryImpl { | ||
| config: helius_laserstream::LaserstreamConfig, | ||
| } | ||
|
|
||
| impl StreamFactoryImpl { | ||
| pub fn new(config: helius_laserstream::LaserstreamConfig) -> Self { | ||
| Self { config } | ||
| } | ||
| } | ||
|
|
||
| #[async_trait] | ||
| impl StreamFactory<StreamHandleImpl> for StreamFactoryImpl { | ||
| /// This implementation creates the underlying gRPC stream with the request (which | ||
| /// returns immediately) and then writes the same request to the handle so it is sent | ||
| /// over the network before returning asynchronously. | ||
| async fn subscribe( | ||
| &self, | ||
| request: SubscribeRequest, | ||
| ) -> RemoteAccountProviderResult<LaserStreamWithHandle<StreamHandleImpl>> | ||
| { | ||
| // NOTE: this call returns immediately yielding subscription errors and account updates | ||
| // via the stream, thus the subscription has not been received yet upstream | ||
| // NOTE: we need to use the same request as otherwise there is a potential race condition | ||
| // where our `write` below completes before this call and the request is overwritten | ||
| // with an empty one. | ||
| // Inside helius_laserstream::client::subscribe the request is cloned for that reason to | ||
| // be able to attempt the request multiple times during reconnect attempts. | ||
| // Given our requests contain a max of about 2_000 pubkeys, an extra clone here is a small | ||
| // price to pay to avoid this race condition. | ||
| let (stream, handle) = helius_laserstream::client::subscribe( | ||
| self.config.clone(), | ||
| request.clone(), | ||
| ); | ||
| let handle = StreamHandleImpl { handle }; | ||
| // Write to the handle and await it which at least guarantees that it has | ||
| // been sent over the network, even though there is still no guarantee it has been | ||
| // processed and that the subscription became active immediately | ||
| super::write_with_retry(&handle, "subscribe", request).await?; | ||
| Ok(LaserStreamWithHandle { | ||
| stream: Box::pin(stream), | ||
| handle, | ||
| }) | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.