blob: 23e82ebed1bd9717e03dec45b08cff90bb15d40b [file] [log] [blame]
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
from sys import exit
from botocore.exceptions import ClientError
from .util import AMI_HELP_MSG, get_block_device_map
from os import path
import time
import boto3
from .existing import ExistingCluster
import json
from string import Template
class Ec2Cluster(ExistingCluster):
    """Manage the EC2 instances and security group backing a Muchos cluster.

    Every AWS call goes through ``boto3.client("ec2")``. Instances that
    belong to the cluster are recognised by their ``Muchos`` tag, whose value
    is the configured cluster name.
    """

    def __init__(self, config):
        """Initialise the cluster by delegating to ``ExistingCluster``.

        Args:
            config: Cluster configuration object. The methods of this class
                read it back as ``self.config`` (cluster name, security
                group name, hosts path and the ``ec2`` section).
        """
        super().__init__(config)

    def launch_node(self, hostname, services, sg_id):
        """Launch one EC2 instance for ``hostname`` and return its description.

        Args:
            hostname: Node name; joined to the cluster name to build the
                instance's ``Name`` tag.
            services: Services assigned to the node, forwarded to
                ``init_request``.
            sg_id: ID of the security group to attach to the instance.

        Returns:
            The single entry of ``Instances`` in the ``run_instances``
            response.

        Raises:
            SystemExit: via ``sys.exit`` when the request is rejected with a
                ``ClientError`` or does not yield exactly one instance.
        """
        request = self.init_request(hostname, services, sg_id)
        request["MinCount"] = 1
        request["MaxCount"] = 1

        # The "Muchos" tag is what get_status() filters on later.
        tags = [
            {
                "Key": "Name",
                "Value": self.config.cluster_name + "-" + hostname,
            },
            {"Key": "Muchos", "Value": self.config.cluster_name},
        ]
        tags.extend(
            {"Key": key, "Value": val}
            for key, val in self.config.instance_tags().items()
        )
        request["TagSpecifications"] = [
            {"ResourceType": "instance", "Tags": tags}
        ]

        if self.config.has_option("ec2", "user_data_path"):
            user_data_path = self.config.get("ec2", "user_data_path")
            with open(user_data_path, "r") as user_data_file:
                request["UserData"] = user_data_file.read()

        ec2 = boto3.client("ec2")
        response = None
        try:
            response = ec2.run_instances(**request)
        except ClientError as e:
            exit(
                "ERROR - Failed to launch EC2 instance due to exception:"
                "\n\n{0}\n\n{1}".format(e, AMI_HELP_MSG)
            )

        if response is None or len(response["Instances"]) != 1:
            exit("ERROR - Failed to start {0} node".format(hostname))

        print(
            "Launching {0} node using {1}".format(hostname, request["ImageId"])
        )
        return response["Instances"][0]

    def create_security_group(self):
        """Return the ID of the cluster's security group, creating it if absent.

        An existing group is looked up by name (``self.config.sg_name``) and
        reused untouched. A newly created group gets two ingress rules: full
        TCP/UDP/ICMP access between members of the group itself, and SSH
        (TCP port 22) from any IPv4 address.

        Returns:
            The security group ID.
        """
        ec2 = boto3.client("ec2")
        sg = self.config.sg_name

        group_id = None
        try:
            response = ec2.describe_security_groups(
                Filters=[{"Name": "group-name", "Values": [sg]}]
            )
            if response["SecurityGroups"]:
                group_id = response["SecurityGroups"][0]["GroupId"]
        except ClientError:
            # Best-effort lookup: a failed lookup is treated as "not found"
            # and we fall through to creating the group.
            pass
        # NOTE(review): the lookup filters by name only, not by VPC, so a
        # same-named group in a different VPC would be reused - confirm
        # this is intended.

        if group_id is not None:
            return group_id

        print("Creating security group " + sg)
        request = {
            "Description": "Security group created by Muchos",
            "GroupName": sg,
        }
        if self.config.has_option("ec2", "vpc_id"):
            request["VpcId"] = self.config.get("ec2", "vpc_id")
        group_id = ec2.create_security_group(**request)["GroupId"]

        # Address the group by ID rather than by name: EC2 only accepts
        # GroupName / SourceSecurityGroupName for groups in the default VPC,
        # so name-based rules fail when vpc_id points at another VPC. The
        # three permissions reproduce the full TCP, UDP and ICMP access that
        # a SourceSecurityGroupName rule grants.
        ec2.authorize_security_group_ingress(
            GroupId=group_id,
            IpPermissions=[
                {
                    "IpProtocol": protocol,
                    "FromPort": from_port,
                    "ToPort": to_port,
                    "UserIdGroupPairs": [{"GroupId": group_id}],
                }
                for protocol, from_port, to_port in (
                    ("tcp", 0, 65535),
                    ("udp", 0, 65535),
                    ("icmp", -1, -1),
                )
            ],
        )
        # NOTE(review): SSH is opened to every IPv4 address.
        ec2.authorize_security_group_ingress(
            GroupId=group_id,
            IpProtocol="tcp",
            FromPort=22,
            ToPort=22,
            CidrIp="0.0.0.0/0",
        )
        return group_id

    def delete_security_group(self):
        """Delete the security group named ``self.config.sg_name``.

        Prints a message and returns if no such group is found. Otherwise
        the delete call is retried every 10 seconds until it succeeds.
        """
        sg_name = self.config.sg_name
        ec2 = boto3.client("ec2")

        sg_id = None
        try:
            response = ec2.describe_security_groups(
                Filters=[{"Name": "group-name", "Values": [sg_name]}]
            )
            if response["SecurityGroups"]:
                sg_id = response["SecurityGroups"][0]["GroupId"]
        except ClientError:
            # Best-effort lookup: treated the same as "group not found".
            pass

        if not sg_id:
            print("Could not find security group '{0}'".format(sg_name))
            return

        print(
            "Attempting to delete security group '{0}' "
            "with id '{1}'...".format(sg_name, sg_id)
        )

        # NOTE(review): this retries forever on any ClientError, including
        # ones that can never succeed (e.g. missing permissions).
        while True:
            try:
                ec2.delete_security_group(GroupId=sg_id)
                break
            except ClientError as e:
                print(
                    "Failed to delete security group '{0}' due to "
                    "exception below:\n{1}\nRetrying in 10 sec...".format(
                        sg_name, e
                    )
                )
                time.sleep(10)
        print("Deleted security group")

    def init_request(self, hostname, services, sg_id):
        """Build the base ``run_instances`` request for a node.

        Args:
            hostname: Node name (not used by this implementation; kept so
                subclasses can override with the same signature).
            services: Services assigned to the node; a node running
                ``"worker"`` uses ``worker_instance_type``, any other node
                uses ``default_instance_type``.
            sg_id: Security group ID attached to the primary interface.

        Returns:
            A dict of keyword arguments for ``run_instances`` (without
            ``MinCount``/``MaxCount``/tags, which ``launch_node`` adds).

        Raises:
            SystemExit: via ``sys.exit`` when ``aws_ami`` is missing or
                empty in the ``ec2`` config section.
        """
        # Public IPs are associated unless the option is present and is
        # anything other than "true" (case-insensitive).
        associate_public_ip = True
        if self.config.has_option("ec2", "associate_public_ip"):
            raw_value = self.config.get("ec2", "associate_public_ip")
            associate_public_ip = raw_value.strip().lower() == "true"

        interface = {
            "DeviceIndex": 0,
            "AssociatePublicIpAddress": associate_public_ip,
            "Groups": [sg_id],
        }
        if self.config.has_option("ec2", "subnet_id"):
            interface["SubnetId"] = self.config.get("ec2", "subnet_id")
        request = {"NetworkInterfaces": [interface]}

        if "worker" in services:
            instance_type = self.config.get("ec2", "worker_instance_type")
        else:
            instance_type = self.config.get("ec2", "default_instance_type")
        request["InstanceType"] = instance_type
        request["InstanceInitiatedShutdownBehavior"] = self.config.get(
            "ec2", "shutdown_behavior"
        )

        if not self.config.has_option("ec2", "aws_ami"):
            exit("aws_ami property must be set!")
        image_id = self.config.get("ec2", "aws_ami")
        if not image_id:
            exit("aws_ami property was not properly set")
        request["ImageId"] = image_id

        request["BlockDeviceMappings"] = get_block_device_map(instance_type)

        if self.config.has_option("ec2", "key_name"):
            request["KeyName"] = self.config.get("ec2", "key_name")

        return request

    def launch(self):
        """Launch every configured node and write the hosts file.

        Refuses to run (via ``sys.exit``) when the cluster already has
        active instances or a hosts file already exists. Uses the configured
        ``security_group_id`` when present, otherwise creates or reuses the
        cluster's own group. After requesting all instances it polls every
        5 seconds until the number of running instances equals the number
        of configured nodes, then writes one
        ``hostname private_ip public_ip`` line per instance to
        ``self.config.hosts_path``.
        """
        if self.active_nodes():
            exit(
                "ERROR - There are already instances "
                "running for {0} cluster".format(self.config.cluster_name)
            )

        if path.isfile(self.config.hosts_path):
            exit(
                "ERROR - A hosts file already exists at {0}. "
                "Please delete before running launch again".format(
                    self.config.hosts_path
                )
            )

        self.config.verify_launch()

        print("Launching {0} cluster".format(self.config.cluster_name))

        if self.config.has_option("ec2", "security_group_id"):
            sg_id = self.config.get("ec2", "security_group_id")
        else:
            sg_id = self.create_security_group()

        # Map instance ID -> hostname so the hosts file can be written from
        # the describe_instances results later.
        instance_d = {}
        for hostname, services in self.config.nodes().items():
            instance = self.launch_node(hostname, services, sg_id)
            instance_d[instance["InstanceId"]] = hostname

        # NOTE(review): this waits indefinitely if an instance never
        # reaches the "running" state.
        num_running = len(self.get_status(["running"]))
        num_expected = len(self.config.nodes())
        while num_running != num_expected:
            print(
                "{0} of {1} nodes have started. "
                "Waiting another 5 sec..".format(num_running, num_expected)
            )
            time.sleep(5)
            num_running = len(self.get_status(["running"]))

        with open(self.config.hosts_path, "w") as hosts_file:
            for instance in self.get_status(["running"]):
                # Instances without a public address get an empty column.
                public_ip = instance.get("PublicIpAddress", "")
                private_ip = instance["PrivateIpAddress"]
                hostname = instance_d[instance["InstanceId"]]
                print(
                    "{0} {1} {2}".format(hostname, private_ip, public_ip),
                    file=hosts_file,
                )

        print(
            "All {0} nodes have started. Created hosts file at {1}".format(
                num_expected, self.config.hosts_path
            )
        )

    def status(self):
        """Print the number of running nodes followed by one line per node."""
        nodes = self.get_status(["running"])
        print(
            "Found {0} nodes in {1} cluster".format(
                len(nodes), self.config.cluster_name
            )
        )
        self.print_nodes(nodes)

    def get_status(self, states):
        """Return the cluster's instances whose state name is in ``states``.

        Args:
            states: Collection of EC2 state names (e.g. ``["running"]``).

        Returns:
            A list of instance descriptions taken from ``describe_instances``
            filtered on the ``Muchos`` tag.
        """
        ec2 = boto3.client("ec2")
        # NOTE(review): a single unpaginated call; confirm this is enough
        # for the largest expected cluster.
        response = ec2.describe_instances(
            Filters=[
                {"Name": "tag:Muchos", "Values": [self.config.cluster_name]}
            ]
        )
        return [
            inst
            for res in response["Reservations"]
            for inst in res["Instances"]
            if inst["State"]["Name"] in states
        ]

    def active_nodes(self):
        """Return instances that are pending, running, stopping or stopped."""
        return self.get_status(["pending", "running", "stopping", "stopped"])

    @staticmethod
    def print_nodes(nodes):
        """Print name, instance ID, private IP and public IP of each node.

        The name comes from the instance's ``Name`` tag and falls back to
        ``"Unknown"``; missing addresses are printed as empty strings.

        Args:
            nodes: Iterable of instance descriptions.
        """
        for node in nodes:
            name = "Unknown"
            for tag in node.get("Tags", []):
                if tag["Key"] == "Name":
                    name = tag["Value"]
            print(
                "  ",
                name,
                node["InstanceId"],
                node.get("PrivateIpAddress", ""),
                node.get("PublicIpAddress", ""),
            )

    def stop(self):
        """Issue a stop request for every active node and list the nodes."""
        nodes = self.active_nodes()
        print(
            "The following {0} nodes in {1} cluster "
            "will be stopped:".format(len(nodes), self.config.cluster_name)
        )
        ec2 = boto3.client("ec2")
        for node in nodes:
            ec2.stop_instances(InstanceIds=[node["InstanceId"]])
        self.print_nodes(nodes)
        print("Stopped nodes.")

    def start(self):
        """Issue a start request for every active node and list the nodes."""
        nodes = self.active_nodes()
        print(
            "The following {0} nodes in {1} cluster "
            "will be started:".format(len(nodes), self.config.cluster_name)
        )
        ec2 = boto3.client("ec2")
        for node in nodes:
            ec2.start_instances(InstanceIds=[node["InstanceId"]])
        self.print_nodes(nodes)
        print("Started nodes.")

    def terminate(self):
        """Terminate the cluster's active nodes after interactive confirmation.

        Lists the nodes and prompts on stdin; only the exact answer ``y``
        proceeds. On confirmation every node is terminated, the security
        group is deleted unless a ``security_group_id`` was supplied in the
        config, and the hosts file is removed if it exists.
        """
        nodes = self.active_nodes()
        print(
            "The following {0} nodes in {1} cluster "
            "will be terminated:".format(len(nodes), self.config.cluster_name)
        )
        self.print_nodes(nodes)

        response = input("Do you want to continue? (y/n) ")
        if response != "y":
            print("Aborted termination")
            return

        ec2 = boto3.client("ec2")
        for node in nodes:
            ec2.terminate_instances(InstanceIds=[node["InstanceId"]])

        print("Terminated nodes.")
        # A group supplied through the config was not created by launch(),
        # so it is left in place.
        if not self.config.has_option("ec2", "security_group_id"):
            self.delete_security_group()

        if path.isfile(self.config.hosts_path):
            os.remove(self.config.hosts_path)
            print("Removed hosts file at ", self.config.hosts_path)

    def wipe(self):
        """Delegate to the parent class's ``wipe`` without adding behaviour."""
        super().wipe()
class Ec2ClusterTemplate(Ec2Cluster):
    """``Ec2Cluster`` whose launch requests are rendered from templates.

    Templates are read from ``self.config.cluster_template_d``: the ``"id"``
    entry names the template set, and each node's first service selects the
    entry used to build that node's request.
    """

    def __init__(self, config):
        """Initialise through the parent class with the given ``config``."""
        super().__init__(config)

    def launch(self):
        """Announce the cluster template in use, then run the normal launch."""
        template_id = self.config.cluster_template_d["id"]
        message = "Using cluster template '{0}' to launch nodes".format(
            template_id
        )
        print(message)
        super().launch()

    def init_request(self, hostname, services, sg_id):
        """Render the node's template into a launch request dict.

        Args:
            hostname: Node name, used only in the progress message.
            services: Services assigned to the node; the first entry
                denotes the node's target template.
            sg_id: Security group ID, exposed to the template as
                ``security_group_id``.

        Returns:
            The request parsed from the rendered template's JSON.
        """
        # the first service in the list denotes the node's target template
        template_name = services[0]
        print(
            "Template '{0}' selected for {1}".format(template_name, hostname)
        )

        # interpolate any values from the ec2 config section
        substitutions = dict(self.config.items("ec2"))
        substitutions["security_group_id"] = sg_id

        # render the template text and parse it as the request
        raw_template = self.config.cluster_template_d[template_name]
        rendered = Template(raw_template).substitute(substitutions)
        return json.loads(rendered)