diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 3ce84331..f25c570c 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -8,6 +8,7 @@ import sqlalchemy from singer_sdk.sql import SQLSink +from sqlalchemy.dialects import postgresql from sqlalchemy.sql.expression import bindparam from target_postgres.connector import PostgresConnector @@ -17,6 +18,14 @@ from sqlalchemy.sql import Executable +# SQLAlchemy 2.1 deprecated ``select(...).distinct(*exprs)`` for rendering a +# PostgreSQL ``DISTINCT ON`` clause in favour of the ``postgresql.distinct_on`` +# syntax extension applied via ``Select.ext()``. Neither the extension nor +# ``Select.ext()`` exist on SQLAlchemy 2.0.x, which this project still supports +# (``sqlalchemy~=2.0``), so detect the available API once at import time. +_HAS_DISTINCT_ON_EXT = hasattr(postgresql, "distinct_on") + + class PostgresSink(SQLSink[PostgresConnector]): """Postgres target sink class.""" @@ -37,6 +46,11 @@ def append_only(self, value: bool) -> None: """Set the append_only attribute.""" self._append_only = value + @property + def use_on_conflict_upsert(self) -> bool: + """True when load_method explicitly selects INSERT ... ON CONFLICT.""" + return self.config.get("load_method") == "upsert-on-conflict" + @property def connector(self) -> PostgresConnector: """Return the connector object. @@ -312,6 +326,48 @@ def upsert( select=select_stmt, ) connection.execute(insert_stmt) + elif self.use_on_conflict_upsert: + # Single-statement upsert. Dedupe staging on the conflict key first: + # ON CONFLICT errors if two inserted rows share the target key. + pk_cols = [from_table.columns[k] for k in join_keys] + order_cols: list[t.Any] = list(pk_cols) + if "_sdc_extracted_at" in from_table.columns: + order_cols.append( + from_table.columns["_sdc_extracted_at"].desc().nullslast() + ) + + base_select = sqlalchemy.select(from_table.columns) # type: ignore[call-overload] + if _HAS_DISTINCT_ON_EXT: + # SQLAlchemy 2.1+: DISTINCT ON via the syntax-extension API. + dedup_select = base_select.ext( + postgresql.distinct_on(*pk_cols) + ).order_by(*order_cols) + else: + # SQLAlchemy 2.0.x: legacy expression-based DISTINCT ON. + dedup_select = base_select.distinct(*pk_cols).order_by(*order_cols) + + insert_stmt = postgresql.insert(to_table).from_select( + names=from_table.columns, # type: ignore[arg-type] + select=dedup_select, + ) + set_columns = { + column_name: insert_stmt.excluded[column_name] + for column_name in self.schema["properties"] + if column_name not in join_keys + } + if set_columns: + connection.execute( + insert_stmt.on_conflict_do_update( + index_elements=list(join_keys), + set_=set_columns, + ) + ) + else: + connection.execute( + insert_stmt.on_conflict_do_nothing( + index_elements=list(join_keys), + ) + ) else: join_predicates = [] to_table_key: sqlalchemy.Column[t.Any] diff --git a/target_postgres/target.py b/target_postgres/target.py index e34b0434..3bf67a7c 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -218,6 +218,26 @@ def __init__( + "for more information." ), ), + th.Property( + "load_method", + th.StringType, + allowed_values=[ + "append-only", + "upsert", + "overwrite", + "upsert-on-conflict", + ], + description=( + "How records are written to the target table. `upsert` performs " + "MERGE-style UPDATE+INSERT via a staging table. `upsert-on-conflict` " + "uses a single `INSERT ... ON CONFLICT (pk) DO UPDATE SET ...` " + "statement, which is dramatically faster on TimescaleDB hypertables " + "with compressed chunks (only chunks with actual conflicts are " + "decompressed). `upsert-on-conflict` requires a real UNIQUE or " + "PRIMARY KEY index on the stream's key_properties. `append-only` " + "always inserts. `overwrite` truncates then inserts." + ), + ), th.Property( "interpret_content_encoding", th.BooleanType, diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index 4818f4f5..8cf6d87e 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -419,6 +419,42 @@ def test_duplicate_records(postgres_target): verify_data(postgres_target, "test_duplicate_records", 2, "id", row) +def test_upsert_on_conflict_relational_data(postgres_config): + """Final state under load_method='upsert-on-conflict' matches MERGE upsert. + + Mirrors test_relational_data but with the new dispatch path. If the + INSERT ... ON CONFLICT ... DO UPDATE SET ... statement is wired correctly, + the post-upsert row set must equal the MERGE path's row set. + """ + config = {**postgres_config, "load_method": "upsert-on-conflict"} + target = TargetPostgres(config=config) + + singer_file_to_target("user_location_data.singer", target) + singer_file_to_target("user_location_upsert_data.singer", target) + + users = [ + {"id": 1, "name": "Johny"}, + {"id": 2, "name": "George"}, + {"id": 3, "name": "Jacob"}, + {"id": 4, "name": "Josh"}, + {"id": 5, "name": "Jim"}, + {"id": 8, "name": "Thomas"}, + {"id": 12, "name": "Paul"}, + {"id": 13, "name": "Mary"}, + ] + verify_data(target, "test_users", 8, "id", users) + + +def test_upsert_on_conflict_within_batch_dedup(postgres_config): + """upsert-on-conflict dedupes within-batch duplicate keys before insert.""" + config = {**postgres_config, "load_method": "upsert-on-conflict"} + target = TargetPostgres(config=config) + + singer_file_to_target("duplicate_records.singer", target) + row = {"id": 1, "metric": 100} + verify_data(target, "test_duplicate_records", 2, "id", row) + + def test_array_data(postgres_target): file_name = "array_data.singer" singer_file_to_target(file_name, postgres_target)