Simplify support for multiple cluster types (#248)

* Renamed cloud_provider config to cluster_type
* cluster_type supports ec2 & existing clusters
* Ec2Cluster & ExistingCluster are now in seperates classes
* Ec2Cluster extends ExistingCluster and depends on boto
* ExistingCluster has now boto dependency
diff --git a/ansible/roles/common/tasks/drives.yml b/ansible/roles/common/tasks/drives.yml
index 2aff96a..4f3c315 100644
--- a/ansible/roles/common/tasks/drives.yml
+++ b/ansible/roles/common/tasks/drives.yml
@@ -17,21 +17,21 @@
 - name: "unmount default drive at /mnt"
   mount: name=/mnt src=/dev/xvdb fstype=auto state=unmounted
-  when: cloud_provider == 'ec2'
+  when: cluster_type == 'ec2'
 - name: "unmount all ephemeral"
   mount: name={{ item.0 }} src={{ item.1 }} fstype=auto state=unmounted
-  when: cloud_provider == 'ec2' and force_format == 'yes'
+  when: cluster_type == 'ec2' and force_format == 'yes'
     - "{{ node_type_map[node_type].mounts }}"
     - "{{ node_type_map[node_type].devices }}"
 - name: "format drives"
   filesystem: fstype={{ fstype }} dev={{ item }} force={{ force_format }}
-  when: cloud_provider == 'ec2'
+  when: cluster_type == 'ec2'
   with_items: "{{ node_type_map[node_type].devices }}"
 - name: "mount drives"
   mount: name={{ item.0 }} src={{ item.1 }} fstype=auto state=mounted
     opts=defaults,nofail,noatime,nodiratime,comment=cloudconfig passno=2
-  when: cloud_provider == 'ec2'
+  when: cluster_type == 'ec2'
     - "{{ node_type_map[node_type].mounts }}"
     - "{{ node_type_map[node_type].devices }}"
diff --git a/conf/muchos.props.example b/conf/muchos.props.example
index e4c1e67..d9825a2 100644
--- a/conf/muchos.props.example
+++ b/conf/muchos.props.example
@@ -14,8 +14,8 @@
 # limitations under the License.
-# Type of cloud service (ec2 or baremetal)
-cloud_provider = ec2
+# Cluster type (ec2 or existing)
+cluster_type = ec2
 # Cluster user name (install command will SSH to cluster using this user)
 # Leave default below if launching cluster in AWS
 cluster_user = centos
@@ -77,7 +77,7 @@
 # Shutdown behavior of EC2 instances: terminate or stop
 shutdown_behavior = stop
 mount_root = /var/disk
 mounts = /var/disk01,/var/disk02,/var/disk03
 devices = /dev/hdb1,/dev/hdc1,/dev/hdd1
diff --git a/lib/ b/lib/
index a4d5a9a..383c242 100644
--- a/lib/
+++ b/lib/
@@ -16,418 +16,12 @@
 # limitations under the License.
-Script to help deploy a Fluo or Accumulo cluster (optionally to AWS EC2)
 import os
 import sys
 from sys import exit
-import shutil
-from botocore.exceptions import ClientError
-from muchos.config import DeployConfig, HOST_VAR_DEFAULTS, PLAY_VAR_DEFAULTS
-from muchos.util import parse_args, AMI_HELP_MSG, get_block_device_map
+from muchos.config import DeployConfig
+from muchos.util import parse_args
 from os.path import isfile, join
-import time
-import subprocess
-import boto3
-class MuchosCluster:
-    def __init__(self, config):
-        self.config = config
-    def launch_node(self, hostname, services, sg_id):
-        associate_public_ip = True
-        if self.config.has_option('ec2', 'associate_public_ip'):
-            associate_public_ip = self.config.get('ec2', 'associate_public_ip').strip().lower() == 'true'
-        request = {'MinCount': 1, 'MaxCount': 1,
-                   'NetworkInterfaces': [{'DeviceIndex': 0, 'AssociatePublicIpAddress': associate_public_ip,
-                                          'Groups': [sg_id]}]}
-        if self.config.has_option('ec2', 'subnet_id'):
-            request['NetworkInterfaces'][0]['SubnetId'] = self.config.get('ec2', 'subnet_id')
-        if 'worker' in services:
-            instance_type = self.config.get('ec2', 'worker_instance_type')
-        else:
-            instance_type = self.config.get('ec2', 'default_instance_type')
-        request['InstanceType'] = instance_type
-        request['InstanceInitiatedShutdownBehavior'] = self.config.get('ec2', 'shutdown_behavior')
-        if not self.config.has_option('ec2', 'aws_ami'):
-            exit('aws_ami property must be set!')
-        image_id = self.config.get('ec2', 'aws_ami')
-        if not image_id:
-            exit('aws_ami property was not properly')
-        request['ImageId'] = image_id
-        request['BlockDeviceMappings'] = get_block_device_map(instance_type)
-        if self.config.has_option('ec2', 'key_name'):
-            request['KeyName'] = self.config.get('ec2', 'key_name')
-        tags = [{'Key': 'Name', 'Value': self.config.cluster_name + '-' + hostname},
-                {'Key': 'Muchos', 'Value': self.config.cluster_name}]
-        for key, val in self.config.instance_tags().items():
-            tags.append({'Key': key, 'Value': val})
-        request['TagSpecifications'] = [{'ResourceType': 'instance', 'Tags': tags}]
-        if self.config.has_option('ec2', 'user_data_path'):
-            user_data_path = self.config.get('ec2', 'user_data_path')
-            with open(user_data_path, 'r') as user_data_file:
-                user_data =
-            request['UserData'] = user_data
-        ec2 = boto3.client('ec2')
-        response = None
-        try:
-            response = ec2.run_instances(**request)
-        except ClientError as e:
-            exit("ERROR - Failed to launch EC2 instance due to exception:\n\n{0}\n\n{1}".format(e, AMI_HELP_MSG))
-        if response is None or len(response['Instances']) != 1:
-            exit('ERROR - Failed to start {0} node'.format(hostname))
-        print('Launching {0} node using {1}'.format(hostname, image_id))
-        return response['Instances'][0]
-    def create_security_group(self):
-        ec2 = boto3.client('ec2')
-        sg = self.config.sg_name
-        create_group = True
-        group_id = None
-        try:
-            response = ec2.describe_security_groups(Filters=[{'Name': 'group-name', 'Values': [sg]}])
-            if len(response['SecurityGroups']) > 0:
-                group_id = response['SecurityGroups'][0]['GroupId']
-                create_group = False
-        except ClientError:
-            pass
-        if create_group:
-            print("Creating security group " + sg)
-            request = {'Description': "Security group created by Muchos", 'GroupName': sg}
-            if self.config.has_option('ec2', 'vpc_id'):
-                request['VpcId'] = self.config.get('ec2', 'vpc_id')
-            response = ec2.create_security_group(**request)
-            group_id = response['GroupId']
-            ec2.authorize_security_group_ingress(GroupName=sg, SourceSecurityGroupName=sg)
-            ec2.authorize_security_group_ingress(GroupName=sg, IpProtocol='tcp', FromPort=22, ToPort=22,
-                                                 CidrIp='')
-        return group_id
-    def delete_security_group(self):
-        sg_id = None
-        ec2 = boto3.client('ec2')
-        try:
-            response = ec2.describe_security_groups(Filters=[{'Name': 'group-name', 'Values': [self.config.sg_name]}])
-            if len(response['SecurityGroups']) > 0:
-                sg_id = response['SecurityGroups'][0]['GroupId']
-        except ClientError:
-            pass
-        if not sg_id:
-            print("Could not find security group '{0}'".format(self.config.sg_name))
-            return
-        print("Attempting to delete security group '{0}' with id '{1}'...".format(self.config.sg_name, sg_id))
-        sg_exists = True
-        while sg_exists:
-            try:
-                request = {'GroupId': sg_id}
-                ec2.delete_security_group(**request)
-                sg_exists = False
-            except ClientError as e:
-                print("Failed to delete security group '{0}' due exception below:\n{1}\nRetrying in 10 sec..."\
-                    .format(self.config.sg_name, e))
-                time.sleep(10)
-        print("Deleted security group")
-    def launch(self):
-        if self.active_nodes():
-            exit('ERROR - There are already instances running for {0} cluster'.format(self.config.cluster_name))
-        if isfile(self.config.hosts_path):
-            exit("ERROR - A hosts file already exists at {0}.  Please delete before running launch again"
-                 .format(self.config.hosts_path))
-        self.config.verify_launch()
-        print("Launching {0} cluster".format(self.config.cluster_name))
-        if self.config.has_option('ec2', 'security_group_id'):
-            sg_id = self.config.get('ec2', 'security_group_id')
-        else:
-            sg_id = self.create_security_group()
-        instance_d = {}
-        for (hostname, services) in list(self.config.nodes().items()):
-            instance = self.launch_node(hostname, services, sg_id)
-            instance_d[instance['InstanceId']] = hostname
-        num_running = len(self.status(['running']))
-        num_expected = len(self.config.nodes())
-        while num_running != num_expected:
-            print("{0} of {1} nodes have started.  Waiting another 5 sec..".format(num_running, num_expected))
-            time.sleep(5)
-            num_running = len(self.status(['running']))
-        with open(self.config.hosts_path, 'w') as hosts_file:
-            for instance in self.status(['running']):
-                public_ip = ''
-                if 'PublicIpAddress' in instance:
-                    public_ip = instance['PublicIpAddress']
-                private_ip = instance['PrivateIpAddress']
-                hostname = instance_d[instance['InstanceId']]
-                print("{0} {1} {2}".format(hostname, private_ip, public_ip), file=hosts_file)
-        print("All {0} nodes have started. Created hosts file at {1}".format(num_expected, self.config.hosts_path))
-    def sync(self):
-        config = self.config
-        print('Syncing ansible directory on {0} cluster proxy node'.format(config.cluster_name))
-        host_vars = HOST_VAR_DEFAULTS
-        play_vars = PLAY_VAR_DEFAULTS
-        for section in ("general", "ansible-vars", config.get('performance', 'profile')):
-            for (name, value) in config.items(section):
-                if name not in ('proxy_hostname', 'proxy_socks_port'):
-                    if name in host_vars:
-                        host_vars[name] = value
-                    if name in play_vars:
-                        play_vars[name] = value
-        play_vars['accumulo_sha256'] = config.checksum('accumulo')
-        play_vars['fluo_sha256'] = config.checksum('fluo')
-        play_vars['fluo_yarn_sha256'] = config.checksum('fluo_yarn')
-        play_vars['hadoop_sha256'] = config.checksum('hadoop')
-        play_vars['spark_sha256'] = config.checksum('spark')
-        play_vars['zookeeper_sha256'] = config.checksum('zookeeper')
-        cloud_provider = host_vars.get('cloud_provider', 'ec2')
-        node_type_map = {}
-        if cloud_provider == 'ec2':
-            node_type_map = config.node_type_map()
-            play_vars["mount_root"] = config.mount_root
-            play_vars["metrics_drive_ids"] = config.metrics_drive_ids()
-            play_vars["fstype"] = config.fstype()
-            play_vars["force_format"] = config.force_format()
-            play_vars["shutdown_delay_minutes"] = config.get("ec2", "shutdown_delay_minutes")
-        if cloud_provider == 'baremetal':
-            play_vars["mount_root"] = config.get("baremetal", "mount_root")
-            play_vars["metrics_drive_ids"] = config.get("baremetal", "metrics_drives_ids").split(",")
-            mounts = config.get("baremetal", "mounts").split(",")
-            devices = config.get("baremetal", "devices").split(",")
-            for node_type in 'default', 'worker':
-                node_type_map[node_type] = {'mounts': mounts, 'devices': devices}
-        play_vars["node_type_map"] = node_type_map
-        host_vars['worker_data_dirs'] = str(node_type_map['worker']['mounts'])
-        host_vars['default_data_dirs'] = str(node_type_map['default']['mounts'])
-        with open(join(config.deploy_path, "ansible/site.yml"), 'w') as site_file:
-            print("- import_playbook: common.yml", file=site_file)
-            if config.has_service("spark"):
-                print("- import_playbook: spark.yml", file=site_file)
-            print("- import_playbook: hadoop.yml", file=site_file)
-            print("- import_playbook: zookeeper.yml", file=site_file)
-            if config.has_service("metrics"):
-                print("- import_playbook: metrics.yml", file=site_file)
-            print("- import_playbook: accumulo.yml", file=site_file)
-            if config.has_service('fluo'):
-                print("- import_playbook: fluo.yml", file=site_file)
-            if config.has_service('fluo_yarn'):
-                print("- import_playbook: fluo_yarn.yml", file=site_file)
-            if config.has_service("mesosmaster"):
-                print("- import_playbook: mesos.yml", file=site_file)
-            if config.has_service("swarmmanager"):
-                print("- import_playbook: docker.yml", file=site_file)
-        ansible_conf = join(config.deploy_path, "ansible/conf")
-        with open(join(ansible_conf, "hosts"), 'w') as hosts_file:
-            print("[proxy]\n{0}".format(config.proxy_hostname()), file=hosts_file)
-            print("\n[accumulomaster]\n{0}".format(config.get_service_hostnames("accumulomaster")[0]), file=hosts_file)
-            print("\n[namenode]\n{0}".format(config.get_service_hostnames("namenode")[0]), file=hosts_file)
-            print("\n[resourcemanager]\n{0}".format(config.get_service_hostnames("resourcemanager")[0]), file=hosts_file)
-            if config.has_service("spark"):
-                print("\n[spark]\n{0}".format(config.get_service_hostnames("spark")[0]), file=hosts_file)
-            if config.has_service("mesosmaster"):
-                print("\n[mesosmaster]\n{0}".format(config.get_service_hostnames("mesosmaster")[0]), file=hosts_file)
-            if config.has_service("metrics"):
-                print("\n[metrics]\n{0}".format(config.get_service_hostnames("metrics")[0]), file=hosts_file)
-            if config.has_service("swarmmanager"):
-                print("\n[swarmmanager]\n{0}".format(config.get_service_hostnames("swarmmanager")[0]), file=hosts_file)
-            print("\n[zookeepers]", file=hosts_file)
-            for (index, zk_host) in enumerate(config.get_service_hostnames("zookeeper"), start=1):
-                print("{0} id={1}".format(zk_host, index), file=hosts_file)
-            if config.has_service('fluo'):
-                print("\n[fluo]", file=hosts_file)
-                for host in config.get_service_hostnames("fluo"):
-                    print(host, file=hosts_file)
-            if config.has_service('fluo_yarn'):
-                print("\n[fluo_yarn]", file=hosts_file)
-                for host in config.get_service_hostnames("fluo_yarn"):
-                    print(host, file=hosts_file)
-            print("\n[workers]", file=hosts_file)
-            for worker_host in config.get_service_hostnames("worker"):
-                print(worker_host, file=hosts_file)
-            print("\n[accumulo:children]\naccumulomaster\nworkers", file=hosts_file)
-            print("\n[hadoop:children]\nnamenode\nresourcemanager\nworkers", file=hosts_file)
-            print("\n[nodes]", file=hosts_file)
-            for (private_ip, hostname) in config.get_private_ip_hostnames():
-                print("{0} ansible_ssh_host={1} node_type={2}".format(hostname, private_ip,
-                                                                                    config.node_type(hostname)), file=hosts_file)
-            print("\n[all:vars]", file=hosts_file)
-            for (name, value) in sorted(host_vars.items()):
-                print("{0} = {1}".format(name, value), file=hosts_file)
-        with open(join(config.deploy_path, "ansible/group_vars/all"), 'w') as play_vars_file:
-            for (name, value) in sorted(play_vars.items()):
-                print("{0}: {1}".format(name, value), file=play_vars_file)
-        # copy keys file to ansible/conf (if it exists)
-        conf_keys = join(config.deploy_path, "conf/keys")
-        ansible_keys = join(ansible_conf, "keys")
-        if isfile(conf_keys):
-            shutil.copyfile(conf_keys, ansible_keys)
-        else:
-            open(ansible_keys, 'w').close()
-        basedir = config.get('general', 'cluster_basedir')
-        cmd = "rsync -az --delete -e \"ssh -o 'StrictHostKeyChecking no'\""
-"{cmd} {src} {usr}@{ldr}:{tdir}".format(cmd=cmd, src=join(config.deploy_path, "ansible"),
-                        usr=config.get('general', 'cluster_user'), ldr=config.get_proxy_ip(), tdir=basedir),
-                        shell=True)
-        self.exec_on_proxy_verified("{0}/ansible/scripts/".format(basedir), opts='-t')
-    def setup(self):
-        config = self.config
-        print('Setting up {0} cluster'.format(config.cluster_name))
-        self.sync()
-        conf_upload = join(config.deploy_path, "conf/upload")
-        accumulo_tarball = join(conf_upload, "accumulo-{0}-bin.tar.gz".format(config.version("accumulo")))
-        fluo_tarball = join(conf_upload, "fluo-{0}-bin.tar.gz".format(config.version("fluo")))
-        fluo_yarn_tarball = join(conf_upload, "fluo-yarn-{0}-bin.tar.gz".format(config.version("fluo_yarn")))
-        basedir = config.get('general', 'cluster_basedir')
-        cluster_tarballs = "{0}/tarballs".format(basedir)
-        self.exec_on_proxy_verified("mkdir -p {0}".format(cluster_tarballs))
-        if isfile(accumulo_tarball):
-            self.send_to_proxy(accumulo_tarball, cluster_tarballs)
-        if isfile(fluo_tarball) and config.has_service('fluo'):
-            self.send_to_proxy(fluo_tarball, cluster_tarballs)
-        if isfile(fluo_yarn_tarball) and config.has_service('fluo_yarn'):
-            self.send_to_proxy(fluo_yarn_tarball, cluster_tarballs)
-        self.execute_playbook("site.yml")
-    def status(self, states):
-        ec2 = boto3.client('ec2')
-        response = ec2.describe_instances(Filters=[{'Name': 'tag:Muchos', 'Values': [self.config.cluster_name]}])
-        nodes = []
-        for res in response['Reservations']:
-            for inst in res['Instances']:
-                if inst['State']['Name'] in states:
-                    nodes.append(inst)
-        return nodes
-    def active_nodes(self):
-        return self.status(['pending', 'running', 'stopping', 'stopped'])
-    @staticmethod
-    def print_nodes(nodes):
-        for node in nodes:
-            name = 'Unknown'
-            for tag in node['Tags']:
-                if tag['Key'] == 'Name':
-                    name = tag['Value']
-            print("  ", name, node['InstanceId'], node['PrivateIpAddress'], node.get('PublicIpAddress', ''))
-    def terminate(self, hosts_path):
-        nodes = self.active_nodes()
-        print("The following {0} nodes in {1} cluster will be terminated:".format(len(nodes), self.config.cluster_name))
-        self.print_nodes(nodes)
-        response = input("Do you want to continue? (y/n) ")
-        if response == "y":
-            ec2 = boto3.client('ec2')
-            for node in nodes:
-                ec2.terminate_instances(InstanceIds=[node['InstanceId']])
-            print("Terminated nodes.")
-            if not self.config.has_option('ec2', 'security_group_id'):
-                self.delete_security_group()
-            if isfile(hosts_path):
-                os.remove(hosts_path)
-                print("Removed hosts file at ", hosts_path)
-        else:
-            print("Aborted termination")
-    def ssh(self):
-        self.wait_until_proxy_ready()
-        fwd = ''
-        if self.config.has_option('general', 'proxy_socks_port'):
-            fwd = "-D " + self.config.get('general', 'proxy_socks_port')
-        ssh_command = "ssh -C -A -o 'StrictHostKeyChecking no' {fwd} {usr}@{ldr}".format(
-            usr=self.config.get('general', 'cluster_user'), ldr=self.config.get_proxy_ip(), fwd=fwd)
-        print("Logging into proxy using: {0}".format(ssh_command))
-        retcode =, shell=True)
-        if retcode != 0:
-            exit("ERROR - Command failed with return code of {0}: {1}".format(retcode, ssh_command))
-    def exec_on_proxy(self, command, opts=''):
-        ssh_command = "ssh -A -o 'StrictHostKeyChecking no' {opts} {usr}@{ldr} '{cmd}'".format(
-                usr=self.config.get('general', 'cluster_user'),
-                ldr=self.config.get_proxy_ip(), cmd=command, opts=opts)
-        return, shell=True), ssh_command
-    def exec_on_proxy_verified(self, command, opts=''):
-        (retcode, ssh_command) = self.exec_on_proxy(command, opts)
-        if retcode != 0:
-            exit("ERROR - Command failed with return code of {0}: {1}".format(retcode, ssh_command))
-    def wait_until_proxy_ready(self):
-        cluster_user = self.config.get('general', 'cluster_user')
-        print("Checking if '{0}' proxy can be reached using: ssh {1}@{2}"\
-            .format(self.config.proxy_hostname(), cluster_user, self.config.get_proxy_ip()))
-        while True:
-            (retcode, ssh_command) = self.exec_on_proxy('pwd > /dev/null')
-            if retcode == 0:
-                print("Connected to proxy using SSH!")
-                time.sleep(1)
-                break
-            print("Proxy could not be accessed using SSH.  Will retry in 5 sec...")
-            time.sleep(5)
-    def execute_playbook(self, playbook):
-        print("Executing '{0}' playbook".format(playbook))
-        basedir = self.config.get('general', 'cluster_basedir')
-        self.exec_on_proxy_verified("time -p ansible-playbook {base}/ansible/{playbook}"
-                                    .format(base=basedir, playbook=playbook), opts='-t')
-    def send_to_proxy(self, path, target, skip_if_exists=True):
-        print("Copying to proxy: ", path)
-        cmd = "scp -o 'StrictHostKeyChecking no'"
-        if skip_if_exists:
-            cmd = "rsync --update --progress -e \"ssh -o 'StrictHostKeyChecking no'\""
-"{cmd} {src} {usr}@{ldr}:{tdir}".format(
-            cmd=cmd, src=path, usr=self.config.get('general', 'cluster_user'), ldr=self.config.get_proxy_ip(),
-            tdir=target), shell=True)
 def main():
@@ -435,14 +29,14 @@
     if not deploy_path:
         exit('ERROR - MUCHOS_HOME env variable must be set!')
     if not os.path.isdir(deploy_path):
-        exit('ERROR - Directory set by MUCHOS_HOME does not exist: '+deploy_path)
+        exit('ERROR - Directory set by MUCHOS_HOME does not exist: ' + deploy_path)
     config_path = join(deploy_path, "conf/muchos.props")
     if not isfile(config_path):
-        exit('ERROR - A config file does not exist at '+config_path)
+        exit('ERROR - A config file does not exist at ' + config_path)
     checksums_path = join(deploy_path, "conf/checksums")
     if not isfile(checksums_path):
-        exit('ERROR - A checksums file does not exist at '+checksums_path)
+        exit('ERROR - A checksums file does not exist at ' + checksums_path)
     hosts_dir = join(deploy_path, "conf/hosts/")
@@ -458,40 +52,23 @@
     config = DeployConfig(deploy_path, config_path, hosts_path, checksums_path, opts.cluster)
-    cluster = MuchosCluster(config)
-    if action == 'launch':
-        cluster.launch()
-    elif action == 'status':
-        nodes = cluster.status(['running'])
-        print("Found {0} nodes in {1} cluster".format(len(nodes), config.cluster_name))
-        cluster.print_nodes(nodes)
-    elif action == 'sync':
-        cluster.sync()
-    elif action == 'setup':
-        cluster.setup()
-    elif action == 'config':
+    if action == 'config':
         if == 'all':
-    elif action == 'ssh':
-        cluster.ssh()
-    elif action in ('wipe', 'kill', 'cancel_shutdown'):
-        if not isfile(hosts_path):
-            exit("Hosts file does not exist for cluster: "+hosts_path)
-        if action == 'wipe':
-            print("Killing all processes started by Muchos and wiping Muchos data from {0} cluster"\
-                .format(config.cluster_name))
-        elif action == 'kill':
-            print("Killing all processes started by Muchos on {0} cluster".format(config.cluster_name))
-        elif action == 'cancel_shutdown':
-            print("Cancelling automatic shutdown of {0} cluster".format(config.cluster_name))
-        cluster.execute_playbook(action + ".yml")
-    elif action == 'terminate':
-        cluster.terminate(hosts_path)
-        print('ERROR - Unknown action:', action)
+        cluster_type = config.get('general', 'cluster_type')
+        if cluster_type == 'existing':
+            from muchos.existing import ExistingCluster
+            cluster = ExistingCluster(config)
+            cluster.perform(action)
+        elif cluster_type == 'ec2':
+            from muchos.ec2 import Ec2Cluster
+            cluster = Ec2Cluster(config)
+            cluster.perform(action)
+        else:
+            exit('Unknown cluster_type: ' + cluster_type)
diff --git a/lib/muchos/ b/lib/muchos/
index 36f0604..2fabce2 100644
--- a/lib/muchos/
+++ b/lib/muchos/
@@ -297,7 +297,7 @@
   'accumulo_password': None,
   'accumulo_tarball': 'accumulo-{{ accumulo_version }}-bin.tar.gz',
   'accumulo_version': None,
-  'cloud_provider': None,
+  'cluster_type': None,
   'cluster_basedir': None,
   'cluster_user': None,
   'default_data_dirs': None,
diff --git a/lib/muchos/ b/lib/muchos/
new file mode 100644
index 0000000..57a354f
--- /dev/null
+++ b/lib/muchos/
@@ -0,0 +1,229 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+from sys import exit
+from botocore.exceptions import ClientError
+from .util import AMI_HELP_MSG, get_block_device_map
+from os.path import isfile
+import time
+import boto3
+from .existing import ExistingCluster
+class Ec2Cluster(ExistingCluster):
+    def __init__(self, config):
+        ExistingCluster.__init__(self, config)
+    def launch_node(self, hostname, services, sg_id):
+        associate_public_ip = True
+        if self.config.has_option('ec2', 'associate_public_ip'):
+            associate_public_ip = self.config.get('ec2', 'associate_public_ip').strip().lower() == 'true'
+        request = {'MinCount': 1, 'MaxCount': 1,
+                   'NetworkInterfaces': [{'DeviceIndex': 0, 'AssociatePublicIpAddress': associate_public_ip,
+                                          'Groups': [sg_id]}]}
+        if self.config.has_option('ec2', 'subnet_id'):
+            request['NetworkInterfaces'][0]['SubnetId'] = self.config.get('ec2', 'subnet_id')
+        if 'worker' in services:
+            instance_type = self.config.get('ec2', 'worker_instance_type')
+        else:
+            instance_type = self.config.get('ec2', 'default_instance_type')
+        request['InstanceType'] = instance_type
+        request['InstanceInitiatedShutdownBehavior'] = self.config.get('ec2', 'shutdown_behavior')
+        if not self.config.has_option('ec2', 'aws_ami'):
+            exit('aws_ami property must be set!')
+        image_id = self.config.get('ec2', 'aws_ami')
+        if not image_id:
+            exit('aws_ami property was not properly')
+        request['ImageId'] = image_id
+        request['BlockDeviceMappings'] = get_block_device_map(instance_type)
+        if self.config.has_option('ec2', 'key_name'):
+            request['KeyName'] = self.config.get('ec2', 'key_name')
+        tags = [{'Key': 'Name', 'Value': self.config.cluster_name + '-' + hostname},
+                {'Key': 'Muchos', 'Value': self.config.cluster_name}]
+        for key, val in self.config.instance_tags().items():
+            tags.append({'Key': key, 'Value': val})
+        request['TagSpecifications'] = [{'ResourceType': 'instance', 'Tags': tags}]
+        if self.config.has_option('ec2', 'user_data_path'):
+            user_data_path = self.config.get('ec2', 'user_data_path')
+            with open(user_data_path, 'r') as user_data_file:
+                user_data =
+            request['UserData'] = user_data
+        ec2 = boto3.client('ec2')
+        response = None
+        try:
+            response = ec2.run_instances(**request)
+        except ClientError as e:
+            exit("ERROR - Failed to launch EC2 instance due to exception:\n\n{0}\n\n{1}".format(e, AMI_HELP_MSG))
+        if response is None or len(response['Instances']) != 1:
+            exit('ERROR - Failed to start {0} node'.format(hostname))
+        print('Launching {0} node using {1}'.format(hostname, image_id))
+        return response['Instances'][0]
+    def create_security_group(self):
+        ec2 = boto3.client('ec2')
+        sg = self.config.sg_name
+        create_group = True
+        group_id = None
+        try:
+            response = ec2.describe_security_groups(Filters=[{'Name': 'group-name', 'Values': [sg]}])
+            if len(response['SecurityGroups']) > 0:
+                group_id = response['SecurityGroups'][0]['GroupId']
+                create_group = False
+        except ClientError:
+            pass
+        if create_group:
+            print("Creating security group " + sg)
+            request = {'Description': "Security group created by Muchos", 'GroupName': sg}
+            if self.config.has_option('ec2', 'vpc_id'):
+                request['VpcId'] = self.config.get('ec2', 'vpc_id')
+            response = ec2.create_security_group(**request)
+            group_id = response['GroupId']
+            ec2.authorize_security_group_ingress(GroupName=sg, SourceSecurityGroupName=sg)
+            ec2.authorize_security_group_ingress(GroupName=sg, IpProtocol='tcp', FromPort=22, ToPort=22,
+                                                 CidrIp='')
+        return group_id
+    def delete_security_group(self):
+        sg_id = None
+        ec2 = boto3.client('ec2')
+        try:
+            response = ec2.describe_security_groups(Filters=[{'Name': 'group-name', 'Values': [self.config.sg_name]}])
+            if len(response['SecurityGroups']) > 0:
+                sg_id = response['SecurityGroups'][0]['GroupId']
+        except ClientError:
+            pass
+        if not sg_id:
+            print("Could not find security group '{0}'".format(self.config.sg_name))
+            return
+        print("Attempting to delete security group '{0}' with id '{1}'...".format(self.config.sg_name, sg_id))
+        sg_exists = True
+        while sg_exists:
+            try:
+                request = {'GroupId': sg_id}
+                ec2.delete_security_group(**request)
+                sg_exists = False
+            except ClientError as e:
+                print("Failed to delete security group '{0}' due exception below:\n{1}\nRetrying in 10 sec..."
+                      .format(self.config.sg_name, e))
+                time.sleep(10)
+        print("Deleted security group")
+    def launch(self):
+        if self.active_nodes():
+            exit('ERROR - There are already instances running for {0} cluster'.format(self.config.cluster_name))
+        if isfile(self.config.hosts_path):
+            exit("ERROR - A hosts file already exists at {0}.  Please delete before running launch again"
+                 .format(self.config.hosts_path))
+        self.config.verify_launch()
+        print("Launching {0} cluster".format(self.config.cluster_name))
+        if self.config.has_option('ec2', 'security_group_id'):
+            sg_id = self.config.get('ec2', 'security_group_id')
+        else:
+            sg_id = self.create_security_group()
+        instance_d = {}
+        for (hostname, services) in list(self.config.nodes().items()):
+            instance = self.launch_node(hostname, services, sg_id)
+            instance_d[instance['InstanceId']] = hostname
+        num_running = len(self.get_status(['running']))
+        num_expected = len(self.config.nodes())
+        while num_running != num_expected:
+            print("{0} of {1} nodes have started.  Waiting another 5 sec..".format(num_running, num_expected))
+            time.sleep(5)
+            num_running = len(self.get_status(['running']))
+        with open(self.config.hosts_path, 'w') as hosts_file:
+            for instance in self.get_status(['running']):
+                public_ip = ''
+                if 'PublicIpAddress' in instance:
+                    public_ip = instance['PublicIpAddress']
+                private_ip = instance['PrivateIpAddress']
+                hostname = instance_d[instance['InstanceId']]
+                print("{0} {1} {2}".format(hostname, private_ip, public_ip), file=hosts_file)
+        print("All {0} nodes have started. Created hosts file at {1}".format(num_expected, self.config.hosts_path))
+    def status(self):
+        nodes = self.get_status(['running'])
+        print("Found {0} nodes in {1} cluster".format(len(nodes), self.config.cluster_name))
+        self.print_nodes(nodes)
+    def get_status(self, states):
+        ec2 = boto3.client('ec2')
+        response = ec2.describe_instances(Filters=[{'Name': 'tag:Muchos', 'Values': [self.config.cluster_name]}])
+        nodes = []
+        for res in response['Reservations']:
+            for inst in res['Instances']:
+                if inst['State']['Name'] in states:
+                    nodes.append(inst)
+        return nodes
+    def active_nodes(self):
+        return self.get_status(['pending', 'running', 'stopping', 'stopped'])
+    @staticmethod
+    def print_nodes(nodes):
+        for node in nodes:
+            name = 'Unknown'
+            for tag in node['Tags']:
+                if tag['Key'] == 'Name':
+                    name = tag['Value']
+            print("  ", name, node['InstanceId'], node['PrivateIpAddress'], node.get('PublicIpAddress', ''))
+    def terminate(self):
+        nodes = self.active_nodes()
+        print("The following {0} nodes in {1} cluster will be terminated:".format(len(nodes), self.config.cluster_name))
+        self.print_nodes(nodes)
+        response = input("Do you want to continue? (y/n) ")
+        if response == "y":
+            ec2 = boto3.client('ec2')
+            for node in nodes:
+                ec2.terminate_instances(InstanceIds=[node['InstanceId']])
+            print("Terminated nodes.")
+            if not self.config.has_option('ec2', 'security_group_id'):
+                self.delete_security_group()
+            if isfile(self.config.hosts_path):
+                os.remove(self.config.hosts_path)
+                print("Removed hosts file at ", self.config.hosts_path)
+        else:
+            print("Aborted termination")
diff --git a/lib/muchos/ b/lib/muchos/
new file mode 100644
index 0000000..46c48d3
--- /dev/null
+++ b/lib/muchos/
@@ -0,0 +1,270 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import shutil
+import subprocess
+import time
+from os.path import isfile, join
+from sys import exit
+class ExistingCluster:
+    def __init__(self, config):
+        self.config = config
+    def launch(self):
+        exit('ERROR - Attempting to launch when cluster_type is set to "existing"')
+    def sync(self):
+        config = self.config
+        print('Syncing ansible directory on {0} cluster proxy node'.format(config.cluster_name))
+        host_vars = HOST_VAR_DEFAULTS
+        play_vars = PLAY_VAR_DEFAULTS
+        for section in ("general", "ansible-vars", config.get('performance', 'profile')):
+            for (name, value) in config.items(section):
+                if name not in ('proxy_hostname', 'proxy_socks_port'):
+                    if name in host_vars:
+                        host_vars[name] = value
+                    if name in play_vars:
+                        play_vars[name] = value
+        play_vars['accumulo_sha256'] = config.checksum('accumulo')
+        play_vars['fluo_sha256'] = config.checksum('fluo')
+        play_vars['fluo_yarn_sha256'] = config.checksum('fluo_yarn')
+        play_vars['hadoop_sha256'] = config.checksum('hadoop')
+        play_vars['spark_sha256'] = config.checksum('spark')
+        play_vars['zookeeper_sha256'] = config.checksum('zookeeper')
+        cluster_type = host_vars.get('cluster_type', 'ec2')
+        node_type_map = {}
+        if cluster_type == 'ec2':
+            node_type_map = config.node_type_map()
+            play_vars["mount_root"] = config.mount_root
+            play_vars["metrics_drive_ids"] = config.metrics_drive_ids()
+            play_vars["fstype"] = config.fstype()
+            play_vars["force_format"] = config.force_format()
+            play_vars["shutdown_delay_minutes"] = config.get("ec2", "shutdown_delay_minutes")
+        if cluster_type == 'existing':
+            play_vars["mount_root"] = config.get("existing", "mount_root")
+            play_vars["metrics_drive_ids"] = config.get("existing", "metrics_drives_ids").split(",")
+            mounts = config.get("existing", "mounts").split(",")
+            devices = config.get("existing", "devices").split(",")
+            for node_type in 'default', 'worker':
+                node_type_map[node_type] = {'mounts': mounts, 'devices': devices}
+        play_vars["node_type_map"] = node_type_map
+        host_vars['worker_data_dirs'] = str(node_type_map['worker']['mounts'])
+        host_vars['default_data_dirs'] = str(node_type_map['default']['mounts'])
+        with open(join(config.deploy_path, "ansible/site.yml"), 'w') as site_file:
+            print("- import_playbook: common.yml", file=site_file)
+            if config.has_service("spark"):
+                print("- import_playbook: spark.yml", file=site_file)
+            print("- import_playbook: hadoop.yml", file=site_file)
+            print("- import_playbook: zookeeper.yml", file=site_file)
+            if config.has_service("metrics"):
+                print("- import_playbook: metrics.yml", file=site_file)
+            print("- import_playbook: accumulo.yml", file=site_file)
+            if config.has_service('fluo'):
+                print("- import_playbook: fluo.yml", file=site_file)
+            if config.has_service('fluo_yarn'):
+                print("- import_playbook: fluo_yarn.yml", file=site_file)
+            if config.has_service("mesosmaster"):
+                print("- import_playbook: mesos.yml", file=site_file)
+            if config.has_service("swarmmanager"):
+                print("- import_playbook: docker.yml", file=site_file)
+        ansible_conf = join(config.deploy_path, "ansible/conf")
+        with open(join(ansible_conf, "hosts"), 'w') as hosts_file:
+            print("[proxy]\n{0}".format(config.proxy_hostname()), file=hosts_file)
+            print("\n[accumulomaster]\n{0}".format(config.get_service_hostnames("accumulomaster")[0]), file=hosts_file)
+            print("\n[namenode]\n{0}".format(config.get_service_hostnames("namenode")[0]), file=hosts_file)
+            print("\n[resourcemanager]\n{0}".format(config.get_service_hostnames("resourcemanager")[0]),
+                  file=hosts_file)
+            if config.has_service("spark"):
+                print("\n[spark]\n{0}".format(config.get_service_hostnames("spark")[0]), file=hosts_file)
+            if config.has_service("mesosmaster"):
+                print("\n[mesosmaster]\n{0}".format(config.get_service_hostnames("mesosmaster")[0]), file=hosts_file)
+            if config.has_service("metrics"):
+                print("\n[metrics]\n{0}".format(config.get_service_hostnames("metrics")[0]), file=hosts_file)
+            if config.has_service("swarmmanager"):
+                print("\n[swarmmanager]\n{0}".format(config.get_service_hostnames("swarmmanager")[0]), file=hosts_file)
+            print("\n[zookeepers]", file=hosts_file)
+            for (index, zk_host) in enumerate(config.get_service_hostnames("zookeeper"), start=1):
+                print("{0} id={1}".format(zk_host, index), file=hosts_file)
+            if config.has_service('fluo'):
+                print("\n[fluo]", file=hosts_file)
+                for host in config.get_service_hostnames("fluo"):
+                    print(host, file=hosts_file)
+            if config.has_service('fluo_yarn'):
+                print("\n[fluo_yarn]", file=hosts_file)
+                for host in config.get_service_hostnames("fluo_yarn"):
+                    print(host, file=hosts_file)
+            print("\n[workers]", file=hosts_file)
+            for worker_host in config.get_service_hostnames("worker"):
+                print(worker_host, file=hosts_file)
+            print("\n[accumulo:children]\naccumulomaster\nworkers", file=hosts_file)
+            print("\n[hadoop:children]\nnamenode\nresourcemanager\nworkers", file=hosts_file)
+            print("\n[nodes]", file=hosts_file)
+            for (private_ip, hostname) in config.get_private_ip_hostnames():
+                print("{0} ansible_ssh_host={1} node_type={2}".format(hostname, private_ip,
+                                                                      config.node_type(hostname)), file=hosts_file)
+            print("\n[all:vars]", file=hosts_file)
+            for (name, value) in sorted(host_vars.items()):
+                print("{0} = {1}".format(name, value), file=hosts_file)
+        with open(join(config.deploy_path, "ansible/group_vars/all"), 'w') as play_vars_file:
+            for (name, value) in sorted(play_vars.items()):
+                print("{0}: {1}".format(name, value), file=play_vars_file)
+        # copy keys file to ansible/conf (if it exists)
+        conf_keys = join(config.deploy_path, "conf/keys")
+        ansible_keys = join(ansible_conf, "keys")
+        if isfile(conf_keys):
+            shutil.copyfile(conf_keys, ansible_keys)
+        else:
+            open(ansible_keys, 'w').close()
+        basedir = config.get('general', 'cluster_basedir')
+        cmd = "rsync -az --delete -e \"ssh -o 'StrictHostKeyChecking no'\""
+"{cmd} {src} {usr}@{ldr}:{tdir}".format(cmd=cmd, src=join(config.deploy_path, "ansible"),
+                                                                usr=config.get('general', 'cluster_user'),
+                                                                ldr=config.get_proxy_ip(), tdir=basedir),
+                        shell=True)
+        self.exec_on_proxy_verified("{0}/ansible/scripts/".format(basedir), opts='-t')
+    def setup(self):
+        config = self.config
+        print('Setting up {0} cluster'.format(config.cluster_name))
+        self.sync()
+        conf_upload = join(config.deploy_path, "conf/upload")
+        accumulo_tarball = join(conf_upload, "accumulo-{0}-bin.tar.gz".format(config.version("accumulo")))
+        fluo_tarball = join(conf_upload, "fluo-{0}-bin.tar.gz".format(config.version("fluo")))
+        fluo_yarn_tarball = join(conf_upload, "fluo-yarn-{0}-bin.tar.gz".format(config.version("fluo_yarn")))
+        basedir = config.get('general', 'cluster_basedir')
+        cluster_tarballs = "{0}/tarballs".format(basedir)
+        self.exec_on_proxy_verified("mkdir -p {0}".format(cluster_tarballs))
+        if isfile(accumulo_tarball):
+            self.send_to_proxy(accumulo_tarball, cluster_tarballs)
+        if isfile(fluo_tarball) and config.has_service('fluo'):
+            self.send_to_proxy(fluo_tarball, cluster_tarballs)
+        if isfile(fluo_yarn_tarball) and config.has_service('fluo_yarn'):
+            self.send_to_proxy(fluo_yarn_tarball, cluster_tarballs)
+        self.execute_playbook("site.yml")
+    @staticmethod
+    def status():
+        exit("ERROR - 'status' command cannot be used when cluster_type=existing")
+    @staticmethod
+    def terminate():
+        exit("ERROR - 'terminate' command cannot be used when cluster_type=existing")
+    def ssh(self):
+        self.wait_until_proxy_ready()
+        fwd = ''
+        if self.config.has_option('general', 'proxy_socks_port'):
+            fwd = "-D " + self.config.get('general', 'proxy_socks_port')
+        ssh_command = "ssh -C -A -o 'StrictHostKeyChecking no' {fwd} {usr}@{ldr}".format(
+            usr=self.config.get('general', 'cluster_user'), ldr=self.config.get_proxy_ip(), fwd=fwd)
+        print("Logging into proxy using: {0}".format(ssh_command))
+        retcode =, shell=True)
+        if retcode != 0:
+            exit("ERROR - Command failed with return code of {0}: {1}".format(retcode, ssh_command))
+    def exec_on_proxy(self, command, opts=''):
+        ssh_command = "ssh -A -o 'StrictHostKeyChecking no' {opts} {usr}@{ldr} '{cmd}'".format(
+            usr=self.config.get('general', 'cluster_user'),
+            ldr=self.config.get_proxy_ip(), cmd=command, opts=opts)
+        return, shell=True), ssh_command
+    def exec_on_proxy_verified(self, command, opts=''):
+        (retcode, ssh_command) = self.exec_on_proxy(command, opts)
+        if retcode != 0:
+            exit("ERROR - Command failed with return code of {0}: {1}".format(retcode, ssh_command))
+    def wait_until_proxy_ready(self):
+        cluster_user = self.config.get('general', 'cluster_user')
+        print("Checking if '{0}' proxy can be reached using: ssh {1}@{2}"
+              .format(self.config.proxy_hostname(), cluster_user, self.config.get_proxy_ip()))
+        while True:
+            (retcode, ssh_command) = self.exec_on_proxy('pwd > /dev/null')
+            if retcode == 0:
+                print("Connected to proxy using SSH!")
+                time.sleep(1)
+                break
+            print("Proxy could not be accessed using SSH.  Will retry in 5 sec...")
+            time.sleep(5)
+    def execute_playbook(self, playbook):
+        print("Executing '{0}' playbook".format(playbook))
+        basedir = self.config.get('general', 'cluster_basedir')
+        self.exec_on_proxy_verified("time -p ansible-playbook {base}/ansible/{playbook}"
+                                    .format(base=basedir, playbook=playbook), opts='-t')
+    def send_to_proxy(self, path, target, skip_if_exists=True):
+        print("Copying to proxy: ", path)
+        cmd = "scp -o 'StrictHostKeyChecking no'"
+        if skip_if_exists:
+            cmd = "rsync --update --progress -e \"ssh -o 'StrictHostKeyChecking no'\""
+"{cmd} {src} {usr}@{ldr}:{tdir}".format(
+            cmd=cmd, src=path, usr=self.config.get('general', 'cluster_user'), ldr=self.config.get_proxy_ip(),
+            tdir=target), shell=True)
+    def perform(self, action):
+        if action == 'launch':
+            self.launch()
+        elif action == 'status':
+            self.status()
+        elif action == 'sync':
+            self.sync()
+        elif action == 'setup':
+            self.setup()
+        elif action == 'ssh':
+            self.ssh()
+        elif action in ('wipe', 'kill', 'cancel_shutdown'):
+            if not isfile(self.config.hosts_path):
+                exit("Hosts file does not exist for cluster: " + self.config.hosts_path)
+            if action == 'wipe':
+                print("Killing all processes started by Muchos and wiping Muchos data from {0} cluster"
+                      .format(self.config.cluster_name))
+            elif action == 'kill':
+                print("Killing all processes started by Muchos on {0} cluster".format(self.config.cluster_name))
+            elif action == 'cancel_shutdown':
+                print("Cancelling automatic shutdown of {0} cluster".format(self.config.cluster_name))
+            self.execute_playbook(action + ".yml")
+        elif action == 'terminate':
+            self.terminate()
+        else:
+            print('ERROR - Unknown action:', action)