Skip to content

Commit a98b5c4

Browse files
authored
Feat: Check freshness for mixed external & sqlmesh models (#5499)
1 parent 4638b5c commit a98b5c4

File tree

5 files changed

+531
-168
lines changed

5 files changed

+531
-168
lines changed

sqlmesh/core/context.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@
154154
)
155155
from sqlmesh.core.snapshot import Node
156156

157+
from sqlmesh.core.snapshot.definition import Intervals
158+
157159
ModelOrSnapshot = t.Union[str, Model, Snapshot]
158160
NodeOrSnapshot = t.Union[str, Model, StandaloneAudit, Snapshot]
159161

@@ -276,6 +278,7 @@ def __init__(
276278
default_dialect: t.Optional[str] = None,
277279
default_catalog: t.Optional[str] = None,
278280
is_restatement: t.Optional[bool] = None,
281+
parent_intervals: t.Optional[Intervals] = None,
279282
variables: t.Optional[t.Dict[str, t.Any]] = None,
280283
blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
281284
):
@@ -287,6 +290,7 @@ def __init__(
287290
self._variables = variables or {}
288291
self._blueprint_variables = blueprint_variables or {}
289292
self._is_restatement = is_restatement
293+
self._parent_intervals = parent_intervals
290294

291295
@property
292296
def default_dialect(self) -> t.Optional[str]:
@@ -315,6 +319,10 @@ def gateway(self) -> t.Optional[str]:
315319
def is_restatement(self) -> t.Optional[bool]:
316320
return self._is_restatement
317321

322+
@property
323+
def parent_intervals(self) -> t.Optional[Intervals]:
324+
return self._parent_intervals
325+
318326
def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
319327
"""Returns a variable value."""
320328
return self._variables.get(var_name.lower(), default)

sqlmesh/core/scheduler.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ def batch_intervals(
352352
)
353353
for snapshot, intervals in merged_intervals.items()
354354
}
355-
snapshot_batches = {}
355+
snapshot_batches: t.Dict[Snapshot, Intervals] = {}
356356
all_unready_intervals: t.Dict[str, set[Interval]] = {}
357357
for snapshot_id in dag:
358358
if snapshot_id not in snapshot_intervals:
@@ -364,13 +364,22 @@ def batch_intervals(
364364

365365
adapter = self.snapshot_evaluator.get_adapter(snapshot.model_gateway)
366366

367+
parent_intervals: Intervals = []
368+
for parent_id in snapshot.parents:
369+
parent_snapshot, _ = snapshot_intervals.get(parent_id, (None, []))
370+
if not parent_snapshot or parent_snapshot.is_external:
371+
continue
372+
373+
parent_intervals.extend(snapshot_batches[parent_snapshot])
374+
367375
context = ExecutionContext(
368376
adapter,
369377
self.snapshots_by_name,
370378
deployability_index,
371379
default_dialect=adapter.dialect,
372380
default_catalog=self.default_catalog,
373381
is_restatement=is_restatement,
382+
parent_intervals=parent_intervals,
374383
)
375384

376385
intervals = self._check_ready_intervals(

sqlmesh/core/signal.py

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import typing as t
44
from sqlmesh.utils import UniqueKeyDict, registry_decorator
5+
from sqlmesh.utils.errors import MissingSourceError
56

67
if t.TYPE_CHECKING:
78
from sqlmesh.core.context import ExecutionContext
@@ -42,7 +43,16 @@ class signal(registry_decorator):
4243

4344

4445
@signal()
45-
def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionContext) -> bool:
46+
def freshness(
47+
batch: DatetimeRanges,
48+
snapshot: Snapshot,
49+
context: ExecutionContext,
50+
) -> bool:
51+
"""
52+
Implements model freshness as a signal, i.e. it considers this model to be fresh if:
53+
- Any upstream SQLMesh model has available intervals to compute, i.e. is fresh
54+
- Any upstream external model has been altered since the last time the model was evaluated
55+
"""
4656
adapter = context.engine_adapter
4757
if context.is_restatement or not adapter.SUPPORTS_METADATA_TABLE_LAST_MODIFIED_TS:
4858
return True
@@ -54,24 +64,35 @@ def freshness(batch: DatetimeRanges, snapshot: Snapshot, context: ExecutionConte
5464
if deployability_index.is_deployable(snapshot)
5565
else snapshot.dev_last_altered_ts
5666
)
67+
5768
if not last_altered_ts:
5869
return True
5970

6071
parent_snapshots = {context.snapshots[p.name] for p in snapshot.parents}
61-
if len(parent_snapshots) != len(snapshot.node.depends_on) or not all(
62-
p.is_external for p in parent_snapshots
63-
):
64-
# The mismatch can happen if e.g an external model is not registered in the project
72+
73+
upstream_parent_snapshots = {p for p in parent_snapshots if not p.is_external}
74+
external_parents = snapshot.node.depends_on - {p.name for p in upstream_parent_snapshots}
75+
76+
if context.parent_intervals:
77+
# At least one upstream SQLMesh model has intervals to compute (i.e. is fresh),
78+
# so the current model is considered fresh too
6579
return True
6680

67-
# Finding new data means that the upstream depedencies have been altered
68-
# since the last time the model was evaluated
69-
upstream_dep_has_new_data = any(
70-
upstream_last_altered_ts > last_altered_ts
71-
for upstream_last_altered_ts in adapter.get_table_last_modified_ts(
72-
[p.name for p in parent_snapshots]
81+
if external_parents:
82+
external_last_altered_timestamps = adapter.get_table_last_modified_ts(
83+
list(external_parents)
84+
)
85+
86+
if len(external_last_altered_timestamps) != len(external_parents):
87+
raise MissingSourceError(
88+
f"Expected {len(external_parents)} sources to be present, but got {len(external_last_altered_timestamps)}."
89+
)
90+
91+
# Finding new data means that the upstream dependencies have been altered
92+
# since the last time the model was evaluated
93+
return any(
94+
external_last_altered_ts > last_altered_ts
95+
for external_last_altered_ts in external_last_altered_timestamps
7396
)
74-
)
7597

76-
# Returning true is a no-op, returning False nullifies the batch so the model will not be evaluated.
77-
return upstream_dep_has_new_data
98+
return False

0 commit comments

Comments
 (0)