Skip to content

Commit 15d2cd0

Browse files
authored
Fix: Incorrect treatment of the model's table as missing when processing batches with index > 0 (#5560)
1 parent a0668dc commit 15d2cd0

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

sqlmesh/core/scheduler.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,10 @@ def run_node(node: SchedulingUnit) -> None:
547547
execution_time=execution_time,
548548
)
549549
else:
550+
# If batch_index > 0, then the target table must exist since the first batch would have created it
551+
target_table_exists = (
552+
snapshot.snapshot_id not in snapshots_to_create or node.batch_index > 0
553+
)
550554
audit_results = self.evaluate(
551555
snapshot=snapshot,
552556
environment_naming_info=environment_naming_info,
@@ -557,7 +561,7 @@ def run_node(node: SchedulingUnit) -> None:
557561
batch_index=node.batch_index,
558562
allow_destructive_snapshots=allow_destructive_snapshots,
559563
allow_additive_snapshots=allow_additive_snapshots,
560-
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
564+
target_table_exists=target_table_exists,
561565
selected_models=selected_models,
562566
)
563567

tests/core/integration/test_change_scenarios.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1482,3 +1482,36 @@ def test_annotated_self_referential_model(init_and_plan_context: t.Callable):
14821482

14831483
df = context.fetchdf("SELECT one FROM memory.sushi.test_self_ref")
14841484
assert len(df) == 0
1485+
1486+
1487+
@time_machine.travel("2023-01-08 00:00:00 UTC")
1488+
def test_creating_stage_for_first_batch_only(init_and_plan_context: t.Callable):
1489+
context, _ = init_and_plan_context("examples/sushi")
1490+
1491+
expressions = d.parse(
1492+
"""
1493+
MODEL (
1494+
name memory.sushi.test_batch_size,
1495+
kind INCREMENTAL_BY_UNIQUE_KEY (
1496+
unique_key one,
1497+
batch_size 1,
1498+
),
1499+
1500+
start '2023-01-01',
1501+
);
1502+
1503+
CREATE SCHEMA IF NOT EXISTS test_schema;
1504+
CREATE TABLE IF NOT EXISTS test_schema.creating_counter (a INT);
1505+
1506+
SELECT 1::INT AS one;
1507+
1508+
@IF(@runtime_stage = 'creating', INSERT INTO test_schema.creating_counter (a) VALUES (1));
1509+
"""
1510+
)
1511+
model = load_sql_based_model(expressions)
1512+
context.upsert_model(model)
1513+
1514+
context.plan("prod", skip_tests=True, no_prompts=True, auto_apply=True)
1515+
assert (
1516+
context.engine_adapter.fetchone("SELECT COUNT(*) FROM test_schema.creating_counter")[0] == 1
1517+
)

0 commit comments

Comments
 (0)