Skip to content

Commit e1510ce

Browse files
authored
chore: move janitor functions to janitor.py (#5510)
1 parent 6fb471f commit e1510ce

File tree

6 files changed

+466
-422
lines changed

6 files changed

+466
-422
lines changed

sqlmesh/core/context.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,8 @@
107107
CachingStateSync,
108108
StateReader,
109109
StateSync,
110-
cleanup_expired_views,
111110
)
112-
from sqlmesh.core.state_sync.common import delete_expired_snapshots
111+
from sqlmesh.core.janitor import cleanup_expired_views, delete_expired_snapshots
113112
from sqlmesh.core.table_diff import TableDiff
114113
from sqlmesh.core.test import (
115114
ModelTextTestResult,

sqlmesh/core/janitor.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
from __future__ import annotations
2+
3+
import typing as t
4+
5+
from sqlglot import exp
6+
7+
from sqlmesh.core.engine_adapter import EngineAdapter
8+
from sqlmesh.core.console import Console
9+
from sqlmesh.core.dialect import schema_
10+
from sqlmesh.core.environment import Environment
11+
from sqlmesh.core.snapshot import SnapshotEvaluator
12+
from sqlmesh.core.state_sync import StateSync
13+
from sqlmesh.core.state_sync.common import (
14+
logger,
15+
iter_expired_snapshot_batches,
16+
RowBoundary,
17+
ExpiredBatchRange,
18+
)
19+
from sqlmesh.utils.errors import SQLMeshError
20+
21+
22+
def cleanup_expired_views(
23+
default_adapter: EngineAdapter,
24+
engine_adapters: t.Dict[str, EngineAdapter],
25+
environments: t.List[Environment],
26+
warn_on_delete_failure: bool = False,
27+
console: t.Optional[Console] = None,
28+
) -> None:
29+
expired_schema_or_catalog_environments = [
30+
environment
31+
for environment in environments
32+
if environment.suffix_target.is_schema or environment.suffix_target.is_catalog
33+
]
34+
expired_table_environments = [
35+
environment for environment in environments if environment.suffix_target.is_table
36+
]
37+
38+
# We have to use the corresponding adapter if the virtual layer is gateway managed
39+
def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> EngineAdapter:
40+
if gateway_managed and gateway:
41+
return engine_adapters.get(gateway, default_adapter)
42+
return default_adapter
43+
44+
catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set()
45+
schemas_to_drop: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set()
46+
47+
# Collect schemas and catalogs to drop
48+
for engine_adapter, expired_catalog, expired_schema, suffix_target in {
49+
(
50+
(engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
51+
snapshot.qualified_view_name.catalog_for_environment(
52+
environment.naming_info, dialect=engine_adapter.dialect
53+
),
54+
snapshot.qualified_view_name.schema_for_environment(
55+
environment.naming_info, dialect=engine_adapter.dialect
56+
),
57+
environment.suffix_target,
58+
)
59+
for environment in expired_schema_or_catalog_environments
60+
for snapshot in environment.snapshots
61+
if snapshot.is_model and not snapshot.is_symbolic
62+
}:
63+
if suffix_target.is_catalog:
64+
if expired_catalog:
65+
catalogs_to_drop.add((engine_adapter, expired_catalog))
66+
else:
67+
schema = schema_(expired_schema, expired_catalog)
68+
schemas_to_drop.add((engine_adapter, schema))
69+
70+
# Drop the views for the expired environments
71+
for engine_adapter, expired_view in {
72+
(
73+
(engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
74+
snapshot.qualified_view_name.for_environment(
75+
environment.naming_info, dialect=engine_adapter.dialect
76+
),
77+
)
78+
for environment in expired_table_environments
79+
for snapshot in environment.snapshots
80+
if snapshot.is_model and not snapshot.is_symbolic
81+
}:
82+
try:
83+
engine_adapter.drop_view(expired_view, ignore_if_not_exists=True)
84+
if console:
85+
console.update_cleanup_progress(expired_view)
86+
except Exception as e:
87+
message = f"Failed to drop the expired environment view '{expired_view}': {e}"
88+
if warn_on_delete_failure:
89+
logger.warning(message)
90+
else:
91+
raise SQLMeshError(message) from e
92+
93+
# Drop the schemas for the expired environments
94+
for engine_adapter, schema in schemas_to_drop:
95+
try:
96+
engine_adapter.drop_schema(
97+
schema,
98+
ignore_if_not_exists=True,
99+
cascade=True,
100+
)
101+
if console:
102+
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
103+
except Exception as e:
104+
message = f"Failed to drop the expired environment schema '{schema}': {e}"
105+
if warn_on_delete_failure:
106+
logger.warning(message)
107+
else:
108+
raise SQLMeshError(message) from e
109+
110+
# Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
111+
# catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
112+
for engine_adapter, catalog in catalogs_to_drop:
113+
if engine_adapter.SUPPORTS_CREATE_DROP_CATALOG:
114+
try:
115+
engine_adapter.drop_catalog(catalog)
116+
if console:
117+
console.update_cleanup_progress(catalog)
118+
except Exception as e:
119+
message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
120+
if warn_on_delete_failure:
121+
logger.warning(message)
122+
else:
123+
raise SQLMeshError(message) from e
124+
125+
126+
def delete_expired_snapshots(
127+
state_sync: StateSync,
128+
snapshot_evaluator: SnapshotEvaluator,
129+
*,
130+
current_ts: int,
131+
ignore_ttl: bool = False,
132+
batch_size: t.Optional[int] = None,
133+
console: t.Optional[Console] = None,
134+
) -> None:
135+
"""Delete all expired snapshots in batches.
136+
137+
This helper function encapsulates the logic for deleting expired snapshots in batches,
138+
eliminating code duplication across different use cases.
139+
140+
Args:
141+
state_sync: StateSync instance to query and delete expired snapshots from.
142+
snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
143+
current_ts: Timestamp used to evaluate expiration.
144+
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
145+
batch_size: Maximum number of snapshots to fetch per batch.
146+
console: Optional console for reporting progress.
147+
148+
Returns:
149+
The total number of deleted expired snapshots.
150+
"""
151+
num_expired_snapshots = 0
152+
for batch in iter_expired_snapshot_batches(
153+
state_reader=state_sync,
154+
current_ts=current_ts,
155+
ignore_ttl=ignore_ttl,
156+
batch_size=batch_size,
157+
):
158+
end_info = (
159+
f"updated_ts={batch.batch_range.end.updated_ts}"
160+
if isinstance(batch.batch_range.end, RowBoundary)
161+
else f"limit={batch.batch_range.end.batch_size}"
162+
)
163+
logger.info(
164+
"Processing batch of size %s with end %s",
165+
len(batch.expired_snapshot_ids),
166+
end_info,
167+
)
168+
snapshot_evaluator.cleanup(
169+
target_snapshots=batch.cleanup_tasks,
170+
on_complete=console.update_cleanup_progress if console else None,
171+
)
172+
state_sync.delete_expired_snapshots(
173+
batch_range=ExpiredBatchRange(
174+
start=RowBoundary.lowest_boundary(),
175+
end=batch.batch_range.end,
176+
),
177+
ignore_ttl=ignore_ttl,
178+
)
179+
logger.info("Cleaned up expired snapshots batch")
180+
num_expired_snapshots += len(batch.expired_snapshot_ids)
181+
logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)

sqlmesh/core/state_sync/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,4 @@
2020
Versions as Versions,
2121
)
2222
from sqlmesh.core.state_sync.cache import CachingStateSync as CachingStateSync
23-
from sqlmesh.core.state_sync.common import cleanup_expired_views as cleanup_expired_views
2423
from sqlmesh.core.state_sync.db import EngineAdapterStateSync as EngineAdapterStateSync

sqlmesh/core/state_sync/common.py

Lines changed: 1 addition & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -11,132 +11,23 @@
1111
from pydantic_core.core_schema import ValidationInfo
1212
from sqlglot import exp
1313

14-
from sqlmesh.core.console import Console
15-
from sqlmesh.core.dialect import schema_
1614
from sqlmesh.utils.pydantic import PydanticModel, field_validator
1715
from sqlmesh.core.environment import Environment, EnvironmentStatements, EnvironmentNamingInfo
18-
from sqlmesh.utils.errors import SQLMeshError
1916
from sqlmesh.core.snapshot import (
2017
Snapshot,
21-
SnapshotEvaluator,
2218
SnapshotId,
2319
SnapshotTableCleanupTask,
2420
SnapshotTableInfo,
2521
)
2622

2723
if t.TYPE_CHECKING:
28-
from sqlmesh.core.engine_adapter.base import EngineAdapter
29-
from sqlmesh.core.state_sync.base import Versions, StateReader, StateSync
24+
from sqlmesh.core.state_sync.base import Versions, StateReader
3025

3126
logger = logging.getLogger(__name__)
3227

3328
EXPIRED_SNAPSHOT_DEFAULT_BATCH_SIZE = 200
3429

3530

36-
def cleanup_expired_views(
37-
default_adapter: EngineAdapter,
38-
engine_adapters: t.Dict[str, EngineAdapter],
39-
environments: t.List[Environment],
40-
warn_on_delete_failure: bool = False,
41-
console: t.Optional[Console] = None,
42-
) -> None:
43-
expired_schema_or_catalog_environments = [
44-
environment
45-
for environment in environments
46-
if environment.suffix_target.is_schema or environment.suffix_target.is_catalog
47-
]
48-
expired_table_environments = [
49-
environment for environment in environments if environment.suffix_target.is_table
50-
]
51-
52-
# We have to use the corresponding adapter if the virtual layer is gateway managed
53-
def get_adapter(gateway_managed: bool, gateway: t.Optional[str] = None) -> EngineAdapter:
54-
if gateway_managed and gateway:
55-
return engine_adapters.get(gateway, default_adapter)
56-
return default_adapter
57-
58-
catalogs_to_drop: t.Set[t.Tuple[EngineAdapter, str]] = set()
59-
schemas_to_drop: t.Set[t.Tuple[EngineAdapter, exp.Table]] = set()
60-
61-
# Collect schemas and catalogs to drop
62-
for engine_adapter, expired_catalog, expired_schema, suffix_target in {
63-
(
64-
(engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
65-
snapshot.qualified_view_name.catalog_for_environment(
66-
environment.naming_info, dialect=engine_adapter.dialect
67-
),
68-
snapshot.qualified_view_name.schema_for_environment(
69-
environment.naming_info, dialect=engine_adapter.dialect
70-
),
71-
environment.suffix_target,
72-
)
73-
for environment in expired_schema_or_catalog_environments
74-
for snapshot in environment.snapshots
75-
if snapshot.is_model and not snapshot.is_symbolic
76-
}:
77-
if suffix_target.is_catalog:
78-
if expired_catalog:
79-
catalogs_to_drop.add((engine_adapter, expired_catalog))
80-
else:
81-
schema = schema_(expired_schema, expired_catalog)
82-
schemas_to_drop.add((engine_adapter, schema))
83-
84-
# Drop the views for the expired environments
85-
for engine_adapter, expired_view in {
86-
(
87-
(engine_adapter := get_adapter(environment.gateway_managed, snapshot.model_gateway)),
88-
snapshot.qualified_view_name.for_environment(
89-
environment.naming_info, dialect=engine_adapter.dialect
90-
),
91-
)
92-
for environment in expired_table_environments
93-
for snapshot in environment.snapshots
94-
if snapshot.is_model and not snapshot.is_symbolic
95-
}:
96-
try:
97-
engine_adapter.drop_view(expired_view, ignore_if_not_exists=True)
98-
if console:
99-
console.update_cleanup_progress(expired_view)
100-
except Exception as e:
101-
message = f"Failed to drop the expired environment view '{expired_view}': {e}"
102-
if warn_on_delete_failure:
103-
logger.warning(message)
104-
else:
105-
raise SQLMeshError(message) from e
106-
107-
# Drop the schemas for the expired environments
108-
for engine_adapter, schema in schemas_to_drop:
109-
try:
110-
engine_adapter.drop_schema(
111-
schema,
112-
ignore_if_not_exists=True,
113-
cascade=True,
114-
)
115-
if console:
116-
console.update_cleanup_progress(schema.sql(dialect=engine_adapter.dialect))
117-
except Exception as e:
118-
message = f"Failed to drop the expired environment schema '{schema}': {e}"
119-
if warn_on_delete_failure:
120-
logger.warning(message)
121-
else:
122-
raise SQLMeshError(message) from e
123-
124-
# Drop any catalogs that were associated with a snapshot where the engine adapter supports dropping catalogs
125-
# catalogs_to_drop is only populated when environment_suffix_target is set to 'catalog'
126-
for engine_adapter, catalog in catalogs_to_drop:
127-
if engine_adapter.SUPPORTS_CREATE_DROP_CATALOG:
128-
try:
129-
engine_adapter.drop_catalog(catalog)
130-
if console:
131-
console.update_cleanup_progress(catalog)
132-
except Exception as e:
133-
message = f"Failed to drop the expired environment catalog '{catalog}': {e}"
134-
if warn_on_delete_failure:
135-
logger.warning(message)
136-
else:
137-
raise SQLMeshError(message) from e
138-
139-
14031
def transactional() -> t.Callable[[t.Callable], t.Callable]:
14132
def decorator(func: t.Callable) -> t.Callable:
14233
@wraps(func)
@@ -429,61 +320,3 @@ def iter_expired_snapshot_batches(
429320
start=batch.batch_range.end,
430321
end=LimitBoundary(batch_size=batch_size),
431322
)
432-
433-
434-
def delete_expired_snapshots(
435-
state_sync: StateSync,
436-
snapshot_evaluator: SnapshotEvaluator,
437-
*,
438-
current_ts: int,
439-
ignore_ttl: bool = False,
440-
batch_size: t.Optional[int] = None,
441-
console: t.Optional[Console] = None,
442-
) -> None:
443-
"""Delete all expired snapshots in batches.
444-
445-
This helper function encapsulates the logic for deleting expired snapshots in batches,
446-
eliminating code duplication across different use cases.
447-
448-
Args:
449-
state_sync: StateSync instance to query and delete expired snapshots from.
450-
snapshot_evaluator: SnapshotEvaluator instance to clean up tables associated with snapshots.
451-
current_ts: Timestamp used to evaluate expiration.
452-
ignore_ttl: If True, include snapshots regardless of TTL (only checks if unreferenced).
453-
batch_size: Maximum number of snapshots to fetch per batch.
454-
console: Optional console for reporting progress.
455-
456-
Returns:
457-
The total number of deleted expired snapshots.
458-
"""
459-
num_expired_snapshots = 0
460-
for batch in iter_expired_snapshot_batches(
461-
state_reader=state_sync,
462-
current_ts=current_ts,
463-
ignore_ttl=ignore_ttl,
464-
batch_size=batch_size,
465-
):
466-
end_info = (
467-
f"updated_ts={batch.batch_range.end.updated_ts}"
468-
if isinstance(batch.batch_range.end, RowBoundary)
469-
else f"limit={batch.batch_range.end.batch_size}"
470-
)
471-
logger.info(
472-
"Processing batch of size %s with end %s",
473-
len(batch.expired_snapshot_ids),
474-
end_info,
475-
)
476-
snapshot_evaluator.cleanup(
477-
target_snapshots=batch.cleanup_tasks,
478-
on_complete=console.update_cleanup_progress if console else None,
479-
)
480-
state_sync.delete_expired_snapshots(
481-
batch_range=ExpiredBatchRange(
482-
start=RowBoundary.lowest_boundary(),
483-
end=batch.batch_range.end,
484-
),
485-
ignore_ttl=ignore_ttl,
486-
)
487-
logger.info("Cleaned up expired snapshots batch")
488-
num_expired_snapshots += len(batch.expired_snapshot_ids)
489-
logger.info("Cleaned up %s expired snapshots", num_expired_snapshots)

0 commit comments

Comments
 (0)