From d957b8c3aad3077c69a67dc8c8a57a92e54fbdbe Mon Sep 17 00:00:00 2001 From: Amin Yahyaabadi Date: Fri, 1 Sep 2023 21:22:26 -0700 Subject: [PATCH 1/7] feat: implement clone, debug, and default for DepGraph --- src/graph.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/graph.rs b/src/graph.rs index c49eb7b..4b769a7 100644 --- a/src/graph.rs +++ b/src/graph.rs @@ -9,6 +9,7 @@ pub type InnerDependencyMap = HashMap>; pub type DependencyMap = Arc>>; /// Dependency graph +#[derive(Clone, Debug, Default)] pub struct DepGraph where I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static, From 6cff5b09f9373aeaf28ee4b89659ecd6c5a4ac81 Mon Sep 17 00:00:00 2001 From: Amin Yahyaabadi Date: Fri, 1 Sep 2023 21:30:00 -0700 Subject: [PATCH 2/7] fix: remove excess clone calls --- src/graph.rs | 6 +++--- src/graph_par.rs | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/graph.rs b/src/graph.rs index 4b769a7..e3a8498 100644 --- a/src/graph.rs +++ b/src/graph.rs @@ -50,7 +50,7 @@ where if !rdeps.contains_key(node_dep) { let mut dep_rdeps = HashSet::new(); dep_rdeps.insert(node.id().clone()); - rdeps.insert(node_dep.clone(), dep_rdeps.clone()); + rdeps.insert(node_dep.clone(), dep_rdeps); } else { let dep_rdeps = rdeps.get_mut(node_dep).unwrap(); dep_rdeps.insert(node.id().clone()); @@ -74,7 +74,7 @@ where type IntoIter = DepGraphIter; fn into_iter(self) -> Self::IntoIter { - DepGraphIter::::new(self.ready_nodes.clone(), self.deps.clone(), self.rdeps) + DepGraphIter::::new(self.ready_nodes, self.deps, self.rdeps) } } @@ -147,7 +147,7 @@ where let next_nodes = rdep_ids .iter() .filter_map(|rdep_id| { - let rdep = match deps.get_mut(&rdep_id) { + let rdep = match deps.get_mut(rdep_id) { Some(rdep) => rdep, None => return None, }; diff --git a/src/graph_par.rs b/src/graph_par.rs index b5c6f7d..1a924b1 100644 --- a/src/graph_par.rs +++ b/src/graph_par.rs @@ -171,8 +171,8 @@ where // Inject ready nodes ready_nodes - .iter() - .for_each(|node| item_ready_tx.send(node.clone()).unwrap()); + .into_iter() + .for_each(|node| item_ready_tx.send(node).unwrap()); // Clone Arcs for dispatcher thread let loop_timeout = timeout.clone(); @@ -190,8 +190,8 @@ where // Send the next available nodes to the channel. next_nodes - .iter() - .for_each(|node_id| item_ready_tx.send(node_id.clone()).unwrap()); + .into_iter() + .for_each(|node_id| item_ready_tx.send(node_id).unwrap()); // If there are no more nodes, leave the loop if deps.read().unwrap().is_empty() { @@ -269,7 +269,7 @@ where CB: ProducerCallback, { callback.callback(DepGraphProducer { - counter: self.counter.clone(), + counter: self.counter, item_ready_rx: self.item_ready_rx, item_done_tx: self.item_done_tx, }) @@ -327,8 +327,8 @@ where fn into_iter(self) -> Self::IntoIter { Self { - counter: self.counter.clone(), - item_ready_rx: self.item_ready_rx.clone(), + counter: self.counter, + item_ready_rx: self.item_ready_rx, item_done_tx: self.item_done_tx, } } From df89e242ef4cb3497735d07ea5b1298cba44b2fe Mon Sep 17 00:00:00 2001 From: Amin Yahyaabadi Date: Fri, 1 Sep 2023 22:25:40 -0700 Subject: [PATCH 3/7] fix: manually clone the inner hashmaps --- src/graph.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/graph.rs b/src/graph.rs index e3a8498..2cced98 100644 --- a/src/graph.rs +++ b/src/graph.rs @@ -9,7 +9,7 @@ pub type InnerDependencyMap = HashMap>; pub type DependencyMap = Arc>>; /// Dependency graph -#[derive(Clone, Debug, Default)] +#[derive(Debug, Default)] pub struct DepGraph where I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static, @@ -66,6 +66,20 @@ where } } +impl Clone for DepGraph +where + I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static, +{ + fn clone(&self) -> Self { + Self { + ready_nodes: self.ready_nodes.clone(), + // clone the inner HashMap so that a new iteration can be started + deps: Arc::new(RwLock::new(self.deps.read().unwrap().clone())), + rdeps: Arc::new(RwLock::new(self.rdeps.read().unwrap().clone())), + } + } +} + impl IntoIterator for DepGraph where I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static, From 61c8243c8a973633c19cb1638611889668ace4e0 Mon Sep 17 00:00:00 2001 From: Amin Yahyaabadi Date: Fri, 1 Sep 2023 21:41:56 -0700 Subject: [PATCH 4/7] fix: print the send_error message when panicing --- src/graph_par.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/graph_par.rs b/src/graph_par.rs index 1a924b1..e393b3e 100644 --- a/src/graph_par.rs +++ b/src/graph_par.rs @@ -91,7 +91,7 @@ where (*self.counter).fetch_sub(1, Ordering::SeqCst); self.item_done_tx .send(self.inner.clone()) - .expect("could not send message") + .unwrap_or_else(|err| panic!("could not send message: {}", err)) } } @@ -170,9 +170,11 @@ where let (item_done_tx, item_done_rx) = crossbeam_channel::unbounded::(); // Inject ready nodes - ready_nodes - .into_iter() - .for_each(|node| item_ready_tx.send(node).unwrap()); + ready_nodes.into_iter().for_each(|node| { + item_ready_tx + .send(node) + .unwrap_or_else(|err| panic!("could not send message: {}", err)) + }); // Clone Arcs for dispatcher thread let loop_timeout = timeout.clone(); @@ -191,7 +193,10 @@ where // Send the next available nodes to the channel. next_nodes .into_iter() - .for_each(|node_id| item_ready_tx.send(node_id).unwrap()); + .for_each(|node_id| { + item_ready_tx.send(node_id) + .unwrap_or_else(|err| panic!("could not send message: {}", err)) + }); // If there are no more nodes, leave the loop if deps.read().unwrap().is_empty() { From bc2de3da4dc69bf0bfdfbdbb850cd2a392b66192 Mon Sep 17 00:00:00 2001 From: Amin Yahyaabadi Date: Fri, 1 Sep 2023 21:48:49 -0700 Subject: [PATCH 5/7] fix: update dependencies --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d0b30e3..711ae1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,13 +16,13 @@ default = ["parallel"] parallel = ["rayon", "crossbeam-channel"] [dev-dependencies] -criterion = "0.3" +criterion = "0.5.1" [[bench]] name = "dep_graph" harness = false [dependencies] -crossbeam-channel = { version = "0.4", optional = true } +crossbeam-channel = { version = "0.5.8", optional = true } rayon = { version = "1.5", optional = true } num_cpus = "1.13" From 16eda57547503182b08108bc06a95baed4d6792a Mon Sep 17 00:00:00 2001 From: Amin Yahyaabadi Date: Sat, 2 Sep 2023 17:35:08 -0700 Subject: [PATCH 6/7] fix: avoid cloning the rdep in remove_node_id --- src/graph.rs | 51 +++++++++++++++++++++++++-------------------------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/src/graph.rs b/src/graph.rs index 2cced98..00bc090 100644 --- a/src/graph.rs +++ b/src/graph.rs @@ -148,33 +148,32 @@ pub fn remove_node_id( where I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static, { - let rdep_ids = { - match rdeps.read().unwrap().get(&id) { - Some(node) => node.clone(), - // If no node depends on a node, it will not appear - // in rdeps. - None => Default::default(), - } - }; - let mut deps = deps.write().unwrap(); - let next_nodes = rdep_ids - .iter() - .filter_map(|rdep_id| { - let rdep = match deps.get_mut(rdep_id) { - Some(rdep) => rdep, - None => return None, - }; - - rdep.remove(&id); - - if rdep.is_empty() { - Some(rdep_id.clone()) - } else { - None - } - }) - .collect(); + + let next_nodes = if let Some(rdep_ids) = rdeps.read().unwrap().get(&id) { + let next_nodes = rdep_ids + .iter() + .filter_map(|rdep_id| { + let rdep = match deps.get_mut(rdep_id) { + Some(rdep) => rdep, + None => return None, + }; + + rdep.remove(&id); + + if rdep.is_empty() { + Some(rdep_id.clone()) + } else { + None + } + }) + .collect(); + + next_nodes + } else { + // If no node depends on a node, it will not appear in rdeps. + vec![] + }; // Remove the current node from the list of dependencies. deps.remove(&id); From 10fb0d5b9266821a5eae51581683c52aec79cabf Mon Sep 17 00:00:00 2001 From: Amin Yahyaabadi Date: Sat, 2 Sep 2023 18:24:26 -0700 Subject: [PATCH 7/7] fix: use rdeps.entry to populate the rdeps --- src/graph.rs | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/src/graph.rs b/src/graph.rs index 00bc090..72e6438 100644 --- a/src/graph.rs +++ b/src/graph.rs @@ -44,16 +44,9 @@ where if node.deps().is_empty() { ready_nodes.push(node.id().clone()); - } - - for node_dep in node.deps() { - if !rdeps.contains_key(node_dep) { - let mut dep_rdeps = HashSet::new(); - dep_rdeps.insert(node.id().clone()); - rdeps.insert(node_dep.clone(), dep_rdeps); - } else { - let dep_rdeps = rdeps.get_mut(node_dep).unwrap(); - dep_rdeps.insert(node.id().clone()); + } else { + for node_dep in node.deps() { + rdeps.entry(node_dep.clone()).or_default().insert(node.id().clone()); } } }