# *****************************************************************************
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# ******************************************************************************
from azure.common.client_factory import get_client_from_auth_file
import azure.common
from azure.mgmt.authorization import AuthorizationManagementClient
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.network import NetworkManagementClient
from azure.mgmt.storage import StorageManagementClient
from azure.storage.blob import BlockBlobService
from azure.mgmt.datalake.store import DataLakeStoreAccountManagementClient
from azure.datalake.store import core, lib
from azure.graphrbac import GraphRbacManagementClient
from azure.common.credentials import ServicePrincipalCredentials
import azure.common.exceptions as AzureExceptions
from fabric.api import *
from fabric.contrib.files import exists
import backoff
import urllib2
import meta_lib
import logging
import traceback
import sys, time
import os, json
import dlab.fab
from dlab.fab import append_result
import dlab.common_lib
import ast
class AzureActions:
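    """Wrapper around the Azure SDK management clients used by the DLab provisioning scripts.

    All clients are built from the service principal auth file referenced by the
    AZURE_AUTH_LOCATION environment variable (/root/azure_auth.json).
    """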
def __init__(self):
os.environ['AZURE_AUTH_LOCATION'] = '/root/azure_auth.json'
self.compute_client = get_client_from_auth_file(ComputeManagementClient)
self.resource_client = get_client_from_auth_file(ResourceManagementClient)
self.network_client = get_client_from_auth_file(NetworkManagementClient)
self.storage_client = get_client_from_auth_file(StorageManagementClient)
self.datalake_client = get_client_from_auth_file(DataLakeStoreAccountManagementClient)
self.authorization_client = get_client_from_auth_file(AuthorizationManagementClient)
self.sp_creds = json.loads(open(os.environ['AZURE_AUTH_LOCATION']).read())
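        # Data Lake Store filesystem operations need a separate ADL token, obtained
        # here with the same service principal credentials as the management clients.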
self.dl_filesystem_creds = lib.auth(tenant_id=json.dumps(self.sp_creds['tenantId']).replace('"', ''),
client_secret=json.dumps(self.sp_creds['clientSecret']).replace('"', ''),
client_id=json.dumps(self.sp_creds['clientId']).replace('"', ''),
resource='https://datalake.azure.net/')
def create_resource_group(self, resource_group_name, region):
try:
result = self.resource_client.resource_groups.create_or_update(
resource_group_name,
{
'location': region,
'tags': {
'Name': resource_group_name
}
}
)
return result
except Exception as err:
            logging.info(
                "Unable to create Resource Group: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create Resource Group",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def remove_resource_group(self, resource_group_name, region):
try:
result = self.resource_client.resource_groups.delete(
resource_group_name,
{
'location': region
}
)
return result
except Exception as err:
            logging.info(
                "Unable to remove Resource Group: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to remove Resource Group",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def create_vpc(self, resource_group_name, vpc_name, region, vpc_cidr):
try:
result = self.network_client.virtual_networks.create_or_update(
resource_group_name,
vpc_name,
{
'location': region,
'tags': {
'Name': vpc_name,
os.environ['conf_billing_tag_key']: os.environ['conf_billing_tag_value']
},
'address_space': {
'address_prefixes': [vpc_cidr]
}
}
)
return result
except Exception as err:
            logging.info(
                "Unable to create Virtual Network: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create Virtual Network",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def create_virtual_network_peerings(self, resource_group_name,
virtual_network_name,
virtual_network_peering_name,
vnet_id
):
try:
result = self.network_client.virtual_network_peerings.create_or_update(
resource_group_name,
virtual_network_name,
virtual_network_peering_name,
{
"allow_virtual_network_access": True,
"allow_forwarded_traffic": True,
"remote_virtual_network": {
"id": vnet_id
}
}
).wait(60)
return result
except Exception as err:
            logging.info(
                "Unable to create Virtual Network peering: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create Virtual Network peering",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def remove_vpc(self, resource_group_name, vpc_name):
try:
result = self.network_client.virtual_networks.delete(
resource_group_name,
vpc_name
)
return result
except Exception as err:
            logging.info(
                "Unable to remove Virtual Network: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to remove Virtual Network",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def create_subnet(self, resource_group_name, vpc_name, subnet_name, subnet_cidr):
try:
result = self.network_client.subnets.create_or_update(
resource_group_name,
vpc_name,
subnet_name,
{
'address_prefix': subnet_cidr
}
)
return result
except Exception as err:
            logging.info(
                "Unable to create Subnet: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create Subnet",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def remove_subnet(self, resource_group_name, vpc_name, subnet_name):
try:
result = self.network_client.subnets.delete(
resource_group_name,
vpc_name,
subnet_name
)
return result
except Exception as err:
            logging.info(
                "Unable to remove Subnet: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to remove Subnet",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def create_security_group(self, resource_group_name, network_security_group_name, region, tags, list_rules):
try:
result = self.network_client.network_security_groups.create_or_update(
resource_group_name,
network_security_group_name,
{
'location': region,
'tags': tags,
}
).wait()
for rule in list_rules:
self.network_client.security_rules.create_or_update(
resource_group_name,
network_security_group_name,
security_rule_name=rule['name'],
security_rule_parameters=rule
).wait()
return result
except Exception as err:
            logging.info(
                "Unable to create security group: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create security group",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def remove_security_group(self, resource_group_name, network_security_group_name):
try:
result = self.network_client.network_security_groups.delete(
resource_group_name,
network_security_group_name
)
return result
except Exception as err:
            logging.info(
                "Unable to remove security group: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to remove security group",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def create_datalake_store(self, resource_group_name, datalake_name, region, tags):
try:
result = self.datalake_client.account.create(
resource_group_name,
datalake_name,
{
"location": region,
"tags": tags,
"encryption_state": "Enabled",
"encryption_config": {
"type": "ServiceManaged"
}
}
).wait()
return result
except Exception as err:
            logging.info(
                "Unable to create Data Lake store: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create Data Lake store",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def delete_datalake_store(self, resource_group_name, datalake_name):
try:
result = self.datalake_client.account.delete(
resource_group_name,
datalake_name
).wait()
return result
except Exception as err:
            logging.info(
                "Unable to remove Data Lake store: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to remove Data Lake store",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def create_datalake_directory(self, datalake_name, dir_name):
try:
datalake_client = core.AzureDLFileSystem(self.dl_filesystem_creds, store_name=datalake_name)
result = datalake_client.mkdir(dir_name)
return result
except Exception as err:
            logging.info(
                "Unable to create Data Lake directory: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create Data Lake directory",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def chown_datalake_directory(self, datalake_name, dir_name, ad_user='', ad_group=''):
try:
datalake_client = core.AzureDLFileSystem(self.dl_filesystem_creds, store_name=datalake_name)
if ad_user and ad_group:
result = datalake_client.chown(dir_name, owner=ad_user, group=ad_group)
elif ad_user and not ad_group:
result = datalake_client.chown(dir_name, owner=ad_user)
elif not ad_user and ad_group:
result = datalake_client.chown(dir_name, group=ad_group)
return result
except Exception as err:
            logging.info(
                "Unable to chown Data Lake directory: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to chown Data Lake directory",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def chmod_datalake_directory(self, datalake_name, dir_name, mod):
try:
datalake_client = core.AzureDLFileSystem(self.dl_filesystem_creds, store_name=datalake_name)
result = datalake_client.chmod(dir_name, mod)
return result
except Exception as err:
            logging.info(
                "Unable to chmod Data Lake directory: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to chmod Data Lake directory",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def set_user_permissions_to_datalake_directory(self, datalake_name, dir_name, ad_user, mod='rwx'):
try:
acl_specification = 'user:{}:{}'.format(ad_user, mod)
datalake_client = core.AzureDLFileSystem(self.dl_filesystem_creds, store_name=datalake_name)
result = datalake_client.modify_acl_entries(path=dir_name, acl_spec=acl_specification)
return result
except Exception as err:
            logging.info(
                "Unable to set user permission to Data Lake directory: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to set user permission to Data Lake directory",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def unset_user_permissions_to_datalake_directory(self, datalake_name, dir_name, ad_user):
try:
acl_specification = 'user:{}'.format(ad_user)
datalake_client = core.AzureDLFileSystem(self.dl_filesystem_creds, store_name=datalake_name)
result = datalake_client.remove_acl_entries(path=dir_name, acl_spec=acl_specification)
return result
except Exception as err:
            logging.info(
                "Unable to unset user permission to Data Lake directory: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to unset user permission to Data Lake directory",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def remove_datalake_directory(self, datalake_name, dir_name):
try:
datalake_client = core.AzureDLFileSystem(self.dl_filesystem_creds, store_name=datalake_name)
result = datalake_client.rm(dir_name, recursive=True)
return result
except Exception as err:
            logging.info(
                "Unable to delete Data Lake directory: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to delete Data Lake directory",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def create_storage_account(self, resource_group_name, account_name, region, tags):
try:
result = self.storage_client.storage_accounts.create(
resource_group_name,
account_name,
{
"sku": {"name": "Standard_LRS"},
"kind": "BlobStorage",
"location": region,
"tags": tags,
"access_tier": "Hot",
"encryption": {
"services": {"blob": {"enabled": True}}
}
}
).wait()
return result
except Exception as err:
            logging.info(
                "Unable to create Storage account: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create Storage account",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def remove_storage_account(self, resource_group_name, account_name):
try:
result = self.storage_client.storage_accounts.delete(
resource_group_name,
account_name
)
return result
except Exception as err:
            logging.info(
                "Unable to remove Storage account: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to remove Storage account",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def create_blob_container(self, resource_group_name, account_name, container_name):
try:
secret_key = meta_lib.AzureMeta().list_storage_keys(resource_group_name, account_name)[0]
block_blob_service = BlockBlobService(account_name=account_name, account_key=secret_key)
result = block_blob_service.create_container(container_name)
return result
except Exception as err:
            logging.info(
                "Unable to create blob container: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create blob container",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def upload_to_container(self, resource_group_name, account_name, container_name, files):
try:
secret_key = meta_lib.AzureMeta().list_storage_keys(resource_group_name, account_name)[0]
block_blob_service = BlockBlobService(account_name=account_name, account_key=secret_key)
for filename in files:
block_blob_service.create_blob_from_path(container_name, filename, filename)
return ''
except Exception as err:
            logging.info(
                "Unable to upload files to container: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to upload files to container",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def download_from_container(self, resource_group_name, account_name, container_name, files):
try:
secret_key = meta_lib.AzureMeta().list_storage_keys(resource_group_name, account_name)[0]
block_blob_service = BlockBlobService(account_name=account_name, account_key=secret_key)
for filename in files:
block_blob_service.get_blob_to_path(container_name, filename, filename)
return ''
except azure.common.AzureMissingResourceHttpError:
return ''
except Exception as err:
            logging.info(
                "Unable to download files from container: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to download files from container",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def create_static_public_ip(self, resource_group_name, ip_name, region, instance_name, tags):
try:
self.network_client.public_ip_addresses.create_or_update(
resource_group_name,
ip_name,
{
"location": region,
'tags': tags,
"public_ip_allocation_method": "static",
"public_ip_address_version": "IPv4",
"dns_settings": {
"domain_name_label": "host-" + instance_name.lower()
}
}
).wait()
return meta_lib.AzureMeta().get_static_ip(resource_group_name, ip_name).ip_address
except Exception as err:
            logging.info(
                "Unable to create static IP address: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create static IP address",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def delete_static_public_ip(self, resource_group_name, ip_name):
try:
result = self.network_client.public_ip_addresses.delete(
resource_group_name,
ip_name
).wait()
return result
except Exception as err:
            logging.info(
                "Unable to delete static IP address: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to delete static IP address",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def create_instance(self, region, instance_size, service_base_name, instance_name, dlab_ssh_user_name, public_key,
network_interface_resource_id, resource_group_name, primary_disk_size, instance_type,
image_full_name, tags, project_name='', create_option='fromImage', disk_id='',
instance_storage_account_type='Premium_LRS', image_type='default'):
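        # image_full_name is either a marketplace image in "publisher_offer_sku_..." form
        # (image_type='default') or the name of a pre-baked custom image ('pre-configured').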
if image_type == 'pre-configured':
image_id = meta_lib.AzureMeta().get_image(resource_group_name, image_full_name).id
else:
image_name = image_full_name.split('_')
publisher = image_name[0]
offer = image_name[1]
sku = image_name[2]
try:
if instance_type == 'ssn':
parameters = {
'location': region,
'tags': tags,
'hardware_profile': {
'vm_size': instance_size
},
'storage_profile': {
'image_reference': {
'publisher': publisher,
'offer': offer,
'sku': sku,
'version': 'latest'
},
'os_disk': {
'os_type': 'Linux',
'name': '{}-ssn-volume-primary'.format(service_base_name),
'create_option': 'fromImage',
'disk_size_gb': int(primary_disk_size),
'tags': tags,
'managed_disk': {
'storage_account_type': instance_storage_account_type,
}
}
},
'os_profile': {
'computer_name': instance_name.replace('_', '-'),
'admin_username': dlab_ssh_user_name,
'linux_configuration': {
'disable_password_authentication': True,
'ssh': {
'public_keys': [{
'path': '/home/{}/.ssh/authorized_keys'.format(dlab_ssh_user_name),
'key_data': public_key
}]
}
}
},
'network_profile': {
'network_interfaces': [
{
'id': network_interface_resource_id
}
]
}
}
elif instance_type == 'edge':
if create_option == 'fromImage':
parameters = {
'location': region,
'tags': tags,
'hardware_profile': {
'vm_size': instance_size
},
'storage_profile': {
'image_reference': {
'publisher': publisher,
'offer': offer,
'sku': sku,
'version': 'latest'
},
'os_disk': {
'os_type': 'Linux',
'name': '{}-{}-{}-edge-volume-primary'.format(service_base_name, project_name,
os.environ['endpoint_name'].lower()),
'create_option': create_option,
'disk_size_gb': int(primary_disk_size),
'tags': tags,
'managed_disk': {
'storage_account_type': instance_storage_account_type
}
}
},
'os_profile': {
'computer_name': instance_name.replace('_', '-'),
'admin_username': dlab_ssh_user_name,
'linux_configuration': {
'disable_password_authentication': True,
'ssh': {
'public_keys': [{
'path': '/home/{}/.ssh/authorized_keys'.format(dlab_ssh_user_name),
'key_data': public_key
}]
}
}
},
'network_profile': {
'network_interfaces': [
{
'id': network_interface_resource_id
}
]
}
}
elif create_option == 'attach':
parameters = {
'location': region,
'tags': tags,
'hardware_profile': {
'vm_size': instance_size
},
'storage_profile': {
'os_disk': {
'os_type': 'Linux',
'name': '{}-{}-{}-edge-volume-primary'.format(service_base_name, project_name,
os.environ['endpoint_name'].lower()),
'create_option': create_option,
'disk_size_gb': int(primary_disk_size),
'tags': tags,
'managed_disk': {
'id': disk_id,
'storage_account_type': instance_storage_account_type
}
}
},
'network_profile': {
'network_interfaces': [
{
'id': network_interface_resource_id
}
]
}
}
elif instance_type == 'notebook':
if image_type == 'default':
storage_profile = {
'image_reference': {
'publisher': publisher,
'offer': offer,
'sku': sku,
'version': 'latest'
},
'os_disk': {
'os_type': 'Linux',
'name': '{}-volume-primary'.format(instance_name),
'create_option': 'fromImage',
'disk_size_gb': int(primary_disk_size),
'tags': tags,
'managed_disk': {
'storage_account_type': instance_storage_account_type
}
},
'data_disks': [
{
'lun': 1,
'name': '{}-volume-secondary'.format(instance_name),
'create_option': 'empty',
'disk_size_gb': 32,
'tags': {
'Name': '{}-volume-secondary'.format(instance_name)
},
'managed_disk': {
'storage_account_type': instance_storage_account_type
}
}
]
}
elif image_type == 'pre-configured':
storage_profile = {
'image_reference': {
'id': image_id
},
'os_disk': {
'os_type': 'Linux',
'name': '{}-volume-primary'.format(instance_name),
'create_option': 'fromImage',
'disk_size_gb': int(primary_disk_size),
'tags': tags,
'managed_disk': {
'storage_account_type': instance_storage_account_type
}
}
}
parameters = {
'location': region,
'tags': tags,
'hardware_profile': {
'vm_size': instance_size
},
'storage_profile': storage_profile,
'os_profile': {
'computer_name': instance_name.replace('_', '-'),
'admin_username': dlab_ssh_user_name,
'linux_configuration': {
'disable_password_authentication': True,
'ssh': {
'public_keys': [{
'path': '/home/{}/.ssh/authorized_keys'.format(dlab_ssh_user_name),
'key_data': public_key
}]
}
}
},
'network_profile': {
'network_interfaces': [
{
'id': network_interface_resource_id
}
]
}
}
elif instance_type == 'dataengine':
if image_type == 'pre-configured':
storage_profile = {
'image_reference': {
'id': image_id
},
'os_disk': {
'os_type': 'Linux',
'name': '{}-volume-primary'.format(instance_name),
'create_option': 'fromImage',
'disk_size_gb': int(primary_disk_size),
'tags': tags,
'managed_disk': {
'storage_account_type': instance_storage_account_type
}
}
}
elif image_type == 'default':
storage_profile = {
'image_reference': {
'publisher': publisher,
'offer': offer,
'sku': sku,
'version': 'latest'
},
'os_disk': {
'os_type': 'Linux',
'name': '{}-volume-primary'.format(instance_name),
'create_option': 'fromImage',
'disk_size_gb': int(primary_disk_size),
'tags': tags,
'managed_disk': {
'storage_account_type': instance_storage_account_type
}
}
}
parameters = {
'location': region,
'tags': tags,
'hardware_profile': {
'vm_size': instance_size
},
'storage_profile': storage_profile,
'os_profile': {
'computer_name': instance_name.replace('_', '-'),
'admin_username': dlab_ssh_user_name,
'linux_configuration': {
'disable_password_authentication': True,
'ssh': {
'public_keys': [{
'path': '/home/{}/.ssh/authorized_keys'.format(dlab_ssh_user_name),
'key_data': public_key
}]
}
}
},
'network_profile': {
'network_interfaces': [
{
'id': network_interface_resource_id
}
]
}
}
else:
parameters = {}
result = self.compute_client.virtual_machines.create_or_update(
resource_group_name, instance_name, parameters
).wait()
AzureActions().tag_disks(resource_group_name, instance_name)
return result
except Exception as err:
            logging.info(
                "Unable to create instance: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create instance",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def tag_disks(self, resource_group_name, instance_name):
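        # Disks are listed in attachment order; the matching -volume-* postfix is appended
        # to the 'Name' tag each disk inherited from the instance.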
postfix_list = ['-volume-primary', '-volume-secondary', '-volume-tertiary']
disk_list = meta_lib.AzureMeta().get_vm_disks(resource_group_name, instance_name)
for inx, disk in enumerate(disk_list):
tags_copy = disk.tags.copy()
tags_copy['Name'] = tags_copy['Name'] + postfix_list[inx]
disk.tags = tags_copy
self.compute_client.disks.create_or_update(resource_group_name, disk.name, disk)
def stop_instance(self, resource_group_name, instance_name):
try:
result = self.compute_client.virtual_machines.deallocate(resource_group_name, instance_name).wait()
return result
except Exception as err:
            logging.info(
                "Unable to stop instance: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to stop instance",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def start_instance(self, resource_group_name, instance_name):
try:
result = self.compute_client.virtual_machines.start(resource_group_name, instance_name).wait()
return result
except Exception as err:
            logging.info(
                "Unable to start instance: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to start instance",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def set_tag_to_instance(self, resource_group_name, instance_name, tags):
try:
instance_parameters = self.compute_client.virtual_machines.get(resource_group_name, instance_name)
instance_parameters.tags = tags
result = self.compute_client.virtual_machines.create_or_update(resource_group_name, instance_name,
instance_parameters)
return result
except Exception as err:
            logging.info(
                "Unable to set instance tags: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to set instance tags",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def remove_instance(self, resource_group_name, instance_name):
try:
result = self.compute_client.virtual_machines.delete(resource_group_name, instance_name).wait()
print("Instance {} has been removed".format(instance_name))
# Removing instance disks
disk_names = []
resource_group_disks = self.compute_client.disks.list_by_resource_group(resource_group_name)
for disk in resource_group_disks:
if instance_name in disk.name:
disk_names.append(disk.name)
for i in disk_names:
self.remove_disk(resource_group_name, i)
print("Disk {} has been removed".format(i))
# Removing public static IP address and network interfaces
network_interface_name = instance_name + '-nif'
for j in meta_lib.AzureMeta().get_network_interface(resource_group_name,
network_interface_name).ip_configurations:
self.delete_network_if(resource_group_name, network_interface_name)
print("Network interface {} has been removed".format(network_interface_name))
if j.public_ip_address:
static_ip_name = j.public_ip_address.id.split('/')[-1]
self.delete_static_public_ip(resource_group_name, static_ip_name)
print("Static IP address {} has been removed".format(static_ip_name))
return result
except Exception as err:
            logging.info(
                "Unable to remove instance: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to remove instance",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def remove_disk(self, resource_group_name, disk_name):
try:
result = self.compute_client.disks.delete(resource_group_name, disk_name).wait()
return result
except Exception as err:
            logging.info(
                "Unable to remove disk: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to remove disk",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
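    # create_network_if is retried on TypeError (up to 5 attempts, exponential backoff),
    # which covers transient failures of the metadata lookups performed below.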
@backoff.on_exception(backoff.expo,
TypeError,
max_tries=5)
def create_network_if(self, resource_group_name, vpc_name, subnet_name, interface_name, region, security_group_name,
tags, public_ip_name="None"):
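        # public_ip_name is the literal string "None" when the interface should be
        # created without a public IP address.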
try:
subnet_cidr = meta_lib.AzureMeta().get_subnet(resource_group_name, vpc_name, subnet_name).address_prefix.split('/')[0]
private_ip = meta_lib.AzureMeta().check_free_ip(resource_group_name, vpc_name, subnet_cidr)
subnet_id = meta_lib.AzureMeta().get_subnet(resource_group_name, vpc_name, subnet_name).id
security_group_id = meta_lib.AzureMeta().get_security_group(resource_group_name, security_group_name).id
if public_ip_name == "None":
ip_params = [{
"name": interface_name,
"private_ip_allocation_method": "Static",
"private_ip_address": private_ip,
"private_ip_address_version": "IPv4",
"subnet": {
"id": subnet_id
}
}]
else:
public_ip_id = meta_lib.AzureMeta().get_static_ip(resource_group_name, public_ip_name).id
ip_params = [{
"name": interface_name,
"private_ip_allocation_method": "Static",
"private_ip_address": private_ip,
"private_ip_address_version": "IPv4",
"public_ip_address": {
"id": public_ip_id
},
"subnet": {
"id": subnet_id
}
}]
result = self.network_client.network_interfaces.create_or_update(
resource_group_name,
interface_name,
{
"location": region,
"tags": tags,
"network_security_group": {
"id": security_group_id
},
"ip_configurations": ip_params
}
).wait()
network_interface_id = meta_lib.AzureMeta().get_network_interface(
resource_group_name,
interface_name
).id
return network_interface_id
except Exception as err:
            logging.info(
                "Unable to create network interface: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create network interface",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def delete_network_if(self, resource_group_name, interface_name):
try:
result = self.network_client.network_interfaces.delete(resource_group_name, interface_name).wait()
return result
except Exception as err:
            logging.info(
                "Unable to delete network interface: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to delete network interface",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def remove_dataengine_kernels(self, resource_group_name, notebook_name, os_user, key_path, cluster_name):
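        # Connects to the notebook VM over SSH (Fabric env) and removes the Jupyter,
        # Zeppelin and RStudio kernels/interpreters that belong to the given cluster.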
try:
private = meta_lib.AzureMeta().get_private_ip_address(resource_group_name, notebook_name)
env.hosts = "{}".format(private)
env.user = "{}".format(os_user)
env.key_filename = "{}".format(key_path)
env.host_string = env.user + "@" + env.hosts
sudo('rm -rf /home/{}/.local/share/jupyter/kernels/*_{}'.format(os_user, cluster_name))
if exists('/home/{}/.ensure_dir/dataengine_{}_interpreter_ensured'.format(os_user, cluster_name)):
if os.environ['notebook_multiple_clusters'] == 'true':
try:
livy_port = sudo("cat /opt/" + cluster_name +
"/livy/conf/livy.conf | grep livy.server.port | tail -n 1 | awk '{printf $3}'")
process_number = sudo("netstat -natp 2>/dev/null | grep ':" + livy_port +
"' | awk '{print $7}' | sed 's|/.*||g'")
sudo('kill -9 ' + process_number)
sudo('systemctl disable livy-server-' + livy_port)
except:
print("Wasn't able to find Livy server for this dataengine!")
sudo(
'sed -i \"s/^export SPARK_HOME.*/export SPARK_HOME=\/opt\/spark/\" /opt/zeppelin/conf/zeppelin-env.sh')
sudo("rm -rf /home/{}/.ensure_dir/dataengine_interpreter_ensure".format(os_user))
zeppelin_url = 'http://' + private + ':8080/api/interpreter/setting/'
opener = urllib2.build_opener(urllib2.ProxyHandler({}))
req = opener.open(urllib2.Request(zeppelin_url))
r_text = req.read()
interpreter_json = json.loads(r_text)
interpreter_prefix = cluster_name
for interpreter in interpreter_json['body']:
if interpreter_prefix in interpreter['name']:
print("Interpreter with ID: {0} and name: {1} will be removed from zeppelin!".
format(interpreter['id'], interpreter['name']))
request = urllib2.Request(zeppelin_url + interpreter['id'], data='')
request.get_method = lambda: 'DELETE'
url = opener.open(request)
print(url.read())
sudo('chown ' + os_user + ':' + os_user + ' -R /opt/zeppelin/')
sudo('systemctl daemon-reload')
sudo("service zeppelin-notebook stop")
sudo("service zeppelin-notebook start")
zeppelin_restarted = False
while not zeppelin_restarted:
sudo('sleep 5')
result = sudo('nmap -p 8080 localhost | grep "closed" > /dev/null; echo $?')
result = result[:1]
if result == '1':
zeppelin_restarted = True
sudo('sleep 5')
sudo('rm -rf /home/{}/.ensure_dir/dataengine_{}_interpreter_ensured'.format(os_user, cluster_name))
if exists('/home/{}/.ensure_dir/rstudio_dataengine_ensured'.format(os_user)):
dlab.fab.remove_rstudio_dataengines_kernel(os.environ['computational_name'], os_user)
sudo('rm -rf /opt/' + cluster_name + '/')
print("Notebook's {} kernels were removed".format(env.hosts))
except Exception as err:
logging.info("Unable to remove kernels on Notebook: " + str(err) + "\n Traceback: " + traceback.print_exc(
file=sys.stdout))
append_result(str({"error": "Unable to remove kernels on Notebook",
"error_message": str(err) + "\n Traceback: " + traceback.print_exc(file=sys.stdout)}))
traceback.print_exc(file=sys.stdout)
def create_image_from_instance(self, resource_group_name, instance_name, region, image_name, tags):
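        # The source VM is deallocated and generalized, the image is created if it does
        # not exist yet, and the source VM is removed afterwards.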
try:
instance_id = meta_lib.AzureMeta().get_instance(resource_group_name, instance_name).id
self.compute_client.virtual_machines.deallocate(resource_group_name, instance_name).wait()
self.compute_client.virtual_machines.generalize(resource_group_name, instance_name)
if not meta_lib.AzureMeta().get_image(resource_group_name, image_name):
self.compute_client.images.create_or_update(resource_group_name, image_name, parameters={
"location": region,
"tags": json.loads(tags),
"source_virtual_machine": {
"id": instance_id
}
}).wait()
AzureActions().remove_instance(resource_group_name, instance_name)
except Exception as err:
            logging.info(
                "Unable to create image: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to create image",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def remove_image(self, resource_group_name, image_name):
try:
return self.compute_client.images.delete(resource_group_name, image_name)
except Exception as err:
            logging.info(
                "Unable to remove image: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to remove image",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def ensure_local_jars(os_user, jars_dir):
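    # Downloads the Hadoop/Azure storage (and, with Data Lake enabled, ADLS) jars used by
    # the local Spark installation; the marker file makes the step idempotent.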
if not exists('/home/{}/.ensure_dir/local_jars_ensured'.format(os_user)):
try:
hadoop_version = sudo("ls /opt/spark/jars/hadoop-common* | sed -n 's/.*\([0-9]\.[0-9]\.[0-9]\).*/\\1/p'")
print("Downloading local jars for Azure")
sudo('mkdir -p {}'.format(jars_dir))
if os.environ['azure_datalake_enable'] == 'false':
sudo('wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/{0}/hadoop-azure-{0}.jar -O \
{1}hadoop-azure-{0}.jar'.format(hadoop_version, jars_dir))
sudo('wget https://repo1.maven.org/maven2/com/microsoft/azure/azure-storage/{0}/azure-storage-{0}.jar \
-O {1}azure-storage-{0}.jar'.format('2.2.0', jars_dir))
else:
sudo('wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/{0}/hadoop-azure-{0}.jar -O \
{1}hadoop-azure-{0}.jar'.format('3.0.0', jars_dir))
sudo('wget https://repo1.maven.org/maven2/com/microsoft/azure/azure-storage/{0}/azure-storage-{0}.jar \
-O {1}azure-storage-{0}.jar'.format('6.1.0', jars_dir))
sudo('wget https://repo1.maven.org/maven2/com/microsoft/azure/azure-data-lake-store-sdk/{0}/azure-data-lake-store-sdk-{0}.jar \
-O {1}azure-data-lake-store-sdk-{0}.jar'.format('2.2.3', jars_dir))
sudo('wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure-datalake/{0}/hadoop-azure-datalake-{0}.jar \
-O {1}hadoop-azure-datalake-{0}.jar'.format('3.0.0', jars_dir))
if os.environ['application'] == 'tensor' or os.environ['application'] == 'deeplearning':
sudo('wget https://dl.bintray.com/spark-packages/maven/tapanalyticstoolkit/spark-tensorflow-connector/{0}/spark-tensorflow-connector-{0}.jar \
-O {1}spark-tensorflow-connector-{0}.jar'.format('1.0.0-s_2.11', jars_dir))
sudo('touch /home/{}/.ensure_dir/local_jars_ensured'.format(os_user))
except Exception as err:
            logging.info(
                "Unable to download local jars: " + str(err) + "\n Traceback: " + traceback.format_exc())
            append_result(str({"error": "Unable to download local jars",
                               "error_message": str(err) + "\n Traceback: " + traceback.format_exc()}))
traceback.print_exc(file=sys.stdout)
def configure_local_spark(jars_dir, templates_dir, memory_type='driver'):
try:
# Checking if spark.jars parameter was generated previously
spark_jars_paths = None
if exists('/opt/spark/conf/spark-defaults.conf'):
try:
spark_jars_paths = sudo('cat /opt/spark/conf/spark-defaults.conf | grep -e "^spark.jars " ')
except:
spark_jars_paths = None
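        # The user and shared storage accounts are looked up by their 'Name' tag.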
user_storage_account_tag = "{}-{}-{}-bucket".format(os.environ['conf_service_base_name'],
os.environ['project_name'].lower(),
os.environ['endpoint_name'].lower())
shared_storage_account_tag = '{0}-{1}-shared-bucket'.format(os.environ['conf_service_base_name'],
os.environ['endpoint_name'].lower())
for storage_account in meta_lib.AzureMeta().list_storage_accounts(os.environ['azure_resource_group_name']):
if user_storage_account_tag == storage_account.tags["Name"]:
user_storage_account_name = storage_account.name
user_storage_account_key = meta_lib.AzureMeta().list_storage_keys(
os.environ['azure_resource_group_name'], user_storage_account_name)[0]
if shared_storage_account_tag == storage_account.tags["Name"]:
shared_storage_account_name = storage_account.name
shared_storage_account_key = meta_lib.AzureMeta().list_storage_keys(
os.environ['azure_resource_group_name'], shared_storage_account_name)[0]
if os.environ['azure_datalake_enable'] == 'false':
put(templates_dir + 'core-site-storage.xml', '/tmp/core-site.xml')
else:
put(templates_dir + 'core-site-datalake.xml', '/tmp/core-site.xml')
sudo('sed -i "s|USER_STORAGE_ACCOUNT|{}|g" /tmp/core-site.xml'.format(user_storage_account_name))
sudo('sed -i "s|SHARED_STORAGE_ACCOUNT|{}|g" /tmp/core-site.xml'.format(shared_storage_account_name))
sudo('sed -i "s|USER_ACCOUNT_KEY|{}|g" /tmp/core-site.xml'.format(user_storage_account_key))
sudo('sed -i "s|SHARED_ACCOUNT_KEY|{}|g" /tmp/core-site.xml'.format(shared_storage_account_key))
if os.environ['azure_datalake_enable'] == 'true':
client_id = os.environ['azure_application_id']
refresh_token = os.environ['azure_user_refresh_token']
sudo('sed -i "s|CLIENT_ID|{}|g" /tmp/core-site.xml'.format(client_id))
sudo('sed -i "s|REFRESH_TOKEN|{}|g" /tmp/core-site.xml'.format(refresh_token))
if os.environ['azure_datalake_enable'] == 'false':
sudo('rm -f /opt/spark/conf/core-site.xml')
sudo('mv /tmp/core-site.xml /opt/spark/conf/core-site.xml')
else:
sudo('rm -f /opt/hadoop/etc/hadoop/core-site.xml')
sudo('mv /tmp/core-site.xml /opt/hadoop/etc/hadoop/core-site.xml')
put(templates_dir + 'notebook_spark-defaults_local.conf', '/tmp/notebook_spark-defaults_local.conf')
sudo("jar_list=`find {} -name '*.jar' | tr '\\n' ','` ; echo \"spark.jars $jar_list\" >> \
/tmp/notebook_spark-defaults_local.conf".format(jars_dir))
sudo('\cp -f /tmp/notebook_spark-defaults_local.conf /opt/spark/conf/spark-defaults.conf')
if memory_type == 'driver':
spark_memory = dlab.fab.get_spark_memory()
sudo('sed -i "/spark.*.memory/d" /opt/spark/conf/spark-defaults.conf')
sudo('echo "spark.{0}.memory {1}m" >> /opt/spark/conf/spark-defaults.conf'.format(memory_type,
spark_memory))
if not exists('/opt/spark/conf/spark-env.sh'):
sudo('mv /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh')
java_home = run("update-alternatives --query java | grep -o --color=never \'/.*/java-8.*/jre\'").splitlines()[0]
sudo("echo 'export JAVA_HOME=\'{}\'' >> /opt/spark/conf/spark-env.sh".format(java_home))
if 'spark_configurations' in os.environ:
dlab_header = sudo('cat /tmp/notebook_spark-defaults_local.conf | grep "^#"')
spark_configurations = ast.literal_eval(os.environ['spark_configurations'])
new_spark_defaults = list()
spark_defaults = sudo('cat /opt/spark/conf/spark-defaults.conf')
current_spark_properties = spark_defaults.split('\n')
for param in current_spark_properties:
if param.split(' ')[0] != '#':
for config in spark_configurations:
if config['Classification'] == 'spark-defaults':
for property in config['Properties']:
if property == param.split(' ')[0]:
param = property + ' ' + config['Properties'][property]
else:
new_spark_defaults.append(property + ' ' + config['Properties'][property])
new_spark_defaults.append(param)
new_spark_defaults = set(new_spark_defaults)
sudo("echo '{}' > /opt/spark/conf/spark-defaults.conf".format(dlab_header))
for prop in new_spark_defaults:
prop = prop.rstrip()
sudo('echo "{}" >> /opt/spark/conf/spark-defaults.conf'.format(prop))
sudo('sed -i "/^\s*$/d" /opt/spark/conf/spark-defaults.conf')
if spark_jars_paths:
sudo('echo "{}" >> /opt/spark/conf/spark-defaults.conf'.format(spark_jars_paths))
except Exception as err:
print('Error:', str(err))
sys.exit(1)
def configure_dataengine_spark(cluster_name, jars_dir, cluster_dir, datalake_enabled, spark_configs=''):
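    # Merges the cluster's existing spark-defaults with the notebook defaults generated in
    # /tmp and copies the matching core-site.xml into the cluster directory.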
local("jar_list=`find {0} -name '*.jar' | tr '\\n' ',' | sed 's/,$//'` ; echo \"spark.jars $jar_list\" >> \
/tmp/{1}/notebook_spark-defaults_local.conf".format(jars_dir, cluster_name))
if os.path.exists('{0}spark/conf/spark-defaults.conf'.format(cluster_dir)):
additional_spark_properties = local('diff --changed-group-format="%>" --unchanged-group-format="" '
'/tmp/{0}/notebook_spark-defaults_local.conf '
'{1}spark/conf/spark-defaults.conf | grep -v "^#"'.format(
cluster_name, cluster_dir), capture=True)
for property in additional_spark_properties.split('\n'):
local('echo "{0}" >> /tmp/{1}/notebook_spark-defaults_local.conf'.format(property, cluster_name))
local('cp -f /tmp/{0}/notebook_spark-defaults_local.conf {1}spark/conf/spark-defaults.conf'.format(cluster_name,
cluster_dir))
if datalake_enabled == 'false':
local('cp -f /opt/spark/conf/core-site.xml {}spark/conf/'.format(cluster_dir))
else:
local('cp -f /opt/hadoop/etc/hadoop/core-site.xml {}hadoop/etc/hadoop/core-site.xml'.format(cluster_dir))
if spark_configs:
dlab_header = local('cat /tmp/{0}/notebook_spark-defaults_local.conf | grep "^#"'.format(cluster_name),
capture=True)
spark_configurations = ast.literal_eval(spark_configs)
new_spark_defaults = list()
spark_defaults = local('cat {0}spark/conf/spark-defaults.conf'.format(cluster_dir), capture=True)
current_spark_properties = spark_defaults.split('\n')
for param in current_spark_properties:
if param.split(' ')[0] != '#':
for config in spark_configurations:
if config['Classification'] == 'spark-defaults':
for property in config['Properties']:
if property == param.split(' ')[0]:
param = property + ' ' + config['Properties'][property]
else:
new_spark_defaults.append(property + ' ' + config['Properties'][property])
new_spark_defaults.append(param)
new_spark_defaults = set(new_spark_defaults)
local("echo '{0}' > {1}/spark/conf/spark-defaults.conf".format(dlab_header, cluster_dir))
for prop in new_spark_defaults:
prop = prop.rstrip()
local('echo "{0}" >> {1}/spark/conf/spark-defaults.conf'.format(prop, cluster_dir))
local('sed -i "/^\s*$/d" {0}/spark/conf/spark-defaults.conf'.format(cluster_dir))
def remount_azure_disk(creds=False, os_user='', hostname='', keyfile=''):
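    # Re-points the Azure resource (ephemeral) disk from /mnt to /media in /etc/fstab and
    # unmounts it, so /mnt is no longer held by the temporary disk.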
if creds:
env['connection_attempts'] = 100
env.key_filename = [keyfile]
env.host_string = os_user + '@' + hostname
sudo('sed -i "/azure_resource-part1/ s|/mnt|/media|g" /etc/fstab')
sudo('grep "azure_resource-part1" /etc/fstab > /dev/null && umount -f /mnt/ || true')
sudo('mount -a')
def prepare_vm_for_image(creds=False, os_user='', hostname='', keyfile=''):
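    # Deprovisions the Azure Linux agent so the VM can be generalized and captured as an image.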
if creds:
env['connection_attempts'] = 100
env.key_filename = [keyfile]
env.host_string = os_user + '@' + hostname
sudo('waagent -deprovision -force')
def prepare_disk(os_user):
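    # Partitions the last disk device reported by lsblk with fdisk (retrying up to 5 times),
    # formats it as ext4 and mounts it on /opt; the marker file makes the step idempotent.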
if not exists('/home/' + os_user + '/.ensure_dir/disk_ensured'):
try:
allow = False
counter = 0
remount_azure_disk()
disk_name = sudo("lsblk | grep disk | awk '{print $1}' | sort | tail -n 1")
with settings(warn_only=True):
sudo('umount -l /dev/{}1'.format(disk_name))
while not allow:
if counter > 4:
print("Unable to prepare disk")
sys.exit(1)
else:
sudo('''bash -c 'echo -e "o\nn\np\n1\n\n\nw" | fdisk /dev/{}' 2>&1 | tee /tmp/tee.tmp '''.format(
disk_name), warn_only=True)
out = sudo('cat /tmp/tee.tmp')
if 'Syncing disks' in out:
allow = True
elif 'The kernel still uses the old table.' in out:
if sudo('partprobe'):
with settings(warn_only=True):
reboot(wait=180)
allow = True
else:
counter += 1
time.sleep(5)
sudo('umount -l /dev/{}1'.format(disk_name), warn_only=True)
sudo('mkfs.ext4 -F /dev/{}1'.format(disk_name))
sudo('mount /dev/{}1 /opt/'.format(disk_name))
sudo(''' bash -c "echo '/dev/{}1 /opt/ ext4 errors=remount-ro 0 1' >> /etc/fstab" '''.format(
disk_name))
sudo('touch /home/' + os_user + '/.ensure_dir/disk_ensured')
except Exception as err:
traceback.print_exc()
print('Error:', str(err))
sys.exit(1)
def ensure_local_spark(os_user, spark_link, spark_version, hadoop_version, local_spark_path):
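    # Installs standalone Spark; with Data Lake enabled it installs the "without Hadoop" Spark
    # build together with Hadoop 3.0.0 and wires SPARK_DIST_CLASSPATH accordingly.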
if not exists('/home/' + os_user + '/.ensure_dir/local_spark_ensured'):
try:
if os.environ['azure_datalake_enable'] == 'false':
sudo('wget ' + spark_link + ' -O /tmp/spark-' + spark_version + '-bin-hadoop' + hadoop_version + '.tgz')
sudo('tar -zxvf /tmp/spark-' + spark_version + '-bin-hadoop' + hadoop_version + '.tgz -C /opt/')
sudo('mv /opt/spark-' + spark_version + '-bin-hadoop' + hadoop_version + ' ' + local_spark_path)
sudo('chown -R ' + os_user + ':' + os_user + ' ' + local_spark_path)
sudo('touch /home/' + os_user + '/.ensure_dir/local_spark_ensured')
else:
# Downloading Spark without Hadoop
sudo('wget https://archive.apache.org/dist/spark/spark-{0}/spark-{0}-bin-without-hadoop.tgz -O /tmp/spark-{0}-bin-without-hadoop.tgz'
.format(spark_version))
sudo('tar -zxvf /tmp/spark-{}-bin-without-hadoop.tgz -C /opt/'.format(spark_version))
sudo('mv /opt/spark-{}-bin-without-hadoop {}'.format(spark_version, local_spark_path))
sudo('chown -R {0}:{0} {1}'.format(os_user, local_spark_path))
# Downloading Hadoop
hadoop_version = '3.0.0'
sudo('wget https://archive.apache.org/dist/hadoop/common/hadoop-{0}/hadoop-{0}.tar.gz -O /tmp/hadoop-{0}.tar.gz'
.format(hadoop_version))
sudo('tar -zxvf /tmp/hadoop-{0}.tar.gz -C /opt/'.format(hadoop_version))
sudo('mv /opt/hadoop-{0} /opt/hadoop/'.format(hadoop_version))
sudo('chown -R {0}:{0} /opt/hadoop/'.format(os_user))
# Configuring Hadoop and Spark
java_path = dlab.common_lib.find_java_path_remote()
sudo('echo "export JAVA_HOME={}" >> /opt/hadoop/etc/hadoop/hadoop-env.sh'.format(java_path))
sudo("""echo 'export HADOOP_CLASSPATH="$HADOOP_HOME/share/hadoop/tools/lib/*"' >> /opt/hadoop/etc/hadoop/hadoop-env.sh""")
sudo('echo "export HADOOP_HOME=/opt/hadoop/" >> /opt/spark/conf/spark-env.sh')
sudo('echo "export SPARK_HOME=/opt/spark/" >> /opt/spark/conf/spark-env.sh')
spark_dist_classpath = sudo('/opt/hadoop/bin/hadoop classpath')
sudo('echo "export SPARK_DIST_CLASSPATH={}" >> /opt/spark/conf/spark-env.sh'.format(
spark_dist_classpath))
sudo('touch /home/{}/.ensure_dir/local_spark_ensured'.format(os_user))
except Exception as err:
print('Error:', str(err))
sys.exit(1)
def install_dataengine_spark(cluster_name, spark_link, spark_version, hadoop_version, cluster_dir, os_user, datalake_enabled):
try:
if datalake_enabled == 'false':
local('wget ' + spark_link + ' -O /tmp/' + cluster_name + '/spark-' + spark_version + '-bin-hadoop' + hadoop_version + '.tgz')
local('tar -zxvf /tmp/' + cluster_name + '/spark-' + spark_version + '-bin-hadoop' + hadoop_version + '.tgz -C /opt/')
local('mv /opt/spark-' + spark_version + '-bin-hadoop' + hadoop_version + ' ' + cluster_dir + 'spark/')
local('chown -R ' + os_user + ':' + os_user + ' ' + cluster_dir + 'spark/')
else:
# Downloading Spark without Hadoop
local('wget https://archive.apache.org/dist/spark/spark-{0}/spark-{0}-bin-without-hadoop.tgz -O /tmp/{1}/spark-{0}-bin-without-hadoop.tgz'
.format(spark_version, cluster_name))
local('tar -zxvf /tmp/' + cluster_name + '/spark-{}-bin-without-hadoop.tgz -C /opt/'.format(spark_version))
local('mv /opt/spark-{}-bin-without-hadoop {}spark/'.format(spark_version, cluster_dir))
local('chown -R {0}:{0} {1}/spark/'.format(os_user, cluster_dir))
# Downloading Hadoop
hadoop_version = '3.0.0'
local('wget https://archive.apache.org/dist/hadoop/common/hadoop-{0}/hadoop-{0}.tar.gz -O /tmp/{1}/hadoop-{0}.tar.gz'
.format(hadoop_version, cluster_name))
local('tar -zxvf /tmp/' + cluster_name + '/hadoop-{0}.tar.gz -C /opt/'.format(hadoop_version))
local('mv /opt/hadoop-{0} {1}hadoop/'.format(hadoop_version, cluster_dir))
local('chown -R {0}:{0} {1}hadoop/'.format(os_user, cluster_dir))
# Configuring Hadoop and Spark
java_path = dlab.common_lib.find_java_path_local()
local('echo "export JAVA_HOME={}" >> {}hadoop/etc/hadoop/hadoop-env.sh'.format(java_path, cluster_dir))
local("""echo 'export HADOOP_CLASSPATH="$HADOOP_HOME/share/hadoop/tools/lib/*"' >> {}hadoop/etc/hadoop/hadoop-env.sh""".format(cluster_dir))
local('echo "export HADOOP_HOME={0}hadoop/" >> {0}spark/conf/spark-env.sh'.format(cluster_dir))
local('echo "export SPARK_HOME={0}spark/" >> {0}spark/conf/spark-env.sh'.format(cluster_dir))
spark_dist_classpath = local('{}hadoop/bin/hadoop classpath'.format(cluster_dir), capture=True)
local('echo "export SPARK_DIST_CLASSPATH={}" >> {}spark/conf/spark-env.sh'.format(
spark_dist_classpath, cluster_dir))
except:
sys.exit(1)
def find_des_jars(all_jars, des_path):
try:
# Use this method to filter cloud jars (see an example in aws method)
return all_jars
except Exception as err:
print('Error:', str(err))
sys.exit(1)