Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ default = ["parallel"]
parallel = ["rayon", "crossbeam-channel"]

[dev-dependencies]
criterion = "0.3"
criterion = "0.5.1"

[[bench]]
name = "dep_graph"
harness = false

[dependencies]
crossbeam-channel = { version = "0.4", optional = true }
crossbeam-channel = { version = "0.5.8", optional = true }
rayon = { version = "1.5", optional = true }
num_cpus = "1.13"
81 changes: 44 additions & 37 deletions src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub type InnerDependencyMap<I> = HashMap<I, HashSet<I>>;
pub type DependencyMap<I> = Arc<RwLock<InnerDependencyMap<I>>>;

/// Dependency graph
#[derive(Debug, Default)]
pub struct DepGraph<I>
where
I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static,
Expand Down Expand Up @@ -43,16 +44,9 @@ where

if node.deps().is_empty() {
ready_nodes.push(node.id().clone());
}

for node_dep in node.deps() {
if !rdeps.contains_key(node_dep) {
let mut dep_rdeps = HashSet::new();
dep_rdeps.insert(node.id().clone());
rdeps.insert(node_dep.clone(), dep_rdeps.clone());
} else {
let dep_rdeps = rdeps.get_mut(node_dep).unwrap();
dep_rdeps.insert(node.id().clone());
} else {
for node_dep in node.deps() {
rdeps.entry(node_dep.clone()).or_default().insert(node.id().clone());
}
}
}
Expand All @@ -65,6 +59,20 @@ where
}
}

impl<I: Clone> Clone for DepGraph<I>
where
I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
ready_nodes: self.ready_nodes.clone(),
// clone the inner HashMap so that a new iteration can be started
deps: Arc::new(RwLock::new(self.deps.read().unwrap().clone())),
rdeps: Arc::new(RwLock::new(self.rdeps.read().unwrap().clone())),
}
}
}

impl<I> IntoIterator for DepGraph<I>
where
I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static,
Expand All @@ -73,7 +81,7 @@ where
type IntoIter = DepGraphIter<I>;

fn into_iter(self) -> Self::IntoIter {
DepGraphIter::<I>::new(self.ready_nodes.clone(), self.deps.clone(), self.rdeps)
DepGraphIter::<I>::new(self.ready_nodes, self.deps, self.rdeps)
}
}

Expand Down Expand Up @@ -133,33 +141,32 @@ pub fn remove_node_id<I>(
where
I: Clone + fmt::Debug + Eq + Hash + PartialEq + Send + Sync + 'static,
{
let rdep_ids = {
match rdeps.read().unwrap().get(&id) {
Some(node) => node.clone(),
// If no node depends on a node, it will not appear
// in rdeps.
None => Default::default(),
}
};

let mut deps = deps.write().unwrap();
let next_nodes = rdep_ids
.iter()
.filter_map(|rdep_id| {
let rdep = match deps.get_mut(&rdep_id) {
Some(rdep) => rdep,
None => return None,
};

rdep.remove(&id);

if rdep.is_empty() {
Some(rdep_id.clone())
} else {
None
}
})
.collect();

let next_nodes = if let Some(rdep_ids) = rdeps.read().unwrap().get(&id) {
let next_nodes = rdep_ids
.iter()
.filter_map(|rdep_id| {
let rdep = match deps.get_mut(rdep_id) {
Some(rdep) => rdep,
None => return None,
};

rdep.remove(&id);

if rdep.is_empty() {
Some(rdep_id.clone())
} else {
None
}
})
.collect();

next_nodes
} else {
// If no node depends on a node, it will not appear in rdeps.
vec![]
};

// Remove the current node from the list of dependencies.
deps.remove(&id);
Expand Down
23 changes: 14 additions & 9 deletions src/graph_par.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ where
(*self.counter).fetch_sub(1, Ordering::SeqCst);
self.item_done_tx
.send(self.inner.clone())
.expect("could not send message")
.unwrap_or_else(|err| panic!("could not send message: {}", err))
}
}

Expand Down Expand Up @@ -170,9 +170,11 @@ where
let (item_done_tx, item_done_rx) = crossbeam_channel::unbounded::<I>();

// Inject ready nodes
ready_nodes
.iter()
.for_each(|node| item_ready_tx.send(node.clone()).unwrap());
ready_nodes.into_iter().for_each(|node| {
item_ready_tx
.send(node)
.unwrap_or_else(|err| panic!("could not send message: {}", err))
});

// Clone Arcs for dispatcher thread
let loop_timeout = timeout.clone();
Expand All @@ -190,8 +192,11 @@ where

// Send the next available nodes to the channel.
next_nodes
.iter()
.for_each(|node_id| item_ready_tx.send(node_id.clone()).unwrap());
.into_iter()
.for_each(|node_id| {
item_ready_tx.send(node_id)
.unwrap_or_else(|err| panic!("could not send message: {}", err))
});

// If there are no more nodes, leave the loop
if deps.read().unwrap().is_empty() {
Expand Down Expand Up @@ -269,7 +274,7 @@ where
CB: ProducerCallback<Self::Item>,
{
callback.callback(DepGraphProducer {
counter: self.counter.clone(),
counter: self.counter,
item_ready_rx: self.item_ready_rx,
item_done_tx: self.item_done_tx,
})
Expand Down Expand Up @@ -327,8 +332,8 @@ where

fn into_iter(self) -> Self::IntoIter {
Self {
counter: self.counter.clone(),
item_ready_rx: self.item_ready_rx.clone(),
counter: self.counter,
item_ready_rx: self.item_ready_rx,
item_done_tx: self.item_done_tx,
}
}
Expand Down