Skip to content

Commit 0583f96

Browse files
Feat(dbt): Add support for selected resources context variable (#5177)
1 parent 4c42b45 commit 0583f96

File tree

13 files changed

+195
-1
lines changed

13 files changed

+195
-1
lines changed

sqlmesh/core/context.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1677,6 +1677,11 @@ def plan_builder(
16771677
end_override_per_model=max_interval_end_per_model,
16781678
console=self.console,
16791679
user_provided_flags=user_provided_flags,
1680+
selected_models={
1681+
dbt_name
1682+
for model in model_selector.expand_model_selections(select_models or "*")
1683+
if (dbt_name := snapshots[model].node.dbt_name)
1684+
},
16801685
explain=explain or False,
16811686
ignore_cron=ignore_cron or False,
16821687
)

sqlmesh/core/environment.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ def execute_environment_statements(
312312
start: t.Optional[TimeLike] = None,
313313
end: t.Optional[TimeLike] = None,
314314
execution_time: t.Optional[TimeLike] = None,
315+
selected_models: t.Optional[t.Set[str]] = None,
315316
) -> None:
316317
try:
317318
rendered_expressions = [
@@ -327,6 +328,7 @@ def execute_environment_statements(
327328
execution_time=execution_time,
328329
environment_naming_info=environment_naming_info,
329330
engine_adapter=adapter,
331+
selected_models=selected_models,
330332
)
331333
]
332334
except Exception as e:

sqlmesh/core/node.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ class _Node(PydanticModel):
199199
interval_unit_: t.Optional[IntervalUnit] = Field(alias="interval_unit", default=None)
200200
tags: t.List[str] = []
201201
stamp: t.Optional[str] = None
202+
dbt_name: t.Optional[str] = None # dbt node name
202203
_path: t.Optional[Path] = None
203204
_data_hash: t.Optional[str] = None
204205
_metadata_hash: t.Optional[str] = None

sqlmesh/core/plan/builder.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ def __init__(
129129
end_override_per_model: t.Optional[t.Dict[str, datetime]] = None,
130130
console: t.Optional[PlanBuilderConsole] = None,
131131
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None,
132+
selected_models: t.Optional[t.Set[str]] = None,
132133
):
133134
self._context_diff = context_diff
134135
self._no_gaps = no_gaps
@@ -169,6 +170,7 @@ def __init__(
169170
self._console = console or get_console()
170171
self._choices: t.Dict[SnapshotId, SnapshotChangeCategory] = {}
171172
self._user_provided_flags = user_provided_flags
173+
self._selected_models = selected_models
172174
self._explain = explain
173175

174176
self._start = start
@@ -347,6 +349,7 @@ def build(self) -> Plan:
347349
ensure_finalized_snapshots=self._ensure_finalized_snapshots,
348350
ignore_cron=self._ignore_cron,
349351
user_provided_flags=self._user_provided_flags,
352+
selected_models=self._selected_models,
350353
)
351354
self._latest_plan = plan
352355
return plan

sqlmesh/core/plan/definition.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ class Plan(PydanticModel, frozen=True):
7070
execution_time_: t.Optional[TimeLike] = Field(default=None, alias="execution_time")
7171

7272
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None
73+
selected_models: t.Optional[t.Set[str]] = None
74+
"""Models that have been selected for this plan (used for dbt selected_resources)"""
7375

7476
@cached_property
7577
def start(self) -> TimeLike:
@@ -282,6 +284,7 @@ def to_evaluatable(self) -> EvaluatablePlan:
282284
},
283285
environment_statements=self.context_diff.environment_statements,
284286
user_provided_flags=self.user_provided_flags,
287+
selected_models=self.selected_models,
285288
)
286289

287290
@cached_property
@@ -319,6 +322,7 @@ class EvaluatablePlan(PydanticModel):
319322
disabled_restatement_models: t.Set[str]
320323
environment_statements: t.Optional[t.List[EnvironmentStatements]] = None
321324
user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None
325+
selected_models: t.Optional[t.Set[str]] = None
322326

323327
def is_selected_for_backfill(self, model_fqn: str) -> bool:
324328
return self.models_to_backfill is None or model_fqn in self.models_to_backfill

sqlmesh/core/plan/evaluator.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ def visit_before_all_stage(self, stage: stages.BeforeAllStage, plan: Evaluatable
137137
start=plan.start,
138138
end=plan.end,
139139
execution_time=plan.execution_time,
140+
selected_models=plan.selected_models,
140141
)
141142

142143
def visit_after_all_stage(self, stage: stages.AfterAllStage, plan: EvaluatablePlan) -> None:
@@ -150,6 +151,7 @@ def visit_after_all_stage(self, stage: stages.AfterAllStage, plan: EvaluatablePl
150151
start=plan.start,
151152
end=plan.end,
152153
execution_time=plan.execution_time,
154+
selected_models=plan.selected_models,
153155
)
154156

155157
def visit_create_snapshot_records_stage(
@@ -257,6 +259,7 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
257259
allow_destructive_snapshots=plan.allow_destructive_models,
258260
allow_additive_snapshots=plan.allow_additive_models,
259261
selected_snapshot_ids=stage.selected_snapshot_ids,
262+
selected_models=plan.selected_models,
260263
)
261264
if errors:
262265
raise PlanError("Plan application failed.")

sqlmesh/core/scheduler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ def run_merged_intervals(
416416
start: t.Optional[TimeLike] = None,
417417
end: t.Optional[TimeLike] = None,
418418
allow_destructive_snapshots: t.Optional[t.Set[str]] = None,
419+
selected_models: t.Optional[t.Set[str]] = None,
419420
allow_additive_snapshots: t.Optional[t.Set[str]] = None,
420421
selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
421422
run_environment_statements: bool = False,
@@ -472,6 +473,7 @@ def run_merged_intervals(
472473
start=start,
473474
end=end,
474475
execution_time=execution_time,
476+
selected_models=selected_models,
475477
)
476478

477479
# We only need to create physical tables if the snapshot is not representative or if it
@@ -533,6 +535,7 @@ def run_node(node: SchedulingUnit) -> None:
533535
allow_destructive_snapshots=allow_destructive_snapshots,
534536
allow_additive_snapshots=allow_additive_snapshots,
535537
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
538+
selected_models=selected_models,
536539
)
537540

538541
evaluation_duration_ms = now_timestamp() - execution_start_ts
@@ -602,6 +605,7 @@ def run_node(node: SchedulingUnit) -> None:
602605
start=start,
603606
end=end,
604607
execution_time=execution_time,
608+
selected_models=selected_models,
605609
)
606610

607611
self.state_sync.recycle()
@@ -808,6 +812,7 @@ def _run_or_audit(
808812
run_environment_statements=run_environment_statements,
809813
audit_only=audit_only,
810814
auto_restatement_triggers=auto_restatement_triggers,
815+
selected_models={s.node.dbt_name for s in merged_intervals if s.node.dbt_name},
811816
)
812817

813818
return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS

sqlmesh/dbt/builtin.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,7 @@ def create_builtin_globals(
545545
"run_query": sql_execution.run_query,
546546
"statement": sql_execution.statement,
547547
"graph": adapter.graph,
548+
"selected_resources": list(jinja_globals.get("selected_models") or []),
548549
}
549550
)
550551

sqlmesh/dbt/model.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,7 @@ def to_sqlmesh(
689689
extract_dependencies_from_query=False,
690690
allow_partials=allow_partials,
691691
virtual_environment_mode=virtual_environment_mode,
692+
dbt_name=self.node_name,
692693
**optional_kwargs,
693694
**model_kwargs,
694695
)

sqlmesh/dbt/seed.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def to_sqlmesh(
9292
audit_definitions=audit_definitions,
9393
virtual_environment_mode=virtual_environment_mode,
9494
start=self.start or context.sqlmesh_config.model_defaults.start,
95+
dbt_name=self.node_name,
9596
**kwargs,
9697
)
9798

0 commit comments

Comments
 (0)