From 77be26cf20661003d4dc1e44f13baac29ec91cc8 Mon Sep 17 00:00:00 2001 From: Matt Elgazar Date: Wed, 27 May 2026 14:12:43 -0500 Subject: [PATCH 1/3] add upsert-on-conflict load method --- target_postgres/sinks.py | 53 +++++++++++++++++++ target_postgres/target.py | 21 ++++++++ target_postgres/tests/test_target_postgres.py | 44 +++++++++++++++ 3 files changed, 118 insertions(+) diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 3ce84331..060c37e2 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -8,6 +8,7 @@ import sqlalchemy from singer_sdk.sql import SQLSink +from sqlalchemy.dialects import postgresql from sqlalchemy.sql.expression import bindparam from target_postgres.connector import PostgresConnector @@ -37,6 +38,11 @@ def append_only(self, value: bool) -> None: """Set the append_only attribute.""" self._append_only = value + @property + def use_on_conflict_upsert(self) -> bool: + """True when load_method explicitly selects INSERT ... ON CONFLICT.""" + return self.config.get("load_method") == "upsert-on-conflict" + @property def connector(self) -> PostgresConnector: """Return the connector object. @@ -312,6 +318,53 @@ def upsert( select=select_stmt, ) connection.execute(insert_stmt) + elif self.use_on_conflict_upsert: + # INSERT ... ON CONFLICT (pk) DO UPDATE SET ... — single statement. + # Dramatically faster than the MERGE pattern on TimescaleDB hypertables + # with compressed chunks, since only chunks with actual key collisions + # need to be decompressed (vs. every chunk hit by the staging key set). + # Requires a real UNIQUE or PRIMARY KEY index on `join_keys`. + # + # Postgres errors with "ON CONFLICT DO UPDATE command cannot affect + # row a second time" if two inserted rows share the conflict target, + # so dedupe staging on the conflict key first. When _sdc_extracted_at + # is present we keep the newest record per key; otherwise the winner + # is arbitrary (but deterministic per query plan). 
+ pk_cols = [from_table.columns[k] for k in join_keys] + order_cols: list[t.Any] = list(pk_cols) + if "_sdc_extracted_at" in from_table.columns: + order_cols.append( + from_table.columns["_sdc_extracted_at"].desc().nullslast() + ) + + dedup_select = ( + sqlalchemy.select(from_table.columns) # type: ignore[call-overload] + .distinct(*pk_cols) + .order_by(*order_cols) + ) + + insert_stmt = postgresql.insert(to_table).from_select( + names=from_table.columns, # type: ignore[arg-type] + select=dedup_select, + ) + set_columns = { + column_name: insert_stmt.excluded[column_name] + for column_name in self.schema["properties"] + if column_name not in join_keys + } + if set_columns: + connection.execute( + insert_stmt.on_conflict_do_update( + index_elements=list(join_keys), + set_=set_columns, + ) + ) + else: + connection.execute( + insert_stmt.on_conflict_do_nothing( + index_elements=list(join_keys), + ) + ) else: join_predicates = [] to_table_key: sqlalchemy.Column[t.Any] diff --git a/target_postgres/target.py b/target_postgres/target.py index e34b0434..cc400f42 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -218,6 +218,27 @@ def __init__( + "for more information." ), ), + th.Property( + "load_method", + th.StringType, + default="upsert", + allowed_values=[ + "append-only", + "upsert", + "overwrite", + "upsert-on-conflict", + ], + description=( + "How records are written to the target table. `upsert` performs " + "MERGE-style UPDATE+INSERT via a staging table. `upsert-on-conflict` " + "uses a single `INSERT ... ON CONFLICT (pk) DO UPDATE SET ...` " + "statement, which is dramatically faster on TimescaleDB hypertables " + "with compressed chunks (only chunks with actual conflicts are " + "decompressed). `upsert-on-conflict` requires a real UNIQUE or " + "PRIMARY KEY index on the stream's key_properties. `append-only` " + "always inserts. `overwrite` truncates then inserts." 
+ ), + ), th.Property( "interpret_content_encoding", th.BooleanType, diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index 4818f4f5..bb548c26 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -419,6 +419,50 @@ def test_duplicate_records(postgres_target): verify_data(postgres_target, "test_duplicate_records", 2, "id", row) +def test_upsert_on_conflict_relational_data(postgres_config): + """Final state under load_method='upsert-on-conflict' matches MERGE upsert. + + Mirrors test_relational_data but with the new dispatch path. If the + INSERT ... ON CONFLICT ... DO UPDATE SET ... statement is wired correctly, + the post-upsert row set must equal the MERGE path's row set. + """ + config = {**postgres_config, "load_method": "upsert-on-conflict"} + target = TargetPostgres(config=config) + + singer_file_to_target("user_location_data.singer", target) + singer_file_to_target("user_location_upsert_data.singer", target) + + users = [ + {"id": 1, "name": "Johny"}, + {"id": 2, "name": "George"}, + {"id": 3, "name": "Jacob"}, + {"id": 4, "name": "Josh"}, + {"id": 5, "name": "Jim"}, + {"id": 8, "name": "Thomas"}, + {"id": 12, "name": "Paul"}, + {"id": 13, "name": "Mary"}, + ] + verify_data(target, "test_users", 8, "id", users) + + +def test_upsert_on_conflict_within_batch_dedup(postgres_config): + """`load_method='upsert-on-conflict'` dedupes staging via DISTINCT ON. + + duplicate_records.singer emits 5 records with three rows sharing id=1 + (metric 1, 10, 100) and two rows sharing id=2 (metric 2, 20). Without the + DISTINCT ON pre-dedupe step, Postgres would raise + "ON CONFLICT DO UPDATE command cannot affect row a second time" because + multiple inserted rows would target the same conflict key. The last record + per key (highest _sdc_extracted_at) must win. 
+ """ + config = {**postgres_config, "load_method": "upsert-on-conflict"} + target = TargetPostgres(config=config) + + singer_file_to_target("duplicate_records.singer", target) + row = {"id": 1, "metric": 100} + verify_data(target, "test_duplicate_records", 2, "id", row) + + def test_array_data(postgres_target): file_name = "array_data.singer" singer_file_to_target(file_name, postgres_target) From b060ffe100c677c3f17cd16e842892d71d315a33 Mon Sep 17 00:00:00 2001 From: Matt Elgazar Date: Fri, 29 May 2026 11:16:56 -0500 Subject: [PATCH 2/3] remove load_method default and clean up comments --- target_postgres/sinks.py | 13 ++----------- target_postgres/target.py | 1 - target_postgres/tests/test_target_postgres.py | 10 +--------- 3 files changed, 3 insertions(+), 21 deletions(-) diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 060c37e2..4790465e 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -319,17 +319,8 @@ def upsert( ) connection.execute(insert_stmt) elif self.use_on_conflict_upsert: - # INSERT ... ON CONFLICT (pk) DO UPDATE SET ... — single statement. - # Dramatically faster than the MERGE pattern on TimescaleDB hypertables - # with compressed chunks, since only chunks with actual key collisions - # need to be decompressed (vs. every chunk hit by the staging key set). - # Requires a real UNIQUE or PRIMARY KEY index on `join_keys`. - # - # Postgres errors with "ON CONFLICT DO UPDATE command cannot affect - # row a second time" if two inserted rows share the conflict target, - # so dedupe staging on the conflict key first. When _sdc_extracted_at - # is present we keep the newest record per key; otherwise the winner - # is arbitrary (but deterministic per query plan). + # Single-statement upsert. Dedupe staging on the conflict key first: + # ON CONFLICT errors if two inserted rows share the target key. 
pk_cols = [from_table.columns[k] for k in join_keys] order_cols: list[t.Any] = list(pk_cols) if "_sdc_extracted_at" in from_table.columns: diff --git a/target_postgres/target.py b/target_postgres/target.py index cc400f42..3bf67a7c 100644 --- a/target_postgres/target.py +++ b/target_postgres/target.py @@ -221,7 +221,6 @@ def __init__( th.Property( "load_method", th.StringType, - default="upsert", allowed_values=[ "append-only", "upsert", diff --git a/target_postgres/tests/test_target_postgres.py b/target_postgres/tests/test_target_postgres.py index bb548c26..8cf6d87e 100644 --- a/target_postgres/tests/test_target_postgres.py +++ b/target_postgres/tests/test_target_postgres.py @@ -446,15 +446,7 @@ def test_upsert_on_conflict_relational_data(postgres_config): def test_upsert_on_conflict_within_batch_dedup(postgres_config): - """`load_method='upsert-on-conflict'` dedupes staging via DISTINCT ON. - - duplicate_records.singer emits 5 records with three rows sharing id=1 - (metric 1, 10, 100) and two rows sharing id=2 (metric 2, 20). Without the - DISTINCT ON pre-dedupe step, Postgres would raise - "ON CONFLICT DO UPDATE command cannot affect row a second time" because - multiple inserted rows would target the same conflict key. The last record - per key (highest _sdc_extracted_at) must win. 
- """ + """upsert-on-conflict dedupes within-batch duplicate keys before insert.""" config = {**postgres_config, "load_method": "upsert-on-conflict"} target = TargetPostgres(config=config) From 09d038814f5957254c7f04c29863a52a5d5fce49 Mon Sep 17 00:00:00 2001 From: Matt Elgazar Date: Fri, 29 May 2026 17:09:39 -0500 Subject: [PATCH 3/3] fix tests: use postgresql.distinct_on on SQLAlchemy 2.1+ --- target_postgres/sinks.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/target_postgres/sinks.py b/target_postgres/sinks.py index 4790465e..f25c570c 100644 --- a/target_postgres/sinks.py +++ b/target_postgres/sinks.py @@ -18,6 +18,14 @@ from sqlalchemy.sql import Executable +# SQLAlchemy 2.1 deprecated ``select(...).distinct(*exprs)`` for rendering a +# PostgreSQL ``DISTINCT ON`` clause in favour of the ``postgresql.distinct_on`` +# syntax extension applied via ``Select.ext()``. Neither the extension nor +# ``Select.ext()`` exists on SQLAlchemy 2.0.x, which this project still supports +# (``sqlalchemy~=2.0``), so detect the available API once at import time. +_HAS_DISTINCT_ON_EXT = hasattr(postgresql, "distinct_on") + + class PostgresSink(SQLSink[PostgresConnector]): """Postgres target sink class.""" @@ -328,11 +336,15 @@ def upsert( from_table.columns["_sdc_extracted_at"].desc().nullslast() ) - dedup_select = ( - sqlalchemy.select(from_table.columns) # type: ignore[call-overload] - .distinct(*pk_cols) - .order_by(*order_cols) - ) + base_select = sqlalchemy.select(from_table.columns) # type: ignore[call-overload] + if _HAS_DISTINCT_ON_EXT: + # SQLAlchemy 2.1+: DISTINCT ON via the syntax-extension API. + dedup_select = base_select.ext( + postgresql.distinct_on(*pk_cols) + ).order_by(*order_cols) + else: + # SQLAlchemy 2.0.x: legacy expression-based DISTINCT ON. + dedup_select = base_select.distinct(*pk_cols).order_by(*order_cols) insert_stmt = postgresql.insert(to_table).from_select( names=from_table.columns, # type: ignore[arg-type]