Commit eaff03f

Merge pull request #564 from PrimeIntellect-ai/release/v.0.3.5
Release: v.0.3.5
2 parents: be374bb + ff7cecf

File tree

17 files changed: +1118 -305 lines

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ iroh = "0.34.1"
 rand_v8 = { package = "rand", version = "0.8.5", features = ["std"] }
 rand_core_v6 = { package = "rand_core", version = "0.6.4", features = ["std"] }
 [workspace.package]
-version = "0.3.4"
+version = "0.3.5"
 edition = "2021"

 [workspace.features]

crates/orchestrator/src/api/routes/nodes.rs

Lines changed: 26 additions & 4 deletions
@@ -158,8 +158,14 @@ async fn restart_node_task(node_id: web::Path<String>, app_state: Data<AppState>
         }));
     }

-    let p2p_id = node.worker_p2p_id.as_ref().unwrap();
-    let p2p_addresses = node.worker_p2p_addresses.as_ref().unwrap();
+    let p2p_id = node
+        .worker_p2p_id
+        .as_ref()
+        .expect("worker_p2p_id should be present");
+    let p2p_addresses = node
+        .worker_p2p_addresses
+        .as_ref()
+        .expect("worker_p2p_addresses should be present");

     match app_state
         .p2p_client

@@ -227,8 +233,24 @@ async fn get_node_logs(node_id: web::Path<String>, app_state: Data<AppState>) ->
         }));
     }

-    let p2p_id = node.worker_p2p_id.as_ref().unwrap();
-    let p2p_addresses = node.worker_p2p_addresses.as_ref().unwrap();
+    let p2p_id = match node.worker_p2p_id.as_ref() {
+        Some(id) => id,
+        None => {
+            return HttpResponse::BadRequest().json(json!({
+                "success": false,
+                "error": "Node does not have worker p2p id"
+            }));
+        }
+    };
+    let p2p_addresses = match node.worker_p2p_addresses.as_ref() {
+        Some(addresses) => addresses,
+        None => {
+            return HttpResponse::BadRequest().json(json!({
+                "success": false,
+                "error": "Node does not have worker p2p addresses"
+            }));
+        }
+    };

     match app_state
         .p2p_client
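Both handlers now guard the same two Option fields. A hypothetical helper, not part of this PR, could centralize the guards and keep each route to a single early return; the helper name, its signature, and the Option<String> / Option<Vec<String>> field types are assumptions:

use actix_web::HttpResponse;
use serde_json::json;

// Assumed field types; the real OrchestratorNode lives in
// crates/orchestrator/src/models/node.rs.
pub struct OrchestratorNode {
    pub worker_p2p_id: Option<String>,
    pub worker_p2p_addresses: Option<Vec<String>>,
}

// Hypothetical helper (not in this PR): fetch both p2p fields, or produce
// the 400 response the handler should send back.
fn require_p2p_info(node: &OrchestratorNode) -> Result<(&str, &[String]), HttpResponse> {
    let p2p_id = node.worker_p2p_id.as_deref().ok_or_else(|| {
        HttpResponse::BadRequest().json(json!({
            "success": false,
            "error": "Node does not have worker p2p id"
        }))
    })?;
    let p2p_addresses = node.worker_p2p_addresses.as_deref().ok_or_else(|| {
        HttpResponse::BadRequest().json(json!({
            "success": false,
            "error": "Node does not have worker p2p addresses"
        }))
    })?;
    Ok((p2p_id, p2p_addresses))
}

A handler would then unpack it with match require_p2p_info(&node) { Ok(v) => v, Err(resp) => return resp }.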

crates/orchestrator/src/api/routes/storage.rs

Lines changed: 4 additions & 2 deletions
@@ -335,6 +335,7 @@ fn generate_file_name(template: &str, original_name: &str) -> String {
 mod tests {

     use std::collections::HashMap;
+    use std::sync::Arc;

     use super::*;
     use crate::plugins::StatusUpdatePlugin;

@@ -580,13 +581,14 @@ mod tests {
             compute_requirements: None,
         };

-        let plugin = NodeGroupsPlugin::new(
+        let plugin = Arc::new(NodeGroupsPlugin::new(
             vec![config],
             app_state.redis_store.clone(),
             app_state.store_context.clone(),
             None,
             None,
-        );
+        ));
+        let _ = plugin.clone().register_observer().await;

         let _ = plugin
             .handle_status_change(&node, &NodeStatus::Healthy)
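The test mirrors the new two-phase setup: observer registration needs an Arc<Self> receiver, which cannot exist inside new() itself. A minimal standalone sketch of the pattern this PR adopts (names here are illustrative, not taken from the codebase):

use std::sync::Arc;

struct Plugin;

impl Plugin {
    // Construction stays synchronous and side-effect free.
    fn new() -> Self {
        Plugin
    }

    // Registration takes `self: Arc<Self>`, so an observer list can hold
    // a strong reference to the fully built, shared value.
    async fn register_observer(self: Arc<Self>) {
        // e.g. store.add_observer(self.clone()).await;
    }
}

#[tokio::main]
async fn main() {
    let plugin = Arc::new(Plugin::new());
    plugin.clone().register_observer().await;
}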

crates/orchestrator/src/main.rs

Lines changed: 35 additions & 1 deletion
@@ -41,6 +41,7 @@ use shared::web3::contracts::core::builder::ContractBuilder;
 use shared::web3::contracts::structs::compute_pool::PoolStatus;
 use shared::web3::wallet::Wallet;
 use std::sync::Arc;
+use std::time::Duration;
 use tokio::task::JoinSet;
 use url::Url;

@@ -176,6 +177,32 @@ async fn main() -> Result<()> {
     let store = Arc::new(RedisStore::new(&args.redis_store_url));
     let store_context = Arc::new(StoreContext::new(store.clone()));

+    if server_mode == ServerMode::ProcessorOnly || server_mode == ServerMode::Full {
+        info!("Migrating node store to hash");
+        if let Err(e) = store_context.node_store.migrate_json_to_hash().await {
+            error!("Error migrating node store to hash: {}", e);
+        }
+        info!("Node store migrated to hash");
+    } else {
+        // Wait for all nodes to be migrated to hash format
+        loop {
+            match store_context.node_store.count_non_hash_format_nodes().await {
+                Ok(0) => {
+                    info!("All nodes are in hash format");
+                    break;
+                }
+                Ok(count) => {
+                    info!("Waiting for {} nodes to be migrated to hash format", count);
+                    tokio::time::sleep(Duration::from_secs(1)).await;
+                }
+                Err(e) => {
+                    error!("Error counting non-hash format nodes: {}", e);
+                    tokio::time::sleep(Duration::from_secs(1)).await;
+                }
+            }
+        }
+    }
+
     let p2p_client = Arc::new(P2PClient::new(wallet.clone()).await.unwrap());

     let contracts = ContractBuilder::new(wallet.provider())

@@ -259,7 +286,14 @@ async fn main() -> Result<()> {

         let status_group_plugin = group_plugin.clone();
         let group_plugin_for_server = group_plugin.clone();
-        node_groups_plugin = Some(Arc::new(group_plugin_for_server));
+        let group_plugin_arc = Arc::new(group_plugin_for_server);
+
+        // Register the plugin as a task observer
+        if let Err(e) = group_plugin_arc.clone().register_observer().await {
+            error!("Failed to register node groups plugin as observer: {}", e);
+        }
+
+        node_groups_plugin = Some(group_plugin_arc);
         scheduler_plugins.push(Box::new(group_plugin));
         status_update_plugins.push(Box::new(status_group_plugin));
         info!("Plugin: Node group plugin initialized");

crates/orchestrator/src/models/node.rs

Lines changed: 1 addition & 11 deletions
@@ -65,22 +65,12 @@ impl From<DiscoveryNode> for OrchestratorNode {
     }
 }

-impl OrchestratorNode {
-    pub fn from_string(s: &str) -> Self {
-        let mut node: Self = serde_json::from_str(s).unwrap();
-        if node.status == NodeStatus::Dead || node.status == NodeStatus::Ejected {
-            node.task_id = None;
-            node.task_state = None;
-        }
-        node
-    }
-}
-
 impl fmt::Display for OrchestratorNode {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         write!(f, "{}", serde_json::to_string(self).unwrap())
     }
 }
+
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default, ToSchema)]
 pub enum NodeStatus {
     #[default]

crates/orchestrator/src/plugins/node_groups/mod.rs

Lines changed: 91 additions & 15 deletions
@@ -166,22 +166,24 @@ impl NodeGroupsPlugin {
             }
         });

-        let plugin = Self {
+        Self {
             configuration_templates: sorted_configs,
             store,
             store_context,
             node_groups_heartbeats,
             webhook_plugins,
             task_switching_policy,
             proximity_optimization_policy,
-        };
+        }
+    }

-        plugin
-            .store_context
+    /// Register this plugin as a task observer (async)
+    pub async fn register_observer(self: Arc<Self>) -> Result<()> {
+        self.store_context
             .task_store
-            .add_observer(Arc::new(plugin.clone()));
-
-        plugin
+            .add_observer(self.clone())
+            .await;
+        Ok(())
     }

     /// Check if a node is compatible with a configuration's compute requirements

@@ -776,19 +778,93 @@ impl NodeGroupsPlugin {
         let mut total_nodes = BTreeSet::new();
         let mut groups_to_dissolve = Vec::new();

-        // Select groups for merging
-        for group in compatible_groups {
-            if total_nodes.len() + group.nodes.len() <= config.max_group_size {
-                merge_batch.push(group.clone());
-                total_nodes.extend(group.nodes.iter().cloned());
-                groups_to_dissolve.push(group.id.clone());
+        // If proximity optimization is enabled, try to use location-based selection
+        if self.proximity_optimization_policy.enabled {
+            // Get node information for location data
+            let nodes = self.store_context.node_store.get_nodes().await?;
+            let node_map: HashMap<String, &OrchestratorNode> = nodes
+                .iter()
+                .map(|node| (node.address.to_string(), node))
+                .collect();
+
+            // Try to find a seed group with location data
+            let seed_group = compatible_groups.iter().find(|group| {
+                group
+                    .nodes
+                    .iter()
+                    .next()
+                    .and_then(|addr| node_map.get(addr))
+                    .and_then(|node| node.location.as_ref())
+                    .is_some()
+            });
+
+            if let Some(seed) = seed_group {
+                // Found a seed with location, use proximity-based selection
+                let seed_node = node_map.get(seed.nodes.iter().next().unwrap()).unwrap();

-            if total_nodes.len() >= config.max_group_size {
-                break;
+                merge_batch.push(seed.clone());
+                total_nodes.extend(seed.nodes.iter().cloned());
+                groups_to_dissolve.push(seed.id.clone());
+
+                // Create a sorted list of remaining groups by proximity
+                let mut remaining_with_distance: Vec<(f64, &NodeGroup)> = compatible_groups
+                    .iter()
+                    .filter(|g| g.id != seed.id)
+                    .filter_map(|group| {
+                        let node_addr = group.nodes.iter().next()?;
+                        let node = node_map.get(node_addr)?;
+                        let node_loc = node.location.as_ref()?;
+                        let seed_loc = seed_node.location.as_ref()?;
+                        let distance = Self::calculate_distance(seed_loc, node_loc);
+                        Some((distance, group))
+                    })
+                    .collect();
+
+                remaining_with_distance
+                    .sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
+
+                // Add closest groups first
+                for (_, group) in remaining_with_distance {
+                    if total_nodes.len() + group.nodes.len() <= config.max_group_size {
+                        merge_batch.push(group.clone());
+                        total_nodes.extend(group.nodes.iter().cloned());
+                        groups_to_dissolve.push(group.id.clone());
+
+                        if total_nodes.len() >= config.max_group_size {
+                            break;
+                        }
+                    }
                 }
             }
         }

+        // If no proximity-based selection happened or we still need more groups, use original logic
+        if merge_batch.is_empty()
+            || (total_nodes.len() < config.max_group_size
+                && total_nodes.len() < config.min_group_size)
+        {
+            // Reset if we didn't get enough nodes
+            if total_nodes.len() < config.min_group_size {
+                merge_batch.clear();
+                total_nodes.clear();
+                groups_to_dissolve.clear();
+            }
+
+            // Original selection logic
+            for group in compatible_groups {
+                if !groups_to_dissolve.contains(&group.id)
+                    && total_nodes.len() + group.nodes.len() <= config.max_group_size
+                {
+                    merge_batch.push(group.clone());
+                    total_nodes.extend(group.nodes.iter().cloned());
+                    groups_to_dissolve.push(group.id.clone());
+
+                    if total_nodes.len() >= config.max_group_size {
+                        break;
+                    }
+                }
+            }
+        }
         // Validate merge conditions
         if !self
             .is_merge_beneficial(&merge_batch, total_nodes.len())
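Self::calculate_distance is referenced but not defined in this hunk. A haversine great-circle distance is one plausible implementation; the NodeLocation shape with latitude/longitude fields is an assumption:

// Assumed location type; the real NodeLocation may differ.
pub struct NodeLocation {
    pub latitude: f64,
    pub longitude: f64,
}

// Haversine great-circle distance in kilometres between two coordinates.
fn calculate_distance(a: &NodeLocation, b: &NodeLocation) -> f64 {
    const EARTH_RADIUS_KM: f64 = 6371.0;
    let (lat1, lat2) = (a.latitude.to_radians(), b.latitude.to_radians());
    let dlat = (b.latitude - a.latitude).to_radians();
    let dlon = (b.longitude - a.longitude).to_radians();
    let h = (dlat / 2.0).sin().powi(2)
        + lat1.cos() * lat2.cos() * (dlon / 2.0).sin().powi(2);
    2.0 * EARTH_RADIUS_KM * h.sqrt().asin()
}

For finite inputs this never produces NaN, which is why the sort in the diff can safely fall back to Ordering::Equal when partial_cmp fails.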
