blob: e7fc78cf163ece9ec8c7f018e6e0435297862a65 [file] [log] [blame]
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
from sys import exit
from botocore.exceptions import ClientError
from .util import AMI_HELP_MSG, get_block_device_map
from os.path import isfile
import time
import boto3
from .existing import ExistingCluster
import json
from string import Template
class Ec2Cluster(ExistingCluster):
def __init__(self, config):
ExistingCluster.__init__(self, config)
def launch_node(self, hostname, services, sg_id):
request = self.init_request(hostname, services, sg_id)
request['MinCount'] = 1
request['MaxCount'] = 1
tags = [{'Key': 'Name', 'Value': self.config.cluster_name + '-' + hostname},
{'Key': 'Muchos', 'Value': self.config.cluster_name}]
for key, val in self.config.instance_tags().items():
tags.append({'Key': key, 'Value': val})
request['TagSpecifications'] = [{'ResourceType': 'instance', 'Tags': tags}]
if self.config.has_option('ec2', 'user_data_path'):
user_data_path = self.config.get('ec2', 'user_data_path')
with open(user_data_path, 'r') as user_data_file:
user_data = user_data_file.read()
request['UserData'] = user_data
ec2 = boto3.client('ec2')
response = None
try:
response = ec2.run_instances(**request)
except ClientError as e:
exit("ERROR - Failed to launch EC2 instance due to exception:\n\n{0}\n\n{1}".format(e, AMI_HELP_MSG))
if response is None or len(response['Instances']) != 1:
exit('ERROR - Failed to start {0} node'.format(hostname))
print('Launching {0} node using {1}'.format(hostname, request['ImageId']))
return response['Instances'][0]
def create_security_group(self):
ec2 = boto3.client('ec2')
sg = self.config.sg_name
create_group = True
group_id = None
try:
response = ec2.describe_security_groups(Filters=[{'Name': 'group-name', 'Values': [sg]}])
if len(response['SecurityGroups']) > 0:
group_id = response['SecurityGroups'][0]['GroupId']
create_group = False
except ClientError:
pass
if create_group:
print("Creating security group " + sg)
request = {'Description': "Security group created by Muchos", 'GroupName': sg}
if self.config.has_option('ec2', 'vpc_id'):
request['VpcId'] = self.config.get('ec2', 'vpc_id')
response = ec2.create_security_group(**request)
group_id = response['GroupId']
ec2.authorize_security_group_ingress(GroupName=sg, SourceSecurityGroupName=sg)
ec2.authorize_security_group_ingress(GroupName=sg, IpProtocol='tcp', FromPort=22, ToPort=22,
CidrIp='0.0.0.0/0')
return group_id
def delete_security_group(self):
sg_id = None
ec2 = boto3.client('ec2')
try:
response = ec2.describe_security_groups(Filters=[{'Name': 'group-name', 'Values': [self.config.sg_name]}])
if len(response['SecurityGroups']) > 0:
sg_id = response['SecurityGroups'][0]['GroupId']
except ClientError:
pass
if not sg_id:
print("Could not find security group '{0}'".format(self.config.sg_name))
return
print("Attempting to delete security group '{0}' with id '{1}'...".format(self.config.sg_name, sg_id))
sg_exists = True
while sg_exists:
try:
request = {'GroupId': sg_id}
ec2.delete_security_group(**request)
sg_exists = False
except ClientError as e:
print("Failed to delete security group '{0}' due exception below:\n{1}\nRetrying in 10 sec..."
.format(self.config.sg_name, e))
time.sleep(10)
print("Deleted security group")
def init_request(self, hostname, services, sg_id):
associate_public_ip = True
if self.config.has_option('ec2', 'associate_public_ip'):
associate_public_ip = self.config.get('ec2', 'associate_public_ip').strip().lower() == 'true'
request = {'NetworkInterfaces': [{'DeviceIndex': 0, 'AssociatePublicIpAddress': associate_public_ip,
'Groups': [sg_id]}]}
if self.config.has_option('ec2', 'subnet_id'):
request['NetworkInterfaces'][0]['SubnetId'] = self.config.get('ec2', 'subnet_id')
if 'worker' in services:
instance_type = self.config.get('ec2', 'worker_instance_type')
else:
instance_type = self.config.get('ec2', 'default_instance_type')
request['InstanceType'] = instance_type
request['InstanceInitiatedShutdownBehavior'] = self.config.get('ec2', 'shutdown_behavior')
if not self.config.has_option('ec2', 'aws_ami'):
exit('aws_ami property must be set!')
image_id = self.config.get('ec2', 'aws_ami')
if not image_id:
exit('aws_ami property was not properly')
request['ImageId'] = image_id
request['BlockDeviceMappings'] = get_block_device_map(instance_type)
if self.config.has_option('ec2', 'key_name'):
request['KeyName'] = self.config.get('ec2', 'key_name')
return request
def launch(self):
if self.active_nodes():
exit('ERROR - There are already instances running for {0} cluster'.format(self.config.cluster_name))
if isfile(self.config.hosts_path):
exit("ERROR - A hosts file already exists at {0}. Please delete before running launch again"
.format(self.config.hosts_path))
self.config.verify_launch()
print("Launching {0} cluster".format(self.config.cluster_name))
if self.config.has_option('ec2', 'security_group_id'):
sg_id = self.config.get('ec2', 'security_group_id')
else:
sg_id = self.create_security_group()
instance_d = {}
for (hostname, services) in list(self.config.nodes().items()):
instance = self.launch_node(hostname, services, sg_id)
instance_d[instance['InstanceId']] = hostname
num_running = len(self.get_status(['running']))
num_expected = len(self.config.nodes())
while num_running != num_expected:
print("{0} of {1} nodes have started. Waiting another 5 sec..".format(num_running, num_expected))
time.sleep(5)
num_running = len(self.get_status(['running']))
with open(self.config.hosts_path, 'w') as hosts_file:
for instance in self.get_status(['running']):
public_ip = ''
if 'PublicIpAddress' in instance:
public_ip = instance['PublicIpAddress']
private_ip = instance['PrivateIpAddress']
hostname = instance_d[instance['InstanceId']]
print("{0} {1} {2}".format(hostname, private_ip, public_ip), file=hosts_file)
print("All {0} nodes have started. Created hosts file at {1}".format(num_expected, self.config.hosts_path))
def status(self):
nodes = self.get_status(['running'])
print("Found {0} nodes in {1} cluster".format(len(nodes), self.config.cluster_name))
self.print_nodes(nodes)
def get_status(self, states):
ec2 = boto3.client('ec2')
response = ec2.describe_instances(Filters=[{'Name': 'tag:Muchos', 'Values': [self.config.cluster_name]}])
nodes = []
for res in response['Reservations']:
for inst in res['Instances']:
if inst['State']['Name'] in states:
nodes.append(inst)
return nodes
def active_nodes(self):
return self.get_status(['pending', 'running', 'stopping', 'stopped'])
@staticmethod
def print_nodes(nodes):
for node in nodes:
name = 'Unknown'
for tag in node['Tags']:
if tag['Key'] == 'Name':
name = tag['Value']
print(" ", name, node['InstanceId'], node['PrivateIpAddress'], node.get('PublicIpAddress', ''))
def terminate(self):
nodes = self.active_nodes()
print("The following {0} nodes in {1} cluster will be terminated:".format(len(nodes), self.config.cluster_name))
self.print_nodes(nodes)
response = input("Do you want to continue? (y/n) ")
if response == "y":
ec2 = boto3.client('ec2')
for node in nodes:
ec2.terminate_instances(InstanceIds=[node['InstanceId']])
print("Terminated nodes.")
if not self.config.has_option('ec2', 'security_group_id'):
self.delete_security_group()
if isfile(self.config.hosts_path):
os.remove(self.config.hosts_path)
print("Removed hosts file at ", self.config.hosts_path)
else:
print("Aborted termination")
def wipe(self):
self.execute_playbook("wipe.yml")
class Ec2ClusterTemplate(Ec2Cluster):
def __init__(self, config):
Ec2Cluster.__init__(self, config)
def launch(self):
print("Using cluster template '{0}' to launch nodes".format(self.config.cluster_template_d['id']))
super().launch()
def init_request(self, hostname, services, sg_id):
# the first service in the list denotes the node's target template
print("Template '{0}' selected for {1}".format(services[0], hostname))
# interpolate any values from the ec2 config section and create request
ec2_d = dict(self.config.items('ec2'))
ec2_d['security_group_id'] = sg_id
return json.loads(Template(self.config.cluster_template_d[services[0]]).substitute(ec2_d))