From 6a8a156eb2dc977cf22dc60cc10e02280d5e6a6a Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Thu, 15 Sep 2022 17:27:55 -0600 Subject: [PATCH 1/9] draft version --- dbtc/cli.py | 26 +++++++++ dbtc/client/cloud/base.py | 107 +++++++++++++++++++++++++++++++++++++- 2 files changed, 131 insertions(+), 2 deletions(-) diff --git a/dbtc/cli.py b/dbtc/cli.py index 465cb5e..d4ee3e7 100644 --- a/dbtc/cli.py +++ b/dbtc/cli.py @@ -967,6 +967,32 @@ def test_connection( json.loads(payload), ) +@app.command() +def trigger_job_for_ci( + ctx: typer.Context, + account_id: int = ACCOUNT_ID, + job_id: int = JOB_ID, + payload: str = PAYLOAD, + should_poll: bool = typer.Option( + True, + help='Poll until job completion (status is one of success, failure, or ' + 'cancelled)', + ), + poll_interval: int = typer.Option( + 10, '--poll-interval', help='Number of seconds to wait in between polling.' + ) +): + """Trigger job to run.""" + _dbt_cloud_request( + ctx, + 'trigger_job_for_ci', + account_id, + job_id, + json.loads(payload), + should_poll=should_poll, + poll_interval=poll_interval, + ) + @app.command() def trigger_job( diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index 4475569..8e72676 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1,10 +1,12 @@ # stdlib import argparse +from datetime import datetime import enum import shlex import time from functools import partial, wraps from typing import Dict, Iterable, List +import uuid # third party import requests @@ -786,7 +788,7 @@ def list_invited_users(self, account_id: int) -> Dict: @v2 def list_jobs( - self, account_id: int, *, order_by: str = None, project_id: int = None + self, account_id: int, *, order_by: str = None, project_id: int = None, ) -> Dict: """List jobs in an account or specific project. @@ -795,7 +797,7 @@ def list_jobs( order_by (str, optional): Field to order the result by. Use - to indicate reverse order. project_id (int, optional): Numeric ID of the project containing jobs - """ + """ return self._simple_request( f'accounts/{account_id}/jobs/', params={'order_by': order_by, 'project_id': project_id}, @@ -992,6 +994,107 @@ def test_connection(self, account_id: int, payload: Dict) -> Dict: f'accounts/{account_id}/connections/test/', method='post', json=payload ) + @v2 + def trigger_job_for_ci( + self, + account_id: int, + job_id: int, + payload: Dict, + *, + should_poll: bool = True, + poll_interval: int = 10, + ): + + """Trigger a job by its ID - designed to enable running CI jobs in parallel + + Args: + account_id (int): Numeric ID of the account to retrieve + job_id (int): Numeric ID of the job to trigger + payload (dict): Payload required for post request + should_poll (bool, optional): Poll until completion if `True`, completion + is one of success, failure, or cancelled + poll_interval (int, optional): Number of seconds to wait in between + polling + name_like (str, optional): Job prefix to identify the CI job "pool" + """ + #TODO: this should be abstracted somewhere + def run_status_formatted(run: Dict, time: float) -> str: + """Format a string indicating status of job. 
+ Args: + run (dict): Dictionary representation of a Run + time (float): Elapsed time since job triggered + """ + status = JobRunStatus(run['data']['status']).name + url = run['data']['href'] + return ( + f'Status: "{status.capitalize()}", Elapsed time: {round(time, 0)}s' + f', View here: {url}' + ) + + most_recent_job_run = self.list_runs( + account_id=account_id, + job_definition_id=job_id, + limit=1, + order_by='-id' + )['data'][0] + + job_run_status = most_recent_job_run['status_humanized'] + self.console.log(f'Status for most recent run of job {job_id} is {job_run_status}.') + + if job_run_status not in ['Queued', 'Starting', 'Running']: + self.console.log(f'Triggering base CI job {job_id}.') + + else: + self.console.log(f'Replicating base CI job.') + job_definition = self.get_job( + account_id=account_id, + job_id=job_id + )['data'] + + job_definition['name'] = job_definition['name'] + '-' + datetime.now().strftime('%Y-%m-%d--%H:%M:%S') + job_definition['id'] = None + job_definition['dbt_version'] = None + keys = ['id', 'name', 'execution', 'account_id', 'project_id', 'environment_id', + 'dbt_version', 'execute_steps', 'state', 'deferring_job_definition_id', + 'triggers', 'settings', 'schedule'] + job_definition = {k:v for k,v in job_definition.items() if k in keys} + print(job_definition) + new_job = self.create_job( + account_id=account_id, + project_id=job_definition['project_id'], + payload=job_definition + ) + + print(new_job) + + run = self._simple_request( + f'accounts/{account_id}/jobs/{job_id}/run/', + method='post', + json=payload, + ) + if not run['status']['is_success']: + self.console.log(f'Run NOT triggered for job {job_id}. See run response.') + return run + + self.console.log(run_status_formatted(run, 0)) + if should_poll: + start = time.time() + run_id = run['data']['id'] + while True: + time.sleep(poll_interval) + run = self.get_run(account_id, run_id) + status = run['data']['status'] + self.console.log(run_status_formatted(run, time.time() - start)) + if status in [ + JobRunStatus.SUCCESS, + JobRunStatus.CANCELLED, + JobRunStatus.ERROR, + ]: + break + + return run + + @v2 def trigger_job( self, From 680ffc0701509f08964e456e5b0e9895ccbc33ae Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Thu, 22 Sep 2022 16:08:30 -0600 Subject: [PATCH 2/9] demo-able version --- dbtc/client/cloud/base.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index aef5c01..b5602f0 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1013,6 +1013,7 @@ def trigger_job_for_ci( *, should_poll: bool = True, poll_interval: int = 10, + delete_replicated_job = True, ): """Trigger a job by its ID - designed to enable running CI jobs in parallel @@ -1025,7 +1026,9 @@ def trigger_job_for_ci( is one of success, failure, or cancelled poll_interval (int, optional): Number of seconds to wait in between polling - name_like (str, optional): Job prefix to identify the CI job "pool" + delete_replicated_job (bool, optional): True is the default, which will + automatically clean up any jobs replicated due to the base CI job + being busy at PR time. 
""" #TODO: this should be abstracted somewhere def run_status_formatted(run: Dict, time: float) -> str: @@ -1053,6 +1056,7 @@ def run_status_formatted(run: Dict, time: float) -> str: if job_run_status not in ['Queued', 'Starting', 'Running']: self.console.log(f'Triggering base CI job {job_id}.') + delete_replicated_job = False # IMPORTANT: this prevents removing the base CI job else: self.console.log(f'Replicating base CI job.') @@ -1061,21 +1065,22 @@ def run_status_formatted(run: Dict, time: float) -> str: job_id=job_id )['data'] + # TODO: this is gross - need a more ergnomic way to express these job_definition['name'] = job_definition['name'] + '-' + datetime.now().strftime('%Y-%m-%d--%H:%M:%S') job_definition['id'] = None job_definition['dbt_version'] = None + + # the dbt Cloud API will fail job create requests that contain extra keys. This is the minimal set required to + # create a new job keys = ['id', 'name', 'execution', 'account_id', 'project_id', 'environment_id', 'dbt_version', 'execute_steps', 'state', 'deferring_job_definition_id', 'triggers', 'settings', 'schedule'] job_definition = {k:v for k,v in job_definition.items() if k in keys} - print(job_definition) new_job = self.create_job( account_id=account_id, - project_id=job_definition['project_id'], payload=job_definition - ) - - print(new_job) + )['data'] + job_id = new_job['id'] run = self._simple_request( f'accounts/{account_id}/jobs/{job_id}/run/', @@ -1102,6 +1107,9 @@ def run_status_formatted(run: Dict, time: float) -> str: ]: break + if delete_replicated_job: + self.delete_job(account_id=account_id, job_id=job_id) + return run From 15bf9e18d8530c177c2a5252ff3e60ca2858d71e Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Fri, 23 Sep 2022 09:50:47 -0600 Subject: [PATCH 3/9] create configs directory --- dbtc/client/cloud/configs/__init__.py | 0 .../cloud/configs/cloud_api_request_templates.py | 0 dbtc/client/cloud/configs/dbt_cli_configs.py | 15 +++++++++++++++ 3 files changed, 15 insertions(+) create mode 100644 dbtc/client/cloud/configs/__init__.py create mode 100644 dbtc/client/cloud/configs/cloud_api_request_templates.py create mode 100644 dbtc/client/cloud/configs/dbt_cli_configs.py diff --git a/dbtc/client/cloud/configs/__init__.py b/dbtc/client/cloud/configs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dbtc/client/cloud/configs/cloud_api_request_templates.py b/dbtc/client/cloud/configs/cloud_api_request_templates.py new file mode 100644 index 0000000..e69de29 diff --git a/dbtc/client/cloud/configs/dbt_cli_configs.py b/dbtc/client/cloud/configs/dbt_cli_configs.py new file mode 100644 index 0000000..370432f --- /dev/null +++ b/dbtc/client/cloud/configs/dbt_cli_configs.py @@ -0,0 +1,15 @@ +RUN_COMMANDS = ['build', 'run', 'test', 'seed', 'snapshot'] +GLOBAL_CLI_ARGS = { + 'warn_error': {'flags': ('--warn-error',), 'action': 'store_true'}, + 'use_experimental_parser': { + 'flags': ('--use-experimental-parser',), + 'action': 'store_true', + }, +} +SUB_COMMAND_CLI_ARGS = { + 'vars': {'flags': ('--vars',)}, + 'args': {'flags': ('--args',)}, + 'fail_fast': {'flags': ('-x', '--fail-fast'), 'action': 'store_true'}, + 'full_refresh': {'flags': ('--full-refresh',), 'action': 'store_true'}, + 'store_failures': {'flags': ('--store-failures',), 'action': 'store_true'}, +} From 917b4b2ff7f1a1f1d9425bcf65fd8393d98305b6 Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Fri, 23 Sep 2022 12:06:19 -0600 Subject: [PATCH 4/9] sketch configs idea --- dbtc/client/cloud/base.py | 43 +++++++------------ 
.../configs/cloud_api_request_templates.py | 0 dbtc/client/cloud/configs/dbt_cloud_api.py | 5 +++ .../{dbt_cli_configs.py => dbt_core_cli.py} | 9 ++-- 4 files changed, 26 insertions(+), 31 deletions(-) delete mode 100644 dbtc/client/cloud/configs/cloud_api_request_templates.py create mode 100644 dbtc/client/cloud/configs/dbt_cloud_api.py rename dbtc/client/cloud/configs/{dbt_cli_configs.py => dbt_core_cli.py} (82%) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index b5602f0..31a1422 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -13,7 +13,12 @@ # first party from dbtc.client.base import _Client - +from dbtc.client.cloud.configs.dbt_cloud_api import create_job_request +from dbtc.client.cloud.configs.dbt_core_cli import ( + run_commands, + global_cli_args, + sub_command_cli_args +) class JobRunStatus(enum.IntEnum): QUEUED = 1 @@ -24,23 +29,6 @@ class JobRunStatus(enum.IntEnum): CANCELLED = 30 -RUN_COMMANDS = ['build', 'run', 'test', 'seed', 'snapshot'] -GLOBAL_CLI_ARGS = { - 'warn_error': {'flags': ('--warn-error',), 'action': 'store_true'}, - 'use_experimental_parser': { - 'flags': ('--use-experimental-parser',), - 'action': 'store_true', - }, -} -SUB_COMMAND_CLI_ARGS = { - 'vars': {'flags': ('--vars',)}, - 'args': {'flags': ('--args',)}, - 'fail_fast': {'flags': ('-x', '--fail-fast'), 'action': 'store_true'}, - 'full_refresh': {'flags': ('--full-refresh',), 'action': 'store_true'}, - 'store_failures': {'flags': ('--store-failures',), 'action': 'store_true'}, -} - - def _version_decorator(func, version): @wraps(func) def wrapper(self, *args, **kwargs): @@ -62,7 +50,7 @@ def __init__(self, **kwargs): self.session = requests.Session() self.session.headers = self.headers self.parser = argparse.ArgumentParser() - all_cli_args = {**GLOBAL_CLI_ARGS, **SUB_COMMAND_CLI_ARGS} + all_cli_args = {**global_cli_args, **sub_command_cli_args} for arg_specs in all_cli_args.values(): flags = arg_specs['flags'] self.parser.add_argument( @@ -1070,17 +1058,16 @@ def run_status_formatted(run: Dict, time: float) -> str: job_definition['id'] = None job_definition['dbt_version'] = None - # the dbt Cloud API will fail job create requests that contain extra keys. This is the minimal set required to - # create a new job + # the dbt Cloud API will fail job create requests that contain extra keys. 
+ # This is the minimal set required to create a new job keys = ['id', 'name', 'execution', 'account_id', 'project_id', 'environment_id', 'dbt_version', 'execute_steps', 'state', 'deferring_job_definition_id', 'triggers', 'settings', 'schedule'] job_definition = {k:v for k,v in job_definition.items() if k in keys} - new_job = self.create_job( + job_id = self.create_job( account_id=account_id, payload=job_definition - )['data'] - job_id = new_job['id'] + )['data']['id'] run = self._simple_request( f'accounts/{account_id}/jobs/{job_id}/run/', @@ -1206,9 +1193,9 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): sub_command = remaining[1] if ( - sub_command not in RUN_COMMANDS + sub_command not in run_commands and status in ['error', 'cancelled', 'skipped'] - ) or (sub_command in RUN_COMMANDS and status == 'skipped'): + ) or (sub_command in run_commands and status == 'skipped'): rerun_steps.append(command) # errors and failures are when we need to inspect to figure @@ -1243,10 +1230,10 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): ] ) global_args = parse_args( - GLOBAL_CLI_ARGS.keys(), namespace + global_cli_args.keys(), namespace ) sub_command_args = parse_args( - SUB_COMMAND_CLI_ARGS.keys(), namespace + sub_command_cli_args.keys(), namespace ) modified_command = f'dbt{global_args} {sub_command} -s {rerun_nodes}{sub_command_args}' # noqa: E501 rerun_steps.append(modified_command) diff --git a/dbtc/client/cloud/configs/cloud_api_request_templates.py b/dbtc/client/cloud/configs/cloud_api_request_templates.py deleted file mode 100644 index e69de29..0000000 diff --git a/dbtc/client/cloud/configs/dbt_cloud_api.py b/dbtc/client/cloud/configs/dbt_cloud_api.py new file mode 100644 index 0000000..e859e79 --- /dev/null +++ b/dbtc/client/cloud/configs/dbt_cloud_api.py @@ -0,0 +1,5 @@ + +create_job_request = { + 'name': '', + 'id': None +} \ No newline at end of file diff --git a/dbtc/client/cloud/configs/dbt_cli_configs.py b/dbtc/client/cloud/configs/dbt_core_cli.py similarity index 82% rename from dbtc/client/cloud/configs/dbt_cli_configs.py rename to dbtc/client/cloud/configs/dbt_core_cli.py index 370432f..19a43fc 100644 --- a/dbtc/client/cloud/configs/dbt_cli_configs.py +++ b/dbtc/client/cloud/configs/dbt_core_cli.py @@ -1,12 +1,15 @@ -RUN_COMMANDS = ['build', 'run', 'test', 'seed', 'snapshot'] -GLOBAL_CLI_ARGS = { + +run_commands = ['build', 'run', 'test', 'seed', 'snapshot'] + +global_cli_args = { 'warn_error': {'flags': ('--warn-error',), 'action': 'store_true'}, 'use_experimental_parser': { 'flags': ('--use-experimental-parser',), 'action': 'store_true', }, } -SUB_COMMAND_CLI_ARGS = { + +sub_command_cli_args = { 'vars': {'flags': ('--vars',)}, 'args': {'flags': ('--args',)}, 'fail_fast': {'flags': ('-x', '--fail-fast'), 'action': 'store_true'}, From db381535f94100d77fc5f0bc64111dc03bdc8250 Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Mon, 26 Sep 2022 19:02:56 -0600 Subject: [PATCH 5/9] adds autoscaling mode and moves some configs out of base _CloudClient --- dbtc/cli.py | 45 +-- dbtc/client/cloud/base.py | 392 +++++++++++---------- dbtc/client/cloud/configs/dbt_cloud_api.py | 44 ++- 3 files changed, 257 insertions(+), 224 deletions(-) diff --git a/dbtc/cli.py b/dbtc/cli.py index c97e8b9..9cad0e4 100644 --- a/dbtc/cli.py +++ b/dbtc/cli.py @@ -1,4 +1,5 @@ # stdlib +from enum import auto import json from typing import List, Optional @@ -968,32 +969,6 @@ def test_connection( json.loads(payload), ) -@app.command() -def trigger_job_for_ci( - 
ctx: typer.Context, - account_id: int = ACCOUNT_ID, - job_id: int = JOB_ID, - payload: str = PAYLOAD, - should_poll: bool = typer.Option( - True, - help='Poll until job completion (status is one of success, failure, or ' - 'cancelled)', - ), - poll_interval: int = typer.Option( - 10, '--poll-interval', help='Number of seconds to wait in between polling.' - ) -): - """Trigger job to run.""" - _dbt_cloud_request( - ctx, - 'trigger_job_for_ci', - account_id, - job_id, - json.loads(payload), - should_poll=should_poll, - poll_interval=poll_interval, - ) - @app.command() def trigger_job( @@ -1021,6 +996,22 @@ def trigger_job( 'job.' ), ), + mode: str = typer.Option( + 'standard', + help=( + 'Possible values are ["standard", "restart_from_failure", "autoscale"] ' + 'standard: runs existing job as-is ' + 'restart_from_failure: determine whether the last run of the target job ' + ' exited with an error. If yes, restart from the point of failure ' + 'autoscale: determine with the target job is currently running ' + ' If yes, create and then run the clone.' + ) + ), + autoscale_delete_post_run: bool = typer.Option( + True, help=( + 'Delete job created via autoscaling after it finishes running' + ) + ), ): """Trigger job to run.""" _dbt_cloud_request( @@ -1033,6 +1024,8 @@ def trigger_job( poll_interval=poll_interval, restart_from_failure=restart_from_failure, trigger_on_failure_only=trigger_on_failure_only, + mode=mode, + autoscale_delete_post_run=autoscale_delete_post_run, ) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index 31a1422..c16e216 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -6,6 +6,7 @@ import time from functools import partial, wraps from typing import Dict, Iterable, List +from urllib import request import uuid # third party @@ -13,7 +14,7 @@ # first party from dbtc.client.base import _Client -from dbtc.client.cloud.configs.dbt_cloud_api import create_job_request +from dbtc.client.cloud.configs.dbt_cloud_api import dbtCloudAPIRequestFactory from dbtc.client.cloud.configs.dbt_core_cli import ( run_commands, global_cli_args, @@ -50,6 +51,7 @@ def __init__(self, **kwargs): self.session = requests.Session() self.session.headers = self.headers self.parser = argparse.ArgumentParser() + self.request_factory = dbtCloudAPIRequestFactory() all_cli_args = {**global_cli_args, **sub_command_cli_args} for arg_specs in all_cli_args.values(): flags = arg_specs['flags'] @@ -119,6 +121,12 @@ def _get_by_name(self, items: List, item_name: str, value: str = 'name'): except IndexError: obj = None return obj + + def _validate_job_run_mode(self, mode): + if mode not in ['standard', 'restart_from_failure', 'autoscale']: + return False + + return True @v3 def assign_group_permissions( @@ -991,114 +999,148 @@ def test_connection(self, account_id: int, payload: Dict) -> Dict: return self._simple_request( f'accounts/{account_id}/connections/test/', method='post', json=payload ) + + @v2 + def clone_job( + self, + account_id: int, + job_id: int, + ): - @v2 - def trigger_job_for_ci( - self, + """If a job is currently running, replicate the job definition to a new job + + Args: + account_id (int): Numeric ID of the account to retrieve + job_id (int): Numeric ID of the job to trigger + payload (dict): Payload required for post request + """ + + existing_job_definition = self.get_job( + account_id=account_id, + job_id=job_id + )['data'] + + return self.request_factory.create_job_request(data=existing_job_definition) + + @v2 + def _get_restart_job_definition( + self, 
account_id: int, job_id: int, payload: Dict, - *, - should_poll: bool = True, - poll_interval: int = 10, - delete_replicated_job = True, ): - """Trigger a job by its ID - designed to enable running CI jobs in parallel + """Identifies whether there was a failure on the previous run of the job. + When failures are identified, returns an updated job definition to + restart from the point of failure. Args: account_id (int): Numeric ID of the account to retrieve job_id (int): Numeric ID of the job to trigger payload (dict): Payload required for post request - should_poll (bool, optional): Poll until completion if `True`, completion - is one of success, failure, or cancelled - poll_interval (int, optional): Number of seconds to wait in between - polling - delete_replicated_job (bool, optional): True is the default, which will - automatically clean up any jobs replicated due to the base CI job - being busy at PR time. """ - #TODO: this should be abstracted somewhere - def run_status_formatted(run: Dict, time: float) -> str: - """Format a string indicating status of job. - Args: - run (dict): Dictionary representation of a Run - time (float): Elapsed time since job triggered - """ - status = JobRunStatus(run['data']['status']).name - url = run['data']['href'] - return ( - f'Status: "{status.capitalize()}", Elapsed time: {round(time, 0)}s' - f', View here: {url}' - ) + + def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): + string = '' + for arg in cli_args: + value = getattr(namespace, arg, None) + if value: + arg = arg.replace('_', '-') + if isinstance(value, bool): + string += f' --{arg}' + else: + string += f" --{arg} '{value}'" + return string - most_recent_job_run = self.list_runs( - account_id=account_id, - job_definition_id=job_id, - limit=1, - order_by='-id' + has_failures = False + + last_run_data = self.list_runs( + account_id=account_id, + include_related=['run_steps'], + job_definition_id=job_id, + order_by='-id', + limit=1, )['data'][0] - job_run_status = most_recent_job_run['status_humanized'] - self.console.log(f'Status for most recent run of job {job_id} is {job_run_status}.') + last_run_status = last_run_data['status_humanized'].lower() + last_run_id = last_run_data['id'] - if job_run_status not in ['Queued', 'Starting', 'Running']: - self.console.log(f'Triggering base CI job {job_id}.') - delete_replicated_job = False # IMPORTANT: this prevents removing the base CI job - - else: - self.console.log(f'Replicating base CI job.') - job_definition = self.get_job( - account_id=account_id, - job_id=job_id - )['data'] - - # TODO: this is gross - need a more ergnomic way to express these - job_definition['name'] = job_definition['name'] + '-' + datetime.now().strftime('%Y-%m-%d--%H:%M:%S') - job_definition['id'] = None - job_definition['dbt_version'] = None - - # the dbt Cloud API will fail job create requests that contain extra keys. 
- # This is the minimal set required to create a new job - keys = ['id', 'name', 'execution', 'account_id', 'project_id', 'environment_id', - 'dbt_version', 'execute_steps', 'state', 'deferring_job_definition_id', - 'triggers', 'settings', 'schedule'] - job_definition = {k:v for k,v in job_definition.items() if k in keys} - job_id = self.create_job( - account_id=account_id, - payload=job_definition - )['data']['id'] + if last_run_status == 'error': + rerun_steps = [] - run = self._simple_request( - f'accounts/{account_id}/jobs/{job_id}/run/', - method='post', - json=payload, - ) - if not run['status']['is_success']: - self.console.log(f'Run NOT triggered for job {job_id}. See run response.') - return run + for run_step in last_run_data['run_steps']: + status = run_step['status_humanized'].lower() + # Skipping cloning, profile setup, and dbt deps - always + # the first three steps in any run + if run_step['index'] <= 3 or status == 'success': + self.console.log( + f'Skipping rerun for command "{run_step["name"]}" ' + 'as it does not need to be repeated.' + ) - self.console.log(run_status_formatted(run, 0)) - if should_poll: - start = time.time() - run_id = run['data']['id'] - while True: - time.sleep(poll_interval) - run = self.get_run(account_id, run_id) - status = run['data']['status'] - self.console.log(run_status_formatted(run, time.time() - start)) - if status in [ - JobRunStatus.SUCCESS, - JobRunStatus.CANCELLED, - JobRunStatus.ERROR, - ]: - break + else: - if delete_replicated_job: - self.delete_job(account_id=account_id, job_id=job_id) - - return run + # get the dbt command used within this step + command = run_step['name'].partition('`')[2].partition('`')[0] + namespace, remaining = self.parser.parse_known_args( + shlex.split(command) + ) + sub_command = remaining[1] + + if ( + sub_command not in run_commands + and status in ['error', 'cancelled', 'skipped'] + ) or (sub_command in run_commands and status == 'skipped'): + rerun_steps.append(command) + # errors and failures are when we need to inspect to figure + # out the point of failure + else: + + # get the run results scoped to the step which had an error + # an error here indicates that either: + # 1) the fail-fast flag was set, in which case + # the run_results.json file was never created; or + # 2) there was a problem on dbt Cloud's side saving + # this artifact + try: + step_results = self.get_run_artifact( + account_id=account_id, + run_id=last_run_id, + path='run_results.json', + step=run_step['index'], + )['results'] + + # If the artifact isn't found, the API returns a 404 with + # no json. The ValueError will catch the JSONDecodeError + except ValueError: + rerun_steps.append(command) + else: + rerun_nodes = ' '.join( + [ + record['unique_id'].split('.')[2] + for record in step_results + if record['status'] + in ['error', 'skipped', 'fail'] + ] + ) + global_args = parse_args( + global_cli_args.keys(), namespace + ) + sub_command_args = parse_args( + sub_command_cli_args.keys(), namespace + ) + modified_command = f'dbt{global_args} {sub_command} -s {rerun_nodes}{sub_command_args}' # noqa: E501 + rerun_steps.append(modified_command) + self.console.log( + f'Modifying command "{command}" as an error ' + 'or failure was encountered.' 
+ ) + if len(rerun_steps) > 0: + has_failures = True + payload.update({"steps_override": rerun_steps}) + + return payload, has_failures @v2 def trigger_job( @@ -1111,6 +1153,8 @@ def trigger_job( poll_interval: int = 10, restart_from_failure: bool = False, trigger_on_failure_only: bool = False, + mode: str = 'standard', + autoscale_delete_post_run: bool = True, ): """Trigger a job by its ID @@ -1128,7 +1172,15 @@ def trigger_job( restart_from_failure to True. This has the effect of only triggering the job when the prior invocation was not successful. Otherwise, the function will exit prior to triggering the job. - + mode (str, optional): Must be one of ['standard', 'restart_from_failure', + 'autoscaling']. + - standard mode triggers the job to run as-is. + - restart_from_failure checks for errors on the prior invocation and, + if found, restarts failed models only. + - autoscale checks whether the job_id is actively running. If so, + creates a copy of the running job + autoscale_delete_post_run (bool, optional): Only relevant when mode = 'autoscale' + Remove a job replicated via autoscaling after it finishes running. """ def run_status_formatted(run: Dict, time: float) -> str: @@ -1143,126 +1195,72 @@ def run_status_formatted(run: Dict, time: float) -> str: f'Status: "{status.capitalize()}", Elapsed time: {round(time, 0)}s' f', View here: {url}' ) + + # this is here to not break existing stuff 09.26.2022 + if restart_from_failure: + mode = 'restart_from_failure' - def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): - string = '' - for arg in cli_args: - value = getattr(namespace, arg, None) - if value: - arg = arg.replace('_', '-') - if isinstance(value, bool): - string += f' --{arg}' - else: - string += f" --{arg} '{value}'" - return string + mode_is_valid = self._validate_job_run_mode(mode) + if not mode_is_valid: + raise Exception(f'mode: {mode} is not one of ["standard", "restart_from_failure", "autoscale"]') - if restart_from_failure: + if mode == 'restart_from_failure': self.console.log(f'Restarting job {job_id} from last failed state.') - last_run_data = self.list_runs( + payload, has_failures = self._get_restart_job_definition( account_id=account_id, - include_related=['run_steps'], - job_definition_id=job_id, - order_by='-id', - limit=1, - )['data'][0] - - last_run_status = last_run_data['status_humanized'].lower() - last_run_id = last_run_data['id'] - - if last_run_status == 'error': - rerun_steps = [] - - for run_step in last_run_data['run_steps']: - - status = run_step['status_humanized'].lower() - # Skipping cloning, profile setup, and dbt deps - always - # the first three steps in any run - if run_step['index'] <= 3 or status == 'success': - self.console.log( - f'Skipping rerun for command "{run_step["name"]}" ' - 'as it does not need to be repeated.' 
- ) - - else: - - # get the dbt command used within this step - command = run_step['name'].partition('`')[2].partition('`')[0] - namespace, remaining = self.parser.parse_known_args( - shlex.split(command) - ) - sub_command = remaining[1] - - if ( - sub_command not in run_commands - and status in ['error', 'cancelled', 'skipped'] - ) or (sub_command in run_commands and status == 'skipped'): - rerun_steps.append(command) - - # errors and failures are when we need to inspect to figure - # out the point of failure - else: + job_id=job_id, + payload=payload + ) - # get the run results scoped to the step which had an error - # an error here indicates that either: - # 1) the fail-fast flag was set, in which case - # the run_results.json file was never created; or - # 2) there was a problem on dbt Cloud's side saving - # this artifact - try: - step_results = self.get_run_artifact( - account_id=account_id, - run_id=last_run_id, - path='run_results.json', - step=run_step['index'], - )['results'] - - # If the artifact isn't found, the API returns a 404 with - # no json. The ValueError will catch the JSONDecodeError - except ValueError: - rerun_steps.append(command) - else: - rerun_nodes = ' '.join( - [ - record['unique_id'].split('.')[2] - for record in step_results - if record['status'] - in ['error', 'skipped', 'fail'] - ] - ) - global_args = parse_args( - global_cli_args.keys(), namespace - ) - sub_command_args = parse_args( - sub_command_cli_args.keys(), namespace - ) - modified_command = f'dbt{global_args} {sub_command} -s {rerun_nodes}{sub_command_args}' # noqa: E501 - rerun_steps.append(modified_command) - self.console.log( - f'Modifying command "{command}" as an error ' - 'or failure was encountered.' - ) - - payload.update({"steps_override": rerun_steps}) + if trigger_on_failure_only and not has_failures: self.console.log( - f'Triggering modified job to re-run failed steps: {rerun_steps}' + f'Process triggered with trigger_on_failure_only set to True but no ' + 'failed run steps found. Terminating.' ) + return None + elif mode == 'autoscale': + self.console.log(f'Triggered with autoscaling set to True. Detecting any running instances') + most_recent_job_run = self.list_runs( + account_id=account_id, + job_definition_id=job_id, + limit=1, + order_by='-id' + )['data'][0] + most_recent_job_run_status = most_recent_job_run['status_humanized'] + + self.console.log(f'Status for most recent run of job {job_id} is {most_recent_job_run_status}.') + + if most_recent_job_run_status not in ['Queued', 'Starting', 'Running']: + self.console.log(f'autoscale set to true but base job with id {job_id} is free ' + 'triggering base job and ignoring autoscale configuration.') + autoscale_delete_post_run = False + else: - self.console.log( - 'Process triggered with restart_from_failure set to True but no ' - 'failed run steps found.' + self.console.log(f'job_id {job_id} has an active run. Cloning job.') + + new_job_definition = self.clone_job( + account_id=account_id, + job_id=job_id ) - if trigger_on_failure_only: - self.console.log( - 'Not triggering job because prior run was successful.' - ) - return + + #TODO: need to figure out the best way to disambiguate replicated jobs. 
+ creation_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + new_job_name = '-'.join([new_job_definition['name'], creation_time]) + new_job_definition['name'] = new_job_name + job_id = self.create_job( + account_id=account_id, + payload=new_job_definition + )['data']['id'] + + self.console.log(f'Created new job with job_id: {job_id}') run = self._simple_request( f'accounts/{account_id}/jobs/{job_id}/run/', method='post', json=payload, ) + if not run['status']['is_success']: self.console.log(f'Run NOT triggered for job {job_id}. See run response.') return run @@ -1282,6 +1280,12 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): JobRunStatus.ERROR, ]: break + + if mode == 'autoscale' and autoscale_delete_post_run: + self.delete_job( + account_id=account_id, + job_id=job_id + ) return run diff --git a/dbtc/client/cloud/configs/dbt_cloud_api.py b/dbtc/client/cloud/configs/dbt_cloud_api.py index e859e79..da3f0b9 100644 --- a/dbtc/client/cloud/configs/dbt_cloud_api.py +++ b/dbtc/client/cloud/configs/dbt_cloud_api.py @@ -1,5 +1,41 @@ +from typing import Dict -create_job_request = { - 'name': '', - 'id': None -} \ No newline at end of file +class dbtCloudAPIRequestFactory(object): + + def __init__(self, **kwargs): + for key,value in kwargs.items(): + setattr(self, key, value) + + def _create_job_request(self) -> Dict: + """Minimal set of required fields needed to create a new dbt Cloud job, including default values""" + return { + 'name': None, + 'id': None, + 'execution': None, + 'account_id': None, + 'project_id': None, + 'environment_id': None, + 'dbt_version': None, + 'execute_steps': None, + 'state': None, + 'deferring_job_definition_id': None, + 'triggers': None, + 'settings': None, + 'schedule': None + } + + def create_job_request(self, data={}) -> Dict: + """Completes the _create_job_request template with values from data and overrides + + Args: + data (dict): payload to create the initial request. 
Typically, this will be the result of a GET on the + job definition from an existing job to be used for dbt Cloud migrations + """ + # copy everything EXCEPT for the existing dbt Cloud job ID + result = self._create_job_request() + if data != {}: + for key in result.keys(): + if key != 'id': + result[key] = data[key] + + return result \ No newline at end of file From 1eb16834ffb154a2faef2b8c7628404ea8fe6ab9 Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Tue, 27 Sep 2022 10:52:08 -0600 Subject: [PATCH 6/9] create enums file --- dbtc/client/cloud/base.py | 11 +---------- dbtc/client/cloud/configs/enums.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 10 deletions(-) create mode 100644 dbtc/client/cloud/configs/enums.py diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index c16e216..da15218 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1,19 +1,18 @@ # stdlib import argparse from datetime import datetime -import enum import shlex import time from functools import partial, wraps from typing import Dict, Iterable, List from urllib import request -import uuid # third party import requests # first party from dbtc.client.base import _Client +from dbtc.client.cloud.configs.enums import JobRunStatus, JobRunModes from dbtc.client.cloud.configs.dbt_cloud_api import dbtCloudAPIRequestFactory from dbtc.client.cloud.configs.dbt_core_cli import ( run_commands, @@ -21,14 +20,6 @@ sub_command_cli_args ) -class JobRunStatus(enum.IntEnum): - QUEUED = 1 - STARTING = 2 - RUNNING = 3 - SUCCESS = 10 - ERROR = 20 - CANCELLED = 30 - def _version_decorator(func, version): @wraps(func) diff --git a/dbtc/client/cloud/configs/enums.py b/dbtc/client/cloud/configs/enums.py new file mode 100644 index 0000000..06bd16c --- /dev/null +++ b/dbtc/client/cloud/configs/enums.py @@ -0,0 +1,11 @@ +import + +class JobRunStatus(enum.IntEnum): + QUEUED = 1 + STARTING = 2 + RUNNING = 3 + SUCCESS = 10 + ERROR = 20 + CANCELLED = 30 + +class JobRunModes(enum.StrEnum): From d083cc218c537140ed8cb60216ea5892570a953a Mon Sep 17 00:00:00 2001 From: Matt Winkler Date: Tue, 27 Sep 2022 12:41:26 -0600 Subject: [PATCH 7/9] update enums --- dbtc/client/cloud/base.py | 2 +- dbtc/client/cloud/configs/enums.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index da15218..ec5e9ae 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -114,7 +114,7 @@ def _get_by_name(self, items: List, item_name: str, value: str = 'name'): return obj def _validate_job_run_mode(self, mode): - if mode not in ['standard', 'restart_from_failure', 'autoscale']: + if mode not in JobRunModes: return False return True diff --git a/dbtc/client/cloud/configs/enums.py b/dbtc/client/cloud/configs/enums.py index 06bd16c..245b7e7 100644 --- a/dbtc/client/cloud/configs/enums.py +++ b/dbtc/client/cloud/configs/enums.py @@ -1,4 +1,4 @@ -import +import enum class JobRunStatus(enum.IntEnum): QUEUED = 1 @@ -8,4 +8,7 @@ class JobRunStatus(enum.IntEnum): ERROR = 20 CANCELLED = 30 -class JobRunModes(enum.StrEnum): +class JobRunModes(str, enum.Enum): + STANDARD = 'standard' + RESTART = 'restart_from_failure' + AUTOSCALE = 'autoscale' From 59bbc9fb724c3d35bc40cd3772556504ddd1d763 Mon Sep 17 00:00:00 2001 From: Doug Guthrie Date: Tue, 27 Sep 2022 15:50:23 -0600 Subject: [PATCH 8/9] Add pydantic --- .pre-commit-config.yaml | 2 +- poetry.lock | 18 +++++++++++++++++- pyproject.toml | 1 + 3 files changed, 19 insertions(+), 2 deletions(-) diff 
--git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1fc7838..3011d45 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,7 +7,7 @@ repos: - id: isort stages: [commit,push] name: isort - entry: poetry run isort -rc + entry: poetry run isort language: system types: [python] - id: black diff --git a/poetry.lock b/poetry.lock index 261a20a..47a2e4f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -600,6 +600,21 @@ category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +[[package]] +name = "pydantic" +version = "1.10.2" +description = "Data validation and settings management using python type hints" +category = "main" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +typing-extensions = ">=4.1.0" + +[package.extras] +dotenv = ["python-dotenv (>=0.10.4)"] +email = ["email-validator (>=1.0.3)"] + [[package]] name = "pyflakes" version = "2.4.0" @@ -938,7 +953,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest- [metadata] lock-version = "1.1" python-versions = "^3.8" -content-hash = "02325e1d8e94719b09b921bed4f10cec541135cb29e02514367fa34594346b7b" +content-hash = "ecbd26f1cd1c50c8dd6a3199923a7c4bc888401255ea46a61097f8c3578ddb90" [metadata.files] appnope = [ @@ -1178,6 +1193,7 @@ pycodestyle = [ {file = "pycodestyle-2.8.0-py2.py3-none-any.whl", hash = "sha256:720f8b39dde8b293825e7ff02c475f3077124006db4f440dcbc9a20b76548a20"}, {file = "pycodestyle-2.8.0.tar.gz", hash = "sha256:eddd5847ef438ea1c7870ca7eb78a9d47ce0cdb4851a5523949f2601d0cbbe7f"}, ] +pydantic = [] pyflakes = [ {file = "pyflakes-2.4.0-py2.py3-none-any.whl", hash = "sha256:3bb3a3f256f4b7968c9c788781e4ff07dce46bdf12339dcda61053375426ee2e"}, {file = "pyflakes-2.4.0.tar.gz", hash = "sha256:05a85c2872edf37a4ed30b0cce2f6093e1d0581f8c19d7393122da7e25b2b24c"}, diff --git a/pyproject.toml b/pyproject.toml index c23c73d..c0fc77e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ python = "^3.8" sgqlc = "^15.0" requests = "^2.27.1" typer = {extras = ["all"], version = "^0.6.1"} +pydantic = "^1.10.2" [tool.poetry.dev-dependencies] black = "^22.1.0" From ccea05c462706133260a7fbd7e2b9d45aa2b16a6 Mon Sep 17 00:00:00 2001 From: Doug Guthrie Date: Tue, 27 Sep 2022 16:08:19 -0600 Subject: [PATCH 9/9] Add support for cloning, resource validation --- dbtc/cli.py | 7 +- dbtc/client/cloud/base.py | 199 ++++++++++++--------- dbtc/client/cloud/configs/dbt_cloud_api.py | 43 +++-- dbtc/client/cloud/configs/dbt_core_cli.py | 1 - dbtc/client/cloud/configs/enums.py | 3 + dbtc/client/cloud/models/__init__.py | 2 + dbtc/client/cloud/models/constants.py | 7 + dbtc/client/cloud/models/job.py | 63 +++++++ dbtc/client/cloud/models/project.py | 23 +++ 9 files changed, 235 insertions(+), 113 deletions(-) create mode 100644 dbtc/client/cloud/models/__init__.py create mode 100644 dbtc/client/cloud/models/constants.py create mode 100644 dbtc/client/cloud/models/job.py create mode 100644 dbtc/client/cloud/models/project.py diff --git a/dbtc/cli.py b/dbtc/cli.py index 9cad0e4..75dcaae 100644 --- a/dbtc/cli.py +++ b/dbtc/cli.py @@ -1,5 +1,4 @@ # stdlib -from enum import auto import json from typing import List, Optional @@ -1005,12 +1004,10 @@ def trigger_job( ' exited with an error. If yes, restart from the point of failure ' 'autoscale: determine with the target job is currently running ' ' If yes, create and then run the clone.' 
- ) + ), ), autoscale_delete_post_run: bool = typer.Option( - True, help=( - 'Delete job created via autoscaling after it finishes running' - ) + True, help=('Delete job created via autoscaling after it finishes running') ), ): """Trigger job to run.""" diff --git a/dbtc/client/cloud/base.py b/dbtc/client/cloud/base.py index ec5e9ae..d9b9783 100644 --- a/dbtc/client/cloud/base.py +++ b/dbtc/client/cloud/base.py @@ -1,24 +1,23 @@ # stdlib import argparse -from datetime import datetime import shlex import time +from datetime import datetime from functools import partial, wraps from typing import Dict, Iterable, List -from urllib import request # third party import requests # first party from dbtc.client.base import _Client -from dbtc.client.cloud.configs.enums import JobRunStatus, JobRunModes -from dbtc.client.cloud.configs.dbt_cloud_api import dbtCloudAPIRequestFactory +from dbtc.client.cloud import models from dbtc.client.cloud.configs.dbt_core_cli import ( - run_commands, global_cli_args, - sub_command_cli_args + run_commands, + sub_command_cli_args, ) +from dbtc.client.cloud.configs.enums import JobRunModes, JobRunStatus def _version_decorator(func, version): @@ -31,6 +30,7 @@ def wrapper(self, *args, **kwargs): return wrapper +# Version Decorators v2 = partial(_version_decorator, version='v2') v3 = partial(_version_decorator, version='v3') v4 = partial(_version_decorator, version='v4') @@ -42,7 +42,6 @@ def __init__(self, **kwargs): self.session = requests.Session() self.session.headers = self.headers self.parser = argparse.ArgumentParser() - self.request_factory = dbtCloudAPIRequestFactory() all_cli_args = {**global_cli_args, **sub_command_cli_args} for arg_specs in all_cli_args.values(): flags = arg_specs['flags'] @@ -60,10 +59,28 @@ def _header_property(self): return 'api_key' + def _clone_resource(self, resource: str, **kwargs): + create_args = kwargs.pop('create_args', None) + payload = getattr(self, f'get_{resource}')(**kwargs)['data'] + + # Can't recreate a resource with an ID + payload.pop('id', None) + if create_args is not None: + kwargs = {k: v for k, v in kwargs.items() if k in create_args} + kwargs['payload'] = payload + return getattr(self, f'create_{resource}')(**kwargs) + def _make_request( self, path: str, *, method: str = 'get', **kwargs ) -> requests.Response: """Make request to API.""" + + # Model is not an argument that the request method accepts, needs to be removed + model = kwargs.pop('model', None) + if model is not None: + + # This will validate the payload as well as add any optional fields + kwargs['json'] = model(**kwargs['json']).dict() full_url = self.full_url(path) response = self.session.request(method=method, url=full_url, **kwargs) return response @@ -112,11 +129,11 @@ def _get_by_name(self, items: List, item_name: str, value: str = 'name'): except IndexError: obj = None return obj - + def _validate_job_run_mode(self, mode): if mode not in JobRunModes: return False - + return True @v3 @@ -183,6 +200,27 @@ def cancel_run(self, account_id: int, run_id: int) -> Dict: method='post', ) + @v2 + def clone_job( + self, + account_id: int, + job_id: int, + ): + + """Create a job using the configuration of another + + !!! 
tip
+            If a job is currently running, replicate the job definition to a new
+            job, and trigger the new job in its place.
+
+        Args:
+            account_id (int): Numeric ID of the account to retrieve
+            job_id (int): Numeric ID of the job to trigger
+        """
+        return self._clone_resource(
+            'job', account_id=account_id, job_id=job_id, create_args=['account_id']
+        )
+
     @v3
     def create_adapter(self, account_id: int, project_id: int, payload: Dict) -> Dict:
         """Create an adapter
@@ -280,6 +318,7 @@ def create_job(self, account_id: int, payload: Dict) -> Dict:
             f'accounts/{account_id}/jobs/',
             method='post',
             json=payload,
+            model=models.Job,
         )
 
     @v3
@@ -291,7 +330,10 @@ def create_project(self, account_id: int, payload: Dict) -> Dict:
             payload (dict): Dictionary representing the project to create
         """
         return self._simple_request(
-            f'accounts/{account_id}/projects/', method='post', json=payload
+            f'accounts/{account_id}/projects/',
+            method='post',
+            json=payload,
+            model=models.Project,
         )
 
     @v3
@@ -785,7 +827,11 @@ def list_invited_users(self, account_id: int) -> Dict:
 
     @v2
     def list_jobs(
-        self, account_id: int, *, order_by: str = None, project_id: int = None,
+        self,
+        account_id: int,
+        *,
+        order_by: str = None,
+        project_id: int = None,
     ) -> Dict:
         """List jobs in an account or specific project.
 
@@ -794,7 +840,7 @@ def list_jobs(
             order_by (str, optional): Field to order the result by.  Use - to indicate
                 reverse order.
             project_id (int, optional): Numeric ID of the project containing jobs
-        """ 
+        """
         return self._simple_request(
             f'accounts/{account_id}/jobs/',
             params={'order_by': order_by, 'project_id': project_id},
@@ -990,30 +1036,8 @@ def test_connection(self, account_id: int, payload: Dict) -> Dict:
         return self._simple_request(
             f'accounts/{account_id}/connections/test/', method='post', json=payload
         )
-
-    @v2
-    def clone_job(
-        self,
-        account_id: int,
-        job_id: int,
-    ):
-
-        """If a job is currently running, replicate the job definition to a new job
-
-        Args:
-            account_id (int): Numeric ID of the account to retrieve
-            job_id (int): Numeric ID of the job to trigger
-            payload (dict): Payload required for post request
-        """
-
-        existing_job_definition = self.get_job(
-            account_id=account_id,
-            job_id=job_id
-        )['data']
-
-        return self.request_factory.create_job_request(data=existing_job_definition)
-
-    @v2
+    @v2
     def _get_restart_job_definition(
         self,
         account_id: int,
         job_id: int,
         payload: Dict,
     ):
 
         """Identifies whether there was a failure on the previous run of the job.
-        When failures are identified, returns an updated job definition to
+        When failures are identified, returns an updated job definition to
        restart from the point of failure.
- + Args: account_id (int): Numeric ID of the account to retrieve job_id (int): Numeric ID of the job to trigger @@ -1042,16 +1066,16 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): else: string += f" --{arg} '{value}'" return string - + has_failures = False last_run_data = self.list_runs( - account_id=account_id, - include_related=['run_steps'], - job_definition_id=job_id, - order_by='-id', - limit=1, - )['data'][0] + account_id=account_id, + include_related=['run_steps'], + job_definition_id=job_id, + order_by='-id', + limit=1, + )['data'][0] last_run_status = last_run_data['status_humanized'].lower() last_run_id = last_run_data['id'] @@ -1111,13 +1135,10 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): [ record['unique_id'].split('.')[2] for record in step_results - if record['status'] - in ['error', 'skipped', 'fail'] + if record['status'] in ['error', 'skipped', 'fail'] ] ) - global_args = parse_args( - global_cli_args.keys(), namespace - ) + global_args = parse_args(global_cli_args.keys(), namespace) sub_command_args = parse_args( sub_command_cli_args.keys(), namespace ) @@ -1129,10 +1150,10 @@ def parse_args(cli_args: Iterable[str], namespace: argparse.Namespace): ) if len(rerun_steps) > 0: has_failures = True - payload.update({"steps_override": rerun_steps}) - + payload.update({"steps_override": rerun_steps}) + return payload, has_failures - + @v2 def trigger_job( self, @@ -1163,14 +1184,15 @@ def trigger_job( restart_from_failure to True. This has the effect of only triggering the job when the prior invocation was not successful. Otherwise, the function will exit prior to triggering the job. - mode (str, optional): Must be one of ['standard', 'restart_from_failure', - 'autoscaling']. + mode (str, optional): Must be one of ['standard', 'restart_from_failure', + 'autoscaling']. - standard mode triggers the job to run as-is. - - restart_from_failure checks for errors on the prior invocation and, + - restart_from_failure checks for errors on the prior invocation and, if found, restarts failed models only. - autoscale checks whether the job_id is actively running. If so, creates a copy of the running job - autoscale_delete_post_run (bool, optional): Only relevant when mode = 'autoscale' + autoscale_delete_post_run (bool, optional): Only relevant when + mode = 'autoscale' Remove a job replicated via autoscaling after it finishes running. """ @@ -1186,62 +1208,66 @@ def run_status_formatted(run: Dict, time: float) -> str: f'Status: "{status.capitalize()}", Elapsed time: {round(time, 0)}s' f', View here: {url}' ) - + # this is here to not break existing stuff 09.26.2022 if restart_from_failure: mode = 'restart_from_failure' mode_is_valid = self._validate_job_run_mode(mode) if not mode_is_valid: - raise Exception(f'mode: {mode} is not one of ["standard", "restart_from_failure", "autoscale"]') + raise Exception( + f'mode: {mode} is not one of ' + '["standard", "restart_from_failure", "autoscale"]' + ) if mode == 'restart_from_failure': self.console.log(f'Restarting job {job_id} from last failed state.') payload, has_failures = self._get_restart_job_definition( - account_id=account_id, - job_id=job_id, - payload=payload + account_id=account_id, job_id=job_id, payload=payload ) if trigger_on_failure_only and not has_failures: self.console.log( - f'Process triggered with trigger_on_failure_only set to True but no ' - 'failed run steps found. Terminating.' 
+ 'Process triggered with trigger_on_failure_only set to True but ' + 'no failed run steps found. Terminating.' ) return None elif mode == 'autoscale': - self.console.log(f'Triggered with autoscaling set to True. Detecting any running instances') + self.console.log( + 'Triggered with autoscaling set to True. ' + 'Detecting any running instances' + ) most_recent_job_run = self.list_runs( - account_id=account_id, - job_definition_id=job_id, - limit=1, - order_by='-id' + account_id=account_id, job_definition_id=job_id, limit=1, order_by='-id' )['data'][0] most_recent_job_run_status = most_recent_job_run['status_humanized'] - - self.console.log(f'Status for most recent run of job {job_id} is {most_recent_job_run_status}.') + + self.console.log( + f'Status for most recent run of job {job_id} ' + f'is {most_recent_job_run_status}.' + ) if most_recent_job_run_status not in ['Queued', 'Starting', 'Running']: - self.console.log(f'autoscale set to true but base job with id {job_id} is free ' - 'triggering base job and ignoring autoscale configuration.') + self.console.log( + f'autoscale set to true but base job with id {job_id} is free ' + 'triggering base job and ignoring autoscale configuration.' + ) autoscale_delete_post_run = False - + else: self.console.log(f'job_id {job_id} has an active run. Cloning job.') - + new_job_definition = self.clone_job( - account_id=account_id, - job_id=job_id + account_id=account_id, job_id=job_id ) - - #TODO: need to figure out the best way to disambiguate replicated jobs. + + # TODO: need to figure out the best way to disambiguate replicated jobs. creation_time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S') new_job_name = '-'.join([new_job_definition['name'], creation_time]) new_job_definition['name'] = new_job_name job_id = self.create_job( - account_id=account_id, - payload=new_job_definition + account_id=account_id, payload=new_job_definition )['data']['id'] self.console.log(f'Created new job with job_id: {job_id}') @@ -1251,7 +1277,7 @@ def run_status_formatted(run: Dict, time: float) -> str: method='post', json=payload, ) - + if not run['status']['is_success']: self.console.log(f'Run NOT triggered for job {job_id}. 
See run response.') return run @@ -1271,12 +1297,9 @@ def run_status_formatted(run: Dict, time: float) -> str: JobRunStatus.ERROR, ]: break - + if mode == 'autoscale' and autoscale_delete_post_run: - self.delete_job( - account_id=account_id, - job_id=job_id - ) + self.delete_job(account_id=account_id, job_id=job_id) return run @@ -1293,7 +1316,7 @@ def update_connection( payload (dict): Dictionary representing the connection to update """ return self._simple_request( - f'accounts/{account_id}/projects/{project_id}/connections/{connection_id}/', + f'accounts/{account_id}/projects/{project_id}/connections/{connection_id}/', # noqa: E501 method='post', json=payload, ) @@ -1311,7 +1334,7 @@ def update_credentials( payload (dict): Dictionary representing the credentials to update """ return self._simple_request( - f'accounts/{account_id}/projects/{project_id}/credentials/{credentials_id}/', # noqa: E50 + f'accounts/{account_id}/projects/{project_id}/credentials/{credentials_id}/', # noqa: E501 method='post', json=payload, ) diff --git a/dbtc/client/cloud/configs/dbt_cloud_api.py b/dbtc/client/cloud/configs/dbt_cloud_api.py index da3f0b9..c980c06 100644 --- a/dbtc/client/cloud/configs/dbt_cloud_api.py +++ b/dbtc/client/cloud/configs/dbt_cloud_api.py @@ -1,35 +1,40 @@ +# stdlib from typing import Dict + class dbtCloudAPIRequestFactory(object): - def __init__(self, **kwargs): - for key,value in kwargs.items(): + for key, value in kwargs.items(): setattr(self, key, value) def _create_job_request(self) -> Dict: - """Minimal set of required fields needed to create a new dbt Cloud job, including default values""" + """Minimal set of required fields needed to create a new dbt Cloud job, + including default values + """ return { 'name': None, 'id': None, 'execution': None, - 'account_id': None, - 'project_id': None, - 'environment_id': None, - 'dbt_version': None, - 'execute_steps': None, - 'state': None, - 'deferring_job_definition_id': None, - 'triggers': None, - 'settings': None, - 'schedule': None + 'account_id': None, + 'project_id': None, + 'environment_id': None, + 'dbt_version': None, + 'execute_steps': None, + 'state': None, + 'deferring_job_definition_id': None, + 'triggers': None, + 'settings': None, + 'schedule': None, } - + def create_job_request(self, data={}) -> Dict: - """Completes the _create_job_request template with values from data and overrides - + """Completes the _create_job_request template with values from data and + overrides + Args: - data (dict): payload to create the initial request. Typically, this will be the result of a GET on the - job definition from an existing job to be used for dbt Cloud migrations + data (dict): payload to create the initial request. 
Typically, this will be
+                the result of a GET on the job definition from an existing job to be used
+                for dbt Cloud migrations
         """
         # copy everything EXCEPT for the existing dbt Cloud job ID
         result = self._create_job_request()
@@ -38,4 +43,4 @@ def create_job_request(self, data={}) -> Dict:
                 if key != 'id':
                     result[key] = data[key]
 
-        return result
\ No newline at end of file
+        return result
diff --git a/dbtc/client/cloud/configs/dbt_core_cli.py b/dbtc/client/cloud/configs/dbt_core_cli.py
index 19a43fc..bc9df6a 100644
--- a/dbtc/client/cloud/configs/dbt_core_cli.py
+++ b/dbtc/client/cloud/configs/dbt_core_cli.py
@@ -1,4 +1,3 @@
-
 run_commands = ['build', 'run', 'test', 'seed', 'snapshot']
 
 global_cli_args = {
diff --git a/dbtc/client/cloud/configs/enums.py b/dbtc/client/cloud/configs/enums.py
index 245b7e7..e909e3f 100644
--- a/dbtc/client/cloud/configs/enums.py
+++ b/dbtc/client/cloud/configs/enums.py
@@ -1,5 +1,7 @@
+# stdlib
 import enum
 
+
 class JobRunStatus(enum.IntEnum):
     QUEUED = 1
     STARTING = 2
@@ -8,4 +10,7 @@ class JobRunStatus(enum.IntEnum):
     ERROR = 20
     CANCELLED = 30
 
+
 class JobRunModes(str, enum.Enum):
     STANDARD = 'standard'
     RESTART = 'restart_from_failure'
diff --git a/dbtc/client/cloud/models/__init__.py b/dbtc/client/cloud/models/__init__.py
new file mode 100644
index 0000000..93852cb
--- /dev/null
+++ b/dbtc/client/cloud/models/__init__.py
@@ -0,0 +1,2 @@
+from .job import Job  # noqa: F401
+from .project import Project  # noqa: F401
diff --git a/dbtc/client/cloud/models/constants.py b/dbtc/client/cloud/models/constants.py
new file mode 100644
index 0000000..e861841
--- /dev/null
+++ b/dbtc/client/cloud/models/constants.py
@@ -0,0 +1,7 @@
+# stdlib
+import enum
+
+
+class State(enum.IntEnum):
+    active = 1
+    deleted = 2
diff --git a/dbtc/client/cloud/models/job.py b/dbtc/client/cloud/models/job.py
new file mode 100644
index 0000000..2f19d7e
--- /dev/null
+++ b/dbtc/client/cloud/models/job.py
@@ -0,0 +1,63 @@
+# stdlib
+from typing import List, Literal, Optional
+
+# third party
+from pydantic import BaseModel
+
+from .constants import State
+
+
+class _JobExecution(BaseModel):
+    timeout_seconds: int
+
+
+class _JobSchedule(BaseModel):
+    cron: str
+    date: Literal['custom_cron', 'days_of_week', 'every_day']
+    time: Literal['every_hour', 'at_exact_hours']
+
+
+class _JobSettings(BaseModel):
+    threads: int
+    target_name: str
+
+
+class _JobTrigger(BaseModel):
+    github_webhook: bool
+    schedule: bool
+    git_provider_webhook: Optional[bool] = None
+
+
+class Job(BaseModel):
+
+    # Required
+    account_id: int
+    environment_id: int
+    generate_docs: bool
+    name: str
+    project_id: int
+    run_generate_sources: bool
+    state: Literal[State.active, State.deleted]
+
+    # Optional
+    dbt_version: Optional[str] = None
+    deactivated: bool = False
+    deferring_job_definition_id: Optional[int] = None
+    execute_steps: Optional[List[str]] = None
+    execution: Optional[_JobExecution] = None
+    id: Optional[int] = None
+    is_deferrable: Optional[bool] = False
+    run_failure_count: int = 0
+    schedule: Optional[_JobSchedule] = None
+    settings: Optional[_JobSettings] = None
+    triggers: Optional[_JobTrigger] = None
+
+    def __init__(self, **data):
+        schedule = data.get('schedule', {})
+        date = schedule.get('date', {}).get('type', None)
+        time = schedule.get('time', {}).get('type', None)
+        if date is not None:
+            data['schedule']['date'] = date
+        if time is not None:
+            data['schedule']['time'] = time
+        super().__init__(**data)
diff --git a/dbtc/client/cloud/models/project.py b/dbtc/client/cloud/models/project.py
new file mode 100644
index 0000000..0136288
--- /dev/null
+++ b/dbtc/client/cloud/models/project.py
@@ -0,0 +1,23 @@
+# stdlib
+from typing import Optional
+
+# third party
+from pydantic import BaseModel
+
+from .constants import State
+
+
+class Project(BaseModel):
+
+    # Required
+    account_id: int
+    name: str
+
+    # Optional
+    id: Optional[int] = None
+    connection_id: Optional[int] = None
+    dbt_project_subdirectory: Optional[str] = None
+    docs_job_id: Optional[int] = None
+    freshness_job_id: Optional[int] = None
+    repository_id: Optional[int] = None
+    state: int = State.active
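
A usage sketch of the behavior this series adds. This is illustrative only and not part of the patches above: it assumes the `dbtCloudClient` entry point and its `cloud` property that dbtc already exposes, a service token in the DBT_CLOUD_SERVICE_TOKEN environment variable, and placeholder account and job IDs.

    # usage_sketch.py -- hypothetical example exercising the new trigger modes
    import os

    from dbtc import dbtCloudClient

    client = dbtCloudClient(service_token=os.environ['DBT_CLOUD_SERVICE_TOKEN'])

    # mode='standard' (the default) keeps the old behavior: trigger the job as-is.
    run = client.cloud.trigger_job(
        account_id=123,
        job_id=456,
        payload={'cause': 'Nightly build'},
    )

    # mode='restart_from_failure' inspects the previous run's steps and, when it
    # errored, overrides the steps so only failed or skipped nodes are rerun.
    run = client.cloud.trigger_job(
        account_id=123,
        job_id=456,
        payload={'cause': 'Restart after failure'},
        mode='restart_from_failure',
        trigger_on_failure_only=True,
    )

    # mode='autoscale': if the job already has a run that is Queued, Starting, or
    # Running, the job definition is cloned (its name suffixed with a creation
    # timestamp), the clone is triggered instead, and the clone is deleted once
    # polling sees a terminal status, since autoscale_delete_post_run defaults
    # to True.
    run = client.cloud.trigger_job(
        account_id=123,
        job_id=456,
        payload={'cause': 'CI run for a concurrent PR'},
        mode='autoscale',
        autoscale_delete_post_run=True,
    )

The same switches surface on the command line through typer's underscore-to-dash conversion, e.g. `dbtc trigger-job --account-id 123 --job-id 456 --payload '{"cause": "CI"}' --mode autoscale`.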