|
13 | 13 | ) |
14 | 14 | from sqlmesh.utils.errors import SQLMeshError |
15 | 15 | from sqlmesh.utils.connection_pool import ConnectionPool |
| 16 | +from sqlmesh.core.schema_diff import TableAlterOperation |
| 17 | +from sqlmesh.utils import random_id |
16 | 18 |
|
17 | 19 |
|
18 | 20 | logger = logging.getLogger(__name__) |
@@ -153,6 +155,113 @@ def set_current_catalog(self, catalog_name: str) -> None: |
153 | 155 | f"Unable to switch catalog to {catalog_name}, catalog ended up as {catalog_after_switch}" |
154 | 156 | ) |
155 | 157 |
|
| 158 | + def alter_table( |
| 159 | + self, alter_expressions: t.Union[t.List[exp.Alter], t.List[TableAlterOperation]] |
| 160 | + ) -> None: |
| 161 | + """ |
| 162 | + Applies alter expressions to a table. Fabric has limited support for ALTER TABLE, |
| 163 | + so this method implements a workaround for column type changes. |
| 164 | + This method is self-contained and sets its own catalog context. |
| 165 | + """ |
| 166 | + if not alter_expressions: |
| 167 | + return |
| 168 | + |
| 169 | + # Get the target table from the first expression to determine the correct catalog. |
| 170 | + first_op = alter_expressions[0] |
| 171 | + expression = first_op.expression if isinstance(first_op, TableAlterOperation) else first_op |
| 172 | + if not isinstance(expression, exp.Alter) or not expression.this.catalog: |
| 173 | + # Fallback for unexpected scenarios |
| 174 | + logger.warning( |
| 175 | + "Could not determine catalog from alter expression, executing with current context." |
| 176 | + ) |
| 177 | + super().alter_table(alter_expressions) |
| 178 | + return |
| 179 | + |
| 180 | + target_catalog = expression.this.catalog |
| 181 | + self.set_current_catalog(target_catalog) |
| 182 | + |
| 183 | + with self.transaction(): |
| 184 | + for op in alter_expressions: |
| 185 | + expression = op.expression if isinstance(op, TableAlterOperation) else op |
| 186 | + |
| 187 | + if not isinstance(expression, exp.Alter): |
| 188 | + self.execute(expression) |
| 189 | + continue |
| 190 | + |
| 191 | + for action in expression.actions: |
| 192 | + table_name = expression.this |
| 193 | + |
| 194 | + table_name_without_catalog = table_name.copy() |
| 195 | + table_name_without_catalog.set("catalog", None) |
| 196 | + |
| 197 | + is_type_change = isinstance(action, exp.AlterColumn) and action.args.get( |
| 198 | + "dtype" |
| 199 | + ) |
| 200 | + |
| 201 | + if is_type_change: |
| 202 | + column_to_alter = action.this |
| 203 | + new_type = action.args["dtype"] |
| 204 | + temp_column_name_str = f"{column_to_alter.name}__{random_id(short=True)}" |
| 205 | + temp_column_name = exp.to_identifier(temp_column_name_str) |
| 206 | + |
| 207 | + logger.info( |
| 208 | + "Applying workaround for column '%s' on table '%s' to change type to '%s'.", |
| 209 | + column_to_alter.sql(), |
| 210 | + table_name.sql(), |
| 211 | + new_type.sql(), |
| 212 | + ) |
| 213 | + |
| 214 | + # Step 1: Add a temporary column. |
| 215 | + add_column_expr = exp.Alter( |
| 216 | + this=table_name_without_catalog.copy(), |
| 217 | + kind="TABLE", |
| 218 | + actions=[ |
| 219 | + exp.ColumnDef(this=temp_column_name.copy(), kind=new_type.copy()) |
| 220 | + ], |
| 221 | + ) |
| 222 | + add_sql = self._to_sql(add_column_expr) |
| 223 | + self.execute(add_sql) |
| 224 | + |
| 225 | + # Step 2: Copy and cast data. |
| 226 | + update_sql = self._to_sql( |
| 227 | + exp.Update( |
| 228 | + this=table_name_without_catalog.copy(), |
| 229 | + expressions=[ |
| 230 | + exp.EQ( |
| 231 | + this=temp_column_name.copy(), |
| 232 | + expression=exp.Cast( |
| 233 | + this=column_to_alter.copy(), to=new_type.copy() |
| 234 | + ), |
| 235 | + ) |
| 236 | + ], |
| 237 | + ) |
| 238 | + ) |
| 239 | + self.execute(update_sql) |
| 240 | + |
| 241 | + # Step 3: Drop the original column. |
| 242 | + drop_sql = self._to_sql( |
| 243 | + exp.Alter( |
| 244 | + this=table_name_without_catalog.copy(), |
| 245 | + kind="TABLE", |
| 246 | + actions=[exp.Drop(this=column_to_alter.copy(), kind="COLUMN")], |
| 247 | + ) |
| 248 | + ) |
| 249 | + self.execute(drop_sql) |
| 250 | + |
| 251 | + # Step 4: Rename the temporary column. |
| 252 | + old_name_qualified = f"{table_name_without_catalog.sql(dialect=self.dialect)}.{temp_column_name.sql(dialect=self.dialect)}" |
| 253 | + new_name_unquoted = column_to_alter.sql( |
| 254 | + dialect=self.dialect, identify=False |
| 255 | + ) |
| 256 | + rename_sql = f"EXEC sp_rename '{old_name_qualified}', '{new_name_unquoted}', 'COLUMN'" |
| 257 | + self.execute(rename_sql) |
| 258 | + else: |
| 259 | + # For other alterations, execute directly. |
| 260 | + direct_alter_expr = exp.Alter( |
| 261 | + this=table_name_without_catalog.copy(), kind="TABLE", actions=[action] |
| 262 | + ) |
| 263 | + self.execute(direct_alter_expr) |
| 264 | + |
156 | 265 |
|
157 | 266 | class FabricHttpClient: |
158 | 267 | def __init__(self, tenant_id: str, workspace_id: str, client_id: str, client_secret: str): |
|
0 commit comments