blob: e07843dfa7a8bc41be76c31f4e4661df72d4bde4 [file] [log] [blame]
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import urllib2, base64, json, ssl, time, random, sys
from optparse import OptionParser
from contextlib import closing
class SslContext:
    """Factory for the SSL context handed to urllib2 when talking https."""

    def build(self, url):
        """Return an SSL context for `url`.

        None is returned when the url does not start with 'https' or when the
        running ssl module has no SSLContext class.
        """
        if not url.startswith('https'):
            return None
        if not hasattr(ssl, 'SSLContext'):
            return None
        protocol = self._protocol()
        if protocol:
            return ssl.SSLContext(protocol)
        return ssl.create_default_context()

    def _protocol(self):
        """Return the first protocol constant the ssl module offers, or None."""
        # Probed in order of preference; older interpreters lack the first ones.
        candidates = ('PROTOCOL_TLS', 'PROTOCOL_TLSv1_2', 'PROTOCOL_TLSv1_1', 'PROTOCOL_TLSv1')
        for constant_name in candidates:
            if hasattr(ssl, constant_name):
                return getattr(ssl, constant_name)
        return None
class PermissiveSslContext:
    """Factory for SSL contexts that accept any server certificate."""

    def build(self, url):
        """Return an SSL context for `url` with verification switched off.

        Returns None whenever SslContext.build returns None (for example for
        plain http urls).
        """
        context = SslContext().build(url)
        if context is None:
            return None
        # `_https_verify_certificates` is a module-level function of `ssl`, not an
        # attribute of a context object, so probing the context for it never
        # disabled anything. Turn verification off on the context itself.
        if hasattr(context, 'check_hostname'):
            # Has to be cleared first: CERT_NONE is rejected while it is enabled.
            context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        return context
class Url:
    """A url string that can be extended with the `/` operator.

    Trailing slashes are stripped on construction, so composed urls never
    contain a doubled separator at the join point.
    """

    @classmethod
    def base(clazz, protocol, host, port):
        """Build the root url '<protocol>://<host>:<port>' (port must be an int)."""
        return clazz('%s://%s:%d' % (protocol, host, port))

    def __init__(self, url_str):
        # NOTE: this instance attribute shadows the `base` classmethod on instances.
        self.base = url_str.rstrip('/')

    def __div__(self, suffix_url):
        """Return a new Url for `suffix_url` resolved against this one.

        A suffix that already starts with this url is treated as absolute and
        returned as is; anything else is appended after a single '/'.
        """
        suffix_str = str(suffix_url)
        if self._is_absolute(suffix_str):
            return Url(suffix_str)
        if not suffix_str.startswith('/'):
            suffix_str = '/' + suffix_str
        return Url(self.base + suffix_str)

    # `/` dispatches to __truediv__ under true division (Python 3 or
    # `from __future__ import division`); keep both spellings working.
    __truediv__ = __div__

    def _is_absolute(self, suffix_str):
        """Tell whether `suffix_str` already starts with this url."""
        return suffix_str.startswith(self.base)

    def query_params(self, a_dict):
        """Return a new Url with `a_dict` appended as '?name=value&...'.

        Names and values are inserted verbatim; no url-encoding is applied.
        """
        pairs = ['%s=%s' % (name, value) for name, value in a_dict.items()]
        return Url(self.base + '?' + '&'.join(pairs))

    def __str__(self):
        return self.base
class Header:
    """A single HTTP header (key/value pair) that can attach itself to a request."""

    def __init__(self, key, value):
        self.key = key
        self.value = value

    @classmethod
    def csrf(clazz):
        """Return the 'X-Requested-By: ambari' header."""
        return clazz('X-Requested-By', 'ambari')

    def add_to(self, request):
        """Add this header to `request` through its add_header method."""
        request.add_header(self.key, self.value)
class BasicAuth:
    """Authenticator that adds an HTTP Basic 'Authorization' header to requests."""

    def __init__(self, user, password):
        credentials = '%s:%s' % (user, password)
        # b64encode yields the same text as encodestring with its newlines removed.
        token = base64.b64encode(credentials)
        self.header = Header('Authorization', 'Basic %s' % token)

    def authenticate(self, request):
        """Attach the precomputed Authorization header to `request`."""
        self.header.add_to(request)
class ResponseTransformer:
    """Callable that converts a raw response (url, code, data) into a result.

    Subclasses override __call__; `identity()` supplies the pass-through variant.
    """

    @staticmethod
    def identity():
        """Return a transformer that yields (code, data) unchanged."""
        return lambda url, code, data: (code, data)

    def __call__(self, url, code, data):
        raise RuntimeError('Subclass responsibility')


class UnexpectedHttpCode(Exception):
    """Raised when a response status code is outside the 200-299 range."""


class JsonTransformer(ResponseTransformer):
    """Transformer that decodes the response body as JSON."""

    def __call__(self, url, code, data):
        """Return (code, parsed_body) for 2xx responses.

        Raises:
            UnexpectedHttpCode: for any other status code.
            ValueError: when the body is not valid JSON.
        """
        if 200 <= code <= 299:
            return code, self._parse(data)
        # This used to be *returned*; callers unpack a (code, data) pair, so the
        # failure surfaced as an unrelated unpacking error instead of this one.
        raise UnexpectedHttpCode('Unexpected http code: %d url: %s response: %s' % (code, url, data))

    def _parse(self, a_str):
        """Decode `a_str` as JSON; an empty body decodes to an empty dict."""
        if not a_str:
            return {}
        try:
            return json.loads(a_str)
        except ValueError as e:
            # Re-raise with the offending payload to make the failure diagnosable.
            raise ValueError('Error %s while parsing: %s' % (e, a_str))
class RestClient:
    """Minimal HTTP client built on urllib2.

    Every request is resolved against `an_url`, authenticated through
    `authenticator`, decorated with `headers`, and its body and response are
    passed through the request and response transformers.
    """

    def __init__(self, an_url, authenticator, headers=None, ssl_context=SslContext(), request_transformer=lambda r:r, response_transformer=ResponseTransformer.identity()):
        self.base_url = an_url
        self.authenticator = authenticator
        # A fresh list per client; a `[]` default would be shared by all instances.
        self.headers = [] if headers is None else headers
        self.ssl_context = ssl_context
        self.request_transformer = request_transformer
        self.response_transformer = response_transformer

    def get(self, suffix_str):
        """Issue a GET for `suffix_str` and return the transformed response."""
        return self._response(*self._request(suffix_str, 'GET'))

    def post(self, suffix_str, data):
        """Issue a POST with `data` and return the transformed response."""
        return self._response(*self._request(suffix_str, 'POST', data=data))

    def put(self, suffix_str, data):
        """Issue a PUT with `data` and return the transformed response."""
        return self._response(*self._request(suffix_str, 'PUT', data=data))

    def delete(self, suffix_str):
        """Issue a DELETE for `suffix_str` and return the transformed response."""
        return self._response(*self._request(suffix_str, 'DELETE'))

    def _request(self, suffix_str, http_method, data=""):
        """Build the urllib2 request and its SSL context; returns (request, context)."""
        url = str(self.base_url / suffix_str)
        request = urllib2.Request(url, data=self.request_transformer(data))
        # urllib2 derives the verb from the presence of a body; force the one we want.
        request.get_method = lambda: http_method
        self.authenticator.authenticate(request)
        # An explicit loop: map() is lazy on newer interpreters and would add nothing.
        for each in self.headers:
            each.add_to(request)
        return request, self.ssl_context.build(url)

    def _response(self, request, ssl_context):
        """Send `request` and hand url, status code and body to the response transformer."""
        with closing(urllib2.urlopen(request, context=ssl_context)) as response:
            return self.response_transformer(request.get_full_url(), response.getcode(), response.read())

    def rebased(self, new_base_url):
        """Return a client with the same configuration but rooted at `new_base_url`."""
        return RestClient(
            new_base_url,
            self.authenticator,
            self.headers,
            self.ssl_context,
            self.request_transformer,
            self.response_transformer)
class ServiceComponent:
    """Wraps the dict describing one component of a service."""

    def __init__(self, client, a_dict):
        self.component = a_dict
        self.client = client
        self.name = a_dict['ServiceComponentInfo']['component_name']

    def host_names(self):
        """Return the host name of every entry under 'host_components'."""
        names = []
        for host_component in self.component['host_components']:
            names.append(host_component['HostRoles']['host_name'])
        return names

    def __str__(self):
        return self.name
class Service:
    """Wraps the dict describing one cluster service and the operations on it."""

    def __init__(self, client, a_dict):
        self.client = client
        self.service = a_dict
        self.href = self.service['href']
        self.name = self.service['ServiceInfo']['service_name']

    def delete(self):
        """Delete the service; a 404 response (already gone) is ignored."""
        try:
            self.client.delete(self.href)
        except urllib2.HTTPError as e:
            if e.code != 404:
                # Bare raise keeps the original traceback.
                raise

    def start(self):
        """Request the STARTED state and return the resulting AsyncResult."""
        _, data = self.client.put(self.href, {'ServiceInfo': {'state' : 'STARTED'}})
        return AsyncResult.of(self.client, data)

    def components(self):
        """Fetch every component listed for this service (one GET per component)."""
        return [ServiceComponent(self.client, self.client.get(each['href'])[1]) for each in self.service['components']]

    def component(self, component_name):
        """Return the component named `component_name`, or None when absent."""
        # The lookup previously ignored the argument and always matched 'HDFS_CLIENT'.
        for each in self.components():
            if each.name == component_name:
                return each
        return None

    def __str__(self):
        return self.name
class Cluster:
    """Client for one cluster, rooted at
    '<protocol>://<host>:<port>/api/<api_version>/clusters/<cluster_name>'.
    """

    def __init__(self, cluster_name, host, port=8080, protocol='http', user='admin', password='admin', api_version='v1'):
        self.cluster_name = cluster_name
        self.base_url = Url.base(protocol, host, port) / 'api' / api_version
        self.client = RestClient(
            self.base_url / 'clusters' / cluster_name,
            BasicAuth(user, password),
            headers=[Header.csrf()],
            ssl_context=PermissiveSslContext(),
            request_transformer=json.dumps,
            response_transformer=JsonTransformer())

    def version(self):
        """Return the cluster's 'Clusters/version' value."""
        _, data = self.client.get('')
        return data['Clusters']['version']

    def installed_stack(self):
        """Return the Stack named by this cluster's version ('<name>-<version>')."""
        # Must go through self: the module-level `cluster` variable only exists
        # when this file is executed as a script.
        stack_name, stack_ver = self.version().split('-')
        return Stack(stack_name, stack_ver, self.client.rebased(self.base_url / 'stacks'))

    def add_service(self, service_name):
        """Create the service `service_name` in the cluster."""
        self.client.post(Url('services') / service_name, {'ServiceInfo' : {'service_name' : service_name}})

    def add_service_component(self, service_name, component_name):
        """Create `component_name` under the service `service_name`."""
        self.client.post(Url('services') / service_name / 'components' / component_name, {})

    def add_host_component(self, service_name, component_name, host_name):
        """Attach the component to `host_name`, then request the INSTALLED state
        for the whole service; returns the AsyncResult of that state change.
        """
        self.client.post(
            Url('hosts').query_params({'Hosts/host_name': host_name}),
            {'host_components': [{'HostRoles': {'component_name': component_name}}]})
        _, data = self.client.put(Url('services') / service_name, {'ServiceInfo': {'state' : 'INSTALLED'}})
        return AsyncResult.of(self.client, data)

    def service(self, service_name):
        """Fetch one service by name."""
        _, data = self.client.get(Url('services') / service_name)
        return Service(self.client, data)

    def services(self):
        """Fetch all services of the cluster (one extra GET per service)."""
        _, data = self.client.get(Url('services'))
        return [Service(self.client, self.client.get(each['href'])[1]) for each in data['items']]

    def has_service(self, service_name):
        """Tell whether a service with that name is present in the cluster."""
        return service_name in [each.name for each in self.services()]

    def add_config(self, config_type, tag, properties):
        """Upload a configuration and make it the desired one for its type."""
        self.client.post(Url('configurations'), {
            'type': config_type,
            'tag': tag,
            'properties' : properties
        })
        self.client.put('', {
            'Clusters' : {
                'desired_configs': {'type': config_type, 'tag' : tag }
            }
        })

    def config(self, config_type):
        """Return the Configs collection of every stored config of `config_type`."""
        _, data = self.client.get(Url('configurations').query_params({'type': config_type}))
        return Configs(self.client, [Config(self.client, each) for each in data['items']])

    def start_all(self):
        """Request the STARTED state for all services; returns an AsyncResult."""
        _, data = self.client.put('services', {
            'RequestInfo' : {
                'context' : '_PARSE_.START.ALL_SERVICES',
                'operation_level' : { 'level' : 'CLUSTER', 'cluster_name' : self.cluster_name }
            },
            'Body' : { 'ServiceInfo' : {'state' : 'STARTED'} }
        })
        return AsyncResult.of(self.client, data)

    def stop_all(self):
        """Request the INSTALLED state for all services; returns an AsyncResult."""
        _, data = self.client.put('services', {
            'RequestInfo' : {
                'context' : '_PARSE_.STOP.ALL_SERVICES',
                'operation_level' : { 'level' : 'CLUSTER', 'cluster_name' : self.cluster_name }
            },
            'Body' : { 'ServiceInfo' : {'state' : 'INSTALLED'} }
        })
        return AsyncResult.of(self.client, data)

    def __str__(self):
        return 'Cluster: %s (%s)' % (self.cluster_name, self.client.base_url)
class OperationFailed(Exception):
    """Raised when an awaited request ends in a status other than COMPLETED."""
class AsyncResult:
@staticmethod
def of(client, data):
return AsyncResult(client, data) if data else NoResult()
def __init__(self, client, a_dict):
self.client = client
self.status = a_dict['Requests']['status']
self.id = a_dict['Requests']['id']
self.href = a_dict['href']
def request_status(self):
_, data = self.client.get(self.href)
return data['Requests']['request_status']
def is_finished(self):
return self.request_status() in ['FAILED', 'TIMEDOUT', 'ABORTED', 'COMPLETED', 'SKIPPED_FAILED']
def await(self):
while not self.is_finished():
time.sleep(1)
status = self.request_status()
if status != 'COMPLETED':
raise OperationFailed("%s failed with status: %s" % (self.id, status))
return status
def __str__(self):
return "Request status: %s id: %d" % (self.status, self.id)
class NoResult:
def request_status(): return 'UNKNOWN'
def is_finished(self): return True
def await(self): pass
class Config:
    """Wraps the dict describing one stored configuration."""

    def __init__(self, client, a_dict):
        self.config = a_dict
        self.client = client

    def version(self):
        """Return the 'version' entry as an int."""
        raw_version = self.config['version']
        return int(raw_version)

    def href(self):
        """Return the 'href' entry of the configuration."""
        return self.config['href']

    def properties(self):
        """Fetch the configuration and return the properties of its first item."""
        _, payload = self.client.get(self.href())
        first_item = payload['items'][0]
        return first_item['properties']

    def __str__(self):
        return json.dumps(self.config)
class Configs:
    """A collection of configs kept in ascending version order."""

    def __init__(self, client, config_list):
        self.client = client
        ordered = list(config_list)
        ordered.sort(key=lambda config: config.version())
        self.configs = ordered

    def latest(self):
        """Return the config with the highest version."""
        return self.configs[-1]
class Stack:
    """A stack identified by name and version, queried through `client`."""

    def __init__(self, stack_name, stack_version, client):
        self.client = client
        self.name = stack_name
        self.version = stack_version

    def has_service(self, service_name):
        """Tell whether this stack version defines `service_name`.

        A 404 response means "no"; any other HTTP error is re-raised.
        """
        service_url = Url(self.name) / 'versions' / self.version / 'services' / service_name
        try:
            self.client.get(service_url)
        except urllib2.HTTPError as e:
            if e.code != 404:
                raise e
            return False
        return True
class CannotLoad(Exception):
    """Raised when a previously saved value cannot be read back."""
class FsStorage:
    """Persists values as 'saved-<key>' files in the current working directory."""

    def save(self, key, value):
        """Write repr(value) to the file 'saved-<key>'."""
        with open("saved-" + key, 'wt') as f:
            f.write(repr(value))

    def load(self, key):
        """Read back the value stored under `key`.

        Raises:
            CannotLoad: when the file cannot be opened or read.
            ValueError / SyntaxError: when the file does not hold a plain literal.
        """
        import ast
        try:
            with open("saved-" + key, 'rt') as f:
                content = f.read()
        except IOError:
            raise CannotLoad(key + ' not found')
        # SECURITY: this used to be eval(), which would execute whatever code sits
        # in the file. literal_eval only accepts literals (strings, numbers, lists,
        # dicts, ...), which covers everything save() writes for such values.
        return ast.literal_eval(content)
class Conversion:
def __init__(self, cluster, storage):
self.cluster = cluster
self.storage = storage
def check_prerequisites(self):
print 'Checking %s' % self.cluster
ver = self.cluster.version()
print 'Found stack %s' % ver
if not ver.startswith('HDP-3.'):
print 'Only HDP-3.x stacks are supported.'
return False
if not self.cluster.installed_stack().has_service('ONEFS'):
print 'ONEFS management pack is not installed.'
return False
sys.stdout.write('Please, confirm you have made backup of the Ambari db [y/n] (n)? ')
if raw_input() != 'y':
return False
return True
def perform(self):
hdfs_client_hosts = self.find_hdfs_client_hosts()
self.stop_all_services()
self.read_configs()
self.delete_hdfs()
self.add_onefs()
self.configure_onefs()
self.install_onefs_clients(hdfs_client_hosts)
self.start_all_services()
def find_hdfs_client_hosts(self):
if self.cluster.has_service('HDFS'):
print 'Collecting hosts with HDFS_CLIENT'
hdfs_client_hosts = self.cluster.service('HDFS').component('HDFS_CLIENT').host_names()
self.storage.save('hdfs_client_hosts', hdfs_client_hosts)
else:
print 'Using previously saved HDFS client hosts'
hdfs_client_hosts = self.storage.load('hdfs_client_hosts')
print 'Found hosts %s' % hdfs_client_hosts
return hdfs_client_hosts
def stop_all_services(self):
print 'Stopping all services..'
self.cluster.stop_all().await()
def read_configs(self):
if self.cluster.has_service('HDFS'):
print 'Downloading core-site..'
self.core_site = self.cluster.config('core-site').latest().properties()
print 'Downloading hdfs-site..'
self.hdfs_site = self.cluster.config('hdfs-site').latest().properties()
print 'Downloading hadoop-env..'
self.hadoop_env = self.cluster.config('hadoop-env').latest().properties()
self.storage.save('core-site', self.core_site)
self.storage.save('hdfs-site', self.hdfs_site)
self.storage.save('hadoop-env', self.hadoop_env)
else:
print 'Using previously saved HDFS configs'
self.core_site = self.storage.load('core-site')
self.hdfs_site = self.storage.load('hdfs-site')
self.hadoop_env = self.storage.load('hadoop-env')
def delete_hdfs(self):
print 'Deleting HDFS..'
if self.cluster.has_service('HDFS'):
self.cluster.service('HDFS').delete()
else:
print 'Already deleted.'
def add_onefs(self):
print 'Adding ONEFS..'
if self.cluster.has_service('ONEFS'):
print 'Already added.'
else:
self.cluster.add_service('ONEFS')
try:
self.cluster.add_service_component('ONEFS', 'ONEFS_CLIENT')
except urllib2.HTTPError as e:
if e.code != 409:
raise e
def configure_onefs(self):
print 'Adding ONEFS config..'
self.cluster.add_config('onefs', random_tag('onefs'), { "onefs_host" : self.smart_connect_zone(self.core_site) })
print 'Adding core-site'
self.cluster.add_config('core-site', random_tag('new-core-site'), self.core_site)
print 'Adding hdfs-site'
self.cluster.add_config('hdfs-site', random_tag('new-hdfs-site'), self.hdfs_site)
print 'Adding hadoop-env-site'
self.cluster.add_config('hadoop-env', random_tag('new-hadoop-env'), self.hadoop_env)
def smart_connect_zone(self, core_site):
def_fs = core_site['fs.defaultFS']
if '://' in def_fs:
def_fs = def_fs.split('://')[1]
if ':' in def_fs:
def_fs = def_fs.split(':')[0]
return def_fs
def install_onefs_clients(self, hdfs_client_hosts):
print 'Adding ONEFS_CLIENT to hosts: %s' % (hdfs_client_hosts)
results = [self.add_onefs_client(each) for each in hdfs_client_hosts]
for each in results:
each.await()
def add_onefs_client(self, hostname):
try:
return self.cluster.add_host_component('ONEFS', 'ONEFS_CLIENT', hostname)
except urllib2.HTTPError as e:
if e.code == 409:
print 'Already added to host %s' % hostname
return NoResult()
else:
raise e
def start_all_services(self):
print 'Starting all services..'
self.cluster.start_all().await()
def random_tag(tag_name):
    """Return '<tag_name>-<current time>' using the time.time() timestamp."""
    timestamp = time.time()
    return "%s-%s" % (tag_name, timestamp)
class CommandLine:
    """Declares and validates the command line options of the script."""

    def __init__(self):
        self.parser = OptionParser()
        self.parser.add_option("-o", '--host', dest='host', help='Ambari server host', default='localhost')
        self.parser.add_option("-p", '--port', dest='port', help='Ambari server port', default='8080')
        self.parser.add_option("-c", '--cluster', dest='cluster_name', help='Cluster name')
        self.parser.add_option("-u", '--user', dest='admin_user', help='Admin user name', default='admin')
        # The help text used to be a copy of the --user one.
        self.parser.add_option("-k", '--password', dest='admin_pass', help='Admin password', default='admin')
        self.parser.add_option("-t", '--protocol', dest='protocol', help='HTTP protocol', default='http')

    def parse_options(self):
        """Parse sys.argv and return the validated options.

        Exits through parser.error when the cluster name is missing, the
        protocol is neither http nor https, or the port is not all digits.
        """
        options, _ = self.parser.parse_args()
        if not options.cluster_name:
            self.parser.error('Missing cluster name.')
        if not options.protocol or options.protocol.lower() not in ['http', 'https']:
            self.parser.error('Invalid protocol. Use http or https.')
        if not options.port or not options.port.isdigit():
            self.parser.error('Port should be an integer')
        return options
if __name__ == '__main__':
options = CommandLine().parse_options()
cluster = Cluster(
options.cluster_name,
options.host,
port=int(options.port),
protocol=options.protocol.lower(),
user=options.admin_user,
password=options.admin_pass)
print 'This script will replace the HDFS service to ONEFS'
print 'The following prerequisites are required:'
print ' * ONEFS management package must be installed'
print ' * Ambari must be upgraded to >=v2.7.1'
print ' * Stack must be upgraded to >=HDP-3.0'
print ' * Is highly recommended to backup ambari database before you proceed.'
conversion = Conversion(cluster, FsStorage())
if not conversion.check_prerequisites():
sys.exit()
else:
conversion.perform()