#!/usr/bin/python2
# Copyright 2014 Muchos authors (see AUTHORS)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Script to help deploy a Fluo or Accumulo cluster (optionally to AWS EC2)
"""
import os
import sys
from sys import exit
import shutil
from botocore.exceptions import ClientError
from config import DeployConfig, HOST_VAR_DEFAULTS, PLAY_VAR_DEFAULTS
from util import parse_args, AMI_HELP_MSG
from os.path import isfile, join
import time
import subprocess
import boto3
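
# Supported actions (dispatched in main() below): launch, status, sync, setup,
# config, ssh, wipe, kill, and terminate. An example invocation might look like
# the following (the '-c' cluster flag shown here is illustrative; the exact
# options are defined by util.parse_args):
#
#   muchos launch -c mycluster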


class MuchosCluster:

    def __init__(self, config):
        self.config = config
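
    # Launches one EC2 instance for the given hostname, building the
    # run_instances request (instance type, AMI, block device mappings, tags)
    # from the [ec2] section of the config.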
    def launch_node(self, hostname, services):

        request = {'MinCount': 1, 'MaxCount': 1,
                   'NetworkInterfaces': [{'DeviceIndex': 0, 'AssociatePublicIpAddress': True,
                                          'Groups': [self.config.sg_id]}]}

        if self.config.has_option('ec2', 'subnet_id'):
            # EC2 rejects requests that set both a top-level SubnetId and
            # NetworkInterfaces, so the subnet goes on the network interface.
            request['NetworkInterfaces'][0]['SubnetId'] = self.config.get('ec2', 'subnet_id')

        if 'worker' in services:
            instance_type = self.config.get('ec2', 'worker_instance_type')
            num_ephemeral = self.config.worker_num_ephemeral()
        else:
            instance_type = self.config.get('ec2', 'default_instance_type')
            num_ephemeral = self.config.default_num_ephemeral()
        request['InstanceType'] = instance_type

        if self.config.has_option('ec2', 'aws_ami'):
            image_id = self.config.get('ec2', 'aws_ami')
        else:
            session = boto3.session.Session()
            image_id = self.config.get_image_id(instance_type, session.region_name)
        if not image_id:
            exit('ERROR - Image not found for instance type: ' + instance_type)
        request['ImageId'] = image_id

        # Root EBS volume plus one mapping per ephemeral (instance store) drive
        bdm = [{'DeviceName': '/dev/sda1',
                'Ebs': {'DeleteOnTermination': True}}]
        for i in range(0, num_ephemeral):
            device = {'DeviceName': self.config.device_root + chr(ord('b') + i),
                      'VirtualName': self.config.ephemeral_root + str(i)}
            bdm.append(device)
        request['BlockDeviceMappings'] = bdm

        if self.config.has_option('ec2', 'key_name'):
            request['KeyName'] = self.config.get('ec2', 'key_name')

        tags = [{'Key': 'Name', 'Value': self.config.cluster_name + '-' + hostname},
                {'Key': 'Muchos', 'Value': self.config.cluster_name}]
        for key, val in self.config.instance_tags().iteritems():
            tags.append({'Key': key, 'Value': val})
        request['TagSpecifications'] = [{'ResourceType': 'instance', 'Tags': tags}]

        ec2 = boto3.client('ec2')
        response = None
        try:
            response = ec2.run_instances(**request)
        except ClientError as e:
            exit("ERROR - Failed to launch EC2 instance due to exception:\n\n{0}\n\n{1}".format(e, AMI_HELP_MSG))

        if response is None or len(response['Instances']) != 1:
            exit('ERROR - Failed to start {0} node'.format(hostname))

        print 'Launching {0} node using {1}'.format(hostname, image_id)
        return response['Instances'][0]
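
    # Returns the id of the security group named in the config, creating it
    # first (SSH open to the world, all traffic allowed within the group) if
    # it does not already exist.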
    def create_security_group(self):
        ec2 = boto3.client('ec2')
        sg = self.config.sg_name
        create_group = True
        group_id = None
        try:
            response = ec2.describe_security_groups(Filters=[{'Name': 'group-name', 'Values': [sg]}])
            if len(response['SecurityGroups']) > 0:
                group_id = response['SecurityGroups'][0]['GroupId']
                create_group = False
        except ClientError:
            pass

        if create_group:
            print "Creating security group " + sg
            request = {'Description': "Security group created by Muchos", 'GroupName': sg}
            if self.config.has_option('ec2', 'vpc_id'):
                request['VpcId'] = self.config.get('ec2', 'vpc_id')
            response = ec2.create_security_group(**request)
            group_id = response['GroupId']
            ec2.authorize_security_group_ingress(GroupName=sg, SourceSecurityGroupName=sg)
            ec2.authorize_security_group_ingress(GroupName=sg, IpProtocol='tcp', FromPort=22, ToPort=22,
                                                 CidrIp='0.0.0.0/0')
        return group_id
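
    # Deletes the cluster security group, retrying every 10 sec until the
    # delete succeeds (it can fail while terminating instances still
    # reference the group).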
    def delete_security_group(self):
        ec2 = boto3.client('ec2')
        print "Attempting to delete security group '{0}'...".format(self.config.sg_name)
        sg_exists = True
        while sg_exists:
            try:
                # DeleteSecurityGroup does not accept a VpcId parameter, so
                # resolve the group id by name (scoped to the configured VPC,
                # if any) and delete by id, which also works outside the
                # default VPC.
                filters = [{'Name': 'group-name', 'Values': [self.config.sg_name]}]
                if self.config.has_option('ec2', 'vpc_id'):
                    filters.append({'Name': 'vpc-id', 'Values': [self.config.get('ec2', 'vpc_id')]})
                response = ec2.describe_security_groups(Filters=filters)
                if len(response['SecurityGroups']) > 0:
                    ec2.delete_security_group(GroupId=response['SecurityGroups'][0]['GroupId'])
                sg_exists = False
            except ClientError as e:
                print "Failed to delete security group '{0}' due to the exception below:\n{1}\nRetrying in 10 sec..."\
                    .format(self.config.sg_name, e)
                time.sleep(10)
        print "Deleted security group"
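
    # Launches an instance for every node declared in the config, waits until
    # all of them are running, then writes the hosts file used by later
    # commands.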
    def launch(self):
        if self.active_nodes():
            exit('ERROR - There are already instances running for {0} cluster'.format(self.config.cluster_name))
        if isfile(self.config.hosts_path):
            exit("ERROR - A hosts file already exists at {0}. Please delete it before running launch again"
                 .format(self.config.hosts_path))

        session = boto3.session.Session()
        self.config.verify_launch(session.region_name)

        print "Launching {0} cluster".format(self.config.cluster_name)

        if self.config.has_option('ec2', 'security_group_id'):
            self.config.sg_id = self.config.get('ec2', 'security_group_id')
        else:
            self.config.sg_id = self.create_security_group()

        instance_d = {}
        for (hostname, services) in self.config.nodes().items():
            instance = self.launch_node(hostname, services)
            instance_d[instance['InstanceId']] = hostname

        num_running = len(self.status(['running']))
        num_expected = len(self.config.nodes())
        while num_running != num_expected:
            print "{0} of {1} nodes have started. Waiting another 5 sec...".format(num_running, num_expected)
            time.sleep(5)
            num_running = len(self.status(['running']))

        # Each line of the hosts file is: <hostname> <private_ip> [<public_ip>]
        with open(self.config.hosts_path, 'w') as hosts_file:
            for instance in self.status(['running']):
                public_ip = ''
                if 'PublicIpAddress' in instance:
                    public_ip = instance['PublicIpAddress']
                private_ip = instance['PrivateIpAddress']
                hostname = instance_d[instance['InstanceId']]
                print >>hosts_file, hostname, private_ip, public_ip

        print "All {0} nodes have started. Created hosts file at {1}".format(num_expected, self.config.hosts_path)
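
    # Renders the Ansible site playbook, inventory, and variable files from
    # the config, rsyncs the ansible directory to the proxy node, and installs
    # Ansible there.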
    def sync(self):
        config = self.config
        print 'Syncing ansible directory on {0} cluster proxy node'.format(config.cluster_name)

        host_vars = HOST_VAR_DEFAULTS
        play_vars = PLAY_VAR_DEFAULTS

        for section in ("general", "ansible-vars", config.get('performance', 'profile')):
            for (name, value) in config.items(section):
                if name not in ('proxy_hostname', 'proxy_socks_port'):
                    if name in host_vars:
                        host_vars[name] = value
                    if name in play_vars:
                        play_vars[name] = value

        cloud_provider = host_vars.get('cloud_provider', 'ec2')
        node_type_map = {}
        if cloud_provider == 'ec2':
            node_type_map = config.node_type_map()
            play_vars["mount_root"] = config.mount_root
            play_vars["metrics_drive_ids"] = config.metrics_drive_ids()
            play_vars["fstype"] = config.fstype()
            play_vars["force_format"] = config.force_format()
        if cloud_provider == 'baremetal':
            play_vars["mount_root"] = config.get("baremetal", "mount_root")
            play_vars["metrics_drive_ids"] = config.get("baremetal", "metrics_drives_ids").split(",")
            mounts = config.get("baremetal", "mounts").split(",")
            devices = config.get("baremetal", "devices").split(",")
            for node_type in 'default', 'worker':
                node_type_map[node_type] = {'mounts': mounts, 'devices': devices}

        play_vars["node_type_map"] = node_type_map
        host_vars['worker_data_dirs'] = str(node_type_map['worker']['mounts'])
        host_vars['default_data_dirs'] = str(node_type_map['default']['mounts'])

        with open(join(config.deploy_path, "ansible/site.yml"), 'w') as site_file:
            print >>site_file, "- include: common.yml"
            print >>site_file, "- include: accumulo.yml"
            if config.has_service("mesosmaster"):
                print >>site_file, "- include: mesos.yml"
            if config.has_service("metrics"):
                print >>site_file, "- include: metrics.yml"
            if config.has_service('fluo'):
                print >>site_file, "- include: fluo.yml"

        ansible_conf = join(config.deploy_path, "ansible/conf")
        with open(join(ansible_conf, "hosts"), 'w') as hosts_file:
            print >>hosts_file, "[proxy]\n{0}".format(config.proxy_hostname())
            print >>hosts_file, "\n[accumulomaster]\n{0}".format(config.get_service_hostnames("accumulomaster")[0])
            print >>hosts_file, "\n[namenode]\n{0}".format(config.get_service_hostnames("namenode")[0])
            print >>hosts_file, "\n[resourcemanager]\n{0}".format(config.get_service_hostnames("resourcemanager")[0])
            if config.has_service("mesosmaster"):
                print >>hosts_file, "\n[mesosmaster]\n{0}".format(config.get_service_hostnames("mesosmaster")[0])
            if config.has_service("metrics"):
                print >>hosts_file, "\n[metrics]\n{0}".format(config.get_service_hostnames("metrics")[0])

            print >>hosts_file, "\n[zookeepers]"
            for (index, zk_host) in enumerate(config.get_service_hostnames("zookeeper"), start=1):
                print >>hosts_file, "{0} id={1}".format(zk_host, index)

            if config.has_service('fluo'):
                print >>hosts_file, "\n[fluo]"
                for host in config.get_service_hostnames("fluo"):
                    print >>hosts_file, host

            print >>hosts_file, "\n[workers]"
            for worker_host in config.get_service_hostnames("worker"):
                print >>hosts_file, worker_host

            print >>hosts_file, "\n[accumulo:children]\naccumulomaster\nworkers"
            print >>hosts_file, "\n[hadoop:children]\nnamenode\nresourcemanager\nworkers"

            print >>hosts_file, "\n[nodes]"
            for (private_ip, hostname) in config.get_private_ip_hostnames():
                print >>hosts_file, "{0} ansible_ssh_host={1} node_type={2}".format(hostname, private_ip,
                                                                                    config.node_type(hostname))

            print >>hosts_file, "\n[all:vars]"
            for (name, value) in sorted(host_vars.items()):
                print >>hosts_file, "{0} = {1}".format(name, value)

        with open(join(config.deploy_path, "ansible/group_vars/all"), 'w') as play_vars_file:
            for (name, value) in sorted(play_vars.items()):
                print >>play_vars_file, "{0}: {1}".format(name, value)

        # copy keys file to ansible/conf (if it exists)
        conf_keys = join(config.deploy_path, "conf/keys")
        ansible_keys = join(ansible_conf, "keys")
        if isfile(conf_keys):
            shutil.copyfile(conf_keys, ansible_keys)
        else:
            open(ansible_keys, 'w').close()

        basedir = config.get('general', 'cluster_basedir')
        cmd = "rsync -az --delete -e \"ssh -o 'StrictHostKeyChecking no'\""
        subprocess.call("{cmd} {src} {usr}@{ldr}:{tdir}".format(cmd=cmd, src=join(config.deploy_path, "ansible"),
                                                                usr=config.get('general', 'cluster_user'),
                                                                ldr=config.get_proxy_ip(), tdir=basedir),
                        shell=True)
        self.exec_on_proxy_verified("{0}/ansible/scripts/install_ansible.sh".format(basedir), opts='-t')
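
    # Runs sync(), uploads any Accumulo/Fluo tarballs found in conf/upload to
    # the proxy node, then runs the site.yml playbook to set up the cluster.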
    def setup(self):
        config = self.config
        print 'Setting up {0} cluster'.format(config.cluster_name)

        self.sync()

        conf_upload = join(config.deploy_path, "conf/upload")
        accumulo_tarball = join(conf_upload, "accumulo-{0}-bin.tar.gz".format(config.version("accumulo")))
        fluo_tarball = join(conf_upload, "fluo-{0}-bin.tar.gz".format(config.version("fluo")))
        basedir = config.get('general', 'cluster_basedir')
        cluster_tarballs = "{0}/tarballs".format(basedir)
        self.exec_on_proxy_verified("mkdir -p {0}".format(cluster_tarballs))
        if isfile(accumulo_tarball):
            self.send_to_proxy(accumulo_tarball, cluster_tarballs)
        if isfile(fluo_tarball) and config.has_service('fluo'):
            self.send_to_proxy(fluo_tarball, cluster_tarballs)

        self.execute_playbook("site.yml")
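
    # Returns the instances tagged with this cluster's name whose state is in
    # the given list (e.g. ['running']).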
    def status(self, states):
        ec2 = boto3.client('ec2')
        response = ec2.describe_instances(Filters=[{'Name': 'tag:Muchos', 'Values': [self.config.cluster_name]}])
        nodes = []
        for res in response['Reservations']:
            for inst in res['Instances']:
                if inst['State']['Name'] in states:
                    nodes.append(inst)
        return nodes

    def active_nodes(self):
        return self.status(['pending', 'running', 'stopping', 'stopped'])

    @staticmethod
    def print_nodes(nodes):
        for node in nodes:
            name = 'Unknown'
            for tag in node['Tags']:
                if tag['Key'] == 'Name':
                    name = tag['Value']
            print " ", name, node['InstanceId'], node['PrivateIpAddress'], node.get('PublicIpAddress', '')
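
    # Prompts for confirmation, terminates all active cluster instances, and
    # cleans up the security group and hosts file.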
    def terminate(self, hosts_path):
        nodes = self.active_nodes()
        print "The following {0} nodes in {1} cluster will be terminated:".format(len(nodes), self.config.cluster_name)
        self.print_nodes(nodes)

        response = raw_input("Do you want to continue? (y/n) ")
        if response == "y":
            ec2 = boto3.client('ec2')
            for node in nodes:
                ec2.terminate_instances(InstanceIds=[node['InstanceId']])
            print "Terminated nodes."

            if not self.config.has_option('ec2', 'security_group_id'):
                self.delete_security_group()

            if isfile(hosts_path):
                os.remove(hosts_path)
                print "Removed hosts file at ", hosts_path
        else:
            print "Aborted termination"
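
    # Opens an interactive SSH session to the proxy node, adding a SOCKS
    # tunnel when proxy_socks_port is configured.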
    def ssh(self):
        self.wait_until_proxy_ready()
        fwd = ''
        if self.config.has_option('general', 'proxy_socks_port'):
            fwd = "-D " + self.config.get('general', 'proxy_socks_port')
        ssh_command = "ssh -C -A -o 'StrictHostKeyChecking no' {fwd} {usr}@{ldr}".format(
            usr=self.config.get('general', 'cluster_user'), ldr=self.config.get_proxy_ip(), fwd=fwd)
        print "Logging into proxy using: {0}".format(ssh_command)
        retcode = subprocess.call(ssh_command, shell=True)
        if retcode != 0:
            exit("ERROR - Command failed with return code of {0}: {1}".format(retcode, ssh_command))

    def exec_on_proxy(self, command, opts=''):
        ssh_command = "ssh -A -o 'StrictHostKeyChecking no' {opts} {usr}@{ldr} '{cmd}'".format(
            usr=self.config.get('general', 'cluster_user'),
            ldr=self.config.get_proxy_ip(), cmd=command, opts=opts)
        return subprocess.call(ssh_command, shell=True), ssh_command

    def exec_on_proxy_verified(self, command, opts=''):
        (retcode, ssh_command) = self.exec_on_proxy(command, opts)
        if retcode != 0:
            exit("ERROR - Command failed with return code of {0}: {1}".format(retcode, ssh_command))

    def wait_until_proxy_ready(self):
        cluster_user = self.config.get('general', 'cluster_user')
        print "Checking if '{0}' proxy can be reached using: ssh {1}@{2}"\
            .format(self.config.proxy_hostname(), cluster_user, self.config.get_proxy_ip())
        while True:
            (retcode, ssh_command) = self.exec_on_proxy('pwd > /dev/null')
            if retcode == 0:
                print "Connected to proxy using SSH!"
                time.sleep(1)
                break
            print "Proxy could not be accessed using SSH. Will retry in 5 sec..."
            time.sleep(5)

    def execute_playbook(self, playbook):
        print "Executing '{0}' playbook".format(playbook)
        basedir = self.config.get('general', 'cluster_basedir')
        self.exec_on_proxy_verified("time -p ansible-playbook {base}/ansible/{playbook}"
                                    .format(base=basedir, playbook=playbook), opts='-t')

    def send_to_proxy(self, path, target, skip_if_exists=True):
        print "Copying to proxy: ", path
        cmd = "scp -o 'StrictHostKeyChecking no'"
        if skip_if_exists:
            cmd = "rsync --update --progress -e \"ssh -o 'StrictHostKeyChecking no'\""
        subprocess.call("{cmd} {src} {usr}@{ldr}:{tdir}".format(
            cmd=cmd, src=path, usr=self.config.get('general', 'cluster_user'), ldr=self.config.get_proxy_ip(),
            tdir=target), shell=True)
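

# Entry point: resolves MUCHOS_HOME, loads conf/muchos.props, parses the
# command line, and dispatches the requested action to a MuchosCluster.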
def main():
    deploy_path = os.environ.get('MUCHOS_HOME')
    if not deploy_path:
        exit('ERROR - MUCHOS_HOME env variable must be set!')
    if not os.path.isdir(deploy_path):
        exit('ERROR - Directory set by MUCHOS_HOME does not exist: ' + deploy_path)

    config_path = join(deploy_path, "conf/muchos.props")
    if not isfile(config_path):
        exit('ERROR - A config file does not exist at ' + config_path)
    hosts_dir = join(deploy_path, "conf/hosts/")

    # parse command line args
    retval = parse_args(hosts_dir)
    if not retval:
        print "Invalid command line arguments. For help, use 'muchos -h'"
        sys.exit(1)
    (opts, action, args) = retval

    hosts_path = join(hosts_dir, opts.cluster)

    config = DeployConfig(deploy_path, config_path, hosts_path, opts.cluster)
    config.verify_config(action)

    cluster = MuchosCluster(config)
    if action == 'launch':
        cluster.launch()
    elif action == 'status':
        nodes = cluster.status(['running'])
        print "Found {0} nodes in {1} cluster".format(len(nodes), config.cluster_name)
        cluster.print_nodes(nodes)
    elif action == 'sync':
        cluster.sync()
    elif action == 'setup':
        cluster.setup()
    elif action == 'config':
        if opts.property == 'all':
            config.print_all()
        else:
            config.print_property(opts.property)
    elif action == 'ssh':
        cluster.ssh()
    elif action in ('wipe', 'kill'):
        if not isfile(hosts_path):
            exit("Hosts file does not exist for cluster: " + hosts_path)
        if action == 'wipe':
            print "Killing all processes started by Muchos and wiping Muchos data from {0} cluster"\
                .format(config.cluster_name)
        elif action == 'kill':
            print "Killing all processes started by Muchos on {0} cluster".format(config.cluster_name)
        cluster.execute_playbook(action + ".yml")
    elif action == 'terminate':
        cluster.terminate(hosts_path)
    else:
        print 'ERROR - Unknown action:', action


if __name__ == '__main__':
    main()