blob: 4ae07a78f9979edf05b069ac7908b1ffe3ee4e79 [file] [log] [blame]
#!/usr/bin/env python
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
See the License for the specific language governing permissions and
limitations under the License.
import base64
import getpass
import json
import os
import shutil
import socket
import urllib2
import xml.etree.ElementTree as ET
from optparse import OptionParser
PLUGIN_VERSION = '${release}'
DEFAULT_STACK = '${default.stack}'
SUPPORTED_OS_LIST = ['redhat6', 'redhat7']
HAWQ_LIB_STAGING_DIR = '${hawq.lib.staging.dir}'
REPO_VERSION = '${repository.version}'
HAWQ_REPO = '${hawq.repo.prefix}'
HAWQ_ADD_ONS_REPO = '${hawq.addons.repo.prefix}'
'repoid': '-'.join([HAWQ_REPO, REPO_VERSION]),
'input_param': '--hawqrepo',
'optional': False
'repoid': '-'.join([HAWQ_ADD_ONS_REPO, REPO_VERSION]),
'input_param': '--addonsrepo',
'optional': True
class APIClient:
Class which interacts with Ambari Server API
# Base API URL points to localhost. This script is to be executed on the Ambari Server
BASE_API_URL = 'http://localhost:8080/api/v1'
def __init__(self, user, password):
self.user = user
self.password = password
self.encoded_credentials = base64.encodestring(self.user + ':' + self.password).replace('\n', '')
def __request(self, method, url_path, headers=None, data=None):
Creates API requests and packages response into the following format: (response code, response body in json object)
headers = headers if headers is not None else {}
headers['Authorization'] = 'Basic {0}'.format(self.encoded_credentials)
req = urllib2.Request(self.BASE_API_URL + url_path, data, headers)
req.get_method = lambda: method
response = urllib2.urlopen(req)
response_str =
return response.getcode(), json.loads(response_str) if response_str else None
def verify_api_reachable(self):
Returns true if Ambari Server is reachable through API
status_code, _ = self.__request('GET', '/stacks')
except Exception as e:
if type(e) == urllib2.HTTPError and e.code == 403:
raise Exception('Invalid username and/or password.')
elif type(e) == urllib2.URLError:
raise Exception('Ambari-server is not running. Please start ambari-server.')
raise Exception('Unable to connect to Ambari Server.\n' + str(e))
def get_cluster_name(self):
Returns the name of the installed cluster
_, response_json = self.__request('GET', '/clusters')
return None if len(response_json['items']) == 0 else response_json['items'][0]['Clusters']['cluster_name']
def get_stack_info(self, cluster_name):
Returns stack information (stack name, stack version, repository version) of stack installed on cluster
_, response_json = self.__request('GET',
if 'items' not in response_json or len(response_json['items']) == 0:
raise Exception('No Stack found to be installed on the cluster {0}'.format(cluster_name))
stack_versions = response_json['items'][0]['ClusterStackVersions']
return stack_versions['stack'], stack_versions['version'], stack_versions['repository_version']
def get_existing_repository_info(self, stack_name, stack_version, repository_version):
Returns existing repo information for a given stack
url_path = '/stacks/{0}/versions/{1}/compatible_repository_versions/{2}?fields=*,operating_systems/*,operating_systems/repositories/*'.format(
_, response_json = self.__request('GET', url_path)
return response_json
def update_existing_repo(self, stack_name, stack_version, repository_version, merged_repo_info):
Sends a PUT request to add new repo information to the Ambari database
url_path = '/stacks/{0}/versions/{1}/repository_versions/{2}'.format(stack_name, stack_version,
headers = {}
headers['X-Requested-By'] = 'ambari'
headers['Content-Type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
status_code, _ = self.__request('PUT', url_path, headers, merged_repo_info)
# Ambari returns sporadic errors even if PUT succeeds
# Ignore any exception, because existing information from cluster will be verified after PUT request
class RepoUtils:
Utility class for handling json structure to add new repo to existing repo
def __transform_repo(self, repository):
Extracts and returns the base_url, repo_id and repo_name for each repository
repo_info_json = repository['Repositories']
result = {}
result['Repositories'] = dict(
(k, v) for k, v in repo_info_json.iteritems() if k in ('base_url', 'repo_id', 'repo_name'))
return result
def __transform_os_repos(self, os_repos):
Constructs the json string for each operating system
result = {
'OperatingSystems': {},
'repositories': []
result['OperatingSystems']['os_type'] = os_repos['OperatingSystems']['os_type']
result['repositories'] = [self.__transform_repo(repository) for repository in os_repos['repositories']]
return result
def __transform(self, repository_info):
Constructs the json string with required repository information
result = {
'operating_systems': []
result['operating_systems'] = [self.__transform_os_repos(os_repos) for os_repos in
return result
def __create_repo_info_dict(self, repo):
Creates json string with new repo information
result = {}
result['Repositories'] = {
'base_url': repo['baseurl'],
'repo_id': repo['repoid'],
'repo_name': repo['reponame']
return result
def verify_repos_updated(self, existing_repo_info, repos_to_add):
Checks if input repo exists for that os_type on the cluster
existing_repos = self.__transform(existing_repo_info)
all_repos_updated = True
for os_repos in existing_repos['operating_systems']:
if os_repos['OperatingSystems']['os_type'] in SUPPORTED_OS_LIST:
for repo_to_add in repos_to_add:
repo_exists = False
for existing_repo in os_repos['repositories']:
if existing_repo['Repositories']['repo_id'] == repo_to_add['repoid'] and \
existing_repo['Repositories']['repo_name'] == repo_to_add['reponame'] and \
url_exists(existing_repo['Repositories']['base_url'], repo_to_add['baseurl']):
repo_exists = True
all_repos_updated = all_repos_updated and repo_exists
return all_repos_updated
def add_to_existing_repos(self, existing_repo_info, repos_to_add):
Helper function for adding new repos to existing repos
existing_repos = self.__transform(existing_repo_info)
for os_repos in existing_repos['operating_systems']:
if os_repos['OperatingSystems']['os_type'] in SUPPORTED_OS_LIST:
for repo_to_add in repos_to_add:
repo_exists = False
for existing_repo in os_repos['repositories']:
if existing_repo['Repositories']['repo_id'] == repo_to_add['repoid']:
repo_exists = True
existing_repo['Repositories']['repo_name'] = repo_to_add['reponame']
existing_repo['Repositories']['base_url'] = repo_to_add['baseurl']
if not repo_exists:
return json.dumps(existing_repos)
class InputValidator:
Class containing methods for validating command line inputs
def __is_repourl_valid(self, repo_url):
Returns True if repo_url points to a valid repository
repo_url = os.path.join(repo_url, 'repodata/repomd.xml')
req = urllib2.Request(repo_url)
response = urllib2.urlopen(req)
except urllib2.URLError:
return False
if response.getcode() != 200:
return False
return True
def verify_stack(self, stack):
Returns stack info of stack
if not stack:
# Use default stack
print 'INFO: Using default stack {0}, since --stack parameter was not specified.'.format(DEFAULT_STACK)
stack_pair = stack.split('-')
if len(stack_pair) != 2:
raise Exception('Specified stack {0} is not of expected format STACK_NAME-STACK_VERSION'.format(stack))
stack_name = stack_pair[0]
stack_version = stack_pair[1]
stack_dir = '/var/lib/ambari-server/resources/stacks/{0}/{1}'.format(stack_name, stack_version)
if not os.path.isdir(stack_dir):
raise Exception(
'Specified stack {0} does not exist under /var/lib/ambari-server/resources/stacks'.format(stack))
return {
'stack_name': stack_name,
'stack_version': stack_version,
'stack_dir': stack_dir
def verify_repo(self, repoid_prefix, repo_url):
Returns repo info of repo
repo_specified = True
if not repo_url:
# Use default repo_url
repo_url = 'http://{0}/{1}'.format(socket.getfqdn(), REPO_INFO[repoid_prefix]['repoid'])
repo_specified = False
if not self.__is_repourl_valid(repo_url):
if repo_specified:
raise Exception('Specified URL {0} is not a valid repository. \n'
'Please specify a valid url for {1}'.format(repo_url,
elif REPO_INFO[repoid_prefix]['optional']:
return None
raise Exception(
'Repository URL {0} is not valid. \nPlease ensure has been run for the {1} repository on this machine '
'OR specify a valid url for {2}'.format(repo_url, REPO_INFO[repoid_prefix]['repoid'],
return {
'repoid': REPO_INFO[repoid_prefix]['repoid'],
'reponame': REPO_INFO[repoid_prefix]['repoid'],
'baseurl': repo_url
def url_exists(repoA, repoB):
Returns True if given repourl repoA exists in repoB
if type(repoB) in (list, tuple):
return repoA.rstrip('/') in [existing_url.rstrip('/') for existing_url in repoB]
return repoA.rstrip('/') == repoB.rstrip('/')
def update_repoinfo(stack_dir, repos_to_add):
Updates the repoinfo.xml under the specified stack_dir
file_path = '{0}/repos/repoinfo.xml'.format(stack_dir)
for repo in repos_to_add:
repo['xmltext'] = '<repo>\n' \
' <repoid>{0}</repoid>\n' \
' <reponame>{1}</reponame>\n' \
' <baseurl>{2}</baseurl>\n' \
'</repo>\n'.format(repo['repoid'], repo['reponame'], repo['baseurl'])
tree = ET.parse(file_path)
root = tree.getroot()
file_needs_update = False
for os_tag in root.findall('.//os'):
if os_tag.attrib['family'] in SUPPORTED_OS_LIST:
for repo_to_add in repos_to_add:
repo_needs_update = False
for existing_repo in os_tag.findall('.//repo'):
existing_repoid = [repoid.text for repoid in existing_repo.findall('.//repoid')][0]
existing_reponame = [repoid.text for repoid in existing_repo.findall('.//reponame')][0]
existing_baseurl = [baseurl.text for baseurl in existing_repo.findall('.//baseurl')][0]
if existing_repoid == repo_to_add['repoid']:
repo_needs_update = True
print 'INFO: Repository {0} already exists with reponame {1}, baseurl {2} in {3}'.format(
repo_to_add['repoid'], existing_reponame, existing_baseurl, file_path)
if existing_reponame != repo_to_add['reponame'] or existing_baseurl != repo_to_add['baseurl']:
print 'INFO: Repository {0} updated with reponame {1}, baseurl {2} in {3}'.format(
repo_to_add['repoid'], repo_to_add['reponame'], repo_to_add['baseurl'], file_path)
file_needs_update = True
if not repo_needs_update:
print 'INFO: Repository {0} with baseurl {1} added to {2}'.format(repo_to_add['repoid'],
repo_to_add['baseurl'], file_path)
file_needs_update = True
if file_needs_update:
def add_repo_to_cluster(api_client, stack, repos_to_add):
Adds the new repository to the existing cluster if the specified stack has been installed on that cluster
stack_name = stack['stack_name']
stack_version = stack['stack_version']
cluster_name = api_client.get_cluster_name()
# Proceed only if cluster is installed
if cluster_name is None:
repo_utils = RepoUtils()
installed_stack_name, installed_stack_version, installed_repository_version = api_client.get_stack_info(
# Proceed only if installed stack matches input stack
if stack_name != installed_stack_name or stack_version != installed_stack_version:
existing_repo_info = api_client.get_existing_repository_info(stack_name, stack_version,
new_repo_info = repo_utils.add_to_existing_repos(existing_repo_info, repos_to_add)
api_client.update_existing_repo(stack_name, stack_version, installed_repository_version, new_repo_info)
if not repo_utils.verify_repos_updated(
api_client.get_existing_repository_info(stack_name, stack_version, installed_repository_version),
raise Exception(
'Failed to update repository information on existing cluster, {0} with stack {1}-{2}'.format(cluster_name,
print 'INFO: Repositories are available on existing cluster, {0} with stack {1}-{2}'.format(cluster_name,
def write_service_info(stack_dir):
Writes the service info content to the specified stack_dir
stack_services = os.path.join(stack_dir, 'services')
for service in ('HAWQ', 'PXF'):
source_directory = os.path.join(HAWQ_LIB_STAGING_DIR, service)
destination_directory = os.path.join(stack_services, service)
if not os.path.exists(source_directory):
raise Exception('{0} directory was not found under {1}'.format(service, HAWQ_LIB_STAGING_DIR))
service_exists = False
if os.path.exists(destination_directory):
service_exists = True
if service_exists:
print 'INFO: Updating service {0}, which already exists under {1}'.format(service, stack_services)
shutil.copytree(source_directory, destination_directory)
print 'INFO: {0} directory was successfully {1}d under directory {2}'.format(service,
'update' if service_exists else 'create',
def build_parser():
Builds the parser required for parsing user inputs from command line
usage_string = 'Usage: ./ --user admin --password admin --stack HDP-2.4 --hawqrepo --addonsrepo'
parser = OptionParser(usage=usage_string, version='%prog {0}'.format(PLUGIN_VERSION))
parser.add_option('-u', '--user', dest='user', help='Ambari login username (Required)')
parser.add_option('-p', '--password', dest='password',
help='Ambari login password. Providing password through command line is not recommended.\n'
'The script prompts for the password.')
parser.add_option('-s', '--stack', dest='stack', help='Stack Name and Version to be added.'
'(Eg: HDP-2.4 or HDP-2.5)')
parser.add_option('-r', '--hawqrepo', dest='hawqrepo', help='Repository URL which points to the HAWQ packages')
parser.add_option('-a', '--addonsrepo', dest='addonsrepo',
help='Repository URL which points to the HAWQ Add Ons packages')
return parser
def main():
parser = build_parser()
options, _ = parser.parse_args()
user = options.user if options.user else raw_input('Enter Ambari login Username: ')
password = options.password if options.password else getpass.getpass('Enter Ambari login Password: ')
# Verify if Ambari credentials are correct and API is reachable
api_client = APIClient(user, password)
validator = InputValidator()
stack_info = validator.verify_stack(options.stack)
repos_to_add = [validator.verify_repo(HAWQ_REPO, options.hawqrepo)]
add_ons_repo = validator.verify_repo(HAWQ_ADD_ONS_REPO, options.addonsrepo)
if add_ons_repo is not None:
update_repoinfo(stack_info['stack_dir'], repos_to_add)
add_repo_to_cluster(api_client, stack_info, repos_to_add)
print '\nINFO: Please restart ambari-server for changes to take effect'
except Exception as e:
print '\nERROR: {0}'.format(str(e))
if __name__ == '__main__':