Skip to content

Commit 94b1357

Browse files
authored
Fix: Migrate schemas when deploying a metadata snapshot that was based on unpromoted forward-only snapshot (#5102)
1 parent 3f6d3e7 commit 94b1357

File tree

6 files changed

+54
-105
lines changed

6 files changed

+54
-105
lines changed

sqlmesh/core/plan/stages.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from sqlmesh.core.snapshot.definition import (
99
DeployabilityIndex,
1010
Snapshot,
11-
SnapshotChangeCategory,
1211
SnapshotTableInfo,
1312
SnapshotId,
1413
Interval,
@@ -251,18 +250,7 @@ def build(self, plan: EvaluatablePlan) -> t.List[PlanStage]:
251250
deployability_index = DeployabilityIndex.all_deployable()
252251

253252
snapshots_with_schema_migration = [
254-
s
255-
for s in snapshots.values()
256-
if s.is_paused
257-
and s.is_model
258-
and not s.is_symbolic
259-
and (
260-
not deployability_index_for_creation.is_representative(s)
261-
or (
262-
s.is_view
263-
and s.change_category == SnapshotChangeCategory.INDIRECT_NON_BREAKING
264-
)
265-
)
253+
s for s in snapshots.values() if s.requires_schema_migration_in_prod
266254
]
267255

268256
snapshots_to_intervals = self._missing_intervals(

sqlmesh/core/snapshot/definition.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1352,6 +1352,21 @@ def expiration_ts(self) -> int:
13521352
check_categorical_relative_expression=False,
13531353
)
13541354

1355+
@property
1356+
def requires_schema_migration_in_prod(self) -> bool:
1357+
"""Returns whether or not this snapshot requires a schema migration when deployed to production."""
1358+
return (
1359+
self.is_paused
1360+
and self.is_model
1361+
and not self.is_symbolic
1362+
and (
1363+
(self.previous_version and self.previous_version.version == self.version)
1364+
or self.model.forward_only
1365+
or bool(self.model.physical_version)
1366+
or self.is_view
1367+
)
1368+
)
1369+
13551370
@property
13561371
def ttl_ms(self) -> int:
13571372
return self.expiration_ts - self.updated_ts

sqlmesh/core/snapshot/evaluator.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -918,11 +918,7 @@ def _migrate_snapshot(
918918
adapter: EngineAdapter,
919919
deployability_index: DeployabilityIndex,
920920
) -> None:
921-
if (
922-
not snapshot.is_paused
923-
or not snapshot.is_model
924-
or (deployability_index.is_representative(snapshot) and not snapshot.is_view)
925-
):
921+
if not snapshot.requires_schema_migration_in_prod:
926922
return
927923

928924
deployability_index = DeployabilityIndex.all_deployable()

tests/core/test_integration.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1381,6 +1381,35 @@ def test_indirect_non_breaking_change_after_forward_only_in_dev(init_and_plan_co
13811381
)
13821382

13831383

1384+
@time_machine.travel("2023-01-08 15:00:00 UTC", tick=True)
1385+
def test_metadata_change_after_forward_only_results_in_migration(init_and_plan_context: t.Callable):
1386+
context, plan = init_and_plan_context("examples/sushi")
1387+
context.apply(plan)
1388+
1389+
# Make a forward-only change
1390+
model = context.get_model("sushi.waiter_revenue_by_day")
1391+
model = model.copy(update={"kind": model.kind.copy(update={"forward_only": True})})
1392+
model = add_projection_to_model(t.cast(SqlModel, model))
1393+
context.upsert_model(model)
1394+
plan = context.plan("dev", skip_tests=True, auto_apply=True, no_prompts=True)
1395+
assert len(plan.new_snapshots) == 2
1396+
assert all(s.change_category == SnapshotChangeCategory.FORWARD_ONLY for s in plan.new_snapshots)
1397+
1398+
# Follow-up with a metadata change in the same environment
1399+
model = model.copy(update={"owner": "new_owner"})
1400+
context.upsert_model(model)
1401+
plan = context.plan("dev", skip_tests=True, auto_apply=True, no_prompts=True)
1402+
assert len(plan.new_snapshots) == 2
1403+
assert all(s.change_category == SnapshotChangeCategory.METADATA for s in plan.new_snapshots)
1404+
1405+
# Deploy the latest change to prod
1406+
context.plan("prod", skip_tests=True, auto_apply=True, no_prompts=True)
1407+
1408+
# Check that the new column was added in prod
1409+
columns = context.engine_adapter.columns("sushi.waiter_revenue_by_day")
1410+
assert "one" in columns
1411+
1412+
13841413
@time_machine.travel("2023-01-08 15:00:00 UTC")
13851414
def test_forward_only_precedence_over_indirect_non_breaking(init_and_plan_context: t.Callable):
13861415
context, plan = init_and_plan_context("examples/sushi")

tests/core/test_plan_stages.py

Lines changed: 0 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,93 +1211,6 @@ def test_build_plan_stages_environment_suffix_target_changed(
12111211
)
12121212

12131213

1214-
def test_build_plan_stages_indirect_non_breaking_no_migration(
1215-
snapshot_a: Snapshot, snapshot_b: Snapshot, make_snapshot, mocker: MockerFixture
1216-
) -> None:
1217-
# Categorize snapshot_a as forward-only
1218-
new_snapshot_a = make_snapshot(
1219-
snapshot_a.model.copy(update={"stamp": "new_version"}),
1220-
)
1221-
new_snapshot_a.previous_versions = snapshot_a.all_versions
1222-
new_snapshot_a.categorize_as(SnapshotChangeCategory.NON_BREAKING)
1223-
1224-
new_snapshot_b = make_snapshot(
1225-
snapshot_b.model.copy(),
1226-
nodes={'"a"': new_snapshot_a.model},
1227-
)
1228-
new_snapshot_b.previous_versions = snapshot_b.all_versions
1229-
new_snapshot_b.change_category = SnapshotChangeCategory.INDIRECT_NON_BREAKING
1230-
new_snapshot_b.version = new_snapshot_b.previous_version.data_version.version
1231-
1232-
state_reader = mocker.Mock(spec=StateReader)
1233-
state_reader.get_snapshots.return_value = {}
1234-
existing_environment = Environment(
1235-
name="prod",
1236-
snapshots=[snapshot_a.table_info, snapshot_b.table_info],
1237-
start_at="2023-01-01",
1238-
end_at="2023-01-02",
1239-
plan_id="previous_plan",
1240-
previous_plan_id=None,
1241-
promoted_snapshot_ids=[snapshot_a.snapshot_id, snapshot_b.snapshot_id],
1242-
finalized_ts=to_timestamp("2023-01-02"),
1243-
)
1244-
state_reader.get_environment.return_value = existing_environment
1245-
1246-
# Create environment
1247-
environment = Environment(
1248-
name="prod",
1249-
snapshots=[new_snapshot_a.table_info, new_snapshot_b.table_info],
1250-
start_at="2023-01-01",
1251-
end_at="2023-01-02",
1252-
plan_id="test_plan",
1253-
previous_plan_id="previous_plan",
1254-
promoted_snapshot_ids=[new_snapshot_a.snapshot_id, new_snapshot_b.snapshot_id],
1255-
)
1256-
1257-
# Create evaluatable plan
1258-
plan = EvaluatablePlan(
1259-
start="2023-01-01",
1260-
end="2023-01-02",
1261-
new_snapshots=[new_snapshot_a, new_snapshot_b],
1262-
environment=environment,
1263-
no_gaps=False,
1264-
skip_backfill=False,
1265-
empty_backfill=False,
1266-
restatements={},
1267-
is_dev=False,
1268-
allow_destructive_models=set(),
1269-
forward_only=False,
1270-
end_bounded=False,
1271-
ensure_finalized_snapshots=False,
1272-
directly_modified_snapshots=[new_snapshot_a.snapshot_id],
1273-
indirectly_modified_snapshots={
1274-
new_snapshot_a.name: [new_snapshot_b.snapshot_id],
1275-
},
1276-
metadata_updated_snapshots=[],
1277-
removed_snapshots=[],
1278-
requires_backfill=True,
1279-
models_to_backfill=None,
1280-
execution_time="2023-01-02",
1281-
disabled_restatement_models=set(),
1282-
environment_statements=None,
1283-
user_provided_flags=None,
1284-
)
1285-
1286-
# Build plan stages
1287-
stages = build_plan_stages(plan, state_reader, None)
1288-
1289-
# Verify stages
1290-
assert len(stages) == 7
1291-
1292-
assert isinstance(stages[0], CreateSnapshotRecordsStage)
1293-
assert isinstance(stages[1], PhysicalLayerUpdateStage)
1294-
assert isinstance(stages[2], BackfillStage)
1295-
assert isinstance(stages[3], EnvironmentRecordUpdateStage)
1296-
assert isinstance(stages[4], UnpauseStage)
1297-
assert isinstance(stages[5], VirtualLayerUpdateStage)
1298-
assert isinstance(stages[6], FinalizeEnvironmentStage)
1299-
1300-
13011214
def test_build_plan_stages_indirect_non_breaking_view_migration(
13021215
snapshot_a: Snapshot, snapshot_c: Snapshot, make_snapshot, mocker: MockerFixture
13031216
) -> None:

tests/core/test_snapshot_evaluator.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,6 +1180,7 @@ def columns(table_name):
11801180
)
11811181
snapshot = make_snapshot(model, version="1")
11821182
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
1183+
snapshot.previous_versions = snapshot.all_versions
11831184

11841185
evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
11851186

@@ -1217,6 +1218,7 @@ def test_migrate_missing_table(mocker: MockerFixture, make_snapshot):
12171218
)
12181219
snapshot = make_snapshot(model, version="1")
12191220
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
1221+
snapshot.previous_versions = snapshot.all_versions
12201222

12211223
evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
12221224

@@ -1714,6 +1716,7 @@ def columns(table_name):
17141716
)
17151717
snapshot = make_snapshot(model, version="1")
17161718
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
1719+
snapshot.previous_versions = snapshot.all_versions
17171720

17181721
with pytest.raises(NodeExecutionFailedError) as ex:
17191722
evaluator.migrate([snapshot], {}, deployability_index=DeployabilityIndex.none_deployable())
@@ -1735,6 +1738,7 @@ def columns(table_name):
17351738
)
17361739
snapshot = make_snapshot(model, version="1")
17371740
snapshot.change_category = SnapshotChangeCategory.FORWARD_ONLY
1741+
snapshot.previous_versions = snapshot.all_versions
17381742

17391743
logger = logging.getLogger("sqlmesh.core.snapshot.evaluator")
17401744
with patch.object(logger, "warning") as mock_logger:
@@ -3654,6 +3658,7 @@ def test_migrate_snapshot(snapshot: Snapshot, mocker: MockerFixture, adapter_moc
36543658

36553659
new_snapshot = make_snapshot(updated_model)
36563660
new_snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
3661+
new_snapshot.previous_versions = snapshot.all_versions
36573662
new_snapshot.version = snapshot.version
36583663

36593664
assert new_snapshot.table_name() == snapshot.table_name()
@@ -3724,6 +3729,7 @@ def test_migrate_managed(adapter_mock, make_snapshot, mocker: MockerFixture):
37243729
)
37253730
snapshot: Snapshot = make_snapshot(model)
37263731
snapshot.categorize_as(SnapshotChangeCategory.FORWARD_ONLY)
3732+
snapshot.previous_versions = snapshot.all_versions
37273733

37283734
# no schema changes - no-op
37293735
adapter_mock.get_alter_expressions.return_value = []
@@ -3925,6 +3931,7 @@ def columns(table_name):
39253931
)
39263932
snapshot_1 = make_snapshot(model, version="1")
39273933
snapshot_1.change_category = SnapshotChangeCategory.FORWARD_ONLY
3934+
snapshot_1.previous_versions = snapshot_1.all_versions
39283935
model_2 = SqlModel(
39293936
name="test_schema.test_model_2",
39303937
kind=IncrementalByTimeRangeKind(
@@ -3935,6 +3942,7 @@ def columns(table_name):
39353942
)
39363943
snapshot_2 = make_snapshot(model_2, version="1")
39373944
snapshot_2.change_category = SnapshotChangeCategory.FORWARD_ONLY
3945+
snapshot_2.previous_versions = snapshot_2.all_versions
39383946
evaluator.migrate(
39393947
[snapshot_1, snapshot_2], {}, deployability_index=DeployabilityIndex.none_deployable()
39403948
)

0 commit comments

Comments
 (0)