blob: d1a53a65d83bbacd3b169705faa052da4909c668 [file] [log] [blame]
# *****************************************************************************
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# ******************************************************************************
import ast
import backoff
import datalab.common_lib
import datalab.fab
import datalab.meta_lib
import google.auth
import json
import logging
import os
import random
import sys
import time
import traceback
import urllib3
import subprocess
from Crypto.PublicKey import RSA
from datalab.fab import *
from fabric import *
from google.cloud import exceptions
from google.cloud import storage
from googleapiclient import errors
from googleapiclient.discovery import build
class GCPActions:
def __init__(self, auth_type='service_account'):
    """Set up the Google API clients used by the rest of the class.

    The project id is read from the ``gcp_project_id`` environment variable.
    When ``conf_resource`` is ``ssn`` the credentials come from
    ``/root/service_account.json`` (scoped if they require scopes);
    otherwise the default credentials are fetched with retries.

    Args:
        auth_type: stored on the instance as ``self.auth_type``.
    """
    @backoff.on_exception(backoff.expo,
                          google.auth.exceptions.DefaultCredentialsError,
                          max_tries=15)
    def fetch_default_credentials():
        # Retried with exponential backoff on DefaultCredentialsError.
        return google.auth.default()

    self.auth_type = auth_type
    self.project = os.environ['gcp_project_id']
    if os.environ['conf_resource'] == 'ssn':
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/root/service_account.json"
        credentials, project = google.auth.default()
        if credentials.requires_scopes:
            scopes = ['https://www.googleapis.com/auth/compute',
                      'https://www.googleapis.com/auth/iam',
                      'https://www.googleapis.com/auth/cloud-platform']
            credentials = credentials.with_scopes(scopes)
    else:
        credentials, project = fetch_default_credentials()

    # Both branches build exactly the same set of clients.
    self.service = build('compute', 'v1', credentials=credentials)
    self.service_iam = build('iam', 'v1', credentials=credentials)
    self.dataproc = build('dataproc', 'v1', credentials=credentials)
    self.service_storage = build('storage', 'v1', credentials=credentials)
    self.storage_client = storage.Client(project=project, credentials=credentials)
    self.service_resource = build('cloudresourcemanager', 'v1', credentials=credentials)
def create_vpc(self, vpc_name):
    """Create a VPC network with automatic subnetwork creation disabled.

    Args:
        vpc_name: name of the network to create.

    Returns:
        The response of the insert request, or None when the request failed
        (the failure is logged and recorded through ``append_result``).
    """
    network_params = {'name': vpc_name, 'autoCreateSubnetworks': False}
    request = self.service.networks().insert(project=self.project, body=network_params)
    try:
        print("Create VPC {}".format(vpc_name))
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
        print("VPC {} has been created".format(vpc_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create VPC: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create VPC",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def remove_vpc(self, vpc_name):
    """Delete a VPC network, re-issuing the delete until it is gone.

    The delete request is executed up to five times; after each attempt
    ``GCPMeta().get_vpc`` is checked and the loop stops once it returns ''.

    Args:
        vpc_name: name of the network to delete.

    Returns:
        The response of the last delete request, or None when a request
        failed (the failure is logged and recorded through ``append_result``).
    """
    request = self.service.networks().delete(project=self.project, network=vpc_name)
    allow = False
    count = 0
    try:
        while not allow and count < 5:
            result = request.execute()
            datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
            time.sleep(5)
            if datalab.meta_lib.GCPMeta().get_vpc(vpc_name) == '':
                allow = True
            else:
                count = count + 1
        print("VPC {} has been removed".format(vpc_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to remove VPC: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove VPC",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def create_subnet(self, subnet_name, subnet_cidr, vpc_selflink, region):
    """Create a subnetwork with private Google access enabled.

    Args:
        subnet_name: name of the subnetwork.
        subnet_cidr: value sent as ``ipCidrRange``.
        vpc_selflink: value sent as ``network``.
        region: region in which the subnetwork is created.

    Returns:
        The response of the insert request, or None when the request failed
        (the failure is logged and recorded through ``append_result``).
    """
    subnetwork_params = {
        'name': subnet_name,
        'ipCidrRange': subnet_cidr,
        'privateIpGoogleAccess': 'true',
        'network': vpc_selflink
    }
    request = self.service.subnetworks().insert(
        project=self.project, region=region, body=subnetwork_params)
    try:
        print("Create subnet {}".format(subnet_name))
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], region=region)
        print("Subnet {} has been created".format(subnet_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create Subnet: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create Subnet",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def remove_subnet(self, subnet_name, region):
    """Delete a subnetwork and wait for the operation to finish.

    Args:
        subnet_name: name of the subnetwork to delete.
        region: region the subnetwork belongs to.

    Returns:
        The response of the delete request, or None when the request failed
        (the failure is logged and recorded through ``append_result``).
    """
    request = self.service.subnetworks().delete(project=self.project, region=region, subnetwork=subnet_name)
    try:
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], region=region)
        print("Subnet {} has been removed".format(subnet_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to remove Subnet: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove Subnet",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def create_firewall(self, firewall_params):
    """Create a firewall rule and wait for the operation to finish.

    Args:
        firewall_params: request body for the insert call; its ``name`` key
            is used in the confirmation message.

    Returns:
        The response of the insert request, or None when the request failed
        (the failure is logged and recorded through ``append_result``).
    """
    request = self.service.firewalls().insert(project=self.project, body=firewall_params)
    try:
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
        print('Firewall {} created.'.format(firewall_params['name']))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create Firewall: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create Firewall",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def remove_firewall(self, firewall_name):
    """Delete a firewall rule and wait for the operation to finish.

    Args:
        firewall_name: name of the firewall rule to delete.

    Returns:
        The response of the delete request, or None when the request failed
        (the failure is logged and recorded through ``append_result``).
    """
    request = self.service.firewalls().delete(project=self.project, firewall=firewall_name)
    try:
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
        print('Firewall {} removed.'.format(firewall_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to remove Firewall: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove Firewall",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def create_nat_route(self, nat_route_params):
    """Create a route and wait for the operation to finish.

    Args:
        nat_route_params: request body for the insert call; its ``name`` key
            is used in the confirmation message.

    Returns:
        The response of the insert request, or None when the request failed
        (the failure is logged and recorded through ``append_result``).
    """
    request = self.service.routes().insert(project=self.project, body=nat_route_params)
    try:
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
        print('NAT route {} created.'.format(nat_route_params['name']))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create NAT route: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create NAT route",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def delete_nat_route(self, nat_route_name):
    """Delete a route and wait for the operation to finish.

    Args:
        nat_route_name: name of the route to delete.

    Returns:
        The response of the delete request, or None when the request failed
        (the failure is logged and recorded through ``append_result``).
    """
    request = self.service.routes().delete(project=self.project, route=nat_route_name)
    try:
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
        print('NAT route {} deleteed.'.format(nat_route_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to delete NAT route: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to delete NAT route",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def create_bucket(self, bucket_name):
    """Create a storage bucket through ``self.storage_client``.

    Args:
        bucket_name: name of the bucket to create.

    Returns:
        None. A failure is logged and recorded through ``append_result``
        instead of being raised.
    """
    try:
        bucket = self.storage_client.create_bucket(bucket_name)
        print('Bucket {} created.'.format(bucket.name))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create Bucket: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create Bucket",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def add_bucket_labels(self, bucket_name, tags):
    """Merge ``tags`` into the labels of an existing bucket and patch it.

    Args:
        bucket_name: name of the bucket to update.
        tags: mapping merged into the bucket's current labels.

    Returns:
        None. A failure is logged and recorded through ``append_result``
        instead of being raised.
    """
    try:
        bucket = self.storage_client.get_bucket(bucket_name)
        labels = bucket.labels
        labels.update(tags)
        bucket.labels = labels
        bucket.patch()
        print('Updated labels on {}.'.format(bucket_name))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        # The original message said "create Bucket" (copied from
        # create_bucket); report the operation that actually failed.
        logging.info("Unable to add labels to Bucket: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to add labels to Bucket",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def remove_bucket(self, bucket_name):
    """Empty a bucket and then delete it.

    Args:
        bucket_name: name of the bucket to remove.

    Returns:
        None. A failure is logged and recorded through ``append_result``
        instead of being raised.
    """
    try:
        # Reuse this instance's clients instead of constructing a fresh
        # GCPActions (which would rebuild every API client).
        self.bucket_cleanup(bucket_name)
        storage_resource = storage.Bucket(self.storage_client, bucket_name)
        storage_resource.delete(force=True)
        print('Bucket {} removed.'.format(bucket_name))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to remove Bucket: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove Bucket",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def bucket_cleanup(self, bucket_name, user_name='', cluster_name=''):
    """Delete objects from a bucket, optionally limited to a prefix.

    Args:
        bucket_name: name of the bucket to clean.
        user_name: when not '', only objects under the prefix
            ``<user_name>/<cluster_name>`` are deleted.
        cluster_name: second component of the prefix; only used when
            ``user_name`` is given.

    Returns:
        None. A failure is logged and recorded through ``append_result``
        instead of being raised.
    """
    try:
        prefix = ''
        bucket = self.storage_client.get_bucket(bucket_name)
        if user_name != '':
            prefix = '{0}/{1}'.format(user_name, cluster_name)
        list_files = bucket.list_blobs(prefix=prefix)
        for item in list_files:
            print("Deleting:{}".format(item.name))
            blob = bucket.blob(item.name)
            blob.delete()
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to remove files from bucket: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove files from bucket",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def create_disk(self, instance_name, zone, size, secondary_image_name):
    """Create the ``<instance_name>-secondary`` pd-ssd disk.

    Args:
        instance_name: instance the disk is named after.
        zone: zone in which the disk is created.
        size: value sent as ``sizeGb``.
        secondary_image_name: source image for the disk; the string 'None'
            means the disk is created without a source image.

    Returns:
        The insert request object (not its response), or None when the
        request failed (the failure is logged and recorded through
        ``append_result``).
    """
    try:
        params = {"sizeGb": size, "name": instance_name + '-secondary',
                  "type": "projects/{0}/zones/{1}/diskTypes/pd-ssd".format(self.project, zone)}
        if secondary_image_name != 'None':
            params["sourceImage"] = secondary_image_name
        request = self.service.disks().insert(project=self.project, zone=zone, body=params)
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone)
        print('Disk {}-secondary created.'.format(instance_name))
        return request
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create disk: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create disk",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def remove_disk(self, instance_name, zone):
    """Delete the ``<instance_name>-secondary`` disk, tolerating a 404.

    Args:
        instance_name: instance the disk is named after.
        zone: zone the disk lives in.

    Returns:
        The delete request object (also when the disk was not found), or
        None when the request failed for another reason (the failure is
        logged and recorded through ``append_result``).
    """
    try:
        request = self.service.disks().delete(project=self.project, zone=zone, disk=instance_name + '-secondary')
        try:
            result = request.execute()
            datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone)
            print('Disk {}-secondary removed.'.format(instance_name))
        except errors.HttpError as err:
            # A missing disk is not an error; anything else is reported below.
            if err.resp.status != 404:
                raise
            print('Disk {}-secondary was not found. Skipped'.format(instance_name))
        return request
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to remove disk: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove disk",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def create_instance(self, instance_name, service_base_name, cluster_name, region, zone, vpc_name, subnet_name, instance_size,
                    ssh_key_path,
                    initial_user, image_name, secondary_image_name, service_account_name, instance_class,
                    network_tag, labels, static_ip='',
                    primary_disk_size='12', secondary_disk_size='30',
                    gpu_accelerator_type='None'):
    """Create a compute instance, then tag it and label its disks.

    Args:
        instance_name: name of the instance (also the boot disk name).
        service_base_name: prefix of the service account e-mail.
        cluster_name: used for the primary disk names of 'dataengine'.
        region: region of the subnetwork.
        zone: zone in which the instance and disks are created.
        vpc_name: network name.
        subnet_name: subnetwork name.
        instance_size: machine type name.
        ssh_key_path: path to the key file whose public part is installed
            for ``initial_user``.
        initial_user: user name placed in the ``ssh-keys`` metadata.
        image_name: source image of the boot disk.
        secondary_image_name: source image of the secondary disk
            ('None' for an empty disk).
        service_account_name: name used to look up the account index.
        instance_class: 'ssn' and 'edge' get an external NAT config, 'edge'
            also enables IP forwarding; 'notebook' and 'dataengine' get a
            secondary disk and no access config.
        network_tag: network tag; 'ssn' / 'edge' substrings add extra tags.
        labels: labels applied to the instance and, through
            ``set_disks_tag``, to its disks.
        static_ip: value sent as ``natIP`` for 'ssn' / 'edge'.
        primary_disk_size: boot disk size in GB.
        secondary_disk_size: secondary disk size in GB.
        gpu_accelerator_type: anything other than the string 'None'
            attaches one accelerator.

    Returns:
        The response of the instance insert request, or None when creation
        or tagging failed (the failure is logged and recorded through
        ``append_result``).
    """
    # Read the key inside a context manager so the file handle is closed.
    with open(ssh_key_path, 'rb') as key_file:
        key = RSA.importKey(key_file.read())
    ssh_key = key.publickey().exportKey("OpenSSH").decode('UTF-8')
    unique_index = datalab.meta_lib.GCPMeta().get_index_by_service_account_name(service_account_name)
    service_account_email = "{}-{}@{}.iam.gserviceaccount.com".format(service_base_name, unique_index, self.project)
    access_configs = ''
    ip_forward = instance_class == 'edge'
    if instance_class in ('ssn', 'edge'):
        access_configs = [{
            "type": "ONE_TO_ONE_NAT",
            "name": "External NAT",
            "natIP": static_ip
        }]
    secondary_disk = {
        "name": instance_name + '-secondary',
        "tag_name": instance_name + '-volume-secondary',
        "deviceName": instance_name + '-secondary',
        "autoDelete": "true",
        "boot": "false",
        "mode": "READ_WRITE",
        "type": "PERSISTENT",
        "interface": "SCSI",
        "source": "projects/{0}/zones/{1}/disks/{2}-secondary".format(self.project,
                                                                      zone,
                                                                      instance_name)
    }
    if instance_class == 'notebook':
        # Use this instance's clients rather than building a new GCPActions.
        self.create_disk(instance_name, zone, secondary_disk_size, secondary_image_name)
        disks = [
            {
                "name": instance_name,
                "tag_name": instance_name + '-volume-primary',
                "deviceName": instance_name + '-primary',
                "autoDelete": "true",
                "boot": "true",
                "mode": "READ_WRITE",
                "type": "PERSISTENT",
                "initializeParams": {
                    "diskSizeGb": primary_disk_size,
                    "sourceImage": image_name
                }
            },
            secondary_disk
        ]
    elif instance_class == 'dataengine':
        self.create_disk(instance_name, zone, secondary_disk_size, secondary_image_name)
        disks = [
            {
                "name": instance_name,
                "tag_name": cluster_name + '-volume-primary',
                "deviceName": cluster_name + '-primary',
                "autoDelete": 'true',
                "initializeParams": {
                    "diskSizeGb": primary_disk_size,
                    "sourceImage": image_name
                },
                "boot": 'true',
                "mode": "READ_WRITE"
            },
            secondary_disk
        ]
    else:
        disks = [{
            "name": instance_name,
            "tag_name": instance_name + '-volume-primary',
            "deviceName": instance_name + '-primary',
            "autoDelete": 'true',
            "initializeParams": {
                "diskSizeGb": primary_disk_size,
                "sourceImage": image_name
            },
            "boot": 'true',
            "mode": "READ_WRITE"
        }]
    instance_params = {
        "name": instance_name,
        "machineType": "zones/{}/machineTypes/{}".format(zone, instance_size),
        "labels": labels,
        "canIpForward": ip_forward,
        "networkInterfaces": [
            {
                "network": "global/networks/{}".format(vpc_name),
                "subnetwork": "regions/{}/subnetworks/{}".format(region, subnet_name),
                "accessConfigs": access_configs
            },
        ],
        "metadata":
            {"items": [
                {
                    "key": "ssh-keys",
                    "value": "{}:{}".format(initial_user, ssh_key)
                }
            ]
            },
        "disks": disks,
        "serviceAccounts": [
            {
                "email": service_account_email,
                "scopes": ["https://www.googleapis.com/auth/cloud-platform",
                           "https://www.googleapis.com/auth/compute"]
            }
        ]
    }
    if instance_class in ('notebook', 'dataengine'):
        del instance_params['networkInterfaces'][0]['accessConfigs']
    if gpu_accelerator_type != 'None':
        request = self.service.acceleratorTypes().list(project=self.project, zone=zone)
        result = request.execute().get('items')
        # NOTE(review): the requested type is replaced by the first
        # accelerator type listed for the zone - confirm this is intended.
        gpu_accelerator_type = result[0].get('name')
        instance_params['guestAccelerators'] = [
            {
                "acceleratorCount": 1,
                "acceleratorType": "projects/{0}/zones/{1}/acceleratorTypes/{2}".format(
                    self.project, zone, gpu_accelerator_type)
            }
        ]
        instance_params['scheduling'] = {
            "onHostMaintenance": "terminate",
            "automaticRestart": "true"
        }
    request = self.service.instances().insert(project=self.project, zone=zone,
                                              body=instance_params)
    try:
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone)
        print('Instance {} created.'.format(instance_name))
        request = self.service.instances().get(instance=instance_name, project=self.project,
                                               zone=zone)
        res = request.execute()
        tag_items = [network_tag, "datalab"]
        if 'ssn' in network_tag:
            tag_items.append("ssn")
        elif 'edge' in network_tag:
            tag_items.append("edge")
        instance_tag = {"items": tag_items, "fingerprint": res['tags']['fingerprint']}
        request = self.service.instances().setTags(instance=instance_name, project=self.project,
                                                   zone=zone,
                                                   body=instance_tag)
        self.set_disks_tag(disks, zone, labels)
        request.execute()
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create Instance: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create Instance",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def set_disks_tag(self, disks, zone, labels):
    """Apply ``labels`` plus a per-disk ``name`` label to each disk.

    Args:
        disks: iterable of dicts with at least ``name`` (the disk resource
            name) and ``tag_name`` (the value of the ``name`` label).
        zone: zone the disks live in.
        labels: base labels; left unmodified.

    Returns:
        None. A failure is logged and recorded through ``append_result``
        instead of being raised.
    """
    try:
        for disk in disks:
            # Build a fresh dict per disk so the caller's labels are not
            # overwritten with the last disk's name.
            disk_labels = dict(labels)
            disk_labels['name'] = disk['tag_name']
            request = self.service.disks().get(disk=disk['name'], project=self.project,
                                               zone=zone)
            finger_print = request.execute()['labelFingerprint']
            label = {
                "labels": disk_labels,
                "labelFingerprint": finger_print
            }
            request = self.service.disks().setLabels(resource=disk['name'],
                                                     project=self.project,
                                                     zone=zone,
                                                     body=label)
            request.execute()
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create add tags: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to add tags",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def remove_instance(self, instance_name, zone):
    """Delete an instance and wait for the operation to finish.

    Args:
        instance_name: name of the instance to delete.
        zone: zone the instance lives in.

    Returns:
        The response of the delete request, or None when the request failed
        (the failure is logged and recorded through ``append_result``).
    """
    request = self.service.instances().delete(project=self.project, zone=zone,
                                              instance=instance_name)
    try:
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone)
        print('Instance {} removed.'.format(instance_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to remove Instance: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove Instance",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def stop_instance(self, instance_name, zone):
    """Stop an instance and wait for the operation to finish.

    Args:
        instance_name: name of the instance to stop.
        zone: zone the instance lives in.

    Returns:
        True on success, or None when the request failed (the failure is
        logged and recorded through ``append_result``).
    """
    request = self.service.instances().stop(project=self.project, zone=zone, instance=instance_name)
    try:
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone)
        return True
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to stop Instance: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to stop Instance",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def start_instance(self, instance_name, zone):
    """Start an instance and wait for the operation to finish.

    Args:
        instance_name: name of the instance to start.
        zone: zone the instance lives in.

    Returns:
        True on success, or None when the request failed (the failure is
        logged and recorded through ``append_result``).
    """
    request = self.service.instances().start(project=self.project, zone=zone, instance=instance_name)
    try:
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], zone=zone)
        return True
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to start Instance: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to start Instance",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def remove_service_account(self, service_account_name, service_base_name):
    """Delete a service account and wait until it is no longer reported.

    After the delete request, ``GCPMeta().get_service_account`` is polled
    every 5 seconds until it returns a falsy value, followed by a fixed
    30 second pause.

    Args:
        service_account_name: name used to look up the account index.
        service_base_name: prefix of the service account e-mail.

    Returns:
        The response of the delete request, or None when it failed (the
        failure is logged and recorded through ``append_result``).
    """
    unique_index = datalab.meta_lib.GCPMeta().get_index_by_service_account_name(service_account_name)
    service_account_email = "{}-{}@{}.iam.gserviceaccount.com".format(service_base_name, unique_index, self.project)
    request = self.service_iam.projects().serviceAccounts().delete(
        name='projects/{}/serviceAccounts/{}'.format(self.project, service_account_email))
    try:
        result = request.execute()
        while datalab.meta_lib.GCPMeta().get_service_account(service_account_name, service_base_name):
            time.sleep(5)
        time.sleep(30)
        print('Service account {} removed.'.format(service_account_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to remove Service account: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove Service account",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def create_service_account(self, service_account_name, service_base_name, unique_index):
    """Create a service account and wait until it is reported.

    After the create request, ``GCPMeta().get_service_account`` is polled
    every 5 seconds until it returns a truthy value, followed by a fixed
    30 second pause.

    Args:
        service_account_name: display name of the account.
        service_base_name: first part of the account id.
        unique_index: string appended to ``service_base_name`` (joined with
            '-') to form the account id.

    Returns:
        The response of the create request, or None when it failed (the
        failure is logged and recorded through ``append_result``).
    """
    service_account_id = service_base_name + '-' + unique_index
    print("Creating service account with accountID:" + service_account_id)
    params = {"accountId": service_account_id, "serviceAccount": {"displayName": service_account_name}}
    request = self.service_iam.projects().serviceAccounts().create(name='projects/{}'.format(self.project),
                                                                   body=params)
    try:
        result = request.execute()
        while not datalab.meta_lib.GCPMeta().get_service_account(service_account_name, service_base_name):
            time.sleep(5)
        time.sleep(30)
        print('Service account {} created.'.format(service_account_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create Service account: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create Service account",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def set_role_to_service_account(self, service_account_name, role_name, service_base_name, role_type='custom',
                                num=0):
    """Bind a role to a service account in the project IAM policy.

    The current policy is read, a binding for the account is appended and
    the policy is written back. When the write is rejected because of
    concurrent policy changes, the whole read-modify-write is retried after
    a random 5-20 second pause while ``num`` is at most 10.

    Args:
        service_account_name: name used to look up the account index.
        role_name: role to bind; for custom roles '-' is replaced by '_'.
        service_base_name: prefix of the service account e-mail.
        role_type: 'predefined' binds ``roles/<role_name>``; any other
            value binds the project-level custom role.
        num: attempt counter used by the retry; callers leave the default.

    Returns:
        The response of the setIamPolicy request (possibly from a retry),
        or None when it failed (the failure is logged and recorded through
        ``append_result``).
    """
    num += 1
    # Use this instance's client instead of constructing a new GCPActions.
    request = self.service_resource.projects().getIamPolicy(resource=self.project, body={})
    project_policy = request.execute()
    unique_index = datalab.meta_lib.GCPMeta().get_index_by_service_account_name(service_account_name)
    service_account_email = "{}-{}@{}.iam.gserviceaccount.com".format(service_base_name, unique_index, self.project)
    params = {
        "role": "projects/{}/roles/{}".format(self.project, role_name.replace('-', '_')),
        "members": [
            "serviceAccount:{}".format(service_account_email)
        ]
    }
    if role_type == 'predefined':
        params['role'] = "roles/{}".format(role_name)
    project_policy['bindings'].append(params)
    params = {
        "policy": {
            "bindings": project_policy['bindings']
        }
    }
    request = self.service_resource.projects().setIamPolicy(resource=self.project, body=params)
    try:
        return request.execute()
    except Exception as err:
        if "There were concurrent policy changes. " \
           "Please retry the whole read-modify-write with exponential backoff." in str(err) and num <= 10:
            time.sleep(random.randint(5, 20))
            # Retry with the same account name (the original passed
            # service_base_name here) and return the retry's outcome instead
            # of falling through to the error reporting below.
            return self.set_role_to_service_account(service_account_name, role_name, service_base_name,
                                                    role_type, num)
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to set Service account policy: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to set Service account policy",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def create_role(self, role_name, permissions):
    """Create a project-level IAM role and wait until it is reported.

    After the create request, ``GCPMeta().get_role`` is polled every
    5 seconds until it returns a truthy value, followed by a fixed
    30 second pause.

    Args:
        role_name: role title; the role id is the same name with '-'
            replaced by '_'.
        permissions: value sent as ``includedPermissions``.

    Returns:
        The response of the create request, or None when it failed (the
        failure is logged and recorded through ``append_result``).
    """
    request = self.service_iam.projects().roles().create(
        parent="projects/{}".format(self.project),
        body={
            "roleId": role_name.replace('-', '_'),
            "role": {
                "title": role_name,
                "includedPermissions": permissions
            }})
    try:
        result = request.execute()
        while not datalab.meta_lib.GCPMeta().get_role(role_name):
            time.sleep(5)
        time.sleep(30)
        print('IAM role {} created.'.format(role_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create IAM role: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create IAM role",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def undelete_role(self, role_name):
    """Restore a deleted project-level IAM role and wait for it to return.

    After the undelete request, ``GCPMeta().get_role`` is polled every
    5 seconds for as long as its result contains 'deleted', followed by a
    fixed 30 second pause.

    Args:
        role_name: role name; '-' is replaced by '_' to form the role id.

    Returns:
        The response of the undelete request, or None when it failed (the
        failure is logged and recorded through ``append_result``).
    """
    request = self.service_iam.projects().roles().undelete(
        name='projects/{}/roles/{}'.format(self.project, role_name.replace('-', '_')),
        body={})
    try:
        result = request.execute()
        role_removed = 'deleted' in datalab.meta_lib.GCPMeta().get_role(role_name)
        while role_removed:
            time.sleep(5)
            # Re-evaluate the flag on every poll; the original only ever set
            # it to True, so the loop could never end once entered.
            role_removed = 'deleted' in datalab.meta_lib.GCPMeta().get_role(role_name)
        time.sleep(30)
        print('IAM role {} restored.'.format(role_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to restore IAM role: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to restore IAM role",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def remove_role(self, role_name):
    """Delete a project-level IAM role and wait until it shows as deleted.

    After the delete request, ``GCPMeta().get_role`` is polled every
    5 seconds until its result contains 'deleted', followed by a fixed
    30 second pause.

    Args:
        role_name: role name; '-' is replaced by '_' to form the role id.

    Returns:
        The response of the delete request, or None when it failed (the
        failure is logged and recorded through ``append_result``).
    """
    request = self.service_iam.projects().roles().delete(
        name='projects/{}/roles/{}'.format(self.project, role_name.replace('-', '_')))
    try:
        result = request.execute()
        role_removed = 'deleted' in datalab.meta_lib.GCPMeta().get_role(role_name)
        while not role_removed:
            time.sleep(5)
            role_removed = 'deleted' in datalab.meta_lib.GCPMeta().get_role(role_name)
        time.sleep(30)
        print('IAM role {} removed.'.format(role_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to remove IAM role: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove IAM role",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def set_label_for_instance(self, zone, instance_name, key, value):
    """Send a setLabels request carrying the single label ``key: value``.

    The instance is fetched first to obtain its current label fingerprint,
    which is sent along with the label.

    Args:
        zone: zone the instance lives in.
        instance_name: name of the instance.
        key: label key.
        value: label value.

    Returns:
        None. A failure is logged and recorded through ``append_result``
        instead of being raised.
    """
    try:
        instance_params = self.service.instances().get(project=self.project, zone=zone,
                                                       instance=instance_name).execute()
        label_fingerprint = instance_params.get('labelFingerprint')
        body = {
            "labels": {key: value},
            "labelFingerprint": label_fingerprint
        }
        self.service.instances().setLabels(project=self.project, zone=zone, instance=instance_name,
                                           body=body).execute()
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to set label to instance: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to set label to instance",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def set_service_account_to_instance(self, service_account_name, instance_name, service_base_name):
    """Attach a service account to an instance.

    The zone is read from the ``gcp_zone`` environment variable.

    Args:
        service_account_name: name used to look up the account index.
        instance_name: name of the instance.
        service_base_name: prefix of the service account e-mail.

    Returns:
        The response of the setServiceAccount request, or None when it
        failed (the failure is logged and recorded through
        ``append_result``).
    """
    unique_index = datalab.meta_lib.GCPMeta().get_index_by_service_account_name(service_account_name)
    service_account_email = "{}-{}@{}.iam.gserviceaccount.com".format(service_base_name, unique_index, self.project)
    params = {
        "email": service_account_email
    }
    request = self.service.instances().setServiceAccount(
        project=self.project, zone=os.environ['gcp_zone'], instance=instance_name, body=params)
    try:
        return request.execute()
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to set Service account to instance: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to set Service account to instance",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def create_static_address(self, address_name, region):
    """Reserve a regional address and wait for the operation to finish.

    Args:
        address_name: name of the address resource.
        region: region in which the address is reserved.

    Returns:
        The response of the insert request, or None when the request failed
        (the failure is logged and recorded through ``append_result``).
    """
    params = {"name": address_name}
    request = self.service.addresses().insert(project=self.project, region=region, body=params)
    try:
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], region=region)
        print('Static address {} created.'.format(address_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create Static IP address: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create Static IP address",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def remove_static_address(self, address_name, region):
    """Release a regional address and wait for the operation to finish.

    Args:
        address_name: name of the address resource.
        region: region the address belongs to.

    Returns:
        The response of the delete request, or None when the request failed
        (the failure is logged and recorded through ``append_result``).
    """
    request = self.service.addresses().delete(project=self.project, region=region, address=address_name)
    try:
        result = request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(result['name'], region=region)
        print('Static address {} removed.'.format(address_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to remove Static IP address: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove Static IP address",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def create_image_from_instance_disks(self, primary_image_name, secondary_image_name, instance_name, zone, labels):
    """Create images from an instance's primary and secondary disks.

    The instance is stopped first. If an image named ``primary_image_name``
    already exists, the instance is started again and nothing is created.
    Otherwise both images are requested, waited for, and the instance is
    started again.

    Args:
        primary_image_name: name of the image built from the boot disk.
        secondary_image_name: name of the image built from the
            ``<instance_name>-secondary`` disk.
        instance_name: name of the source instance.
        zone: zone the instance and disks live in.
        labels: base labels for both images; left unmodified. Each image
            additionally gets a ``name`` label equal to its image name.

    Returns:
        '' when the primary image already exists; otherwise the list of
        image operation ids collected so far (complete on success, possibly
        partial or empty when a step failed and was logged).
    """
    primary_disk_name = "projects/{0}/zones/{1}/disks/{2}".format(self.project, zone, instance_name)
    secondary_disk_name = "projects/{0}/zones/{1}/disks/{2}-secondary".format(self.project, zone, instance_name)
    # Build separate label dicts so the caller's dict is not modified and the
    # two request bodies do not share one mutable object.
    primary_labels = dict(labels, name=primary_image_name)
    secondary_labels = dict(labels, name=secondary_image_name)
    primary_params = {"name": primary_image_name, "sourceDisk": primary_disk_name, "labels": primary_labels}
    primary_request = self.service.images().insert(project=self.project, body=primary_params)
    secondary_params = {"name": secondary_image_name, "sourceDisk": secondary_disk_name,
                        "labels": secondary_labels}
    secondary_request = self.service.images().insert(project=self.project, body=secondary_params)
    id_list = []
    try:
        # Use this instance's clients rather than building a new GCPActions.
        self.stop_instance(instance_name, zone)
        primary_image_check = datalab.meta_lib.GCPMeta().get_image_by_name(primary_image_name)
        if primary_image_check != '':
            self.start_instance(instance_name, zone)
            return ''
        primary_result = primary_request.execute()
        secondary_result = secondary_request.execute()
        datalab.meta_lib.GCPMeta().wait_for_operation(primary_result['name'])
        print('Image {} has been created.'.format(primary_image_name))
        id_list.append(primary_result.get('id'))
        datalab.meta_lib.GCPMeta().wait_for_operation(secondary_result['name'])
        print('Image {} has been created.'.format(secondary_image_name))
        id_list.append(secondary_result.get('id'))
        self.start_instance(instance_name, zone)
        return id_list
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and would raise TypeError when concatenated into the message.
        trace = traceback.format_exc()
        logging.info("Unable to create image from disks: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create images from disks",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        return id_list
def remove_image(self, image_name):
    """Delete an image from the project and wait for the operation to finish.

    A 404 response is reported as "not found" and is not treated as an error.

    :param image_name: name of the image to delete
    :return: the delete request object on success or when the image was not
        found; None when any other error occurred (the error is logged,
        passed to append_result and printed to stdout)
    """
    try:
        request = self.service.images().delete(project=self.project, image=image_name)
        try:
            result = request.execute()
            datalab.meta_lib.GCPMeta().wait_for_operation(result['name'])
            print('Image {} was removed.'.format(image_name))
        except errors.HttpError as err:
            if err.resp.status != 404:
                raise
            print('Image {} was not found. Skipped'.format(image_name))
        return request
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None.
        trace = traceback.format_exc()
        logging.info("Unable to remove image: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove image",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def put_to_bucket(self, bucket_name, local_file, dest_file):
    """Upload a local file to a bucket.

    :param bucket_name: name of the destination bucket
    :param local_file: path of the file to upload
    :param dest_file: object name to create inside the bucket
    :return: True when the upload call completed, False when it raised
    """
    try:
        bucket = self.storage_client.get_bucket(bucket_name)
        blob = bucket.blob(dest_file)
        blob.upload_from_filename(local_file)
        return True
    except Exception as err:
        # Only ordinary errors are turned into False; SystemExit and
        # KeyboardInterrupt are allowed to propagate.
        logging.info("Unable to upload {} to bucket {}: {}".format(local_file, bucket_name, err))
        return False
def get_from_bucket(self, bucket_name, dest_file, local_file):
    """Download an object from a bucket to a local path.

    :param bucket_name: name of the source bucket
    :param dest_file: object name inside the bucket
    :param local_file: local path the object is written to
    :return: True when the object exists and was downloaded; False when it
        does not exist or a NotFound error is raised
    """
    try:
        remote_object = self.storage_client.get_bucket(bucket_name).blob(dest_file)
        if not remote_object.exists():
            return False
        remote_object.download_to_filename(local_file)
    except exceptions.NotFound:
        return False
    return True
def set_bucket_owner(self, bucket_name, service_account_name, service_base_name):
    """Grant a service account OWNER access to a bucket and its objects.

    The service account e-mail is built from ``service_base_name``, the
    index looked up for ``service_account_name`` and the project id. The
    account is made owner of the bucket, added to the bucket's default
    object ACL, and every user entry with role OWNER from that default ACL
    is then granted on each existing object.

    :param bucket_name: name of the bucket to modify
    :param service_account_name: name used to look up the account's index
    :param service_base_name: prefix of the service account e-mail
    :return: None; errors are logged, passed to append_result and printed
    """
    try:
        unique_index = datalab.meta_lib.GCPMeta().get_index_by_service_account_name(service_account_name)
        service_account_email = "{}-{}@{}.iam.gserviceaccount.com".format(service_base_name, unique_index,
                                                                          self.project)
        bucket = self.storage_client.get_bucket(bucket_name)
        # setting bucket owner
        bucket_acl = bucket.acl
        bucket_acl.user(service_account_email).grant_owner()
        bucket_acl.save()
        # setting default ACL for bucket
        self.service_storage.defaultObjectAccessControls().insert(bucket=bucket_name, body={
            "entity": "user-{}".format(service_account_email),
            "role": "OWNER"
        }).execute()
        # setting new default ACL for all objects in bucket
        # 'items' may be missing from the response; treat that as no entries.
        default_acl = self.service_storage.defaultObjectAccessControls().list(
            bucket=bucket_name).execute().get('items') or []
        for bucket_object in bucket.list_blobs():
            object_params = bucket.get_blob(bucket_object.name)
            if object_params is None:
                # The lookup returned nothing for this name; skip it.
                continue
            for acl_entry in default_acl:
                entity = acl_entry.get('entity') or ''
                if acl_entry.get('role') == 'OWNER' and entity[:5] == 'user-':
                    object_params.acl.user(entity[5:]).grant_owner()
                    object_params.acl.save()
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None.
        trace = traceback.format_exc()
        logging.info("Unable to modify bucket ACL: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to modify bucket ACL",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def get_gitlab_cert(self, bucket_name, certfile):
    """Download ``certfile`` from a bucket to a local file of the same name.

    :param bucket_name: name of the bucket holding the certificate
    :param certfile: object name in the bucket and local destination path
    :return: True when the object exists and was downloaded; False when it
        does not exist or a NotFound error is raised
    """
    try:
        cert_object = self.storage_client.get_bucket(bucket_name).blob(certfile)
        if not cert_object.exists():
            return False
        cert_object.download_to_filename(certfile)
    except exceptions.NotFound:
        return False
    return True
def create_dataproc_cluster(self, cluster_name, region, params):
    """Create a Dataproc cluster and wait until it reports 'running'.

    The status is polled every 5 seconds; a 'terminated' status seen while
    waiting is treated as a failure.

    :param cluster_name: name of the cluster, used for status polling
    :param region: region the cluster is created in
    :param params: request body passed to the clusters().create call
    :return: the response of the create request, or '' when an error
        occurred (the error is logged, passed to append_result and printed)
    """
    request = self.dataproc.projects().regions().clusters().create(projectId=self.project, region=region, body=params)
    try:
        result = request.execute()
        time.sleep(5)
        cluster_status = datalab.meta_lib.GCPMeta().get_list_cluster_statuses([cluster_name])
        while cluster_status[0]['status'] != 'running':
            time.sleep(5)
            print('The cluster is being created... Please wait')
            cluster_status = datalab.meta_lib.GCPMeta().get_list_cluster_statuses([cluster_name])
            if cluster_status[0]['status'] == 'terminated':
                raise Exception("Dataproc cluster {} was terminated during creation".format(cluster_name))
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None.
        trace = traceback.format_exc()
        logging.info("Unable to create dataproc cluster: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to create dataproc cluster",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        return ''
def set_cluster_volume_tag(self, clusteName, region, zone):
    """Tag the disks of every master and worker instance of a cluster.

    The cluster is looked up by name in the region's cluster list. For each
    of its instances an entry ``{'name': <instance>, 'tag_name':
    '<clusteName>-volume-primary'}`` is collected, and the list is passed to
    set_disks_tag together with the cluster's labels.

    :param clusteName: name of the Dataproc cluster
    :param region: region whose clusters are listed
    :param zone: zone passed on to set_disks_tag
    :return: None on success, '' when an error occurred (the error is
        logged, passed to append_result and printed)
    """
    try:
        print('Setting volume tags')
        print(clusteName + ':' + region + ':' + zone)
        result = self.dataproc.projects().regions().clusters().list(
            projectId=self.project,
            region=region).execute()
        # 'clusters' is absent from the response when the region has none.
        clusters = result.get('clusters') or []
        dataproc_instances = []
        labels = ''
        for cluster in clusters:
            if cluster['clusterName'] != clusteName:
                continue
            print(cluster)
            labels = cluster.get('labels')
            config = cluster.get('config') or {}
            # Either group may be missing from the config; treat a missing
            # group as having no instances.
            for group in ('masterConfig', 'workerConfig'):
                for instance in (config.get(group) or {}).get('instanceNames') or []:
                    dataproc_instances.append({'name': instance,
                                               'tag_name': clusteName + '-volume-primary'})
        GCPActions().set_disks_tag(dataproc_instances, zone, labels)
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None.
        trace = traceback.format_exc()
        logging.info("Unable to tag volume dataproc cluster: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to tag volume dataproc cluster",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        return ''
def delete_dataproc_cluster(self, cluster_name, region):
    """Delete a Dataproc cluster, wait for termination and delete its jobs.

    The status is polled every 5 seconds until it is 'terminated'; then
    delete_dataproc_jobs is called with the cluster name.

    :param cluster_name: name of the cluster to delete
    :param region: region of the cluster
    :return: the response of the delete request, or '' when an error
        occurred (the error is logged, passed to append_result and printed)
    """
    request = self.dataproc.projects().regions().clusters().delete(projectId=self.project, region=region,
                                                                   clusterName=cluster_name)
    try:
        result = request.execute()
        cluster_status = datalab.meta_lib.GCPMeta().get_list_cluster_statuses([cluster_name])
        while cluster_status[0]['status'] != 'terminated':
            time.sleep(5)
            print('The cluster is being terminated... Please wait')
            cluster_status = datalab.meta_lib.GCPMeta().get_list_cluster_statuses([cluster_name])
        GCPActions().delete_dataproc_jobs(cluster_name)
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None.
        trace = traceback.format_exc()
        logging.info("Unable to delete dataproc cluster: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to delete dataproc cluster",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        return ''
def update_dataproc_cluster(self, cluster_name, cluster_labels):
    """Replace the labels of a Dataproc cluster and re-tag its volumes.

    The region and zone are read from the ``gcp_region`` and ``gcp_zone``
    environment variables. After the patch request the method sleeps 15
    seconds and then calls set_cluster_volume_tag.

    :param cluster_name: name of the cluster to patch
    :param cluster_labels: labels sent as the new ``labels`` field
    :return: the response of the patch request, or '' when an error
        occurred (the error is logged, passed to append_result and printed)
    """
    body = {"labels": cluster_labels}
    request = self.dataproc.projects().regions().clusters().patch(projectId=self.project,
                                                                  region=os.environ['gcp_region'],
                                                                  clusterName=cluster_name,
                                                                  updateMask='labels',
                                                                  body=body)
    try:
        result = request.execute()
        time.sleep(15)
        GCPActions().set_cluster_volume_tag(cluster_name,
                                            os.environ['gcp_region'],
                                            os.environ['gcp_zone'])
        return result
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None.
        trace = traceback.format_exc()
        logging.info("Unable to update dataproc cluster: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to update dataproc cluster",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        return ''
def submit_dataproc_job(self, job_body):
    """Submit a Dataproc job and wait until its status is 'done'.

    The region is read from the ``gcp_region`` environment variable. The
    status is polled every 5 seconds; a 'failed' or 'error' status seen
    while waiting is treated as a failure.

    :param job_body: request body passed to the jobs().submit call
    :return: the final job status ('done')
    :raises SystemExit: with code 1 after any error has been logged,
        passed to append_result and printed
    """
    request = self.dataproc.projects().regions().jobs().submit(projectId=self.project,
                                                               region=os.environ['gcp_region'],
                                                               body=job_body)
    try:
        res = request.execute()
        job_id = res['reference']['jobId']
        print("Job ID: {}".format(job_id))
        job_status = datalab.meta_lib.GCPMeta().get_dataproc_job_status(job_id)
        while job_status != 'done':
            time.sleep(5)
            job_status = datalab.meta_lib.GCPMeta().get_dataproc_job_status(job_id)
            if job_status in ('failed', 'error'):
                raise Exception("Dataproc job {} finished with status '{}'".format(job_id, job_status))
        return job_status
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None.
        trace = traceback.format_exc()
        logging.info("Unable to submit dataproc job: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to submit dataproc job",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        sys.exit(1)
def delete_dataproc_jobs(self, cluster_filter):
    """Delete every Dataproc job whose cluster name contains ``cluster_filter``.

    The region is read from the ``gcp_region`` environment variable. HTTP
    errors raised while deleting a single job do not stop the loop: a 404
    is reported as "not found", any other status is reported with its
    error text.

    :param cluster_filter: substring matched against each job's
        ``placement.clusterName``
    :return: None
    :raises SystemExit: with code 1 after any other error has been logged,
        passed to append_result and printed
    """
    try:
        jobs = datalab.meta_lib.GCPMeta().get_dataproc_jobs()
        cluster_jobs_ids = {job['reference']['jobId'] for job in jobs
                            if cluster_filter in job['placement']['clusterName']}
        for job_id in cluster_jobs_ids:
            print('The cluster jobs is being deleted... Please wait')
            try:
                req = self.dataproc.projects().regions().jobs().delete(projectId=self.project,
                                                                       region=os.environ['gcp_region'],
                                                                       jobId=job_id)
                req.execute()
            except errors.HttpError as err:
                if err.resp.status == 404:
                    print('Job with ID: {} have not been found.'.format(job_id))
                else:
                    # Keep going with the remaining jobs, but do not hide the failure.
                    print('Unable to delete job with ID: {}. Error: {}'.format(job_id, err))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None.
        trace = traceback.format_exc()
        logging.info("Unable to delete dataproc jobs: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to delete dataproc jobs",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        sys.exit(1)
def get_cluster_app_version(self, bucket, user_name, cluster_name, application):
    """Read an application's version for a cluster from the bucket.

    The object ``<user_name>/<cluster_name>/<application>_version`` is
    downloaded to ``/tmp/<application>_version`` and the first five
    characters of its content are returned.

    :param bucket: name of the bucket holding the version file
    :param user_name: first path component of the version object
    :param cluster_name: second path component of the version object
    :param application: application name, e.g. 'python'
    :return: the first five characters of the version file, or '' when an
        error occurred (the error is logged, passed to append_result and
        printed)
    """
    try:
        version_file = '{0}/{1}/{2}_version'.format(user_name, cluster_name, application)
        local_version_file = '/tmp/{}_version'.format(application)
        if not GCPActions().get_from_bucket(bucket, version_file, local_version_file):
            raise Exception("Version file {} was not found in bucket {}".format(version_file, bucket))
        with open(local_version_file) as f:
            version = f.read()
        return version[0:5]
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None.
        trace = traceback.format_exc()
        logging.info("Unable to get software version: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to get software version",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        return ''
def jars(self, args, dataproc_dir):
    """Download the jars archive for a Dataproc version and unpack it.

    The archive and its checksum file are fetched from
    ``jars/<args.dataproc_version>/`` in ``args.bucket``. When the checksum
    verification fails the archive is downloaded once more; a second
    failure terminates the process.

    :param args: object providing ``bucket`` and ``dataproc_version``
    :param dataproc_dir: directory the archive is extracted into
    :raises SystemExit: with code 1 when the checksum fails twice
    """
    def checksum_failed():
        # md5sum exits non-zero on a mismatch and writes its WARNING to
        # stderr, so the command must not run with check=True and both
        # streams have to be inspected.
        outcome = subprocess.run('md5sum -c /tmp/jars-checksum.chk', capture_output=True, shell=True)
        output = outcome.stdout.decode('UTF-8') + outcome.stderr.decode('UTF-8')
        return outcome.returncode != 0 or 'WARNING' in output

    print("Downloading jars...")
    archive_object = 'jars/{0}/jars.tar.gz'.format(args.dataproc_version)
    GCPActions().get_from_bucket(args.bucket, archive_object, '/tmp/jars.tar.gz')
    GCPActions().get_from_bucket(args.bucket, 'jars/{0}/jars-checksum.chk'.format(args.dataproc_version),
                                 '/tmp/jars-checksum.chk')
    if checksum_failed():
        subprocess.run('rm -f /tmp/jars.tar.gz', shell=True, check=True)
        # Retry from the same object the checksum file belongs to.
        GCPActions().get_from_bucket(args.bucket, archive_object, '/tmp/jars.tar.gz')
        if checksum_failed():
            print("The checksum of jars.tar.gz is mismatched. It could be caused by gcp network issue.")
            sys.exit(1)
    subprocess.run('tar -zhxvf /tmp/jars.tar.gz -C {}'.format(dataproc_dir), shell=True, check=True)
def yarn(self, args, yarn_dir):
    """Copy a cluster's configuration files from the bucket into ``yarn_dir``.

    Every object under ``<args.user_name>/<args.cluster_name>/config/`` is
    downloaded to a staging directory below /tmp, the staged files are
    moved into ``yarn_dir`` with sudo, and ``/tmp/<args.user_name>`` is
    removed afterwards.

    :param args: object providing ``bucket``, ``user_name`` and ``cluster_name``
    :param yarn_dir: destination directory for the configuration files
    """
    print("Downloading yarn configuration...")
    config_objects = self.storage_client.get_bucket(args.bucket).list_blobs(
        prefix='{0}/{1}/config/'.format(args.user_name, args.cluster_name))
    subprocess.run('mkdir -p /tmp/{0}/{1}/config/'.format(args.user_name, args.cluster_name),
                   shell=True, check=True)
    for config_object in config_objects:
        # Keep only the last path component as the local file name.
        file_name = config_object.name.split("/")[-1]
        staged_path = '/tmp/{0}/{1}/config/{2}'.format(args.user_name, args.cluster_name, file_name)
        GCPActions().get_from_bucket(args.bucket, config_object.name, staged_path)
    move_command = 'sudo mv /tmp/{0}/{1}/config/* {2}'.format(args.user_name, args.cluster_name, yarn_dir)
    subprocess.run(move_command, shell=True, check=True)
    cleanup_command = 'sudo rm -rf /tmp/{}'.format(args.user_name)
    subprocess.run(cleanup_command, shell=True, check=True)
def install_dataproc_spark(self, args):
    """Download the cluster's Spark archive and unpack it under /opt.

    The archive and its checksum file are fetched from
    ``<args.user_name>/<args.cluster_name>/`` in ``args.bucket``. When the
    checksum verification fails the archive is downloaded once more; a
    second failure terminates the process. The archive is extracted with
    sudo into ``/opt/<args.dataproc_version>/<args.cluster_name>/``.

    :param args: object providing ``bucket``, ``user_name``,
        ``cluster_name`` and ``dataproc_version``
    :raises SystemExit: with code 1 when the checksum fails twice
    """
    def checksum_failed():
        # md5sum exits non-zero on a mismatch and writes its WARNING to
        # stderr, so the command must not run with check=True and both
        # streams have to be inspected.
        outcome = subprocess.run('md5sum -c /tmp/spark-checksum.chk', capture_output=True, shell=True)
        output = outcome.stdout.decode('UTF-8') + outcome.stderr.decode('UTF-8')
        return outcome.returncode != 0 or 'WARNING' in output

    print("Installing spark...")
    archive_object = '{0}/{1}/spark.tar.gz'.format(args.user_name, args.cluster_name)
    GCPActions().get_from_bucket(args.bucket, archive_object, '/tmp/spark.tar.gz')
    GCPActions().get_from_bucket(args.bucket, '{0}/{1}/spark-checksum.chk'.format(args.user_name, args.cluster_name),
                                 '/tmp/spark-checksum.chk')
    if checksum_failed():
        subprocess.run('rm -f /tmp/spark.tar.gz', shell=True, check=True)
        GCPActions().get_from_bucket(args.bucket, archive_object, '/tmp/spark.tar.gz')
        if checksum_failed():
            print("The checksum of spark.tar.gz is mismatched. It could be caused by gcp network issue.")
            sys.exit(1)
    subprocess.run('sudo tar -zhxvf /tmp/spark.tar.gz -C /opt/{0}/{1}/'.format(args.dataproc_version, args.cluster_name),
                   shell=True, check=True)
def spark_defaults(self, args):
    """Rewrite the cluster's local spark-env.sh and core-site.xml with sed.

    The commands run in order, each through a sudo shell, and any non-zero
    exit status raises CalledProcessError. They strip comment and blank
    lines from spark-env.sh, redirect hadoop, conf, work and tmp paths,
    append SPARK_DIST_CLASSPATH lines, set STANDALONE_SPARK_MASTER_HOST to
    ``<args.cluster_name>-m`` and relocate the GCS connector metadata cache
    in core-site.xml.

    :param args: object providing ``dataproc_version`` and ``cluster_name``
    """
    spark_def_path = '/opt/{0}/{1}/spark/conf/spark-env.sh'.format(args.dataproc_version, args.cluster_name)
    sed_commands = [
        """ sudo bash -c " sed -i '/#/d' {}" """.format(spark_def_path),
        """ sudo bash -c " sed -i '/^\s*$/d' {}" """.format(spark_def_path),
        """ sudo bash -c " sed -i 's|/usr/lib/hadoop|/opt/{0}/jars/usr/lib/hadoop|g' {1}" """.format(args.dataproc_version, spark_def_path),
        """ sudo bash -c " sed -i 's|/etc/hadoop/conf|/opt/{0}/{1}/conf|g' {2}" """.format(args.dataproc_version, args.cluster_name, spark_def_path),
        """ sudo bash -l -c " sed -i '/\$HADOOP_HOME\/\*/a SPARK_DIST_CLASSPATH=\\"\$SPARK_DIST_CLASSPATH:\$HADOOP_HOME\/client\/*\\"' {}" """.format(spark_def_path),
        """ sudo bash -l -c " sed -i '/\$HADOOP_YARN_HOME\/\*/a SPARK_DIST_CLASSPATH=\\"\$SPARK_DIST_CLASSPATH:\/opt\/jars\/\*\\"' {}" """.format(spark_def_path),
        """ sudo bash -c " sed -i 's|/hadoop/spark/work|/tmp/hadoop/spark/work|g' {}" """.format(spark_def_path),
        """ sudo bash -c " sed -i 's|/hadoop/spark/tmp|/tmp/hadoop/spark/tmp|g' {}" """.format(spark_def_path),
        """ sudo bash -c " sed -i 's/STANDALONE_SPARK_MASTER_HOST.*/STANDALONE_SPARK_MASTER_HOST={0}-m/g' {1}" """.format(args.cluster_name, spark_def_path),
        """ sudo bash -c " sed -i 's|/hadoop_gcs_connector_metadata_cache|/tmp/hadoop_gcs_connector_metadata_cache|g' /opt/{0}/{1}/conf/core-site.xml" """.format(args.dataproc_version, args.cluster_name),
    ]
    for sed_command in sed_commands:
        subprocess.run(sed_command, shell=True, check=True)
def remove_kernels(self, notebook_name, dataproc_name, dataproc_version, ssh_user, key_path, computational_name):
    """Remove a Dataproc cluster's kernels and interpreters from a notebook.

    Connects to the notebook over SSH and removes the cluster's Jupyter
    kernels. When the dataengine-service interpreter marker for the
    cluster exists, it also stops the cluster's Livy server (if
    ``notebook_multiple_clusters`` is 'true'), resets SPARK_HOME for
    Zeppelin, deletes the Zeppelin interpreters whose name contains
    ``dataproc_name`` through the Zeppelin REST API and restarts Zeppelin.
    Finally the RStudio kernel (when its marker exists) and the cluster
    directory under /opt are removed.

    The connection is stored in the module-level name ``con``.

    :param notebook_name: instance name of the notebook
    :param dataproc_name: name of the Dataproc cluster
    :param dataproc_version: version directory under /opt
    :param ssh_user: user for the SSH connection and owner of the home directory
    :param key_path: path of the SSH key
    :param computational_name: name passed to remove_rstudio_dataengines_kernel
    :return: None on success, '' when an error occurred (the error is
        logged, passed to append_result and printed)
    """
    # The opener/Request API lives in urllib.request; urllib3 has no such names.
    import urllib.request
    try:
        notebook_ip = datalab.meta_lib.GCPMeta().get_private_ip_address(notebook_name)
        global con
        con = datalab.fab.init_datalab_connection(notebook_ip, ssh_user, key_path)
        con.sudo('rm -rf /home/{}/.local/share/jupyter/kernels/*_{}'.format(ssh_user, dataproc_name))
        if exists(con, '/home/{}/.ensure_dir/dataengine-service_{}_interpreter_ensured'.format(ssh_user, dataproc_name)):
            if os.environ['notebook_multiple_clusters'] == 'true':
                try:
                    livy_port = con.sudo("cat /opt/" + dataproc_version + "/" + dataproc_name
                                         + "/livy/conf/livy.conf | grep livy.server.port | tail -n 1 | awk '{printf $3}'").stdout.replace('\n','')
                    process_number = con.sudo("netstat -natp 2>/dev/null | grep ':" + livy_port +
                                              "' | awk '{print $7}' | sed 's|/.*||g'").stdout.replace('\n','')
                    con.sudo('kill -9 ' + process_number)
                    con.sudo('systemctl disable livy-server-' + livy_port)
                except Exception:
                    # Best effort: a missing Livy server must not stop the cleanup.
                    print("Wasn't able to find Livy server for this EMR!")
            con.sudo('sed -i \"s/^export SPARK_HOME.*/export SPARK_HOME=\/opt\/spark/\" /opt/zeppelin/conf/zeppelin-env.sh')
            con.sudo("rm -rf /home/{}/.ensure_dir/dataengine-service_interpreter_ensure".format(ssh_user))
            zeppelin_url = 'http://' + notebook_ip + ':8080/api/interpreter/setting/'
            # An empty ProxyHandler makes the opener bypass any configured proxy.
            opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
            response = opener.open(urllib.request.Request(zeppelin_url))
            interpreter_json = json.loads(response.read())
            interpreter_prefix = dataproc_name
            for interpreter in interpreter_json['body']:
                if interpreter_prefix in interpreter['name']:
                    print("Interpreter with ID: {} and name: {} will be removed from zeppelin!".format(
                        interpreter['id'], interpreter['name']))
                    request = urllib.request.Request(zeppelin_url + interpreter['id'], method='DELETE')
                    url = opener.open(request)
                    print(url.read())
            con.sudo('chown {0}:{0} -R /opt/zeppelin/'.format(ssh_user))
            con.sudo('systemctl restart zeppelin-notebook.service')
            zeppelin_restarted = False
            while not zeppelin_restarted:
                con.sudo('sleep 5')
                # Prints grep's exit status: '1' means no "closed" line for port 8080.
                result = con.sudo('nmap -p 8080 localhost | grep "closed" > /dev/null; echo $?').stdout.replace('\n','')
                result = result[:1]
                if result == '1':
                    zeppelin_restarted = True
            con.sudo('sleep 5')
            con.sudo('rm -rf /home/{}/.ensure_dir/dataengine-service_{}_interpreter_ensured'.format(ssh_user, dataproc_name))
        if exists(con, '/home/{}/.ensure_dir/rstudio_dataengine-service_ensured'.format(ssh_user)):
            datalab.fab.remove_rstudio_dataengines_kernel(computational_name, ssh_user)
        con.sudo('rm -rf /opt/{0}/{1}/'.format(dataproc_version, dataproc_name))
        print("Notebook's {} kernels were removed".format(notebook_ip))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None.
        trace = traceback.format_exc()
        logging.info("Unable to delete dataproc kernels from notebook: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to delete dataproc kernels from notebook",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        return ''
def configure_zeppelin_dataproc_interpreter(self, dataproc_version, cluster_name, spark_dir,
                                            os_user, yarn_dir, bucket, user_name, multiple_clusters):
    """Configure the local Zeppelin to use a Dataproc cluster.

    Updates SPARK_HOME and HADOOP_CONF_DIR, restarts Zeppelin and waits
    for port 8080, then registers interpreters through the Zeppelin REST
    API. With ``multiple_clusters == 'true'`` a Livy server is configured
    on the first port from 8998 upwards that nmap reports as closed, and a
    systemd unit is installed for it; otherwise one Spark interpreter per
    python version ("2" and "<major.minor>-dp") is registered. A marker
    file is created in the user's ``.ensure_dir`` at the end.

    :param dataproc_version: version directory under /opt
    :param cluster_name: name of the Dataproc cluster
    :param spark_dir: value substituted for SPARK_HOME
    :param os_user: owner of /opt/zeppelin and of the marker file
    :param yarn_dir: value exported as HADOOP_CONF_DIR for Livy
    :param bucket: bucket holding the cluster's python version file
    :param user_name: path component of the version file in the bucket
    :param multiple_clusters: 'true' selects the Livy based setup
    :raises SystemExit: with code 1 when any step fails
    """
    try:
        port_number_found = False
        zeppelin_restarted = False
        default_port = 8998
        GCPActions().get_cluster_app_version(bucket, user_name, cluster_name, 'python')
        with open('/tmp/python_version') as f:
            python_version = f.read()
        python_version = python_version[0:5]
        livy_port = ''
        livy_path = '/opt/{0}/{1}/livy/'.format(dataproc_version, cluster_name)
        subprocess.run('echo \"Configuring dataproc path for Zeppelin\"', shell=True, check=True)
        subprocess.run('sed -i \"s/^export SPARK_HOME.*/export SPARK_HOME=\/opt\/{0}\/{1}\/spark/\" /opt/zeppelin/conf/zeppelin-env.sh'
                       .format(dataproc_version, cluster_name), shell=True, check=True)
        subprocess.run('sed -i \"s/^export HADOOP_CONF_DIR.*/export HADOOP_CONF_DIR=\/opt\/{0}\/{1}\/conf/\" /opt/{0}/{1}/spark/conf/spark-env.sh'
                       .format(dataproc_version, cluster_name), shell=True, check=True)
        subprocess.run('sed -i "/spark.executorEnv.PYTHONPATH/d" /opt/{0}/{1}/spark/conf/spark-defaults.conf'.format(dataproc_version, cluster_name), shell=True, check=True)
        subprocess.run('sed -i "/spark.yarn.dist.files/d" /opt/{0}/{1}/spark/conf/spark-defaults.conf'.format(dataproc_version, cluster_name), shell=True, check=True)
        subprocess.run('sudo chown {0}:{0} -R /opt/zeppelin/'.format(os_user), shell=True, check=True)
        subprocess.run('sudo systemctl restart zeppelin-notebook.service', shell=True, check=True)
        while not zeppelin_restarted:
            time.sleep(5)
            # Prints grep's exit status: '1' means no "closed" line for port 8080.
            result = subprocess.run('sudo bash -c "nmap -p 8080 localhost | grep closed > /dev/null" ; echo $?', capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
            result = result[:1]
            if result == '1':
                zeppelin_restarted = True
        time.sleep(5)
        subprocess.run('echo \"Configuring dataproc spark interpreter for Zeppelin\"', shell=True, check=True)
        if multiple_clusters == 'true':
            while not port_number_found:
                # '0' means nmap reported the port as closed, i.e. nothing listens on it.
                port_free = subprocess.run('sudo bash -c "nmap -p ' + str(default_port) +
                                           ' localhost | grep closed > /dev/null" ; echo $?', capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
                port_free = port_free[:1]
                if port_free == '0':
                    livy_port = default_port
                    port_number_found = True
                else:
                    default_port += 1
            subprocess.run('sudo echo "livy.server.port = {0}" >> {1}conf/livy.conf'.format(str(livy_port), livy_path), shell=True, check=True)
            subprocess.run('sudo echo "livy.spark.master = yarn" >> {}conf/livy.conf'.format(livy_path), shell=True, check=True)
            if os.path.exists('{}conf/spark-blacklist.conf'.format(livy_path)):
                subprocess.run('sudo sed -i "s/^/#/g" {}conf/spark-blacklist.conf'.format(livy_path), shell=True, check=True)
            subprocess.run('sudo echo "export SPARK_HOME={0}" >> {1}conf/livy-env.sh'.format(spark_dir, livy_path), shell=True, check=True)
            subprocess.run('sudo echo "export HADOOP_CONF_DIR={0}" >> {1}conf/livy-env.sh'.format(yarn_dir, livy_path), shell=True, check=True)
            subprocess.run('sudo echo "export PYSPARK3_PYTHON=python{0}" >> {1}conf/livy-env.sh'.format(python_version[0:3], livy_path), shell=True, check=True)
            template_file = "/tmp/dataengine-service_interpreter.json"
            # Read and rewrite the template in place; both handles are closed
            # before curl reads the file.
            with open(template_file, 'r') as fr:
                text = fr.read()
            text = text.replace('CLUSTER_NAME', cluster_name)
            text = text.replace('SPARK_HOME', spark_dir)
            text = text.replace('LIVY_PORT', str(livy_port))
            with open(template_file, 'w') as fw:
                fw.write(text)
            for _ in range(5):
                try:
                    subprocess.run("curl --noproxy localhost -H 'Content-Type: application/json' -X POST -d " +
                                   "@/tmp/dataengine-service_interpreter.json http://localhost:8080/api/interpreter/setting", shell=True, check=True)
                    break
                except Exception:
                    # Retry after a pause, up to five attempts in total.
                    time.sleep(5)
            subprocess.run('sudo cp /opt/livy-server-cluster.service /etc/systemd/system/livy-server-{}.service'.format(str(livy_port)), shell=True, check=True)
            subprocess.run("sudo sed -i 's|OS_USER|{0}|' /etc/systemd/system/livy-server-{1}.service".format(os_user, str(livy_port)), shell=True, check=True)
            subprocess.run("sudo sed -i 's|LIVY_PATH|{0}|' /etc/systemd/system/livy-server-{1}.service".format(livy_path, str(livy_port)), shell=True, check=True)
            subprocess.run('sudo chmod 644 /etc/systemd/system/livy-server-{}.service'.format(str(livy_port)), shell=True, check=True)
            subprocess.run('sudo systemctl daemon-reload', shell=True, check=True)
            subprocess.run('sudo systemctl enable livy-server-{}'.format(str(livy_port)), shell=True, check=True)
            subprocess.run('sudo systemctl start livy-server-{}'.format(str(livy_port)), shell=True, check=True)
        else:
            template_file = "/tmp/dataengine-service_interpreter.json"
            p_versions = ["2", "{}-dp".format(python_version[:3])]
            for p_version in p_versions:
                with open(template_file, 'r') as fr:
                    text = fr.read()
                text = text.replace('CLUSTERNAME', cluster_name)
                text = text.replace('PYTHONVERSION', p_version)
                text = text.replace('SPARK_HOME', spark_dir)
                text = text.replace('PYTHONVER_SHORT', p_version[:1])
                text = text.replace('DATAENGINE-SERVICE_VERSION', dataproc_version)
                tmp_file = '/tmp/dataproc_spark_py{}_interpreter.json'.format(p_version)
                with open(tmp_file, 'w') as fw:
                    fw.write(text)
                for _ in range(5):
                    try:
                        subprocess.run("curl --noproxy localhost -H 'Content-Type: application/json' -X POST -d " +
                                       "@/tmp/dataproc_spark_py{}_interpreter.json http://localhost:8080/api/interpreter/setting".format(p_version), shell=True, check=True)
                        break
                    except Exception:
                        # Retry after a pause, up to five attempts in total.
                        time.sleep(5)
        subprocess.run('touch /home/{0}/.ensure_dir/dataengine-service_{1}_interpreter_ensured'.format(os_user, cluster_name), shell=True, check=True)
    except Exception as err:
        # Report the reason before exiting instead of failing silently.
        print('Error:', str(err))
        traceback.print_exc(file=sys.stdout)
        sys.exit(1)
def install_python(self, bucket, user_name, cluster_name, application, numpy_version='1.14.3'):
    """Build the cluster's python version under /opt/python and install packages.

    The version is read from ``/tmp/python_version`` after it has been
    fetched with get_cluster_app_version. Nothing is installed when
    ``/opt/python/python<version>`` already exists. Otherwise the sources
    are downloaded from python.org, built with ``make altinstall``, a
    virtualenv is created, the packages are installed with pip and
    ``/usr/bin/python<major.minor>-dp`` is linked to the new interpreter.

    :param bucket: bucket holding the cluster's python version file
    :param user_name: path component of the version file in the bucket
    :param cluster_name: path component of the version file in the bucket
    :param application: 'deeplearning' installs additional packages
    :param numpy_version: NumPy version to pin
    :return: None on success, '' when an error occurred (the error is
        logged, passed to append_result and printed)
    """
    try:
        GCPActions().get_cluster_app_version(bucket, user_name, cluster_name, 'python')
        with open('/tmp/python_version') as f:
            python_version = f.read()
        python_version = python_version[0:5]
        if not os.path.exists('/opt/python/python{}'.format(python_version)):
            subprocess.run('wget https://www.python.org/ftp/python/{0}/Python-{0}.tgz -O /tmp/Python-{0}.tgz'.format(python_version), shell=True, check=True)
            subprocess.run('tar zxvf /tmp/Python-{}.tgz -C /tmp/'.format(python_version), shell=True, check=True)
            subprocess.run('cd /tmp/Python-{0}; ./configure --prefix=/opt/python/python{0} --with-zlib-dir=/usr/local/lib/ --with-ensurepip=install'.format(python_version), shell=True, check=True)
            subprocess.run('cd /tmp/Python-{}; sudo make altinstall'.format(python_version), shell=True, check=True)
            subprocess.run('cd /tmp/; sudo rm -rf Python-{}/'.format(python_version), shell=True, check=True)
            subprocess.run('sudo -i virtualenv /opt/python/python{}'.format(python_version), shell=True, check=True)
            venv_command = 'source /opt/python/python{}/bin/activate'.format(python_version)
            pip_command = '/opt/python/python{0}/bin/pip{1}'.format(python_version, python_version[:3])
            subprocess.run('{0} && sudo -i {1} install -U pip==9.0.3'.format(venv_command, pip_command), shell=True, check=True)
            subprocess.run('{0} && sudo -i {1} install pyzmq==17.0.0'.format(venv_command, pip_command), shell=True, check=True)
            subprocess.run('{0} && sudo -i {1} install ipython ipykernel --no-cache-dir'.format(venv_command, pip_command), shell=True, check=True)
            subprocess.run('{0} && sudo -i {1} install boto boto3 NumPy=={2} SciPy Matplotlib pandas Sympy Pillow sklearn --no-cache-dir'
                           .format(venv_command, pip_command, numpy_version), shell=True, check=True)
            if application == 'deeplearning':
                subprocess.run('{0} && sudo -i {1} install mxnet-cu80 opencv-python keras Theano --no-cache-dir'.format(venv_command, pip_command), shell=True, check=True)
                python_without_dots = python_version.replace('.', '')
                subprocess.run('{0} && sudo -i {1} install https://cntk.ai/PythonWheel/GPU/cntk-2.0rc3-cp{2}-cp{2}m-linux_x86_64.whl --no-cache-dir'
                               .format(venv_command, pip_command, python_without_dots[:2]), shell=True, check=True)
            subprocess.run('sudo rm -rf /usr/bin/python{}-dp'.format(python_version[0:3]), shell=True, check=True)
            subprocess.run('sudo ln -fs /opt/python/python{0}/bin/python{1} /usr/bin/python{1}-dp'.format(python_version, python_version[0:3]), shell=True, check=True)
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None.
        trace = traceback.format_exc()
        logging.info("Unable to install python: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to install python",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
        return ''
def ensure_local_jars(os_user, jars_dir):
    """Install the GCS connector jars and Spark config files on the remote host.

    Does nothing when the ``gs_kernel_ensured`` marker already exists in
    the user's ``.ensure_dir``. Otherwise the jars are downloaded into
    ``jars_dir``, core-site.xml and the Spark defaults template are
    uploaded from /root/templates/ and installed under /opt/spark/conf,
    and the marker is created. Any failure prints the error and exits
    with code 1.

    :param os_user: user whose home directory holds the marker file
    :param jars_dir: remote directory for the jars (used as a path prefix)
    """
    if exists(datalab.fab.conn,'/home/{}/.ensure_dir/gs_kernel_ensured'.format(os_user)):
        return
    try:
        templates_dir = '/root/templates/'
        remote = datalab.fab.conn
        remote.sudo('mkdir -p {}'.format(jars_dir))
        connector_download = ('wget https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-{0}.jar -O {1}'
                              'gcs-connector-hadoop2-{0}.jar'.format(os.environ['notebook_gcs_connector_version'], jars_dir))
        remote.sudo(connector_download)
        proxy_download = ('wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-yarn-server-web-proxy/2.7.4/{0} -O {1}{0}'
                          .format('hadoop-yarn-server-web-proxy-2.7.4.jar', jars_dir))
        remote.sudo(proxy_download)
        remote.put(templates_dir + 'core-site.xml', '/tmp/core-site.xml')
        remote.sudo('sed -i "s|GCP_PROJECT_ID|{}|g" /tmp/core-site.xml'.format(os.environ['gcp_project_id']))
        remote.sudo('mv /tmp/core-site.xml /opt/spark/conf/core-site.xml')
        remote.put(templates_dir + 'notebook_spark-defaults_local.conf', '/tmp/notebook_spark-defaults_local.conf')
        if os.environ['application'] == 'zeppelin':
            # Zeppelin additionally gets a spark.jars line listing the files under jars_dir.
            remote.run('echo \"spark.jars $(ls -1 ' + jars_dir + '* | tr \'\\n\' \',\')\" >> /tmp/notebook_spark-defaults_local.conf')
        remote.sudo('\cp /tmp/notebook_spark-defaults_local.conf /opt/spark/conf/spark-defaults.conf')
        remote.sudo('touch /home/{}/.ensure_dir/gs_kernel_ensured'.format(os_user))
    except Exception as err:
        print('Error:', str(err))
        sys.exit(1)
def get_cluster_python_version(region, bucket, user_name, cluster_name):
    """Fetch the cluster's python version file via get_cluster_app_version.

    The returned version string is discarded; the call is made for the
    file it downloads to ``/tmp/python_version``.

    :param region: not used
    :param bucket: bucket holding the version file
    :param user_name: path component of the version file in the bucket
    :param cluster_name: path component of the version file in the bucket
    :raises SystemExit: with code 1 when the call raises
    """
    try:
        GCPActions().get_cluster_app_version(bucket, user_name, cluster_name, 'python')
    except Exception as err:
        # Report the reason before exiting instead of failing silently.
        print('Error:', str(err))
        sys.exit(1)
def installing_python(region, bucket, user_name, cluster_name, application='', pip_mirror='', numpy_version='1.14.3'):
    """Install the cluster's python version through GCPActions.install_python.

    :param region: not used
    :param bucket: bucket holding the cluster's python version file
    :param user_name: path component of the version file in the bucket
    :param cluster_name: path component of the version file in the bucket
    :param application: application name forwarded to install_python
    :param pip_mirror: not used
    :param numpy_version: NumPy version forwarded to install_python
    :raises SystemExit: with code 1 when the call raises
    """
    try:
        # Forward numpy_version so a caller-supplied pin is actually applied.
        GCPActions().install_python(bucket, user_name, cluster_name, application, numpy_version=numpy_version)
    except Exception as err:
        # Report the reason before exiting instead of failing silently.
        print('Error:', str(err))
        sys.exit(1)
def prepare_disk(os_user):
    """Partition, format and mount the last listed disk on /opt on the remote host.

    Does nothing when the ``disk_ensured`` marker already exists in the
    user's ``.ensure_dir``. Otherwise the disk that sorts last in the
    ``lsblk`` output gets a single partition via fdisk, is formatted as
    ext4, mounted on /opt/ and added to /etc/fstab; then the marker is
    created.

    :param os_user: user whose home directory holds the marker file
    :raises SystemExit: with code 1 when any step fails
    """
    if exists(datalab.fab.conn,'/home/' + os_user + '/.ensure_dir/disk_ensured'):
        return
    try:
        disk_name = datalab.fab.conn.sudo("lsblk | grep disk | awk '{print $1}' | sort | tail -n 1").stdout.replace('\n','')
        datalab.fab.conn.sudo('''bash -c 'echo -e "o\nn\np\n1\n\n\nw" | fdisk /dev/{}' '''.format(disk_name))
        datalab.fab.conn.sudo('mkfs.ext4 -F /dev/{}1'.format(disk_name))
        datalab.fab.conn.sudo('mount /dev/{}1 /opt/'.format(disk_name))
        datalab.fab.conn.sudo(''' bash -c "echo '/dev/{}1 /opt/ ext4 errors=remount-ro 0 1' >> /etc/fstab" '''.format(disk_name))
        datalab.fab.conn.sudo('touch /home/' + os_user + '/.ensure_dir/disk_ensured')
    except Exception as err:
        # Same reporting as the sibling ensure_* helpers: say why before exiting.
        print('Error:', str(err))
        sys.exit(1)
def ensure_local_spark(os_user, spark_link, spark_version, hadoop_version, local_spark_path):
    """Download and unpack a Spark distribution on the remote host.

    Does nothing when the ``local_spark_ensured`` marker already exists in
    the user's ``.ensure_dir``. Otherwise the archive is downloaded to
    /tmp, extracted into /opt/, moved to ``local_spark_path`` and handed
    over to ``os_user``; then the marker is created. Any failure prints
    the error and exits with code 1.

    :param os_user: owner of the installation and of the marker file
    :param spark_link: URL of the Spark archive
    :param spark_version: Spark version used in the archive name
    :param hadoop_version: Hadoop version used in the archive name
    :param local_spark_path: final location of the Spark directory
    """
    marker_file = '/home/' + os_user + '/.ensure_dir/local_spark_ensured'
    if exists(datalab.fab.conn, marker_file):
        return
    try:
        archive_path = '/tmp/spark-' + spark_version + '-bin-hadoop' + hadoop_version + '.tgz'
        datalab.fab.conn.sudo('wget ' + spark_link + ' -O ' + archive_path)
        datalab.fab.conn.sudo('tar -zxvf ' + archive_path + ' -C /opt/')
        unpacked_dir = '/opt/spark-' + spark_version + '-bin-hadoop' + hadoop_version
        datalab.fab.conn.sudo('mv ' + unpacked_dir + ' ' + local_spark_path)
        owner = os_user + ':' + os_user
        datalab.fab.conn.sudo('chown -R ' + owner + ' ' + local_spark_path)
        datalab.fab.conn.sudo('touch ' + marker_file)
    except Exception as err:
        print('Error:', str(err))
        sys.exit(1)
# Rebuilds /opt/spark/conf/spark-defaults.conf and spark-env.sh on the remote
# host reached through datalab.fab.conn.
#   jars_dir      - path prefix whose files are listed in the spark.jars line
#   templates_dir - local directory holding notebook_spark-defaults_local.conf
#   memory_type   - 'driver' triggers the spark.<memory_type>.memory rewrite
# Any exception prints 'Error: ...' and exits the process with code 1.
# NOTE(review): this copy of the file has lost its indentation, so the nesting
# described in the comments below is inferred from the statements themselves -
# confirm against the repository before relying on it.
def configure_local_spark(jars_dir, templates_dir, memory_type='driver'):
try:
# Checking if spark.jars parameter was generated previously
spark_jars_paths = None
if exists(datalab.fab.conn, '/opt/spark/conf/spark-defaults.conf'):
try:
spark_jars_paths = datalab.fab.conn.sudo('cat /opt/spark/conf/spark-defaults.conf | grep -e "^spark.jars " ').stdout.replace('\n','')
except:
# The lookup is best effort: on any failure continue without a saved spark.jars line.
spark_jars_paths = None
# Upload the defaults template; it replaces the current spark-defaults.conf below.
datalab.fab.conn.put(templates_dir + 'notebook_spark-defaults_local.conf', '/tmp/notebook_spark-defaults_local.conf')
if os.environ['application'] == 'zeppelin':
# Appends a spark.jars line built from the comma-joined listing of jars_dir.
datalab.fab.conn.run('echo \"spark.jars $(ls -1 ' + jars_dir + '* | tr \'\\n\' \',\')\" >> /tmp/notebook_spark-defaults_local.conf')
datalab.fab.conn.sudo('\cp -f /tmp/notebook_spark-defaults_local.conf /opt/spark/conf/spark-defaults.conf')
if memory_type == 'driver':
# Drop existing spark.*.memory lines and append the value from get_spark_memory(), in megabytes.
spark_memory = datalab.fab.get_spark_memory()
datalab.fab.conn.sudo('sed -i "/spark.*.memory/d" /opt/spark/conf/spark-defaults.conf')
datalab.fab.conn.sudo('''bash -c 'echo "spark.{0}.memory {1}m" >> /opt/spark/conf/spark-defaults.conf' '''
.format(memory_type, spark_memory))
# Create spark-env.sh from its template when it does not exist yet.
if not exists(datalab.fab.conn,'/opt/spark/conf/spark-env.sh'):
datalab.fab.conn.sudo('mv /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh')
# First java-8 jre path reported by update-alternatives, appended as JAVA_HOME.
java_home = datalab.fab.conn.run("update-alternatives --query java | grep -o --color=never \'/.*/java-8.*/jre\'").stdout.splitlines()[0]
datalab.fab.conn.sudo('''bash -l -c 'echo "export JAVA_HOME={}" >> /opt/spark/conf/spark-env.sh' '''.format(java_home))
if 'spark_configurations' in os.environ:
# Header = the '#' lines of the uploaded template; written back first below.
datalab_header = datalab.fab.conn.sudo('cat /tmp/notebook_spark-defaults_local.conf | grep "^#"').stdout
# The variable holds a Python literal: the loop below reads it as a list of
# dicts with 'Classification' and 'Properties' keys.
spark_configurations = ast.literal_eval(os.environ['spark_configurations'])
new_spark_defaults = list()
spark_defaults = datalab.fab.conn.sudo('cat /opt/spark/conf/spark-defaults.conf').stdout
current_spark_properties = spark_defaults.split('\n')
for param in current_spark_properties:
# Lines whose first space-separated token is exactly '#' are skipped.
if param.split(' ')[0] != '#':
for config in spark_configurations:
if config['Classification'] == 'spark-defaults':
for property in config['Properties']:
# A configured property replaces the current line with the same key;
# every other configured property is appended as its own line.
if property == param.split(' ')[0]:
param = property + ' ' + config['Properties'][property]
else:
new_spark_defaults.append(property + ' ' + config['Properties'][property])
new_spark_defaults.append(param)
# The set removes the duplicates produced by the loop above (line order is not kept).
new_spark_defaults = set(new_spark_defaults)
datalab.fab.conn.sudo('''bash -c 'echo "{}" > /opt/spark/conf/spark-defaults.conf' '''.format(datalab_header))
for prop in new_spark_defaults:
datalab.fab.conn.sudo('''bash -c 'echo "{}" >> /opt/spark/conf/spark-defaults.conf' '''.format(prop))
# Remove blank lines left in the rewritten file.
datalab.fab.conn.sudo('sed -i "/^\s*$/d" /opt/spark/conf/spark-defaults.conf')
if spark_jars_paths:
# Re-append the spark.jars line captured at the start.
datalab.fab.conn.sudo('''bash -c 'echo "{}" >> /opt/spark/conf/spark-defaults.conf' '''.format(spark_jars_paths))
except Exception as err:
print('Error:', str(err))
sys.exit(1)
def remove_dataengine_kernels(notebook_name, os_user, key_path, cluster_name):
    """Remove a data engine cluster's kernels and interpreters from a notebook.

    Opens a connection to the notebook's private IP and then:
      * deletes the cluster's Jupyter kernel directories;
      * if the Zeppelin interpreter marker for the cluster exists, stops the
        cluster's Livy server (multi-cluster mode only), resets SPARK_HOME in
        zeppelin-env.sh, deletes every Zeppelin interpreter whose name
        contains ``cluster_name`` through the Zeppelin REST API and restarts
        Zeppelin;
      * if the RStudio marker exists, removes the RStudio data engine kernel;
      * deletes ``/opt/<cluster_name>/``.

    Args:
        notebook_name: Name of the notebook instance.
        os_user: OS user used for the connection and for home-directory paths.
        key_path: Path to the private key used for the connection.
        cluster_name: Name of the data engine cluster being removed.

    Any exception is logged, recorded with ``append_result`` and printed; it
    is not re-raised.
    """
    # The connection is published as a module-level global, as before.
    global con
    # Imported locally: the file-level ``urllib3`` import does not provide
    # build_opener/ProxyHandler/Request; those live in ``urllib.request``.
    import urllib.request
    try:
        computational_name = os.environ['computational_name'].replace('_', '-').lower()
        private = datalab.meta_lib.get_instance_private_ip_address(cluster_name, notebook_name)
        con = datalab.fab.init_datalab_connection(private, os_user, key_path)
        con.sudo('rm -rf /home/{}/.local/share/jupyter/kernels/*_{}'.format(os_user, cluster_name))
        if exists(con, '/home/{}/.ensure_dir/dataengine_{}_interpreter_ensured'.format(os_user, cluster_name)):
            if os.environ['notebook_multiple_clusters'] == 'true':
                # Best effort: a missing Livy server must not abort the cleanup.
                try:
                    livy_port = con.sudo(
                        "cat /opt/" + cluster_name +
                        "/livy/conf/livy.conf | grep livy.server.port | tail -n 1 | awk '{printf $3}'"
                    ).stdout.replace('\n', '')
                    process_number = con.sudo(
                        "netstat -natp 2>/dev/null | grep ':" + livy_port +
                        "' | awk '{print $7}' | sed 's|/.*||g'"
                    ).stdout.replace('\n', '')
                    con.sudo('kill -9 ' + process_number)
                    con.sudo('systemctl disable livy-server-' + livy_port)
                except Exception:
                    print("Wasn't able to find Livy server for this EMR!")
            con.sudo(
                'sed -i "s/^export SPARK_HOME.*/export SPARK_HOME=\\/opt\\/spark/" '
                '/opt/zeppelin/conf/zeppelin-env.sh')
            con.sudo("rm -rf /home/{}/.ensure_dir/dataengine_interpreter_ensure".format(os_user))
            zeppelin_url = 'http://' + private + ':8080/api/interpreter/setting/'
            # An empty ProxyHandler disables any proxy taken from the environment.
            opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
            response = opener.open(urllib.request.Request(zeppelin_url))
            interpreter_json = json.loads(response.read())
            for interpreter in interpreter_json['body']:
                if cluster_name in interpreter['name']:
                    print("Interpreter with ID: {} and name: {} will be removed from zeppelin!".format(
                        interpreter['id'], interpreter['name']))
                    delete_request = urllib.request.Request(zeppelin_url + interpreter['id'], method='DELETE')
                    print(opener.open(delete_request).read())
            con.sudo('chown ' + os_user + ':' + os_user + ' -R /opt/zeppelin/')
            con.sudo('systemctl daemon-reload')
            con.sudo("service zeppelin-notebook stop")
            con.sudo("service zeppelin-notebook start")
            # Poll until port 8080 is no longer reported as "closed" by nmap
            # (grep exits with 1 when "closed" is absent from the output).
            while True:
                con.sudo('sleep 5')
                result = con.sudo('nmap -p 8080 localhost | grep "closed" > /dev/null; echo $?').stdout
                if result[:1] == '1':
                    break
            con.sudo('sleep 5')
            con.sudo('rm -rf /home/{}/.ensure_dir/dataengine_{}_interpreter_ensured'.format(os_user, cluster_name))
        if exists(con, '/home/{}/.ensure_dir/rstudio_dataengine_ensured'.format(os_user)):
            datalab.fab.remove_rstudio_dataengines_kernel(computational_name, os_user)
        con.sudo('rm -rf /opt/' + cluster_name + '/')
        print("Notebook's {} kernels were removed".format(private))
    except Exception as err:
        # format_exc() returns the traceback text; print_exc() returns None
        # and cannot be concatenated into the messages below.
        trace = traceback.format_exc()
        logging.info("Unable to remove kernels on Notebook: " + str(err) + "\n Traceback: " + trace)
        append_result(str({"error": "Unable to remove kernels on Notebook",
                           "error_message": str(err) + "\n Traceback: " + trace}))
        traceback.print_exc(file=sys.stdout)
def install_dataengine_spark(cluster_name, spark_link, spark_version, hadoop_version, cluster_dir, os_user, datalake_enabled):
    """Download a Spark distribution and place it under the cluster directory.

    The archive is fetched with wget into ``/tmp/<cluster_name>/``, unpacked
    into ``/opt/``, moved to ``<cluster_dir>spark/`` and chown-ed to
    ``os_user``. Every step runs through the shell with ``check=True``, so the
    first failing command raises ``subprocess.CalledProcessError``.

    ``datalake_enabled`` is accepted but not used by this function.
    """
    distribution = 'spark-{0}-bin-hadoop{1}'.format(spark_version, hadoop_version)
    archive_path = '/tmp/{0}/{1}.tgz'.format(cluster_name, distribution)
    spark_home = '{0}spark/'.format(cluster_dir)
    steps = (
        'wget {0} -O {1}'.format(spark_link, archive_path),
        'tar -zxvf {0} -C /opt/'.format(archive_path),
        'mv /opt/{0} {1}'.format(distribution, spark_home),
        'chown -R {0}:{0} {1}'.format(os_user, spark_home),
    )
    for step in steps:
        subprocess.run(step, shell=True, check=True)
def _run_grep_pipeline(command):
    """Run a shell pipeline ending in grep and return its decoded stdout.

    grep exits with status 1 when it selects no lines; that is a normal
    outcome here, so only other non-zero statuses raise CalledProcessError.
    """
    completed = subprocess.run(command, capture_output=True, shell=True)
    if completed.returncode not in (0, 1):
        raise subprocess.CalledProcessError(completed.returncode, command,
                                            output=completed.stdout, stderr=completed.stderr)
    return completed.stdout.decode('UTF-8').rstrip("\n\r")


def _merge_spark_defaults(current_properties, spark_configurations):
    """Return property lines with the 'spark-defaults' overrides applied.

    Existing lines keep their order, lines whose first token matches an
    override are replaced, and overrides not present yet are appended.
    Blank lines and '#' comment lines are dropped; duplicates are removed.
    """
    overrides = {}
    for config in spark_configurations:
        if config['Classification'] == 'spark-defaults':
            for name, value in config['Properties'].items():
                overrides[name] = (name + ' ' + value).rstrip()
    merged = []
    for line in current_properties:
        line = line.rstrip()
        if not line or line.startswith('#'):
            continue
        merged.append(overrides.get(line.split(' ')[0], line))
    merged.extend(overrides.values())
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(merged))


def configure_dataengine_spark(cluster_name, jars_dir, cluster_dir, datalake_enabled, spark_configs=''):
    """Build the cluster's spark-defaults.conf from the notebook-local template.

    Steps:
      1. Append a ``spark.jars`` line listing every ``*.jar`` under
         ``jars_dir`` to ``/tmp/<cluster_name>/notebook_spark-defaults_local.conf``.
      2. If the cluster already has a spark-defaults.conf, append its
         non-comment lines that differ from the template to the template.
      3. Copy the template and ``/opt/spark/conf/core-site.xml`` into
         ``<cluster_dir>spark/conf/``.
      4. If ``spark_configs`` is given, apply its 'spark-defaults' properties
         on top of the resulting file and rewrite it (comment header first).

    Args:
        cluster_name: Cluster name; selects the ``/tmp/<cluster_name>/`` work dir.
        jars_dir: Directory searched for jar files.
        cluster_dir: Cluster directory; concatenated directly with
            ``spark/conf/...``, so it is expected to end with ``/``.
        datalake_enabled: Accepted but not used by this function.
        spark_configs: String parsed with ``ast.literal_eval`` into a list of
            dicts with ``Classification`` and ``Properties`` keys.

    Raises:
        subprocess.CalledProcessError: If a shell command fails.
        OSError: If a configuration file cannot be read or written.
    """
    local_conf = '/tmp/{0}/notebook_spark-defaults_local.conf'.format(cluster_name)
    defaults_conf = '{0}spark/conf/spark-defaults.conf'.format(cluster_dir)

    subprocess.run(
        "jar_list=`find {0} -name '*.jar' | tr '\\n' ',' | sed 's/,$//'` ; "
        "echo \"spark.jars $jar_list\" >> {1}".format(jars_dir, local_conf),
        shell=True, check=True)

    if os.path.exists(defaults_conf):
        additional_spark_properties = _run_grep_pipeline(
            'diff --changed-group-format="%>" --unchanged-group-format="" '
            '{0} {1} | grep -v "^#"'.format(local_conf, defaults_conf))
        # Written from Python so that values containing quotes, '$' or
        # backslashes are stored literally instead of being shell-expanded.
        with open(local_conf, 'a', encoding='UTF-8') as conf_file:
            for extra_property in additional_spark_properties.split('\n'):
                if extra_property.strip():
                    conf_file.write(extra_property + '\n')

    if os.path.exists(cluster_dir):
        subprocess.run('cp -f {0} {1}'.format(local_conf, defaults_conf), shell=True, check=True)
    # NOTE(review): this copy runs unconditionally, matching the original
    # statement order; confirm it was not meant to sit under the check above.
    subprocess.run('cp -f /opt/spark/conf/core-site.xml {}spark/conf/'.format(cluster_dir), shell=True, check=True)

    if spark_configs and os.path.exists(cluster_dir):
        with open(local_conf, encoding='UTF-8') as conf_file:
            datalab_header = [line.rstrip('\r\n') for line in conf_file if line.startswith('#')]
        spark_configurations = ast.literal_eval(spark_configs)
        with open(defaults_conf, encoding='UTF-8') as conf_file:
            current_spark_properties = conf_file.read().split('\n')
        new_spark_defaults = _merge_spark_defaults(current_spark_properties, spark_configurations)
        with open(defaults_conf, 'w', encoding='UTF-8') as conf_file:
            conf_file.writelines(line + '\n' for line in datalab_header + new_spark_defaults)
def find_des_jars(all_jars, des_path):
    """Return the jars to use for the data engine service.

    No filtering is applied here: ``all_jars`` is returned unchanged (the
    same object) and ``des_path`` is not used.
    """
    # Use this method to filter cloud jars (see an example in aws method)
    return all_jars