Skip to content

Commit 5edf538

Browse files
authored
fix: remove state sync from migrate (#5502)
1 parent bbdbd48 commit 5edf538

File tree

43 files changed

+94
-155
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+94
-155
lines changed

sqlmesh/core/state_sync/db/facade.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,7 @@ def migrate(
469469
) -> None:
470470
"""Migrate the state sync to the latest SQLMesh / SQLGlot version."""
471471
self.migrator.migrate(
472-
self,
472+
self.schema,
473473
skip_backup=skip_backup,
474474
promoted_snapshots_only=promoted_snapshots_only,
475475
)

sqlmesh/core/state_sync/db/migrator.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
MIN_SCHEMA_VERSION,
3131
MIN_SQLMESH_VERSION,
3232
)
33-
from sqlmesh.core.state_sync.base import StateSync
3433
from sqlmesh.core.state_sync.db.environment import EnvironmentState
3534
from sqlmesh.core.state_sync.db.interval import IntervalState
3635
from sqlmesh.core.state_sync.db.snapshot import SnapshotState
@@ -85,7 +84,7 @@ def __init__(
8584

8685
def migrate(
8786
self,
88-
state_sync: StateSync,
87+
schema: t.Optional[str],
8988
skip_backup: bool = False,
9089
promoted_snapshots_only: bool = True,
9190
) -> None:
@@ -94,7 +93,7 @@ def migrate(
9493
migration_start_ts = time.perf_counter()
9594

9695
try:
97-
migrate_rows = self._apply_migrations(state_sync, skip_backup)
96+
migrate_rows = self._apply_migrations(schema, skip_backup)
9897

9998
if not migrate_rows and major_minor(SQLMESH_VERSION) == versions.minor_sqlmesh_version:
10099
return
@@ -153,7 +152,7 @@ def rollback(self) -> None:
153152

154153
def _apply_migrations(
155154
self,
156-
state_sync: StateSync,
155+
schema: t.Optional[str],
157156
skip_backup: bool,
158157
) -> bool:
159158
versions = self.version_state.get_versions()
@@ -184,10 +183,10 @@ def _apply_migrations(
184183

185184
for migration in migrations:
186185
logger.info(f"Applying migration {migration}")
187-
migration.migrate_schemas(state_sync)
186+
migration.migrate_schemas(engine_adapter=self.engine_adapter, schema=schema)
188187
if state_table_exist:
189188
# No need to run DML for the initial migration since all tables are empty
190-
migration.migrate_rows(state_sync)
189+
migration.migrate_rows(engine_adapter=self.engine_adapter, schema=schema)
191190

192191
snapshot_count_after = self.snapshot_state.count()
193192

sqlmesh/migrations/v0000_baseline.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,12 @@
44
from sqlmesh.utils.migration import blob_text_type, index_text_type
55

66

7-
def migrate_schemas(state_sync, **kwargs): # type: ignore
8-
schema = state_sync.schema
9-
engine_adapter = state_sync.engine_adapter
10-
7+
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
118
intervals_table = "_intervals"
129
snapshots_table = "_snapshots"
1310
environments_table = "_environments"
1411
versions_table = "_versions"
15-
if state_sync.schema:
12+
if schema:
1613
engine_adapter.create_schema(schema)
1714
intervals_table = f"{schema}.{intervals_table}"
1815
snapshots_table = f"{schema}.{snapshots_table}"
@@ -94,5 +91,5 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore
9491
engine_adapter.create_index(intervals_table, "_intervals_name_version_idx", ("name", "version"))
9592

9693

97-
def migrate_rows(state_sync, **kwargs): # type: ignore
94+
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
9895
pass

sqlmesh/migrations/v0061_mysql_fix_blob_text_type.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,9 @@
99
from sqlmesh.utils.migration import blob_text_type
1010

1111

12-
def migrate_schemas(state_sync, **kwargs): # type: ignore
13-
engine_adapter = state_sync.engine_adapter
12+
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
1413
if engine_adapter.dialect != "mysql":
1514
return
16-
17-
schema = state_sync.schema
1815
environments_table = "_environments"
1916
snapshots_table = "_snapshots"
2017

@@ -46,5 +43,5 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore
4643
engine_adapter.execute(alter_table_exp)
4744

4845

49-
def migrate_rows(state_sync, **kwargs): # type: ignore
46+
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
5047
pass
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
"""Add the gateway model attribute."""
22

33

4-
def migrate_schemas(state_sync, **kwargs): # type: ignore
4+
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
55
pass
66

77

8-
def migrate_rows(state_sync, **kwargs): # type: ignore
8+
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
99
pass

sqlmesh/migrations/v0063_change_signals.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,13 @@
77
from sqlmesh.utils.migration import index_text_type, blob_text_type
88

99

10-
def migrate_schemas(state_sync, **kwargs): # type: ignore
10+
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
1111
pass
1212

1313

14-
def migrate_rows(state_sync, **kwargs): # type: ignore
14+
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
1515
import pandas as pd
1616

17-
engine_adapter = state_sync.engine_adapter
18-
schema = state_sync.schema
1917
snapshots_table = "_snapshots"
2018
index_type = index_text_type(engine_adapter.dialect)
2119
if schema:

sqlmesh/migrations/v0064_join_when_matched_strings.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,13 @@
77
from sqlmesh.utils.migration import index_text_type, blob_text_type
88

99

10-
def migrate_schemas(state_sync, **kwargs): # type: ignore
10+
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
1111
pass
1212

1313

14-
def migrate_rows(state_sync, **kwargs): # type: ignore
14+
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
1515
import pandas as pd
1616

17-
engine_adapter = state_sync.engine_adapter
18-
schema = state_sync.schema
1917
snapshots_table = "_snapshots"
2018
index_type = index_text_type(engine_adapter.dialect)
2119
if schema:
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
"""Add the optimize_query model attribute."""
22

33

4-
def migrate_schemas(state_sync, **kwargs): # type: ignore
4+
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
55
pass
66

77

8-
def migrate_rows(state_sync, **kwargs): # type: ignore
8+
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
99
pass

sqlmesh/migrations/v0066_add_auto_restatements.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55
from sqlmesh.utils.migration import index_text_type
66

77

8-
def migrate_schemas(state_sync, **kwargs): # type: ignore
9-
engine_adapter = state_sync.engine_adapter
10-
schema = state_sync.schema
8+
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
119
auto_restatements_table = "_auto_restatements"
1210
intervals_table = "_intervals"
1311

@@ -40,9 +38,7 @@ def migrate_schemas(state_sync, **kwargs): # type: ignore
4038
engine_adapter.execute(alter_table_exp)
4139

4240

43-
def migrate_rows(state_sync, **kwargs): # type: ignore
44-
engine_adapter = state_sync.engine_adapter
45-
schema = state_sync.schema
41+
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
4642
intervals_table = "_intervals"
4743

4844
if schema:
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
"""Add full precision for tsql to support nanoseconds."""
22

33

4-
def migrate_schemas(state_sync, **kwargs): # type: ignore
4+
def migrate_schemas(engine_adapter, schema, **kwargs): # type: ignore
55
pass
66

77

8-
def migrate_rows(state_sync, **kwargs): # type: ignore
8+
def migrate_rows(engine_adapter, schema, **kwargs): # type: ignore
99
pass

0 commit comments

Comments
 (0)