diff --git a/plugin.json b/plugin.json index 9c4d824..8bdb223 100644 --- a/plugin.json +++ b/plugin.json @@ -1,6 +1,6 @@ { "id": "emr-clusters", - "version": "0.0.2", + "version": "0.0.3", "meta": { "label": "EMR clusters", "description": "Dynamically create Amazon EMR clusters or attach to existing ones.", diff --git a/python-clusters/emr-attach-to-existing-cluster/cluster.json b/python-clusters/emr-attach-to-existing-cluster/cluster.json index 49c8c4e..8959c7e 100644 --- a/python-clusters/emr-attach-to-existing-cluster/cluster.json +++ b/python-clusters/emr-attach-to-existing-cluster/cluster.json @@ -35,7 +35,7 @@ "name": "secretKey", "label": "AWS Secret Key", "description": "If empty, uses AWS credentials from the environment (IAM role, ~/.aws/credentials or AWS_ACCESS_KEY_ID environment variable)", - "type": "STRING", + "type": "PASSWORD", "visibilityCondition": "!model.useRole" }, { diff --git a/python-clusters/emr-create-cluster/cluster.json b/python-clusters/emr-create-cluster/cluster.json index b4fbbe1..e619550 100644 --- a/python-clusters/emr-create-cluster/cluster.json +++ b/python-clusters/emr-create-cluster/cluster.json @@ -42,6 +42,19 @@ "label" : "Instance count (core)", "defaultValue" : 2 }, + { + "name": "enableCoreAutoScaling", + "type": "BOOLEAN", + "label": "Enable Core Node Autoscaling", + "defaultValue": false + }, + { + "name": "coreNodeAutoScalingPolicy", + "type": "TEXTAREA", + "label": "Core Node Autoscaling Policy", + "description": "Use JSON format, see here: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-automatic-scaling.html", + "visibilityCondition": "model.enableCoreAutoScaling" + }, { "name": "taskInstanceType", "label" : "Instance type (task)", @@ -56,6 +69,19 @@ "label" : "Instance count (task)", "defaultValue" : 0 }, + { + "name": "enableTaskAutoScaling", + "type": "BOOLEAN", + "label": "Enable Task Node Autoscaling", + "defaultValue": false + }, + { + "name": "taskNodeAutoScalingPolicy", + "type": "TEXTAREA", + "label": "Task Node Autoscaling Policy", + "description": "Use JSON format, see here: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-automatic-scaling.html", + "visibilityCondition": "model.enableTaskAutoScaling" + }, { "name": "emrVersion", "label" : "EMR Version", @@ -121,7 +147,14 @@ "description" : "IAM role to attach to the nodes of the cluster. Needs to have ability to call required AWS services from the cluster, like S3", "mandatory" : true }, - + { + "name": "autoScalingRole", + "label": "Autoscaling Role", + "type": "STRING", + "description": "IAM role with permissions to add and terminate instances when scaling activities are triggered", + "defaultValue": "EMR_AutoScaling_DefaultRole", + "mandatory" : true + }, { "type" : "SEPARATOR", "label": "Hive Metastore" @@ -248,7 +281,7 @@ "name": "secretKey", "label": "AWS Secret Key", "description": "If empty, uses AWS credentials from the environment (IAM role, ~/.aws/credentials or AWS_ACCESS_KEY_ID environment variable)", - "type": "STRING", + "type": "PASSWORD", "visibilityCondition": "!model.useRole" }, { @@ -256,6 +289,12 @@ "label": "EMR Security Configuration", "type": "STRING", "description": "EMR Security Configuration to use, must already exist" + }, + { + "name": "bootstrapActions", + "label": "EMR Bootstrap Actions", + "type": "KEY_VALUE_LIST", + "description": "Key should be S3 path (s3://path/to/file) and value should be comma separated list of args: arg1,arg2,arg3, if needed" } ], diff --git a/python-clusters/emr-create-cluster/cluster.py b/python-clusters/emr-create-cluster/cluster.py index 0c53891..98d8fdd 100644 --- a/python-clusters/emr-create-cluster/cluster.py +++ b/python-clusters/emr-create-cluster/cluster.py @@ -31,6 +31,22 @@ def start(self): extraArgs["SecurityConfiguration"] = self.config["securityConfiguration"] if self.config.get("ebsRootVolumeSize", 0): extraArgs["EbsRootVolumeSize"] = int(self.config["ebsRootVolumeSize"]) + if "bootstrapActions" in self.config: + extraArgs["BootstrapActions"] = [] + + for idx, ba in enumerate(self.config["bootstrapActions"]): + if len(ba["to"]) > 0: + args = ba["to"].split(",") + else: + args = [] + config = { + "Name": "action_{}".format(idx), + "ScriptBootstrapAction": { + "Path": ba["from"], + "Args": args + } + } + extraArgs["BootstrapActions"].append(config) security_groups = [] if "additionalSecurityGroups" in self.config: @@ -53,20 +69,38 @@ def start(self): if self.config.get("coreInstanceCount"): if not self.config.get("coreInstanceType"): raise Exception("Missing core instance type") - instances['InstanceGroups'].append({ + instance_group = { 'InstanceRole': 'CORE', 'InstanceType': self.config["coreInstanceType"], 'InstanceCount': int(self.config["coreInstanceCount"]) - }) + } + + if self.config.get("enableCoreAutoScaling"): + core_node_policy = json.loads(self.config.get("coreNodeAutoScalingPolicy")) + + for rule in core_node_policy["Rules"]: + rule["Trigger"]["CloudWatchAlarmDefinition"]["Dimensions"] = [{"Key": "JobFlowId", "Value": "${emr.clusterId}"}] + instance_group["AutoScalingPolicy"] = core_node_policy + + instances['InstanceGroups'].append(instance_group) if self.config.get("taskInstanceCount"): if not self.config.get("taskInstanceType"): raise Exception("Missing task instance type") - instances['InstanceGroups'].append({ + instance_group = { 'InstanceRole': 'TASK', 'InstanceType': self.config["taskInstanceType"], 'InstanceCount': int(self.config["taskInstanceCount"]) - }) + } + + if self.config.get("enableTaskAutoScaling"): + task_node_policy = json.loads(self.config.get("taskNodeAutoScalingPolicy")) + + for rule in task_node_policy["Rules"]: + rule["Trigger"]["CloudWatchAlarmDefinition"]["Dimensions"] = [{"Key": "JobFlowId", "Value": "${emr.clusterId}"}] + instance_group["AutoScalingPolicy"] = task_node_policy + + instances['InstanceGroups'].append(instance_group) if self.config.get("securityConfig"): extraArgs["SecurityConfiguration"] = self.config.get("securityConfig") @@ -145,6 +179,7 @@ def start(self): VisibleToAllUsers=True, JobFlowRole=self.config["nodesRole"], ServiceRole=self.config["serviceRole"], + AutoScalingRole=self.config["autoScalingRole"], Tags=tags, **extraArgs )