39 changes: 34 additions & 5 deletions README.md
@@ -1,11 +1,40 @@
# EMR clusters plugin

# EMR clusters plugin (emr-clusters)
Plugin to add AWS EMR support in Dataiku DSS

## Documentation
## Contributors
- Dataiku ([dataiku](https://github.com/dataiku))
- Dennis P. Michalopoulos ([dmichalopoulos](https://github.com/dmichalopoulos))

https://doc.dataiku.com/dss/latest/hadoop/dynamic-emr.html
## Version
0.0.3

## License
## Documentation
https://doc.dataiku.com/dss/latest/hadoop/dynamic-emr.html

## License
This project is licensed under the Apache Software License.

## Release Notes (v0.0.3)

### New macro: `emr-copy-cluster`
- Allows you to launch a new cluster whose settings are copied from an existing cluster
- Copyable clusters are limited to those created by the `emr-create-cluster` macro

### New cluster parameter configurations added to `emr-create-cluster`
- Add your own EMR software application configurations
- Request spot instances for CORE and TASK nodes at a specified bid price (defaults to on-demand price)
- Select which software applications you want to install (from a limited set)
- Specify desired number of EBS volumes per instance group
- Specify whether or not to use EBS-optimized volumes (applies to all groups)
- Add termination protection to your cluster (turned off automatically when stopping a created/copied cluster)
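
A minimal sketch of how the options listed above might map onto a boto3 EMR `run_job_flow` request. This is not the plugin's code: the helper name `build_run_job_flow_kwargs`, the instance type, the volume size and the role names are illustrative assumptions. The helper only builds the request dictionary and makes no AWS call.

```python
def build_run_job_flow_kwargs(name, release_label, applications,
                              core_count=2, core_spot=False, core_bid_price=None,
                              ebs_volumes_per_instance=1, ebs_optimized=True,
                              termination_protected=False, configurations=None):
    """Build keyword arguments for boto3's EMR run_job_flow (hypothetical helper)."""

    def instance_group(role, count, spot=False, bid_price=None):
        group = {
            "Name": role.lower(),
            "InstanceRole": role,             # MASTER, CORE or TASK
            "InstanceType": "m5.xlarge",      # assumption: illustrative type
            "InstanceCount": count,
            "Market": "ON_DEMAND",
            "EbsConfiguration": {
                "EbsOptimized": ebs_optimized,
                "EbsBlockDeviceConfigs": [{
                    # assumption: illustrative volume type and size
                    "VolumeSpecification": {"VolumeType": "gp2", "SizeInGB": 100},
                    "VolumesPerInstance": ebs_volumes_per_instance,
                }],
            },
        }
        if spot:
            group["Market"] = "SPOT"
            # With no explicit bid, ask EMR to cap the bid at the on-demand price.
            group["BidPrice"] = (str(bid_price) if bid_price is not None
                                 else "OnDemandPrice")
        return group

    return {
        "Name": name,
        "ReleaseLabel": release_label,
        "Applications": [{"Name": app} for app in applications],
        "Configurations": configurations or [],
        "Instances": {
            "InstanceGroups": [
                instance_group("MASTER", 1),
                instance_group("CORE", core_count, core_spot, core_bid_price),
            ],
            "KeepJobFlowAliveWhenNoSteps": True,
            "TerminationProtected": termination_protected,
        },
        # assumption: the default EMR role names
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
    }
```

The result could then be passed along as `boto3.client("emr").run_job_flow(**kwargs)`. Keeping the request construction separate from the AWS call makes it possible to check the mapping without credentials.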

### Various code refactorings
- Moved core logic of `Cluster.start(..)` and `Cluster.stop(..)` functions to `cluster_ops.py`
- Created `boto_params.py` to store class-level variables for all required `boto3` client commands and responses
- Some rewording of parameter descriptions

### In development (not yet implemented)
- Allow the user to specify Python libraries to install on each cluster node
- Allow the user to specify shell commands (e.g., to create directories or perform other custom actions) to run on each instance once cluster launch is complete
- Ensure that both of the above also run on any new instances added after rescaling operations, or when spot instances are terminated and replaced

Empty file added __init__.py
Empty file.
2 changes: 1 addition & 1 deletion code-env/python/desc.json
@@ -1,5 +1,5 @@
{
    "acceptedPythonInterpreters": ["PYTHON27", "PYTHON35", "PYTHON36"],
    "acceptedPythonInterpreters": ["PYTHON27", "PYTHON35", "PYTHON36", "PYTHON37"],
    "forceConda": false,
    "installCorePackages": true,
    "installJupyterSupport": false
5 changes: 4 additions & 1 deletion code-env/python/spec/requirements.txt
@@ -1 +1,4 @@
boto3
boto3
paramiko
numpy
tabulate
2 changes: 1 addition & 1 deletion plugin.json
@@ -1,6 +1,6 @@
{
    "id" : "emr-clusters",
    "version" : "0.0.2",
    "version" : "0.0.3",
    "meta" : {
        "label" : "EMR clusters",
        "description" : "Dynamically create Amazon EMR clusters or attach to existing ones.",
Empty file added python-clusters/__init__.py
Empty file.
Empty file.
17 changes: 7 additions & 10 deletions python-clusters/emr-attach-to-existing-cluster/cluster.py
@@ -1,28 +1,25 @@
import boto3
import os, json, logging
import dku_emr
import logging
from dataiku.cluster import Cluster
from cluster_ops import ClusterAttacher, ClusterStopper

# This actually belongs in the main entry point
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)


class MyCluster(Cluster):

    def __init__(self, cluster_id, cluster_name, config, plugin_config):
        self.cluster_id = cluster_id
        self.cluster_name = cluster_name
        self.config = config
        self.plugin_config = plugin_config

    def start(self):
        region_name = self.config.get("awsRegionId") or dku_emr.get_current_region()
        client = boto3.client("emr", region_name=region_name)
        clusterId = self.config["emrClusterId"]
        logging.info("Attaching to EMR cluster id %s" % clusterId)
        return dku_emr.make_cluster_keys_and_data(client, clusterId, create_user_dir=True)
        return ClusterAttacher(my_cluster=self).attach_cluster()

    def stop(self, data):
        """
        Since we attached to an existing cluster, we don't stop it
        """
        logging.info("Detaching: nothing to do")
        return ClusterStopper(my_cluster=self).detach_cluster(data)
Empty file.
46 changes: 46 additions & 0 deletions python-clusters/emr-copy-cluster/cluster.json
@@ -0,0 +1,46 @@
{
    "meta": {
        "label": "EMR cluster (copy of existing cluster)",
        "description": "Create a new cluster whose settings will be copied from an existing cluster. Limited to clusters created by DSS, i.e., excludes attached clusters created externally.",
        "icon": "icon-cloud"
    },

    "impersonate": false,

    "permissions": [],

    "resultType": "HTML",

    "macroRoles": [
        {"type":"CLUSTER", "targetParamsKey": "dss_cluster_id", "limitToSamePlugin": true}
    ],

    "params": [
        {
            "name": "source_dss_cluster_id",
            "label": "Source DSS Cluster id",
            "type": "CLUSTER",
            "description": "Identifier of the DSS cluster from which to create a copy",
            "mandatory": true
        },
        {
            "name": "excluded_cluster_types",
            "label": "Cluster types that can NOT be copied",
            "type": "STRING",
            "description": "These types of clusters cannot be copied/duplicated",
            "defaultValue": "pycluster_emr-clusters_emr-attach-to-existing-cluster",
            "visibilityCondition": false
        }
    ],

    "actions" : [
        {
            "id" : "fetch-nodes-keys",
            "meta" : {
                "label" : "Fetch node description keys",
                "description" : "Fetches node description keys",
                "icon" : "icon-search"
            }
        }
    ]
}
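
The hidden `excluded_cluster_types` parameter above is a comma-separated string. A minimal sketch of how such a value might be parsed and checked before copying, tolerating a missing or empty setting. The helper names `parse_excluded_types` and `can_copy` are hypothetical, as is the `emr-create-cluster` type string in the usage note; only the attach type string comes from the `defaultValue` above.

```python
def parse_excluded_types(raw):
    """Split a comma-separated setting into a clean list of cluster types."""
    if not raw:
        # Missing or empty setting: nothing is excluded.
        return []
    return [part.strip() for part in raw.split(",") if part.strip()]


def can_copy(cluster_type, raw_excluded):
    """Return True when the source cluster type is not on the exclusion list."""
    return cluster_type not in parse_excluded_types(raw_excluded)
```

For example, `can_copy("pycluster_emr-clusters_emr-attach-to-existing-cluster", default_value)` would be `False`, while a type string following the same pattern for `emr-create-cluster` would be accepted.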
29 changes: 29 additions & 0 deletions python-clusters/emr-copy-cluster/cluster.py
@@ -0,0 +1,29 @@
import os
import logging
from dataiku.cluster import Cluster

from cluster_ops import ClusterCopier, ClusterStopper

# This actually belongs in the main entry point
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)


class MyCluster(Cluster):

    def __init__(self, cluster_id, cluster_name, config, plugin_config):
        self.cluster_id = cluster_id
        self.cluster_name = cluster_name
        self.config = config
        self.plugin_config = plugin_config
        self.resource_dir = os.getenv('DKU_CUSTOM_RESOURCE_FOLDER')
        self.excluded_cluster_types = [x.strip() for x in self.config.get('excluded_cluster_types').split(",")]
        self.source_cluster_id = self.config.get('source_dss_cluster_id')
        self.source_cluster_name = self.source_cluster_id

    def start(self):
        return ClusterCopier(self).copy_cluster()

    def stop(self, data):
        return ClusterStopper(self).terminate_cluster(data)
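
The body of `ClusterStopper.terminate_cluster` is not shown in this diff. The release notes say termination protection is turned off automatically when stopping a created or copied cluster, so a minimal sketch of that sequence with the boto3 EMR client might look like the following. The standalone function and its parameter names are assumptions, not the plugin's implementation.

```python
def terminate_cluster(client, emr_cluster_id):
    """Disable termination protection, then terminate the EMR cluster.

    `client` is expected to behave like boto3.client("emr").
    """
    # A protected cluster rejects termination requests, so lift protection first.
    client.set_termination_protection(
        JobFlowIds=[emr_cluster_id],
        TerminationProtected=False,
    )
    client.terminate_job_flows(JobFlowIds=[emr_cluster_id])
```

In practice `client` would come from something like `boto3.client("emr", region_name=...)`. Passing it in as an argument keeps the sequence easy to exercise with a stub.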

Empty file.