blob: 883c89ea61c760b7e452552da3a61452787805a8 [file] [log] [blame]
# *****************************************************************************
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# ******************************************************************************
import boto3
from botocore.client import Config
import json, urllib2
import time
import logging
import traceback
import sys
import backoff
import random
import string
from dlab.fab import *
import actions_lib
def get_instance_hostname(tag_name, instance_name):
    """Return the DNS name of a running EC2 instance selected by tag.

    Args:
        tag_name: Tag key used to build the ``tag:<key>`` filter.
        instance_name: Tag value the instance must carry.

    Returns:
        The public DNS name of the first matching running instance, or its
        private DNS name when the public one is empty. ``None`` when the
        lookup fails or nothing matches; the failure is logged, passed to
        ``append_result`` and its traceback is printed to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        instances = ec2.instances.filter(
            Filters=[{'Name': 'tag:{}'.format(tag_name), 'Values': [instance_name]},
                     {'Name': 'instance-state-name', 'Values': ['running']}])
        for instance in instances:
            # Only the first match is considered.
            return instance.public_dns_name or instance.private_dns_name
        # Reaching this point means the filter matched nothing.
        raise Exception("Unable to find instance hostname with instance name: " + instance_name)
    except Exception as err:
        # print_exc() returns None, so the text for the messages has to come
        # from format_exc(); concatenating None raised TypeError here before.
        trace = traceback.format_exc()
        logging.error("Error with finding instance hostname with instance name: " + instance_name + " : " +
                      str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with finding instance hostname",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_vpc_endpoints(vpc_id):
    """Return the VPC endpoints that belong to ``vpc_id``.

    Args:
        vpc_id: Identifier used for the ``vpc-id`` filter.

    Returns:
        The ``VpcEndpoints`` list (a list of dictionaries) from
        ``describe_vpc_endpoints``. ``None`` when the call fails; the failure
        is logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        ec2 = boto3.client('ec2')
        response = ec2.describe_vpc_endpoints(
            Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}])
        return response.get('VpcEndpoints')
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting VPC Endpoints: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting VPC Endpoints",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_route_tables(vpc, tags):
    """Return the IDs of route tables in a VPC that carry a given tag.

    Args:
        vpc: VPC identifier used for the ``vpc-id`` filter.
        tags: JSON string of an object with ``Key`` and ``Value`` members.

    Returns:
        A list of ``RouteTableId`` values (possibly empty). ``None`` when the
        lookup fails; the failure is logged, passed to ``append_result`` and
        printed to stdout.
    """
    try:
        ec2 = boto3.client('ec2')
        # Parse the tag description once instead of once per field.
        tag = json.loads(tags)
        result = ec2.describe_route_tables(
            Filters=[
                {'Name': 'vpc-id', 'Values': [vpc]},
                {'Name': 'tag-key', 'Values': [tag.get('Key')]},
                {'Name': 'tag-value', 'Values': [tag.get('Value')]}
            ]
        ).get('RouteTables')
        return [table.get('RouteTableId') for table in result]
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting Route tables: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting Route tables",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_bucket_by_name(bucket_name):
    """Check whether an S3 bucket with the given name is listed.

    Args:
        bucket_name: Exact bucket name to look for.

    Returns:
        ``bucket_name`` when a bucket with that name appears in
        ``s3.buckets.all()``, otherwise ``''``. ``None`` when the listing
        fails; the failure is logged, passed to ``append_result`` and printed
        to stdout.
    """
    try:
        s3 = boto3.resource('s3', config=Config(signature_version='s3v4'))
        for bucket in s3.buckets.all():
            if bucket.name == bucket_name:
                return bucket.name
        return ''
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting bucket by name: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting bucket by name",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_instance_ip_address(tag_name, instance_name):
    """Return the public and private IP of a running instance found by tag.

    Args:
        tag_name: Tag key used to build the ``tag:<key>`` filter.
        instance_name: Tag value the instance must carry.

    Returns:
        ``{'Public': ..., 'Private': ...}``. When several running instances
        match, the values of the last one iterated are returned. ``None``
        when nothing matches or the lookup fails; the failure is logged,
        passed to ``append_result`` and printed to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        instances = ec2.instances.filter(
            Filters=[{'Name': 'tag:{}'.format(tag_name), 'Values': [instance_name]},
                     {'Name': 'instance-state-name', 'Values': ['running']}])
        ips = {}
        for instance in instances:
            ips = {'Public': instance.public_ip_address,
                   'Private': instance.private_ip_address}
        if not ips:
            raise Exception("Unable to find instance IP addresses with instance name: " + instance_name)
        return ips
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting ip address by name: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting ip address by name",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_instance_ip_address_by_id(instance_id):
    """Return the public and private IP of a running instance found by ID.

    Args:
        instance_id: EC2 instance ID used for the ``instance-id`` filter.

    Returns:
        ``{'Public': ..., 'Private': ...}`` for the matching running
        instance. ``None`` when nothing matches or the lookup fails; the
        failure is logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        instances = ec2.instances.filter(
            Filters=[{'Name': 'instance-id', 'Values': [instance_id]},
                     {'Name': 'instance-state-name', 'Values': ['running']}])
        ips = {}
        for instance in instances:
            ips = {'Public': instance.public_ip_address,
                   'Private': instance.private_ip_address}
        if not ips:
            raise Exception("Unable to find instance IP addresses with instance id: " + instance_id)
        return ips
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting ip address by id: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting ip address by id",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_instance_private_ip_address(tag_name, instance_name):
    """Return the private IP of a running instance found by tag.

    Calls ``actions_lib.create_aws_config_files()`` first and then delegates
    to ``get_instance_ip_address``.

    Args:
        tag_name: Tag key forwarded to ``get_instance_ip_address``.
        instance_name: Tag value forwarded to ``get_instance_ip_address``.

    Returns:
        The ``'Private'`` entry of the address dictionary. ``None`` when any
        step fails; the failure is logged, passed to ``append_result`` and
        printed to stdout.
    """
    try:
        actions_lib.create_aws_config_files()
        addresses = get_instance_ip_address(tag_name, instance_name)
        return addresses.get('Private')
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting private ip address by name: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting private ip address by name",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
@backoff.on_predicate(backoff.fibo, max_tries=5)
def get_ami_id_by_name(ami_name, state="*"):
    """Return the ID of the first AMI whose name and state match.

    The ``backoff.on_predicate`` decorator re-invokes this function (up to
    five tries, Fibonacci wait) - presumably while the result is falsy, which
    is that decorator's default predicate; confirm against the backoff
    version in use.

    Args:
        ami_name: Value for the ``name`` image filter.
        state: Value for the ``state`` image filter; ``"*"`` by default.

    Returns:
        The image ID, or ``''`` when nothing matches or the lookup fails. A
        failure is logged, passed to ``append_result`` and printed to stdout.
    """
    ec2 = boto3.resource('ec2')
    try:
        images = ec2.images.filter(Filters=[{'Name': 'name', 'Values': [ami_name]},
                                            {'Name': 'state', 'Values': [state]}])
        for image in images:
            return image.id
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting AMI ID by name: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting AMI ID by name",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        return ''
    return ''
def get_ami_id_by_instance_name(instance_name):
    """Return the AMI ID of the first instance whose ``Name`` tag matches.

    Args:
        instance_name: Value of the ``Name`` tag to filter on.

    Returns:
        The instance's ``image_id``, or ``''`` when nothing matches or the
        lookup fails. A failure is logged, passed to ``append_result`` and
        printed to stdout.
    """
    ec2 = boto3.resource('ec2')
    try:
        name_filter = {'Name': 'tag:{}'.format('Name'), 'Values': [instance_name]}
        for instance in ec2.instances.filter(Filters=[name_filter]):
            return instance.image_id
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting AMI ID by instance name: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting AMI ID by instance name",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        return ''
    return ''
def get_security_group_by_name(security_group_name):
    """Return the ID of the first security group with the given name.

    Args:
        security_group_name: Value for the ``group-name`` filter.

    Returns:
        The security group ID, or ``''`` when nothing matches or the lookup
        fails. A failure is logged, passed to ``append_result`` and printed
        to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        groups = ec2.security_groups.filter(
            Filters=[{'Name': 'group-name', 'Values': [security_group_name]}])
        for security_group in groups:
            return security_group.id
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting Security Group ID by name: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting Security Group ID by name",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        return ''
    return ''
def get_instance_attr(instance_id, attribute_name):
    """Read one attribute of a running EC2 instance.

    Args:
        instance_id: EC2 instance ID used for the ``instance-id`` filter.
        attribute_name: Name of the attribute to read from the instance
            resource object via ``getattr``.

    Returns:
        The attribute value of the first matching running instance, or ``''``
        when no running instance matches. ``None`` when the lookup fails; the
        failure is logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        instances = ec2.instances.filter(
            Filters=[{'Name': 'instance-id', 'Values': [instance_id]},
                     {'Name': 'instance-state-name', 'Values': ['running']}])
        for instance in instances:
            return getattr(instance, attribute_name)
        return ''
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting instance attribute: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting instance attribute",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_instance_by_name(tag_name, instance_name):
    """Return the ID of the first non-terminated instance found by tag.

    Instances in the ``running``, ``pending``, ``stopping`` and ``stopped``
    states are considered.

    Args:
        tag_name: Tag key used to build the ``tag:<key>`` filter.
        instance_name: Tag value the instance must carry.

    Returns:
        The instance ID, or ``''`` when nothing matches. ``None`` when the
        lookup fails; the failure is logged, passed to ``append_result`` and
        printed to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        instances = ec2.instances.filter(
            Filters=[{'Name': 'tag:{}'.format(tag_name), 'Values': [instance_name]},
                     {'Name': 'instance-state-name',
                      'Values': ['running', 'pending', 'stopping', 'stopped']}])
        for instance in instances:
            return instance.id
        return ''
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting instance ID by name: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting instance ID by name",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_role_by_name(role_name):
    """Check whether an IAM role with the given name is listed.

    Args:
        role_name: Exact role name to look for.

    Returns:
        ``role_name`` when a role with that name appears in
        ``iam.roles.all()``, otherwise ``''``. ``None`` when the listing
        fails; the failure is logged, passed to ``append_result`` and printed
        to stdout.
    """
    try:
        iam = boto3.resource('iam')
        for role in iam.roles.all():
            if role.name == role_name:
                return role.name
        return ''
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting role by name: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting role by name",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_subnet_by_cidr(cidr, vpc_id=''):
    """Return the ID of the first subnet with the given CIDR block.

    Args:
        cidr: Value for the ``cidrBlock`` filter.
        vpc_id: Optional VPC ID; when non-empty the search is additionally
            restricted with a ``vpc-id`` filter.

    Returns:
        The subnet ID, or ``''`` when nothing matches. ``None`` when the
        lookup fails; the failure is logged, passed to ``append_result`` and
        printed to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        filters = [{'Name': 'cidrBlock', 'Values': [cidr]}]
        if vpc_id:
            filters.append({'Name': 'vpc-id', 'Values': [vpc_id]})
        for subnet in ec2.subnets.filter(Filters=filters):
            return subnet.id
        return ''
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting Subnet ID by CIDR: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting Subnet ID by CIDR",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_subnet_by_tag(tag, subnet_id=False, vpc_id=''):
    """Return the CIDR block (or ID) of the first subnet carrying a tag.

    Args:
        tag: Mapping with ``Key`` and ``Value`` entries used for the
            ``tag-key`` and ``tag-value`` filters.
        subnet_id: When truthy the subnet ID is returned instead of the CIDR
            block.
        vpc_id: Optional VPC ID; when non-empty the search is additionally
            restricted with a ``vpc-id`` filter.

    Returns:
        ``subnet.id`` or ``subnet.cidr_block`` of the first match, or ``''``
        when nothing matches. ``None`` when the lookup fails; the failure is
        logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        filters = [
            {'Name': 'tag-key', 'Values': [tag.get('Key')]},
            {'Name': 'tag-value', 'Values': [tag.get('Value')]}
        ]
        if vpc_id:
            filters.append({'Name': 'vpc-id', 'Values': [vpc_id]})
        for subnet in ec2.subnets.filter(Filters=filters):
            if subnet_id:
                return subnet.id
            return subnet.cidr_block
        return ''
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting Subnet CIDR block by tag: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting Subnet CIDR block by tag",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_vpc_by_cidr(cidr):
    """Return the ID of the first VPC with the given CIDR.

    Args:
        cidr: Value for the ``cidr`` filter.

    Returns:
        The VPC ID, or ``''`` when nothing matches. ``None`` when the lookup
        fails; the failure is logged, passed to ``append_result`` and printed
        to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        for vpc in ec2.vpcs.filter(Filters=[{'Name': 'cidr', 'Values': [cidr]}]):
            return vpc.id
        return ''
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting VPC ID by CIDR: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting VPC ID by CIDR",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_cidr_by_vpc(vpc_id):
    """Return the ``CidrBlock`` of a VPC.

    Args:
        vpc_id: VPC identifier passed to ``describe_vpcs``.

    Returns:
        The ``CidrBlock`` value of the first VPC in the response. ``None``
        when the call fails or the response holds no VPC; the failure is
        logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        client = boto3.client('ec2')
        vpcs = client.describe_vpcs(VpcIds=[vpc_id]).get('Vpcs')
        return vpcs[0].get('CidrBlock')
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting VPC CidrBlock by id: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting VPC CidrBlock by id",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_vpc_by_tag(tag_name, tag_value):
    """Return the ID of the first VPC matching a tag key and tag value.

    Args:
        tag_name: Value for the ``tag-key`` filter.
        tag_value: Value for the ``tag-value`` filter.

    Returns:
        The VPC ID, or ``''`` when nothing matches. ``None`` when the lookup
        fails; the failure is logged, passed to ``append_result`` and printed
        to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        filters = [{'Name': 'tag-key', 'Values': [tag_name]},
                   {'Name': 'tag-value', 'Values': [tag_value]}]
        for vpc in ec2.vpcs.filter(Filters=filters):
            return vpc.id
        return ''
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting VPC ID by tag: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting VPC ID by tag",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_peering_by_tag(tag_name, tag_value):
    """Return the ID of the first active VPC peering connection with a tag.

    Args:
        tag_name: Value for the ``tag-key`` filter.
        tag_value: Value for the ``tag-value`` filter.

    Returns:
        The ``VpcPeeringConnectionId`` of the first connection in the
        response. ``None`` when the call fails or no connection matches (the
        empty-list index error is handled like any other failure); the
        failure is logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        client = boto3.client('ec2')
        connections = client.describe_vpc_peering_connections(
            Filters=[{'Name': 'tag-key', 'Values': [tag_name]},
                     {'Name': 'tag-value', 'Values': [tag_value]},
                     {'Name': 'status-code', 'Values': ['active']}]
        ).get('VpcPeeringConnections')
        return connections[0].get('VpcPeeringConnectionId')
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting peering connection ID by tag: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting peering connection ID by tag",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_vpc_cidr_by_id(vpc_id):
    """Return every CIDR block associated with a VPC.

    Args:
        vpc_id: VPC identifier passed to ``describe_vpcs``.

    Returns:
        A list with the ``CidrBlock`` of each entry in the first VPC's
        ``CidrBlockAssociationSet``, or ``''`` when the response holds no
        VPC. ``None`` when the call fails; the failure is logged, passed to
        ``append_result`` and printed to stdout.
    """
    try:
        ec2 = boto3.client('ec2')
        for vpc in ec2.describe_vpcs(VpcIds=[vpc_id]).get('Vpcs'):
            # Only the first VPC in the response is inspected.
            return [cidr_set.get('CidrBlock')
                    for cidr_set in vpc.get('CidrBlockAssociationSet')]
        return ''
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting VPC CIDR by ID: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting VPC CIDR by ID",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_emr_info(id, key=''):
    """Return the description of an EMR cluster, or one field of it.

    Args:
        id: Cluster ID passed to ``describe_cluster``.
        key: Optional name of a single field of the ``Cluster`` dictionary.

    Returns:
        ``Cluster[key]`` when ``key`` is given and present; otherwise the
        whole ``Cluster`` dictionary (a notice is printed when ``key`` is
        given but missing). ``None`` when the call fails; the failure is
        logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        emr = boto3.client('emr')
        info = emr.describe_cluster(ClusterId=id)['Cluster']
        if not key:
            return info
        try:
            return info[key]
        except Exception:
            # Was a bare ``except:``, which also swallowed KeyboardInterrupt
            # and SystemExit.
            print("Cluster has no {} attribute".format(key))
            return info
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting EMR information: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting EMR information",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_emr_list(tag_name, type='Key', emr_count=False, emr_active=False):
    """List the IDs of EMR clusters that have a matching tag.

    The cluster states that are queried depend on the flags:

    * ``emr_active``: RUNNING, STARTING, BOOTSTRAPPING, TERMINATING
      (takes precedence over ``emr_count``);
    * ``emr_count``: RUNNING, WAITING, STARTING, BOOTSTRAPPING, TERMINATING;
    * neither: RUNNING, WAITING, STARTING, BOOTSTRAPPING.

    Args:
        tag_name: Text looked for (``in`` test) inside the chosen tag field.
        type: Tag field to inspect, ``'Key'`` or ``'Value'``.
        emr_count: Selects the state list described above.
        emr_active: Selects the state list described above.

    Returns:
        A list of cluster IDs; an ID appears once per matching tag. ``None``
        when a call fails; the failure is logged, passed to ``append_result``
        and printed to stdout.
    """
    try:
        emr = boto3.client('emr')
        # Pick the states first so list_clusters runs once; previously the
        # emr_active case issued a second call and discarded the first result.
        if emr_active:
            states = ['RUNNING', 'STARTING', 'BOOTSTRAPPING', 'TERMINATING']
        elif emr_count:
            states = ['RUNNING', 'WAITING', 'STARTING', 'BOOTSTRAPPING', 'TERMINATING']
        else:
            states = ['RUNNING', 'WAITING', 'STARTING', 'BOOTSTRAPPING']
        # NOTE(review): only the first response page of list_clusters is read.
        clusters = emr.list_clusters(ClusterStates=states).get('Clusters')
        clusters_list = []
        for cluster in clusters:
            response = emr.describe_cluster(ClusterId=cluster.get('Id'))
            # Pause between describe_cluster calls (presumably to stay under
            # the API rate limit).
            time.sleep(5)
            for tag in response.get('Cluster').get('Tags'):
                if tag_name in tag.get(type):
                    clusters_list.append(cluster.get('Id'))
        return clusters_list
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting EMR list: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting EMR list",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_not_configured_emr_list(tag_name, instance_name):
    """List WAITING EMR clusters whose tags mention a key and a value.

    For each cluster every tag is inspected: a hit is counted when
    ``tag_name`` occurs in the tag's ``Key`` and another when
    ``instance_name`` occurs in the tag's ``Value``. A cluster qualifies with
    two or more hits; the hits need not come from the same tag.

    Args:
        tag_name: Text looked for in tag keys.
        instance_name: Text looked for in tag values.

    Returns:
        A list of qualifying cluster IDs. ``None`` when a call fails; the
        failure is logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        emr = boto3.client('emr')
        clusters = emr.list_clusters(ClusterStates=['WAITING']).get('Clusters')
        clusters_list = []
        for cluster in clusters:
            tags_found = 0
            response = emr.describe_cluster(ClusterId=cluster.get('Id'))
            # Pause between describe_cluster calls (presumably rate limiting).
            time.sleep(5)
            for tag in response.get('Cluster').get('Tags'):
                if tag_name in tag.get('Key'):
                    tags_found += 1
                if instance_name in tag.get('Value'):
                    tags_found += 1
            if tags_found >= 2:
                clusters_list.append(cluster.get('Id'))
        return clusters_list
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting not configured EMR list: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting not configured EMR list",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_not_configured_emr(tag_name, instance_name, return_name=False):
    """Tell whether a candidate EMR cluster is tagged ``not-configured``.

    Candidates come from ``get_not_configured_emr_list``; the first one that
    has any tag whose value equals ``'not-configured'`` decides the result.

    Args:
        tag_name: Forwarded to ``get_not_configured_emr_list``.
        instance_name: Forwarded to ``get_not_configured_emr_list``.
        return_name: When truthy, return the cluster name instead of ``True``.

    Returns:
        ``True`` (or the cluster ``Name`` when ``return_name`` is set) if such
        a cluster exists, otherwise ``False``. ``None`` when a call fails; the
        failure is logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        emr = boto3.client('emr')
        clusters_list = get_not_configured_emr_list(tag_name, instance_name)
        if not clusters_list:
            return False
        for cluster_id in clusters_list:
            response = emr.describe_cluster(ClusterId=cluster_id)
            # Pause between describe_cluster calls (presumably rate limiting).
            time.sleep(5)
            for tag in response.get('Cluster').get('Tags'):
                if tag.get('Value') == 'not-configured':
                    if return_name:
                        return response.get('Cluster').get('Name')
                    return True
        return False
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting not configured EMR list: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting not configured EMR list",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_emr_id_by_name(name):
    """Return the ID of an EMR cluster with the given name.

    Clusters in the RUNNING, WAITING, STARTING and BOOTSTRAPPING states are
    examined; when several share the name, the last one listed wins.

    Args:
        name: Exact cluster name.

    Returns:
        The cluster ID. ``None`` when no cluster has that name or a call
        fails; the failure is logged, passed to ``append_result`` and printed
        to stdout.
    """
    try:
        cluster_id = ''
        emr = boto3.client('emr')
        clusters = emr.list_clusters(
            ClusterStates=['RUNNING', 'WAITING', 'STARTING', 'BOOTSTRAPPING']
        ).get('Clusters')
        for cluster in clusters:
            response = emr.describe_cluster(ClusterId=cluster.get('Id'))
            # Pause between describe_cluster calls (presumably rate limiting).
            time.sleep(5)
            if response.get('Cluster').get('Name') == name:
                cluster_id = cluster.get('Id')
        if cluster_id == '':
            raise Exception("Unable to find EMR cluster by name: " + name)
        return cluster_id
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting EMR ID by name: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting EMR ID by name",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_emr_instances_list(cluster_id, instance_type=''):
    """Return the instances of an EMR cluster.

    Args:
        cluster_id: Cluster ID passed to ``list_instances``.
        instance_type: Optional instance group type (``'MASTER'`` or
            ``'CORE'``); when empty, no group-type restriction is sent.

    Returns:
        The ``Instances`` list from the response. ``None`` when the call
        fails; the failure is logged, passed to ``append_result`` and printed
        to stdout.
    """
    try:
        emr = boto3.client('emr')
        request = {'ClusterId': cluster_id}
        if instance_type != '':
            request['InstanceGroupTypes'] = [instance_type]
        return emr.list_instances(**request).get('Instances')
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting EMR instances list: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting EMR instances list",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_ec2_list(tag_name, value=''):
    """Return running or stopped EC2 instances selected by a tag pattern.

    Args:
        tag_name: Tag key used to build the ``tag:<key>`` filter.
        value: Optional tag value prefix; the filter value is ``<value>*``.
            When empty, the pattern ``*nb*`` is used instead.

    Returns:
        The boto3 instance collection produced by ``ec2.instances.filter``.
        ``None`` when building it fails; the failure is logged, passed to
        ``append_result`` and printed to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        pattern = '{}*'.format(value) if value else '*nb*'
        return ec2.instances.filter(
            Filters=[{'Name': 'instance-state-name', 'Values': ['running', 'stopped']},
                     {'Name': 'tag:{}'.format(tag_name), 'Values': [pattern]}])
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting EC2 list: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting EC2 list",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def provide_index(resource_type, tag_name, tag_value=''):
    """Return the smallest positive index not used by existing resources.

    The index of a resource is the integer after the last ``-`` in its name
    (the cluster ``Name`` for EMR, the ``Name`` tag for EC2).

    Args:
        resource_type: ``'EMR'`` or ``'EC2'``; any other value prints a
            notice and yields index ``1``.
        tag_name: Tag key used to select resources.
        tag_value: Optional tag value; for EMR it replaces ``tag_name`` as the
            search text, for EC2 it is passed on as the value prefix.

    Returns:
        The first integer >= 1 that no selected resource uses. ``None`` when
        a lookup or the integer conversion fails; the failure is logged,
        passed to ``append_result`` and printed to stdout.
    """
    try:
        # A set of ints; the former ``number not in ids`` test compared a str
        # with ints and therefore never detected duplicates.
        used = set()
        if resource_type == 'EMR':
            if tag_value:
                cluster_ids = get_emr_list(tag_value, 'Value', True)
            else:
                cluster_ids = get_emr_list(tag_name, 'Key', True)
            emr = boto3.client('emr')
            for cluster_id in cluster_ids:
                response = emr.describe_cluster(ClusterId=cluster_id)
                # Pause between describe_cluster calls (presumably rate limiting).
                time.sleep(5)
                used.add(int(response.get('Cluster').get('Name').split('-')[-1]))
        elif resource_type == 'EC2':
            # An empty tag_value equals get_ec2_list's own default.
            for instance in get_ec2_list(tag_name, tag_value):
                for tag in instance.tags:
                    if tag['Key'] == 'Name':
                        used.add(int(tag['Value'].split('-')[-1]))
        else:
            print("Incorrect resource type!")
        index = 1
        while index in used:
            index += 1
        return index
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with providing index: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with providing index",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_route_table_by_tag(tag_name, tag_value):
    """Return the ID of the first route table carrying a tag.

    Args:
        tag_name: Tag key used to build the ``tag:<key>`` filter.
        tag_value: Tag value; it is formatted to a string for the filter.

    Returns:
        The ``RouteTableId`` of the first table in the response. ``None``
        when the call fails or nothing matches (the empty-list index error is
        handled like any other failure); the failure is logged, passed to
        ``append_result`` and printed to stdout.
    """
    try:
        client = boto3.client('ec2')
        tag_filter = {'Name': 'tag:{}'.format(tag_name), 'Values': ['{}'.format(tag_value)]}
        route_tables = client.describe_route_tables(Filters=[tag_filter])
        return route_tables.get('RouteTables')[0].get('RouteTableId')
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting Route table by tag: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting Route table by tag",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
@backoff.on_predicate(backoff.fibo, max_tries=4)
def get_ami_id(ami_name):
    """Return the ID of an available x86_64 HVM EBS-backed AMI by name.

    Besides the name, the image must be ``available``, ``hvm``, ``ebs``
    rooted at ``/dev/sda1`` and ``x86_64``. When several images match, the
    last one in the response is used.

    The ``backoff.on_predicate`` decorator re-invokes this function (up to
    four tries, Fibonacci wait) - presumably while the result is falsy, which
    is that decorator's default predicate; confirm against the backoff
    version in use.

    Args:
        ami_name: Value for the ``name`` image filter.

    Returns:
        The image ID. ``None`` when nothing matches or the call fails; the
        failure is logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        client = boto3.client('ec2')
        images = client.describe_images(
            Filters=[
                {'Name': 'name', 'Values': [ami_name]},
                {'Name': 'virtualization-type', 'Values': ['hvm']},
                {'Name': 'state', 'Values': ['available']},
                {'Name': 'root-device-name', 'Values': ['/dev/sda1']},
                {'Name': 'root-device-type', 'Values': ['ebs']},
                {'Name': 'architecture', 'Values': ['x86_64']}
            ]).get('Images')
        image_id = ''
        for image in images:
            image_id = image.get('ImageId')
        if image_id == '':
            raise Exception("Unable to find image id with name: " + ami_name)
        return image_id
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Failed to find AMI: " + ami_name + " : " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to find AMI",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_iam_profile(profile_name, count=0):
    """Wait until an IAM instance profile can be fetched and return its name.

    The profile is requested repeatedly, sleeping five seconds after every
    failed attempt, until the attempt counter reaches ten.

    Args:
        profile_name: Name passed to ``get_instance_profile``.
        count: Number of attempts already used; callers normally leave the
            default of ``0``.

    Returns:
        The ``InstanceProfileName`` from the response once a request
        succeeds, or ``False`` when the attempts are exhausted.
    """
    client = boto3.client('iam')
    # A loop replaces the former recursion, whose result was discarded so the
    # caller received '' even when a retry had succeeded.
    while count < 10:
        try:
            response = client.get_instance_profile(InstanceProfileName=profile_name)
            iam_profile = response.get('InstanceProfile').get('InstanceProfileName')
        except Exception:
            # Was a bare ``except:``; keep retrying on ordinary errors only.
            count += 1
            print('IAM profile is not available yet. Waiting...')
            time.sleep(5)
            continue
        time.sleep(5)
        print('IAM profile checked. Creating instance...')
        print(iam_profile)
        return iam_profile
    print("Unable to find IAM profile by name: {}".format(profile_name))
    return False
def check_security_group(security_group_name, count=0):
    """Return the ID of a security group, waiting while its ID is empty.

    Args:
        security_group_name: Value for the ``group-name`` filter.
        count: Number of attempts already used (limit 20); callers normally
            leave the default of ``0``.

    Returns:
        The ID of the first matching group. ``None`` when no group matches,
        when ``count`` is already at the limit, or when the check fails; a
        failure is logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        ec2 = boto3.resource('ec2')
        waited = False
        # Bounded loop: the former ``while security_group.id == ''`` re-tested
        # an unchanging object and discarded the recursive result, so it could
        # never finish once entered.
        while count < 20:
            groups = ec2.security_groups.filter(
                Filters=[{'Name': 'group-name', 'Values': [security_group_name]}])
            security_group = next(iter(groups), None)
            if security_group is None:
                # No match: same immediate None as before.
                return None
            if security_group.id != '':
                return security_group.id
            count = count + 1
            waited = True
            time.sleep(10)
            print("Security group is not available yet. Waiting...")
        if waited:
            raise Exception("Unable to check Security group by name: " + security_group_name)
        return None
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with checking Security group by name: " + security_group_name + " : " +
                      str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with checking Security group by name",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def emr_waiter(tag_name):
    """Block until no EMR cluster is being created or terminated.

    Waiting continues, in 60-second steps, while ``get_emr_list`` reports an
    active cluster whose tag value contains ``tag_name`` or while the marker
    file ``/response/.emr_creating_<exploratory_name>`` exists (the name is
    read from the ``exploratory_name`` environment variable).

    Args:
        tag_name: Text looked for in the tag values of active clusters.

    Returns:
        ``True`` once neither condition holds. Previously ``None`` was
        returned whenever at least one wait had happened, because the
        recursive call's result was dropped.
    """
    # Local import: no explicit ``import os`` is visible at module level (it
    # may only arrive through ``from dlab.fab import *``).
    import os

    # NOTE(review): the original condition was
    #   os.path.exists(<marker path> or get_not_configured_emr(tag_name))
    # The non-empty path always wins the ``or``, so the not-configured check
    # never ran. It is not enabled here because get_not_configured_emr also
    # requires an instance_name that this function does not receive.
    # A loop replaces the former unbounded recursion.
    while (len(get_emr_list(tag_name, 'Value', False, True)) > 0 or
           os.path.exists('/response/.emr_creating_' + os.environ['exploratory_name'])):
        # hide/local are not defined in the visible code (presumably Fabric
        # helpers re-exported by dlab.fab).
        with hide('stderr', 'running', 'warnings'):
            local("echo 'Some EMR cluster is still being created/terminated, waiting..'")
            time.sleep(60)
    return True
def get_spark_version(cluster_name):
    """Return the Spark version of a WAITING EMR cluster with the given name.

    Every WAITING cluster is described (with a five-second pause after each
    call); for clusters named ``cluster_name`` the version of the ``Spark``
    application is recorded. ``''`` is returned when none is found.
    """
    emr_client = boto3.client('emr')
    version = ''
    for summary in emr_client.list_clusters(ClusterStates=['WAITING']).get('Clusters'):
        cluster = emr_client.describe_cluster(ClusterId=summary.get('Id')).get('Cluster')
        time.sleep(5)
        if cluster.get("Name") != cluster_name:
            continue
        for application in cluster.get("Applications"):
            if application.get("Name") == 'Spark':
                version = application.get("Version")
    return version
def get_hadoop_version(cluster_name):
    """Return the first three characters of a cluster's Hadoop version.

    Every WAITING cluster is described (with a five-second pause after each
    call); for clusters named ``cluster_name`` the version of the ``Hadoop``
    application is recorded. ``''`` is returned when none is found.
    """
    emr_client = boto3.client('emr')
    version = ''
    for summary in emr_client.list_clusters(ClusterStates=['WAITING']).get('Clusters'):
        cluster = emr_client.describe_cluster(ClusterId=summary.get('Id')).get('Cluster')
        time.sleep(5)
        if cluster.get("Name") != cluster_name:
            continue
        for application in cluster.get("Applications"):
            if application.get("Name") == 'Hadoop':
                version = application.get("Version")
    return version[:3]
def get_instance_status(tag_name, instance_name):
    """Return the state name of an EC2 instance selected by tag.

    With a single reservation in the response, the state of its first
    instance is returned whatever it is. With several reservations, only a
    ``running`` instance produces a result. ``'not-running'`` is returned in
    every other case.
    """
    ec2_client = boto3.client('ec2')
    tag_filter = {'Name': 'tag:{}'.format(tag_name), 'Values': [instance_name]}
    reservations = ec2_client.describe_instances(Filters=[tag_filter]).get('Reservations')
    several = len(reservations) > 1
    for reservation in reservations:
        for instance in reservation.get('Instances'):
            state = instance.get('State').get('Name')
            if not several or state == 'running':
                return state
    return 'not-running'
def get_list_instance_statuses(instance_ids):
    """Return the current state of each listed EC2 instance.

    Args:
        instance_ids: Iterable of mappings that each hold an ``'id'`` entry.

    Returns:
        A list of ``{'id': ..., 'status': ...}`` dictionaries. An instance
        whose description cannot be fetched is reported with status
        ``'terminated'``.
    """
    data = []
    client = boto3.client('ec2')
    for requested in instance_ids:
        try:
            reservations = client.describe_instances(
                InstanceIds=[requested.get('id')]).get('Reservations')
            # One fresh dict per instance; a single shared dict used to be
            # appended repeatedly.
            found = [{'id': instance.get('InstanceId'),
                      'status': instance.get('State').get('Name')}
                     for reservation in reservations
                     for instance in reservation.get('Instances')]
        except Exception:
            # Was a bare ``except:``; any ordinary failure is still treated as
            # "instance no longer exists".
            found = [{'id': requested.get('id'), 'status': 'terminated'}]
        data.extend(found)
    return data
def get_list_cluster_statuses(cluster_ids, data=None):
    """Return the current status of each listed EMR cluster.

    EMR states are lower-cased and translated: ``waiting`` becomes
    ``'running'`` and ``running`` becomes ``'configuring'``; other states are
    passed through. A cluster that cannot be described is reported as
    ``'terminated'``.

    Args:
        cluster_ids: Iterable of mappings that each hold an ``'id'`` entry.
        data: Optional list the results are appended to. A new list is
            created per call when omitted (the former ``data=[]`` default was
            shared between calls and kept growing).

    Returns:
        ``data`` with one ``{'id': ..., 'status': ...}`` entry appended per
        cluster.
    """
    if data is None:
        data = []
    translated = {'waiting': 'running', 'running': 'configuring'}
    client = boto3.client('emr')
    for requested in cluster_ids:
        try:
            cluster = client.describe_cluster(ClusterId=requested.get('id')).get('Cluster')
            state = cluster.get('Status').get('State').lower()
            status = translated.get(state, state)
        except Exception:
            # Was a bare ``except:``; any ordinary failure is still treated as
            # "cluster no longer exists".
            status = 'terminated'
        data.append({'id': requested.get('id'), 'status': status})
    return data
def get_allocation_id_by_elastic_ip(elastic_ip):
    """Return the allocation ID that belongs to an Elastic IP address.

    Args:
        elastic_ip: Public IP passed to ``describe_addresses``.

    Returns:
        The ``AllocationId`` of the first address in the response. ``None``
        when the response is empty or the call fails; a failure is logged,
        passed to ``append_result`` and printed to stdout.
    """
    try:
        client = boto3.client('ec2')
        addresses = client.describe_addresses(PublicIps=[elastic_ip]).get('Addresses')
        for address in addresses:
            return address.get('AllocationId')
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting allocation id by elastic ip: " + elastic_ip + " : " +
                      str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting allocation id by elastic ip",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_ec2_price(instance_shape, region):
    """Look up the on-demand USD price of a shared-tenancy Linux instance.

    Args:
        instance_shape: EC2 instance type, e.g. as used for ``instanceType``.
        region: Region code (such as ``eu-west-1``); it is translated to the
            long region description that the Pricing API expects.

    Returns:
        The ``USD`` price string of the matching offer, or the fallback
        ``'0.001'`` when the product has no matching on-demand offer.
        ``None`` when the lookup fails; the failure is logged, passed to
        ``append_result`` and printed to stdout.
    """
    try:
        price = '0.001'
        # Price API endpoints: us-east-1, ap-south-1
        client = boto3.client('pricing', 'us-east-1')
        # Price API require full name of region, for example: eu-west-1 -> 'EU (Ireland)'
        # endpoints will be loaded from: botocore/botocore/data/endpoints.json
        # NOTE(review): this reads botocore's private loader cache.
        endpoints = client._loader._cache.get(('load_data', 'endpoints'))
        # next() over a generator works on Python 2 and 3; ``filter(...)[0]``
        # only worked where filter() returns a list.
        standard_partition = next(partition for partition in endpoints['partitions']
                                  if 'AWS Standard' == partition['partitionName'])
        region_description = standard_partition['regions'][region]['description']
        term_matches = [
            ('instanceType', '{0}'.format(instance_shape)),
            ('operatingSystem', 'Linux'),
            ('tenancy', 'Shared'),
            ('location', '{0}'.format(region_description)),
            ('preInstalledSw', 'NA'),
            ('capacityStatus', 'UnusedCapacityReservation'),
        ]
        response = client.get_products(
            ServiceCode='AmazonEC2',
            Filters=[{'Type': 'TERM_MATCH', 'Field': field, 'Value': value}
                     for field, value in term_matches],
            FormatVersion='aws_v1',
            NextToken='',
            MaxResults=1
        )
        product = json.loads(response['PriceList'][0])
        ondemand = product['terms']['OnDemand']
        sku = product['product']['sku']
        for offer in ondemand:
            if sku in offer:
                # The last price dimension iterated wins.
                for dimension in ondemand[offer]['priceDimensions'].values():
                    price = dimension['pricePerUnit']['USD']
        return price
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting EC2 price: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting EC2 price",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_spot_instances_status(cluster_id):
    """Report whether all spot requests of an EMR cluster are fulfilled.

    Args:
        cluster_id: Cluster ID passed to ``list_instances``.

    Returns:
        A ``(ok, code, message)`` tuple:

        * ``(False, <code>, <message>)`` for the first request whose status
          code is not ``fulfilled``;
        * ``(True, <code of the last request>, <success text>)`` when every
          request is fulfilled;
        * ``(False, None, <text>)`` when no spot requests were returned.

        ``None`` when a call fails; the failure is logged, passed to
        ``append_result`` and printed to stdout.
    """
    try:
        ec2 = boto3.client('ec2')
        emr = boto3.client('emr')
        instances = emr.list_instances(ClusterId=cluster_id).get('Instances')
        ids_list = [instance.get('Ec2InstanceId') for instance in instances]
        requests = ec2.describe_spot_instance_requests(
            Filters=[{'Name': 'instance-id', 'Values': ids_list}]).get('SpotInstanceRequests')
        if requests:
            for request in requests:
                status = request.get('Status')
                if status.get('Code') != 'fulfilled':
                    return False, status.get('Code'), status.get('Message')
            return True, request.get('Status').get('Code'), "Spot instances have been successfully created!"
        return False, None, "Spot instances status weren't received for cluster id {}".format(cluster_id)
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with getting Spot instances status: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with getting Spot instances status",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def node_count(cluster_name):
    """Count running EC2 reservations whose ``Name`` tag starts with a prefix.

    Args:
        cluster_name: Name prefix; the filter value is ``<cluster_name>-*``.

    Returns:
        The number of entries in the response's ``Reservations`` list.
        ``None`` when the call fails; the failure is logged, passed to
        ``append_result`` and printed to stdout.
    """
    try:
        ec2 = boto3.client('ec2')
        reservations = ec2.describe_instances(Filters=[
            {'Name': 'instance-state-name', 'Values': ['running']},
            {'Name': 'tag:Name', 'Values': [cluster_name + '-*']}]).get('Reservations')
        # NOTE(review): this counts reservations, not instances; confirm each
        # node is launched in its own reservation.
        return len(reservations)
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with counting nodes in cluster: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with counting nodes in cluster",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_list_private_ip_by_conf_type_and_id(conf_type, instance_id):
    """Collect private IP addresses for a resource of the given kind.

    Args:
        conf_type: One of ``'edge_node'`` (``instance_id`` is an EC2 instance
            ID), ``'exploratory'`` (``instance_id`` is the ``Name`` tag
            value) or ``'computational_resource'`` (``instance_id`` is
            combined with the ``conf_service_base_name`` environment variable
            into a tag value prefix, looked up under the tag named by
            ``conf_tag_resource_id``).
        instance_id: Identifier interpreted according to ``conf_type``.

    Returns:
        A list of private IPs; empty for an unrecognised ``conf_type``.
        ``None`` when a lookup fails; the failure is logged, passed to
        ``append_result`` and printed to stdout.
    """
    try:
        private_list_ip = []
        if conf_type == 'edge_node':
            private_list_ip.append(
                get_instance_ip_address_by_id(instance_id).get('Private'))
        elif conf_type == 'exploratory':
            private_list_ip.append(
                get_instance_ip_address('Name', instance_id).get('Private'))
        elif conf_type == 'computational_resource':
            # Local import: no explicit ``import os`` is visible at module
            # level (it may only arrive through ``from dlab.fab import *``).
            import os
            group_tag_name = os.environ['conf_service_base_name'] + ':' + instance_id
            print(group_tag_name)
            instance_list = get_ec2_list(os.environ['conf_tag_resource_id'], group_tag_name)
            for instance in instance_list:
                private_list_ip.append(
                    get_instance_ip_address_by_id(instance.id).get('Private'))
        return private_list_ip
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error getting private ip by conf_type and id: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error getting private ip by conf_type and id",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def format_sg(sg_rules):
    """Split security group rules so each carries at most one IP range.

    A rule with several ``IpRanges`` entries is expanded into one copy per
    range (all other keys are copied unchanged); expanded rules that are
    already present in the result are skipped. A rule whose ``IpRanges`` is
    empty is passed through as is.

    Args:
        sg_rules: Iterable of rule dictionaries, each with an ``IpRanges``
            key.

    Returns:
        A new list of rules; the input rules are not modified. ``None`` when
        formatting fails (for example when a rule lacks ``IpRanges``); the
        failure is logged, passed to ``append_result`` and printed to stdout.
    """
    try:
        formatted_sg_rules = []
        for rule in sg_rules:
            if not rule['IpRanges']:
                formatted_sg_rules.append(rule)
                continue
            for ip_range in rule['IpRanges']:
                # Shallow copy, then narrow the ranges to the current one.
                formatted_rule = dict(rule)
                formatted_rule['IpRanges'] = [ip_range]
                if formatted_rule not in formatted_sg_rules:
                    formatted_sg_rules.append(formatted_rule)
        return formatted_sg_rules
    except Exception as err:
        # format_exc() yields the traceback text; print_exc() returns None
        # and made the original string concatenation raise TypeError.
        trace = traceback.format_exc()
        logging.error("Error with formatting SG rules: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Error with formatting SG rules",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)