Skip to content

Commit cbd1b97

Browse files
committed
Refactor
1 parent 33ac43f commit cbd1b97

File tree

4 files changed

+127
-115
lines changed

4 files changed

+127
-115
lines changed

sqlmesh/core/config/connection.py

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1891,7 +1891,7 @@ class TrinoConnectionConfig(ConnectionConfig):
18911891

18921892
# SQLMesh options
18931893
schema_location_mapping: t.Optional[dict[re.Pattern, str]] = None
1894-
trino_to_delta_timestamp_mapping: t.Optional[dict[exp.DataType, exp.DataType]] = None
1894+
timestamp_mapping: t.Optional[dict[exp.DataType, exp.DataType]] = None
18951895
concurrent_tasks: int = 4
18961896
register_comments: bool = True
18971897
pre_ping: t.Literal[False] = False
@@ -1916,43 +1916,30 @@ def _validate_regex_keys(
19161916
)
19171917
return compiled
19181918

1919-
@field_validator("trino_to_delta_timestamp_mapping", mode="before")
1919+
@field_validator("timestamp_mapping", mode="before")
19201920
@classmethod
1921-
def _validate_trino_to_delta_timestamp_mapping(
1921+
def _validate_timestamp_mapping(
19221922
cls, value: t.Optional[dict[str, str]]
19231923
) -> t.Optional[dict[exp.DataType, exp.DataType]]:
19241924
if value is None:
19251925
return value
19261926

1927-
# Delta Lake only supports these timestamp types
1928-
valid_delta_types = {
1929-
exp.DataType.build("TIMESTAMP"),
1930-
exp.DataType.build("TIMESTAMPNTZ"),
1931-
}
1932-
19331927
result: dict[exp.DataType, exp.DataType] = {}
19341928
for source_type, target_type in value.items():
19351929
try:
19361930
source_datatype = exp.DataType.build(source_type)
19371931
except ParseError:
19381932
raise ConfigError(
1939-
"Invalid SQL type string in trino_to_delta_timestamp_mapping: "
1933+
f"Invalid SQL type string in timestamp_mapping: "
19401934
f"'{source_type}' is not a valid SQL data type."
19411935
)
19421936
try:
19431937
target_datatype = exp.DataType.build(target_type)
19441938
except ParseError:
19451939
raise ConfigError(
1946-
"Invalid SQL type string in trino_to_delta_timestamp_mapping: "
1940+
f"Invalid SQL type string in timestamp_mapping: "
19471941
f"'{target_type}' is not a valid SQL data type."
19481942
)
1949-
# Delta Lake types must be TIMESTAMP or TIMESTAMP_NTZ
1950-
if target_datatype not in valid_delta_types:
1951-
raise ConfigError(
1952-
f"Invalid target type in trino_to_delta_timestamp_mapping: "
1953-
f"'{target_type}' is not a valid Delta Lake timestamp type. "
1954-
f"Valid types are: TIMESTAMP, TIMESTAMP_NTZ, or TIMESTAMPNTZ."
1955-
)
19561943
result[source_datatype] = target_datatype
19571944

19581945
return result
@@ -2061,7 +2048,7 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]:
20612048
def _extra_engine_config(self) -> t.Dict[str, t.Any]:
20622049
return {
20632050
"schema_location_mapping": self.schema_location_mapping,
2064-
"trino_to_delta_timestamp_mapping": self.trino_to_delta_timestamp_mapping,
2051+
"timestamp_mapping": self.timestamp_mapping,
20652052
}
20662053

20672054

sqlmesh/core/engine_adapter/trino.py

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,30 @@ def schema_location_mapping(self) -> t.Optional[t.Dict[re.Pattern, str]]:
7575
return self._extra_config.get("schema_location_mapping")
7676

7777
@property
78-
def trino_to_delta_timestamp_mapping(
79-
self,
80-
) -> t.Optional[t.Dict[exp.DataType, exp.DataType]]:
81-
return self._extra_config.get("trino_to_delta_timestamp_mapping")
78+
def timestamp_mapping(self) -> t.Optional[t.Dict[exp.DataType, exp.DataType]]:
79+
return self._extra_config.get("timestamp_mapping")
80+
81+
def _apply_timestamp_mapping(
82+
self, columns_to_types: t.Dict[str, exp.DataType]
83+
) -> t.Tuple[t.Dict[str, exp.DataType], t.Set[str]]:
84+
"""Apply custom timestamp mapping to column types.
85+
86+
Returns:
87+
A tuple of (mapped_columns_to_types, mapped_column_names) where mapped_column_names
88+
contains the names of columns that were found in the mapping.
89+
"""
90+
if not self.timestamp_mapping:
91+
return columns_to_types, set()
92+
93+
result = {}
94+
mapped_columns: t.Set[str] = set()
95+
for column, column_type in columns_to_types.items():
96+
if column_type in self.timestamp_mapping:
97+
result[column] = self.timestamp_mapping[column_type]
98+
mapped_columns.add(column)
99+
else:
100+
result[column] = column_type
101+
return result, mapped_columns
82102

83103
@property
84104
def catalog_support(self) -> CatalogSupport:
@@ -292,8 +312,11 @@ def _build_schema_exp(
292312
is_view: bool = False,
293313
materialized: bool = False,
294314
) -> exp.Schema:
315+
target_columns_to_types, mapped_columns = self._apply_timestamp_mapping(
316+
target_columns_to_types
317+
)
295318
if "delta_lake" in self.get_catalog_type_from_table(table):
296-
target_columns_to_types = self._to_delta_ts(target_columns_to_types)
319+
target_columns_to_types = self._to_delta_ts(target_columns_to_types, mapped_columns)
297320

298321
return super()._build_schema_exp(
299322
table, target_columns_to_types, column_descriptions, expressions, is_view
@@ -319,10 +342,15 @@ def _scd_type_2(
319342
source_columns: t.Optional[t.List[str]] = None,
320343
**kwargs: t.Any,
321344
) -> None:
345+
mapped_columns: t.Set[str] = set()
346+
if target_columns_to_types:
347+
target_columns_to_types, mapped_columns = self._apply_timestamp_mapping(
348+
target_columns_to_types
349+
)
322350
if target_columns_to_types and "delta_lake" in self.get_catalog_type_from_table(
323351
target_table
324352
):
325-
target_columns_to_types = self._to_delta_ts(target_columns_to_types)
353+
target_columns_to_types = self._to_delta_ts(target_columns_to_types, mapped_columns)
326354

327355
return super()._scd_type_2(
328356
target_table,
@@ -352,30 +380,23 @@ def _scd_type_2(
352380
# - `timestamp(3) with time zone` for timezone-aware
353381
# https://trino.io/docs/current/connector/delta-lake.html#delta-lake-to-trino-type-mapping
354382
def _to_delta_ts(
355-
self, columns_to_types: t.Dict[str, exp.DataType]
383+
self,
384+
columns_to_types: t.Dict[str, exp.DataType],
385+
skip_columns: t.Optional[t.Set[str]] = None,
356386
) -> t.Dict[str, exp.DataType]:
357387
ts6 = exp.DataType.build("timestamp(6)")
358388
ts3_tz = exp.DataType.build("timestamp(3) with time zone")
389+
skip = skip_columns or set()
359390

360-
delta_columns_to_types = {}
391+
delta_columns_to_types = {
392+
k: ts6 if k not in skip and v.is_type(exp.DataType.Type.TIMESTAMP) else v
393+
for k, v in columns_to_types.items()
394+
}
361395

362-
for column, column_type in columns_to_types.items():
363-
target_type = column_type
364-
if (
365-
self.trino_to_delta_timestamp_mapping
366-
and target_type in self.trino_to_delta_timestamp_mapping
367-
):
368-
target_type = self.trino_to_delta_timestamp_mapping[target_type]
369-
# Delta lake's TIMESTAMP type is time zone aware
370-
if target_type.is_type(exp.DataType.Type.TIMESTAMP):
371-
target_type = exp.DataType.build("timestamptz")
372-
elif target_type.is_type(exp.DataType.Type.TIMESTAMPNTZ):
373-
target_type = exp.DataType.build("timestamp")
374-
if target_type.is_type(exp.DataType.Type.TIMESTAMP):
375-
target_type = ts6
376-
elif target_type.is_type(exp.DataType.Type.TIMESTAMPTZ):
377-
target_type = ts3_tz
378-
delta_columns_to_types[column] = target_type
396+
delta_columns_to_types = {
397+
k: ts3_tz if k not in skip and v.is_type(exp.DataType.Type.TIMESTAMPTZ) else v
398+
for k, v in delta_columns_to_types.items()
399+
}
379400

380401
return delta_columns_to_types
381402

tests/core/engine_adapter/test_trino.py

Lines changed: 59 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -404,54 +404,55 @@ def test_delta_timestamps(make_mocked_engine_adapter: t.Callable):
404404
}
405405

406406

407-
def test_trino_to_delta_timestamp_mapping():
408-
"""Test that trino_to_delta_timestamp_mapping config property is properly defined and accessible."""
407+
def test_timestamp_mapping():
408+
"""Test that timestamp_mapping config property is properly defined and accessible."""
409409
config = TrinoConnectionConfig(
410410
user="user",
411411
host="host",
412412
catalog="catalog",
413413
)
414414

415415
adapter = config.create_engine_adapter()
416-
assert adapter.trino_to_delta_timestamp_mapping is None
416+
assert adapter.timestamp_mapping is None
417417

418418
config = TrinoConnectionConfig(
419419
user="user",
420420
host="host",
421421
catalog="catalog",
422-
trino_to_delta_timestamp_mapping={
423-
"TIMESTAMP": "TIMESTAMP",
424-
"TIMESTAMP(3)": "TIMESTAMP_NTZ",
422+
timestamp_mapping={
423+
"TIMESTAMP": "TIMESTAMP(6)",
424+
"TIMESTAMP(3)": "TIMESTAMP WITH TIME ZONE",
425425
},
426426
)
427427
adapter = config.create_engine_adapter()
428-
assert adapter.trino_to_delta_timestamp_mapping is not None
429-
assert adapter.trino_to_delta_timestamp_mapping[
430-
exp.DataType.build("TIMESTAMP")
431-
] == exp.DataType.build("TIMESTAMP")
428+
assert adapter.timestamp_mapping is not None
429+
assert adapter.timestamp_mapping[exp.DataType.build("TIMESTAMP")] == exp.DataType.build(
430+
"TIMESTAMP(6)"
431+
)
432432

433433

434434
def test_delta_timestamps_with_custom_mapping(make_mocked_engine_adapter: t.Callable):
435-
"""Test that _to_delta_ts respects custom trino_to_delta_timestamp_mapping."""
436-
# Create config with custom timestamp mapping using valid Delta Lake types
437-
# Delta Lake only supports: TIMESTAMP (TZ-aware) and TIMESTAMP_NTZ (no TZ)
435+
"""Test that _apply_timestamp_mapping + _to_delta_ts respects custom timestamp_mapping."""
436+
# Create config with custom timestamp mapping
437+
# Mapped columns are skipped by _to_delta_ts
438438
config = TrinoConnectionConfig(
439439
user="user",
440440
host="host",
441441
catalog="catalog",
442-
trino_to_delta_timestamp_mapping={
443-
"TIMESTAMP": "TIMESTAMP",
444-
"TIMESTAMP(1)": "TIMESTAMP",
445-
"TIMESTAMP WITH TIME ZONE": "TIMESTAMP",
446-
"TIMESTAMP(1) WITH TIME ZONE": "TIMESTAMP",
442+
timestamp_mapping={
443+
"TIMESTAMP": "TIMESTAMP(3)",
444+
"TIMESTAMP(1)": "TIMESTAMP(3)",
445+
"TIMESTAMP WITH TIME ZONE": "TIMESTAMP(6) WITH TIME ZONE",
446+
"TIMESTAMP(1) WITH TIME ZONE": "TIMESTAMP(6) WITH TIME ZONE",
447447
},
448448
)
449449

450450
adapter = make_mocked_engine_adapter(
451-
TrinoEngineAdapter, trino_to_delta_timestamp_mapping=config.trino_to_delta_timestamp_mapping
451+
TrinoEngineAdapter, timestamp_mapping=config.timestamp_mapping
452452
)
453453

454-
ts3_tz = exp.DataType.build("timestamp(3) with time zone")
454+
ts3 = exp.DataType.build("timestamp(3)")
455+
ts6_tz = exp.DataType.build("timestamp(6) with time zone")
455456

456457
columns_to_types = {
457458
"ts": exp.DataType.build("TIMESTAMP"),
@@ -460,32 +461,37 @@ def test_delta_timestamps_with_custom_mapping(make_mocked_engine_adapter: t.Call
460461
"ts_tz_1": exp.DataType.build("TIMESTAMP(1) WITH TIME ZONE"),
461462
}
462463

463-
delta_columns_to_types = adapter._to_delta_ts(columns_to_types)
464+
# Apply mapping first, then convert to delta types (skipping mapped columns)
465+
mapped_columns_to_types, mapped_column_names = adapter._apply_timestamp_mapping(
466+
columns_to_types
467+
)
468+
delta_columns_to_types = adapter._to_delta_ts(mapped_columns_to_types, mapped_column_names)
464469

465-
# All types mapped to Delta's TIMESTAMP get converted to TIMESTAMPTZ then ts3_tz
470+
# All types were mapped, so _to_delta_ts skips them - they keep their mapped types
466471
assert delta_columns_to_types == {
467-
"ts": ts3_tz,
468-
"ts_1": ts3_tz,
469-
"ts_tz": ts3_tz,
470-
"ts_tz_1": ts3_tz,
472+
"ts": ts3,
473+
"ts_1": ts3,
474+
"ts_tz": ts6_tz,
475+
"ts_tz_1": ts6_tz,
471476
}
472477

473478

474479
def test_delta_timestamps_with_partial_mapping(make_mocked_engine_adapter: t.Callable):
475-
"""Test that _to_delta_ts uses custom mapping for specified types and defaults for others."""
480+
"""Test that _apply_timestamp_mapping + _to_delta_ts uses custom mapping for specified types."""
476481
config = TrinoConnectionConfig(
477482
user="user",
478483
host="host",
479484
catalog="catalog",
480-
trino_to_delta_timestamp_mapping={
481-
"TIMESTAMP": "TIMESTAMP",
485+
timestamp_mapping={
486+
"TIMESTAMP": "TIMESTAMP(3)",
482487
},
483488
)
484489

485490
adapter = make_mocked_engine_adapter(
486-
TrinoEngineAdapter, trino_to_delta_timestamp_mapping=config.trino_to_delta_timestamp_mapping
491+
TrinoEngineAdapter, timestamp_mapping=config.timestamp_mapping
487492
)
488493

494+
ts3 = exp.DataType.build("TIMESTAMP(3)")
489495
ts6 = exp.DataType.build("timestamp(6)")
490496
ts3_tz = exp.DataType.build("timestamp(3) with time zone")
491497

@@ -495,15 +501,19 @@ def test_delta_timestamps_with_partial_mapping(make_mocked_engine_adapter: t.Cal
495501
"ts_tz": exp.DataType.build("TIMESTAMP WITH TIME ZONE"),
496502
}
497503

498-
delta_columns_to_types = adapter._to_delta_ts(columns_to_types)
504+
# Apply mapping first, then convert to delta types (skipping mapped columns)
505+
mapped_columns_to_types, mapped_column_names = adapter._apply_timestamp_mapping(
506+
columns_to_types
507+
)
508+
delta_columns_to_types = adapter._to_delta_ts(mapped_columns_to_types, mapped_column_names)
499509

500-
# TIMESTAMP is in mapping → TIMESTAMP → TIMESTAMPTZ → ts3_tz
501-
# TIMESTAMP(1) is NOT in mapping (exact match), uses default TIMESTAMP → ts6
510+
# TIMESTAMP is in mapping → TIMESTAMP(3), skipped by _to_delta_ts
511+
# TIMESTAMP(1) is NOT in mapping, uses default TIMESTAMP → ts6
502512
# TIMESTAMP WITH TIME ZONE is NOT in mapping, uses default TIMESTAMPTZ → ts3_tz
503513
assert delta_columns_to_types == {
504-
"ts": ts3_tz,
514+
"ts": ts3, # Mapped to TIMESTAMP(3), skipped by _to_delta_ts
505515
"ts_1": ts6, # Not in mapping, uses default
506-
"ts_tz": ts3_tz,
516+
"ts_tz": ts3_tz, # Not in mapping, uses default
507517
}
508518

509519

@@ -861,22 +871,22 @@ def test_insert_overwrite_time_partition_iceberg(
861871

862872

863873
def test_delta_timestamps_with_non_timestamp_columns(make_mocked_engine_adapter: t.Callable):
864-
"""Test that _to_delta_ts handles non-timestamp columns alongside custom mapping."""
874+
"""Test that _apply_timestamp_mapping + _to_delta_ts handles non-timestamp columns."""
865875
config = TrinoConnectionConfig(
866876
user="user",
867877
host="host",
868878
catalog="catalog",
869-
trino_to_delta_timestamp_mapping={
870-
"TIMESTAMP": "TIMESTAMP",
879+
timestamp_mapping={
880+
"TIMESTAMP": "TIMESTAMP(3)",
871881
},
872882
)
873883

874884
adapter = make_mocked_engine_adapter(
875-
TrinoEngineAdapter, trino_to_delta_timestamp_mapping=config.trino_to_delta_timestamp_mapping
885+
TrinoEngineAdapter, timestamp_mapping=config.timestamp_mapping
876886
)
877887

888+
ts3 = exp.DataType.build("TIMESTAMP(3)")
878889
ts6 = exp.DataType.build("timestamp(6)")
879-
ts3_tz = exp.DataType.build("timestamp(3) with time zone")
880890

881891
columns_to_types = {
882892
"ts": exp.DataType.build("TIMESTAMP"),
@@ -886,13 +896,17 @@ def test_delta_timestamps_with_non_timestamp_columns(make_mocked_engine_adapter:
886896
"decimal_col": exp.DataType.build("DECIMAL(10,2)"),
887897
}
888898

889-
delta_columns_to_types = adapter._to_delta_ts(columns_to_types)
899+
# Apply mapping first, then convert to delta types (skipping mapped columns)
900+
mapped_columns_to_types, mapped_column_names = adapter._apply_timestamp_mapping(
901+
columns_to_types
902+
)
903+
delta_columns_to_types = adapter._to_delta_ts(mapped_columns_to_types, mapped_column_names)
890904

891-
# TIMESTAMP is in mapping → TIMESTAMP → TIMESTAMPTZ → ts3_tz
905+
# TIMESTAMP is in mapping → TIMESTAMP(3), skipped by _to_delta_ts
892906
# TIMESTAMP(1) is NOT in mapping (exact match), uses default TIMESTAMP → ts6
893907
# Non-timestamp columns should pass through unchanged
894908
assert delta_columns_to_types == {
895-
"ts": ts3_tz,
909+
"ts": ts3, # Mapped to TIMESTAMP(3), skipped by _to_delta_ts
896910
"ts_1": ts6, # Not in mapping, uses default
897911
"int_col": exp.DataType.build("INT"),
898912
"varchar_col": exp.DataType.build("VARCHAR(100)"),
@@ -906,11 +920,11 @@ def test_delta_timestamps_with_empty_mapping(make_mocked_engine_adapter: t.Calla
906920
user="user",
907921
host="host",
908922
catalog="catalog",
909-
trino_to_delta_timestamp_mapping={},
923+
timestamp_mapping={},
910924
)
911925

912926
adapter = make_mocked_engine_adapter(
913-
TrinoEngineAdapter, trino_to_delta_timestamp_mapping=config.trino_to_delta_timestamp_mapping
927+
TrinoEngineAdapter, timestamp_mapping=config.timestamp_mapping
914928
)
915929

916930
ts6 = exp.DataType.build("timestamp(6)")

0 commit comments

Comments
 (0)