Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import sqlalchemy
from singer_sdk.sql import SQLSink
from sqlalchemy.dialects import postgresql
from sqlalchemy.sql.expression import bindparam

from target_postgres.connector import PostgresConnector
Expand All @@ -17,6 +18,14 @@
from sqlalchemy.sql import Executable


# SQLAlchemy 2.1 deprecated ``select(...).distinct(*exprs)`` for rendering a
# PostgreSQL ``DISTINCT ON`` clause in favour of the ``postgresql.distinct_on``
# syntax extension applied via ``Select.ext()``. Neither the extension nor
# ``Select.ext()`` exists on SQLAlchemy 2.0.x, which this project still
# supports (``sqlalchemy~=2.0``), so detect the available API once at import
# time.
#
# True when the imported ``sqlalchemy.dialects.postgresql`` module exposes a
# ``distinct_on`` attribute; False otherwise.
_HAS_DISTINCT_ON_EXT = hasattr(postgresql, "distinct_on")


class PostgresSink(SQLSink[PostgresConnector]):
"""Postgres target sink class."""

Expand All @@ -37,6 +46,11 @@ def append_only(self, value: bool) -> None:
"""Set the append_only attribute."""
self._append_only = value

@property
def use_on_conflict_upsert(self) -> bool:
    """Report whether ``load_method`` selects ``INSERT ... ON CONFLICT``.

    Returns:
        True only when the ``load_method`` config value equals
        ``"upsert-on-conflict"``; False for any other value or when the
        setting is absent.
    """
    configured_method = self.config.get("load_method")
    return configured_method == "upsert-on-conflict"

@property
def connector(self) -> PostgresConnector:
"""Return the connector object.
Expand Down Expand Up @@ -312,6 +326,48 @@ def upsert(
select=select_stmt,
)
connection.execute(insert_stmt)
elif self.use_on_conflict_upsert:
# Single-statement upsert. Dedupe staging on the conflict key first:
# ON CONFLICT errors if two inserted rows share the target key.
pk_cols = [from_table.columns[k] for k in join_keys]
order_cols: list[t.Any] = list(pk_cols)
if "_sdc_extracted_at" in from_table.columns:
order_cols.append(
from_table.columns["_sdc_extracted_at"].desc().nullslast()
)

base_select = sqlalchemy.select(from_table.columns) # type: ignore[call-overload]
if _HAS_DISTINCT_ON_EXT:
# SQLAlchemy 2.1+: DISTINCT ON via the syntax-extension API.
dedup_select = base_select.ext(
postgresql.distinct_on(*pk_cols)
).order_by(*order_cols)
else:
# SQLAlchemy 2.0.x: legacy expression-based DISTINCT ON.
dedup_select = base_select.distinct(*pk_cols).order_by(*order_cols)

insert_stmt = postgresql.insert(to_table).from_select(
names=from_table.columns, # type: ignore[arg-type]
select=dedup_select,
)
set_columns = {
column_name: insert_stmt.excluded[column_name]
for column_name in self.schema["properties"]
if column_name not in join_keys
}
if set_columns:
connection.execute(
insert_stmt.on_conflict_do_update(
index_elements=list(join_keys),
set_=set_columns,
)
)
else:
connection.execute(
insert_stmt.on_conflict_do_nothing(
index_elements=list(join_keys),
)
)
else:
join_predicates = []
to_table_key: sqlalchemy.Column[t.Any]
Expand Down
20 changes: 20 additions & 0 deletions target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,26 @@ def __init__(
+ "for more information."
),
),
th.Property(
"load_method",
th.StringType,
allowed_values=[
"append-only",
"upsert",
"overwrite",
"upsert-on-conflict",
],
description=(
"How records are written to the target table. `upsert` performs "
"MERGE-style UPDATE+INSERT via a staging table. `upsert-on-conflict` "
"uses a single `INSERT ... ON CONFLICT (pk) DO UPDATE SET ...` "
"statement, which is dramatically faster on TimescaleDB hypertables "
"with compressed chunks (only chunks with actual conflicts are "
"decompressed). `upsert-on-conflict` requires a real UNIQUE or "
"PRIMARY KEY index on the stream's key_properties. `append-only` "
"always inserts. `overwrite` truncates then inserts."
),
),
th.Property(
"interpret_content_encoding",
th.BooleanType,
Expand Down
36 changes: 36 additions & 0 deletions target_postgres/tests/test_target_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,42 @@ def test_duplicate_records(postgres_target):
verify_data(postgres_target, "test_duplicate_records", 2, "id", row)


def test_upsert_on_conflict_relational_data(postgres_config):
    """Check the final ``test_users`` rows under ``upsert-on-conflict``.

    Intended to mirror ``test_relational_data`` while exercising the
    ``load_method="upsert-on-conflict"`` dispatch path: the base file is
    loaded first, then the upsert file, and the resulting ``test_users``
    rows must equal the expected post-upsert row set.
    """
    target = TargetPostgres(
        config={**postgres_config, "load_method": "upsert-on-conflict"},
    )

    # Load order matters: the second file upserts over rows from the first.
    for singer_file in (
        "user_location_data.singer",
        "user_location_upsert_data.singer",
    ):
        singer_file_to_target(singer_file, target)

    expected_names = {
        1: "Johny",
        2: "George",
        3: "Jacob",
        4: "Josh",
        5: "Jim",
        8: "Thomas",
        12: "Paul",
        13: "Mary",
    }
    expected_users = [
        {"id": user_id, "name": user_name}
        for user_id, user_name in expected_names.items()
    ]
    verify_data(target, "test_users", 8, "id", expected_users)


def test_upsert_on_conflict_within_batch_dedup(postgres_config):
    """Check that duplicate keys within one batch load under upsert-on-conflict.

    Loads ``duplicate_records.singer`` through a target configured with
    ``load_method="upsert-on-conflict"`` and verifies the
    ``test_duplicate_records`` table against the expected row for ``id`` 1.
    """
    on_conflict_config = {**postgres_config, "load_method": "upsert-on-conflict"}
    target = TargetPostgres(config=on_conflict_config)

    singer_file_to_target("duplicate_records.singer", target)

    expected_row = dict(id=1, metric=100)
    verify_data(target, "test_duplicate_records", 2, "id", expected_row)


def test_array_data(postgres_target):
file_name = "array_data.singer"
singer_file_to_target(file_name, postgres_target)
Expand Down
Loading