diff --git a/Cargo.toml b/Cargo.toml index d0b30e3..711ae1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,13 +16,13 @@ default = ["parallel"] parallel = ["rayon", "crossbeam-channel"] [dev-dependencies] -criterion = "0.3" +criterion = "0.5.1" [[bench]] name = "dep_graph" harness = false [dependencies] -crossbeam-channel = { version = "0.4", optional = true } +crossbeam-channel = { version = "0.5.8", optional = true } rayon = { version = "1.5", optional = true } num_cpus = "1.13" diff --git a/src/graph.rs b/src/graph.rs index c49eb7b..72e6438 100644 --- a/src/graph.rs +++ b/src/graph.rs @@ -9,6 +9,7 @@ pub type InnerDependencyMap = HashMap>; pub type DependencyMap = Arc>>; /// Dependency graph +#[derive(Debug, Default)] pub struct DepGraph where I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static, @@ -43,16 +44,9 @@ where if node.deps().is_empty() { ready_nodes.push(node.id().clone()); - } - - for node_dep in node.deps() { - if !rdeps.contains_key(node_dep) { - let mut dep_rdeps = HashSet::new(); - dep_rdeps.insert(node.id().clone()); - rdeps.insert(node_dep.clone(), dep_rdeps.clone()); - } else { - let dep_rdeps = rdeps.get_mut(node_dep).unwrap(); - dep_rdeps.insert(node.id().clone()); + } else { + for node_dep in node.deps() { + rdeps.entry(node_dep.clone()).or_default().insert(node.id().clone()); } } } @@ -65,6 +59,20 @@ where } } +impl Clone for DepGraph +where + I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static, +{ + fn clone(&self) -> Self { + Self { + ready_nodes: self.ready_nodes.clone(), + // clone the inner HashMap so that a new iteration can be started + deps: Arc::new(RwLock::new(self.deps.read().unwrap().clone())), + rdeps: Arc::new(RwLock::new(self.rdeps.read().unwrap().clone())), + } + } +} + impl IntoIterator for DepGraph where I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static, @@ -73,7 +81,7 @@ where type IntoIter = DepGraphIter; fn into_iter(self) -> Self::IntoIter { - DepGraphIter::::new(self.ready_nodes.clone(), self.deps.clone(), self.rdeps) + DepGraphIter::::new(self.ready_nodes, self.deps, self.rdeps) } } @@ -133,33 +141,32 @@ pub fn remove_node_id( where I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static, { - let rdep_ids = { - match rdeps.read().unwrap().get(&id) { - Some(node) => node.clone(), - // If no node depends on a node, it will not appear - // in rdeps. - None => Default::default(), - } - }; - let mut deps = deps.write().unwrap(); - let next_nodes = rdep_ids - .iter() - .filter_map(|rdep_id| { - let rdep = match deps.get_mut(&rdep_id) { - Some(rdep) => rdep, - None => return None, - }; - - rdep.remove(&id); - - if rdep.is_empty() { - Some(rdep_id.clone()) - } else { - None - } - }) - .collect(); + + let next_nodes = if let Some(rdep_ids) = rdeps.read().unwrap().get(&id) { + let next_nodes = rdep_ids + .iter() + .filter_map(|rdep_id| { + let rdep = match deps.get_mut(rdep_id) { + Some(rdep) => rdep, + None => return None, + }; + + rdep.remove(&id); + + if rdep.is_empty() { + Some(rdep_id.clone()) + } else { + None + } + }) + .collect(); + + next_nodes + } else { + // If no node depends on a node, it will not appear in rdeps. + vec![] + }; // Remove the current node from the list of dependencies. deps.remove(&id); diff --git a/src/graph_par.rs b/src/graph_par.rs index b5c6f7d..e393b3e 100644 --- a/src/graph_par.rs +++ b/src/graph_par.rs @@ -91,7 +91,7 @@ where (*self.counter).fetch_sub(1, Ordering::SeqCst); self.item_done_tx .send(self.inner.clone()) - .expect("could not send message") + .unwrap_or_else(|err| panic!("could not send message: {}", err)) } } @@ -170,9 +170,11 @@ where let (item_done_tx, item_done_rx) = crossbeam_channel::unbounded::(); // Inject ready nodes - ready_nodes - .iter() - .for_each(|node| item_ready_tx.send(node.clone()).unwrap()); + ready_nodes.into_iter().for_each(|node| { + item_ready_tx + .send(node) + .unwrap_or_else(|err| panic!("could not send message: {}", err)) + }); // Clone Arcs for dispatcher thread let loop_timeout = timeout.clone(); @@ -190,8 +192,11 @@ where // Send the next available nodes to the channel. next_nodes - .iter() - .for_each(|node_id| item_ready_tx.send(node_id.clone()).unwrap()); + .into_iter() + .for_each(|node_id| { + item_ready_tx.send(node_id) + .unwrap_or_else(|err| panic!("could not send message: {}", err)) + }); // If there are no more nodes, leave the loop if deps.read().unwrap().is_empty() { @@ -269,7 +274,7 @@ where CB: ProducerCallback, { callback.callback(DepGraphProducer { - counter: self.counter.clone(), + counter: self.counter, item_ready_rx: self.item_ready_rx, item_done_tx: self.item_done_tx, }) @@ -327,8 +332,8 @@ where fn into_iter(self) -> Self::IntoIter { Self { - counter: self.counter.clone(), - item_ready_rx: self.item_ready_rx.clone(), + counter: self.counter, + item_ready_rx: self.item_ready_rx, item_done_tx: self.item_done_tx, } }