blob: 0463111547fd9820aa4ab10fb3e76ba044740a92 [file] [log] [blame]
#!/usr/bin/python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Deploys a new Impala_Kudu service, either based on an existing Impala service
# or from scratch.
#
# Prerequisites:
# - A cluster running CDH 5.4.x and Cloudera Manager 5.4.x with x >= 7
# - CM API Python client (http://cloudera.github.io/cm_api/docs/python-client).
#
# Sample usage:
#
# ./deploy.py clone IMPALA_KUDU IMPALA-1
# Clones IMPALA-1 into a new Impala_Kudu service called "IMPALA_KUDU".
#
# ./deploy.py create new_service /data/impala/
# Creates a new Impala_Kudu service called "new_service" using /data/impala/
# for its scratch directories.
import argparse
import hashlib
import os
import re
import time
from cm_api.api_client import ApiResource
# Parcel repository URL for Impala_Kudu. The environment variable of the same
# name takes precedence over the built-in default.
IMPALA_KUDU_PARCEL_URL = os.environ.get(
    "IMPALA_KUDU_PARCEL_URL",
    "http://archive.cloudera.com/beta/impala-kudu/parcels/latest")

# Product name used to recognize Impala_Kudu parcels in the cluster's parcel list.
IMPALA_KUDU_PARCEL_PRODUCT = "IMPALA_KUDU"

# Polling budgets in seconds (the pollers sleep one second per attempt):
# how long to wait for a parcel to show up after adding the repository, and
# how long to wait for a parcel to reach a requested stage.
MAX_PARCEL_REPO_WAIT_SECS = 60
MAX_PARCEL_WAIT_SECS = 30 * 60

# Service types the new Impala_Kudu service may depend on, mapped to whether
# the dependency is required (True) or optional (False).
SERVICE_DEPENDENCIES = {
    "HDFS": True,
    "HIVE": True,
    "YARN": False,
    "HBASE": False,
    "SENTRY": False,
    "ZOOKEEPER": False,
}
def parse_args():
    """Build the command-line interface and parse the process arguments.

    Global options select the Cloudera Manager server, credentials and
    cluster. Two sub-commands are offered, both taking the new service name:
    "clone" (plus the name of the Impala service to copy) and "create" (plus
    an optional master host, one optional --<type>_dependency flag per entry
    in SERVICE_DEPENDENCIES, and the scratch directory list).

    Returns the argparse namespace; the chosen sub-command is stored under
    "subparsers_name".
    """
    cluster_help = ("Name of existing cluster where the Impala_Kudu service "
                    "should be added. If not specified, uses the only cluster or "
                    "raises an exception if multiple clusters are found.")
    clone_help = ("Use an existing Impala service as a template for "
                  "the new Impala_Kudu service. To be used when Impala_"
                  "Kudu is to run side-by-side with an existing Impala.")
    based_on_help = ("Name of existing Impala service to clone as the basis for the "
                     "new service.")
    create_help = ("create a new Impala_Kudu service from scratch. To "
                   "be used when Impala_Kudu runs in its own cluster.")
    master_host_help = ("Hostname where new Impala_Kudu service's master roles should "
                        "be placed. If not specified, uses the Cloudera Manager Server host "
                        "or raises an exception if that host is not managed.")
    dependency_help = ("Name of %s service that the new Impala_Kudu service "
                       "should depend on. If not specified, will use only service of "
                       "that type in the cluster. Will raise an exception if exactly "
                       "one instance of that service is not found in the cluster. %s")
    scratch_dirs_help = ("Comma-separated list of scratch directories to use in the new "
                         "Impala_Kudu service.")

    top_level = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    top_level.add_argument("--host", type=str, default="localhost",
                           help="Hostname of the Cloudera Manager server.")
    top_level.add_argument("--user", type=str, default="admin",
                           help="Username with which to log into Cloudera Manager.")
    top_level.add_argument("--password", type=str, default="admin",
                           help="Password with which to log into Cloudera Manager.")
    top_level.add_argument("--cluster", type=str, help=cluster_help)

    # Positional argument shared by both sub-commands.
    shared = argparse.ArgumentParser(add_help=False)
    shared.add_argument("service_name", type=str,
                        help="Name of Impala_Kudu service to create.")

    commands = top_level.add_subparsers(dest="subparsers_name")

    clone_cmd = commands.add_parser("clone", parents=[shared], help=clone_help)
    clone_cmd.add_argument("based_on", type=str, help=based_on_help)

    create_cmd = commands.add_parser("create", parents=[shared], help=create_help)
    create_cmd.add_argument("--master_host", type=str, help=master_host_help)
    for dep_type, is_required in SERVICE_DEPENDENCIES.items():
        required_note = "REQUIRED." if is_required else ""
        create_cmd.add_argument("--%s_dependency" % (dep_type.lower(),),
                                type=str,
                                help=dependency_help % (dep_type, required_note))
    create_cmd.add_argument("scratch_dirs", type=str, help=scratch_dirs_help)

    return top_level.parse_args()
def find_cluster(api, cluster_name):
    """Return the cluster to deploy into.

    With a cluster name, the cluster is fetched by that name. Without one,
    the only cluster known to the API is used; an Exception is raised when
    there are no clusters or more than one.
    """
    if not cluster_name:
        candidates = api.get_all_clusters()
        count = len(candidates)
        if count == 0:
            raise Exception("No clusters found; create one before calling this script")
        if count > 1:
            raise Exception("Cannot use implicit cluster; there is more than one available")
        chosen = candidates[0]
    else:
        chosen = api.get_cluster(cluster_name)

    print("Found cluster: %s" % (chosen.displayName, ))
    return chosen
def find_dependencies(args, cluster):
    """Resolve the services the new Impala_Kudu service will depend on.

    For every type in SERVICE_DEPENDENCIES the matching "<type>_dependency"
    attribute of args is consulted. An explicitly named service must exist
    with that type. Without a name, optional types are skipped and required
    types must have exactly one service of that type in the cluster.

    Returns the list of resolved service objects; raises Exception when a
    dependency cannot be resolved.
    """
    # { service type : { service name : service }}
    by_type = {}
    for svc in cluster.get_all_services():
        by_type.setdefault(svc.type, {})[svc.name] = svc

    resolved = []
    for dep_type, is_required in SERVICE_DEPENDENCIES.items():
        same_type = by_type.get(dep_type, {})
        requested = getattr(args, dep_type.lower() + "_dependency")

        if requested:
            match = same_type.get(requested, None)
            if not match:
                raise Exception("Could not find dependency service (type %s, name %s)" %
                                (dep_type, requested))
            print("Found explicit dependency service %s" % (match.name))
            resolved.append(match)
            continue

        if not is_required:
            print("Skipping optional dependency of type %s" % (dep_type,))
            continue

        count = len(same_type)
        if count > 1:
            raise Exception("Found %d possible implicit dependency services of type %s" %
                            (count, dep_type))
        if count == 0:
            raise Exception("Could not find implicit dependency service of type %s" %
                            (dep_type,))
        match = next(iter(same_type.values()))
        print("Found implicit dependency service %s" % (match.name,))
        resolved.append(match)

    return resolved
def check_new_service_does_not_exist(api, cluster, new_name):
    """Raise Exception if a service in the cluster already displays as new_name."""
    name_taken = any(svc.displayName == new_name
                     for svc in cluster.get_all_services())
    if name_taken:
        raise Exception("New service name %s already in use" % (new_name,))
    print("New service name %s is not in use" % (new_name,))
def find_template_service(api, cluster, based_on):
    """Look up the Impala service that will be cloned.

    Scans every service in the cluster for one whose display name equals
    based_on. A match of a type other than "IMPALA" raises Exception, as does
    a non-empty based_on with no match. Returns the matching service, or None
    when based_on is empty.
    """
    match = None
    for svc in cluster.get_all_services():
        if not based_on or svc.displayName != based_on:
            continue
        if svc.type != "IMPALA":
            raise Exception("Based-on service %s is of wrong type %s" %
                            (based_on, svc.type))
        print("Found based-on service: %s" % (based_on,))
        # No early exit: should several services match, the last one wins.
        match = svc

    if based_on and not match:
        raise Exception("Could not find based-on service: %s" % (based_on,))

    return match
def find_master_host(api, cm_hostname, master_hostname):
    """Return the host object that will carry the master roles.

    When master_hostname is given, the host with that hostname is returned.
    Otherwise the host whose hostname equals cm_hostname is used. Raises
    Exception if no host known to the API has the wanted hostname.
    """
    if master_hostname:
        wanted = master_hostname
        found_msg = "Found master host %s"
        missing_msg = "Could not find master host with hostname %s"
    else:
        wanted = cm_hostname
        found_msg = "Found implicit master host on CM host %s"
        missing_msg = "Could not find implicit master host %s"

    for host in api.get_all_hosts():
        if host.hostname == wanted:
            print(found_msg % (wanted,))
            return host

    raise Exception(missing_msg % (wanted,))
def get_best_parcel(api, cluster):
    """Pick the Impala_Kudu parcel that needs the least work to activate.

    Parcels whose product is IMPALA_KUDU_PARCEL_PRODUCT are grouped by stage.
    The first non-empty group in the order ACTIVATED, DISTRIBUTED, DOWNLOADED,
    AVAILABLE_REMOTELY supplies the result; parcels in any other stage are
    ignored. The api parameter is unused and kept for interface compatibility.

    Returns the chosen parcel, or None when there is no candidate.
    """
    # Prefer the "closest" parcel, even if it's not the newest by version.
    stage_preference = ("ACTIVATED", "DISTRIBUTED", "DOWNLOADED", "AVAILABLE_REMOTELY")

    by_stage = dict((stage, []) for stage in stage_preference)
    for candidate in cluster.get_all_parcels():
        if candidate.product != IMPALA_KUDU_PARCEL_PRODUCT:
            continue
        if candidate.stage in by_stage:
            by_stage[candidate.stage].append(candidate)

    best = None
    for stage in stage_preference:
        if by_stage[stage]:
            # NOTE(review): within a stage this keeps the historical choice of
            # the parcel with the smallest `version` under plain comparison,
            # i.e. not the newest one -- confirm that this is intended.
            best = min(by_stage[stage], key=lambda p: p.version)
            break

    if best:
        print("Chose best parcel %s-%s (stage %s)" % (best.product,
                                                      best.version,
                                                      best.stage))
    else:
        print("Found no candidate parcels")
    return best
def ensure_parcel_repo_added(api):
    """Make sure IMPALA_KUDU_PARCEL_URL is among Cloudera Manager's parcel repos.

    Reads the summary view of the Cloudera Manager config, and if the URL is
    not already in the comma-separated REMOTE_PARCEL_REPO_URLS value, appends
    it and writes the config back.
    """
    cm = api.get_cloudera_manager()
    config = cm.get_config(view='summary')

    # A missing, None or empty value must yield an empty list: "".split(",")
    # is [""], which would otherwise write back ",<url>" with a bogus empty
    # entry in front.
    current_value = config.get("REMOTE_PARCEL_REPO_URLS", "") or ""
    parcel_urls = [url for url in current_value.split(",") if url]

    if IMPALA_KUDU_PARCEL_URL in parcel_urls:
        print("Impala_Kudu parcel URL already present")
        return

    print("Adding Impala_Kudu parcel URL")
    parcel_urls.append(IMPALA_KUDU_PARCEL_URL)
    config["REMOTE_PARCEL_REPO_URLS"] = ",".join(parcel_urls)
    cm.update_config(config)
def wait_for_parcel_stage(cluster, parcel, stage):
    """Poll once per second until the parcel reaches the given stage.

    Each poll re-fetches the parcel by product and version. Returns as soon
    as the stage matches. Raises Exception if the parcel state reports errors,
    or if MAX_PARCEL_WAIT_SECS polls pass without reaching the stage.
    """
    polls_left = MAX_PARCEL_WAIT_SECS
    while polls_left > 0:
        current = cluster.get_parcel(parcel.product, parcel.version)
        if current.stage == stage:
            return

        errors = current.state.errors
        if errors:
            raise Exception(str(errors))

        print("progress: %s / %s" % (current.state.progress,
                                     current.state.totalProgress))
        time.sleep(1)
        polls_left -= 1

    raise Exception("Parcel %s-%s did not reach stage %s in %d seconds" %
                    (parcel.product, parcel.version, stage, MAX_PARCEL_WAIT_SECS,))
def ensure_parcel_activated(cluster, parcel):
    """Walk the parcel through download, distribution and activation.

    Starting from the parcel's current stage, every remaining step of
    AVAILABLE_REMOTELY -> DOWNLOADED -> DISTRIBUTED -> ACTIVATED is triggered
    and waited for in turn. A parcel whose stage is none of the three starting
    stages is left untouched; the final "is activated" line is printed in
    every case.
    """
    # (stage before, stage after, parcel method to call, start message, done message)
    transitions = (
        ("AVAILABLE_REMOTELY", "DOWNLOADED", "start_download",
         "Downloading parcel: %s-%s ", "Downloaded parcel: %s-%s "),
        ("DOWNLOADED", "DISTRIBUTED", "start_distribution",
         "Distributing parcel: %s-%s ", "Distributed parcel: %s-%s "),
        ("DISTRIBUTED", "ACTIVATED", "activate",
         "Activating parcel: %s-%s ", "Activated parcel: %s-%s "),
    )

    current_stage = parcel.stage
    for stage_before, stage_after, action, start_msg, done_msg in transitions:
        if current_stage != stage_before:
            continue
        print(start_msg % (parcel.product, parcel.version))
        getattr(parcel, action)()
        wait_for_parcel_stage(cluster, parcel, stage_after)
        print(done_msg % (parcel.product, parcel.version))
        # Fall through into the next transition.
        current_stage = stage_after

    print("Parcel %s-%s is activated" % (parcel.product, parcel.version))
def print_configs(entity_name, config_dict):
    """Print one "Set ... config key='value'" line per entry of config_dict."""
    for key in config_dict:
        print("Set %s config %s=\'%s\'" % (entity_name, key, config_dict[key]))
def create_new_service(api, cluster, new_name, deps, scratch_dirs, master_host):
    """Create a fresh Impala_Kudu service and populate it with roles.

    The service is created with type "IMPALA", pointed at each dependency via
    a "<type>_service" setting and tagged with the IMPALA_KUDU=1 environment
    safety valve. For the IMPALAD role config group the scratch directories
    are set and one role is created on every cluster host except the master
    host; for every other role config group a single role is created on the
    master host.
    """
    service = cluster.create_service(new_name, "IMPALA")
    print("Created new service %s" % (new_name,))

    def add_role(role_type, host_id):
        # This formula is embedded within CM. If we don't strictly
        # adhere to it, we can't use any %s-%s-%s naming scheme.
        role_name = "%s-%s-%s" % (new_name, role_type,
                                  hashlib.md5(host_id).hexdigest())
        service.create_role(role_name, role_type, host_id)
        print("Created new role %s" % (role_name,))

    settings = {}
    for dep in deps:
        settings[dep.type.lower() + "_service"] = dep.name
    settings["impala_service_env_safety_valve"] = "IMPALA_KUDU=1"
    service.update_config(settings)
    print_configs("service " + new_name, settings)

    for group in service.get_all_role_config_groups():
        role_type = group.roleType
        if role_type != "IMPALAD":
            # Master-side role: exactly one instance, on the master host.
            add_role(role_type, master_host.hostId)
            continue

        scratch_settings = {"scratch_dirs": scratch_dirs}
        group.update_config(scratch_settings)
        print_configs("rcg " + group.displayName, scratch_settings)
        for host in cluster.list_hosts():
            if host.hostId != master_host.hostId:
                add_role(role_type, host.hostId)
def transform_path(rcg_name, rcg_config_dict, rcg_config_name):
    """Derive a non-conflicting path setting by appending "2" to each path.

    The value of rcg_config_name in rcg_config_dict is treated as a
    comma-separated list of paths (scratch_dirs is one), and every entry gets
    a "2" appended to its last component, keeping a trailing slash if present.
    Previously only the final entry of a list was rewritten, so the remaining
    directories would have been shared with the template service.

    Returns a one-entry dict {rcg_config_name: new value}. Raises Exception
    when the setting is missing or empty; rcg_name is used in that message.
    """
    # TODO: Do a better job with paths where the role type is embedded.
    #
    # e.g. /var/log/impalad/lineage --> /var/log/impalad2/lineage
    val = rcg_config_dict.get(rcg_config_name, None)
    if not val:
        raise Exception("Could not get %s config for rcg %s" %
                        (rcg_config_name, rcg_name,))

    # From the first "/" to the end of the entry, insert "2" before an
    # optional trailing "/". Entries without a "/" are left as they are.
    new_paths = [re.sub(r"/(.*?)(/?)$", r"/\g<1>2\g<2>", path)
                 for path in val.split(",")]
    return {rcg_config_name: ",".join(new_paths)}
def transform_port(rcg_name, rcg_config_dict, rcg_config_name, offset=7):
    """Derive a non-conflicting port setting by shifting the configured port.

    Reads rcg_config_name from rcg_config_dict, converts it to an integer and
    adds offset (7 by default, the value that was previously hard-coded).

    Returns a one-entry dict {rcg_config_name: new port as a string}. Raises
    Exception when the setting is missing or empty, or cannot be converted to
    an integer; rcg_name is used in those messages.
    """
    # TODO: Actually resolve all port conflicts.
    val = rcg_config_dict.get(rcg_config_name, None)
    if not val:
        raise Exception("Could not get %s config for rcg %s" %
                        (rcg_config_name, rcg_name,))
    try:
        val_int = int(val)
    except (TypeError, ValueError):
        # TypeError covers values that are neither strings nor numbers.
        raise Exception("Could not convert %s config (%s) for rcg %s into integer" %
                        (rcg_config_name, val, rcg_name))
    return {rcg_config_name: str(val_int + offset)}
def transform_rcg_config(rcg):
    """Return the config for the cloned counterpart of a role config group.

    Starts from the group's summary config and overrides the path and port
    settings listed for its role type with values produced by transform_path
    and transform_port. The inputs to those transforms come from the "full"
    config view, using each setting's value or, when that is empty, its
    default. Role types other than IMPALAD, CATALOGSERVER and STATESTORE get
    the summary config unchanged.
    """
    # role type -> (path settings, port settings), in the order they are applied
    overrides_by_role = {
        "IMPALAD": (
            ("audit_event_log_dir", "lineage_event_log_dir", "log_dir", "scratch_dirs"),
            ("be_port", "beeswax_port", "hs2_port", "impalad_webserver_port",
             "state_store_subscriber_port"),
        ),
        "CATALOGSERVER": (
            ("log_dir",),
            ("catalog_service_port", "catalogserver_webserver_port"),
        ),
        "STATESTORE": (
            ("log_dir",),
            ("state_store_port", "statestore_webserver_port"),
        ),
    }

    new_config = rcg.get_config()
    effective = {}
    for setting, entry in rcg.get_config("full").items():
        effective[setting] = entry.value if entry.value else entry.default

    path_settings, port_settings = overrides_by_role.get(rcg.roleType, ((), ()))
    for setting in path_settings:
        new_config.update(transform_path(rcg.name, effective, setting))
    for setting in port_settings:
        new_config.update(transform_port(rcg.name, effective, setting))
    return new_config
def clone_existing_service(cluster, new_name, template_service):
    """Create service new_name as a side-by-side copy of template_service.

    Steps, in order:
      1. Create a new service of type "IMPALA" and give it the template's
         service-level config, with the environment safety valve set to
         IMPALA_KUDU=1.
      2. For every role config group of the template, find or create the
         matching group in the new service, apply the config produced by
         transform_rcg_config, and create one new role per template role on
         the same host, moving those roles into the group.
      3. Pass the transformed state_store_subscriber_port to every
         CATALOGSERVER group of the new service via its command-line
         safety valve.
    """
    new_service = cluster.create_service(new_name, "IMPALA")
    print "Created new service %s" % (new_name,)

    # get_config() is unpacked as a pair; only the first element (the
    # service-level settings) is used here.
    service_config, _ = template_service.get_config()
    # NOTE(review): this replaces whatever the template had in this safety valve.
    service_config["impala_service_env_safety_valve"] = "IMPALA_KUDU=1"
    new_service.update_config(service_config)
    print_configs("service " + new_name, service_config)

    # Last non-empty state_store_subscriber_port seen among the transformed configs.
    saved_special_port = None
    # Counter used to name the non-BASE groups created below.
    i = 0
    for old_rcg in template_service.get_all_role_config_groups():
        if old_rcg.name != ("%s-%s-BASE" % (template_service.name, old_rcg.roleType)):
            # Not the template's "<service>-<roleType>-BASE" group: create a
            # new group named "<new_name>-<roleType>-<i>".
            new_rcg_name = "%s-%s-%d" % (new_name, old_rcg.roleType, i)
            i += 1
            new_rcg = new_service.create_role_config_group(new_rcg_name,
                                                           new_rcg_name,
                                                           old_rcg.roleType)
            print "Created new rcg %s" % (new_rcg_name,)
        else:
            # BASE group: fetched rather than created, so it is presumably
            # already present on the new service -- TODO confirm.
            new_rcg = new_service.get_role_config_group("%s-%s-BASE" % (new_name,
                                                                        old_rcg.roleType))
        new_rcg_config = transform_rcg_config(old_rcg)
        new_rcg.update_config(new_rcg_config)
        print_configs("rcg " + new_rcg.displayName, new_rcg_config)

        special_port = new_rcg_config.get("state_store_subscriber_port", None)
        if special_port:
            saved_special_port = special_port

        new_role_names = []
        for old_role in old_rcg.get_all_roles():
            # Same "<service>-<roleType>-<md5 of hostId>" role naming that
            # create_new_service uses.
            md5 = hashlib.md5()
            md5.update(old_role.hostRef.hostId)
            new_role_name = "%s-%s-%s" % (new_name, new_rcg.roleType, md5.hexdigest())
            new_role = new_service.create_role(new_role_name,
                                               new_rcg.roleType,
                                               old_role.hostRef.hostId)
            print "Created new role %s" % (new_role_name,)
            new_role_names.append(new_role.name)
        new_rcg.move_roles(new_role_names)

    # NOTE(review): if no group produced a state_store_subscriber_port,
    # saved_special_port is still None here and the flag below becomes
    # "-state_store_subscriber_port=None". The update also replaces any
    # existing value of this safety valve.
    for new_rcg in new_service.get_all_role_config_groups():
        if new_rcg.roleType == "CATALOGSERVER":
            special_port_config_dict = { "catalogd_cmd_args_safety_valve" :
                                         "-state_store_subscriber_port=%s" % (saved_special_port,) }
            new_rcg.update_config(special_port_config_dict)
            print_configs("rcg " + new_rcg.displayName, special_port_config_dict)
def main():
    """Script entry point: validate inputs, activate the parcel, deploy the service.

    Connects to Cloudera Manager, resolves the target cluster, checks that
    the new service name is free and looks up what the chosen sub-command
    needs (template service for "clone"; master host and dependencies
    otherwise). It then makes sure an Impala_Kudu parcel is activated, adding
    the parcel repository and polling for up to MAX_PARCEL_REPO_WAIT_SECS
    attempts if none is known yet, and finally creates or clones the service.
    """
    args = parse_args()
    api = ApiResource(args.host, username=args.user, password=args.password,
                      version=10)
    cluster = find_cluster(api, args.cluster)
    check_new_service_does_not_exist(api, cluster, args.service_name)

    if args.subparsers_name != "clone":
        master_host = find_master_host(api, args.host, args.master_host)
        deps = find_dependencies(args, cluster)
    else:
        template_service = find_template_service(api, cluster, args.based_on)

    parcel = get_best_parcel(api, cluster)
    if not parcel:
        ensure_parcel_repo_added(api)
        attempt = 1
        while True:
            if attempt > MAX_PARCEL_REPO_WAIT_SECS:
                raise Exception("No parcel showed up in %d seconds" %
                                (MAX_PARCEL_REPO_WAIT_SECS,))
            parcel = get_best_parcel(api, cluster)
            if parcel:
                break
            print("Could not find parcel in attempt %d, will sleep and retry" % (attempt,))
            time.sleep(1)
            attempt += 1

    ensure_parcel_activated(cluster, parcel)

    if args.subparsers_name == "create":
        create_new_service(api, cluster, args.service_name, deps,
                           args.scratch_dirs, master_host)
    else:
        clone_existing_service(cluster, args.service_name, template_service)


if __name__ == "__main__":
    main()