From 57187c6d1da55f0b9b0f50255e079dc85eb99e6a Mon Sep 17 00:00:00 2001 From: Eduardo Bizarro Date: Mon, 18 Feb 2019 13:45:16 -0300 Subject: [PATCH 1/2] Composite primary key --- operators/s3_to_redshift_operator.py | 61 +++++++++++++++++++--------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/operators/s3_to_redshift_operator.py b/operators/s3_to_redshift_operator.py index 194260b..ab38032 100644 --- a/operators/s3_to_redshift_operator.py +++ b/operators/s3_to_redshift_operator.py @@ -58,7 +58,8 @@ class S3ToRedshiftOperator(BaseOperator): :param incremental_key: *(optional)* The incremental key to compare new data against the destination table with. Only required if using a load_type of - "upsert". + "upsert". This may be either + a list or a string. :type incremental_key: string :param foreign_key: *(optional)* This specifies any foreign_keys in the table and which corresponding table @@ -239,20 +240,36 @@ def getS3Conn(): # and the primary key is the same. # (e.g. Source: {"id": 1, "updated_at": "2017-01-02 00:00:00"}; # Destination: {"id": 1, "updated_at": "2017-01-01 00:00:00"}) + if isinstance(self.primary_key, list): + where_pk = "" + for i, item in enumerate(self.primary_key): + where_pk += """ + "{rs_schema}"."{rs_table}"."{item}" = "{rs_schema}"."{rs_table}{rs_suffix}"."{item}" + """.format( + rs_schema=self.redshift_schema, + rs_table=self.table, + rs_suffix=self.temp_suffix, + item=item, + ) + if i != (len(self.primary_key) - 1): + where_pk += " AND " + else: + where_pk = '"{rs_schema}"."{rs_table}"."{rs_pk}" = "{rs_schema}"."{rs_table}{rs_suffix}"."{rs_pk}"' - delete_sql = \ - ''' + delete_sql = """ DELETE FROM "{rs_schema}"."{rs_table}" USING "{rs_schema}"."{rs_table}{rs_suffix}" - WHERE "{rs_schema}"."{rs_table}"."{rs_pk}" = - "{rs_schema}"."{rs_table}{rs_suffix}"."{rs_pk}" + WHERE {where_pk} AND "{rs_schema}"."{rs_table}{rs_suffix}"."{rs_ik}" >= "{rs_schema}"."{rs_table}"."{rs_ik}" - '''.format(rs_schema=self.redshift_schema, - rs_table=self.table, - rs_pk=self.primary_key, - rs_suffix=self.temp_suffix, - rs_ik=self.incremental_key) + """.format( + rs_schema=self.redshift_schema, + rs_table=self.table, + rs_pk=self.primary_key, + rs_suffix=self.temp_suffix, + rs_ik=self.incremental_key, + where_pk=where_pk, + ) # Delete records from the source table where the incremental_key # is greater than or equal to the incremental_key of the destination @@ -264,19 +281,20 @@ def getS3Conn(): # (e.g. Source: {"id": 1, "updated_at": "2017-01-01 00:00:00"}; # Destination: {"id": 1, "updated_at": "2017-01-02 00:00:00"}) - delete_confirm_sql = \ - ''' + delete_confirm_sql = """ DELETE FROM "{rs_schema}"."{rs_table}{rs_suffix}" USING "{rs_schema}"."{rs_table}" - WHERE "{rs_schema}"."{rs_table}{rs_suffix}"."{rs_pk}" = - "{rs_schema}"."{rs_table}"."{rs_pk}" + WHERE {where_pk} AND "{rs_schema}"."{rs_table}"."{rs_ik}" >= "{rs_schema}"."{rs_table}{rs_suffix}"."{rs_ik}" - '''.format(rs_schema=self.redshift_schema, - rs_table=self.table, - rs_pk=self.primary_key, - rs_suffix=self.temp_suffix, - rs_ik=self.incremental_key) + """.format( + rs_schema=self.redshift_schema, + rs_table=self.table, + rs_pk=self.primary_key, + rs_suffix=self.temp_suffix, + rs_ik=self.incremental_key, + where_pk=where_pk, + ) append_sql = \ ''' @@ -371,7 +389,10 @@ def create_if_not_exists(self, schema, pg_hook, temp=False): sk = '' if self.primary_key: - pk = ', primary key("{0}")'.format(self.primary_key) + if isinstance(self.primary_key, str): + pk = ", primary key({0})".format(self.primary_key) + elif isinstance(self.primary_key, list): + pk = ", primary key({0})".format(", ".join(self.primary_key)) if self.foreign_key: if isinstance(self.foreign_key, list): From 77580a0958613d0e8e8fb20e0606ff4da25535ca Mon Sep 17 00:00:00 2001 From: Eduardo Bizarro Date: Thu, 28 Feb 2019 00:41:45 -0300 Subject: [PATCH 2/2] Fix condition when primary_key is a string --- operators/s3_to_redshift_operator.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/operators/s3_to_redshift_operator.py b/operators/s3_to_redshift_operator.py index ab38032..a9ca31f 100644 --- a/operators/s3_to_redshift_operator.py +++ b/operators/s3_to_redshift_operator.py @@ -254,7 +254,13 @@ def getS3Conn(): if i != (len(self.primary_key) - 1): where_pk += " AND " else: - where_pk = '"{rs_schema}"."{rs_table}"."{rs_pk}" = "{rs_schema}"."{rs_table}{rs_suffix}"."{rs_pk}"' + where_pk = '"{rs_schema}"."{rs_table}"."{rs_pk}" = "{rs_schema}"."{rs_table}{rs_suffix}"."{rs_pk}"'.format( + rs_schema=self.redshift_schema, + rs_table=self.table, + rs_pk=self.primary_key, + rs_suffix=self.temp_suffix, + rs_ik=self.incremental_key, + ) delete_sql = """ DELETE FROM "{rs_schema}"."{rs_table}"