Skip to content

Commit 1bb7f87

Browse files
authored
feature: implement libp2p node for worker, validator, and orchestrator (#622)
1 parent b6847f1 commit 1bb7f87

File tree

47 files changed

+2704
-2012
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2704
-2012
lines changed

Cargo.lock

Lines changed: 25 additions & 173 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,12 +5,15 @@ members = [
55
"crates/validator",
66
"crates/shared",
77
"crates/orchestrator",
8+
"crates/p2p",
89
"crates/dev-utils",
910
]
1011
resolver = "2"
1112

1213
[workspace.dependencies]
1314
shared = { path = "crates/shared" }
15+
p2p = { path = "crates/p2p" }
16+
1417
actix-web = "4.9.0"
1518
clap = { version = "4.5.27", features = ["derive"] }
1619
serde = { version = "1.0.219", features = ["derive"] }
@@ -42,6 +45,7 @@ rand_core_v6 = { package = "rand_core", version = "0.6.4", features = ["std"] }
4245
ipld-core = "0.4"
4346
rust-ipfs = "0.14"
4447
cid = "0.11"
48+
tracing = "0.1.41"
4549

4650
[workspace.package]
4751
version = "0.3.11"

crates/discovery/src/chainsync/sync.rs

Lines changed: 3 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -155,7 +155,7 @@ async fn sync_single_node(
155155
})?;
156156

157157
let balance = provider.get_balance(node_address).await.map_err(|e| {
158-
error!("Error retrieving balance for node {}: {}", node_address, e);
158+
error!("Error retrieving balance for node {node_address}: {e}");
159159
anyhow::anyhow!("Failed to retrieve node balance")
160160
})?;
161161
n.latest_balance = Some(balance);
@@ -166,8 +166,7 @@ async fn sync_single_node(
166166
.await
167167
.map_err(|e| {
168168
error!(
169-
"Error retrieving node info for provider {} and node {}: {}",
170-
provider_address, node_address, e
169+
"Error retrieving node info for provider {provider_address} and node {node_address}: {e}"
171170
);
172171
anyhow::anyhow!("Failed to retrieve node info")
173172
})?;
@@ -177,10 +176,7 @@ async fn sync_single_node(
177176
.get_provider(provider_address)
178177
.await
179178
.map_err(|e| {
180-
error!(
181-
"Error retrieving provider info for {}: {}",
182-
provider_address, e
183-
);
179+
error!("Error retrieving provider info for {provider_address}: {e}");
184180
anyhow::anyhow!("Failed to retrieve provider info")
185181
})?;
186182

crates/orchestrator/Cargo.toml

Lines changed: 11 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -7,35 +7,35 @@ edition.workspace = true
77
workspace = true
88

99
[dependencies]
10+
p2p = { workspace = true}
11+
shared = { workspace = true }
12+
1013
actix-web = { workspace = true }
11-
actix-web-prometheus = "0.1.2"
1214
alloy = { workspace = true }
1315
anyhow = { workspace = true }
14-
async-trait = "0.1.88"
15-
base64 = "0.22.1"
1616
chrono = { workspace = true, features = ["serde"] }
1717
clap = { workspace = true }
1818
env_logger = { workspace = true }
1919
futures = { workspace = true }
20-
google-cloud-auth = "0.18.0"
21-
google-cloud-storage = "0.24.0"
2220
hex = { workspace = true }
2321
log = { workspace = true }
24-
prometheus = "0.14.0"
25-
rand = "0.9.0"
2622
redis = { workspace = true, features = ["tokio-comp"] }
2723
redis-test = { workspace = true }
2824
reqwest = { workspace = true }
2925
serde = { workspace = true }
3026
serde_json = { workspace = true }
31-
shared = { workspace = true }
3227
tokio = { workspace = true }
28+
tokio-util = { workspace = true }
3329
url = { workspace = true }
30+
uuid = { workspace = true }
31+
32+
actix-web-prometheus = "0.1.2"
33+
google-cloud-auth = "0.18.0"
34+
google-cloud-storage = "0.24.0"
35+
prometheus = "0.14.0"
36+
rand = "0.9.0"
3437
utoipa = { version = "5.3.0", features = ["actix_extras", "chrono", "uuid"] }
3538
utoipa-swagger-ui = { version = "9.0.2", features = ["actix-web", "debug-embed", "reqwest", "vendored"] }
36-
uuid = { workspace = true }
37-
iroh = { workspace = true }
38-
rand_v8 = { workspace = true }
3939

4040
[dev-dependencies]
4141
mockito = { workspace = true }

crates/orchestrator/src/api/routes/groups.rs

Lines changed: 16 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -236,9 +236,6 @@ async fn fetch_node_logs_p2p(
236236

237237
match node {
238238
Some(node) => {
239-
// Check if P2P client is available
240-
let p2p_client = app_state.p2p_client.clone();
241-
242239
// Check if node has P2P information
243240
let (worker_p2p_id, worker_p2p_addresses) =
244241
match (&node.worker_p2p_id, &node.worker_p2p_addresses) {
@@ -254,11 +251,22 @@ async fn fetch_node_logs_p2p(
254251
};
255252

256253
// Send P2P request for task logs
257-
match tokio::time::timeout(
258-
Duration::from_secs(NODE_REQUEST_TIMEOUT),
259-
p2p_client.get_task_logs(node_address, worker_p2p_id, worker_p2p_addresses),
260-
)
261-
.await
254+
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
255+
let get_task_logs_request = crate::p2p::GetTaskLogsRequest {
256+
worker_wallet_address: node_address,
257+
worker_p2p_id: worker_p2p_id.clone(),
258+
worker_addresses: worker_p2p_addresses.clone(),
259+
response_tx,
260+
};
261+
if let Err(e) = app_state.get_task_logs_tx.send(get_task_logs_request).await {
262+
error!("Failed to send GetTaskLogsRequest for node {node_address}: {e}");
263+
return json!({
264+
"success": false,
265+
"error": format!("Failed to send request: {}", e),
266+
"status": node.status.to_string()
267+
});
268+
};
269+
match tokio::time::timeout(Duration::from_secs(NODE_REQUEST_TIMEOUT), response_rx).await
262270
{
263271
Ok(Ok(log_lines)) => {
264272
json!({

crates/orchestrator/src/api/routes/nodes.rs

Lines changed: 32 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -164,11 +164,22 @@ async fn restart_node_task(node_id: web::Path<String>, app_state: Data<AppState>
164164
.as_ref()
165165
.expect("worker_p2p_addresses should be present");
166166

167-
match app_state
168-
.p2p_client
169-
.restart_task(node_address, p2p_id, p2p_addresses)
170-
.await
171-
{
167+
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
168+
let restart_task_request = crate::p2p::RestartTaskRequest {
169+
worker_wallet_address: node.address,
170+
worker_p2p_id: p2p_id.clone(),
171+
worker_addresses: p2p_addresses.clone(),
172+
response_tx,
173+
};
174+
if let Err(e) = app_state.restart_task_tx.send(restart_task_request).await {
175+
error!("Failed to send restart task request: {e}");
176+
return HttpResponse::InternalServerError().json(json!({
177+
"success": false,
178+
"error": "Failed to send restart task request"
179+
}));
180+
}
181+
182+
match response_rx.await {
172183
Ok(_) => HttpResponse::Ok().json(json!({
173184
"success": true,
174185
"message": "Task restarted successfully"
@@ -240,11 +251,22 @@ async fn get_node_logs(node_id: web::Path<String>, app_state: Data<AppState>) ->
240251
}));
241252
};
242253

243-
match app_state
244-
.p2p_client
245-
.get_task_logs(node_address, p2p_id, p2p_addresses)
246-
.await
247-
{
254+
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
255+
let get_task_logs_request = crate::p2p::GetTaskLogsRequest {
256+
worker_wallet_address: node.address,
257+
worker_p2p_id: p2p_id.clone(),
258+
worker_addresses: p2p_addresses.clone(),
259+
response_tx,
260+
};
261+
if let Err(e) = app_state.get_task_logs_tx.send(get_task_logs_request).await {
262+
error!("Failed to send get task logs request: {e}");
263+
return HttpResponse::InternalServerError().json(json!({
264+
"success": false,
265+
"error": "Failed to send get task logs request"
266+
}));
267+
}
268+
269+
match response_rx.await {
248270
Ok(logs) => HttpResponse::Ok().json(json!({
249271
"success": true,
250272
"logs": logs

crates/orchestrator/src/api/server.rs

Lines changed: 18 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@ use crate::api::routes::task::tasks_routes;
55
use crate::api::routes::{heartbeat::heartbeat_routes, metrics::metrics_routes};
66
use crate::metrics::MetricsContext;
77
use crate::models::node::NodeStatus;
8-
use crate::p2p::client::P2PClient;
8+
use crate::p2p::{GetTaskLogsRequest, RestartTaskRequest};
99
use crate::plugins::node_groups::NodeGroupsPlugin;
1010
use crate::scheduler::Scheduler;
1111
use crate::store::core::{RedisStore, StoreContext};
@@ -23,6 +23,7 @@ use shared::utils::StorageProvider;
2323
use shared::web3::contracts::core::builder::Contracts;
2424
use shared::web3::wallet::WalletProvider;
2525
use std::sync::Arc;
26+
use tokio::sync::mpsc::Sender;
2627
use utoipa::{
2728
openapi::security::{ApiKey, ApiKeyValue, SecurityScheme},
2829
Modify, OpenApi,
@@ -116,17 +117,18 @@ async fn health_check(data: web::Data<AppState>) -> HttpResponse {
116117
}
117118

118119
pub(crate) struct AppState {
119-
pub store_context: Arc<StoreContext>,
120-
pub storage_provider: Option<Arc<dyn StorageProvider>>,
121-
pub heartbeats: Arc<LoopHeartbeats>,
122-
pub redis_store: Arc<RedisStore>,
123-
pub hourly_upload_limit: i64,
124-
pub contracts: Option<Contracts<WalletProvider>>,
125-
pub pool_id: u32,
126-
pub scheduler: Scheduler,
127-
pub node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
128-
pub metrics: Arc<MetricsContext>,
129-
pub p2p_client: Arc<P2PClient>,
120+
pub(crate) store_context: Arc<StoreContext>,
121+
pub(crate) storage_provider: Option<Arc<dyn StorageProvider>>,
122+
pub(crate) heartbeats: Arc<LoopHeartbeats>,
123+
pub(crate) redis_store: Arc<RedisStore>,
124+
pub(crate) hourly_upload_limit: i64,
125+
pub(crate) contracts: Option<Contracts<WalletProvider>>,
126+
pub(crate) pool_id: u32,
127+
pub(crate) scheduler: Scheduler,
128+
pub(crate) node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
129+
pub(crate) metrics: Arc<MetricsContext>,
130+
pub(crate) get_task_logs_tx: Sender<GetTaskLogsRequest>,
131+
pub(crate) restart_task_tx: Sender<RestartTaskRequest>,
130132
}
131133

132134
#[allow(clippy::too_many_arguments)]
@@ -145,7 +147,8 @@ pub async fn start_server(
145147
scheduler: Scheduler,
146148
node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
147149
metrics: Arc<MetricsContext>,
148-
p2p_client: Arc<P2PClient>,
150+
get_task_logs_tx: Sender<GetTaskLogsRequest>,
151+
restart_task_tx: Sender<RestartTaskRequest>,
149152
) -> Result<(), Error> {
150153
info!("Starting server at http://{host}:{port}");
151154
let app_state = Data::new(AppState {
@@ -159,7 +162,8 @@ pub async fn start_server(
159162
scheduler,
160163
node_groups_plugin,
161164
metrics,
162-
p2p_client,
165+
get_task_logs_tx,
166+
restart_task_tx,
163167
});
164168
let node_store = app_state.store_context.node_store.clone();
165169
let node_store_clone = node_store.clone();

crates/orchestrator/src/api/tests/helper.rs

Lines changed: 20 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -18,12 +18,12 @@ use std::sync::Arc;
1818
use url::Url;
1919

2020
#[cfg(test)]
21-
pub async fn create_test_app_state() -> Data<AppState> {
21+
pub(crate) async fn create_test_app_state() -> Data<AppState> {
2222
use shared::utils::MockStorageProvider;
2323

2424
use crate::{
25-
metrics::MetricsContext, p2p::client::P2PClient, scheduler::Scheduler,
26-
utils::loop_heartbeats::LoopHeartbeats, ServerMode,
25+
metrics::MetricsContext, scheduler::Scheduler, utils::loop_heartbeats::LoopHeartbeats,
26+
ServerMode,
2727
};
2828

2929
let store = Arc::new(RedisStore::new_test());
@@ -46,12 +46,8 @@ pub async fn create_test_app_state() -> Data<AppState> {
4646
let mock_storage = MockStorageProvider::new();
4747
let storage_provider = Arc::new(mock_storage);
4848
let metrics = Arc::new(MetricsContext::new(1.to_string()));
49-
let wallet = Wallet::new(
50-
"0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97",
51-
Url::parse("http://localhost:8545").unwrap(),
52-
)
53-
.unwrap();
54-
let p2p_client = Arc::new(P2PClient::new(wallet.clone()).await.unwrap());
49+
let (get_task_logs_tx, _) = tokio::sync::mpsc::channel(1);
50+
let (restart_task_tx, _) = tokio::sync::mpsc::channel(1);
5551

5652
Data::new(AppState {
5753
store_context: store_context.clone(),
@@ -64,17 +60,17 @@ pub async fn create_test_app_state() -> Data<AppState> {
6460
scheduler,
6561
node_groups_plugin: None,
6662
metrics,
67-
p2p_client: p2p_client.clone(),
63+
get_task_logs_tx,
64+
restart_task_tx,
6865
})
6966
}
7067

7168
#[cfg(test)]
72-
pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
69+
pub(crate) async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
7370
use shared::utils::MockStorageProvider;
7471

7572
use crate::{
7673
metrics::MetricsContext,
77-
p2p::client::P2PClient,
7874
plugins::node_groups::{NodeGroupConfiguration, NodeGroupsPlugin},
7975
scheduler::Scheduler,
8076
utils::loop_heartbeats::LoopHeartbeats,
@@ -116,12 +112,8 @@ pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
116112
let mock_storage = MockStorageProvider::new();
117113
let storage_provider = Arc::new(mock_storage);
118114
let metrics = Arc::new(MetricsContext::new(1.to_string()));
119-
let wallet = Wallet::new(
120-
"0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97",
121-
Url::parse("http://localhost:8545").unwrap(),
122-
)
123-
.unwrap();
124-
let p2p_client = Arc::new(P2PClient::new(wallet.clone()).await.unwrap());
115+
let (get_task_logs_tx, _) = tokio::sync::mpsc::channel(1);
116+
let (restart_task_tx, _) = tokio::sync::mpsc::channel(1);
125117

126118
Data::new(AppState {
127119
store_context: store_context.clone(),
@@ -134,12 +126,13 @@ pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
134126
scheduler,
135127
node_groups_plugin,
136128
metrics,
137-
p2p_client: p2p_client.clone(),
129+
get_task_logs_tx,
130+
restart_task_tx,
138131
})
139132
}
140133

141134
#[cfg(test)]
142-
pub fn setup_contract() -> Contracts<WalletProvider> {
135+
pub(crate) fn setup_contract() -> Contracts<WalletProvider> {
143136
let coordinator_key = "0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97";
144137
let rpc_url: Url = Url::parse("http://localhost:8545").unwrap();
145138
let wallet = Wallet::new(coordinator_key, rpc_url).unwrap();
@@ -154,12 +147,12 @@ pub fn setup_contract() -> Contracts<WalletProvider> {
154147
}
155148

156149
#[cfg(test)]
157-
pub async fn create_test_app_state_with_metrics() -> Data<AppState> {
150+
pub(crate) async fn create_test_app_state_with_metrics() -> Data<AppState> {
158151
use shared::utils::MockStorageProvider;
159152

160153
use crate::{
161-
metrics::MetricsContext, p2p::client::P2PClient, scheduler::Scheduler,
162-
utils::loop_heartbeats::LoopHeartbeats, ServerMode,
154+
metrics::MetricsContext, scheduler::Scheduler, utils::loop_heartbeats::LoopHeartbeats,
155+
ServerMode,
163156
};
164157

165158
let store = Arc::new(RedisStore::new_test());
@@ -182,12 +175,8 @@ pub async fn create_test_app_state_with_metrics() -> Data<AppState> {
182175
let mock_storage = MockStorageProvider::new();
183176
let storage_provider = Arc::new(mock_storage);
184177
let metrics = Arc::new(MetricsContext::new("0".to_string()));
185-
let wallet = Wallet::new(
186-
"0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97",
187-
Url::parse("http://localhost:8545").unwrap(),
188-
)
189-
.unwrap();
190-
let p2p_client = Arc::new(P2PClient::new(wallet.clone()).await.unwrap());
178+
let (get_task_logs_tx, _) = tokio::sync::mpsc::channel(1);
179+
let (restart_task_tx, _) = tokio::sync::mpsc::channel(1);
191180

192181
Data::new(AppState {
193182
store_context: store_context.clone(),
@@ -200,6 +189,7 @@ pub async fn create_test_app_state_with_metrics() -> Data<AppState> {
200189
scheduler,
201190
node_groups_plugin: None,
202191
metrics,
203-
p2p_client: p2p_client.clone(),
192+
get_task_logs_tx,
193+
restart_task_tx,
204194
})
205195
}

0 commit comments

Comments
 (0)