| # ***************************************************************************** |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # ****************************************************************************** |
| |
| import boto3 |
| import botocore |
| from botocore.client import Config |
| import backoff |
| from botocore.exceptions import ClientError |
| import time |
| import sys |
| import os |
| import json |
| from fabric.api import * |
| from fabric.contrib.files import exists |
| import logging |
| from dlab.meta_lib import * |
| from dlab.fab import * |
| import traceback |
| import urllib2 |
| import meta_lib |
| import dlab.fab |
| import uuid |
| import ast |
| |
| |
def backoff_log(err):
    """Giveup handler for the backoff-decorated tagging call: log and record the failure.

    :param err: the exception details dict passed by the backoff library on giveup.
    """
    # traceback.print_exc() returns None, so it must not be concatenated into
    # the message; format_exc() returns the traceback as a string instead.
    error_message = str(err) + "\n Traceback: " + traceback.format_exc()
    logging.info("Unable to create Tag: " + error_message)
    append_result(str({"error": "Unable to create Tag",
                       "error_message": error_message}))
    traceback.print_exc(file=sys.stdout)
| |
| |
def put_to_bucket(bucket_name, local_file, destination_file):
    """Upload a local file to an S3 bucket with AES256 server-side encryption.

    :param bucket_name: target S3 bucket name.
    :param local_file: path of the local file to upload.
    :param destination_file: object key to create in the bucket.
    :return: True on success, False on failure.
    """
    try:
        s3 = boto3.client('s3', config=Config(signature_version='s3v4'), region_name=os.environ['aws_region'])
        with open(local_file, 'rb') as data:
            s3.upload_fileobj(data, bucket_name, destination_file, ExtraArgs={'ServerSideEncryption': 'AES256'})
        return True
    except Exception as err:
        # format_exc() (not print_exc(), which returns None) so the traceback
        # can be embedded in the logged/recorded message.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to upload files to S3 bucket: " + error_message)
        append_result(str({"error": "Unable to upload files to S3 bucket",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
        return False
| |
| |
def create_s3_bucket(bucket_name, bucket_tags, region, bucket_name_tag):
    """Create an S3 bucket with default AES256 encryption and the given tags.

    :param bucket_name: name of the bucket to create.
    :param bucket_tags: comma-separated 'Key:Value' pairs to apply as tags.
    :param region: AWS region; 'us-east-1' must not send a LocationConstraint.
    :param bucket_name_tag: value appended to the service base name for the
        resource-id tag.
    :return: the created bucket's name, or None on failure.
    """
    try:
        s3 = boto3.resource('s3', config=Config(signature_version='s3v4'))
        if region == "us-east-1":
            # us-east-1 rejects an explicit LocationConstraint.
            bucket = s3.create_bucket(Bucket=bucket_name)
        else:
            bucket = s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': region})
        boto3.client('s3', config=Config(signature_version='s3v4')).put_bucket_encryption(
            Bucket=bucket_name, ServerSideEncryptionConfiguration={
                'Rules': [
                    {
                        'ApplyServerSideEncryptionByDefault': {
                            'SSEAlgorithm': 'AES256'
                        }
                    },
                ]
            })
        tags = [{'Key': os.environ['conf_tag_resource_id'],
                 'Value': os.environ['conf_service_base_name'] + ':' + bucket_name_tag}]
        for tag_pair in bucket_tags.split(','):
            # split(':', 1) keeps tag values that themselves contain colons intact.
            tag_key, tag_value = tag_pair.split(':', 1)
            tags.append({'Key': tag_key, 'Value': tag_value})
        tagging = bucket.Tagging()
        tagging.put(Tagging={'TagSet': tags})
        tagging.reload()
        return bucket.name
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # would raise a TypeError when concatenated.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to create S3 bucket: " + error_message)
        append_result(str({"error": "Unable to create S3 bucket",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_vpc(vpc_cidr, tag):
    """Create a VPC with the given CIDR block and tag it.

    :param vpc_cidr: CIDR block for the new VPC.
    :param tag: tag (dict or JSON string) passed through to create_tag().
    :return: the new VPC id, or None on failure.
    """
    try:
        ec2 = boto3.resource('ec2')
        vpc = ec2.create_vpc(CidrBlock=vpc_cidr)
        create_tag(vpc.id, tag)
        return vpc.id
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to create VPC: " + error_message)
        append_result(str({"error": "Unable to create VPC",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def enable_vpc_dns(vpc_id):
    """Enable DNS hostnames on an existing VPC.

    :param vpc_id: id of the VPC to modify.
    """
    try:
        client = boto3.client('ec2')
        client.modify_vpc_attribute(VpcId=vpc_id,
                                    EnableDnsHostnames={'Value': True})
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to modify VPC attributes: " + error_message)
        append_result(str({"error": "Unable to modify VPC attributes",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_vpc(vpc_id):
    """Delete a VPC by id.

    :param vpc_id: id of the VPC to delete.
    """
    try:
        client = boto3.client('ec2')
        client.delete_vpc(VpcId=vpc_id)
        print("VPC {} has been removed".format(vpc_id))
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to remove VPC: " + error_message)
        append_result(str({"error": "Unable to remove VPC",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
@backoff.on_exception(backoff.expo,
                      botocore.exceptions.ClientError,
                      max_tries=40,
                      on_giveup=backoff_log)
def create_tag(resource, tag, with_tag_res_id=True):
    """Apply the given tag plus the standard DLab service tags to EC2 resources.

    Retries on ClientError with exponential backoff (up to 40 tries), calling
    backoff_log on giveup.

    :param resource: a single resource id or a list of resource ids.
    :param tag: the primary tag, either a dict or a JSON-encoded string with
        'Key'/'Value' entries.
    :param with_tag_res_id: when True, also attach the conf_tag_resource_id tag
        built from the service base name and the primary tag's value.
    """
    print('Tags for the resource {} will be created'.format(resource))
    ec2 = boto3.client('ec2')
    # Parse the tag exactly once (the original parsed the JSON string twice).
    resource_tag = tag if isinstance(tag, dict) else json.loads(tag)
    resource_name = resource_tag.get('Value')
    if not isinstance(resource, list):
        resource = [resource]
    tags_list = [resource_tag]
    if with_tag_res_id:
        tags_list.append(
            {
                'Key': os.environ['conf_tag_resource_id'],
                'Value': os.environ['conf_service_base_name'] + ':' + resource_name
            }
        )
        tags_list.append(
            {
                'Key': os.environ['conf_billing_tag_key'],
                'Value': os.environ['conf_billing_tag_value']
            }
        )
    if 'conf_additional_tags' in os.environ:
        # Use a distinct loop variable so the 'tag' parameter is not shadowed,
        # and split(':', 1) so tag values containing colons survive intact.
        for extra_tag in os.environ['conf_additional_tags'].split(';'):
            tag_key, tag_value = extra_tag.split(':', 1)
            tags_list.append(
                {
                    'Key': tag_key,
                    'Value': tag_value
                }
            )
    ec2.create_tags(
        Resources=resource,
        Tags=tags_list
    )
| |
| |
def remove_emr_tag(emr_id, tag):
    """Remove tags from an EMR cluster.

    :param emr_id: the EMR cluster id.
    :param tag: list of tag keys to remove.
    """
    try:
        emr = boto3.client('emr')
        emr.remove_tags(ResourceId=emr_id, TagKeys=tag)
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to remove Tag: " + error_message)
        append_result(str({"error": "Unable to remove Tag",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_rt(vpc_id, infra_tag_name, infra_tag_value, secondary):
    """Create a route table for a VPC; for a primary VPC also create and attach
    an internet gateway with a default route.

    :param vpc_id: the VPC to create the route table in.
    :param infra_tag_name: tag key applied to the created resources.
    :param infra_tag_value: tag value applied to the created resources.
    :param secondary: when falsy, also create/attach an internet gateway and a
        0.0.0.0/0 route through it.
    :return: the new route table id, or None on failure.
    """
    try:
        tag = {"Key": infra_tag_name, "Value": infra_tag_value}
        ec2 = boto3.client('ec2')
        rt = ec2.create_route_table(VpcId=vpc_id)
        rt_id = rt.get('RouteTable').get('RouteTableId')
        print('Created Route-Table with ID: {}'.format(rt_id))
        create_tag([rt_id], json.dumps(tag))
        if not secondary:
            ig = ec2.create_internet_gateway()
            ig_id = ig.get('InternetGateway').get('InternetGatewayId')
            create_tag([ig_id], json.dumps(tag))
            ec2.attach_internet_gateway(InternetGatewayId=ig_id, VpcId=vpc_id)
            ec2.create_route(DestinationCidrBlock='0.0.0.0/0', RouteTableId=rt_id, GatewayId=ig_id)
        return rt_id
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to create Route Table: " + error_message)
        append_result(str({"error": "Unable to create Route Table",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_subnet(vpc_id, subnet, tag, zone):
    """Create a subnet in a VPC, optionally pinned to an availability zone.

    :param vpc_id: the VPC to create the subnet in.
    :param subnet: CIDR block for the new subnet.
    :param tag: tag (dict or JSON string) passed through to create_tag().
    :param zone: availability zone name, or '' to let AWS choose.
    :return: the new subnet id, or None on failure.
    """
    try:
        ec2 = boto3.resource('ec2')
        if zone != "":
            subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock=subnet, AvailabilityZone=zone)
        else:
            subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock=subnet)
        create_tag(subnet.id, tag)
        subnet.reload()
        return subnet.id
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to create Subnet: " + error_message)
        append_result(str({"error": "Unable to create Subnet",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_security_group(security_group_name, vpc_id, security_group_rules, egress, tag):
    """Create a security group with the given ingress and egress rules.

    The default allow-all egress rule is revoked (best-effort) before the
    explicit rules are applied.

    :param security_group_name: name for the new group.
    :param vpc_id: the VPC to create the group in.
    :param security_group_rules: iterable of IpPermissions dicts for ingress.
    :param egress: iterable of IpPermissions dicts for egress.
    :param tag: tag (dict or JSON string) passed through to create_tag().
    :return: the new group id, or None on failure.
    """
    try:
        ec2 = boto3.resource('ec2')
        tag_name = {"Key": "Name", "Value": security_group_name}
        group = ec2.create_security_group(GroupName=security_group_name, Description='security_group_name',
                                          VpcId=vpc_id)
        # Give AWS a moment to propagate the new group before tagging it.
        time.sleep(10)
        create_tag(group.id, tag)
        create_tag(group.id, tag_name)
        if 'conf_billing_tag_key' in os.environ and 'conf_billing_tag_value' in os.environ:
            create_tag(group.id, {'Key': os.environ['conf_billing_tag_key'],
                                  'Value': os.environ['conf_billing_tag_value']})
        try:
            group.revoke_egress(IpPermissions=[{"IpProtocol": "-1", "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
                                                "UserIdGroupPairs": [], "PrefixListIds": []}])
        except Exception:
            # Best-effort: the default egress rule may already be absent.
            print("Mentioned rule does not exist")
        for rule in security_group_rules:
            group.authorize_ingress(IpPermissions=[rule])
        for rule in egress:
            group.authorize_egress(IpPermissions=[rule])
        return group.id
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to create security group: " + error_message)
        append_result(str({"error": "Unable to create security group",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_route_by_id(subnet_id, vpc_id, peering_id, another_cidr):
    """Add a peering route for another_cidr to the subnet's route table.

    Falls back to the VPC's main route table when the subnet has no explicit
    association. The route is only created when no route for another_cidr
    already exists in the inspected tables.

    :param subnet_id: subnet whose route table should get the route.
    :param vpc_id: VPC id used to locate the main route table fallback.
    :param peering_id: the VPC peering connection to route through.
    :param another_cidr: destination CIDR block of the route.
    """
    client = boto3.client('ec2')
    try:
        table_id = client.describe_route_tables(Filters=[{'Name': 'association.subnet-id', 'Values': [subnet_id]}]).get(
            'RouteTables')
        if table_id:
            final_id = table_id[0]['Associations'][0]['RouteTableId']
        else:
            table_id = client.describe_route_tables(
                Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}, {'Name': 'association.main', 'Values': ['true']}]).get(
                'RouteTables')
            final_id = table_id[0]['Associations'][0]['RouteTableId']
        for table in table_id:
            routes = table.get('Routes')
            # any() replaces the manual flag loop; same semantics.
            route_exists = any(route.get('DestinationCidrBlock') == another_cidr for route in routes)
            if not route_exists:
                client.create_route(
                    DestinationCidrBlock=another_cidr,
                    VpcPeeringConnectionId=peering_id,
                    RouteTableId=final_id)
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to create route: " + error_message)
        append_result(str({"error": "Unable to create route",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_peer_routes(peering_id, service_base_name):
    """Create peering routes between the primary and secondary DLab VPCs.

    For every route table tagged for the primary VPC, a route to
    conf_vpc2_cidr is created through the peering connection (and vice versa
    for secondary tables), skipping tables that already have the route.

    :param peering_id: the VPC peering connection id to route through.
    :param service_base_name: DLab service base name used to find tagged
        route tables.
    """
    client = boto3.client('ec2')

    def _ensure_route(tables, destination_cidr):
        # Add the peering route to each table that does not already have one
        # for destination_cidr.
        for table in tables:
            routes = table.get('Routes')
            if not any(route.get('DestinationCidrBlock') == destination_cidr for route in routes):
                client.create_route(
                    DestinationCidrBlock=destination_cidr,
                    VpcPeeringConnectionId=peering_id,
                    RouteTableId=table.get('RouteTableId'))

    try:
        route_tables = client.describe_route_tables(
            Filters=[{'Name': 'tag:{}-tag'.format(service_base_name), 'Values': ['{}'.format(
                service_base_name)]}]).get('RouteTables')
        route_tables2 = client.describe_route_tables(Filters=[
            {'Name': 'tag:{}-secondary-tag'.format(service_base_name), 'Values': ['{}'.format(
                service_base_name)]}]).get('RouteTables')
        # Primary tables route to the secondary VPC's CIDR and vice versa.
        _ensure_route(route_tables, os.environ['conf_vpc2_cidr'].replace("'", ""))
        _ensure_route(route_tables2, os.environ['conf_vpc_cidr'].replace("'", ""))
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to create route: " + error_message)
        append_result(str({"error": "Unable to create route",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_peering_connection(vpc_id, vpc2_id, service_base_name):
    """Create, accept and tag a VPC peering connection between two VPCs,
    enabling DNS resolution across the peering in both directions.

    :param vpc_id: the peer (accepter) VPC id.
    :param vpc2_id: the requester VPC id.
    :param service_base_name: used to build the connection's tags.
    :return: the peering connection id, or None on failure.
    """
    try:
        ec2 = boto3.resource('ec2')
        client = boto3.client('ec2')
        tag = {"Key": service_base_name + '-tag', "Value": service_base_name}
        tag_name = {"Key": 'Name', "Value": "{0}-peering-connection".format(service_base_name)}
        peering = ec2.create_vpc_peering_connection(PeerVpcId=vpc_id, VpcId=vpc2_id)
        client.accept_vpc_peering_connection(VpcPeeringConnectionId=peering.id)
        client.modify_vpc_peering_connection_options(
            AccepterPeeringConnectionOptions={
                'AllowDnsResolutionFromRemoteVpc': True,
            },
            RequesterPeeringConnectionOptions={
                'AllowDnsResolutionFromRemoteVpc': True,
            },
            VpcPeeringConnectionId=peering.id
        )
        create_tag(peering.id, json.dumps(tag))
        create_tag(peering.id, json.dumps(tag_name))
        return peering.id
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to create peering connection: " + error_message)
        append_result(str({"error": "Unable to create peering connection",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def enable_auto_assign_ip(subnet_id):
    """Enable automatic public IP assignment on instances launched in a subnet.

    :param subnet_id: id of the subnet to modify.
    """
    try:
        client = boto3.client('ec2')
        client.modify_subnet_attribute(MapPublicIpOnLaunch={'Value': True}, SubnetId=subnet_id)
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        # NOTE(review): the error text says "create Subnet" but this modifies
        # an existing subnet — kept as-is since it is a runtime string.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to create Subnet: " + error_message)
        append_result(str({"error": "Unable to create Subnet",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_instance(definitions, instance_tag, primary_disk_size=12):
    """Launch a single EC2 instance according to its instance class.

    Classes: 'notebook' gets a primary plus a secondary EBS volume,
    'dataengine' and 'ssn' get only the primary volume ('ssn' additionally
    waits for the IAM profile), anything else launches with the AMI's default
    block device mapping. The instance and its volumes are tagged after it
    reaches the running state.

    :param definitions: object carrying ami_id, key_name, security_group_ids
        (comma-separated), instance_type, subnet_id, iam_profile,
        user_data_file, instance_class, instance_disk_size and node_name.
    :param instance_tag: tag dict applied to the instance and its volumes.
    :param primary_disk_size: size in GB of the root volume (default 12).
    :return: the instance id, '' if nothing was launched, or None on failure.
    """
    try:
        ec2 = boto3.resource('ec2')
        security_groups_ids = [chunk.strip() for chunk in definitions.security_group_ids.split(',')]
        user_data = ''
        if definitions.user_data_file != '':
            try:
                with open(definitions.user_data_file, 'r') as f:
                    user_data = f.read()
            except Exception:
                # Best-effort: launch proceeds with empty user data.
                print("Error reading user-data file")
        # Arguments shared by every launch variant.
        common_kwargs = dict(ImageId=definitions.ami_id, MinCount=1, MaxCount=1,
                             KeyName=definitions.key_name,
                             SecurityGroupIds=security_groups_ids,
                             InstanceType=definitions.instance_type,
                             SubnetId=definitions.subnet_id,
                             IamInstanceProfile={'Name': definitions.iam_profile},
                             UserData=user_data)
        primary_device = {
            "DeviceName": "/dev/sda1",
            "Ebs": {"VolumeSize": int(primary_disk_size)}
        }
        if definitions.instance_class == 'notebook':
            secondary_device = {
                "DeviceName": "/dev/sdb",
                "Ebs": {"VolumeSize": int(definitions.instance_disk_size)}
            }
            instances = ec2.create_instances(BlockDeviceMappings=[primary_device, secondary_device],
                                             **common_kwargs)
        elif definitions.instance_class == 'dataengine':
            instances = ec2.create_instances(BlockDeviceMappings=[primary_device],
                                             **common_kwargs)
        elif definitions.instance_class == 'ssn':
            get_iam_profile(definitions.iam_profile)
            instances = ec2.create_instances(BlockDeviceMappings=[primary_device],
                                             **common_kwargs)
        else:
            get_iam_profile(definitions.iam_profile)
            instances = ec2.create_instances(**common_kwargs)
        for instance in instances:
            print("Waiting for instance {} become running.".format(instance.id))
            instance.wait_until_running()
            tag = {'Key': 'Name', 'Value': definitions.node_name}
            create_tag(instance.id, tag)
            create_tag(instance.id, instance_tag)
            tag_intance_volume(instance.id, definitions.node_name, instance_tag)
            # Only one instance is requested (MinCount=MaxCount=1).
            return instance.id
        return ''
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to create EC2: " + error_message)
        append_result(str({"error": "Unable to create EC2",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def tag_intance_volume(instance_id, node_name, instance_tag):
    """Tag an instance's EBS volumes with Name and instance tags.

    The first volume gets a '-volume-primary' suffix, the second
    '-volume-secondary'; any further volumes fall back to '-volume-primary'
    again (matching the original counter logic).

    :param instance_id: the instance whose volumes are tagged.
    :param node_name: base for the volumes' Name tag values.
    :param instance_tag: tag dict whose Value is suffixed per volume; the
        caller's dict is not modified.
    """
    try:
        print('volume tagging')
        volume_list = meta_lib.get_instance_attr(instance_id, 'block_device_mappings')
        instance_tag_value = instance_tag.get('Value')
        for counter, volume in enumerate(volume_list):
            volume_postfix = '-volume-secondary' if counter == 1 else '-volume-primary'
            tag = {'Key': 'Name',
                   'Value': node_name + volume_postfix}
            # Copy the dict: the original aliased instance_tag and mutated the
            # caller's tag, corrupting it for subsequent uses.
            volume_tag = dict(instance_tag)
            volume_tag['Value'] = instance_tag_value + volume_postfix
            volume_id = volume.get('Ebs').get('VolumeId')
            create_tag(volume_id, tag)
            create_tag(volume_id, volume_tag)
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to tag volumes: " + error_message)
        append_result(str({"error": "Unable to tag volumes",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def tag_emr_volume(cluster_id, node_name, billing_tag):
    """Tag the EBS volumes of every instance in an EMR cluster.

    :param cluster_id: the EMR cluster id.
    :param node_name: tag value applied to each instance's volumes.
    :param billing_tag: kept for interface compatibility; not used here —
        TODO(review): confirm with callers whether it should be applied.
    """
    try:
        client = boto3.client('emr')
        cluster = client.list_instances(ClusterId=cluster_id)
        instances = cluster['Instances']
        for instance in instances:
            instance_tag = {'Key': os.environ['conf_service_base_name'] + '-tag',
                            'Value': node_name}
            tag_intance_volume(instance['Ec2InstanceId'], node_name, instance_tag)
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to tag emr volumes: " + error_message)
        append_result(str({"error": "Unable to tag emr volumes",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_iam_role(role_name, role_profile, region, service='ec2', tag=None, user_tag=None):
    """Create an IAM role (and, for EC2, an instance profile bound to it).

    Existing roles/profiles are reused. The assume-role policy principal uses
    the .amazonaws.com.cn suffix in the cn-north-1 region.

    :param role_name: name of the IAM role to create.
    :param role_profile: name of the instance profile (EC2 service only).
    :param region: AWS region; controls the China principal suffix.
    :param service: AWS service principal short name (default 'ec2').
    :param tag: optional extra tag dict for the role.
    :param user_tag: optional extra user tag dict for the role.
    """
    conn = boto3.client('iam')
    try:
        if region == 'cn-north-1':
            conn.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=
                '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":["' + service +
                '.amazonaws.com.cn"]},"Action":["sts:AssumeRole"]}]}')
        else:
            conn.create_role(
                RoleName=role_name, AssumeRolePolicyDocument=
                '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":["' + service +
                '.amazonaws.com"]},"Action":["sts:AssumeRole"]}]}')
        if tag:
            conn.tag_role(RoleName=role_name, Tags=[tag])
            conn.tag_role(RoleName=role_name, Tags=[{"Key": "Name", "Value": role_name}])
            if user_tag:
                conn.tag_role(RoleName=role_name, Tags=[user_tag])
            if 'conf_billing_tag_key' in os.environ and 'conf_billing_tag_value' in os.environ:
                conn.tag_role(RoleName=role_name, Tags=[{'Key': os.environ['conf_billing_tag_key'],
                                                         'Value': os.environ['conf_billing_tag_value']}])
            if 'project_name' in os.environ:
                conn.tag_role(RoleName=role_name, Tags=[{'Key': "project_tag",
                                                         'Value': os.environ['project_name']}])
            if 'endpoint_name' in os.environ:
                conn.tag_role(RoleName=role_name, Tags=[{'Key': "endpoint_tag",
                                                         'Value': os.environ['endpoint_name']}])
    except botocore.exceptions.ClientError as e_role:
        if e_role.response['Error']['Code'] == 'EntityAlreadyExists':
            print("IAM role already exists. Reusing...")
        else:
            # format_exc() embeds the traceback text; print_exc() returns None.
            error_message = str(e_role.response['Error']['Message']) + "\n Traceback: " + traceback.format_exc()
            logging.info("Unable to create IAM role: " + error_message)
            append_result(str({"error": "Unable to create IAM role",
                               "error_message": error_message}))
            traceback.print_exc(file=sys.stdout)
            return
    if service == 'ec2':
        try:
            conn.create_instance_profile(InstanceProfileName=role_profile)
            waiter = conn.get_waiter('instance_profile_exists')
            waiter.wait(InstanceProfileName=role_profile)
        except botocore.exceptions.ClientError as e_profile:
            if e_profile.response['Error']['Code'] == 'EntityAlreadyExists':
                print("Instance profile already exists. Reusing...")
            else:
                error_message = str(e_profile.response['Error']['Message']) + \
                    "\n Traceback: " + traceback.format_exc()
                logging.info("Unable to create Instance Profile: " + error_message)
                append_result(str({"error": "Unable to create Instance Profile",
                                   "error_message": error_message}))
                traceback.print_exc(file=sys.stdout)
                return
        try:
            conn.add_role_to_instance_profile(InstanceProfileName=role_profile, RoleName=role_name)
            # Give IAM time to propagate the role/profile association.
            time.sleep(30)
        except botocore.exceptions.ClientError as err:
            error_message = str(err.response['Error']['Message']) + "\n Traceback: " + traceback.format_exc()
            logging.info("Unable to add IAM role to instance profile: " + error_message)
            append_result(str({"error": "Unable to add IAM role to instance profile",
                               "error_message": error_message}))
            traceback.print_exc(file=sys.stdout)
| |
| |
def attach_policy(role_name, policy_arn):
    """Attach a managed IAM policy to a role.

    :param role_name: the role to attach the policy to.
    :param policy_arn: ARN of the managed policy.
    """
    try:
        conn = boto3.client('iam')
        conn.attach_role_policy(PolicyArn=policy_arn, RoleName=role_name)
        # Give IAM time to propagate the attachment.
        time.sleep(30)
    except botocore.exceptions.ClientError as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err.response['Error']['Message']) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to attach Policy: " + error_message)
        append_result(str({"error": "Unable to attach Policy",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_attach_policy(policy_name, role_name, file_path):
    """Create an inline IAM role policy from a JSON policy document file.

    :param policy_name: name of the inline policy to put on the role.
    :param role_name: the role receiving the inline policy.
    :param file_path: path of the JSON policy document to read.
    """
    try:
        conn = boto3.client('iam')
        with open(file_path, 'r') as myfile:
            json_file = myfile.read()
        conn.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=json_file)
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to attach Policy: " + error_message)
        append_result(str({"error": "Unable to attach Policy",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_route_53_record(hosted_zone_id, hosted_zone_name, subdomain, ip_address):
    """Create a Route53 A record 'subdomain.hosted_zone_name' -> ip_address.

    When ssn_assume_role_arn is set in the environment, the Route53 client is
    built from temporary STS assume-role credentials.

    :param hosted_zone_id: the hosted zone to change.
    :param hosted_zone_name: zone name used to build the record name.
    :param subdomain: subdomain label to prepend to the zone name.
    :param ip_address: IPv4 address for the A record.
    """
    try:
        if 'ssn_assume_role_arn' in os.environ:
            role_session_name = str(uuid.uuid4()).split('-')[0]
            sts_client = boto3.client('sts')
            credentials = sts_client.assume_role(
                RoleArn=os.environ['ssn_assume_role_arn'],
                RoleSessionName=role_session_name
            ).get('Credentials')
            route53_client = boto3.client('route53',
                                          aws_access_key_id=credentials.get('AccessKeyId'),
                                          aws_secret_access_key=credentials.get('SecretAccessKey'),
                                          aws_session_token=credentials.get('SessionToken')
                                          )
        else:
            route53_client = boto3.client('route53')
        route53_client.change_resource_record_sets(
            HostedZoneId=hosted_zone_id,
            ChangeBatch={
                'Changes': [
                    {
                        'Action': 'CREATE',
                        'ResourceRecordSet': {
                            'Name': "{}.{}".format(subdomain, hosted_zone_name),
                            'Type': 'A',
                            'TTL': 300,
                            'ResourceRecords': [
                                {
                                    'Value': ip_address
                                }
                            ]
                        }
                    }
                ]
            }
        )
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to create Route53 record: " + error_message)
        append_result(str({"error": "Unable to create Route53 record",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_route_53_record(hosted_zone_id, hosted_zone_name, subdomain):
    """Delete the Route53 A record(s) for 'subdomain.hosted_zone_name'.

    When ssn_assume_role_arn is set in the environment, the Route53 client is
    built from temporary STS assume-role credentials.

    :param hosted_zone_id: the hosted zone to change.
    :param hosted_zone_name: zone name used to match the record name.
    :param subdomain: subdomain label of the record to delete.
    """
    try:
        if 'ssn_assume_role_arn' in os.environ:
            role_session_name = str(uuid.uuid4()).split('-')[0]
            sts_client = boto3.client('sts')
            credentials = sts_client.assume_role(
                RoleArn=os.environ['ssn_assume_role_arn'],
                RoleSessionName=role_session_name
            ).get('Credentials')
            route53_client = boto3.client('route53',
                                          aws_access_key_id=credentials.get('AccessKeyId'),
                                          aws_secret_access_key=credentials.get('SecretAccessKey'),
                                          aws_session_token=credentials.get('SessionToken')
                                          )
        else:
            route53_client = boto3.client('route53')
        for record_set in route53_client.list_resource_record_sets(
                HostedZoneId=hosted_zone_id).get('ResourceRecordSets'):
            # Route53 record names carry a trailing dot.
            if record_set['Name'] == "{}.{}.".format(subdomain, hosted_zone_name):
                for record in record_set['ResourceRecords']:
                    route53_client.change_resource_record_sets(
                        HostedZoneId=hosted_zone_id,
                        ChangeBatch={
                            'Changes': [
                                {
                                    'Action': 'DELETE',
                                    'ResourceRecordSet': {
                                        'Name': record_set['Name'],
                                        'Type': 'A',
                                        'TTL': 300,
                                        'ResourceRecords': [
                                            {
                                                'Value': record['Value']
                                            }
                                        ]
                                    }
                                }
                            ]
                        }
                    )
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to remove Route53 record: " + error_message)
        append_result(str({"error": "Unable to remove Route53 record",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def allocate_elastic_ip():
    """Allocate a new VPC-scoped Elastic IP.

    :return: the AllocationId, or None on failure.
    """
    try:
        client = boto3.client('ec2')
        response = client.allocate_address(Domain='vpc')
        return response.get('AllocationId')
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to allocate Elastic IP: " + error_message)
        append_result(str({"error": "Unable to allocate Elastic IP",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def release_elastic_ip(allocation_id):
    """Release an Elastic IP by its allocation id.

    :param allocation_id: the AllocationId of the address to release.
    """
    try:
        client = boto3.client('ec2')
        client.release_address(AllocationId=allocation_id)
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to release Elastic IP: " + error_message)
        append_result(str({"error": "Unable to release Elastic IP",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def associate_elastic_ip(instance_id, allocation_id):
    """Associate an allocated Elastic IP with an instance.

    :param instance_id: the instance to associate the address with.
    :param allocation_id: the AllocationId of the Elastic IP.
    :return: the AssociationId, or None on failure.
    """
    try:
        client = boto3.client('ec2')
        response = client.associate_address(InstanceId=instance_id, AllocationId=allocation_id)
        return response.get('AssociationId')
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to associate Elastic IP: " + error_message)
        append_result(str({"error": "Unable to associate Elastic IP",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def disassociate_elastic_ip(association_id):
    """Disassociate an Elastic IP from its instance.

    :param association_id: the AssociationId to remove.
    """
    try:
        client = boto3.client('ec2')
        client.disassociate_address(AssociationId=association_id)
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to disassociate Elastic IP: " + error_message)
        append_result(str({"error": "Unable to disassociate Elastic IP",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_ec2(tag_name, tag_value):
    """Terminate all instances matching a tag, releasing their Elastic IPs.

    Instances in running/stopped/pending/stopping states are matched. For each
    one, any associated Elastic IP is disassociated and released (best-effort),
    then the instance is terminated and waited on.

    :param tag_name: tag key to filter instances by.
    :param tag_value: tag value to filter instances by.
    """
    try:
        ec2 = boto3.resource('ec2')
        client = boto3.client('ec2')
        association_id = ''
        allocation_id = ''
        inst = ec2.instances.filter(
            Filters=[{'Name': 'instance-state-name', 'Values': ['running', 'stopped', 'pending', 'stopping']},
                     {'Name': 'tag:{}'.format(tag_name), 'Values': ['{}'.format(tag_value)]}])
        instances = list(inst)
        if instances:
            for instance in instances:
                try:
                    response = client.describe_instances(InstanceIds=[instance.id])
                    for i in response.get('Reservations'):
                        for h in i.get('Instances'):
                            elastic_ip = h.get('PublicIpAddress')
                            try:
                                response = client.describe_addresses(PublicIps=[elastic_ip]).get('Addresses')
                                for el_ip in response:
                                    allocation_id = el_ip.get('AllocationId')
                                    association_id = el_ip.get('AssociationId')
                                    disassociate_elastic_ip(association_id)
                                    release_elastic_ip(allocation_id)
                                    print("Releasing Elastic IP: {}".format(elastic_ip))
                            except Exception:
                                # The public IP may not be an Elastic IP.
                                print("There is no such Elastic IP: {}".format(elastic_ip))
                except Exception as err:
                    print(err)
                    print("There is no Elastic IP to disassociate from instance: {}".format(instance.id))
                client.terminate_instances(InstanceIds=[instance.id])
                waiter = client.get_waiter('instance_terminated')
                waiter.wait(InstanceIds=[instance.id])
                print("The instance {} has been terminated successfully".format(instance.id))
        else:
            print("There are no instances with '{}' tag to terminate".format(tag_name))
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to remove EC2: " + error_message)
        # Original message said "Unable to EC2" — fixed to match the action.
        append_result(str({"error": "Unable to remove EC2",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def stop_ec2(tag_name, tag_value):
    """Stop all running/pending instances matching a tag and wait for them.

    :param tag_name: tag key to filter instances by.
    :param tag_value: tag value to filter instances by.
    """
    try:
        ec2 = boto3.resource('ec2')
        client = boto3.client('ec2')
        inst = ec2.instances.filter(
            Filters=[{'Name': 'instance-state-name', 'Values': ['running', 'pending']},
                     {'Name': 'tag:{}'.format(tag_name), 'Values': ['{}'.format(tag_value)]}])
        instances = list(inst)
        if instances:
            id_instances = [instance.id for instance in instances]
            client.stop_instances(InstanceIds=id_instances)
            waiter = client.get_waiter('instance_stopped')
            waiter.wait(InstanceIds=id_instances)
            print("The instances {} have been stopped successfully".format(id_instances))
        else:
            print("There are no instances with {} name to stop".format(tag_value))
    except Exception as err:
        # format_exc() embeds the traceback text; print_exc() returns None.
        error_message = str(err) + "\n Traceback: " + traceback.format_exc()
        logging.info("Unable to stop EC2: " + error_message)
        append_result(str({"error": "Unable to stop EC2",
                           "error_message": error_message}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def start_ec2(tag_name, tag_value):
    """Start all stopped EC2 instances that carry tag_name=tag_value.

    Waits for the 'instance_status_ok' waiter before returning. Failures are
    logged and appended to the result file instead of being raised.
    """
    try:
        ec2 = boto3.resource('ec2')
        client = boto3.client('ec2')
        inst = ec2.instances.filter(
            Filters=[{'Name': 'instance-state-name', 'Values': ['stopped']},
                     {'Name': 'tag:{}'.format(tag_name), 'Values': ['{}'.format(tag_value)]}])
        id_instances = [instance.id for instance in inst]
        if id_instances:
            client.start_instances(InstanceIds=id_instances)
            waiter = client.get_waiter('instance_status_ok')
            waiter.wait(InstanceIds=id_instances)
            print("The instances {} have been started successfully".format(id_instances))
        else:
            print("There are no instances with {} name to start".format(tag_value))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to start EC2: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to start EC2",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_detach_iam_policies(role_name, action=''):
    """Detach all managed policies from an IAM role.

    With action='delete', also deletes each detached policy whose name
    contains the service base name. Errors are logged and appended to the
    result file instead of being raised.
    """
    client = boto3.client('iam')
    service_base_name = os.environ['conf_service_base_name']
    try:
        policy_list = client.list_attached_role_policies(RoleName=role_name).get('AttachedPolicies')
        for policy in policy_list:
            policy_arn = policy.get('PolicyArn')
            client.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
            print("The IAM policy {} has been detached successfully".format(policy_arn))
            if action == 'delete' and service_base_name in policy.get('PolicyName'):
                client.delete_policy(PolicyArn=policy_arn)
                print("The IAM policy {} has been deleted successfully".format(policy_arn))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove/detach IAM policy: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to remove/detach IAM policy",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_roles_and_profiles(role_name, role_profile_name):
    """Delete an IAM role together with its instance profile.

    Removes the role from the profile first (AWS requires this ordering),
    then deletes the profile and finally the role. Errors are logged and
    appended to the result file instead of being raised.
    """
    client = boto3.client('iam')
    try:
        client.remove_role_from_instance_profile(InstanceProfileName=role_profile_name, RoleName=role_name)
        client.delete_instance_profile(InstanceProfileName=role_profile_name)
        client.delete_role(RoleName=role_name)
        print("The IAM role {0} and instance profile {1} have been deleted successfully".format(role_name,
                                                                                                role_profile_name))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove IAM role/profile: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to remove IAM role/profile",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_all_iam_resources(instance_type, project_name='', endpoint_name=''):
    """Delete DLab-created IAM roles, role policies and instance profiles.

    instance_type selects the resource class: 'ssn', 'edge', 'notebook' or
    'all'. Candidates are matched by a name prefix derived from
    conf_service_base_name (plus project/endpoint when project_name is given).
    Errors are logged and appended to the result file instead of being raised.
    """
    try:
        client = boto3.client('iam')
        service_base_name = os.environ['conf_service_base_name']
        roles_list = []
        # Prefix that every DLab-created role/profile name starts with.
        if project_name:
            start_prefix = '{}-{}-{}-'.format(service_base_name, project_name, endpoint_name)
        else:
            start_prefix = '{}-'.format(service_base_name)
        for item in client.list_roles(MaxItems=250).get("Roles"):
            if item.get("RoleName").startswith(start_prefix):
                roles_list.append(item.get('RoleName'))
        if roles_list:
            roles_list.sort(reverse=True)
            for iam_role in roles_list:
                # NOTE(review): "and" binds tighter than "or", so for
                # instance_type == 'all' this branch runs for EVERY matched
                # role, not only '-ssn-role' ones — confirm this is intended
                # before restructuring the condition.
                if '-ssn-role' in iam_role and instance_type == 'ssn' or instance_type == 'all':
                    try:
                        client.delete_role_policy(RoleName=iam_role, PolicyName='{0}-ssn-policy'.format(
                            service_base_name))
                    except:
                        print('There is no policy {}-ssn-policy to delete'.format(service_base_name))
                    role_profiles = client.list_instance_profiles_for_role(RoleName=iam_role).get('InstanceProfiles')
                    if role_profiles:
                        for i in role_profiles:
                            role_profile_name = i.get('InstanceProfileName')
                            if role_profile_name == '{0}-ssn-profile'.format(service_base_name):
                                remove_roles_and_profiles(iam_role, role_profile_name)
                    else:
                        print("There is no instance profile for {}".format(iam_role))
                        client.delete_role(RoleName=iam_role)
                        print("The IAM role {} has been deleted successfully".format(iam_role))
                if '-edge-role' in iam_role:
                    if instance_type == 'edge' and project_name in iam_role:
                        remove_detach_iam_policies(iam_role, 'delete')
                        role_profile_name = '{0}-{1}-{2}-edge-profile'.format(service_base_name, project_name,
                                                                             os.environ['endpoint_name'].lower())
                        try:
                            # get_instance_profile raises when the profile is gone.
                            client.get_instance_profile(InstanceProfileName=role_profile_name)
                            remove_roles_and_profiles(iam_role, role_profile_name)
                        except:
                            print("There is no instance profile for {}".format(iam_role))
                            client.delete_role(RoleName=iam_role)
                            print("The IAM role {} has been deleted successfully".format(iam_role))
                    if instance_type == 'all':
                        remove_detach_iam_policies(iam_role, 'delete')
                        # role_profile_name is reused here: first the profile
                        # list, then each profile's name inside the loop.
                        role_profile_name = client.list_instance_profiles_for_role(
                            RoleName=iam_role).get('InstanceProfiles')
                        if role_profile_name:
                            for i in role_profile_name:
                                role_profile_name = i.get('InstanceProfileName')
                                remove_roles_and_profiles(iam_role, role_profile_name)
                        else:
                            print("There is no instance profile for {}".format(iam_role))
                            client.delete_role(RoleName=iam_role)
                            print("The IAM role {} has been deleted successfully".format(iam_role))
                if '-nb-de-role' in iam_role:
                    if instance_type == 'notebook' and project_name in iam_role:
                        remove_detach_iam_policies(iam_role)
                        role_profile_name = '{0}-{1}-{2}-nb-de-profile'.format(service_base_name, project_name,
                                                                              os.environ['endpoint_name'].lower())
                        try:
                            client.get_instance_profile(InstanceProfileName=role_profile_name)
                            remove_roles_and_profiles(iam_role, role_profile_name)
                        except:
                            print("There is no instance profile for {}".format(iam_role))
                            client.delete_role(RoleName=iam_role)
                            print("The IAM role {} has been deleted successfully".format(iam_role))
                    if instance_type == 'all':
                        remove_detach_iam_policies(iam_role)
                        role_profile_name = client.list_instance_profiles_for_role(
                            RoleName=iam_role).get('InstanceProfiles')
                        if role_profile_name:
                            for i in role_profile_name:
                                role_profile_name = i.get('InstanceProfileName')
                                remove_roles_and_profiles(iam_role, role_profile_name)
                        else:
                            print("There is no instance profile for {}".format(iam_role))
                            client.delete_role(RoleName=iam_role)
                            print("The IAM role {} has been deleted successfully".format(iam_role))
        else:
            print("There are no IAM roles to delete. Checking instance profiles...")
        # Orphaned instance profiles (whose role is already gone) are removed here.
        profile_list = []
        for item in client.list_instance_profiles(MaxItems=250).get("InstanceProfiles"):
            if item.get("InstanceProfileName").startswith(start_prefix):
                profile_list.append(item.get('InstanceProfileName'))
        if profile_list:
            for instance_profile in profile_list:
                # NOTE(review): same precedence hazard as above — for
                # instance_type == 'all' every profile matches this branch.
                if '-ssn-profile' in instance_profile and instance_type == 'ssn' or instance_type == 'all':
                    client.delete_instance_profile(InstanceProfileName=instance_profile)
                    print("The instance profile {} has been deleted successfully".format(instance_profile))
                if '-edge-profile' in instance_profile:
                    if instance_type == 'edge' and project_name in instance_profile:
                        client.delete_instance_profile(InstanceProfileName=instance_profile)
                        print("The instance profile {} has been deleted successfully".format(instance_profile))
                    if instance_type == 'all':
                        client.delete_instance_profile(InstanceProfileName=instance_profile)
                        print("The instance profile {} has been deleted successfully".format(instance_profile))
                if '-nb-de-profile' in instance_profile:
                    if instance_type == 'notebook' and project_name in instance_profile:
                        client.delete_instance_profile(InstanceProfileName=instance_profile)
                        print("The instance profile {} has been deleted successfully".format(instance_profile))
                    if instance_type == 'all':
                        client.delete_instance_profile(InstanceProfileName=instance_profile)
                        print("The instance profile {} has been deleted successfully".format(instance_profile))
        else:
            print("There are no instance profiles to delete")
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove some of the IAM resources: " + str(err) + "\n Traceback: " +
                     traceback.format_exc())
        append_result(str({"error": "Unable to remove some of the IAM resources",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def s3_cleanup(bucket, cluster_name, user_name):
    """Delete every object under <user_name>/<cluster_name>/ in the bucket.

    Exits the process with code 0 when the bucket is missing or inaccessible;
    deletion errors are logged and appended to the result file.
    """
    s3_res = boto3.resource('s3', config=Config(signature_version='s3v4'))
    client = boto3.client('s3', config=Config(signature_version='s3v4'), region_name=os.environ['aws_region'])
    try:
        client.head_bucket(Bucket=bucket)
    except:
        print("There is no bucket {} or you do not permission to access it".format(bucket))
        sys.exit(0)
    try:
        resource = s3_res.Bucket(bucket)
        prefix = user_name + '/' + cluster_name + "/"
        for obj in resource.objects.filter(Prefix=prefix):
            s3_res.Object(resource.name, obj.key).delete()
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to clean S3 bucket: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to clean S3 bucket",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_s3(bucket_type='all', scientist=''):
    """Empty and delete the DLab S3 buckets matching bucket_type.

    bucket_type 'ssn' targets the SSN bucket (and always includes the shared
    bucket), 'edge' targets the project/endpoint bucket, anything else matches
    every bucket whose name contains the service base name. Matched buckets
    must also carry the '<service_base_name>-tag' tag key. Errors are logged
    and appended to the result file instead of being raised.
    """
    try:
        client = boto3.client('s3', config=Config(signature_version='s3v4'), region_name=os.environ['aws_region'])
        s3 = boto3.resource('s3')
        bucket_list = []
        if bucket_type == 'ssn':
            bucket_name = (os.environ['conf_service_base_name'] + '-ssn-bucket').lower().replace('_', '-')
            bucket_list.append(('{0}-{1}-shared-bucket'.format(os.environ['conf_service_base_name'],
                                                               os.environ['default_endpoint_name'])).lower().replace('_', '-'))
        elif bucket_type == 'edge':
            bucket_name = (os.environ['conf_service_base_name'] + '-' + "{}".format(scientist) + '-' +
                           os.environ['endpoint_name'] + '-bucket').lower().replace('_', '-')
        else:
            bucket_name = (os.environ['conf_service_base_name']).lower().replace('_', '-')
        for item in client.list_buckets().get('Buckets'):
            if bucket_name in item.get('Name'):
                # Only delete buckets that carry the DLab service tag.
                # (A dangling no-op `i.get('Key')` statement was removed here.)
                for i in client.get_bucket_tagging(Bucket=item.get('Name')).get('TagSet'):
                    if i.get('Key') == os.environ['conf_service_base_name'].lower() + '-tag':
                        bucket_list.append(item.get('Name'))
        for s3bucket in bucket_list:
            if s3bucket:
                bucket = s3.Bucket(s3bucket)
                # A bucket must be empty before delete_bucket succeeds.
                bucket.objects.all().delete()
                print("The S3 bucket {} has been cleaned".format(s3bucket))
                client.delete_bucket(Bucket=s3bucket)
                print("The S3 bucket {} has been deleted successfully".format(s3bucket))
            else:
                print("There are no buckets to delete")
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove S3 bucket: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to remove S3 bucket",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_subnets(tag_value):
    """Delete private subnets tagged with the service base name tag (primary
    or secondary) equal to tag_value.

    Errors are logged and appended to the result file instead of being raised.
    """
    try:
        ec2 = boto3.resource('ec2')
        client = boto3.client('ec2')
        tag_name = os.environ['conf_service_base_name'].lower() + '-tag'
        tag2_name = os.environ['conf_service_base_name'].lower() + '-secondary-tag'
        subnets = ec2.subnets.filter(
            Filters=[{'Name': 'tag:{}'.format(tag_name), 'Values': [tag_value]}])
        subnets2 = ec2.subnets.filter(
            Filters=[{'Name': 'tag:{}'.format(tag2_name), 'Values': [tag_value]}])
        if subnets or subnets2:
            if subnets:
                for subnet in subnets:
                    client.delete_subnet(SubnetId=subnet.id)
                    print("The subnet {} has been deleted successfully".format(subnet.id))
            if subnets2:
                for subnet in subnets2:
                    client.delete_subnet(SubnetId=subnet.id)
                    print("The subnet {} has been deleted successfully".format(subnet.id))
        else:
            print("There are no private subnets to delete")
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove subnet: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to remove subnet", "error_message": str(err) + "\n Traceback: " +
                                                               traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_peering(tag_value):
    """Delete the active VPC peering connection tagged with the service tag.

    Only applies when conf_duo_vpc_enable is 'true'. If no active peering
    exists the [0] lookup raises and is reported through the error path.
    Errors are logged and appended to the result file instead of being raised.
    """
    try:
        client = boto3.client('ec2')
        tag_name = os.environ['conf_service_base_name'].lower() + '-tag'
        if os.environ['conf_duo_vpc_enable'] == 'true':
            peering_id = client.describe_vpc_peering_connections(Filters=[
                {'Name': 'tag-key', 'Values': [tag_name]},
                {'Name': 'tag-value', 'Values': [tag_value]},
                {'Name': 'status-code', 'Values':
                    ['active']}]).get('VpcPeeringConnections')[0].get('VpcPeeringConnectionId')
            if peering_id:
                client.delete_vpc_peering_connection(VpcPeeringConnectionId=peering_id)
                print("Peering connection {} has been deleted successfully".format(peering_id))
            else:
                print("There are no peering connections to delete")
        else:
            print("There are no peering connections to delete because duo vpc option is disabled")
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove peering connection: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to remove peering connection",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_sgroups(tag_value):
    """Delete security groups whose tag '<conf_service_base_name>' equals tag_value.

    NOTE(review): unlike remove_subnets, the tag key here has no '-tag'
    suffix and is not lower-cased — confirm this matches how the SGs are
    actually tagged. Errors are logged and appended to the result file.
    """
    try:
        ec2 = boto3.resource('ec2')
        client = boto3.client('ec2')
        tag_name = os.environ['conf_service_base_name']
        sgs = ec2.security_groups.filter(
            Filters=[{'Name': 'tag:{}'.format(tag_name), 'Values': [tag_value]}])
        if sgs:
            for sg in sgs:
                client.delete_security_group(GroupId=sg.id)
                print("The security group {} has been deleted successfully".format(sg.id))
        else:
            print("There are no security groups to delete")
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove SG: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to remove SG",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def add_inbound_sg_rule(sg_id, rule):
    """Add one ingress rule (an IpPermission dict) to security group sg_id.

    A duplicate rule is reported and ignored; other AWS client errors are
    logged and appended to the result file.
    """
    try:
        client = boto3.client('ec2')
        client.authorize_security_group_ingress(
            GroupId=sg_id,
            IpPermissions=[rule]
        )
    # Catch ClientError, not bare Exception: only botocore client errors carry
    # the .response attribute read below — a generic exception would have
    # raised AttributeError inside this handler.
    except ClientError as err:
        if err.response['Error']['Code'] == 'InvalidPermission.Duplicate':
            print("The following inbound rule is already exist:")
            print(str(rule))
        else:
            # format_exc() returns the traceback text; print_exc() returns None
            # and made this concatenation raise TypeError inside the handler.
            logging.info("Unable to add inbound rule to SG: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to add inbound rule to SG",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
            traceback.print_exc(file=sys.stdout)
| |
| |
def add_outbound_sg_rule(sg_id, rule):
    """Add one egress rule (an IpPermission dict) to security group sg_id.

    A duplicate rule is reported and ignored; other AWS client errors are
    logged and appended to the result file.
    """
    try:
        client = boto3.client('ec2')
        client.authorize_security_group_egress(
            GroupId=sg_id,
            IpPermissions=[rule]
        )
    # Catch ClientError, not bare Exception: only botocore client errors carry
    # the .response attribute read below — a generic exception would have
    # raised AttributeError inside this handler.
    except ClientError as err:
        if err.response['Error']['Code'] == 'InvalidPermission.Duplicate':
            print("The following outbound rule is already exist:")
            print(str(rule))
        else:
            # format_exc() returns the traceback text; print_exc() returns None
            # and made this concatenation raise TypeError inside the handler.
            logging.info("Unable to add outbound rule to SG: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to add outbound rule to SG",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
            traceback.print_exc(file=sys.stdout)
| |
| |
def deregister_image(image_name='*'):
    """Deregister notebook AMIs tagged with the service base name (and,
    optionally, a specific image name) and delete their EBS snapshots.

    Errors are logged and appended to the result file instead of being raised.
    """
    try:
        resource = boto3.resource('ec2')
        client = boto3.client('ec2')
        for image in resource.images.filter(
                Filters=[{'Name': 'tag-value', 'Values': [os.environ['conf_service_base_name']]},
                         {'Name': 'tag-value', 'Values': [image_name]}]):
            client.deregister_image(ImageId=image.id)
            # Snapshots are not removed by deregister_image; delete them explicitly.
            for device in image.block_device_mappings:
                if device.get('Ebs'):
                    client.delete_snapshot(SnapshotId=device.get('Ebs').get('SnapshotId'))
            print("Notebook AMI {} has been deregistered successfully".format(image.id))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to de-register image: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to de-register image",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def terminate_emr(id):
    """Terminate the EMR cluster with the given job flow id and wait for it.

    The parameter shadows the `id` builtin but is kept for backward
    compatibility with keyword callers. Errors are logged and appended to the
    result file instead of being raised.
    """
    try:
        emr = boto3.client('emr')
        emr.terminate_job_flows(
            JobFlowIds=[id]
        )
        waiter = emr.get_waiter('cluster_terminated')
        waiter.wait(ClusterId=id)
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove EMR: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to remove EMR",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_kernels(emr_name, tag_name, nb_tag_value, ssh_user, key_path, emr_version, computational_name=''):
    """Remove EMR-cluster kernels/interpreters from every running notebook.

    For each running instance tagged tag_name=nb_tag_value, connects over SSH
    (via fabric env) and removes the cluster's Jupyter kernels; when a
    Zeppelin interpreter was provisioned, also stops the per-cluster Livy
    server (multi-cluster mode), deletes matching Zeppelin interpreters via
    the REST API and restarts Zeppelin; finally removes RStudio kernels and
    the /opt/<emr_version>/<emr_name> directory. Errors are logged and
    appended to the result file instead of being raised.
    """
    try:
        ec2 = boto3.resource('ec2')
        inst = ec2.instances.filter(
            Filters=[{'Name': 'instance-state-name', 'Values': ['running']},
                     {'Name': 'tag:{}'.format(tag_name), 'Values': ['{}'.format(nb_tag_value)]}])
        instances = list(inst)
        if instances:
            for instance in instances:
                private = getattr(instance, 'private_dns_name')
                # Point fabric at the notebook via its private DNS name.
                env.hosts = "{}".format(private)
                env.user = "{}".format(ssh_user)
                env.key_filename = "{}".format(key_path)
                env.host_string = env.user + "@" + env.hosts
                sudo('rm -rf /home/{}/.local/share/jupyter/kernels/*_{}'.format(ssh_user, emr_name))
                if exists('/home/{}/.ensure_dir/dataengine-service_{}_interpreter_ensured'.format(ssh_user, emr_name)):
                    if os.environ['notebook_multiple_clusters'] == 'true':
                        try:
                            # Locate this cluster's Livy port, then kill and disable its server.
                            livy_port = sudo("cat /opt/" + emr_version + "/" + emr_name +
                                             "/livy/conf/livy.conf | grep livy.server.port | tail -n 1 | "
                                             "awk '{printf $3}'")
                            process_number = sudo("netstat -natp 2>/dev/null | grep ':" + livy_port +
                                                  "' | awk '{print $7}' | sed 's|/.*||g'")
                            sudo('kill -9 ' + process_number)
                            sudo('systemctl disable livy-server-' + livy_port)
                        except:
                            print("Wasn't able to find Livy server for this EMR!")
                    # Reset SPARK_HOME to the local Spark for Zeppelin.
                    sudo('sed -i \"s/^export SPARK_HOME.*/export SPARK_HOME=\/opt\/spark/\" '
                         '/opt/zeppelin/conf/zeppelin-env.sh')
                    sudo("rm -rf /home/{}/.ensure_dir/dataengine-service_interpreter_ensure".format(ssh_user))
                    # Delete every Zeppelin interpreter whose name contains the
                    # cluster name, via Zeppelin's REST API.
                    zeppelin_url = 'http://' + private + ':8080/api/interpreter/setting/'
                    opener = urllib2.build_opener(urllib2.ProxyHandler({}))
                    req = opener.open(urllib2.Request(zeppelin_url))
                    r_text = req.read()
                    interpreter_json = json.loads(r_text)
                    interpreter_prefix = emr_name
                    for interpreter in interpreter_json['body']:
                        if interpreter_prefix in interpreter['name']:
                            print("Interpreter with ID: {0} and name: {1} will be removed from zeppelin!".
                                  format(interpreter['id'], interpreter['name']))
                            request = urllib2.Request(zeppelin_url + interpreter['id'], data='')
                            request.get_method = lambda: 'DELETE'
                            url = opener.open(request)
                            print(url.read())
                    sudo('chown ' + ssh_user + ':' + ssh_user + ' -R /opt/zeppelin/')
                    sudo('systemctl daemon-reload')
                    sudo("service zeppelin-notebook stop")
                    sudo("service zeppelin-notebook start")
                    # Poll until Zeppelin's port 8080 is reachable again.
                    zeppelin_restarted = False
                    while not zeppelin_restarted:
                        sudo('sleep 5')
                        result = sudo('nmap -p 8080 localhost | grep "closed" > /dev/null; echo $?')
                        result = result[:1]
                        if result == '1':
                            zeppelin_restarted = True
                    sudo('sleep 5')
                    sudo('rm -rf /home/{}/.ensure_dir/dataengine-service_{}_interpreter_ensured'.format(ssh_user,
                                                                                                        emr_name))
                if exists('/home/{}/.ensure_dir/rstudio_dataengine-service_ensured'.format(ssh_user)):
                    dlab.fab.remove_rstudio_dataengines_kernel(computational_name, ssh_user)
                sudo('rm -rf /opt/' + emr_version + '/' + emr_name + '/')
                print("Notebook's {} kernels were removed".format(env.hosts))
        else:
            print("There are no notebooks to clean kernels.")
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove kernels on Notebook: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to remove kernels on Notebook",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_route_tables(tag_name, ssn=False):
    """Delete all route tables carrying the tag key tag_name.

    With ssn=True, each table's subnet associations are removed first.
    Errors are logged and appended to the result file instead of being raised.
    """
    try:
        client = boto3.client('ec2')
        rtables = client.describe_route_tables(Filters=[{'Name': 'tag-key', 'Values': [tag_name]}]).get('RouteTables')
        if not rtables:
            # Previously this message sat on the else branch of a per-item
            # truthiness check inside the loop, so an empty result produced
            # no output at all.
            print("There are no route tables to remove")
        else:
            for rtable in rtables:
                rtable_associations = rtable.get('Associations')
                rtable_id = rtable.get('RouteTableId')
                if ssn:
                    for association in rtable_associations:
                        client.disassociate_route_table(AssociationId=association.get('RouteTableAssociationId'))
                        print("Association {} has been removed".format(association.get('RouteTableAssociationId')))
                client.delete_route_table(RouteTableId=rtable_id)
                print("Route table {} has been removed".format(rtable_id))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove route table: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to remove route table",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_internet_gateways(vpc_id, tag_name, tag_value):
    """Detach and delete internet gateways tagged tag_name=tag_value from vpc_id.

    Errors are logged and appended to the result file instead of being raised.
    """
    try:
        ig_id = ''
        client = boto3.client('ec2')
        response = client.describe_internet_gateways(
            Filters=[
                {'Name': 'tag-key', 'Values': [tag_name]},
                {'Name': 'tag-value', 'Values': [tag_value]}]).get('InternetGateways')
        for i in response:
            ig_id = i.get('InternetGatewayId')
        client.detach_internet_gateway(InternetGatewayId=ig_id, VpcId=vpc_id)
        # Fixed: the message formatted `vpc_id.format` (a bound-method repr)
        # instead of the VPC id itself.
        print("Internet gateway {0} has been detached from VPC {1}".format(ig_id, vpc_id))
        client.delete_internet_gateway(InternetGatewayId=ig_id)
        print("Internet gateway {} has been deleted successfully".format(ig_id))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove internet gateway: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to remove internet gateway",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def remove_vpc_endpoints(vpc_id):
    """Delete every VPC endpoint belonging to vpc_id.

    Errors are logged and appended to the result file instead of being raised.
    """
    try:
        client = boto3.client('ec2')
        response = client.describe_vpc_endpoints(Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}]).get('VpcEndpoints')
        for i in response:
            client.delete_vpc_endpoints(VpcEndpointIds=[i.get('VpcEndpointId')])
            print("VPC Endpoint {} has been removed successfully".format(i.get('VpcEndpointId')))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None and
        # made this concatenation raise TypeError inside the handler.
        logging.info("Unable to remove VPC Endpoint: " + str(err) + "\n Traceback: " + traceback.format_exc())
        append_result(str({"error": "Unable to remove VPC Endpoint",
                           "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def create_image_from_instance(tag_name='', instance_name='', image_name='', tags=''):
    """Create an AMI from the first running instance tagged tag_name=instance_name.

    Blocks until the image is available, then tags the image and its EBS
    snapshots with a Name tag and an SBN (service base name) tag, plus any
    extra tags supplied as a JSON object string. Returns the new image id, or
    '' when no matching running instance exists. A duplicate-image-name error
    is reported and swallowed.
    """
    try:
        ec2 = boto3.resource('ec2')
        client = boto3.client('ec2')
        instances = ec2.instances.filter(
            Filters=[{'Name': 'tag:{}'.format(tag_name), 'Values': [instance_name]},
                     {'Name': 'instance-state-name', 'Values': ['running']}])
        for instance in instances:
            image = instance.create_image(Name=image_name,
                                          Description='Automatically created image for notebook server',
                                          NoReboot=False)
            image.load()
            # Poll until AWS reports the AMI as available.
            while image.state != 'available':
                local("echo Waiting for image creation; sleep 20")
                image.load()
            tag = {'Key': 'Name', 'Value': image_name}
            sbn_tag = {'Key': 'SBN', 'Value': os.environ['conf_service_base_name']}
            # Tag the snapshots backing the image as well, so cleanup can find them.
            response = client.describe_images(ImageIds=[image.id]).get('Images')[0].get('BlockDeviceMappings')
            for ebs in response:
                if ebs.get('Ebs'):
                    snapshot_id = ebs.get('Ebs').get('SnapshotId')
                    create_tag(snapshot_id, sbn_tag)
                    create_tag(snapshot_id, tag)
            create_tag(image.id, sbn_tag)
            create_tag(image.id, tag)
            if tags:
                all_tags = json.loads(tags)
                for key in all_tags.keys():
                    tag = {'Key': key, 'Value': all_tags[key]}
                    create_tag(image.id, tag)
            # Only the first matching running instance is imaged.
            return image.id
        return ''
    except botocore.exceptions.ClientError as err:
        if err.response['Error']['Code'] == 'InvalidAMIName.Duplicate':
            print("Image is already created.")
        else:
            # format_exc() returns the traceback text; print_exc() returns None
            # and made this concatenation raise TypeError inside the handler.
            logging.info("Unable to create image: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create image",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
            traceback.print_exc(file=sys.stdout)
| |
| |
def install_emr_spark(args):
    """Download the cluster's spark.tar.gz from S3 and unpack it.

    The archive's md5 checksum is verified; on a mismatch the archive is
    downloaded once more, and a second mismatch aborts the process with
    exit code 1.
    """
    s3_client = boto3.client('s3', config=Config(signature_version='s3v4'), region_name=args.region)
    archive_key = args.project_name + '/' + args.cluster_name + '/spark.tar.gz'
    checksum_key = args.project_name + '/' + args.cluster_name + '/spark-checksum.chk'
    s3_client.download_file(args.bucket, archive_key, '/tmp/spark.tar.gz')
    s3_client.download_file(args.bucket, checksum_key, '/tmp/spark-checksum.chk')
    if 'WARNING' in local('md5sum -c /tmp/spark-checksum.chk', capture=True):
        # One retry: remove the corrupt archive and fetch it again.
        local('rm -f /tmp/spark.tar.gz')
        s3_client.download_file(args.bucket, archive_key, '/tmp/spark.tar.gz')
        if 'WARNING' in local('md5sum -c /tmp/spark-checksum.chk', capture=True):
            print("The checksum of spark.tar.gz is mismatched. It could be caused by aws network issue.")
            sys.exit(1)
    local('sudo tar -zhxvf /tmp/spark.tar.gz -C /opt/' + args.emr_version + '/' + args.cluster_name + '/')
| |
| |
def jars(args, emr_dir):
    """Download the EMR-version jars archive from S3 and unpack it into emr_dir.

    The archive's md5 checksum is verified; on a mismatch the archive is
    downloaded once more, and a second mismatch aborts the process with
    exit code 1.
    """
    print("Downloading jars...")
    s3_client = boto3.client('s3', config=Config(signature_version='s3v4'), region_name=args.region)
    archive_key = 'jars/' + args.emr_version + '/jars.tar.gz'
    checksum_key = 'jars/' + args.emr_version + '/jars-checksum.chk'
    s3_client.download_file(args.bucket, archive_key, '/tmp/jars.tar.gz')
    s3_client.download_file(args.bucket, checksum_key, '/tmp/jars-checksum.chk')
    if 'WARNING' in local('md5sum -c /tmp/jars-checksum.chk', capture=True):
        # One retry: remove the corrupt archive and fetch it again.
        local('rm -f /tmp/jars.tar.gz')
        s3_client.download_file(args.bucket, archive_key, '/tmp/jars.tar.gz')
        if 'WARNING' in local('md5sum -c /tmp/jars-checksum.chk', capture=True):
            print("The checksum of jars.tar.gz is mismatched. It could be caused by aws network issue.")
            sys.exit(1)
    local('tar -zhxvf /tmp/jars.tar.gz -C ' + emr_dir)
| |
| |
def yarn(args, yarn_dir):
    """Fetch the cluster's yarn configuration from S3 and flatten it into yarn_dir.

    Uses the China-region S3 endpoint for cn-north-1; downloaded files are
    moved from their keyed subdirectories directly into yarn_dir.
    """
    print("Downloading yarn configuration...")
    if args.region == 'cn-north-1':
        s3client = boto3.client('s3', config=Config(signature_version='s3v4'),
                                endpoint_url='https://s3.cn-north-1.amazonaws.com.cn', region_name=args.region)
        s3resource = boto3.resource('s3', config=Config(signature_version='s3v4'),
                                    endpoint_url='https://s3.cn-north-1.amazonaws.com.cn', region_name=args.region)
    else:
        s3client = boto3.client('s3', config=Config(signature_version='s3v4'), region_name=args.region)
        s3resource = boto3.resource('s3', config=Config(signature_version='s3v4'))
    config_prefix = args.project_name + '/' + args.cluster_name + '/config/'
    get_files(s3client, s3resource, config_prefix, args.bucket, yarn_dir)
    local('sudo mv ' + yarn_dir + config_prefix + '* ' + yarn_dir)
    local('sudo rm -rf ' + yarn_dir + args.project_name + '/')
| |
| |
def get_files(s3client, s3resource, dist, bucket, local):
    """Recursively download every S3 object under prefix `dist` from `bucket`
    into directory `local`, preserving the key hierarchy.

    NOTE: the `local` parameter (a destination path) shadows fabric's
    `local()` within this function; kept for backward compatibility.
    """
    paginator = s3client.get_paginator('list_objects')
    for page in paginator.paginate(Bucket=bucket, Delimiter='/', Prefix=dist):
        # Recurse into each sub-prefix first, then fetch the direct objects.
        for subdir in page.get('CommonPrefixes') or []:
            get_files(s3client, s3resource, subdir.get('Prefix'), bucket, local)
        for obj in page.get('Contents') or []:
            target = local + os.sep + obj.get('Key')
            if not os.path.exists(os.path.dirname(target)):
                os.makedirs(os.path.dirname(target))
            s3resource.meta.client.download_file(bucket, obj.get('Key'), target)
| |
| |
def get_cluster_python_version(region, bucket, user_name, cluster_name):
    """Download the cluster's recorded python version file to /tmp/python_version."""
    s3 = boto3.client('s3', config=Config(signature_version='s3v4'), region_name=region)
    version_key = user_name + '/' + cluster_name + '/python_version'
    s3.download_file(bucket, version_key, '/tmp/python_version')
| |
| |
def get_gitlab_cert(bucket, certfile):
    """Download `certfile` from the bucket to a local file of the same name.

    Returns True on success and False when the object is missing (404).
    Other client errors are swallowed and yield an implicit None (falsy),
    matching the original behavior.
    """
    try:
        boto3.resource('s3').Bucket(bucket).download_file(certfile, certfile)
        return True
    except botocore.exceptions.ClientError as err:
        if err.response['Error']['Code'] == "404":
            print("The object does not exist.")
            return False
        # non-404 client errors fall through and return None implicitly
| |
| |
def create_aws_config_files(generate_full_config=False):
    """Recreate AWS CLI config files under $AWS_DIR.

    Always writes a `config` file with the default region; with
    generate_full_config=True also writes a `credentials` file using the
    access keys from the environment. Returns True on success; exits the
    process with code 1 on any failure.
    """
    try:
        aws_user_dir = os.environ['AWS_DIR']
        # Start from a clean directory so stale files never linger.
        logging.info(local("rm -rf " + aws_user_dir + " 2>&1", capture=True))
        logging.info(local("mkdir -p " + aws_user_dir + " 2>&1", capture=True))

        with open(aws_user_dir + '/config', 'w') as conf_file:
            conf_file.write("[default]\n")
            conf_file.write("region = {}\n".format(os.environ['aws_region']))

        if generate_full_config:
            with open(aws_user_dir + '/credentials', 'w') as creds_file:
                creds_file.write("[default]\n")
                creds_file.write("aws_access_key_id = {}\n".format(os.environ['aws_access_key']))
                creds_file.write("aws_secret_access_key = {}\n".format(os.environ['aws_secret_access_key']))

        # Restrict permissions: files are owner-only, directory is read/execute.
        logging.info(local("chmod 600 " + aws_user_dir + "/*" + " 2>&1", capture=True))
        logging.info(local("chmod 550 " + aws_user_dir + " 2>&1", capture=True))

        return True
    except Exception as err:
        print('Error: {0}'.format(err))
        sys.exit(1)
| |
| |
def installing_python(region, bucket, user_name, cluster_name, application='', pip_mirror='', numpy_version='1.14.3'):
    """Build the cluster's Python locally and provision its virtualenv.

    Downloads the cluster's python_version marker from S3, compiles that
    CPython release under /opt/python/python<version> if not already present,
    creates a virtualenv there and installs the notebook package set with
    pip. In region cn-north-1 all pip traffic goes through *pip_mirror*;
    for application 'deeplearning' extra GPU packages are installed.
    Exits the process if the mirror-based installation fails.
    """
    get_cluster_python_version(region, bucket, user_name, cluster_name)
    # file() is the Python 2 builtin open; this module targets Python 2.
    with file('/tmp/python_version') as f:
        python_version = f.read()
    # Keep only the "X.Y.Z" prefix of the version string.
    python_version = python_version[0:5]
    if not os.path.exists('/opt/python/python' + python_version):
        # Download and build the matching CPython release from python.org.
        local('wget https://www.python.org/ftp/python/' + python_version +
              '/Python-' + python_version + '.tgz -O /tmp/Python-' + python_version + '.tgz')
        local('tar zxvf /tmp/Python-' + python_version + '.tgz -C /tmp/')
        with lcd('/tmp/Python-' + python_version):
            local('./configure --prefix=/opt/python/python' + python_version +
                  ' --with-zlib-dir=/usr/local/lib/ --with-ensurepip=install')
            # altinstall avoids clobbering the system "python" binary.
            local('sudo make altinstall')
        with lcd('/tmp/'):
            local('sudo rm -rf Python-' + python_version + '/')
        if region == 'cn-north-1':
            # In China the default PyPI is unreachable: pin pip and park the
            # system pip.conf aside while installing with a longer timeout.
            local('sudo -i /opt/python/python{}/bin/python{} -m pip install -U pip=={} --no-cache-dir'.format(
                python_version, python_version[0:3], os.environ['conf_pip_version']))
            local('sudo mv /etc/pip.conf /etc/back_pip.conf')
            local('sudo touch /etc/pip.conf')
            local('sudo echo "[global]" >> /etc/pip.conf')
            local('sudo echo "timeout = 600" >> /etc/pip.conf')
        local('sudo -i virtualenv /opt/python/python' + python_version)
        venv_command = '/bin/bash /opt/python/python' + python_version + '/bin/activate'
        pip_command = '/opt/python/python' + python_version + '/bin/pip' + python_version[:3]
        if region == 'cn-north-1':
            try:
                # All installs go through the regional mirror; on any failure
                # the pip.conf backup is restored and the build is removed.
                local(venv_command + ' && sudo -i ' + pip_command +
                      ' install -i https://{0}/simple --trusted-host {0} --timeout 60000 -U pip==9.0.3 '
                      '--no-cache-dir'.format(pip_mirror))
                local(venv_command + ' && sudo -i ' + pip_command + ' install pyzmq==17.0.0')
                local(venv_command + ' && sudo -i ' + pip_command +
                      ' install -i https://{0}/simple --trusted-host {0} --timeout 60000 ipython ipykernel '
                      '--no-cache-dir'.format(pip_mirror))
                local(venv_command + ' && sudo -i ' + pip_command + ' install NumPy=={0}'.format(numpy_version))
                local(venv_command + ' && sudo -i ' + pip_command +
                      ' install -i https://{0}/simple --trusted-host {0} --timeout 60000 boto boto3 SciPy '
                      'Matplotlib==2.0.2 pandas Sympy Pillow sklearn --no-cache-dir'.format(pip_mirror))
                # Need to refactor when we add GPU cluster
                if application == 'deeplearning':
                    local(venv_command + ' && sudo -i ' + pip_command +
                          ' install -i https://{0}/simple --trusted-host {0} --timeout 60000 mxnet-cu80 opencv-python '
                          'keras Theano --no-cache-dir'.format(pip_mirror))
                    # cntk wheels are tagged by python version without dots (e.g. cp27).
                    python_without_dots = python_version.replace('.', '')
                    local(venv_command + ' && sudo -i ' + pip_command +
                          ' install https://cntk.ai/PythonWheel/GPU/cntk-2.0rc3-cp{0}-cp{0}m-linux_x86_64.whl '
                          '--no-cache-dir'.format(python_without_dots[:2]))
                local('sudo rm /etc/pip.conf')
                local('sudo mv /etc/back_pip.conf /etc/pip.conf')
            except:
                # Roll back pip.conf and the partial build before exiting.
                local('sudo rm /etc/pip.conf')
                local('sudo mv /etc/back_pip.conf /etc/pip.conf')
                local('sudo rm -rf /opt/python/python{}/'.format(python_version))
                sys.exit(1)
        else:
            local(venv_command + ' && sudo -i ' + pip_command + ' install -U pip==9.0.3')
            local(venv_command + ' && sudo -i ' + pip_command + ' install pyzmq==17.0.0')
            local(venv_command + ' && sudo -i ' + pip_command + ' install ipython ipykernel --no-cache-dir')
            local(venv_command + ' && sudo -i ' + pip_command + ' install NumPy=={}'.format(numpy_version))
            local(venv_command + ' && sudo -i ' + pip_command +
                  ' install boto boto3 SciPy Matplotlib==2.0.2 pandas Sympy Pillow sklearn '
                  '--no-cache-dir')
            # Need to refactor when we add GPU cluster
            if application == 'deeplearning':
                local(venv_command + ' && sudo -i ' + pip_command +
                      ' install mxnet-cu80 opencv-python keras Theano --no-cache-dir')
                python_without_dots = python_version.replace('.', '')
                local(venv_command + ' && sudo -i ' + pip_command +
                      ' install https://cntk.ai/PythonWheel/GPU/cntk-2.0rc3-cp{0}-cp{0}m-linux_x86_64.whl '
                      '--no-cache-dir'.format(python_without_dots[:2]))
        # Expose the build as /usr/bin/pythonX.Y-dp for the notebooks.
        local('sudo rm -rf /usr/bin/python{}-dp'.format(python_version[0:3]))
        local('sudo ln -fs /opt/python/python{0}/bin/python{1} /usr/bin/python{1}-dp'.format(python_version,
                                                                                             python_version[0:3]))
| |
| |
def spark_defaults(args):
    """Rewrite the EMR spark-defaults.conf for use from the notebook host.

    Strips excluded/comment/blank lines with sed, remaps classpath and
    dist-files entries onto the locally mounted /opt/<emr_version> tree,
    substitutes the version/cluster placeholders, and appends the s3a
    endpoint and server-side-encryption settings for the region.
    """
    spark_def_path = '/opt/' + args.emr_version + '/' + args.cluster_name + '/spark/conf/spark-defaults.conf'
    # NOTE(review): eval() on args.excluded_lines executes arbitrary code;
    # presumably the value comes from trusted provisioning input — consider
    # ast.literal_eval instead.
    for i in eval(args.excluded_lines):
        local(""" sudo bash -c " sed -i '/""" + i + """/d' """ + spark_def_path + """ " """)
    # Drop comment lines, then blank lines.
    local(""" sudo bash -c " sed -i '/#/d' """ + spark_def_path + """ " """)
    local(""" sudo bash -c " sed -i '/^\s*$/d' """ + spark_def_path + """ " """)
    # Point the driver's extra class/library paths at the local jar mirror;
    # the DATAENGINE-SERVICE_VERSION/CLUSTER placeholders are replaced below.
    local(""" sudo bash -c "sed -i '/spark.driver.extraClassPath/,/spark.driver.extraLibraryPath/s|"""
          """/usr|/opt/DATAENGINE-SERVICE_VERSION/jars/usr|g' """ + spark_def_path + """ " """)
    local(
        """ sudo bash -c "sed -i '/spark.yarn.dist.files/s/\/etc\/spark\/conf/\/opt\/DATAENGINE-SERVICE_VERSION\/CLUSTER\/conf/g' """
        + spark_def_path + """ " """)
    template_file = spark_def_path
    with open(template_file, 'r') as f:
        text = f.read()
    # Substitute the placeholders injected by the sed commands above.
    text = text.replace('DATAENGINE-SERVICE_VERSION', args.emr_version)
    text = text.replace('CLUSTER', args.cluster_name)
    with open(spark_def_path, 'w') as f:
        f.write(text)
    # Region-specific S3 endpoint (us-east-1 has no region suffix; China uses
    # the .com.cn domain).
    if args.region == 'us-east-1':
        endpoint_url = 'https://s3.amazonaws.com'
    elif args.region == 'cn-north-1':
        endpoint_url = "https://s3.{}.amazonaws.com.cn".format(args.region)
    else:
        endpoint_url = 'https://s3-' + args.region + '.amazonaws.com'
    local("""bash -c 'echo "spark.hadoop.fs.s3a.endpoint """ + endpoint_url + """ " >> """ +
          spark_def_path + """'""")
    local('echo "spark.hadoop.fs.s3a.server-side-encryption-algorithm AES256" >> {}'.format(spark_def_path))
| |
| |
def ensure_local_jars(os_user, jars_dir):
    """Download the hadoop-aws and aws-java-sdk jars into *jars_dir* once.

    A marker file under the user's ~/.ensure_dir makes the operation
    idempotent across provisioning runs. Exits the process on failure.
    """
    if not exists('/home/{}/.ensure_dir/local_jars_ensured'.format(os_user)):
        try:
            sudo('mkdir -p {0}'.format(jars_dir))
            sudo('wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/{0}/hadoop-aws-{0}.jar -O \
                 {1}hadoop-aws-{0}.jar'.format('2.7.4', jars_dir))
            sudo('wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/{0}/aws-java-sdk-{0}.jar -O \
                 {1}aws-java-sdk-{0}.jar'.format('1.7.4', jars_dir))
            sudo('touch /home/{}/.ensure_dir/local_jars_ensured'.format(os_user))
        except Exception as err:
            # Report the cause before exiting — the previous bare except
            # discarded it. Matches the error style of ensure_local_spark().
            print('Error:', str(err))
            sys.exit(1)
| |
| |
def configure_local_spark(jars_dir, templates_dir, memory_type='driver'):
    """Regenerate /opt/spark/conf/spark-defaults.conf on the remote notebook.

    Builds a fresh config from the template, appending the region's s3a
    endpoint, encryption algorithm, driver memory and (optionally) custom
    properties from the spark_configurations environment variable, while
    preserving any previously generated spark.jars line.
    """
    try:
        # Checking if spark.jars parameter was generated previously
        spark_jars_paths = None
        if exists('/opt/spark/conf/spark-defaults.conf'):
            try:
                spark_jars_paths = sudo('cat /opt/spark/conf/spark-defaults.conf | grep -e "^spark.jars " ')
            except:
                spark_jars_paths = None
        # Region comes from the EC2 instance metadata availability zone;
        # [:-1] drops the trailing zone letter (e.g. "us-east-1a" -> "us-east-1").
        region = sudo('curl http://169.254.169.254/latest/meta-data/placement/availability-zone')[:-1]
        if region == 'us-east-1':
            endpoint_url = 'https://s3.amazonaws.com'
        elif region == 'cn-north-1':
            endpoint_url = "https://s3.{}.amazonaws.com.cn".format(region)
        else:
            endpoint_url = 'https://s3-' + region + '.amazonaws.com'
        # Stage the template on the remote side and append AWS settings.
        put(templates_dir + 'notebook_spark-defaults_local.conf', '/tmp/notebook_spark-defaults_local.conf')
        sudo('echo "spark.hadoop.fs.s3a.endpoint {}" >> /tmp/notebook_spark-defaults_local.conf'.format(
            endpoint_url))
        sudo('echo "spark.hadoop.fs.s3a.server-side-encryption-algorithm AES256" >> '
             '/tmp/notebook_spark-defaults_local.conf')
        if os.environ['application'] == 'zeppelin':
            # Zeppelin needs the local jars enumerated explicitly.
            sudo('echo \"spark.jars $(ls -1 ' + jars_dir + '* | tr \'\\n\' \',\')\" >> '
                 '/tmp/notebook_spark-defaults_local.conf')
        # \cp bypasses any cp alias (e.g. cp -i) to force the overwrite.
        sudo('\cp -f /tmp/notebook_spark-defaults_local.conf /opt/spark/conf/spark-defaults.conf')
        if memory_type == 'driver':
            spark_memory = dlab.fab.get_spark_memory()
            sudo('sed -i "/spark.*.memory/d" /opt/spark/conf/spark-defaults.conf')
            sudo('echo "spark.{0}.memory {1}m" >> /opt/spark/conf/spark-defaults.conf'.format(memory_type,
                                                                                              spark_memory))
        if 'spark_configurations' in os.environ:
            dlab_header = sudo('cat /tmp/notebook_spark-defaults_local.conf | grep "^#"')
            spark_configurations = ast.literal_eval(os.environ['spark_configurations'])
            new_spark_defaults = list()
            spark_defaults = sudo('cat /opt/spark/conf/spark-defaults.conf')
            current_spark_properties = spark_defaults.split('\n')
            # Merge user-supplied spark-defaults properties with the current
            # file: matching keys are overridden in place, new keys appended.
            for param in current_spark_properties:
                if param.split(' ')[0] != '#':
                    for config in spark_configurations:
                        if config['Classification'] == 'spark-defaults':
                            # NOTE: 'property' shadows the builtin of the same name.
                            for property in config['Properties']:
                                if property == param.split(' ')[0]:
                                    param = property + ' ' + config['Properties'][property]
                                else:
                                    # NOTE(review): this appends the configured
                                    # property once per non-matching param; the
                                    # set() below deduplicates the result.
                                    new_spark_defaults.append(property + ' ' + config['Properties'][property])
                    new_spark_defaults.append(param)
            new_spark_defaults = set(new_spark_defaults)
            sudo("echo '{}' > /opt/spark/conf/spark-defaults.conf".format(dlab_header))
            for prop in new_spark_defaults:
                prop = prop.rstrip()
                sudo('echo "{}" >> /opt/spark/conf/spark-defaults.conf'.format(prop))
            sudo('sed -i "/^\s*$/d" /opt/spark/conf/spark-defaults.conf')
            if spark_jars_paths:
                # Restore the previously generated spark.jars line.
                sudo('echo "{}" >> /opt/spark/conf/spark-defaults.conf'.format(spark_jars_paths))
    except Exception as err:
        print('Error:', str(err))
        sys.exit(1)
| |
| |
def configure_zeppelin_emr_interpreter(emr_version, cluster_name, region, spark_dir, os_user, yarn_dir, bucket,
                                       user_name, endpoint_url, multiple_emrs):
    """Register EMR Spark (and optionally Livy) interpreters in Zeppelin.

    Points Zeppelin's SPARK_HOME/HADOOP_CONF_DIR at the locally mounted
    EMR tree, appends the required AWS jars to spark-defaults, restarts
    Zeppelin, and POSTs interpreter settings rendered from the template in
    /tmp. With multiple_emrs == 'true' a dedicated Livy server is configured
    on the first free port starting at 8998. Exits the process on failure.
    """
    try:
        port_number_found = False
        zeppelin_restarted = False
        default_port = 8998
        get_cluster_python_version(region, bucket, user_name, cluster_name)
        with file('/tmp/python_version') as f:
            python_version = f.read()
        python_version = python_version[0:5]
        livy_port = ''
        livy_path = '/opt/{0}/{1}/livy/'.format(emr_version, cluster_name)
        # BUGFIX: previously .format() bound only to the last string literal
        # (string concatenation has lower precedence than the method call),
        # so the {0} placeholders were never substituted and literal "{0}"
        # paths ended up in spark.jars. Parenthesize, then format.
        spark_libs = ("/opt/{0}/jars/usr/share/aws/aws-java-sdk/aws-java-sdk-core*.jar /opt/{0}/jars/usr/lib/hadoop"
                      "/hadoop-aws*.jar /opt/{0}/jars/usr/share/aws/aws-java-sdk/aws-java-sdk-s3-*.jar /opt/{0}"
                      "/jars/usr/lib/hadoop-lzo/lib/hadoop-lzo-*.jar").format(emr_version)
        # fix due to: Multiple py4j files found under ..../spark/python/lib
        # py4j-0.10.7-src.zip still in folder. Versions may varies.
        local('rm /opt/{0}/{1}/spark/python/lib/py4j-src.zip'.format(emr_version, cluster_name))

        local('echo \"Configuring emr path for Zeppelin\"')
        local('sed -i \"s/^export SPARK_HOME.*/export SPARK_HOME=\/opt\/{0}\/{1}\/spark/\" '
              '/opt/zeppelin/conf/zeppelin-env.sh'.format(emr_version, cluster_name))
        local('sed -i "s/^export HADOOP_CONF_DIR.*/export HADOOP_CONF_DIR=' + \
              '\/opt\/{0}\/{1}\/conf/" /opt/{0}/{1}/spark/conf/spark-env.sh'.format(emr_version, cluster_name))
        local('echo \"spark.jars $(ls {0} | tr \'\\n\' \',\')\" >> /opt/{1}/{2}/spark/conf/spark-defaults.conf'
              .format(spark_libs, emr_version, cluster_name))
        local('sed -i "/spark.executorEnv.PYTHONPATH/d" /opt/{0}/{1}/spark/conf/spark-defaults.conf'
              .format(emr_version, cluster_name))
        local('sed -i "/spark.yarn.dist.files/d" /opt/{0}/{1}/spark/conf/spark-defaults.conf'
              .format(emr_version, cluster_name))
        local('sudo chown {0}:{0} -R /opt/zeppelin/'.format(os_user))
        local('sudo systemctl daemon-reload')
        local('sudo service zeppelin-notebook stop')
        local('sudo service zeppelin-notebook start')
        # Poll with nmap until Zeppelin's 8080 port stops reporting closed.
        while not zeppelin_restarted:
            local('sleep 5')
            result = local('sudo bash -c "nmap -p 8080 localhost | grep closed > /dev/null" ; echo $?', capture=True)
            result = result[:1]
            if result == '1':
                zeppelin_restarted = True
        local('sleep 5')
        local('echo \"Configuring emr spark interpreter for Zeppelin\"')
        if multiple_emrs == 'true':
            # Find the first free port for this cluster's Livy server.
            while not port_number_found:
                port_free = local('sudo bash -c "nmap -p ' + str(default_port) +
                                  ' localhost | grep closed > /dev/null" ; echo $?', capture=True)
                port_free = port_free[:1]
                if port_free == '0':
                    livy_port = default_port
                    port_number_found = True
                else:
                    default_port += 1
            local('sudo echo "livy.server.port = {0}" >> {1}conf/livy.conf'.format(str(livy_port), livy_path))
            local('sudo echo "livy.spark.master = yarn" >> {}conf/livy.conf'.format(livy_path))
            if os.path.exists('{}conf/spark-blacklist.conf'.format(livy_path)):
                # Disable the default blacklist by commenting every line.
                local('sudo sed -i "s/^/#/g" {}conf/spark-blacklist.conf'.format(livy_path))
            local(''' sudo echo "export SPARK_HOME={0}" >> {1}conf/livy-env.sh'''.format(spark_dir, livy_path))
            local(''' sudo echo "export HADOOP_CONF_DIR={0}" >> {1}conf/livy-env.sh'''.format(yarn_dir, livy_path))
            local(''' sudo echo "export PYSPARK3_PYTHON=python{0}" >> {1}conf/livy-env.sh'''.format(python_version[0:3],
                                                                                                   livy_path))
            # Render the interpreter template in place and POST it to Zeppelin.
            template_file = "/tmp/dataengine-service_interpreter.json"
            fr = open(template_file, 'r+')
            text = fr.read()
            text = text.replace('CLUSTER_NAME', cluster_name)
            text = text.replace('SPARK_HOME', spark_dir)
            text = text.replace('ENDPOINTURL', endpoint_url)
            text = text.replace('LIVY_PORT', str(livy_port))
            fw = open(template_file, 'w')
            fw.write(text)
            fw.close()
            for _ in range(5):
                try:
                    local("curl --noproxy localhost -H 'Content-Type: application/json' -X POST -d " +
                          "@/tmp/dataengine-service_interpreter.json http://localhost:8080/api/interpreter/setting")
                    break
                except:
                    local('sleep 5')
            # Install and start a per-port systemd unit for this Livy server.
            local('sudo cp /opt/livy-server-cluster.service /etc/systemd/system/livy-server-{}.service'
                  .format(str(livy_port)))
            local("sudo sed -i 's|OS_USER|{0}|' /etc/systemd/system/livy-server-{1}.service"
                  .format(os_user, str(livy_port)))
            local("sudo sed -i 's|LIVY_PATH|{0}|' /etc/systemd/system/livy-server-{1}.service"
                  .format(livy_path, str(livy_port)))
            local('sudo chmod 644 /etc/systemd/system/livy-server-{}.service'.format(str(livy_port)))
            local("sudo systemctl daemon-reload")
            local("sudo systemctl enable livy-server-{}".format(str(livy_port)))
            local('sudo systemctl start livy-server-{}'.format(str(livy_port)))
        else:
            # Single-EMR mode: register one interpreter per python version
            # (system python 2 and the cluster's pythonX.Y-dp build).
            template_file = "/tmp/dataengine-service_interpreter.json"
            p_versions = ["2", "{}-dp".format(python_version[:3])]
            for p_version in p_versions:
                fr = open(template_file, 'r+')
                text = fr.read()
                text = text.replace('CLUSTERNAME', cluster_name)
                text = text.replace('PYTHONVERSION', p_version)
                text = text.replace('SPARK_HOME', spark_dir)
                text = text.replace('PYTHONVER_SHORT', p_version[:1])
                text = text.replace('ENDPOINTURL', endpoint_url)
                text = text.replace('DATAENGINE-SERVICE_VERSION', emr_version)
                tmp_file = "/tmp/emr_spark_py{}_interpreter.json".format(p_version)
                fw = open(tmp_file, 'w')
                fw.write(text)
                fw.close()
                for _ in range(5):
                    try:
                        local("curl --noproxy localhost -H 'Content-Type: application/json' -X POST -d " +
                              "@/tmp/emr_spark_py" + p_version +
                              "_interpreter.json http://localhost:8080/api/interpreter/setting")
                        break
                    except:
                        local('sleep 5')
        local('touch /home/' + os_user + '/.ensure_dir/dataengine-service_' + cluster_name + '_interpreter_ensured')
    except Exception as err:
        # Report the cause before exiting — the previous bare except hid it.
        print('Error:', str(err))
        sys.exit(1)
| |
| |
def configure_dataengine_spark(cluster_name, jars_dir, cluster_dir, datalake_enabled, spark_configs=''):
    """Assemble spark-defaults.conf for a standalone dataengine cluster.

    Collects the cluster's jars into spark.jars, appends the region's s3a
    endpoint and encryption settings, carries over properties already present
    in an existing spark-defaults.conf, and finally merges any custom
    properties passed in *spark_configs* (an EMR-style configurations list).
    """
    # Enumerate every jar under jars_dir into a single spark.jars line.
    local("jar_list=`find {0} -name '*.jar' | tr '\\n' ',' | sed 's/,$//'` ; echo \"spark.jars $jar_list\" >> \
    /tmp/{1}/notebook_spark-defaults_local.conf".format(jars_dir, cluster_name))
    # Region from instance metadata; [:-1] drops the zone letter suffix.
    region = local('curl http://169.254.169.254/latest/meta-data/placement/availability-zone', capture=True)[:-1]
    if region == 'us-east-1':
        endpoint_url = 'https://s3.amazonaws.com'
    elif region == 'cn-north-1':
        endpoint_url = "https://s3.{}.amazonaws.com.cn".format(region)
    else:
        endpoint_url = 'https://s3-' + region + '.amazonaws.com'
    local("""bash -c 'echo "spark.hadoop.fs.s3a.endpoint """ + endpoint_url +
          """" >> /tmp/{}/notebook_spark-defaults_local.conf'""".format(cluster_name))
    local('echo "spark.hadoop.fs.s3a.server-side-encryption-algorithm AES256" >> '
          '/tmp/{}/notebook_spark-defaults_local.conf'.format(cluster_name))
    if os.path.exists('{0}spark/conf/spark-defaults.conf'.format(cluster_dir)):
        # Keep properties that exist only in the current cluster config
        # (diff emits lines unique to the second file).
        additional_spark_properties = local('diff --changed-group-format="%>" --unchanged-group-format="" '
                                            '/tmp/{0}/notebook_spark-defaults_local.conf '
                                            '{1}spark/conf/spark-defaults.conf | grep -v "^#"'.format(
                                             cluster_name, cluster_dir), capture=True)
        # NOTE: 'property' shadows the builtin of the same name.
        for property in additional_spark_properties.split('\n'):
            local('echo "{0}" >> /tmp/{1}/notebook_spark-defaults_local.conf'.format(property, cluster_name))
    local('cp -f /tmp/{0}/notebook_spark-defaults_local.conf {1}spark/conf/spark-defaults.conf'.format(cluster_name,
                                                                                                       cluster_dir))
    if spark_configs:
        dlab_header = local('cat /tmp/{0}/notebook_spark-defaults_local.conf | grep "^#"'.format(cluster_name),
                            capture=True)
        spark_configurations = ast.literal_eval(spark_configs)
        new_spark_defaults = list()
        spark_defaults = local('cat {0}spark/conf/spark-defaults.conf'.format(cluster_dir), capture=True)
        current_spark_properties = spark_defaults.split('\n')
        # Merge the custom spark-defaults properties: matching keys override
        # in place, new keys are appended; set() deduplicates afterwards.
        for param in current_spark_properties:
            if param.split(' ')[0] != '#':
                for config in spark_configurations:
                    if config['Classification'] == 'spark-defaults':
                        for property in config['Properties']:
                            if property == param.split(' ')[0]:
                                param = property + ' ' + config['Properties'][property]
                            else:
                                # NOTE(review): appended once per non-matching
                                # param; deduplicated by the set() below.
                                new_spark_defaults.append(property + ' ' + config['Properties'][property])
                new_spark_defaults.append(param)
        new_spark_defaults = set(new_spark_defaults)
        local("echo '{0}' > {1}/spark/conf/spark-defaults.conf".format(dlab_header, cluster_dir))
        for prop in new_spark_defaults:
            prop = prop.rstrip()
            local('echo "{0}" >> {1}/spark/conf/spark-defaults.conf'.format(prop, cluster_dir))
        local('sed -i "/^\s*$/d" {0}/spark/conf/spark-defaults.conf'.format(cluster_dir))
| |
| |
def remove_dataengine_kernels(tag_name, notebook_name, os_user, key_path, cluster_name):
    """Remove a dataengine cluster's kernels/interpreters from a notebook host.

    Connects to the notebook over SSH (via fabric env), deletes the Jupyter
    kernels, shuts down the cluster's Livy server and Zeppelin interpreters
    when present, removes RStudio kernels if registered, and finally deletes
    the /opt/<cluster_name>/ tree. Errors are logged and appended to the
    result instead of raised.
    """
    try:
        # Point fabric at the notebook instance.
        private = meta_lib.get_instance_private_ip_address(tag_name, notebook_name)
        env.hosts = "{}".format(private)
        env.user = "{}".format(os_user)
        env.key_filename = "{}".format(key_path)
        env.host_string = env.user + "@" + env.hosts
        sudo('rm -rf /home/{}/.local/share/jupyter/kernels/*_{}'.format(os_user, cluster_name))
        if exists('/home/{}/.ensure_dir/dataengine_{}_interpreter_ensured'.format(os_user, cluster_name)):
            if os.environ['notebook_multiple_clusters'] == 'true':
                try:
                    # Find this cluster's Livy port, kill its process and
                    # disable its systemd unit.
                    livy_port = sudo("cat /opt/" + cluster_name +
                                     "/livy/conf/livy.conf | grep livy.server.port | tail -n 1 | awk '{printf $3}'")
                    process_number = sudo("netstat -natp 2>/dev/null | grep ':" + livy_port +
                                          "' | awk '{print $7}' | sed 's|/.*||g'")
                    sudo('kill -9 ' + process_number)
                    sudo('systemctl disable livy-server-' + livy_port)
                except:
                    print("Wasn't able to find Livy server for this EMR!")
            # Restore Zeppelin's SPARK_HOME to the local Spark install.
            sudo(
                'sed -i \"s/^export SPARK_HOME.*/export SPARK_HOME=\/opt\/spark/\" /opt/zeppelin/conf/zeppelin-env.sh')
            sudo("rm -rf /home/{}/.ensure_dir/dataengine_interpreter_ensure".format(os_user))
            # Delete every Zeppelin interpreter whose name contains the
            # cluster name, via the Zeppelin REST API (urllib2 is Python 2).
            zeppelin_url = 'http://' + private + ':8080/api/interpreter/setting/'
            opener = urllib2.build_opener(urllib2.ProxyHandler({}))
            req = opener.open(urllib2.Request(zeppelin_url))
            r_text = req.read()
            interpreter_json = json.loads(r_text)
            interpreter_prefix = cluster_name
            for interpreter in interpreter_json['body']:
                if interpreter_prefix in interpreter['name']:
                    print("Interpreter with ID: {} and name: {} will be removed from zeppelin!".format(
                        interpreter['id'], interpreter['name']))
                    request = urllib2.Request(zeppelin_url + interpreter['id'], data='')
                    request.get_method = lambda: 'DELETE'
                    url = opener.open(request)
                    print(url.read())
            sudo('chown ' + os_user + ':' + os_user + ' -R /opt/zeppelin/')
            sudo('systemctl daemon-reload')
            sudo("service zeppelin-notebook stop")
            sudo("service zeppelin-notebook start")
            # Poll with nmap until Zeppelin's 8080 port stops reporting closed.
            zeppelin_restarted = False
            while not zeppelin_restarted:
                sudo('sleep 5')
                result = sudo('nmap -p 8080 localhost | grep "closed" > /dev/null; echo $?')
                result = result[:1]
                if result == '1':
                    zeppelin_restarted = True
            sudo('sleep 5')
            sudo('rm -rf /home/{}/.ensure_dir/dataengine_{}_interpreter_ensured'.format(os_user, cluster_name))
        if exists('/home/{}/.ensure_dir/rstudio_dataengine_ensured'.format(os_user)):
            dlab.fab.remove_rstudio_dataengines_kernel(os.environ['computational_name'], os_user)
        sudo('rm -rf /opt/' + cluster_name + '/')
        print("Notebook's {} kernels were removed".format(env.hosts))
    except Exception as err:
        # NOTE(review): traceback.print_exc() returns None, so the logged and
        # appended messages concatenate "None"; the traceback itself still
        # reaches stdout via the explicit call below.
        logging.info("Unable to remove kernels on Notebook: " + str(err) + "\n Traceback: " + traceback.print_exc(
            file=sys.stdout))
        append_result(str({"error": "Unable to remove kernels on Notebook",
                           "error_message": str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)}))
        traceback.print_exc(file=sys.stdout)
| |
| |
def prepare_disk(os_user):
    """Partition, format (ext4) and mount the last-listed disk at /opt/.

    Idempotent via a marker file in the user's ~/.ensure_dir; the mount is
    made persistent through /etc/fstab. Exits the process on failure.
    """
    if not exists('/home/' + os_user + '/.ensure_dir/disk_ensured'):
        try:
            # Pick the last disk device reported by lsblk (e.g. xvdb).
            disk_name = sudo("lsblk | grep disk | awk '{print $1}' | sort | tail -n 1")
            # Feed fdisk an interactive script: new DOS label, one primary
            # partition spanning the whole disk, write.
            sudo('''bash -c 'echo -e "o\nn\np\n1\n\n\nw" | fdisk /dev/{}' '''.format(disk_name))
            sudo('mkfs.ext4 -F /dev/{}1'.format(disk_name))
            sudo('mount /dev/{}1 /opt/'.format(disk_name))
            sudo(''' bash -c "echo '/dev/{}1 /opt/ ext4 errors=remount-ro 0 1' >> /etc/fstab" '''.format(disk_name))
            sudo('touch /home/' + os_user + '/.ensure_dir/disk_ensured')
        except Exception as err:
            # Report the cause before exiting — the previous bare except
            # discarded it.
            print('Error:', str(err))
            sys.exit(1)
| |
| |
def ensure_local_spark(os_user, spark_link, spark_version, hadoop_version, local_spark_path):
    """Download and unpack a standalone Spark distribution into *local_spark_path* once.

    Idempotent via a marker file in the user's ~/.ensure_dir. Exits the
    process on failure.
    """
    marker = '/home/' + os_user + '/.ensure_dir/local_spark_ensured'
    if not exists(marker):
        try:
            dist = 'spark-' + spark_version + '-bin-hadoop' + hadoop_version
            sudo('wget ' + spark_link + ' -O /tmp/' + dist + '.tgz')
            sudo('tar -zxvf /tmp/' + dist + '.tgz -C /opt/')
            sudo('mv /opt/' + dist + ' ' + local_spark_path)
            sudo('chown -R ' + os_user + ':' + os_user + ' ' + local_spark_path)
            sudo('touch ' + marker)
        except Exception as err:
            print('Error:', str(err))
            sys.exit(1)
| |
| |
def install_dataengine_spark(cluster_name, spark_link, spark_version, hadoop_version, cluster_dir, os_user,
                             datalake_enabled):
    """Fetch and unpack a Spark distribution into the dataengine cluster directory."""
    dist = 'spark-' + spark_version + '-bin-hadoop' + hadoop_version
    local('wget ' + spark_link + ' -O /tmp/' + cluster_name + '/' + dist + '.tgz')
    local('tar -zxvf /tmp/' + cluster_name + '/' + dist + '.tgz -C /opt/' + cluster_name)
    local('mv /opt/' + cluster_name + '/' + dist + ' ' + cluster_dir + 'spark/')
    local('chown -R ' + os_user + ':' + os_user + ' ' + cluster_dir + 'spark/')
| |
| |
def find_des_jars(all_jars, des_path):
    """Swap default cloud jars in *all_jars* for the dataengine-service ones.

    Removes entries matching the default AWS/Hadoop artifact names, then
    appends the matching jars found under *des_path* on the remote host.
    Returns the updated list. Exits the process on failure.
    """
    try:
        default_jars = ('hadoop-aws', 'hadoop-lzo', 'aws-java-sdk')
        # BUGFIX: iterate over a snapshot — removing from the list while
        # iterating it skips elements, so some default jars could survive.
        for jar in list(all_jars):
            if any(name in jar for name in default_jars):
                print('Remove default cloud jar: {0}'.format(jar))
                all_jars.remove(jar)
        additional_jars = ['hadoop-aws', 'aws-java-sdk-s3', 'hadoop-lzo', 'aws-java-sdk-core']
        aws_filter = '\|'.join(additional_jars)
        aws_jars = sudo('find {0} -name *.jar | grep "{1}"'.format(des_path, aws_filter)).split('\r\n')
        all_jars.extend(aws_jars)
        return all_jars
    except Exception as err:
        print('Error:', str(err))
        sys.exit(1)