11use graph:: components:: network_provider:: ProviderName ;
2- use graph:: endpoint:: EndpointMetrics ;
3-
2+ use graph:: endpoint:: { ConnectionType , EndpointMetrics , RequestLabels } ;
3+ use graph :: prelude :: alloy :: rpc :: json_rpc :: { RequestPacket , ResponsePacket } ;
44use graph:: prelude:: * ;
55use graph:: url:: Url ;
66use std:: sync:: Arc ;
7+ use std:: task:: { Context , Poll } ;
8+ use tower:: Service ;
9+
10+ use alloy:: transports:: { TransportError , TransportFut } ;
711
8- // Alloy imports for transport types
912use graph:: prelude:: alloy:: transports:: { http:: Http , ipc:: IpcConnect , ws:: WsConnect } ;
1013
1114/// Abstraction over different transport types for Alloy providers.
1215#[ derive( Clone , Debug ) ]
1316pub enum Transport {
1417 RPC {
15- client : Http < reqwest :: Client > ,
18+ client : alloy :: rpc :: client :: RpcClient ,
1619 metrics : Arc < EndpointMetrics > ,
1720 provider : ProviderName ,
1821 url : String ,
@@ -43,9 +46,6 @@ impl Transport {
4346 }
4447
4548 /// Creates a JSON-RPC over HTTP transport.
46- ///
47- /// Note: JSON-RPC over HTTP doesn't always support subscribing to new
48- /// blocks (one such example is Infura's HTTP endpoint).
4949 pub fn new_rpc (
5050 rpc : Url ,
5151 headers : graph:: http:: HeaderMap ,
@@ -60,93 +60,83 @@ impl Transport {
6060
6161 let rpc_url = rpc. to_string ( ) ;
6262
63+ // Create HTTP transport with metrics collection
64+ let http_transport = Http :: with_client ( client, rpc) ;
65+ let metrics_transport =
66+ MetricsHttp :: new ( http_transport, metrics. clone ( ) , provider. as_ref ( ) . into ( ) ) ;
67+ let rpc_client = alloy:: rpc:: client:: RpcClient :: new ( metrics_transport, false ) ;
68+
6369 Transport :: RPC {
64- client : Http :: with_client ( client , rpc ) ,
70+ client : rpc_client ,
6571 metrics,
6672 provider : provider. as_ref ( ) . into ( ) ,
6773 url : rpc_url,
6874 }
6975 }
7076}
7177
72- /*
73- impl web3::Transport for Transport {
74- type Out = Pin<Box<dyn Future<Output = Result<Value, web3::error::Error>> + Send + 'static>>;
75-
76- fn prepare(&self, method: &str, params: Vec<Value>) -> (RequestId, Call) {
77- match self {
78- Transport::RPC {
79- client,
80- metrics: _,
81- provider: _,
82- url: _,
83- } => client.prepare(method, params),
84- Transport::IPC { transport, path: _ } => transport.prepare(method, params),
85- Transport::WS { transport, url: _ } => transport.prepare(method, params),
78+ /// Custom HTTP transport wrapper that collects metrics
79+ #[ derive( Clone ) ]
80+ pub struct MetricsHttp {
81+ inner : Http < reqwest:: Client > ,
82+ metrics : Arc < EndpointMetrics > ,
83+ provider : ProviderName ,
84+ }
85+
86+ impl MetricsHttp {
87+ pub fn new (
88+ inner : Http < reqwest:: Client > ,
89+ metrics : Arc < EndpointMetrics > ,
90+ provider : ProviderName ,
91+ ) -> Self {
92+ Self {
93+ inner,
94+ metrics,
95+ provider,
8696 }
8797 }
98+ }
99+
100+ // Implement tower::Service trait for MetricsHttp to intercept RPC calls
101+ impl Service < RequestPacket > for MetricsHttp {
102+ type Response = ResponsePacket ;
103+ type Error = TransportError ;
104+ type Future = TransportFut < ' static > ;
105+
106+ fn poll_ready ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
107+ self . inner . poll_ready ( cx)
108+ }
88109
89- fn send(&self, id: RequestId, request: Call) -> Self::Out {
90- match self {
91- Transport::RPC {
92- client,
93- metrics,
110+ fn call ( & mut self , request : RequestPacket ) -> Self :: Future {
111+ let metrics = self . metrics . clone ( ) ;
112+ let provider = self . provider . clone ( ) ;
113+ let mut inner = self . inner . clone ( ) ;
114+
115+ Box :: pin ( async move {
116+ // Extract method name from request
117+ let method = match & request {
118+ RequestPacket :: Single ( req) => req. method ( ) . to_string ( ) ,
119+ RequestPacket :: Batch ( reqs) => reqs
120+ . first ( )
121+ . map ( |r| r. method ( ) . to_string ( ) )
122+ . unwrap_or_else ( || "batch" . to_string ( ) ) ,
123+ } ;
124+
125+ let labels = RequestLabels {
94126 provider,
95- url: _,
96- } => {
97- let metrics = metrics.cheap_clone();
98- let client = client.clone();
99- let method = match request {
100- Call::MethodCall(ref m) => m.method.as_str(),
101- _ => "unknown",
102- };
103-
104- let labels = RequestLabels {
105- provider: provider.clone(),
106- req_type: method.into(),
107- conn_type: graph::endpoint::ConnectionType::Rpc,
108- };
109- let out = async move {
110- let out = client.send(id, request).await;
111- match out {
112- Ok(_) => metrics.success(&labels),
113- Err(_) => metrics.failure(&labels),
114- }
115-
116- out
117- };
118-
119- Box::pin(out)
127+ req_type : method. into ( ) ,
128+ conn_type : ConnectionType :: Rpc ,
129+ } ;
130+
131+ // Call inner transport and track metrics
132+ let result = inner. call ( request) . await ;
133+
134+ match & result {
135+ Ok ( _) => metrics. success ( & labels) ,
136+ Err ( _) => metrics. failure ( & labels) ,
120137 }
121- Transport::IPC { transport, path: _ } => Box::pin(transport.send(id, request)),
122- Transport::WS { transport, url: _ } => Box::pin(transport.send(id, request)),
123- }
124- }
125- }
126- */
127-
128- /*
129- impl web3::BatchTransport for Transport {
130- type Batch = Box<
131- dyn Future<Output = Result<Vec<Result<Value, web3::error::Error>>, web3::error::Error>>
132- + Send
133- + Unpin,
134- >;
135-
136- fn send_batch<T>(&self, requests: T) -> Self::Batch
137- where
138- T: IntoIterator<Item = (RequestId, Call)>,
139- {
140- match self {
141- Transport::RPC {
142- client,
143- metrics: _,
144- provider: _,
145- url: _,
146- } => Box::new(client.send_batch(requests)),
147- Transport::IPC { transport, path: _ } => Box::new(transport.send_batch(requests)),
148- Transport::WS { transport, url: _ } => Box::new(transport.send_batch(requests)),
149- }
138+
139+ result
140+ } )
150141 }
151142}
152- */
0 commit comments