Skip to content

Commit 7d74690

Browse files
authored
Feat: Add config flag to infer the state schema per dbt target (#5485)
1 parent 9f573b4 commit 7d74690

File tree

9 files changed

+148
-5
lines changed

9 files changed

+148
-5
lines changed

sqlmesh/cli/project_init.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,15 @@ def _gen_config(
116116
- invalidselectstarexpansion
117117
- noambiguousprojections
118118
""",
119-
ProjectTemplate.DBT: f"""# --- Virtual Data Environment Mode ---
119+
ProjectTemplate.DBT: f"""# --- DBT-specific options ---
120+
dbt:
121+
# This configuration ensures that each dbt target gets its own isolated state.
122+
# The inferred state schemas are named "sqlmesh_state_<profile name>_<target schema>", eg "sqlmesh_state_jaffle_shop_dev"
123+
# If this is undesirable, you may manually configure the gateway to use a specific state schema name
124+
# https://sqlmesh.readthedocs.io/en/stable/integrations/dbt/#selecting-a-different-state-connection
125+
infer_state_schema_name: True
126+
127+
# --- Virtual Data Environment Mode ---
120128
# Enable Virtual Data Environments (VDE) for *development* environments.
121129
# Note that the production environment in dbt projects is not virtual by default to maintain compatibility with existing tooling.
122130
# https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#virtual-data-environment-modes

sqlmesh/core/config/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,6 @@
3636
from sqlmesh.core.config.naming import NameInferenceConfig as NameInferenceConfig
3737
from sqlmesh.core.config.linter import LinterConfig as LinterConfig
3838
from sqlmesh.core.config.plan import PlanConfig as PlanConfig
39-
from sqlmesh.core.config.root import Config as Config
39+
from sqlmesh.core.config.root import Config as Config, DbtConfig as DbtConfig
4040
from sqlmesh.core.config.run import RunConfig as RunConfig
4141
from sqlmesh.core.config.scheduler import BuiltInSchedulerConfig as BuiltInSchedulerConfig

sqlmesh/core/config/dbt.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
from sqlmesh.core.config.base import BaseConfig
2+
3+
4+
class DbtConfig(BaseConfig):
5+
"""
6+
Represents dbt-specific options on the SQLMesh root config.
7+
8+
These options are only taken into account for dbt projects and are ignored on native projects
9+
"""
10+
11+
infer_state_schema_name: bool = False
12+
"""If set, indicates to the dbt loader that the state schema should be inferred based on the profile/target
13+
so that each target gets its own isolated state"""

sqlmesh/core/config/loader.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,12 +172,17 @@ def load_config_from_paths(
172172
if dbt_project_file:
173173
from sqlmesh.dbt.loader import sqlmesh_config
174174

175+
infer_state_schema_name = False
176+
if dbt := non_python_config.dbt:
177+
infer_state_schema_name = dbt.infer_state_schema_name
178+
175179
dbt_python_config = sqlmesh_config(
176180
project_root=dbt_project_file.parent,
177181
dbt_profile_name=kwargs.pop("profile", None),
178182
dbt_target_name=kwargs.pop("target", None),
179183
variables=variables,
180184
threads=kwargs.pop("threads", None),
185+
infer_state_schema_name=infer_state_schema_name,
181186
)
182187
if type(dbt_python_config) != config_type:
183188
dbt_python_config = convert_config_type(dbt_python_config, config_type)

sqlmesh/core/config/root.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from sqlmesh.core.config.linter import LinterConfig as LinterConfig
3737
from sqlmesh.core.config.plan import PlanConfig
3838
from sqlmesh.core.config.run import RunConfig
39+
from sqlmesh.core.config.dbt import DbtConfig
3940
from sqlmesh.core.config.scheduler import (
4041
BuiltInSchedulerConfig,
4142
SchedulerConfig,
@@ -173,6 +174,7 @@ class Config(BaseConfig):
173174
linter: LinterConfig = LinterConfig()
174175
janitor: JanitorConfig = JanitorConfig()
175176
cache_dir: t.Optional[str] = None
177+
dbt: t.Optional[DbtConfig] = None
176178

177179
_FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = {
178180
"gateways": UpdateStrategy.NESTED_UPDATE,
@@ -191,6 +193,7 @@ class Config(BaseConfig):
191193
"before_all": UpdateStrategy.EXTEND,
192194
"after_all": UpdateStrategy.EXTEND,
193195
"linter": UpdateStrategy.NESTED_UPDATE,
196+
"dbt": UpdateStrategy.NESTED_UPDATE,
194197
}
195198

196199
_connection_config_validator = connection_config_validator

sqlmesh/dbt/loader.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
ConnectionConfig,
1212
GatewayConfig,
1313
ModelDefaultsConfig,
14+
DbtConfig as RootDbtConfig,
1415
)
1516
from sqlmesh.core.environment import EnvironmentStatements
1617
from sqlmesh.core.loader import CacheBase, LoadedProject, Loader
@@ -51,6 +52,7 @@ def sqlmesh_config(
5152
variables: t.Optional[t.Dict[str, t.Any]] = None,
5253
threads: t.Optional[int] = None,
5354
register_comments: t.Optional[bool] = None,
55+
infer_state_schema_name: bool = False,
5456
**kwargs: t.Any,
5557
) -> Config:
5658
project_root = project_root or Path()
@@ -72,16 +74,40 @@ def sqlmesh_config(
7274
# the to_sqlmesh() function on TargetConfig maps self.threads -> concurrent_tasks
7375
profile.target.threads = threads
7476

77+
gateway_kwargs = {}
78+
if infer_state_schema_name:
79+
profile_name = context.profile_name
80+
81+
# Note: we deliberately isolate state based on the target *schema* and not the target name.
82+
# It is assumed that the project will define a target, eg 'dev', and then in each users own ~/.dbt/profiles.yml the schema
83+
# for the 'dev' target is overriden to something user-specific, rather than making the target name itself user-specific.
84+
# This means that the schema name is the indicator of isolated state, not the target name which may be re-used across multiple schemas.
85+
target_schema = profile.target.schema_
86+
87+
# dbt-core doesnt allow schema to be undefined, but it does allow an empty string, and then just
88+
# fails at runtime when `CREATE SCHEMA ""` doesnt work
89+
if not target_schema:
90+
raise ConfigError(
91+
f"Target '{profile.target_name}' does not specify a schema.\n"
92+
"A schema is required in order to infer where to store SQLMesh state"
93+
)
94+
95+
inferred_state_schema_name = f"sqlmesh_state_{profile_name}_{target_schema}"
96+
logger.info("Inferring state schema: %s", inferred_state_schema_name)
97+
gateway_kwargs["state_schema"] = inferred_state_schema_name
98+
7599
return Config(
76100
loader=loader,
77101
model_defaults=model_defaults,
78102
variables=variables or {},
103+
dbt=RootDbtConfig(infer_state_schema_name=infer_state_schema_name),
79104
**{
80105
"default_gateway": profile.target_name if "gateways" not in kwargs else "",
81106
"gateways": {
82107
profile.target_name: GatewayConfig(
83108
connection=profile.target.to_sqlmesh(**target_to_sqlmesh_args),
84109
state_connection=state_connection,
110+
**gateway_kwargs,
85111
)
86112
}, # type: ignore
87113
**kwargs,

tests/dbt/test_config.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from sqlmesh.core.dialect import jinja_query
1616
from sqlmesh.core.model import SqlModel
1717
from sqlmesh.core.model.kind import OnDestructiveChange, OnAdditiveChange
18+
from sqlmesh.core.state_sync import CachingStateSync, EngineAdapterStateSync
1819
from sqlmesh.dbt.builtin import Api
1920
from sqlmesh.dbt.column import ColumnConfig
2021
from sqlmesh.dbt.common import Dependencies
@@ -46,7 +47,8 @@
4647
)
4748
from sqlmesh.dbt.test import TestConfig
4849
from sqlmesh.utils.errors import ConfigError
49-
from sqlmesh.utils.yaml import load as yaml_load
50+
from sqlmesh.utils.yaml import load as yaml_load, dump as yaml_dump
51+
from tests.dbt.conftest import EmptyProjectCreator
5052

5153
pytestmark = pytest.mark.dbt
5254

@@ -1211,3 +1213,37 @@ def test_empty_vars_config(tmp_path):
12111213
# Verify the variables are empty (not causing any issues)
12121214
assert project.packages["test_empty_vars"].variables == {}
12131215
assert project.context.variables == {}
1216+
1217+
1218+
def test_infer_state_schema_name(create_empty_project: EmptyProjectCreator):
1219+
project_dir, _ = create_empty_project("test_foo", "dev")
1220+
1221+
# infer_state_schema_name defaults to False if omitted
1222+
config = sqlmesh_config(project_root=project_dir)
1223+
assert config.dbt
1224+
assert not config.dbt.infer_state_schema_name
1225+
assert config.get_state_schema() == "sqlmesh"
1226+
1227+
# create_empty_project() uses the default dbt template for sqlmesh yaml config which
1228+
# sets infer_state_schema_name=True
1229+
ctx = Context(paths=[project_dir])
1230+
assert ctx.config.dbt
1231+
assert ctx.config.dbt.infer_state_schema_name
1232+
assert ctx.config.get_state_schema() == "sqlmesh_state_test_foo_main"
1233+
assert isinstance(ctx.state_sync, CachingStateSync)
1234+
assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync)
1235+
assert ctx.state_sync.state_sync.schema == "sqlmesh_state_test_foo_main"
1236+
1237+
# If the user delberately overrides state_schema then we should respect this choice
1238+
config_file = project_dir / "sqlmesh.yaml"
1239+
config_yaml = yaml_load(config_file)
1240+
config_yaml["gateways"] = {"dev": {"state_schema": "state_override"}}
1241+
config_file.write_text(yaml_dump(config_yaml))
1242+
1243+
ctx = Context(paths=[project_dir])
1244+
assert ctx.config.dbt
1245+
assert ctx.config.dbt.infer_state_schema_name
1246+
assert ctx.config.get_state_schema() == "state_override"
1247+
assert isinstance(ctx.state_sync, CachingStateSync)
1248+
assert isinstance(ctx.state_sync.state_sync, EngineAdapterStateSync)
1249+
assert ctx.state_sync.state_sync.schema == "state_override"

tests/dbt/test_integration.py

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
from sqlmesh.core.config.connection import DuckDBConnectionConfig
2020
from sqlmesh.core.engine_adapter import DuckDBEngineAdapter
2121
from sqlmesh.utils.pandas import columns_to_types_from_df
22-
from sqlmesh.utils.yaml import YAML
22+
from sqlmesh.utils.yaml import YAML, load as yaml_load, dump as yaml_dump
23+
from sqlmesh_dbt.operations import init_project_if_required
2324
from tests.utils.pandas import compare_dataframes, create_df
2425

2526
# Some developers had issues with this test freezing locally so we mark it as cicdonly
@@ -604,3 +605,50 @@ def test_dbt_node_info(jaffle_shop_duckdb_context: Context):
604605
relationship_audit.node.dbt_node_info.name
605606
== "relationships_orders_customer_id__customer_id__ref_customers_"
606607
)
608+
609+
610+
def test_state_schema_isolation_per_target(jaffle_shop_duckdb: Path):
611+
profiles_file = jaffle_shop_duckdb / "profiles.yml"
612+
613+
profiles_yml = yaml_load(profiles_file)
614+
615+
# make prod / dev config identical with the exception of a different default schema to simulate using the same warehouse
616+
profiles_yml["jaffle_shop"]["outputs"]["prod"] = {
617+
**profiles_yml["jaffle_shop"]["outputs"]["dev"]
618+
}
619+
profiles_yml["jaffle_shop"]["outputs"]["prod"]["schema"] = "prod_schema"
620+
profiles_yml["jaffle_shop"]["outputs"]["dev"]["schema"] = "dev_schema"
621+
622+
profiles_file.write_text(yaml_dump(profiles_yml))
623+
624+
init_project_if_required(jaffle_shop_duckdb)
625+
626+
# start off with the prod target
627+
prod_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "prod"})
628+
assert prod_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_prod_schema"
629+
assert all("prod_schema" in fqn for fqn in prod_ctx.models)
630+
assert prod_ctx.plan(auto_apply=True).has_changes
631+
assert not prod_ctx.plan(auto_apply=True).has_changes
632+
633+
# dev target should have changes - new state separate from prod
634+
dev_ctx = Context(paths=[jaffle_shop_duckdb], config_loader_kwargs={"target": "dev"})
635+
assert dev_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev_schema"
636+
assert all("dev_schema" in fqn for fqn in dev_ctx.models)
637+
assert dev_ctx.plan(auto_apply=True).has_changes
638+
assert not dev_ctx.plan(auto_apply=True).has_changes
639+
640+
# no explicitly specified target should use dev because that's what's set for the default in the profiles.yml
641+
assert profiles_yml["jaffle_shop"]["target"] == "dev"
642+
default_ctx = Context(paths=[jaffle_shop_duckdb])
643+
assert default_ctx.config.get_state_schema() == "sqlmesh_state_jaffle_shop_dev_schema"
644+
assert all("dev_schema" in fqn for fqn in default_ctx.models)
645+
assert not default_ctx.plan(auto_apply=True).has_changes
646+
647+
# an explicit state schema override set in `sqlmesh.yaml` should use that
648+
sqlmesh_yaml_file = jaffle_shop_duckdb / "sqlmesh.yaml"
649+
sqlmesh_yaml = yaml_load(sqlmesh_yaml_file)
650+
sqlmesh_yaml["gateways"] = {"dev": {"state_schema": "sqlmesh_dev_state_override"}}
651+
sqlmesh_yaml_file.write_text(yaml_dump(sqlmesh_yaml))
652+
default_ctx = Context(paths=[jaffle_shop_duckdb])
653+
assert default_ctx.config.get_state_schema() == "sqlmesh_dev_state_override"
654+
assert all("dev_schema" in fqn for fqn in default_ctx.models)

tests/fixtures/dbt/empty_project/profiles.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ empty_project:
33
target: __DEFAULT_TARGET__
44

55
outputs:
6-
duckdb:
6+
__DEFAULT_TARGET__:
77
type: duckdb
8+
# database is required for dbt < 1.5 where our adapter deliberately doesnt infer the database from the path and
9+
# defaults it to "main", which raises a "project catalog doesnt match context catalog" error
10+
# ref: https://github.com/TobikoData/sqlmesh/pull/1109
11+
database: empty_project
812
path: 'empty_project.duckdb'
913
threads: 4

0 commit comments

Comments
 (0)