@@ -39,6 +39,7 @@ use graph::{
3939use graph:: { derive:: CheapClone , futures03:: future:: join_all} ;
4040
4141use crate :: {
42+ catalog:: Catalog ,
4243 deployment:: { OnSync , SubgraphHealth } ,
4344 primary:: { self , DeploymentId , Mirror as PrimaryMirror , Primary , Site } ,
4445 relational:: {
@@ -88,7 +89,7 @@ impl Shard {
8889 . all ( |c| c. is_ascii_lowercase ( ) || c. is_ascii_digit ( ) || c == '_' )
8990 {
9091 return Err ( StoreError :: InvalidIdentifier ( format ! (
91- "shard name `{}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'" , name
92+ "shard name `{name }` is invalid: shard names must only contain lowercase alphanumeric characters or '_'"
9293 ) ) ) ;
9394 }
9495 Ok ( Shard ( name) )
@@ -351,21 +352,52 @@ impl SubgraphStore {
351352 // assignment that we used last time to avoid creating
352353 // the same deployment in another shard
353354 let ( shard, node_id) = self . place ( & name, & network_name, node_id) . await ?;
355+
354356 let mut conn = self . primary_conn ( ) . await ?;
355- let ( site, site_was_created) = conn
356- . allocate_site ( shard, schema. id ( ) , network_name, graft_base)
357- . await ?;
358- let node_id = conn. assigned_node ( & site) . await ?. unwrap_or ( node_id) ;
359- ( site, !site_was_created, node_id)
357+ conn. transaction ( |conn| {
358+ async {
359+ let ( site, site_was_created) = conn
360+ . allocate_site ( shard, schema. id ( ) , network_name, graft_base)
361+ . await ?;
362+ let node_id = conn. assigned_node ( & site) . await ?. unwrap_or ( node_id) ;
363+ let site = Arc :: new ( site) ;
364+
365+ if let Some ( graft_base) = graft_base {
366+ // Ensure that the graft base exists
367+ let base_layout = self . layout ( graft_base) . await ?;
368+ let entities_with_causality_region =
369+ deployment. manifest . entities_with_causality_region . clone ( ) ;
370+ let catalog = Catalog :: for_tests (
371+ site. cheap_clone ( ) ,
372+ entities_with_causality_region. into_iter ( ) . collect ( ) ,
373+ ) ?;
374+ let layout = Layout :: new ( site. cheap_clone ( ) , schema, catalog) ?;
375+
376+ let errors = layout. can_copy_from ( & base_layout) ;
377+ if !errors. is_empty ( ) {
378+ return Err ( StoreError :: Unknown ( anyhow ! (
379+ "The subgraph `{}` cannot be used as the graft base \
380+ for `{}` because the schemas are incompatible:\n - {}",
381+ & base_layout. catalog. site. namespace,
382+ & layout. catalog. site. namespace,
383+ errors. join( "\n - " )
384+ ) ) ) ;
385+ }
386+ }
387+
388+ Ok ( ( site, !site_was_created, node_id) )
389+ }
390+ . scope_boxed ( )
391+ } )
392+ . await ?
360393 } ;
361- let site = Arc :: new ( site) ;
362394
363395 // if the deployment already exists, we don't need to perform any copying
364396 // so we can set graft_base to None
365397 // if it doesn't exist, we need to copy the graft base to the new deployment
366- let graft_base_layout = if !exists {
367- let graft_base = match deployment . graft_base . as_ref ( ) {
368- Some ( base) => Some ( self . layout ( & base) . await ?) ,
398+ if !exists {
399+ let graft_base = match graft_base {
400+ Some ( base) => Some ( self . layout ( base) . await ?) ,
369401 None => None ,
370402 } ;
371403
@@ -375,9 +407,6 @@ impl SubgraphStore {
375407 . record_active_copy ( graft_base. site . as_ref ( ) , site. as_ref ( ) )
376408 . await ?;
377409 }
378- graft_base
379- } else {
380- None
381410 } ;
382411
383412 // Create the actual databases schema and metadata entries
@@ -386,7 +415,7 @@ impl SubgraphStore {
386415 . get ( & site. shard )
387416 . ok_or_else ( || StoreError :: UnknownShard ( site. shard . to_string ( ) ) ) ?;
388417
389- let index_def = if let Some ( graft) = & graft_base. clone ( ) {
418+ let index_def = if let Some ( graft) = graft_base {
390419 if let Some ( site) = self . sites . get ( graft) {
391420 let store = self
392421 . stores
@@ -406,7 +435,6 @@ impl SubgraphStore {
406435 schema,
407436 deployment,
408437 site. clone ( ) ,
409- graft_base_layout,
410438 replace,
411439 OnSync :: None ,
412440 index_def,
@@ -731,18 +759,15 @@ impl Inner {
731759
732760 if src. id == dst. id {
733761 return Err ( StoreError :: Unknown ( anyhow ! (
734- "can not copy deployment {} onto itself" ,
735- src_loc
762+ "can not copy deployment {src_loc} onto itself"
736763 ) ) ) ;
737764 }
738765 // The very last thing we do when we set up a copy here is assign it
739766 // to a node. Therefore, if `dst` is already assigned, this function
740767 // should not have been called.
741768 if let Some ( node) = self . mirror . assigned_node ( dst. as_ref ( ) ) . await ? {
742769 return Err ( StoreError :: Unknown ( anyhow ! (
743- "can not copy into deployment {} since it is already assigned to node `{}`" ,
744- dst_loc,
745- node
770+ "can not copy into deployment {dst_loc} since it is already assigned to node `{node}`"
746771 ) ) ) ;
747772 }
748773 let deployment = src_store. load_deployment ( src. clone ( ) ) . await ?;
@@ -758,8 +783,6 @@ impl Inner {
758783 history_blocks_override : None ,
759784 } ;
760785
761- let graft_base = self . layout ( & src. deployment ) . await ?;
762-
763786 self . primary_conn ( )
764787 . await ?
765788 . record_active_copy ( src. as_ref ( ) , dst. as_ref ( ) )
@@ -776,7 +799,6 @@ impl Inner {
776799 & src_layout. input_schema ,
777800 deployment,
778801 dst. clone ( ) ,
779- Some ( graft_base) ,
780802 false ,
781803 on_sync,
782804 Some ( index_def) ,
0 commit comments