Skip to content

Commit 378d8be

Browse files
committed
fix: prevent subscription cycles when adding subscriptions
1 parent 51ae8f2 commit 378d8be

File tree

7 files changed

+131
-9
lines changed

7 files changed

+131
-9
lines changed

lib/dal/src/attribute/prototype.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,13 @@ impl AttributePrototype {
571571
ctx: &DalContext,
572572
prototype_id: AttributePrototypeId,
573573
) -> AttributePrototypeResult<()> {
574+
// Also remove all apas that use this prototype, to prevent hanging APAs
575+
// during subscription creation
576+
let apa_ids = AttributePrototypeArgument::list_ids_for_prototype(ctx, prototype_id).await?;
577+
for apa_id in apa_ids {
578+
AttributePrototypeArgument::remove(ctx, apa_id).await?;
579+
}
580+
574581
ctx.workspace_snapshot()?
575582
.remove_node_by_id(prototype_id)
576583
.await?;

lib/dal/src/attribute/value.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ use crate::{
7373
TransactionsError,
7474
attribute::prototype::AttributePrototypeError,
7575
change_set::ChangeSetError,
76+
component::subscription_graph::SubscriptionGraph,
7677
func::{
7778
FuncExecutionPk,
7879
argument::{
@@ -289,6 +290,11 @@ pub enum AttributeValueError {
289290
subscription: String,
290291
subscription_prop_kind: PropKind,
291292
},
293+
#[error("Subscription would create a cycle between components")]
294+
SubscriptionWouldCycle {
295+
subscriber_av_id: AttributeValueId,
296+
subscription: String,
297+
},
292298
#[error("transactions error: {0}")]
293299
Transactions(#[from] TransactionsError),
294300
#[error("try lock error: {0}")]
@@ -1929,9 +1935,13 @@ impl AttributeValue {
19291935

19301936
/// Set the source of this attribute value to one or more subscriptions.
19311937
///
1932-
/// This overwrites or overrides any existing value; if your intent is to append
1933-
/// subscriptions, you should first call AttributeValue::subscriptions() and append to that
1934-
/// list.
1938+
/// This overwrites or overrides any existing value; if your intent is to
1939+
/// append subscriptions, you should first call
1940+
/// AttributeValue::subscriptions() and append to that list.
1941+
///
1942+
/// Note that subscriptions can create cycles in the graph. Use the
1943+
/// `SubscriptionGraph` to construct a dep graph of all components via their
1944+
/// subscriptions to check if a subscription creates a cycle.
19351945
pub async fn set_to_subscription(
19361946
ctx: &DalContext,
19371947
subscriber_av_id: AttributeValueId,
@@ -1942,20 +1952,30 @@ impl AttributeValue {
19421952
// Make sure the subscribed-to path is valid (i.e. it doesn't have to resolve
19431953
// to a value *right now*, but it must be a valid path to the schema as it
19441954
// exists--correct prop names, numeric indices for arrays, etc.)
1955+
let subscription_title = subscription.fmt_title(ctx).await;
19451956
let subscription_prop_id = subscription.validate(ctx).await?;
19461957
let subscription_prop_kind = Prop::kind(ctx, subscription_prop_id).await?;
19471958
let subscriber_prop_kind = AttributeValue::prop_kind(ctx, subscriber_av_id).await?;
19481959
if !subscription_prop_kind.js_compatible_with(subscriber_prop_kind) {
19491960
return Err(AttributeValueError::SubscriptionTypeMismatch {
19501961
subscriber_av_id,
19511962
subscriber_prop_kind,
1952-
subscription: subscription.fmt_title(ctx).await,
1963+
subscription: subscription_title.clone(),
19531964
subscription_prop_kind,
19541965
});
19551966
}
19561967

19571968
Self::set_to_subscription_unchecked(ctx, subscriber_av_id, subscription, func_id, reason)
1958-
.await
1969+
.await?;
1970+
1971+
if SubscriptionGraph::new(ctx).await?.is_cyclic() {
1972+
Err(AttributeValueError::SubscriptionWouldCycle {
1973+
subscriber_av_id,
1974+
subscription: subscription_title,
1975+
})
1976+
} else {
1977+
Ok(())
1978+
}
19591979
}
19601980

19611981
/// Set the source of this attribute value to one or more subscriptions.
@@ -1984,6 +2004,7 @@ impl AttributeValue {
19842004
// Add the subscriptions as the argument
19852005
let arg_id = FuncArgument::single_arg_for_func(ctx, func_id).await?;
19862006
let apa = AttributePrototypeArgument::new(ctx, prototype_id, arg_id, subscription).await?;
2007+
dbg!("created apa for subscription", apa.id(), prototype_id);
19872008
AttributePrototypeArgument::add_reason(ctx, apa.id(), reason).await?;
19882009

19892010
// DVU all the way!

lib/dal/src/component.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ pub mod properties;
188188
pub mod qualification;
189189
pub mod resource;
190190
pub mod socket;
191+
pub mod subscription_graph;
191192
pub mod suggestion;
192193

193194
#[remain::sorted]
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
//! Component subscription graph.
2+
//!
3+
//! Creates a dependency graph by walking value subscription edges, so that we
4+
//! can detect if the subscriptions would create a cycle
5+
//!
6+
use si_id::ComponentId;
7+
8+
use super::{
9+
Component,
10+
ComponentResult,
11+
};
12+
use crate::{
13+
AttributePrototype,
14+
AttributeValue,
15+
DalContext,
16+
attribute::prototype::argument::AttributePrototypeArgument,
17+
dependency_graph::DependencyGraph,
18+
};
19+
20+
pub struct SubscriptionGraph {
21+
inner: DependencyGraph<ComponentId>,
22+
}
23+
24+
impl SubscriptionGraph {
25+
pub async fn new(ctx: &DalContext) -> ComponentResult<Self> {
26+
let components = Component::list_ids(ctx).await?;
27+
let mut inner = DependencyGraph::new();
28+
29+
for component_id in components {
30+
for (_, apa_id) in Component::subscribers(ctx, component_id).await? {
31+
let prototype_id = AttributePrototypeArgument::prototype_id(ctx, apa_id).await?;
32+
for av_id in AttributePrototype::attribute_value_ids(ctx, prototype_id).await? {
33+
let subscriber_component_id = AttributeValue::component_id(ctx, av_id).await?;
34+
// Don't add self-subscriptions
35+
if subscriber_component_id == component_id {
36+
continue;
37+
}
38+
inner.id_depends_on(subscriber_component_id, component_id);
39+
}
40+
}
41+
}
42+
43+
Ok(Self { inner })
44+
}
45+
46+
pub fn is_cyclic(&self) -> bool {
47+
self.inner.is_cyclic()
48+
}
49+
}

lib/dal/src/dependency_graph.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,4 +140,8 @@ impl<T: Copy + std::cmp::Ord + std::cmp::Eq + std::cmp::PartialEq> DependencyGra
140140
pub fn all_ids(&self) -> impl Iterator<Item = T> {
141141
self.graph.node_weights().copied()
142142
}
143+
144+
pub fn is_cyclic(&self) -> bool {
145+
petgraph::algo::is_cyclic_directed(&self.graph)
146+
}
143147
}

lib/dal/tests/integration_test/attribute_value/subscription.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,47 @@ async fn remove_subscribed_component(ctx: &mut DalContext) -> Result<()> {
733733
Ok(())
734734
}
735735

736+
#[test]
737+
async fn subscription_cycles(ctx: &mut DalContext) -> Result<()> {
738+
create_testy_variant(ctx).await?;
739+
component::create(ctx, "testy", "subscriber").await?;
740+
component::create(ctx, "testy", "source").await?;
741+
component::create(ctx, "testy", "source_2").await?;
742+
743+
value::subscribe(
744+
ctx,
745+
("subscriber", "/domain/Value"),
746+
("source", "/domain/Value"),
747+
)
748+
.await?;
749+
750+
value::subscribe(
751+
ctx,
752+
("source", "/domain/Value"),
753+
("source_2", "/domain/Value"),
754+
)
755+
.await?;
756+
757+
// Self subscription should be fine
758+
value::subscribe(
759+
ctx,
760+
("source_2", "/domain/Value3"),
761+
("source_2", "/domain/Value4"),
762+
)
763+
.await?;
764+
765+
let bad = value::subscribe(
766+
ctx,
767+
("source_2", "/domain/Value2"),
768+
("subscriber", "/domain/Value2"),
769+
)
770+
.await;
771+
772+
assert!(bad.is_err(), "cyclic subscription should fail");
773+
774+
Ok(())
775+
}
776+
736777
async fn create_testy_variant(ctx: &DalContext) -> Result<()> {
737778
// Make a variant with a Value prop
738779
variant::create(
@@ -744,6 +785,8 @@ async fn create_testy_variant(ctx: &DalContext) -> Result<()> {
744785
props: [
745786
{ name: "Value", kind: "string" },
746787
{ name: "Value2", kind: "string" },
788+
{ name: "Value3", kind: "string" },
789+
{ name: "Value4", kind: "string" },
747790
{ name: "Values", kind: "array",
748791
entry: { name: "ValuesItem", kind: "string" },
749792
},

lib/dal/tests/integration_test/attribute_value/subscription/default_subscriptions.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,7 @@ async fn test_set_as_default_subscription_source(ctx: &DalContext) -> Result<()>
126126
assert_eq!(expected_defaults, possible_defaults);
127127

128128
for default_sub in possible_defaults {
129-
default_sub
130-
.subscribe(ctx)
131-
.await
132-
.unwrap_or_else(|_| panic!("should be able to subscribe: {default_sub:?}"));
129+
default_sub.subscribe(ctx).await?;
133130
}
134131

135132
let conflicting_av_id = Component::attribute_values_for_prop_by_id(

0 commit comments

Comments
 (0)