Skip to content

Commit bbdbd48

Browse files
authored
feat: batch expired snapshots (#5486)
1 parent 3535195 commit bbdbd48

File tree

10 files changed

+1004
-173
lines changed

10 files changed

+1004
-173
lines changed

docs/reference/configuration.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,10 @@ Formatting settings for the `sqlmesh format` command and UI.
125125

126126
Configuration for the `sqlmesh janitor` command.
127127

128-
| Option | Description | Type | Required |
129-
|--------------------------|----------------------------------------------------------------------------------------------------------------------------|:-------:|:--------:|
130-
| `warn_on_delete_failure` | Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views (Default: False) | boolean | N |
128+
| Option | Description | Type | Required |
129+
|---------------------------------|----------------------------------------------------------------------------------------------------------------------------|:-------:|:--------:|
130+
| `warn_on_delete_failure` | Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views (Default: False) | boolean | N |
131+
| `expired_snapshots_batch_size` | Maximum number of expired snapshots to clean in a single batch (Default: 200) | int | N |
131132

132133

133134
## UI

sqlmesh/core/config/janitor.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,26 @@
11
from __future__ import annotations
22

3+
import typing as t
34

45
from sqlmesh.core.config.base import BaseConfig
6+
from sqlmesh.utils.pydantic import field_validator
57

68

79
class JanitorConfig(BaseConfig):
810
"""The configuration for the janitor.
911
1012
Args:
1113
warn_on_delete_failure: Whether to warn instead of erroring if the janitor fails to delete the expired environment schema / views.
14+
expired_snapshots_batch_size: Maximum number of expired snapshots to clean in a single batch.
1215
"""
1316

1417
warn_on_delete_failure: bool = False
18+
expired_snapshots_batch_size: t.Optional[int] = None
19+
20+
@field_validator("expired_snapshots_batch_size", mode="before")
21+
@classmethod
22+
def _validate_batch_size(cls, value: int) -> int:
23+
batch_size = int(value)
24+
if batch_size <= 0:
25+
raise ValueError("expired_snapshots_batch_size must be greater than 0")
26+
return batch_size

sqlmesh/core/context.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
StateSync,
110110
cleanup_expired_views,
111111
)
112+
from sqlmesh.core.state_sync.common import delete_expired_snapshots
112113
from sqlmesh.core.table_diff import TableDiff
113114
from sqlmesh.core.test import (
114115
ModelTextTestResult,
@@ -2852,19 +2853,14 @@ def _run_janitor(self, ignore_ttl: bool = False) -> None:
28522853
# Clean up expired environments by removing their views and schemas
28532854
self._cleanup_environments(current_ts=current_ts)
28542855

2855-
cleanup_targets = self.state_sync.get_expired_snapshots(
2856-
ignore_ttl=ignore_ttl, current_ts=current_ts
2857-
)
2858-
2859-
# Remove the expired snapshots tables
2860-
self.snapshot_evaluator.cleanup(
2861-
target_snapshots=cleanup_targets,
2862-
on_complete=self.console.update_cleanup_progress,
2856+
delete_expired_snapshots(
2857+
self.state_sync,
2858+
self.snapshot_evaluator,
2859+
current_ts=current_ts,
2860+
ignore_ttl=ignore_ttl,
2861+
console=self.console,
2862+
batch_size=self.config.janitor.expired_snapshots_batch_size,
28632863
)
2864-
2865-
# Delete the expired snapshot records from the state sync
2866-
self.state_sync.delete_expired_snapshots(ignore_ttl=ignore_ttl, current_ts=current_ts)
2867-
28682864
self.state_sync.compact_intervals()
28692865

28702866
def _cleanup_environments(self, current_ts: t.Optional[int] = None) -> None:

sqlmesh/core/state_sync/base.py

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from sqlmesh import migrations
1212
from sqlmesh.core.environment import (
1313
Environment,
14-
EnvironmentNamingInfo,
1514
EnvironmentStatements,
1615
EnvironmentSummary,
1716
)
@@ -21,17 +20,20 @@
2120
SnapshotIdLike,
2221
SnapshotIdAndVersionLike,
2322
SnapshotInfoLike,
24-
SnapshotTableCleanupTask,
25-
SnapshotTableInfo,
2623
SnapshotNameVersion,
2724
SnapshotIdAndVersion,
2825
)
2926
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
3027
from sqlmesh.utils import major_minor
3128
from sqlmesh.utils.date import TimeLike
3229
from sqlmesh.utils.errors import SQLMeshError
33-
from sqlmesh.utils.pydantic import PydanticModel, ValidationInfo, field_validator
34-
from sqlmesh.core.state_sync.common import StateStream
30+
from sqlmesh.utils.pydantic import PydanticModel, field_validator
31+
from sqlmesh.core.state_sync.common import (
32+
StateStream,
33+
ExpiredSnapshotBatch,
34+
PromotionResult,
35+
ExpiredBatchRange,
36+
)
3537

3638
logger = logging.getLogger(__name__)
3739

@@ -72,20 +74,6 @@ def _schema_version_validator(cls, v: t.Any) -> int:
7274
SCHEMA_VERSION: int = MIN_SCHEMA_VERSION + len(MIGRATIONS) - 1
7375

7476

75-
class PromotionResult(PydanticModel):
76-
added: t.List[SnapshotTableInfo]
77-
removed: t.List[SnapshotTableInfo]
78-
removed_environment_naming_info: t.Optional[EnvironmentNamingInfo]
79-
80-
@field_validator("removed_environment_naming_info")
81-
def _validate_removed_environment_naming_info(
82-
cls, v: t.Optional[EnvironmentNamingInfo], info: ValidationInfo
83-
) -> t.Optional[EnvironmentNamingInfo]:
84-
if v and not info.data.get("removed"):
85-
raise ValueError("removed_environment_naming_info must be None if removed is empty")
86-
return v
87-
88-
8977
class StateReader(abc.ABC):
9078
"""Abstract base class for read-only operations on snapshot and environment state."""
9179

@@ -315,15 +303,21 @@ def export(self, environment_names: t.Optional[t.List[str]] = None) -> StateStre
315303

316304
@abc.abstractmethod
317305
def get_expired_snapshots(
318-
self, current_ts: t.Optional[int] = None, ignore_ttl: bool = False
319-
) -> t.List[SnapshotTableCleanupTask]:
320-
"""Aggregates the id's of the expired snapshots and creates a list of table cleanup tasks.
306+
self,
307+
*,
308+
batch_range: ExpiredBatchRange,
309+
current_ts: t.Optional[int] = None,
310+
ignore_ttl: bool = False,
311+
) -> t.Optional[ExpiredSnapshotBatch]:
312+
"""Returns a single batch of expired snapshots ordered by (updated_ts, name, identifier).
321313
322-
Expired snapshots are snapshots that have exceeded their time-to-live
323-
and are no longer in use within an environment.
314+
Args:
315+
current_ts: Timestamp used to evaluate expiration.
316+
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
317+
batch_range: The range of the batch to fetch.
324318
325319
Returns:
326-
The list of table cleanup tasks.
320+
A batch describing expired snapshots or None if no snapshots are pending cleanup.
327321
"""
328322

329323
@abc.abstractmethod
@@ -363,16 +357,21 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
363357

364358
@abc.abstractmethod
365359
def delete_expired_snapshots(
366-
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
360+
self,
361+
batch_range: ExpiredBatchRange,
362+
ignore_ttl: bool = False,
363+
current_ts: t.Optional[int] = None,
367364
) -> None:
368365
"""Removes expired snapshots.
369366
370367
Expired snapshots are snapshots that have exceeded their time-to-live
371368
and are no longer in use within an environment.
372369
373370
Args:
371+
batch_range: The range of snapshots to delete in this batch.
374372
ignore_ttl: Ignore the TTL on the snapshot when considering it expired. This has the effect of deleting
375373
all snapshots that are not referenced in any environment
374+
current_ts: Timestamp used to evaluate expiration.
376375
"""
377376

378377
@abc.abstractmethod

sqlmesh/core/state_sync/cache.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
)
1313
from sqlmesh.core.snapshot.definition import Interval, SnapshotIntervals
1414
from sqlmesh.core.state_sync.base import DelegatingStateSync, StateSync
15+
from sqlmesh.core.state_sync.common import ExpiredBatchRange
1516
from sqlmesh.utils.date import TimeLike, now_timestamp
1617

1718

@@ -108,11 +109,17 @@ def delete_snapshots(self, snapshot_ids: t.Iterable[SnapshotIdLike]) -> None:
108109
self.state_sync.delete_snapshots(snapshot_ids)
109110

110111
def delete_expired_snapshots(
111-
self, ignore_ttl: bool = False, current_ts: t.Optional[int] = None
112+
self,
113+
batch_range: ExpiredBatchRange,
114+
ignore_ttl: bool = False,
115+
current_ts: t.Optional[int] = None,
112116
) -> None:
113-
current_ts = current_ts or now_timestamp()
114117
self.snapshot_cache.clear()
115-
self.state_sync.delete_expired_snapshots(current_ts=current_ts, ignore_ttl=ignore_ttl)
118+
self.state_sync.delete_expired_snapshots(
119+
batch_range=batch_range,
120+
ignore_ttl=ignore_ttl,
121+
current_ts=current_ts,
122+
)
116123

117124
def add_snapshots_intervals(self, snapshots_intervals: t.Sequence[SnapshotIntervals]) -> None:
118125
for snapshot_intervals in snapshots_intervals:

0 commit comments

Comments
 (0)