Skip to content
This repository was archived by the owner on Apr 29, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.7.16
3.6.8
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ PyPrind = "*"
nose = "*"

[dev-packages]
ipython = "*"

[requires]
python_version = "3.6"
610 changes: 400 additions & 210 deletions Pipfile.lock

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions dataduct/etl/etl_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ..pipeline import RedshiftDatabase
from ..pipeline import PostgresDatabase
from ..pipeline import MssqlDatabase
from ..pipeline import SnowflakeDatabase
from ..pipeline import S3Node
from ..pipeline import SNSAlarm
from ..pipeline import Schedule
Expand Down Expand Up @@ -186,6 +187,7 @@ def __init__(self, name, frequency='one-time', ec2_resource_config=None,
self._redshift_database = None
self._postgres_database = None
self._mssql_databases = None
self._snowflake_databases = None
self._ec2_resource = None
self._emr_cluster = None
self.create_base_objects()
Expand Down Expand Up @@ -515,6 +517,30 @@ def mssql_databases(self):
) for db in config.mssql.keys() }
return self._mssql_databases

@property
def snowflake_databases(self):
    """Get the snowflake databases associated with the pipeline

    Note:
        This will create the objects if they don't already exist

    Returns:
        snowflake_databases(dict): lazily-constructed mapping of config key
            to SnowflakeDatabase pipeline object
    """
    if not self._snowflake_databases:
        # Iterate items() so the per-db credential dict is looked up once
        # instead of once per field.
        self._snowflake_databases = {
            db: self.create_pipeline_object(
                object_class=SnowflakeDatabase,
                username=creds['USERNAME'],
                password=creds['PASSWORD'],
                role=creds['ROLE'],
                account_name=creds['ACCOUNT_NAME'],
                warehouse=creds['WAREHOUSE'],
                jdbc_driver_uri=creds['JDBC_DRIVER_URI'],
                database=creds['DATABASE_NAME'],
            )
            for db, creds in config.snowflake.items()
        }
    return self._snowflake_databases

def step(self, step_id):
"""Fetch a single step from the pipeline
Expand Down
2 changes: 2 additions & 0 deletions dataduct/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@
from .sql_activity import SqlActivity
from .emr_configuration import EmrConfiguration
from .property import Property
from .snowflake_database import SnowflakeDatabase
from .snowflake_node import SnowflakeNode
56 changes: 56 additions & 0 deletions dataduct/pipeline/snowflake_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
Pipeline object class for a Snowflake JDBC database
see http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-jdbcdatabase.html
"""

from ..config import Config
from .pipeline_object import PipelineObject
from ..utils.exceptions import ETLConfigError
from IPython import embed  # NOTE(review): unused debug import; IPython is a dev-only dependency — consider removing

config = Config()
# Fail fast at import time if the deployment config has no snowflake section.
if not hasattr(config, 'snowflake'):
raise ETLConfigError('Snowflake credentials missing from config')

class SnowflakeDatabase(PipelineObject):
    """Snowflake JDBC database resource, emitted as a JdbcDatabase
    pipeline object.
    """

    def __init__(self,
                 id,
                 account_name=None,
                 role=None,
                 database=None,
                 warehouse=None,
                 username=None,
                 jdbc_driver_uri=None,
                 password=None):
        """Constructor for the SnowflakeDatabase class

        Args:
            id(str): id of the object
            account_name(str): Snowflake account name (the prefix of
                ``<account>.snowflakecomputing.com``)
            role(str): role to assume for the session
            database(str): database to connect to
            warehouse(str): virtual warehouse to use
            username(str): username for the database
            jdbc_driver_uri(str): URI of the Snowflake JDBC driver jar
            password(str): password for the database

        Raises:
            ETLConfigError: if any required credential is missing
        """
        # Every one of these values is used below; previously only the last
        # three were checked, so a missing account_name/role/warehouse/
        # database surfaced as a TypeError during string concatenation
        # instead of the intended config error.
        required = [account_name, role, database, warehouse,
                    username, jdbc_driver_uri, password]
        if None in required:
            raise ETLConfigError('Snowflake credentials missing from config')

        connection_string = (
            "jdbc:snowflake://" + account_name + ".snowflakecomputing.com?" +
            "role=" + role + "&warehouse=" + warehouse + "&db=" + database
        )
        jdbc_driver_class = "net.snowflake.client.jdbc.SnowflakeDriver"

        kwargs = {
            'id': id,
            'type': 'JdbcDatabase',
            'connectionString': connection_string,
            'jdbcDriverClass': jdbc_driver_class,
            'jdbcDriverJarUri': jdbc_driver_uri,
            'username': username,
            # Data Pipeline convention: '*'-prefixed fields are encrypted.
            '*password': password,
        }
        super(SnowflakeDatabase, self).__init__(**kwargs)

44 changes: 44 additions & 0 deletions dataduct/pipeline/snowflake_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""
Pipeline object class for SnowflakeNode
"""

from ..utils.exceptions import ETLInputError
from .pipeline_object import PipelineObject
from .schedule import Schedule


class SnowflakeNode(PipelineObject):
    """Snowflake SQL data node class, emitted as a SqlDataNode
    pipeline object.
    """

    def __init__(self, id, schedule, host, database, username, password,
                 select_query, insert_query, table, depends_on=None):
        """Constructor for the SnowflakeNode class

        Args:
            id(str): id of the object
            schedule(Schedule): pipeline schedule
            host(str): unused; kept for signature compatibility
            database(SnowflakeDatabase): database the queries run against
            username(str): unused; credentials come from the database object
            password(str): unused; credentials come from the database object
            select_query(str): SQL used to read from the node
            insert_query(str): SQL used to write into the node
            table(str): table to be read
            depends_on(list): objects this node depends on

        Raises:
            ETLInputError: if schedule is not a Schedule instance
        """
        # Validate inputs
        if not isinstance(schedule, Schedule):
            raise ETLInputError(
                'Input schedule must be of the type Schedule')

        if depends_on is None:
            depends_on = list()

        kwargs = {
            'id': id,
            'type': 'SqlDataNode',
            'schedule': schedule,
            'database': database,
            'selectQuery': select_query,
            'insertQuery': insert_query,
            'table': table,
            'dependsOn': depends_on,
        }
        super(SnowflakeNode, self).__init__(**kwargs)
13 changes: 12 additions & 1 deletion dataduct/steps/sql_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ class SqlCommandStep(ETLStep):

def __init__(self,
redshift_database,
snowflake_database=None,
script=None,
script_file=None,
script_arguments=None,
queue=None,
sql_script=None,
command=None,
wrap_transaction=True,
snowflake_db_ref=None,
account_name=None,
**kwargs):
"""Constructor for the SqlCommandStep class

Expand Down Expand Up @@ -64,13 +67,18 @@ def __init__(self,
logger.debug('Sql Query:')
logger.debug(sql_script)

if snowflake_database:
db_to_run = snowflake_database
else:
db_to_run = redshift_database

self.create_pipeline_object(
object_class=SqlActivity,
max_retries=self.max_retries,
resource=self.resource,
worker_group=self.worker_group,
schedule=self.schedule,
database=redshift_database,
database=db_to_run,
script_arguments=script_arguments,
depends_on=self.depends_on,
script=script,
Expand All @@ -88,5 +96,8 @@ def arguments_processor(cls, etl, input_args):
input_args = cls.pop_inputs(input_args)
step_args = cls.base_arguments_processor(etl, input_args)
step_args['redshift_database'] = etl.redshift_database
if 'snowflake_db_ref' in step_args:
db_ref = step_args['snowflake_db_ref']
step_args['snowflake_database'] = etl.snowflake_databases[db_ref]

return step_args