From 8f4a3f3bd18127119a33a39a94307757715a2e55 Mon Sep 17 00:00:00 2001 From: dmichalopoulos_komply Date: Mon, 26 Aug 2019 11:53:49 -0500 Subject: [PATCH 01/11] May not be needed, but adding __init__.py files anyway. --- __init__.py | 0 python-clusters/__init__.py | 0 python-clusters/emr-attach-to-existing-cluster/__init__.py | 0 python-runnables/__init__.py | 0 4 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 __init__.py create mode 100644 python-clusters/__init__.py create mode 100644 python-clusters/emr-attach-to-existing-cluster/__init__.py create mode 100644 python-runnables/__init__.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-clusters/__init__.py b/python-clusters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-clusters/emr-attach-to-existing-cluster/__init__.py b/python-clusters/emr-attach-to-existing-cluster/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-runnables/__init__.py b/python-runnables/__init__.py new file mode 100644 index 0000000..e69de29 From 6f49860be647908857c45eedb37c0b57cfefa618 Mon Sep 17 00:00:00 2001 From: dmichalopoulos_komply Date: Mon, 26 Aug 2019 12:01:25 -0500 Subject: [PATCH 02/11] boto_params.py contains class-level variables for many commonly used boto3 params. cluster_ops.py is now the home of all primary cluster operations (starting, stopping/terminating/detaching, and copying (new feature)). 
--- python-lib/__init__.py | 0 python-lib/boto_params.py | 64 +++++++ python-lib/cluster_ops.py | 382 +++++++++++++++++++++++++++++++++++++ python-lib/ssh/__init__.py | 0 python-lib/ssh/client.py | 116 +++++++++++ 5 files changed, 562 insertions(+) create mode 100644 python-lib/__init__.py create mode 100644 python-lib/boto_params.py create mode 100644 python-lib/cluster_ops.py create mode 100644 python-lib/ssh/__init__.py create mode 100644 python-lib/ssh/client.py diff --git a/python-lib/__init__.py b/python-lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-lib/boto_params.py b/python-lib/boto_params.py new file mode 100644 index 0000000..6ecdb27 --- /dev/null +++ b/python-lib/boto_params.py @@ -0,0 +1,64 @@ + + +class Constant(object): + OnDemand = "ON_DEMAND" + Spot = "SPOT" + Master = "MASTER" + Core = "CORE" + Task = "TASK" + Gp2 = "gp2" + AwaitingFulfillment = "AWAITING_FULFILLMENT" + Provisioning = "PROVISIONING" + Bootstrapping = "BOOTSTRAPPING" + Running = "RUNNING" + + +class Arg(object): + Key = "Key" + Value = "Value" + Name = "Name" + Id = "Id" + Cluster = "Cluster" + Release = "ReleaseLabel" + Instances = "Instances" + Applications = "Applications" + VisibleToAllUsers = "VisibleToAllUsers" + JobFlowRole = "JobFlowRole" + ServiceRole = "ServiceRole" + Tags = "Tags" + SecurityConfig = "SecurityConfiguration" + EbsRootVolSize = "EbsRootVolumeSize" + Iops = "Iops" + Configurations = "Configurations" + InstanceGroups = "InstanceGroups" + InstanceGroupType = "InstanceGroupType" + Ec2InstanceAttributes = "Ec2InstanceAttributes" + KeepJobFlowAliveWhenNoSteps = "KeepJobFlowAliveWhenNoSteps" + Ec2SubnetId = "Ec2SubnetId" + Ec2KeyName = "Ec2KeyName" + AddlMasterSecurityGroups = "AdditionalMasterSecurityGroups" + AddlSlaveSecurityGroups = "AdditionalSlaveSecurityGroups" + TerminationProtected = "TerminationProtected" + InstanceRole = 'InstanceRole' + Market = 'Market' + BidPrice = "BidPrice" + InstanceType = "InstanceType" + 
InstanceCount = 'InstanceCount' + RunningInstanceCount = "RunningInstanceCount" + EbsConfig = 'EbsConfiguration' + EbsBlockDeviceConfigs = 'EbsBlockDeviceConfigs' + EbsVolSpec = "VolumeSpecification" + EbsVolType = "VolumeType" + EbsVolSizeGb = "SizeInGB" + EbsVolsPerInstance = "VolumesPerInstance" + EbsOptimized = "EbsOptimized" + Classification = "Classification" + Properties = "Properties" + LogUri = "LogUri" + PrivateIpAddress = "PrivateIpAddress" + Status = "Status" + State = "State" + + +class Response(object): + JobFlowId = "JobFlowId" \ No newline at end of file diff --git a/python-lib/cluster_ops.py b/python-lib/cluster_ops.py new file mode 100644 index 0000000..3c42d2e --- /dev/null +++ b/python-lib/cluster_ops.py @@ -0,0 +1,382 @@ +import boto3 +import os +import logging +import json +import subprocess + +import dku_emr +import dataiku +from dataiku.cluster import Cluster +from boto_params import Arg, Constant, Response +from ssh.client import RemoteSSHClient + +# This actually belongs in the main entry point +logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO) +logging.getLogger().setLevel(logging.INFO) + + +class ClusterBuilder(object): + + def __init__(self, my_cluster): + self.my_cluster = my_cluster + + def build_cluster(self): + region = self.my_cluster.config.get("awsRegionId") or dku_emr.get_current_region() + client = boto3.client('emr', region_name=region) + release = 'emr-%s' % self.my_cluster.config["emrVersion"] + name_prefix = "dss-" + name = ( + name_prefix + self.my_cluster.cluster_id + if self.my_cluster.cluster_id[0:len(name_prefix)] != name_prefix + else self.my_cluster.cluster_id + ) + logging.info("starting cluster, release=%s name=%s" % (release, name)) + + extra_args = {} + + # Path for logs in S3 + logs_path = self.my_cluster.config.get("logsPath") + if logs_path is not None: + if "s3://" in logs_path: + extra_args[Arg.LogUri] = logs_path + else: + raise Exception("'{}' is not a valid S3 
path".format(logs_path)) + + # Use specified security config + if "securityConfiguration" in self.my_cluster.config: + extra_args[Arg.SecurityConfig] = self.my_cluster.config["securityConfiguration"] + + # EBS root volume size (minimum of 10) + extra_args[Arg.EbsRootVolSize] = int(self.my_cluster.config.get('ebsRootVolumeSize') or 25) + + # EMR app (e.g., Spark, Hive) configs + extra_args[Arg.Configurations] = self._get_software_configs() + + # EMR instance groups and configs + instances = self._get_instance_configs() + + # EMR applications to install + applications = self._get_apps_to_install() + + # Tags + tags = self._get_tags(name) + + # All args to run_job_flow(..) + job_flow_params = { + Arg.Name: name, + Arg.Release: release, + Arg.Instances: instances, + Arg.Applications: applications, + Arg.VisibleToAllUsers: True, + Arg.JobFlowRole: self.my_cluster.config["nodesRole"], + Arg.ServiceRole: self.my_cluster.config["serviceRole"], + Arg.Tags: tags + } + job_flow_params.update(extra_args) + + logging.info("Starting cluster: %s", job_flow_params) + + response = client.run_job_flow(**job_flow_params) + cluster_id = response[Response.JobFlowId] + logging.info("cluster_id=%s" % cluster_id) + + logging.info("waiting for cluster to start") + client.get_waiter('cluster_running').wait(ClusterId=cluster_id) + + cluster_keys_and_data = dku_emr.make_cluster_keys_and_data( + client, cluster_id, create_user_dir=True, create_databases=self.my_cluster.config.get("databasesToCreate") + ) + + # TODO: Implement install of python libs and post-launch SSH commands to run on each node + # try: + # python_libs = self.my_cluster.config.get('pythonLibs') or self.my_cluster.config.get('python_libs') + # if str(python_libs) not in {"None", "[]"}: + # self._install_python_libs() + # + # extra_setup_cmds = self.my_cluster.config.get('extraSetup') or self.my_cluster.config.get('extra_setup') + # if str(extra_setup_cmds) not in {"None", "[]"}: + # self._run_shell_commands() + # + # return 
cluster_keys_and_data + # + # except Exception as e: + # logging.info(e) + # logging.info("Setup failed. Terminating cluster.") + # self.my_cluster.stop(data={'emrClusterId': cluster_id}) + return cluster_keys_and_data + + def _get_instance_configs(self): + subnet = self.my_cluster.config.get("subnetId") or dku_emr.get_current_subnet() + security_groups = self._get_security_groups() + instances = { + Arg.InstanceGroups: [self._get_master_group_config()], + Arg.KeepJobFlowAliveWhenNoSteps: True, + Arg.Ec2SubnetId: subnet, + Arg.AddlMasterSecurityGroups: security_groups, + Arg.AddlSlaveSecurityGroups: security_groups, + Arg.TerminationProtected: (self.my_cluster.config.get('terminationProtected') or False) + } + + # Need >= 1 core nodes + if not self.my_cluster.config.get("coreInstanceType"): + raise Exception("Missing core instance type") + instances[Arg.InstanceGroups].append(self._get_slave_group_config(Constant.Core.lower())) + + # Don't need task group instance type unless requested count >= 1 + if self.my_cluster.config.get("taskInstanceCount"): + if not self.my_cluster.config.get("taskInstanceType"): + raise Exception("Missing task instance type") + instances[Arg.InstanceGroups].append(self._get_slave_group_config(Constant.Task.lower())) + + ec2_keyname = self._get_ec2_keyname() + if ec2_keyname: + instances[Arg.Ec2KeyName] = ec2_keyname + + return instances + + def _get_ec2_keyname(self): + """Use user-specified keypair, o/w the one specified (if any) in default settings""" + ec2_keyname = self.my_cluster.config.get('ec2KeyName') + + return ec2_keyname if ec2_keyname != "" else None + + def _get_master_group_config(self): + return { + Arg.InstanceRole: Constant.Master, + Arg.Market: Constant.OnDemand, + Arg.InstanceType: self.my_cluster.config["masterInstanceType"], + Arg.InstanceCount: 1, + Arg.EbsConfig: { + Arg.EbsBlockDeviceConfigs: [ + { + Arg.EbsVolSpec: { + Arg.EbsVolType: Constant.Gp2, + # Arg.Iops: 123, + Arg.EbsVolSizeGb: 
self.my_cluster.config.get('masterEbsSize') or defaults['size_gb'] + }, + Arg.EbsVolsPerInstance: self.my_cluster.config.get('masterEbsCount') or defaults['count'] + }, + ], + Arg.EbsOptimized: self.my_cluster.config.get('ebsOptimized') or True + }, + } + + def _get_slave_group_config(self, group_id): + core = Constant.Core.lower() + task = Constant.Task.lower() + if group_id not in {core, task}: + raise Exception( + "Unknown instance group type. Must be either '{}' or '{}'.".format(core, task) + ) + + if self.my_cluster.config.get('{}InstanceCount'.format(group_id)) == 0: + if group_id == core: + raise Exception( + "0 {} instances requested. You must request at least 1.".format(Constant.Core.upper()) + ) + return None + + instance_group = { + Arg.InstanceRole: group_id.upper(), + Arg.InstanceType: self.my_cluster.config['{}InstanceType'.format(group_id)], + Arg.InstanceCount: self.my_cluster.config['{}InstanceCount'.format(group_id)], + Arg.EbsConfig: { + Arg.EbsBlockDeviceConfigs: [ + { + Arg.EbsVolSpec: { + Arg.EbsVolType: Constant.Gp2, + # Arg.Iops: 123, + Arg.EbsVolSizeGb: self.my_cluster.config.get('{}EbsSize'.format(group_id)) + }, + Arg.EbsVolsPerInstance: self.my_cluster.config.get('{}EbsCount'.format(group_id)) + }, + ], + Arg.EbsOptimized: self.my_cluster.config.get('ebsOptimized') or True + } + } + market = Constant.OnDemand + if self.my_cluster.config.get('{}UseSpotInstances'.format(group_id)): + market = Constant.Spot + bid_price = self.my_cluster.config.get('{}BidPrice'.format(group_id)) or None + if bid_price: + instance_group[Arg.BidPrice] = bid_price + + instance_group[Arg.Market] = market + + return instance_group + + def _get_software_configs(self): + hive_props = {} + + if self.my_cluster.config["metastoreDBMode"] == "CUSTOM_JDBC": + hive_props = { + "javax.jdo.option.ConnectionURL": self.my_cluster.config["metastoreJDBCURL"], + "javax.jdo.option.ConnectionDriverName": self.my_cluster.config["metastoreJDBCDriver"], + 
"javax.jdo.option.ConnectionUserName": self.my_cluster.config["metastoreJDBCUser"], + "javax.jdo.option.ConnectionPassword": self.my_cluster.config["metastoreJDBCPassword"], + } + elif self.my_cluster.config["metastoreDBMode"] == "MYSQL": + hive_props = { + "javax.jdo.option.ConnectionURL": "jdbc:mysql://{}:3306/hive?createDatabaseIfNotExist=true".format( + self.my_cluster.config["metastoreMySQLHost"] + ), + "javax.jdo.option.ConnectionDriverName": "org.mariadb.jdbc.Driver", + "javax.jdo.option.ConnectionUserName": self.my_cluster.config["metastoreMySQLUser"], + "javax.jdo.option.ConnectionPassword": self.my_cluster.config["metastoreMySQLPassword"] + } + elif self.my_cluster.config["metastoreDBMode"] == "AWS_GLUE_DATA_CATALOG": + hive_props = { + "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" + } + + configurations = self.my_cluster.config.get('softwareConfig') or [] + if not isinstance(configurations, list): + configurations = json.JSONDecoder().decode(configurations) + hive_site_config = ClusterBuilder._get_app_config(config_id="hive-site", configurations=configurations) + if hive_site_config is not None: + hive_site_config['Properties'].update(hive_props) + else: + configurations.append( + { + Arg.Classification: "hive-site", + Arg.Properties: hive_props + } + ) + + self.my_cluster.config['softwareConfig'] = json.JSONEncoder().encode(configurations) + + return configurations + + @staticmethod + def _get_app_config(config_id, configurations): + app_config_search = [c for c in configurations if c[Arg.Classification] == config_id] + + return app_config_search[0] if len(app_config_search) > 0 else None + + def _get_apps_to_install(self): + app_install_key_pattern = "installapp" + app_install_keys = [ + { + 'Name': key.lower().split(app_install_key_pattern)[-1].capitalize() + } for key in self.my_cluster.config.keys() + if self.my_cluster.config.get(key) and 
len(key.lower().split(app_install_key_pattern)) > 1 + ] + + return app_install_keys if len(app_install_keys) > 0 else [] + + def _get_security_groups(self): + if self.my_cluster.config.get("additionalSecurityGroups"): + return [ + x.strip() for x in self.my_cluster.config.get("additionalSecurityGroups").split(",") + ] + + return [] + + def _get_tags(self, name): + tags = [ + { + Arg.Key: "Name", + Arg.Value: name + } + ] + for tag in (self.my_cluster.config.get("tags") or []): + if tag[Arg.Key] == "Name": + next( + ( + v for i, v in enumerate(tags) if v[Arg.Key] == "Name" + ) + ).update({Arg.Key: tag[Arg.Key], Arg.Value: tag[Arg.Value]}) + continue + + tags.append( + { + Arg.Key: tag[Arg.Key], + Arg.Value: tag[Arg.Value] + } + ) + + return tags + + # TODO: Implement + def _install_python_libs(self): + raise NotImplementedError("To be implemented...") + + # TODO: Implement + def _run_shell_commands(self): + raise NotImplementedError("To be implemented...") + + +class ClusterStopper(object): + def __init__(self, my_cluster): + self.my_cluster = my_cluster + + def terminate_cluster(self, data): + emr_cluster_id = data["emrClusterId"] + region = self.my_cluster.config.get("awsRegionId") or dku_emr.get_current_region() + client = boto3.client('emr', region_name=region) + + # Make sure termination protection is turned off + client.set_termination_protection(JobFlowIds=[emr_cluster_id], TerminationProtected=False) + client.terminate_job_flows(JobFlowIds=[emr_cluster_id]) + + def detach_cluster(self, data): + """ + Since we attached to an existing cluster, we don't stop it + """ + msg = "Detaching. Nothing to do." 
+ logging.info(msg) + + return msg + + +class ClusterAttacher(object): + + def __init__(self, my_cluster): + self.my_cluster = my_cluster + + def attach_cluster(self): + region_name = self.my_cluster.config.get("awsRegionId") or dku_emr.get_current_region() + client = boto3.client("emr", region_name=region_name) + cluster_id = self.my_cluster.config["emrClusterId"] + logging.info("Attaching to EMR cluster id %s" % cluster_id) + + return dku_emr.make_cluster_keys_and_data(client, cluster_id, create_user_dir=True) + + +class ClusterCopier(object): + + def __init__(self, my_cluster): + self.my_cluster = my_cluster + + def copy_cluster(self): + dss_client = dataiku.api_client() + existing_clusters = dss_client.list_clusters() + cluster_id_search = [c['id'] for c in existing_clusters if c['id'] == self.my_cluster.source_cluster_id] + + if len(cluster_id_search) == 0: + raise Exception("Cluster id '{}' does not exist.".format(self.my_cluster.source_cluster_id)) + + template_cluster_settings = dss_client.get_cluster(cluster_id_search[0]).get_settings().settings + if template_cluster_settings['type'] in self.my_cluster.excluded_cluster_types: + raise Exception("Cannot copy from cluster of type '{}'.".format(template_cluster_settings['type'])) + + # Use cluster.json file to specify settings new new cluster + params_template_path = os.path.join( + "/".join(self.my_cluster.resource_dir.split("/")[0:-1]), + "python-clusters/emr-create-cluster" + ) + # Copy settings from source/template cluster to apply to new one + params_template = json.load(open(os.path.join(params_template_path, "cluster.json"), "r")) + params_to_copy = template_cluster_settings['params']['config'] + for key, val in params_to_copy.items(): + params_template[key] = val + + self.my_cluster.config = params_template + + logging.info("Building new cluster '{}' using '{}' as a template".format( + self.my_cluster.cluster_id, self.my_cluster.source_cluster_id) + ) + + return 
ClusterBuilder(self.my_cluster).build_cluster() + + \ No newline at end of file diff --git a/python-lib/ssh/__init__.py b/python-lib/ssh/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-lib/ssh/client.py b/python-lib/ssh/client.py new file mode 100644 index 0000000..af74be8 --- /dev/null +++ b/python-lib/ssh/client.py @@ -0,0 +1,116 @@ +import paramiko +import dataiku +import subprocess +import boto3 +import tempfile +import time +from collections import Iterable +from tabulate import tabulate + + +class RemoteSSHClient(object): + + @staticmethod + def get_rsa_keyfile(path): + if not isinstance(path, str): + raise Exception("Parameter 'path': Expected type str. Found {}".format(type(path))) + + if "s3://" in path: + s3_path_parts = path.split("s3://")[1].split("/") + bucket_name = s3_path_parts[0] + bucket_key = "/".join(s3_path_parts[1:-1]) + "/" + file_name = s3_path_parts[-1] + keyfile = tempfile.NamedTemporaryFile(suffix=".pem") + keyfile.write( + boto3.resource('s3').Object(bucket_name, bucket_key + file_name).get()['Body'].read() + ) + keyfile.seek(0) + + return paramiko.RSAKey.from_private_key_file(keyfile.name) + + return paramiko.RSAKey.from_private_key_file(path) + + def __init__(self, host, username, rsa_key_or_path, autoconnect=True): + self.host = host + self.username = username + self.rsa_key = rsa_key_or_path + if not isinstance(self.rsa_key, paramiko.rsakey.RSAKey): + self.rsa_key = RemoteSSHClient.get_rsa_keyfile(rsa_key_or_path) + + self.client = paramiko.SSHClient() + self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.is_connected = False + if autoconnect: + self.client.connect(hostname=self.host, username=self.username, pkey=self.rsa_key) + self.client.invoke_shell() + self.is_connected = True + + def __del__(self): + self.client.close() + + def connect(self): + self.client.connect(hostname=self.host, username=self.username, pkey=self.rsa_key) + # self.client.invoke_shell() + self.is_connected = 
True + + def close(self): + if self.client is not None and self.is_connected: + self.client.close() + self.is_connected = False + print("Closed connection to host %s." % self.host) + + def execute(self, cmds, print_summary=False, disconnect=False, use_channel=False): + if not self.is_connected: + raise ValueError("Cannot execute commands on host %s. You first need to open a connection." % self.host) + + formatted_cmds = RemoteSSHClient._check_and_format_commands(cmds) + + print("Executing the following commands: '{}'".format(list(cmds))) + runner = None + if use_channel: + print("Running via channel.") + runner = self.client.get_transport().open_session() + + runner = runner or self.client + _, stdout, stderr = runner.exec_command(formatted_cmds) + cmd_run_info = { + 'cmds': formatted_cmds, + 'stdout': stdout, + 'stderr': stderr, + 'use_channel': use_channel + } + + if use_channel: + runner.close() + + if print_summary: + print("\nCommand run summary:") + print( + tabulate( + list(cmd_run_info.values()), + list(cmd_run_info.keys()) + ), + "\n" + ) + + if disconnect: + self.close() + + return cmd_run_info + + @staticmethod + def _check_and_format_commands(cmds): + if not isinstance(cmds, Iterable) or isinstance(cmds, str): + raise TypeError("Parameter 'cmds': Expected type Iterable or str. 
Found {}.".format(type(cmds))) + + if isinstance(cmds, Iterable): + for c in cmds: + if not (isinstance(c, str)): + raise TypeError( + "Parameter 'cmds' contains invalid non-str element {} of type {}".format(c, type(c)) + ) + + return " && ".join(cmds) + + return cmds + \ No newline at end of file From 79ec7f208b7b59c3690eafa1f2b8c090563d6d56 Mon Sep 17 00:00:00 2001 From: dmichalopoulos_komply Date: Mon, 26 Aug 2019 12:02:07 -0500 Subject: [PATCH 03/11] Updated version from 0.0.2 to 0.0.3 --- plugin.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin.json b/plugin.json index 4731d4c..8b47154 100644 --- a/plugin.json +++ b/plugin.json @@ -1,6 +1,6 @@ { "id" : "emr-clusters", - "version" : "0.0.2", + "version" : "0.0.3", "meta" : { "label" : "EMR clusters", "description" : "Dynamically create Amazon EMR clusters or attach to existing ones.", From 2eb43ecc4891cacae6d94260549f243c0627f781 Mon Sep 17 00:00:00 2001 From: dmichalopoulos_komply Date: Mon, 26 Aug 2019 12:02:55 -0500 Subject: [PATCH 04/11] Refactoring to use centralized cluster management operations in cluster_ops.py. 
--- .../emr-attach-to-existing-cluster/cluster.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/python-clusters/emr-attach-to-existing-cluster/cluster.py b/python-clusters/emr-attach-to-existing-cluster/cluster.py index bc9805b..2a8b59a 100644 --- a/python-clusters/emr-attach-to-existing-cluster/cluster.py +++ b/python-clusters/emr-attach-to-existing-cluster/cluster.py @@ -1,28 +1,25 @@ -import boto3 -import os, json, logging -import dku_emr +import logging from dataiku.cluster import Cluster +from cluster_ops import ClusterAttacher, ClusterStopper # This actually belongs in the main entry point logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO) logging.getLogger().setLevel(logging.INFO) + class MyCluster(Cluster): + def __init__(self, cluster_id, cluster_name, config, plugin_config): self.cluster_id = cluster_id self.cluster_name = cluster_name self.config = config self.plugin_config = plugin_config - + def start(self): - region_name = self.config.get("awsRegionId") or dku_emr.get_current_region() - client = boto3.client("emr", region_name=region_name) - clusterId = self.config["emrClusterId"] - logging.info("Attaching to EMR cluster id %s" % clusterId) - return dku_emr.make_cluster_keys_and_data(client, clusterId, create_user_dir=True) + return ClusterAttacher(my_cluster=self).attach_cluster() def stop(self, data): """ Since we attached to an existing cluster, we don't stop it """ - logging.info("Detaching: nothing to do") + return ClusterStopper(my_cluster=self).detach_cluster(data) From 128b3dae0a4561a045b591ecd9c061c29c433313 Mon Sep 17 00:00:00 2001 From: dmichalopoulos_komply Date: Mon, 26 Aug 2019 12:03:46 -0500 Subject: [PATCH 05/11] Code to support new macro for copying existing clusters (only allowed for clusters created by DSS, i.e., not attached external clusters). 
--- python-clusters/emr-copy-cluster/__init__.py | 0 python-clusters/emr-copy-cluster/cluster.json | 46 +++++++++++++++++++ python-clusters/emr-copy-cluster/cluster.py | 29 ++++++++++++ 3 files changed, 75 insertions(+) create mode 100644 python-clusters/emr-copy-cluster/__init__.py create mode 100644 python-clusters/emr-copy-cluster/cluster.json create mode 100644 python-clusters/emr-copy-cluster/cluster.py diff --git a/python-clusters/emr-copy-cluster/__init__.py b/python-clusters/emr-copy-cluster/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-clusters/emr-copy-cluster/cluster.json b/python-clusters/emr-copy-cluster/cluster.json new file mode 100644 index 0000000..fc5c105 --- /dev/null +++ b/python-clusters/emr-copy-cluster/cluster.json @@ -0,0 +1,46 @@ +{ + "meta": { + "label": "EMR cluster (copy of existing cluster)", + "description": "Create a new cluster whose settings will be copied from an existing cluster. Limited to clusters created by DSS, i.e., excludes attached clusters created externally.", + "icon": "icon-cloud" + }, + + "impersonate": false, + + "permissions": [], + + "resultType": "HTML", + + "macroRoles": [ + {"type":"CLUSTER", "targetParamsKey": "dss_cluster_id", "limitToSamePlugin": true} + ], + + "params": [ + { + "name": "source_dss_cluster_id", + "label": "Source DSS Cluster id", + "type": "CLUSTER", + "description": "Identifier of the DSS cluster from which to create a copy", + "mandatory": true + }, + { + "name": "excluded_cluster_types", + "label": "Cluster types that can NOT be copied", + "type": "STRING", + "description": "These types of clusters cannot be copied/duplicated", + "defaultValue": "pycluster_emr-clusters_emr-attach-to-existing-cluster", + "visibilityCondition": false + } + ], + + "actions" : [ + { + "id" : "fetch-nodes-keys", + "meta" : { + "label" : "Fetch node description keys", + "description" : "Fetches node description keys", + "icon" : "icon-search" + } + } + ] +} diff --git 
a/python-clusters/emr-copy-cluster/cluster.py b/python-clusters/emr-copy-cluster/cluster.py new file mode 100644 index 0000000..eaf61be --- /dev/null +++ b/python-clusters/emr-copy-cluster/cluster.py @@ -0,0 +1,29 @@ +import os +import logging +from dataiku.cluster import Cluster + +from cluster_ops import ClusterCopier, ClusterStopper + +# This actually belongs in the main entry point +logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO) +logging.getLogger().setLevel(logging.INFO) + + +class MyCluster(Cluster): + + def __init__(self, cluster_id, cluster_name, config, plugin_config): + self.cluster_id = cluster_id + self.cluster_name = cluster_name + self.config = config + self.plugin_config = plugin_config + self.resource_dir = os.getenv('DKU_CUSTOM_RESOURCE_FOLDER') + self.excluded_cluster_types = [x.strip() for x in self.config.get('excluded_cluster_types').split(",")] + self.source_cluster_id = self.config.get('source_dss_cluster_id') + self.source_cluster_name = self.source_cluster_id + + def start(self): + return ClusterCopier(self).copy_cluster() + + def stop(self, data): + return ClusterStopper(self).terminate_cluster(data) + From f2186719f8fb74a0fd57647bacd17c1cf88235c3 Mon Sep 17 00:00:00 2001 From: dmichalopoulos_komply Date: Mon, 26 Aug 2019 12:05:09 -0500 Subject: [PATCH 06/11] Refactoring to use centralized cluster management operations in cluster_ops.py. 
Replaces majority of boto3 string parameter references with those defined in boto_params.py --- .../emr-create-cluster/__init__.py | 0 .../emr-create-cluster/cluster.json | 315 +++++++++++++++--- python-clusters/emr-create-cluster/cluster.py | 149 +-------- 3 files changed, 274 insertions(+), 190 deletions(-) create mode 100644 python-clusters/emr-create-cluster/__init__.py diff --git a/python-clusters/emr-create-cluster/__init__.py b/python-clusters/emr-create-cluster/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-clusters/emr-create-cluster/cluster.json b/python-clusters/emr-create-cluster/cluster.json index c124f1c..6eafbc6 100644 --- a/python-clusters/emr-create-cluster/cluster.json +++ b/python-clusters/emr-create-cluster/cluster.json @@ -1,84 +1,312 @@ -/* This file is the descriptor for the custom python cluster */ { - /* Meta data for display purposes */ "meta" : { - "label" : "EMR cluster (create cluster)", + "label" : "EMR cluster (create new cluster)", "description" : "Creates and manages EMR clusters", "icon" : "icon-puzzle-piece" }, + + "resourceKeys" : [], "params": [ + { + "type" : "SEPARATOR", + "label": "CLUSTER" + }, + { "name" : "awsRegionId", "label" : "AWS Region", "type": "STRING", - "description": "AWS region id. Leave empty to use region of current instance.", + "description": "AWS region id. Leave empty to use region of current instance", "mandatory" : false, "defaultValue": "us-east-1" }, + { + "name": "emrVersion", + "label" : "EMR Version", + "type": "STRING", + "defaultValue" : "5.23.0", + "description" : "EMR release label, e.g. 5.23.0", + "mandatory" : true + }, + { + "name": "ebsRootVolumeSize", + "label" : "Root volume size", + "type": "INT", + "description" : "Root device EBS volume size in GB, or 0 to use default size. 
Must be <= 100.", + "defaultValue" : 25, + "mandatory" : true + }, + { + "name": "ebsOptimized", + "label" : "Use optimized EBS volumes", + "type": "BOOLEAN", + "description" : "Applies to all instance groups", + "defaultValue" : true, + "mandatory" : true + }, + { + "name": "tags", + "label": "Tags", + "type" : "KEY_VALUE_LIST", + "description" : "AWS tags to add on the cluster" + }, + { + "name" : "logsPath", + "label" : "Path for logs", + "type" : "STRING", + "mandatory": false, + "description" : "Set specific S3 path where logs will be stored. Optional." + }, + { + "name": "terminationProtected", + "label": "Termination protection", + "description": "Prevents accidental cluster termination.", + "type": "BOOLEAN", + "mandatory": true, + "defaultValue": false + }, + { + "name": "pythonLibs", + "label": "Python libs to install (Advanced)", + "type": "TEXTAREA", + "defaultValue": null, + "description": "Python libraries to install on each node. Use the same formatting as a pip requirements.txt file.", + "mandatory": false, + "visibilityCondition": false + }, + { + "name": "extraSetup", + "label": "Extra setup commands (Advanced)", + "type": "TEXTAREA", + "defaultValue": null, + "description": "Shell commands (set paths or create dirs, for example) to run on each node", + "mandatory": false, + "visibilityCondition": false + }, + + { + "type" : "SEPARATOR", + "label": "APPLICATIONS" + }, + + { + "name": "installAppHadoop", + "type": "BOOLEAN", + "label": "Hadoop", + "defaultValue": true, + "mandatory": true + }, + { + "name": "installAppHive", + "type": "BOOLEAN", + "label": "Hive", + "defaultValue": true, + "mandatory": true + }, + { + "name": "installAppSpark", + "type": "BOOLEAN", + "label": "Spark", + "defaultValue": true, + "mandatory": true + }, + { + "name": "installAppTez", + "type": "BOOLEAN", + "label": "Tez", + "defaultValue": true, + "mandatory": true + }, + { + "name": "installAppPig", + "type": "BOOLEAN", + "label": "Pig", + "defaultValue": true, + 
"mandatory": true + }, + { + "name": "installAppLivy", + "type": "BOOLEAN", + "label": "Livy", + "defaultValue": true, + "mandatory": true + }, + { + "name": "installAppZookeeper", + "type": "BOOLEAN", + "label": "ZooKeeper", + "defaultValue": false, + "mandatory": true + }, + { + "name": "installAppGanglia", + "type": "BOOLEAN", + "label": "Ganglia", + "defaultValue": true, + "mandatory": true + }, + + { + "name": "softwareConfig", + "label": "Software configurations", + "type": "TEXTAREA", + "description": "Specify desired software config settings as a JSON list of properties. Leave empty to use default EMR settings. For more info, see https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html.", + "defaultValue": null, + "mandatory": false + }, + + { + "type" : "SEPARATOR", + "label": "MASTER NODE" + }, + { "name": "masterInstanceType", - "label" : "Instance type (master)", + "label" : "Instance type", "type": "STRING", - "description": "EC2 instance type for the master instance. Mandatory.", - "defaultValue" : "m4.2xlarge", + "description": "EC2 instance type. Mandatory.", + "defaultValue" : "m5a.4xlarge", "mandatory" : true }, + { + "name": "masterEbsSize", + "type": "INT", + "label" : "EBS storage size (GB)", + "description": "Storage size for each EBS volume. Must be b/w 10 and 1024.", + "defaultValue" : 250, + "mandatory": true + }, + { + "name": "masterEbsCount", + "type": "INT", + "label" : "EBS volume count", + "description": "Number of EBS volumes.", + "defaultValue" : 4, + "mandatory": true + }, + + { + "type" : "SEPARATOR", + "label": "CORE NODES" + }, + { "name": "coreInstanceType", - "label" : "Instance type (core)", + "label" : "Instance type", "type": "STRING", - "description": "Instance type for the CORE instance group (compute + storage slave nodes).", - "defaultValue": "m4.2xlarge", - "mandatory" : false + "description": "Instance type for the CORE instance group (compute + storage slave nodes). 
Mandatory.", + "defaultValue": "m4.4xlarge", + "mandatory" : true }, { "name": "coreInstanceCount", "type": "INT", - "label" : "Instance count (core)", - "defaultValue" : 2 + "label" : "Instance count", + "description": "Number of instances (must be >= 1).", + "defaultValue" : 2, + "mandatory": true + }, + { + "name": "coreEbsSize", + "type": "INT", + "label" : "EBS volume size (GB).", + "description": "EBS volume size per instance. Must be b/w 10 and 1024.", + "defaultValue" : 500, + "mandatory": true + }, + { + "name": "coreEbsCount", + "type": "INT", + "label" : "EBS volume count", + "description": "Number of EBS volumes per instance", + "defaultValue" : 1, + "mandatory": true + }, + { + "name": "coreUseSpotInstances", + "type": "BOOLEAN", + "label": "Use spot instances", + "description": "Spot instances are cheaper, but can be terminated without warning. Not recommended for CORE group.", + "defaultValue": false, + "mandatory": true + }, + { + "name": "coreBidPrice", + "type": "DOUBLE", + "defaultValue": null, + "label": "Bid price (per instance)", + "description": "Max hourly rate (price) you are willing to pay per instance. Leave blank or set to 0 to use on-demand price.", + "visibilityCondition": "model.coreUseSpotInstances" + }, + + { + "type" : "SEPARATOR", + "label": "TASK NODES" }, + { "name": "taskInstanceType", - "label" : "Instance type (task)", + "label" : "Instance type", "type": "STRING", "description": "Instance type for the TASK instance group (compute-only slave nodes).", - "defaultValue" : "m4.2xlarge", + "defaultValue" : "m4.4xlarge", "mandatory" : false }, { "name": "taskInstanceCount", "type": "INT", - "label" : "Instance count (task)", - "defaultValue" : 0 + "label" : "Instance count", + "description": "Number of instances. 
Set to 0 if no task instances are desired.", + "defaultValue" : 0, + "mandatory" : true }, { - "name": "emrVersion", - "label" : "EMR Version", - "type": "STRING", - "defaultValue" : "5.11.1", - "description" : "EMR release label, e.g. 5.11.1", - "mandatory" : true + "name": "taskEbsSize", + "type": "INT", + "label" : "EBS volume size (GB).", + "description": "EBS volume size per instance. Must be b/w 10 and 1024.", + "defaultValue" : 500, + "mandatory": true, + "visibilityCondition": "model.taskInstanceCount > 0" }, { - "name": "ebsRootVolumeSize", - "label" : "Root volume size", + "name": "taskEbsCount", "type": "INT", - "description" : "Root device EBS volume size in GB, or 0 to use default size", - "defaultValue" : 0 + "label" : "EBS volume count", + "description": "Number of EBS volumes per instance", + "defaultValue" : 1, + "mandatory": true, + "visibilityCondition": "model.taskInstanceCount > 0" + }, + { + "name": "taskUseSpotInstances", + "type": "BOOLEAN", + "defaultValue": false, + "label": "Use spot instances", + "description": "Spot instances are cheaper, but can be terminated without warning", + "visibilityCondition": "model.taskInstanceCount > 0" + }, + { + "name": "taskBidPrice", + "type": "DOUBLE", + "defaultValue": null, + "label": "Bid price (per task instance)", + "description": "Max hourly rate (price) you are willing to pay per instance. Leave blank or set to 0 to use on-demand price.", + "visibilityCondition": "model.taskInstanceCount > 0 && model.taskUseSpotInstances" }, - { "type" : "SEPARATOR", "label": "Networking" }, + { "name": "subnetId", "label" : "VPC subnet id", - "description": "VPC subnet in which the cluster must be deployed. Leave empty to use that of the current instance.", + "description": "VPC subnet (e.g., subnet-XXXXXX) in which the cluster must be deployed. 
Leave empty to use that of the current instance.", "type": "STRING", - "defaultValue" : "subnet-XXXXXX", + "defaultValue" : null, "mandatory" : false }, { @@ -86,6 +314,7 @@ "label" : "Additional security groups", "type" : "STRING", "mandatory": false, + "defaultValue": null, "description" : "Optional. Comma-separated list of security groups ids to add to the nodes" }, @@ -94,11 +323,12 @@ "type" : "SEPARATOR", "label": "Security" }, + { "name": "ec2KeyName", "label" : "Key pair name", "type": "STRING", - "description" : "Optional but recommended. SSH Keypair to add on the cluster nodes", + "description" : "Optional but recommended. Name of SSH keypair to add on the cluster nodes.", "mandatory" : false }, { @@ -106,7 +336,7 @@ "label" : "Service role", "type": "STRING", "defaultValue": "EMR_DefaultRole", - "description" : "IAM role to attach to the service. Must have ability to call required AWS services to create the cluster", + "description" : "IAM role to attach to the service. Must have ability to call mandatory AWS services to create the cluster.", "mandatory" : true }, { @@ -114,7 +344,7 @@ "label" : "Nodes role", "type": "STRING", "defaultValue": "EMR_EC2_DefaultRole", - "description" : "IAM role to attach to the nodes of the cluster. Needs to have ability to call required AWS services from the cluster, like S3", + "description" : "IAM role to attach to the nodes of the cluster. 
Needs to have ability to call mandatory AWS services from the cluster, like S3", "mandatory" : true }, @@ -122,6 +352,7 @@ "type" : "SEPARATOR", "label": "Hive Metastore" }, + { "name": "metastoreDBMode", "label" : "Metastore database", @@ -201,24 +432,6 @@ "defaultValue" : "dataiku", "description" : "Comma-separated list of databases to create upon startup", "mandatory" : false - }, - - { - "type" : "SEPARATOR", - "label": "Misc" - }, - { - "name": "tags", - "label": "Tags", - "type" : "KEY_VALUE_LIST", - "description" : "AWS tags to add on the cluster" - }, - { - "name" : "logsPath", - "label" : "Path for logs", - "type" : "STRING", - "mandatory": false, - "description" : "Optional. S3 path where logs will be stored" } ], diff --git a/python-clusters/emr-create-cluster/cluster.py b/python-clusters/emr-create-cluster/cluster.py index 6144264..53a7830 100644 --- a/python-clusters/emr-create-cluster/cluster.py +++ b/python-clusters/emr-create-cluster/cluster.py @@ -1,158 +1,29 @@ -import boto3 -import dku_emr -import os, json, argparse, logging +import os +import logging + from dataiku.cluster import Cluster +from cluster_ops import ClusterBuilder, ClusterStopper # This actually belongs in the main entry point logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO) logging.getLogger().setLevel(logging.INFO) + class MyCluster(Cluster): + def __init__(self, cluster_id, cluster_name, config, plugin_config): self.cluster_id = cluster_id self.cluster_name = cluster_name self.config = config self.plugin_config = plugin_config - - def start(self): - region = self.config.get("awsRegionId") or dku_emr.get_current_region() - client = boto3.client('emr', region_name=region) - release = 'emr-%s' % self.config["emrVersion"] - - name = "DSS cluster id=%s name=%s" % (self.cluster_id, self.cluster_name) - - logging.info("starting cluster, release=%s name=%s" % (release, name)) - - extraArgs = {} - if "logsPath" in self.config: - extraArgs['LogUri'] 
= self.config["logsPath"] - if "securityConfiguration" in self.config: - extraArgs["SecurityConfiguration"] = self.config["securityConfiguration"] - if self.config.get("ebsRootVolumeSize", 0): - extraArgs["EbsRootVolumeSize"] = self.config["ebsRootVolumeSize"] - - security_groups = [] - if "additionalSecurityGroups" in self.config: - security_groups = [x.strip() for x in self.config["additionalSecurityGroups"].split(",")] - - subnet = self.config.get("subnetId") or dku_emr.get_current_subnet() + self.resource_dir = os.getenv('DKU_CUSTOM_RESOURCE_FOLDER') - instances = { - 'InstanceGroups': [{ - 'InstanceRole': 'MASTER', - 'InstanceType': self.config["masterInstanceType"], - 'InstanceCount': 1 - }], - 'KeepJobFlowAliveWhenNoSteps': True, - 'Ec2SubnetId': subnet, - 'AdditionalMasterSecurityGroups': security_groups, - 'AdditionalSlaveSecurityGroups': security_groups - } - - if self.config.get("coreInstanceCount"): - if not self.config.get("coreInstanceType"): - raise Exception("Missing core instance type") - instances['InstanceGroups'].append({ - 'InstanceRole': 'CORE', - 'InstanceType': self.config["coreInstanceType"], - 'InstanceCount': self.config["coreInstanceCount"] - }) - - if self.config.get("taskInstanceCount"): - if not self.config.get("taskInstanceType"): - raise Exception("Missing task instance type") - instances['InstanceGroups'].append({ - 'InstanceRole': 'TASK', - 'InstanceType': self.config["taskInstanceType"], - 'InstanceCount': self.config["taskInstanceCount"] - }) - - if "ec2KeyName" in self.config: - instances['Ec2KeyName'] = self.config["ec2KeyName"] - - tags = [{'Key': 'Name', 'Value': name}] - for tag in self.config.get("tags", []): - tags.append({"Key" : tag["from"], "Value" : tag["to"]}) - - if self.config["metastoreDBMode"] == "CUSTOM_JDBC": - props = { - "javax.jdo.option.ConnectionURL" : self.config["metastoreJDBCURL"], - "javax.jdo.option.ConnectionDriverName": self.config["metastoreJDBCDriver"], - "javax.jdo.option.ConnectionUserName": 
self.config["metastoreJDBCUser"], - "javax.jdo.option.ConnectionPassword": self.config["metastoreJDBCPassword"], - } - Configurations = [{"Classification": "hive-site", "Properties" : props}] - extraArgs["Configurations"] = Configurations - elif self.config["metastoreDBMode"] == "MYSQL": - props = { - "javax.jdo.option.ConnectionURL" : "jdbc:mysql://%s:3306/hive?createDatabaseIfNotExist=true" % self.config["metastoreMySQLHost"], - "javax.jdo.option.ConnectionDriverName": "org.mariadb.jdbc.Driver", - "javax.jdo.option.ConnectionUserName": self.config["metastoreMySQLUser"], - "javax.jdo.option.ConnectionPassword": self.config["metastoreMySQLPassword"] - } - Configurations = [{"Classification": "hive-site", "Properties" : props}] - extraArgs["Configurations"] = Configurations - elif self.config["metastoreDBMode"] == "AWS_GLUE_DATA_CATALOG": - props = { - "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" - } - Configurations = [{"Classification": "hive-site", "Properties" : props}] - extraArgs["Configurations"] = Configurations - - logging.info("Starting cluster: %s", dict( - Name=name, - ReleaseLabel=release, - Instances=instances, - Applications=[ - {"Name": "Hadoop"}, - {"Name": "Hive"}, - {"Name": "Tez"}, - {"Name": "Pig"}, - {"Name": "Spark"}, - {"Name": "Zookeeper"} - ], - VisibleToAllUsers=True, - JobFlowRole=self.config["nodesRole"], - ServiceRole=self.config["serviceRole"], - Tags=tags, - **extraArgs - )) - - response = client.run_job_flow( - Name=name, - ReleaseLabel=release, - Instances=instances, - Applications=[ - {"Name": "Hadoop"}, - {"Name": "Hive"}, - {"Name": "Tez"}, - {"Name": "Pig"}, - {"Name": "Spark"}, - {"Name": "Zookeeper"} - ], - VisibleToAllUsers=True, - JobFlowRole=self.config["nodesRole"], - ServiceRole=self.config["serviceRole"], - Tags=tags, - **extraArgs - ) - - clusterId = response['JobFlowId'] - logging.info("clusterId=%s" % clusterId) - - logging.info("waiting for cluster 
to start") - client.get_waiter('cluster_running').wait(ClusterId=clusterId) - - return dku_emr.make_cluster_keys_and_data(client, clusterId, create_user_dir=True, create_databases=self.config.get("databasesToCreate")) + def start(self): + return ClusterBuilder(my_cluster=self).build_cluster() def stop(self, data): """ Stop the cluster - :param data: the dict of data that the start() method produced for the cluster """ - emrClusterId = data["emrClusterId"] - - region = self.config.get("awsRegionId") or dku_emr.get_current_region() - client = boto3.client('emr', region_name=region) - client.terminate_job_flows(JobFlowIds=[emrClusterId]) + return ClusterStopper(my_cluster=self).terminate_cluster(data) From c475d399a9460ecc8110abc96ae317718f2c78a0 Mon Sep 17 00:00:00 2001 From: dmichalopoulos_komply Date: Mon, 26 Aug 2019 12:06:27 -0500 Subject: [PATCH 07/11] Updated README to include info on (proposed) version 0.0.3 enhancements in this branch. --- README.md | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 38caf6a..419955d 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,40 @@ -# EMR clusters plugin - +# EMR clusters plugin (emr-clusters) Plugin to add AWS EMR support in Dataiku DSS -## Documentation +## Contributors +- Dataiku ([dataiku](https://github.com/dataiku)) +- Dennis P. Michalopoulos ([dmichalopoulos](https://github.com/dmichalopoulos)) -https://doc.dataiku.com/dss/latest/hadoop/dynamic-emr.html +## Version +0.0.3 -## License +## Documentation +https://doc.dataiku.com/dss/latest/hadoop/dynamic-emr.html +## License This project is licensed under the Apache Software License. 
+ +## Release Notes (v0.0.3) + +### New macro: `emr-copy-cluster` +- Allows you to launch a new cluster whose settings are copied from an existing cluster +- Copyable clusters are limited to those created by the `emr-create-cluster` macro + +### New cluster parameter configurations added to `emr-create-cluster` +- Add your own EMR software application configurations +- Request spot instances for CORE and TASK nodes at a specified bid price (defaults to on-demand price) +- Select which software applications you want to install (from a limited set) +- Specify desired number of EBS volumes per instance group +- Specify whether or not to use EBS-optimized volumes (applies to all groups) +- Add termination protection to your cluster (turned off automatically when stopping a created/copied cluster) + +### Various code refactorings +- Moved core logic of `Cluster.start(..)` and `Cluster.stop(..)` functions to `cluster_ops.py` +- Created `boto_params.py` to store class-level variables for all required `boto3` client commands and responses +- Some rewording of parameter descriptions + +### In development (not yet implemented) +- Allow user to specify python libraries to install on each cluster node +- Allow user to specify shell commands (e.g., to create directories or perform other custom actions) to run on each instance once cluster launch is complete +- Make it such that both the above are performed on any new instances added after rescaling operations, or when spot instances are terminated and replaced + From ca9ee62694127031521c9d7cbdd67e494bae7e05 Mon Sep 17 00:00:00 2001 From: dmichalopoulos_komply Date: Mon, 26 Aug 2019 12:07:23 -0500 Subject: [PATCH 08/11] Updated requirements.txt, and added support for Python 3.7 in desc.json. 
--- code-env/python/desc.json | 2 +- code-env/python/spec/requirements.txt | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/code-env/python/desc.json b/code-env/python/desc.json index d639a2b..0dd48be 100644 --- a/code-env/python/desc.json +++ b/code-env/python/desc.json @@ -1,5 +1,5 @@ { - "acceptedPythonInterpreters": ["PYTHON27", "PYTHON35", "PYTHON36"], + "acceptedPythonInterpreters": ["PYTHON27", "PYTHON35", "PYTHON36", "PYTHON37"], "forceConda": false, "installCorePackages": true, "installJupyterSupport": false diff --git a/code-env/python/spec/requirements.txt b/code-env/python/spec/requirements.txt index 1db657b..04058a9 100644 --- a/code-env/python/spec/requirements.txt +++ b/code-env/python/spec/requirements.txt @@ -1 +1,4 @@ -boto3 \ No newline at end of file +boto3 +paramiko +numpy +tabulate \ No newline at end of file From 28da93f95b1bd777bc5e33695db432fec66e322f Mon Sep 17 00:00:00 2001 From: dmichalopoulos_komply Date: Thu, 29 Aug 2019 08:24:31 -0500 Subject: [PATCH 09/11] Added more to description of "terminationProtection" param in emr-create-cluster/cluster.json. --- python-clusters/emr-create-cluster/cluster.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python-clusters/emr-create-cluster/cluster.json b/python-clusters/emr-create-cluster/cluster.json index 6eafbc6..89e4983 100644 --- a/python-clusters/emr-create-cluster/cluster.json +++ b/python-clusters/emr-create-cluster/cluster.json @@ -61,7 +61,7 @@ { "name": "terminationProtected", "label": "Termination protection", - "description": "Prevents accidental cluster termination.", + "description": "Prevents accidental cluster termination from AWS console. 
Automatically turned off when request to stop/detach is made.", "type": "BOOLEAN", "mandatory": true, "defaultValue": false From 7c10e21b5d255530cef606b2d813278a6e1066c9 Mon Sep 17 00:00:00 2001 From: dmichalopoulos_komply Date: Thu, 29 Aug 2019 08:25:37 -0500 Subject: [PATCH 10/11] Started working on functions for running user-specified shell commands on cluster nodes. --- python-lib/ssh/cmd_runner.py | 206 +++++++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) create mode 100644 python-lib/ssh/cmd_runner.py diff --git a/python-lib/ssh/cmd_runner.py b/python-lib/ssh/cmd_runner.py new file mode 100644 index 0000000..9a3296a --- /dev/null +++ b/python-lib/ssh/cmd_runner.py @@ -0,0 +1,206 @@ +import boto3 +import paramiko +import sys +import subprocess +from time import sleep +from numpy import unique +import dataiku +import logging + +from client import RemoteSSHClient + + +def get_emr_cluster_info(dss_cluster_id): + """Returns dictionary containing info about the EMR cluster used by a project.""" + # client = client or dataiku.api_client() + # cluster_info_macro = proj.get_macro("pyrunnable_emr-clusters_get-cluster-info") + # dss_cluster_info = {'dss_cluster_id': dss_cluster_id} + # response = cluster_info_macro.get_result(cluster_info_macro.run(params=dss_cluster_info)) + + cluster = dataiku.api_client().get_cluster(dss_cluster_id) + cluster_settings = cluster.get_settings().settings + emr_cluster_id = cluster_settings['data']['emrClusterId'] + aws_region_id = cluster_settings['params']['config']['awsRegionId'] + + boto_client = boto3.client('emr', region_name=aws_region_id) + instance_groups = { + g['Id']: {} + for g in boto_client.list_instance_groups(ClusterId=emr_cluster_id) + } + for gid in instance_groups.keys(): + pass + + dss_cluster_info = { + 'dss_cluster_id': dss_cluster_id, + 'emr_cluster_id': emr_cluster_id, + 'instance_groups': instance_groups + } + + logging.info("retrieving master instance") + master_instances = 
boto_client.list_instances(ClusterId=emr_cluster_id, + InstanceGroupTypes=['MASTER'], + InstanceStates=['AWAITING_FULFILLMENT', 'PROVISIONING', 'BOOTSTRAPPING', + 'RUNNING']) + master_instance_info = {"privateIpAddress": master_instances['Instances'][0]["PublicIpAddress"]} + + # if cluster_mgr_project_key is not None: + # cluster_mgr_info = client.get_project(cluster_mgr_project_key).get_variables( + # )['standard']['emr']['clusters'].get(dss_cluster_id) + # if cluster_mgr_info is not None: + # instance_groups = cluster_mgr_info['instance_groups'] + # for dss_grp_info in dss_cluster_info['instanceGroups']: + # grp_id = dss_grp_info['instanceGroupId'] + # cluster_mgr_grp_info = instance_groups.get(grp_id) + # if cluster_mgr_grp_info: + # dss_grp_info.update( + # {'resizable': cluster_mgr_grp_info.get('resizable', False)} + # ) + + return dss_cluster_info + + +def cleanup(ip_to_connectionvals_dict, scen_vars, all_exit_zero_status): + print("Closing all channels and connections ...") + for v in ip_to_connectionvals_dict.values(): + v['client'].close() + ''' + for w in v.values(): + w['stdin'].channel.close() + w['stdout'].channel.close() + w['stderr'].channel.close() + ''' + abort_cond = bool(scen_vars['abort_on_nonzero_exit']) + # if abort_cond and all_exit_zero_status != 0: + # print("\n WARNING: NOT ALL REMOTE PROCESSES RETURNED EXIT CODE 0. 
ABORTING SCENARIO.\n") + # DSSScenario( + # api_client(), + # "DSS_ADMIN", + # "INSTALL_ON_EMR_NODES" + # ).abort() + + +def run_install_commands(scen_vars, cluster_id, instance_group_ids): + dss_id = scen_vars["dss_instance_id"] + username = scen_vars['username'] + keyfile_path = scen_vars['keyfile_path'] + delete_keyfile = False + # if "://" in keyfile_path: + # keyfile_path, delete_keyfile = helpers.download_keyfile(keyfile_path) + + key = paramiko.RSAKey.from_private_key_file(keyfile_path) + ssh_cmd_list = scen_vars['ssh_cmd_list'] + all_exit_zero_status = 0 + ip_to_sshvals_dict = {} + + print("GOING TO RUN FOLLOWING COMMANDS:", ssh_cmd_list) + + try: + emr = boto3.client( + 'emr', + aws_access_key_id=scen_vars['s3_access_key'], + aws_secret_access_key=scen_vars['s3_secret_key'], + region_name=scen_vars['aws_region'] + ) + paginator = emr.get_paginator('list_instances') + + sleep_interval = float(scen_vars['sleep_interval']) + for gid in instance_group_ids: + page_iterator = paginator.paginate(ClusterId=cluster_id, InstanceGroupId=gid, InstanceStates=['RUNNING']) + node_ips = [] + for p in page_iterator: + node_ips += [x['PublicIpAddress'] for x in p['Instances']] + for ip in node_ips: + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect(hostname=ip, username=username, pkey=key) + client.invoke_shell() + ip_to_sshvals_dict[ip] = {} + ip_to_sshvals_dict[ip]['client'] = client + for cmd in ssh_cmd_list: + stdin, stdout, stderr = client.exec_command(cmd) + sleep(sleep_interval) + ip_to_sshvals_dict[ip][cmd] = {} + ip_to_sshvals_dict[ip][cmd]['stdin'] = stdin + ip_to_sshvals_dict[ip][cmd]['stdout'] = stdout + ip_to_sshvals_dict[ip][cmd]['stderr'] = stderr + ip_to_sshvals_dict[ip][cmd]['exit_status'] = -1 + ip_to_sshvals_dict[ip][cmd]['is_done'] = False + + all_done = False + while not all_done: + all_done = True + for ip, v in ip_to_sshvals_dict.items(): + query_items = {k: v[k] for k in v.keys() if k != 
'client'} + for cmd, w in query_items.items(): + if not w['is_done']: + if w['stdout'].channel.exit_status_ready(): + w['exit_status'] = w['stdout'].channel.recv_exit_status() + w['is_done'] = True + print("\nProcess '%s' on node %s returned exit status %s\n" % (cmd, ip, w['exit_status'])) + all_exit_zero_status = max(all_exit_zero_status, abs(w['exit_status'])) + continue + all_done = False + + print("\nAll processes complete.\n") + + process_exit_statuses = [] + for ip in ip_to_sshvals_dict.keys(): + process_exit_statuses += [ + int(r['exit_status']) for cmd, r in ip_to_sshvals_dict[ip].items() \ + if cmd != "client" + ] + process_exit_statuses = unique(process_exit_statuses) + print( + "COMMAND EXIT STATUS SUMMARY --> All successful: {}\n".format( + True if (len(process_exit_statuses) == 1 and process_exit_statuses[0] == 0) \ + else False + ) + ) + + for ip in ip_to_sshvals_dict.keys(): + process_exit_statuses = unique( + [ + int(r['exit_status']) for cmd, r in ip_to_sshvals_dict[ip].items() \ + if cmd != "client" + ] + ) + print( + "Node {} --> All successful: {}".format( + ip, + True if (len(process_exit_statuses) == 1 and process_exit_statuses[0] == 0) \ + else False + ) + ) + for cmd, results in ip_to_sshvals_dict[ip].items(): + if cmd == "client": + continue + print("\t'{}': {}".format(cmd, results['exit_status'])) + print("\n") + + except: + print("Unexpected error:", sys.exc_info()[0]) + raise + + finally: + if delete_keyfile: + subprocess.check_output("rm " + keyfile_path, shell=True) + + cleanup(ip_to_sshvals_dict, scen_vars, all_exit_zero_status) + + +def run(): + # scen = Scenario() + # scen_vars = scen.get_all_variables() + # project_key = scen_vars['projectKey'] + # project_vars = api_client().get_project(project_key).get_variables()['standard'] + # tier = scen_vars["dss_instance_type"] + # emr_params = project_vars['emr'][tier] + # cluster_id = scen_vars['cluster_id'] if 'cluster_id' in scen_vars else emr_params['cluster_id'] + # instance_group_ids 
= None + # if 'instance_group_ids' in scen_vars: + # instance_group_ids = scen_vars['instance_group_ids'].split(",") + # else: + # instance_group_ids = emr_params['task_instance_group_id'].split(",") + # + # run_install_commands(scen_vars, cluster_id, instance_group_ids) + pass From cccd35af2511dcf89b6db3da60d6d6eaa374acf4 Mon Sep 17 00:00:00 2001 From: dmichalopoulos_komply Date: Thu, 29 Aug 2019 08:26:07 -0500 Subject: [PATCH 11/11] Started working on functions for running user-specified shell commands on cluster nodes. --- python-lib/ssh/client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python-lib/ssh/client.py b/python-lib/ssh/client.py index af74be8..ddf3e3b 100644 --- a/python-lib/ssh/client.py +++ b/python-lib/ssh/client.py @@ -1,6 +1,4 @@ import paramiko -import dataiku -import subprocess import boto3 import tempfile import time @@ -73,6 +71,7 @@ def execute(self, cmds, print_summary=False, disconnect=False, use_channel=False runner = runner or self.client _, stdout, stderr = runner.exec_command(formatted_cmds) + # time.sleep(1) cmd_run_info = { 'cmds': formatted_cmds, 'stdout': stdout,