| #!/usr/bin/env python |
| |
| ''' |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ''' |
| |
| """ |
| Upgrade catalog file format description: |
| |
| |
| Format version 1.0 |
| |
| Global section description: |
STACKNAME - name of the stack, for example HDP
OLDVERSION - stack version to upgrade from; matched against the fromStack script argument
NEWVERSION - stack version to upgrade to; matched against the toStack script argument

Sub-section options:
config-types - contains global per-config-type settings
merged-copy - merge the latest server properties with the properties defined in the "properties" section;
without this option the server properties are replaced by the properties defined in the "properties" section
required-services - properties from the json catalog are processed only if all listed services are present on the cluster;
a property-level definition always overrides the catalog-level definition

Sub-section properties - contains the property definitions
Sub-section property-mapping (optional) - contains property-name mappings for properties that were renamed in NEWVERSION
| |
| Example: |
| |
| { |
| "version": "1.0", |
| "stacks": [ |
| { |
| "name": "STACKNAME", |
| "old-version": "OLDVERSION", |
| "target-version": "NEWVERSION", |
| "options": { |
| "config-types": { |
| "CONFIGTYPE1": { |
| "merged-copy": "yes", |
| "required-services": ["HDFS"] |
| } |
| } |
| }, |
| "properties": { |
| "CONFIGTYPE1": { |
| "some_property": "some property value", |
| "some_second_property: { |
| "remove": "yes" |
| }, |
| "template_property": { |
| "value": "{TEMPLATE_TAG}", |
| "template": "yes", |
| "required-services": ["HDFS", "YARN"] |
| }, |
| "test_property": { |
| "value": "new value", |
| "override: "no", (optional, override already existed property yes/no. Default: yes) |
| "value-required": "old value", (optional, property would be set if the required value is present) |
| "can-create": "no", (optional, process property only if that property present on the server. |
| i.e. ability to create new property. Default: yes) |
| "required-services": ["HDFS", "YARN"], (optional, process property only if selected services existed) |
| "resolve-dependency": "no" (optional, use Stack Advisor to get depended properties changes. Default: no) |
| } |
| } |
| }, |
| "property-mapping": { |
| "old-property-name": "new-property-name", (short form, equal to "old-property-name": { "map-to": "new-property-name" }) |
| "old-property1-name": { (usually key is an name of the property which need to be mapped, but in case of same |
| property should be set to unique name and "map-from" option used instead) |
| "map-from": "old property name", (optional, define property name which should be mapped) |
| "map-to": "new_property1_name", (optional, new property name. If not set, would be used old property name) |
| "from-catalog": "test", (optional, require "to-catalog. Source of old-property1-name) |
| "to-catalog": "test", (optional, require "from-catalog. Target of new_property1_name) |
| "default": "default value", (optional, if set and old property not exists, new one would be created with default value) |
| "template": "yes", (optional, template parsing for default option) |
| "coerce-to": "pre-defined type", (optional, convert value from one type to another. Types supported: |
| yaml-array - converts string item1,item2 to ['item1', 'item2'] |
| ) |
| "replace-from": "something", (optional, should be present both from and to. Replace 'from' value to 'to') |
| "replace-to": "something, |
| "required-services": ["YARN"], (optional, process entry if services in the list existed on the cluster |
| } |
| } |
| } |
| ] |
| } |
| |
| More examples available in ambari-server/src/main/resources/upgrade/catalog/ |
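
Example invocation (option names here are illustrative, not authoritative; run the script with --help for the exact CLI):

python upgradeHelper.py --hostname ambari.server.host --user admin --password adminpass \
--clustername c1 --fromStack 2.2 --toStack 2.3 \
--upgradeCatalog UpgradeCatalog_2.2_to_2.3.json update-configs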
| """ |
| |
| import getpass |
| import optparse |
| from pprint import pprint |
| import re |
| import sys |
| import os.path |
| import logging |
| import time |
| import base64 |
| from urllib2 import HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, Request, build_opener, URLError, HTTPError |
| |
| try: |
# try to import the newer simplejson module, which should be faster than the outdated json shipped with python 2.6
| import ambari_simplejson as json |
| except ImportError: |
| import json |
| |
| |
| # ============================== |
| # Error classes definition |
| # ============================== |
| class FatalException(Exception): |
| def __init__(self, code, reason): |
| self.code = code |
| self.reason = reason |
| |
| def __str__(self): |
| return repr("Fatal exception: %s, exit code %s" % (self.reason, self.code)) |
| |
| def _get_message(self): |
| return str(self) |
| |
| |
| class ReadOnlyPropertyException(Exception): |
| def __str__(self): |
| return "Property is read-only" |
| |
| def _get_message(self): |
| return self.__str__() |
| |
| |
| class NotSupportedCatalogVersion(Exception): |
| def __init__(self, catalog_version): |
| self._version = catalog_version |
| |
| def __str__(self): |
| return "Version %s of loaded catalog not supported" % self._version |
| |
| def _get_message(self): |
| return self.__str__() |
| |
| message = property(__str__) |
| |
| |
| class CatalogNotFoundException(Exception): |
| pass |
| |
| |
| class TemplateProcessingException(Exception): |
| pass |
| |
| |
| class CatalogExistException(Exception): |
| pass |
| |
| |
| class PropertyNotFoundException(Exception): |
| pass |
| |
| |
| class StackNotFoundException(Exception): |
| pass |
| |
| |
| class MalformedPropertyDefinitionException(Exception): |
| pass |
| |
| |
| # ============================== |
| # Constant class definition |
| # ============================== |
| class Const(object): |
| def __new__(cls, *args, **kwargs): |
| raise Exception("Class couldn't be created") |
| |
| |
| class Options(Const): |
| # action commands |
| API_PROTOCOL = "http" |
| API_PORT = "8080" |
| |
| GET_MR_MAPPING_ACTION = "save-mr-mapping" |
| VERIFY_ACTION = "verify" |
| DELETE_MR_ACTION = "delete-mr" |
| ADD_YARN_MR2_ACTION = "add-yarn-mr2" |
| MODIFY_CONFIG_ACTION = "update-configs" |
| BACKUP_CONFIG_ACTION = "backup-configs" |
| INSTALL_YARN_MR2_ACTION = "install-yarn-mr2" |
| |
| MR_MAPPING_FILE = "mr_mapping" |
| CAPACITY_SCHEDULER_TAG = "capacity-scheduler" |
| REPLACE_JH_HOST_NAME_TAG = "REPLACE_JH_HOST" |
| REPLACE_RM_HOST_NAME_TAG = "REPLACE_RM_HOST" |
| REPLACE_WITH_TAG = "REPLACE_WITH_" |
| PHOENIX_QUERY_SERVER = "PHOENIX_QUERY_SERVER" |
| ZK_OPTIONS = "zoo.cfg" |
| KAFKA_BROKER_CONF = "kafka-broker" |
| RANGER_ADMIN = "admin-properties" |
| RANGER_USERSYNC = "usersync-properties" |
| RANGER_ENV = "ranger-env" |
| KAFKA_PORT = "port" |
| RANGER_EXTERNAL_URL = "policymgr_external_url" |
| ZK_CLIENTPORT = "clientPort" |
| DELETE_OLD_TAG = "DELETE_OLD" |
| |
| ZOOKEEPER_SERVER = "ZOOKEEPER_SERVER" |
| KAFKA_BROKER = "KAFKA_BROKER" |
| NAMENODE = "NAMENODE" |
| |
| MR_MAPPING = None |
| logger = None |
| server_config_factory = None |
| """:type : ServerConfigFactory""" |
| stack_advisor = None |
| """:type : StackAdvisor""" |
| ambari_server = None |
| """:type : AmbariServer""" |
| |
| # Api constants |
| ROOT_URL = None |
| CLUSTER_URL = None |
| COMPONENTS_FORMAT = None |
| TEZ_VIEW_URL = None |
| |
| # Curl options |
| CURL_PRINT_ONLY = None |
| CURL_WRITE_ONLY = None |
| |
| ARGS = None |
| OPTIONS = None |
| HOST = None |
| CLUSTER_NAME = None |
| |
| # for verify action |
| REPORT_FILE = None |
| |
| SERVICES = [] |
| |
| API_TOKENS = { |
| "user": None, |
| "pass": None |
| } |
| |
| HEADERS = { |
| 'X-Requested-By': 'upgradeHelper' |
| } |
| |
| @classmethod |
| def initialize(cls): |
| cls.ROOT_URL = '%s://%s:%s/api/v1' % (cls.API_PROTOCOL, cls.HOST, cls.API_PORT) |
| cls.CLUSTER_URL = cls.ROOT_URL + "/clusters/%s" % cls.CLUSTER_NAME |
| cls.COMPONENTS_URL = cls.CLUSTER_URL + "/components?fields=ServiceComponentInfo/total_count" |
| cls.COMPONENTS_FORMAT = cls.CLUSTER_URL + "/components/{0}" |
| cls.TEZ_VIEW_URL = cls.ROOT_URL + "/views/TEZ" |
| cls.STACKS_URL = cls.ROOT_URL + "/stacks" |
| cls.STACKS_VERSIONS_URL = cls.STACKS_URL + "/{0}/versions" |
| cls.STACK_ADVISOR_URL = cls.STACKS_VERSIONS_URL + "/{1}/recommendations" |
| cls.AMBARI_SERVER_URL = cls.ROOT_URL + "/services/AMBARI/components/AMBARI_SERVER" |
| cls.AMBARI_AGENTS_URL = cls.ROOT_URL + "/services/AMBARI/components/AMBARI_AGENT" |
| if cls.CLUSTER_NAME is not None and cls.HOST is not None: |
| cls.SERVICES = set(map(lambda x: x.upper(), get_cluster_services())) |
| |
| cls.ambari_server = AmbariServer() |
| if not cls.isPropertyAttributesSupported(): |
| cls.logger.warning("Property attributes not supported by current Ambari version") |
| |
| @classmethod |
| def isPropertyAttributesSupported(cls): |
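# flattens major.minor into one number, e.g. 1.7.0 -> 1 * 10 + 7 = 17;
# property attributes are supported starting with Ambari 1.7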
return cls.ambari_server.server_version[0] * 10 + cls.ambari_server.server_version[1] >= 17
| |
| @classmethod |
| def initialize_logger(cls, filename=None): |
| cls.logger = logging.getLogger('UpgradeHelper') |
| cls.logger.setLevel(logging.DEBUG) |
| |
| if filename is not None: |
| handler = logging.FileHandler(filename) |
| handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s - %(message)s')) |
| cls.logger.addHandler(handler) |
| cls.logger.info("") |
| cls.logger.info("Start new logging section") |
| |
| handler = logging.StreamHandler(sys.stdout) |
| handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) |
| cls.logger.addHandler(handler) |
| |
| |
| class CatConst(Const): |
| VERSION_TAG = "version" |
| STACK_VERSION_OLD = "old-version" |
| STACK_VERSION_TARGET = "target-version" |
| STACK_STAGS_TAG = "stacks" |
| STACK_NAME = "name" |
| CONFIG_OPTIONS = "options" |
| CONFIG_TYPES = "config-types" |
| STACK_PROPERTIES = "properties" |
| STACK_PROPERTIES_ATTRIBUTES = "properties_attributes" |
| PROPERTY_VALUE_TAG = "value" |
| VERSIONS_TAG = "versions" |
| PROPERTY_REMOVE_TAG = "remove" |
| PROPERTY_MAP_TO = "map-to" |
| PROPERTY_MAP_FROM = "map-from" |
| PROPERTY_FROM_CATALOG = "from-catalog" |
| PROPERTY_TO_CATALOG = "to-catalog" |
| PROPERTY_DEFAULT = "default" |
| MERGED_COPY_TAG = "merged-copy" |
| REQUIRED_SERVICES = "required-services" |
| COERCE_TO_PROPERTY_TAG = "coerce-to" |
| RESOLVE_DEPENDENCY_TAG = "resolve-dependency" |
| COERCE_YAML_OPTION_TAG = "yaml-array" |
| REPLACE_FROM_TAG = "replace-from" |
| REPLACE_TO_TAG = "replace-to" |
| OVERRIDE_TAG = "override" |
| ITEMS_TAG = "items" |
| TYPE_TAG = "type" |
| TRUE_TAG = "yes" |
| VALUE_REQUIRED_TAG = "value-required" |
| PROPERTY_CAN_CREATE_TAG = "can-create" |
| STACK_PROPERTIES_MAPPING_LIST_TAG = "property-mapping" |
| VALUE_TEMPLATE_TAG = "template" |
| SEARCH_PATTERN = "(\{[^\{\}]+\})" # {XXXXX} |
| ACTION_COMMIT = "commit" |
| ACTION_RELOAD = "reload" |
| ACTION_RENAME_PROPERTY = "rename-property" |
| TEMPLATE_HANDLER = "template_handler" |
| |
| |
| # ============================== |
| # Catalog classes definition |
| # ============================== |
| |
| class AmbariServer(object): |
| def __init__(self): |
| Options.logger.info("Resolving Ambari server configuration ...") |
| self._get_server_info() |
| self._get_agents_info() |
| self._get_components() |
| |
| def _get_components(self): |
| info = curl(Options.COMPONENTS_URL, parse=True) |
| self._components = [] |
| if CatConst.ITEMS_TAG in info: |
| for item in info[CatConst.ITEMS_TAG]: |
| if "ServiceComponentInfo" in item and "total_count" in item["ServiceComponentInfo"] and \ |
| int(item["ServiceComponentInfo"]["total_count"]) > 0 and "component_name" in item["ServiceComponentInfo"]: |
| self._components.append(item["ServiceComponentInfo"]["component_name"]) |
| |
| def _get_server_info(self): |
| info = curl(Options.AMBARI_SERVER_URL, parse=True) |
| self._server_version = [0, 0, 0] |
| |
| if "RootServiceComponents" in info: |
| server_props = info["RootServiceComponents"] |
| ver = server_props["component_version"] if "component_version" in server_props else None |
try:
self._server_version = list(map(lambda x: int(x), ver.split(".")))
except (ValueError, AttributeError):  # ver may be missing (None) or malformed
pass
| |
| def _get_agents_info(self): |
| info = curl(Options.AMBARI_AGENTS_URL, parse=True) |
| self._agents = [] |
| if "hostComponents" in info: |
| agent_props = info["hostComponents"] |
| self._agents = list(map(lambda x: x["RootServiceHostComponents"]["host_name"], agent_props)) |
| |
| @property |
| def components(self): |
| return self._components |
| |
| @property |
| def server_version(self): |
| return self._server_version |
| |
| @property |
| def agent_hosts(self): |
| return self._agents |
| |
| class StackAdvisorFactory(object): |
| def __init__(self): |
| self._stack_info = self._load_stack_info() |
| |
def _load_stack_versions(self, stack):
versions = curl(Options.STACKS_VERSIONS_URL.format(stack), parse=True)
if CatConst.ITEMS_TAG in versions:
return list(map(lambda x: x["Versions"]["stack_version"], versions[CatConst.ITEMS_TAG]))

return []
| |
| def _load_stack_info(self): |
| stacks = curl(Options.STACKS_URL, parse=True) |
| if CatConst.ITEMS_TAG in stacks: |
stacks = list(map(lambda x: x["Stacks"]["stack_name"], stacks[CatConst.ITEMS_TAG]))
| else: |
stacks = []
| |
| stacks_dict = {} |
| |
| for stack in stacks: |
| stacks_dict[stack] = self._load_stack_versions(stack) |
| |
| return stacks_dict |
| |
| def get_instance(self, stack, version): |
| sversion = Options.ambari_server.server_version |
| if sversion[0] * 10 + sversion[1] < 21: |
| Options.logger.warning("Ambari server version \"%s.%s.%s\" doesn't support property dependencies suggestion" % |
| (sversion[0], sversion[1], sversion[2])) |
| return BaseStackAdvisor(stack, version) |
| |
| if stack in self._stack_info and version in self._stack_info[stack]: |
| return StackAdvisor(stack, version) |
| else: |
raise StackNotFoundException("Stack %s-%s does not exist on the server" % (stack, version))
| |
| class StackAdvisorRequestProperty(object): |
| def __init__(self, catalog, property_name): |
| self._catalog = catalog |
| self._property_name = property_name |
| |
| @property |
| def catalog(self): |
| return self._catalog |
| |
| @property |
| def name(self): |
| return self._property_name |
| |
| def get_json(self): |
| return { |
| "type": self.catalog, |
| "name": self.name |
| } |
| |
| |
| class BaseStackAdvisor(object): |
| def __init__(self, stack, version): |
| self._req_url = Options.STACK_ADVISOR_URL.format(stack, version) |
| |
| def get_suggestion(self, cfg_factory, changed_properties): |
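# no-op fallback used when the Ambari server is too old to provide configuration recommendations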
| return {} |
| |
| |
| class StackAdvisor(BaseStackAdvisor): |
| def __init__(self, stack, version): |
| super(StackAdvisor, self).__init__(stack, version) |
| |
| def _transform_properties(self, cfg_factory): |
| """ |
| Transform properties list to blueprint output format |
| :type cfg_factory: ServerConfigFactory |
| :rtype dict |
| """ |
| props = cfg_factory.get_json() |
| for cfg in props: |
| props[cfg] = { |
| "properties": props[cfg] |
| } |
| |
| return props |
| |
| def _from_blueprint_properties_transform(self, props): |
| """ |
| Transform SA response to dict |
| """ |
| for p in props: |
| rprop = {} |
| if "properties" in props[p] and props[p]["properties"] is not None: |
| rprop = props[p]["properties"] |
| if "property_attributes" in props[p]: |
| for property_attribute in props[p]["property_attributes"]: |
| if "delete" in props[p]["property_attributes"][property_attribute] and \ |
| props[p]["property_attributes"][property_attribute]["delete"] == "true": |
| rprop[property_attribute] = None |
| |
| props[p] = rprop |
| |
| return props |
| |
| def _generate_req_properties(self, properties): |
| rlist = [] |
| for item in properties: |
| if isinstance(item, StackAdvisorRequestProperty): |
| rlist.append(item.get_json()) |
| return rlist |
| |
| def get_suggestion(self, cfg_factory, changed_properties): |
| """ |
| :type cfg_factory: ServerConfigFactory |
| :type catalog_name str |
| :type changed_properties: list |
| :rtype dict |
| """ |
| request = { |
| "recommend": "configuration-dependencies", |
| "hosts": Options.ambari_server.agent_hosts, |
| "services": list(Options.SERVICES), |
| "changed_configurations": self._generate_req_properties(changed_properties), |
| "recommendations": { |
| "blueprint": { |
| "host_groups": [], |
| "configurations": self._transform_properties(cfg_factory), |
| "blueprint_cluster_binding": {} |
| } |
| } |
| } |
| response = curl(self._req_url, request_type="POST", data=request, parse=True) |
| if "resources" in response and isinstance(response["resources"], list) and len(response["resources"]) > 0: |
| response = response["resources"][0] |
| if "recommendations" in response and "blueprint" in response["recommendations"] and \ |
| "configurations" in response["recommendations"]["blueprint"]: |
| return self._from_blueprint_properties_transform(response["recommendations"]["blueprint"]["configurations"]) |
| |
| return {} |
| |
| |
| |
| class UpgradeCatalogFactory(object): |
# catalog versions that are currently supported
| _supported_catalog_versions = ["1.0"] |
| |
| # private variables |
| _json_catalog = None |
| |
| def __init__(self, path): |
| self._load(path) |
| |
| def _load(self, path): |
| f = None |
| try: |
| f = open(path, 'r') |
| json_string = f.read() |
| self._json_catalog = json.loads(json_string) |
| self._parse_upgrade_catalog() |
| except IOError as e: |
| raise FatalException(e.errno, "Couldn't open upgrade catalog file %s: %s" % (path, e.strerror)) |
| except NotSupportedCatalogVersion as e: |
| raise FatalException(1, e.message) |
| except ValueError as e: |
| raise FatalException(1, "Malformed upgrade catalog: %s" % e.message) |
| finally: |
| try: |
| if f is not None: |
| f.close() |
| except IOError as e: |
| pass |
| |
| def _parse_upgrade_catalog(self): |
| catalog_version = None |
| if CatConst.VERSION_TAG in self._json_catalog: |
| catalog_version = self._json_catalog[CatConst.VERSION_TAG] |
| |
| if catalog_version is None or catalog_version not in self._supported_catalog_versions: |
| raise NotSupportedCatalogVersion(str(catalog_version)) |
| |
| def get_catalog(self, from_version=None, to_version=None): |
| search_version = { |
| CatConst.STACK_VERSION_OLD: from_version, |
| CatConst.STACK_VERSION_TARGET: to_version |
| } |
| |
| for stack in self._json_catalog[CatConst.STACK_STAGS_TAG]: |
| version = { |
| CatConst.STACK_VERSION_OLD: stack[CatConst.STACK_VERSION_OLD], |
| CatConst.STACK_VERSION_TARGET: stack[CatConst.STACK_VERSION_TARGET] |
| } |
| if version == search_version: |
| return UpgradeCatalog(catalog=stack, version=version) |
| |
| return None |
| |
| |
| class UpgradeCatalog(object): |
| # private variables |
| _json_catalog = None |
| _properties_catalog = None |
| _properties_map_catalog = None |
| _version = None |
| _search_pattern = None |
| _catalog_options = None |
| |
| def __init__(self, catalog=None, version=None): |
| self._handlers = {} |
| self._json_catalog = catalog |
| self._version = version |
| self._search_pattern = re.compile(CatConst.SEARCH_PATTERN) |
| |
| if CatConst.STACK_PROPERTIES in catalog: |
| self._properties_catalog = self._format_catalog_properties(catalog[CatConst.STACK_PROPERTIES]) |
| |
| if CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG in catalog: |
| self._properties_map_catalog = PropertyMapping(catalog[CatConst.STACK_PROPERTIES_MAPPING_LIST_TAG]) |
| else: |
| self._properties_map_catalog = PropertyMapping() |
| |
| if catalog is not None and CatConst.CONFIG_OPTIONS in catalog \ |
| and CatConst.CONFIG_TYPES in catalog[CatConst.CONFIG_OPTIONS]: |
| self._catalog_options = catalog[CatConst.CONFIG_OPTIONS] |
| |
| def add_handler(self, name, handler): |
| if name not in self._handlers: |
| self._handlers[name] = handler |
| |
| def _format_catalog_properties(self, properties): |
| """ |
| Transform properties from short form to normal one: |
| "property": "text" => "property": { "value": "text" } |
| :param properties: dict |
| :return: dict |
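
Example:
"yarn-site": { "a": "1", "b": { "value": "2" } }
becomes
"yarn-site": { "a": { "value": "1" }, "b": { "value": "2" } }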
| """ |
| for config_item in properties: |
| cfg_item = properties[config_item] |
| |
| """ |
| case when "properties": { |
| "yarn-site": { |
| ..... |
| } |
| } |
| is set like "properties": { |
| "yarn-site": "" |
| } |
| """ |
| if not isinstance(cfg_item, dict): |
| raise MalformedPropertyDefinitionException("The property catalog '%s' definition error" % config_item) |
| |
| properties[config_item] = dict(zip( |
| cfg_item.keys(), |
| map(lambda x: x if isinstance(x, dict) or isinstance(x, list) else {CatConst.PROPERTY_VALUE_TAG: x}, cfg_item.values()) |
| )) |
| return properties |
| |
| @property |
| def version(self): |
| return "%s-%s" % (self._version[CatConst.STACK_VERSION_OLD], self._version[CatConst.STACK_VERSION_TARGET]) |
| |
| @property |
| def target_version(self): |
| return self._version[CatConst.STACK_VERSION_TARGET] |
| |
| @property |
| def source_version(self): |
| return self._version[CatConst.STACK_VERSION_OLD] |
| |
| def get_parsed_version(self): |
| """ |
Get numeric representation of the version for comparison purposes
| |
| Example: |
| 1.3-2.1 will be represented as { from: 13, to: 21 } |
| |
| :return: Numeric version |
| """ |
| v_from = self._version[CatConst.STACK_VERSION_OLD].split(".") |
| v_to = self._version[CatConst.STACK_VERSION_TARGET].split(".") |
| try: |
| v_from = int(v_from[0]) * 10 + int(v_from[1]) |
| v_to = int(v_to[0]) * 10 + int(v_to[1]) |
| except ValueError: |
| v_from = 0 |
| v_to = 0 |
| |
| version = { |
| "from": v_from, |
| "to": v_to |
| } |
| |
| return version |
| |
| @property |
| def name(self): |
| if CatConst.STACK_NAME in self._json_catalog: |
| return self._json_catalog[CatConst.STACK_NAME] |
| return "" |
| |
| @property |
| def mapping(self): |
| return self._properties_map_catalog |
| |
| @property |
| def items(self): |
| return self._properties_catalog |
| |
| @property |
| def options(self): |
if self._catalog_options is not None and CatConst.CONFIG_TYPES in self._catalog_options:
| return self._catalog_options[CatConst.CONFIG_TYPES] |
| return {} |
| |
| @property |
| def action_handlers(self): |
| return self._handlers |
| |
| @property |
| def tag_search_pattern(self): |
| return self._search_pattern |
| |
| def __handle_remove_tag(self, name, catalog_item_name, catalog_property_item, properties): |
| """ |
| :type name str |
| :type catalog_item_name str |
| :type catalog_property_item dict |
| :type properties dict |
| """ |
| if CatConst.PROPERTY_REMOVE_TAG in catalog_property_item and \ |
| catalog_property_item[CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG and \ |
| catalog_item_name in properties: |
| del properties[catalog_item_name] |
| |
| def __handle_template_tag_sub(self, catalog_item_name, catalog_property_item): |
| """ |
| :type catalog_item_name str |
| :type catalog_property_item dict |
| """ |
if self._handlers is not None and CatConst.TEMPLATE_HANDLER in self._handlers and \
CatConst.VALUE_TEMPLATE_TAG in catalog_property_item and \
catalog_property_item[CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG:
| try: |
| parsed_value = self._handlers[CatConst.TEMPLATE_HANDLER]( |
| self, |
| self._search_pattern.findall(catalog_property_item[CatConst.PROPERTY_VALUE_TAG]), |
| catalog_property_item[CatConst.PROPERTY_VALUE_TAG] |
| ) |
| catalog_property_item[CatConst.PROPERTY_VALUE_TAG] = parsed_value |
| except TemplateProcessingException: |
| pass |
| |
| def __handle_add_new(self, name, catalog_item_name, catalog_property_item, properties): |
| """ |
| :type name str |
| :type catalog_item_name str |
| :type catalog_property_item dict |
| :type properties dict |
| """ |
| catalog_property_item = dict(catalog_property_item) |
| can_add_new = not (CatConst.PROPERTY_CAN_CREATE_TAG in catalog_property_item and |
| catalog_property_item[CatConst.PROPERTY_CAN_CREATE_TAG].upper() == "NO") |
| if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name not in properties and can_add_new: |
| self.__handle_template_tag_sub(catalog_item_name, catalog_property_item) |
| properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG] |
| |
| def __handle_change_existing(self, name, catalog_item_name, catalog_property_item, properties): |
| """ |
| :type name str |
| :type catalog_item_name str |
| :type catalog_property_item dict |
| :type properties dict |
| """ |
| catalog_property_item = dict(catalog_property_item) |
| can_override = True |
| |
| if CatConst.OVERRIDE_TAG in catalog_property_item and catalog_property_item[CatConst.OVERRIDE_TAG] != CatConst.TRUE_TAG: |
| can_override = False |
| |
| if CatConst.PROPERTY_VALUE_TAG in catalog_property_item and catalog_item_name in properties and can_override: |
| self.__handle_template_tag_sub(catalog_item_name, catalog_property_item) |
| properties[catalog_item_name] = catalog_property_item[CatConst.PROPERTY_VALUE_TAG] |
| return properties |
| |
| def __handle_dependency_tag(self, name, catalog_item_name, catalog_property_item, properties): |
| """ |
| :type name str |
| :type catalog_item_name str |
| :type catalog_property_item dict |
| :type properties dict |
| """ |
| if CatConst.RESOLVE_DEPENDENCY_TAG in catalog_property_item and \ |
| catalog_property_item[CatConst.RESOLVE_DEPENDENCY_TAG] == CatConst.TRUE_TAG: |
| sa_suggestions = Options.stack_advisor.get_suggestion(Options.server_config_factory, |
| [StackAdvisorRequestProperty(name, catalog_item_name)]) |
| for sa_catalog in sa_suggestions: |
# create a new config group if one doesn't exist yet
| if sa_catalog not in Options.server_config_factory.items(): |
| Options.server_config_factory.create_config(sa_catalog) |
| |
| catalog_properties = Options.server_config_factory.get_config(sa_catalog).properties |
| for sa_property in sa_suggestions[sa_catalog]: |
| if sa_suggestions[sa_catalog][sa_property] is None and sa_property in catalog_properties: |
| print "rem %s:%s" % (sa_catalog, sa_property) |
| del catalog_properties[sa_property] |
| elif sa_suggestions[sa_catalog][sa_property] is not None: |
| catalog_properties[sa_property] = sa_suggestions[sa_catalog][sa_property] |
| |
| |
| def __can_handler_execute(self, catalog_options, catalog_property_item, property_item, properties): |
| """ |
| :type catalog_options dict |
| :type catalog_property_item str |
| :type property_item dict |
| :type properties dict |
| """ |
| can_process = True |
| |
| # process required services tag |
| required_list = None |
| |
| if CatConst.REQUIRED_SERVICES in catalog_options and catalog_options[CatConst.REQUIRED_SERVICES] is not None and \ |
| isinstance(catalog_options[CatConst.REQUIRED_SERVICES], list): |
| required_list = catalog_options[CatConst.REQUIRED_SERVICES] |
| |
| if CatConst.REQUIRED_SERVICES in property_item and property_item[CatConst.REQUIRED_SERVICES] is not None and\ |
| isinstance(property_item[CatConst.REQUIRED_SERVICES], list): |
| required_list = property_item[CatConst.REQUIRED_SERVICES] |
| |
| if required_list is not None: |
| can_process = can_process and is_services_exists(required_list) |
| |
| if CatConst.VALUE_REQUIRED_TAG in property_item and property_item[CatConst.VALUE_REQUIRED_TAG] is not None and\ |
| CatConst.PROPERTY_VALUE_TAG in property_item and catalog_property_item in properties: |
| can_process = properties[catalog_property_item] == property_item[CatConst.VALUE_REQUIRED_TAG] |
| |
| return can_process |
| |
| def process_simple_transformations(self, name, properties): |
| """ |
| :type properties dict |
| :type name str |
| """ |
| tag_handlers = [ |
| self.__handle_add_new, |
| self.__handle_change_existing, |
| self.__handle_dependency_tag, |
| self.__handle_remove_tag |
| ] |
| # catalog has no update entries for this config group |
| if name not in self._properties_catalog: |
| return 0 |
| |
| catalog_item = self._properties_catalog[name] |
| for catalog_property_item in catalog_item.keys(): |
| catalog_options = self.options[name] if name in self.options else {} |
| if self.__can_handler_execute(catalog_options, catalog_property_item, catalog_item[catalog_property_item], properties): |
| for handler in tag_handlers: |
| handler(name, catalog_property_item, catalog_item[catalog_property_item], properties) |
| |
| |
class PropertyMapping(object):
def __init__(self, map_list=None):
self._mapping_list = {}
if map_list is not None:
self._mapping_list = self._convert_list(map_list)
| |
| def _convert_list(self, map_list): |
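# normalizes short-form entries: "old-name": "new-name" -> "old-name": {"map-to": "new-name"}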
| return dict(zip( |
| map_list.keys(), |
| map(lambda x: x if isinstance(x, dict) else {CatConst.PROPERTY_MAP_TO: x}, map_list.values()) |
| )) |
| |
| def get(self, old_property_name): |
| """ |
| Get property mapping dict |
| :old_property_name str |
| :return dict |
| """ |
| if old_property_name in self._mapping_list: |
| return self._mapping_list[old_property_name] |
| |
| raise PropertyNotFoundException("Property %s from property mapping section not found" % old_property_name) |
| |
| def list(self): |
| return self._mapping_list.keys() |
| |
| def get_mapped_name(self, old_property_name): |
| if CatConst.PROPERTY_MAP_TO not in self.get(old_property_name): |
| raise MalformedPropertyDefinitionException("%s option is not set for %s property" % |
| (CatConst.PROPERTY_MAP_TO, old_property_name)) |
| return self.get(old_property_name)[CatConst.PROPERTY_MAP_TO] |
| |
| def exists(self, old_property_name): |
| return old_property_name in self._mapping_list |
| |
| |
| class ServerConfigFactory(object): |
| def __init__(self): |
| self.__observers = [] |
| self._server_catalogs = {} |
| self._load_configs() |
| |
| def subscribe(self, name, config_item): |
| self.__observers.append((name, config_item)) |
| |
| def _load_configs(self): |
| Options.logger.info('Getting latest cluster configuration from the server...') |
| new_configs = get_config_resp_all() |
| for config_item in new_configs: |
| if config_item in self._server_catalogs: |
| self.notify_observer(config_item, CatConst.ACTION_RELOAD, new_configs[config_item]) |
| else: |
| self._server_catalogs[config_item] = ServerConfig(self, config_item, new_configs[config_item]) |
| |
| def notify_observers(self, action, arg=None): |
| for name, config_item in self.__observers: |
| if config_item is not None and name in self._server_catalogs: |
| config_item.notify(action, arg) |
| |
| def notify_observer(self, _name, action, arg=None): |
| for name, config_item in self.__observers: |
| if config_item is not None and name == _name and name in self._server_catalogs: |
| config_item.notify(action, arg) |
| |
| def __str__(self): |
| catalogs = {} |
| for cfg in self._server_catalogs: |
| catalogs[cfg] = str(self._server_catalogs[cfg]) |
| |
| return json.dumps(catalogs) |
| |
| def get_json(self): |
| catalogs = {} |
| for cfg in self._server_catalogs: |
| catalogs[cfg] = self._server_catalogs[cfg].properties |
| |
return catalogs

def get_config(self, name):
| """ |
| Get configuration item object |
| :type name str |
| :rtype: ServerConfig |
| """ |
| if name in self._server_catalogs: |
| return self._server_catalogs[name] |
| |
| raise CatalogNotFoundException("Server catalog item \"%s\" not found" % name) |
| |
| def create_config(self, name): |
| if name not in self._server_catalogs: |
| self._server_catalogs[name] = ServerConfig(self, name, {CatConst.STACK_PROPERTIES: {}}) |
| else: |
raise CatalogExistException("Config group \"%s\" already exists" % name)
| |
| def items(self): |
| return self._server_catalogs.keys() |
| |
| def reload(self): |
| self._load_configs() |
| |
| def process_mapping_transformations(self, catalog): |
| """ |
| :type catalog UpgradeCatalog |
| """ |
| for map_item in catalog.mapping.list(): |
| self._process_single_map_transformation(catalog, map_item, catalog.mapping.get(map_item)) |
| |
| def _process_default_template_map_replacement(self, catalog, item): |
| """ |
| :type catalog: UpgradeCatalog |
| :type item: dict |
| """ |
| if CatConst.VALUE_TEMPLATE_TAG in item and CatConst.TEMPLATE_HANDLER in catalog.action_handlers and\ |
| CatConst.PROPERTY_DEFAULT in item and item[CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG: |
| try: |
| parsed_value = catalog.action_handlers[CatConst.TEMPLATE_HANDLER]( |
| catalog, |
| catalog.tag_search_pattern.findall(item[CatConst.PROPERTY_DEFAULT]), |
| item[CatConst.PROPERTY_DEFAULT] |
| ) |
| item[CatConst.PROPERTY_DEFAULT] = parsed_value |
| except TemplateProcessingException: |
| pass |
| |
| def _process_property_value_transformation(self, catalog, property_map_definition, old_value): |
| """ |
| :type catalog: UpgradeCatalog |
| :type property_map_definition: dict |
| :type old_value: str |
| :return: str |
| """ |
| |
| tmp = old_value |
| |
| if CatConst.REPLACE_FROM_TAG in property_map_definition and CatConst.REPLACE_TO_TAG in property_map_definition and\ |
| property_map_definition[CatConst.REPLACE_TO_TAG] is not None and property_map_definition[CatConst.REPLACE_FROM_TAG] is not None: |
| tmp = tmp.replace(property_map_definition[CatConst.REPLACE_FROM_TAG], property_map_definition[CatConst.REPLACE_TO_TAG]) |
| |
| if CatConst.COERCE_TO_PROPERTY_TAG in property_map_definition: |
| if property_map_definition[CatConst.COERCE_TO_PROPERTY_TAG] == CatConst.COERCE_YAML_OPTION_TAG: |
| # for example c6401,c6402 into ['c6401','c6402'] |
| data = list(map(lambda x: "'%s'" % x.strip(), tmp.split(','))) |
| tmp = "[%s]" % ','.join(data) |
| |
| return tmp |
| |
| def _process_single_map_transformation(self, catalog, map_item_name, map_property_item): |
| """ |
| :type catalog UpgradeCatalog |
| :type map_item_name str |
| :type map_property_item dict |
| """ |
| old_property_name = map_item_name |
| |
| # map-from item name could be re-defined via PROPERTY_MAP_FROM property to avoid duplicate entries |
| if CatConst.PROPERTY_MAP_FROM in map_property_item and map_property_item[CatConst.PROPERTY_MAP_FROM] is not None: |
| old_property_name = map_property_item[CatConst.PROPERTY_MAP_FROM] |
| |
| new_property_name = old_property_name |
| |
| if CatConst.PROPERTY_MAP_TO in map_property_item: |
| new_property_name = map_property_item[CatConst.PROPERTY_MAP_TO] |
| |
| # process first required section |
| required_services = map_property_item[CatConst.REQUIRED_SERVICES] if CatConst.REQUIRED_SERVICES in map_property_item else None |
| |
| # process required-services tag |
| if required_services is not None and not is_services_exists(required_services): |
| return 0 |
| |
| # process template tag |
| self._process_default_template_map_replacement(catalog, map_property_item) |
| |
| source_cfg_group = map_property_item[CatConst.PROPERTY_FROM_CATALOG] if CatConst.PROPERTY_FROM_CATALOG in map_property_item and\ |
| map_property_item[CatConst.PROPERTY_FROM_CATALOG] != "" else None |
target_cfg_group = map_property_item[CatConst.PROPERTY_TO_CATALOG] if CatConst.PROPERTY_TO_CATALOG in map_property_item and \
map_property_item[CatConst.PROPERTY_TO_CATALOG] != "" else None
| default_value = map_property_item[CatConst.PROPERTY_DEFAULT] if CatConst.PROPERTY_DEFAULT in map_property_item and \ |
| map_property_item[CatConst.PROPERTY_DEFAULT] != "" else None |
| |
| if source_cfg_group is None and target_cfg_group is None: # global scope mapping renaming |
| self.notify_observers(CatConst.ACTION_RENAME_PROPERTY, [old_property_name, new_property_name, |
| self._process_property_value_transformation, |
| catalog, |
| map_property_item |
| ]) |
| elif source_cfg_group is not None and target_cfg_group is not None: # group-to-group moving |
| if source_cfg_group in self._server_catalogs and target_cfg_group in self._server_catalogs: |
| old_cfg_group = self.get_config(source_cfg_group).properties |
| new_cfg_group = self.get_config(target_cfg_group).properties |
| |
| if old_property_name in old_cfg_group: |
| new_cfg_group[new_property_name] = self._process_property_value_transformation(catalog, map_property_item, old_cfg_group[old_property_name]) |
| if new_property_name != old_property_name: |
| del old_cfg_group[old_property_name] |
| elif old_property_name not in old_cfg_group and default_value is not None: |
| new_cfg_group[new_property_name] = default_value |
| |
| def commit(self): |
| self.notify_observers(CatConst.ACTION_COMMIT) |
| |
| |
| class ServerConfig(object): |
| def __init__(self, factory, name, initial_configs): |
| """ |
| Initialize configuration item |
| :factory ServerConfigFactory |
| """ |
| factory.subscribe(name, self) |
| self._configs = initial_configs |
| self._hash = self._calculate_hash() |
| self._name = name |
| |
| def _calculate_hash(self): |
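# cheap change detection: _commit() pushes this config group to the server only when the hash has changed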
| return hash(str(self._configs)) |
| |
| def notify(self, action, arg=None): |
| if action == CatConst.ACTION_RELOAD: |
| self._configs = arg |
| self._hash = self._calculate_hash() |
| elif action == CatConst.ACTION_COMMIT: |
| self._commit() |
| elif action == CatConst.ACTION_RENAME_PROPERTY and isinstance(arg, list) and len(arg) == 5: |
| self._rename_property(*arg) |
| |
| def _rename_property(self, old_name, new_name, transform_func, catalog, map_item): |
| """ |
| :type old_name: str |
| :type new_name: str |
| :type transform_func: function |
| :type catalog: UpgradeCatalog |
| :type map_item: dict |
| :return: |
| """ |
| if old_name in self.properties: |
| old_property_value = self.properties[old_name] |
| if transform_func is not None: |
| self.properties[new_name] = transform_func(catalog, map_item, old_property_value) |
| else: |
| self.properties[new_name] = old_property_value |
| |
| if old_name != new_name: |
| del self.properties[old_name] |
| |
| def is_attributes_exists(self): |
| return CatConst.STACK_PROPERTIES_ATTRIBUTES in self._configs |
| |
| def __str__(self): |
| return json.dumps(self.properties) |
| |
| @property |
| def properties(self): |
| return self._configs[CatConst.STACK_PROPERTIES] |
| |
| @properties.setter |
| def properties(self, value): |
| self._configs[CatConst.STACK_PROPERTIES] = value |
| |
| @property |
| def attributes(self): |
| return self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES] |
| |
| @attributes.setter |
| def attributes(self, value): |
| self._configs[CatConst.STACK_PROPERTIES_ATTRIBUTES] = value |
| |
| def _commit(self): |
| if self._hash != self._calculate_hash(): |
| Options.logger.info("Committing changes for \"%s\" configuration group ..." % self._name) |
| if self.is_attributes_exists(): |
| update_config(self.properties, self._name, self.attributes) |
| else: |
| update_config(self.properties, self._name) |
| |
| def clear(self): |
| self.properties = {} |
| self.attributes = {} |
| |
| def merge(self, catalog_item): |
| """ |
| :type catalog_item UpgradeCatalog |
| """ |
| # handle "merged-copy" tag |
| config_options = catalog_item.options[self._name] if self._name in catalog_item.options else {} |
| clear_properties = not (CatConst.MERGED_COPY_TAG in config_options and |
| config_options[CatConst.MERGED_COPY_TAG] == CatConst.TRUE_TAG) |
| if clear_properties: |
| self.clear() |
| Options.logger.info("Processing configuration group: %s", self._name) |
| catalog_item.process_simple_transformations(self._name, self.properties) |
| |
| |
| def write_mapping(hostmapping): |
| if os.path.isfile(Options.MR_MAPPING_FILE): |
| os.remove(Options.MR_MAPPING_FILE) |
| json.dump(hostmapping, open(Options.MR_MAPPING_FILE, 'w')) |
| |
| |
| def read_mapping(): |
| if os.path.isfile(Options.MR_MAPPING_FILE): |
| if Options.MR_MAPPING is not None: |
| return Options.MR_MAPPING |
| else: |
| Options.MR_MAPPING = json.load(open(Options.MR_MAPPING_FILE)) |
| return Options.MR_MAPPING |
| else: |
| raise FatalException(-1, "MAPREDUCE host mapping file, mr_mapping, is not available or badly formatted. Execute " |
| "action save-mr-mapping. Ensure the file is present in the directory where you are " |
| "executing this command.") |
| |
| |
| def get_mr1_mapping(): |
| components = ["MAPREDUCE_CLIENT", "JOBTRACKER", "TASKTRACKER", "HISTORYSERVER"] |
| GET_URL_FORMAT = Options.CLUSTER_URL + '/services/MAPREDUCE/components/%s' |
| hostmapping = {} |
| for component in components: |
| hostlist = [] |
| structured_resp = curl(GET_URL_FORMAT % component, parse=True, validate=True) |
| |
| if 'host_components' in structured_resp: |
| for hostcomponent in structured_resp['host_components']: |
| if 'HostRoles' in hostcomponent: |
| if 'host_name' in hostcomponent['HostRoles']: |
| hostlist.append(hostcomponent['HostRoles']['host_name']) |
| |
| hostmapping[component] = hostlist |
| write_mapping(hostmapping) |
| |
| pprint("File mr_mapping contains the host mapping for mapreduce components. This file is critical for later " |
| "steps.") |
| |
| |
| def get_YN_input(prompt, default): |
| yes = set(['yes', 'ye', 'y']) |
| no = set(['no', 'n']) |
| return get_choice_string_input(prompt, default, yes, no) |
| |
| |
| def get_choice_string_input(prompt, default, firstChoice, secondChoice): |
| choice = raw_input(prompt).lower() |
| if choice in firstChoice: |
| return True |
| elif choice in secondChoice: |
| return False |
| elif choice is "": # Just enter pressed |
| return default |
| else: |
| print "input not recognized, please try again: " |
| return get_choice_string_input(prompt, default, firstChoice, secondChoice) |
| |
| |
| def delete_mr(): |
| saved_mr_mapping = get_YN_input("Have you saved MR host mapping using action save-mr-mapping [y/n] (n)? ", False) |
| if not saved_mr_mapping: |
| raise FatalException(1, "Ensure MAPREDUCE host component mapping is saved before deleting it. Use action " |
| "save-mr-mapping.") |
| |
| SERVICE_URL_FORMAT = Options.CLUSTER_URL + '/services/MAPREDUCE' |
| COMPONENT_URL_FORMAT = Options.CLUSTER_URL + '/hosts/%s/host_components/%s' |
| NON_CLIENTS = ["JOBTRACKER", "TASKTRACKER", "HISTORYSERVER"] |
| PUT_IN_DISABLED = { |
| "HostRoles": { |
| "state": "DISABLED" |
| } |
| } |
| |
| hostmapping = read_mapping() |
| |
| for key, value in hostmapping.items(): |
| if (key in NON_CLIENTS) and (len(value) > 0): |
| for host in value: |
| curl(COMPONENT_URL_FORMAT % (host, key), request_type="PUT", data=PUT_IN_DISABLED, |
| validate=True) |
| |
| curl(SERVICE_URL_FORMAT, request_type="DELETE", validate=True) |
| |
| |
| def get_cluster_stackname(): |
| VERSION_URL_FORMAT = Options.CLUSTER_URL + '?fields=Clusters/version' |
| |
| structured_resp = curl(VERSION_URL_FORMAT, validate=True, parse=True) |
| |
| if 'Clusters' in structured_resp: |
| if 'version' in structured_resp['Clusters']: |
| return structured_resp['Clusters']['version'] |
| |
| raise FatalException(-1, "Unable to get the cluster version") |
| |
| |
| def has_component_in_stack_def(stack_name, service_name, component_name): |
| STACK_COMPONENT_URL_FORMAT = Options.ROOT_URL + '/stacks2/{0}/versions/{1}/stackServices/{2}/serviceComponents/{3}' |
| stack, stack_version = stack_name.split('-') |
| |
| try: |
| curl(STACK_COMPONENT_URL_FORMAT.format(stack, stack_version, service_name, component_name), |
| validate=True) |
| return True |
| except FatalException: |
| return False |
| |
| |
| def add_services(): |
| SERVICE_URL_FORMAT = Options.CLUSTER_URL + '/services/{0}' |
| COMPONENT_URL_FORMAT = SERVICE_URL_FORMAT + '/components/{1}' |
| HOST_COMPONENT_URL_FORMAT = Options.CLUSTER_URL + '/hosts/{0}/host_components/{1}' |
| service_comp = { |
| "YARN": ["NODEMANAGER", "RESOURCEMANAGER", "YARN_CLIENT"], |
| "MAPREDUCE2": ["HISTORYSERVER", "MAPREDUCE2_CLIENT"]} |
| new_old_host_map = { |
| "NODEMANAGER": "TASKTRACKER", |
| "HISTORYSERVER": "HISTORYSERVER", |
| "RESOURCEMANAGER": "JOBTRACKER", |
| "YARN_CLIENT": "MAPREDUCE_CLIENT", |
| "MAPREDUCE2_CLIENT": "MAPREDUCE_CLIENT"} |
| |
| stack_name = get_cluster_stackname() |
| stack_has_ats = has_component_in_stack_def(stack_name, "YARN", "APP_TIMELINE_SERVER") |
| |
| # if upgrading to stack > 2.1 (which has ats) |
| if stack_has_ats: |
| service_comp["YARN"].append("APP_TIMELINE_SERVER") |
| new_old_host_map["APP_TIMELINE_SERVER"] = "JOBTRACKER" |
| |
| hostmapping = read_mapping() |
| |
| for service in service_comp.keys(): |
| curl(SERVICE_URL_FORMAT.format(service), validate=True, request_type="POST") |
| |
| for component in service_comp[service]: |
| curl(COMPONENT_URL_FORMAT.format(service, component), |
| validate=True, request_type="POST") |
| |
| for host in hostmapping[new_old_host_map[component]]: |
| curl(HOST_COMPONENT_URL_FORMAT.format(host, component), |
| validate=True, request_type="POST") |
| |
| |
| def update_config(properties, config_type, attributes=None): |
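# a new desired_config version is created under a millisecond-timestamp tag; resulting payload shape:
# {"Clusters": {"desired_config": {"type": <config_type>, "tag": "version1446754...", "properties": {...}}}}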
| tag = "version" + str(int(time.time() * 1000)) |
| properties_payload = {"Clusters": {"desired_config": {"type": config_type, "tag": tag, "properties": properties}}} |
| if attributes is not None: |
| properties_payload["Clusters"]["desired_config"]["properties_attributes"] = attributes |
| |
| expect_body = config_type != "cluster-env" # ToDo: make exceptions more flexible |
| |
| curl(Options.CLUSTER_URL, request_type="PUT", data=properties_payload, validate=True, soft_validation=True) |
| |
| |
| def build_all_options(desired_configs): |
| """ |
| Get all configs in the old-fashion way ( versions below 1.7.0 doesn't support "properties" filter ) |
| """ |
| config_url_tpl = Options.CLUSTER_URL + "/configurations?type={0}&tag={1}" |
| all_options = {CatConst.ITEMS_TAG: []} |
| for config in desired_configs: |
| cfg_item = curl(config_url_tpl.format(config, desired_configs[config]["tag"]), parse=True, validate=True) |
| if CatConst.ITEMS_TAG in cfg_item and len(cfg_item[CatConst.ITEMS_TAG]) == 1: |
| all_options[CatConst.ITEMS_TAG].append(cfg_item[CatConst.ITEMS_TAG][0]) |
| |
| return all_options |
| |
| |
| def get_config_resp_all(): |
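# builds one dict entry per config type, keeping only the currently desired tag for each, e.g.:
# {"core-site": {"tag": "version14...", "properties": {...}, "properties_attributes": {...}}}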
| desired_configs = {} |
| config_all_properties_url = Options.CLUSTER_URL + "/configurations?fields=properties" |
| desired_configs_resp = curl(Options.CLUSTER_URL + "?fields=Clusters/desired_configs", validate=True, parse=True) |
| |
| if 'Clusters' in desired_configs_resp: |
| if 'desired_configs' in desired_configs_resp['Clusters']: |
| desired_configs_resp = desired_configs_resp['Clusters']['desired_configs'] |
| else: |
| return None |
| else: |
| return None |
| |
| if Options.isPropertyAttributesSupported(): |
| config_all_properties_url += ",properties_attributes" |
| all_options = curl(config_all_properties_url, validate=True, parse=True) |
| else: |
| all_options = build_all_options(desired_configs_resp) |
| |
| if CatConst.ITEMS_TAG in all_options: |
| all_options = all_options[CatConst.ITEMS_TAG] |
| else: |
| return None |
| |
| all_options = filter( |
| lambda x: x[CatConst.TYPE_TAG] in desired_configs_resp and x["tag"] == desired_configs_resp[x[CatConst.TYPE_TAG]][ |
| "tag"], |
| all_options) |
| |
| for item in all_options: |
| dc_item = {} |
| |
if CatConst.STACK_PROPERTIES in item:  # a config item may contain no properties at all
| dc_item[CatConst.STACK_PROPERTIES] = item[CatConst.STACK_PROPERTIES] |
| else: |
| dc_item[CatConst.STACK_PROPERTIES] = {} |
| |
| if CatConst.STACK_PROPERTIES_ATTRIBUTES in item: |
| dc_item[CatConst.STACK_PROPERTIES_ATTRIBUTES] = item[CatConst.STACK_PROPERTIES_ATTRIBUTES] |
| |
| if "tag" in item: |
| dc_item["tag"] = item["tag"] |
| |
| if dc_item != {}: |
| desired_configs[item[CatConst.TYPE_TAG]] = dc_item |
| |
| return desired_configs |
| |
| |
def is_services_exists(required_services):
"""
Return True if all required_services are present in Options.SERVICES (case-insensitive)
:param required_services: list
:return: bool
"""
# subset-or-equal check: an exact match of the full service set also qualifies
return set(map(lambda x: x.upper(), required_services)) <= Options.SERVICES
| |
| |
| def get_cluster_services(): |
| services_url = Options.CLUSTER_URL + '/services' |
| raw_services = curl(services_url, parse=True) |
| |
| # expected structure: |
| # items: [ {"href":"...", "ServiceInfo":{"cluster_name":"..", "service_name":".."}}, ..., ... ] |
| if raw_services is not None and "items" in raw_services and isinstance(raw_services["items"], list): |
| return list(map(lambda item: item["ServiceInfo"]["service_name"], raw_services["items"])) |
| |
| Options.logger.warning("Failed to load services list, functionality that depends on them couldn't work") |
| return [] |
| |
| |
| def get_zookeeper_quorum(): |
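# builds a comma-separated quorum string from ZOOKEEPER_SERVER hosts, e.g. "zk-host1:2181,zk-host2:2181"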
| zoo_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.ZOOKEEPER_SERVER), validate=False, parse=True) |
| zoo_quorum = [] |
| zoo_def_port = "2181" |
| if Options.server_config_factory is not None and Options.ZK_OPTIONS in Options.server_config_factory.items(): |
| props = Options.server_config_factory.get_config(Options.ZK_OPTIONS) |
| if Options.ZK_CLIENTPORT in props.properties: |
| zoo_def_port = props.properties[Options.ZK_CLIENTPORT] |
| |
| if "host_components" in zoo_cfg: |
| for item in zoo_cfg["host_components"]: |
| zoo_quorum.append("%s:%s" % (item["HostRoles"]["host_name"], zoo_def_port)) |
| |
| return ",".join(zoo_quorum) |
| |
| |
| def get_tez_history_url_base(): |
| try: |
| tez_view = curl(Options.TEZ_VIEW_URL, validate=False, parse=True) |
| except HTTPError as e: |
| raise TemplateProcessingException(str(e)) |
| |
| version = "" |
| if "versions" in tez_view and \ |
| len(tez_view['versions']) > 0 and \ |
| "ViewVersionInfo" in tez_view['versions'][0] and \ |
| 'version' in tez_view['versions'][0]['ViewVersionInfo']: |
| version = tez_view['versions'][0]['ViewVersionInfo']['version'] |
| url = '{0}://{1}:{2}/#/main/views/TEZ/{3}/TEZ_CLUSTER_INSTANCE'.format(Options.API_PROTOCOL, Options.HOST, Options.API_PORT, version) |
| return url |
| |
| def get_kafka_listeners(): |
| kafka_host = "localhost" |
| kafka_port = "6667" |
| if Options.server_config_factory is not None and Options.KAFKA_BROKER_CONF in Options.server_config_factory.items(): |
| props = Options.server_config_factory.get_config(Options.KAFKA_BROKER_CONF) |
| if Options.KAFKA_PORT in props.properties: |
| kafka_port = props.properties[Options.KAFKA_PORT] |
| |
| # Default kafka listeners string |
| kafka_listeners = "PLAINTEXT://{0}:{1}".format(kafka_host, kafka_port) |
| |
| return kafka_listeners |
| |
| |
| def get_ranger_xaaudit_hdfs_destination_directory(): |
| namenode_hostname="localhost" |
| namenode_cfg = curl(Options.COMPONENTS_FORMAT.format(Options.NAMENODE), validate=False, parse=True) |
| if "host_components" in namenode_cfg: |
| namenode_hostname = namenode_cfg["host_components"][0]["HostRoles"]["host_name"] |
| |
| return "hdfs://{0}:8020/ranger/audit".format(namenode_hostname) |
| |
| def get_ranger_policymgr_external_url(): |
| url = "{{policymgr_mgr_url}}" |
| if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): |
| props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) |
| if Options.RANGER_EXTERNAL_URL in props.properties: |
| url = props.properties[Options.RANGER_EXTERNAL_URL] |
| return url |
| |
| def get_jdbc_driver(): |
| driver = "{{jdbc_driver}}" |
| if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): |
| props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) |
| if "DB_FLAVOR" in props.properties: |
| db = props.properties["DB_FLAVOR"] |
| |
| if db.lower() == "mysql": |
| driver = "com.mysql.jdbc.Driver" |
| elif db.lower() == "oracle": |
| driver = "oracle.jdbc.OracleDriver" |
| return driver |
| |
| def get_audit_jdbc_url(): |
| audit_jdbc_url = "{{audit_jdbc_url}}" |
| if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): |
| props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) |
| if "DB_FLAVOR" in props.properties: |
| xa_audit_db_flavor = props.properties["DB_FLAVOR"] |
| if "db_host" in props.properties: |
| xa_db_host = props.properties["db_host"] |
| if "audit_db_name" in props.properties: |
| xa_audit_db_name = props.properties["audit_db_name"] |
| |
| if xa_audit_db_flavor.lower() == 'mysql': |
| audit_jdbc_url = "jdbc:mysql://{0}/{1}".format(xa_db_host, xa_audit_db_name) |
| elif xa_audit_db_flavor.lower() == 'oracle': |
| audit_jdbc_url = "jdbc:oracle:thin:\@//{0}".format(xa_db_host) |
| return audit_jdbc_url |
| |
| def get_audit_db_passwd(): |
| audit_db_passwd = "crypted" |
| if Options.server_config_factory is not None and Options.RANGER_ADMIN in Options.server_config_factory.items(): |
| props = Options.server_config_factory.get_config(Options.RANGER_ADMIN) |
| if "audit_db_password" in props.properties: |
| audit_db_passwd = props.properties['audit_db_password'] |
| return audit_db_passwd |
| |
| def get_audit_to_db_enabled(config_name): |
| audit_to_db = "true" |
| if Options.server_config_factory is not None and config_name in Options.server_config_factory.items(): |
| props = Options.server_config_factory.get_config(config_name) |
| if "XAAUDIT.DB.IS_ENABLED" in props.properties: |
| audit_to_db = props.properties["XAAUDIT.DB.IS_ENABLED"] |
| return audit_to_db |
| |
| def get_audit_to_hdfs_enabled(config_name): |
| audit_to_hdfs = "false" |
| if Options.server_config_factory is not None and config_name in Options.server_config_factory.items(): |
| props = Options.server_config_factory.get_config(config_name) |
| if "XAAUDIT.HDFS.IS_ENABLED" in props.properties: |
| audit_to_hdfs = props.properties["XAAUDIT.HDFS.IS_ENABLED"] |
| return audit_to_hdfs |
| |
| def get_hdfs_batch_filespool_dir(config_name, component): |
| if component == 'hdfs': |
| path = '/var/log/hadoop/hdfs/audit/hdfs/spool' |
| else: |
| path = '/var/log/{0}/audit/hdfs/spool'.format(component) |
| if Options.server_config_factory is not None and config_name in Options.server_config_factory.items(): |
| props = Options.server_config_factory.get_config(config_name) |
| if "XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY" in props.properties: |
| path = props.properties["XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY"] |
| return path |
| |
| |
| def get_usersync_sync_source(): |
| ug_sync_source = 'org.apache.ranger.unixusersync.process.UnixUserGroupBuilder' |
| sync_source = 'unix' |
| if Options.server_config_factory is not None and Options.RANGER_USERSYNC in Options.server_config_factory.items(): |
| props = Options.server_config_factory.get_config(Options.RANGER_USERSYNC) |
| if "SYNC_SOURCE" in props.properties: |
| sync_source = props.properties['SYNC_SOURCE'] |
| |
| if sync_source == 'ldap': |
| ug_sync_source = 'org.apache.ranger.ldapusersync.process.LdapUserGroupBuilder' |
| return ug_sync_source |
| |
| def get_audit_check(audit_type): |
| audit_check_flag = "false" |
| if Options.server_config_factory is not None and Options.RANGER_ENV in Options.server_config_factory.items(): |
| props = Options.server_config_factory.get_config(Options.RANGER_ENV) |
| audit_property = "xasecure.audit.destination.{0}".format(audit_type) |
| if audit_property in props.properties: |
| audit_check_flag = props.properties[audit_property] |
| |
| return audit_check_flag |
| |
| def get_jt_host(catalog): |
| """ |
| :type catalog: UpgradeCatalog |
| :rtype str |
| """ |
| if catalog.get_parsed_version()["from"] == 13: |
| return read_mapping()["JOBTRACKER"][0] |
| |
| return "" |
| |
| |
| def get_jh_host(catalog): |
| """ |
| :type catalog: UpgradeCatalog |
| :rtype str |
| """ |
| if catalog.get_parsed_version()["from"] == 13: |
| return read_mapping()["HISTORYSERVER"][0] |
| |
| return "" |
| |
| def get_ranger_host(): |
| ranger_config = curl(Options.COMPONENTS_FORMAT.format('RANGER_ADMIN'), validate=False, parse=True) |
| ranger_host_list = [] |
| if "host_components" in ranger_config: |
| for item in ranger_config["host_components"]: |
| ranger_host_list.append(item["HostRoles"]["host_name"]) |
| return ranger_host_list[0] |
| |
| def get_ranger_service_details(): |
| server_cfg_factory = Options.server_config_factory |
| server_cfg_catalog = server_cfg_factory.get_config('admin-properties') |
| properties_latest = server_cfg_catalog.properties |
| data = {} |
| |
| if properties_latest['DB_FLAVOR'].lower() == 'mysql': |
| data['RANGER_JDBC_DRIVER'] = 'com.mysql.jdbc.Driver' |
| data['RANGER_JDBC_DIALECT'] = 'org.eclipse.persistence.platform.database.MySQLPlatform' |
| data['RANGER_JDBC_URL'] = 'jdbc:mysql://{0}/{1}'.format(properties_latest['db_host'], properties_latest['db_name']) |
| data['RANGER_AUDIT_JDBC_URL'] = 'jdbc:mysql://{0}/{1}'.format(properties_latest['db_host'], properties_latest['audit_db_name']) |
| data['RANGER_ROOT_JDBC_URL'] = 'jdbc:mysql://{0}'.format(properties_latest['db_host']) |
| elif properties_latest['DB_FLAVOR'].lower() == 'oracle': |
| data['RANGER_JDBC_DRIVER'] = 'oracle.jdbc.OracleDriver' |
| data['RANGER_JDBC_DIALECT'] = 'org.eclipse.persistence.platform.database.OraclePlatform' |
| data['RANGER_JDBC_URL'] = 'jdbc:oracle:thin:@//{0}'.format(properties_latest['db_host']) |
| data['RANGER_AUDIT_JDBC_URL'] = 'jdbc:oracle:thin:@//{0}'.format(properties_latest['db_host']) |
| data['RANGER_ROOT_JDBC_URL'] = 'jdbc:oracle:thin:@//{0}'.format(properties_latest['db_host']) |
| |
| return data |
| |
| def get_hive_security_authorization_setting(): |
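|   """ |
|   Derive the hive_security_authorization value, translating the legacy |
|   "ranger-hive-plugin-enabled" flag into the "Ranger" setting and removing |
|   the obsolete property. |
|   :rtype str |
|   """ |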
| # this pattern should be used only once, changes here mimic UpgradeCatalog210.java -> updateRangerHiveConfigs |
| scf = Options.server_config_factory |
| response = "None" |
| |
| if "hive-env" in scf.items() and "hive_security_authorization" in scf.get_config("hive-env").properties: |
| response = scf.get_config("hive-env").properties["hive_security_authorization"] |
| |
| old_ranger_catalog = "ranger-hive-plugin-properties" |
| old_ranger_setting = "ranger-hive-plugin-enabled" |
| hive_server_catalog = "hiveserver2-site" |
| hive_sec_property = "hive.security.authorization.enabled" |
| |
| if scf is not None and old_ranger_catalog in scf.items(): |
| cfg = scf.get_config(old_ranger_catalog) |
| prop = cfg.properties |
| if old_ranger_setting in prop and cfg.properties[old_ranger_setting].upper() == "YES": |
| response = "Ranger" |
| if hive_server_catalog in scf.items(): |
| hive_props = scf.get_config(hive_server_catalog).properties |
| hive_props[hive_sec_property] = "true" |
| if old_ranger_setting in prop: |
| del prop[old_ranger_setting] |
| |
| # workaround for buggy stack advisor |
| if "HIVE" in Options.SERVICES and response == "None": |
| if hive_server_catalog not in scf.items(): |
| scf.create_config(hive_server_catalog) |
| |
| scf.get_config(hive_server_catalog).properties[hive_sec_property] = "false" |
| |
| return response |
| |
| |
| def get_hbase_coprocessmaster_classes(): |
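|   """ |
|   Compute the value for "hbase.coprocessor.master.classes": when HBase |
|   authorization is enabled, replace the legacy XaSecure coprocessor with the |
|   Ranger one, or append the Ranger coprocessor to the existing list. |
|   :rtype str |
|   """ |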
| scf = Options.server_config_factory |
| prop = "hbase.coprocessor.master.classes" |
| hbase_ranger_enabled = False |
| old_value = "" |
| if "hbase-site" in scf.items(): |
| if prop in scf.get_config("hbase-site").properties: |
| old_value = scf.get_config("hbase-site").properties[prop] |
| if "hbase.security.authorization" in scf.get_config("hbase-site").properties and \ |
| scf.get_config("hbase-site").properties["hbase.security.authorization"].upper() == "TRUE": |
| hbase_ranger_enabled = True |
| |
| if hbase_ranger_enabled and "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor" not in old_value: |
| if "com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor" in old_value: |
| old_value = old_value.replace("com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor", |
| "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") |
| else: |
| val = [] if old_value.strip() == "" else old_value.split(',') |
| val.append("org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") |
| old_value = ','.join(val) |
| |
| return old_value |
| |
| |
| def get_rpc_scheduler_factory_class(): |
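|   """ |
|   Phoenix RPC scheduler factory class when PHOENIX_QUERY_SERVER is deployed, |
|   an empty string otherwise. |
|   :rtype str |
|   """ |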
| if Options.PHOENIX_QUERY_SERVER in Options.ambari_server.components: |
| return "org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory" |
| else: |
| return "" |
| |
| |
| def get_hbase_rpc_controllerfactory_class(): |
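|   """ |
|   Phoenix server RPC controller factory class when PHOENIX_QUERY_SERVER is |
|   deployed, an empty string otherwise. |
|   :rtype str |
|   """ |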
| if Options.PHOENIX_QUERY_SERVER in Options.ambari_server.components: |
| return "org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory" |
| else: |
| return "" |
| |
| |
| def get_hbase_regionserver_wal_codec(): |
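|   """ |
|   Return the WAL codec required by Phoenix when phoenix_sql_enabled is true, |
|   the default WALCellCodec otherwise. |
|   :rtype str |
|   """ |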
| prop = "phoenix_sql_enabled" |
| scf = Options.server_config_factory |
| if "hbase-env" in scf.items(): |
| if prop in scf.get_config("hbase-env").properties and scf.get_config("hbase-env").properties[prop].upper() == "TRUE": |
| return "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec" |
| return "org.apache.hadoop.hbase.regionserver.wal.WALCellCodec" |
| |
| |
| def get_hbase_coprocessor_region_classes(): |
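|   """ |
|   Same logic as get_hbase_coprocessmaster_classes, applied to |
|   "hbase.coprocessor.region.classes". |
|   :rtype str |
|   """ |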
| scf = Options.server_config_factory |
| prop = "hbase.coprocessor.region.classes" |
| hbase_ranger_enabled = False |
| old_value = "" |
| if "hbase-site" in scf.items(): |
| if prop in scf.get_config("hbase-site").properties: |
| old_value = scf.get_config("hbase-site").properties[prop] |
| if "hbase.security.authorization" in scf.get_config("hbase-site").properties and \ |
| scf.get_config("hbase-site").properties["hbase.security.authorization"].upper() == "TRUE": |
| hbase_ranger_enabled = True |
| |
| if hbase_ranger_enabled and "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor" not in old_value: |
| if "com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor" in old_value: |
| old_value = old_value.replace("com.xasecure.authorization.hbase.XaSecureAuthorizationCoprocessor", |
| "org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") |
| else: |
| val = [] if old_value.strip() == "" else old_value.split(',') |
| val.append("org.apache.ranger.authorization.hbase.RangerAuthorizationCoprocessor") |
| old_value = ','.join(val) |
| |
| return old_value |
| |
| |
| def _substitute_handler(upgrade_catalog, tokens, value): |
|   """ |
|   Replace template tokens in a catalog property value with values resolved |
|   from the current cluster configuration. |
|   :param upgrade_catalog: UpgradeCatalog |
|   :param tokens: list |
|   :param value: str |
|   :rtype str |
|   """ |
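|   # Illustrative substitutions, assuming a hypothetical cluster named "c1": |
|   #   "{HBASE_RANGER_REPO_NAME}"               -> "c1_hbase" |
|   #   "{RANGER_PLUGIN_HIVE_POLICY_CACHE_DIR}"  -> "/etc/ranger/c1_hive/policycache" |
|   #   "{RANGER_HDFS_KEYSTORE_CREDENTIAL_FILE}" -> "jceks://file/etc/ranger/c1_hadoop/cred.jceks" |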
| for token in tokens: |
| if token == "{JOBHISTORY_HOST}": |
| value = value.replace(token, get_jh_host(upgrade_catalog)) |
| elif token == "{RESOURCEMANAGER_HOST}": |
| value = value.replace(token, get_jt_host(upgrade_catalog)) |
| elif token == "{HBASE_REGIONSERVER_WAL_CODEC}": |
| value = value.replace(token, get_hbase_regionserver_wal_codec()) |
| elif token == "{HBASE_REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS}": |
| value = value.replace(token, get_rpc_scheduler_factory_class()) |
| elif token == "{HBASE_RPC_CONTROLLERFACTORY_CLASS}": |
| value = value.replace(token, get_hbase_rpc_controllerfactory_class()) |
| elif token == "{ZOOKEEPER_QUORUM}": |
| value = value.replace(token, get_zookeeper_quorum()) |
| elif token == "{HBASE_COPROCESS_MASTER_CLASSES}": |
| value = value.replace(token, get_hbase_coprocessmaster_classes()) |
| elif token == "{HBASE_COPROCESSOR_REGION_CLASSES}": |
| value = value.replace(token, get_hbase_coprocessor_region_classes()) |
| elif token == "{HIVE_SECURITY_AUTHORIZATION}": |
| value = value.replace(token, get_hive_security_authorization_setting()) |
| elif token == "{TEZ_HISTORY_URL_BASE}": |
| value = value.replace(token, get_tez_history_url_base()) |
| elif token == "{RANGER_JDBC_DRIVER}": |
| value = value.replace(token, get_ranger_service_details()['RANGER_JDBC_DRIVER']) |
| elif token == "{RANGER_JDBC_URL}": |
| value = value.replace(token, get_ranger_service_details()['RANGER_JDBC_URL']) |
| elif token == "{RANGER_AUDIT_JDBC_URL}": |
| value = value.replace(token, get_ranger_service_details()['RANGER_AUDIT_JDBC_URL']) |
| elif token == "{RANGER_HOST}": |
| value = value.replace(token, get_ranger_host()) |
| elif token == "{RANGER_JDBC_DIALECT}": |
| value = value.replace(token, get_ranger_service_details()['RANGER_JDBC_DIALECT']) |
| elif token == "{KAFKA_LISTENERS}": |
| value = value.replace(token, get_kafka_listeners()) |
| elif token == "{RANGER_PLUGIN_HBASE_POLICY_CACHE_DIR}": |
| value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_hbase")) |
| elif token == "{RANGER_PLUGIN_HDFS_POLICY_CACHE_DIR}": |
| value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_hadoop")) |
| elif token == "{RANGER_PLUGIN_HIVE_POLICY_CACHE_DIR}": |
| value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_hive")) |
| elif token == "{RANGER_PLUGIN_KNOX_POLICY_CACHE_DIR}": |
| value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_knox")) |
| elif token == "{RANGER_PLUGIN_STORM_POLICY_CACHE_DIR}": |
| value = value.replace(token, "/etc/ranger/{0}{1}/policycache".format(Options.CLUSTER_NAME, "_storm")) |
| elif token == "{RANGER_HBASE_KEYSTORE_CREDENTIAL_FILE}": |
| value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_hbase")) |
| elif token == "{RANGER_HDFS_KEYSTORE_CREDENTIAL_FILE}": |
| value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_hadoop")) |
| elif token == "{RANGER_HIVE_KEYSTORE_CREDENTIAL_FILE}": |
| value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_hive")) |
| elif token == "{RANGER_KNOX_KEYSTORE_CREDENTIAL_FILE}": |
| value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_knox")) |
| elif token == "{RANGER_STORM_KEYSTORE_CREDENTIAL_FILE}": |
| value = value.replace(token, "jceks://file/etc/ranger/{0}{1}/cred.jceks".format(Options.CLUSTER_NAME, "_storm")) |
| elif token == "{XAAUDIT_HDFS_DESTINATION_DIRECTORY}": |
| value = value.replace(token, get_ranger_xaaudit_hdfs_destination_directory()) |
| elif token == "{HBASE_RANGER_REPO_NAME}": |
| value = value.replace(token, Options.CLUSTER_NAME+"_hbase") |
| elif token == "{HDFS_RANGER_REPO_NAME}": |
| value = value.replace(token, Options.CLUSTER_NAME+"_hadoop") |
| elif token == "{HIVE_RANGER_REPO_NAME}": |
| value = value.replace(token, Options.CLUSTER_NAME+"_hive") |
| elif token == "{KNOX_RANGER_REPO_NAME}": |
| value = value.replace(token, Options.CLUSTER_NAME+"_knox") |
| elif token == "{STORM_RANGER_REPO_NAME}": |
| value = value.replace(token, Options.CLUSTER_NAME+"_storm") |
| elif token == "{POLICYMGR_MGR_URL}": |
| value = value.replace(token, get_ranger_policymgr_external_url()) |
| elif token == "{HDFS_JDBC_DRIVER}": |
| value = value.replace(token, get_jdbc_driver()) |
| elif token == "{HBASE_JDBC_DRIVER}": |
| value = value.replace(token, get_jdbc_driver()) |
| elif token == "{HIVE_JDBC_DRIVER}": |
| value = value.replace(token, get_jdbc_driver()) |
| elif token == "{KNOX_JDBC_DRIVER}": |
| value = value.replace(token, get_jdbc_driver()) |
| elif token == "{STORM_JDBC_DRIVER}": |
| value = value.replace(token, get_jdbc_driver()) |
| elif token == "{HDFS_AUDIT_JDBC_URL}": |
| value = value.replace(token, get_audit_jdbc_url()) |
| elif token == "{HBASE_AUDIT_JDBC_URL}": |
| value = value.replace(token, get_audit_jdbc_url()) |
| elif token == "{HIVE_AUDIT_JDBC_URL}": |
| value = value.replace(token, get_audit_jdbc_url()) |
| elif token == "{KNOX_AUDIT_JDBC_URL}": |
| value = value.replace(token, get_audit_jdbc_url()) |
| elif token == "{STORM_AUDIT_JDBC_URL}": |
| value = value.replace(token, get_audit_jdbc_url()) |
| elif token == "{AUDIT_TO_DB_HDFS}": |
| value = value.replace(token, get_audit_to_db_enabled("ranger-hdfs-plugin-properties")) |
| elif token == "{AUDIT_TO_DB_HBASE}": |
| value = value.replace(token, get_audit_to_db_enabled("ranger-hbase-plugin-properties")) |
| elif token == "{AUDIT_TO_DB_HIVE}": |
| value = value.replace(token, get_audit_to_db_enabled("ranger-hive-plugin-properties")) |
| elif token == "{AUDIT_TO_DB_KNOX}": |
| value = value.replace(token, get_audit_to_db_enabled("ranger-knox-plugin-properties")) |
| elif token == "{AUDIT_TO_DB_STORM}": |
| value = value.replace(token, get_audit_to_db_enabled("ranger-storm-plugin-properties")) |
| elif token == "{AUDIT_TO_HDFS_HDFS}": |
| value = value.replace(token, get_audit_to_hdfs_enabled("ranger-hdfs-plugin-properties")) |
| elif token == "{AUDIT_TO_HDFS_HIVE}": |
| value = value.replace(token, get_audit_to_hdfs_enabled("ranger-hive-plugin-properties")) |
| elif token == "{AUDIT_TO_HDFS_HBASE}": |
| value = value.replace(token, get_audit_to_hdfs_enabled("ranger-hbase-plugin-properties")) |
| elif token == "{AUDIT_TO_HDFS_KNOX}": |
| value = value.replace(token, get_audit_to_hdfs_enabled("ranger-knox-plugin-properties")) |
| elif token == "{AUDIT_TO_HDFS_STORM}": |
| value = value.replace(token, get_audit_to_hdfs_enabled("ranger-storm-plugin-properties")) |
| elif token == "{AUDIT_HDFS_FILESPOOL_DIR_HDFS}": |
| value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-hdfs-plugin-properties", "hdfs")) |
| elif token == "{AUDIT_HDFS_FILESPOOL_DIR_HIVE}": |
| value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-hive-plugin-properties", "hive")) |
| elif token == "{AUDIT_HDFS_FILESPOOL_DIR_HBASE}": |
| value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-hbase-plugin-properties", "hbase")) |
| elif token == "{AUDIT_HDFS_FILESPOOL_DIR_KNOX}": |
| value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-knox-plugin-properties", "knox")) |
| elif token == "{AUDIT_HDFS_FILESPOOL_DIR_STORM}": |
| value = value.replace(token, get_hdfs_batch_filespool_dir("ranger-storm-plugin-properties", "storm")) |
| elif token == "{USERSYNC_SYNC_SOURCE}": |
| value = value.replace(token, get_usersync_sync_source()) |
| elif token == "{AUDIT_TO_DB}": |
| value = value.replace(token, get_audit_check("db")) |
| elif token == "{AUDIT_TO_HDFS}": |
| value = value.replace(token, get_audit_check("hdfs")) |
| elif token == "{RANGER_ROOT_JDBC_URL}": |
| value = value.replace(token, get_ranger_service_details()['RANGER_ROOT_JDBC_URL']) |
| |
| return value |
| |
| |
| def modify_config_item(config_type, catalog, server_config_factory): |
| """ |
| Modify configuration item |
| :type config_type str |
| :type catalog UpgradeCatalog |
| :type server_config_factory ServerConfigFactory |
| """ |
| |
|   # if the config type is absent on the server, create it |
| if config_type not in server_config_factory.items(): |
| server_config_factory.create_config(config_type) |
| |
| server_config_catalog = server_config_factory.get_config(config_type) |
| |
| server_config_catalog.merge(catalog) |
| |
| |
| def modify_configs(): |
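|   """ |
|   Entry point for the update-configs action: load the upgrade catalog, merge |
|   it into the server configuration (optionally a single config type passed |
|   as an argument) and commit the changes to the server. |
|   """ |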
| if len(Options.ARGS) > 1: |
| config_type = Options.ARGS[1] |
| else: |
| config_type = None |
| |
| catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json) # Load upgrade catalog |
| catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, |
| Options.OPTIONS.to_stack) # get desired version of catalog |
| Options.stack_advisor = StackAdvisorFactory().get_instance(catalog.name, catalog.target_version) |
| |
| # load all desired configs from the server |
| # ToDo: implement singleton for that class |
| Options.server_config_factory = ServerConfigFactory() |
| |
| if catalog is None: |
|     raise FatalException(1, "Upgrade catalog for version %s-%s not found, no configs were modified" |
| % (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack)) |
| |
| # add user-defined template processing function |
| catalog.add_handler(CatConst.TEMPLATE_HANDLER, _substitute_handler) |
| |
| if config_type is not None and config_type not in catalog.items: |
|     raise FatalException(1, "Config type %s does not exist, no configs were modified" % config_type) |
| |
| if config_type is not None: |
| modify_config_item(config_type, catalog, Options.server_config_factory) |
| else: |
| for collection_name in catalog.items: |
| modify_config_item(collection_name, catalog, Options.server_config_factory) |
| |
| Options.server_config_factory.process_mapping_transformations(catalog) |
| |
|   # commit changes to the server, if any are found |
| Options.server_config_factory.commit() |
| |
| |
| def backup_configs(conf_type=None): |
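|   """ |
|   Dump the current server configurations (optionally a single config type) |
|   into backups_<timestamp>/<config type>_<tag>.json files. |
|   """ |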
|   backup_dir = "backups_%d" % time.time() |
|   file_pattern = "%s%s%s_%s.json" |
|   configs = get_config_resp_all() |
|   if configs is None: |
|     Options.logger.error("Unexpected response from the server") |
|     return -1 |
| |
|   if conf_type is not None and conf_type in configs: |
|     configs = {conf_type: configs[conf_type]} |
| |
|   if not os.path.exists(backup_dir): |
|     os.mkdir(backup_dir) |
| |
|   for item in configs: |
|     filename = file_pattern % (backup_dir, os.path.sep, item, configs[item]["tag"]) |
| if os.path.exists(filename): |
| os.remove(filename) |
| |
| try: |
| with open(filename, "w") as f: |
| f.write(json.dumps(configs[item][CatConst.STACK_PROPERTIES], indent=4)) |
| Options.logger.info("Catalog \"%s\" stored to %s", item, filename) |
| except IOError as e: |
| Options.logger.error("Unable to store \"%s\": %s", item, e) |
| |
| |
| def install_services(): |
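|   """ |
|   Ask Ambari to move MAPREDUCE2 and YARN into the INSTALLED state; a failure |
|   for one service does not prevent the attempt for the other. |
|   """ |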
| SERVICE_URL_FORMAT = Options.CLUSTER_URL + '/services/{0}' |
| SERVICES = ["MAPREDUCE2", "YARN"] |
| PUT_IN_INSTALLED = [ |
| { |
| "RequestInfo": { |
| "context": "Install MapReduce2" |
| }, |
| "Body": { |
| "ServiceInfo": { |
| "state": "INSTALLED" |
| } |
| } |
| }, |
| { |
| "RequestInfo": { |
| "context": "Install YARN" |
| }, |
| "Body": { |
| "ServiceInfo": { |
| "state": "INSTALLED" |
| } |
| } |
| } |
| ] |
| |
| err_retcode = 0 |
| err_message = "" |
| for index in [0, 1]: |
| try: |
| curl(SERVICE_URL_FORMAT.format(SERVICES[index]), validate=True, request_type="PUT", data=PUT_IN_INSTALLED[index]) |
| except FatalException as e: |
| if not e.code == 0: |
| err_retcode = e.code |
| err_message = err_message + " Error while installing " + SERVICES[index] + ". Details: " + e.message + "." |
| |
| if err_retcode != 0: |
| raise FatalException(err_retcode, |
| err_message + "(Services may already be installed or agents are not yet started.)") |
| |
|   Options.OPTIONS.exit_message = "Requests have been submitted to install YARN and MAPREDUCE2. Use Ambari Web to monitor " \ |
| "the status of the install requests." |
| |
| |
| def generate_auth_header(user, password): |
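|   """ |
|   Build a basic-auth header from the given credentials; e.g. the illustrative |
|   pair ("admin", "admin") yields {"Authorization": "Basic YWRtaW46YWRtaW4="}. |
|   :rtype dict |
|   """ |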
| token = "%s:%s" % (user, password) |
| token = base64.encodestring(token) |
| return {"Authorization": "Basic %s" % token.replace('\n', '')} |
| |
| |
| def curl(url, tokens=None, headers=None, request_type="GET", data=None, parse=False, |
| validate=False, soft_validation=False): |
|   """ |
|   HTTP helper for Ambari REST API calls. A dict passed as data is serialized |
|   to json; with parse=True the response body is parsed from json into a dict. |
|   :rtype str or dict |
|   """ |
| _headers = {} |
| handler_chain = [] |
| post_req = ["POST", "PUT"] |
| get_req = ["GET", "DELETE"] |
| |
| print_url = Options.CURL_PRINT_ONLY is not None |
| write_only_print = Options.CURL_WRITE_ONLY is not None |
| |
| if request_type not in post_req + get_req: |
| raise IOError("Wrong request type \"%s\" passed" % request_type) |
| |
| if data is not None and isinstance(data, dict): |
| data = json.dumps(data) |
| |
| if tokens is not None: |
| _headers.update(generate_auth_header(tokens["user"], tokens["pass"])) |
| elif Options.API_TOKENS is not None: |
| _headers.update(generate_auth_header(Options.API_TOKENS["user"], Options.API_TOKENS["pass"])) |
| |
| if request_type in post_req and data is not None: |
| _headers["Content-Length"] = len(data) |
| |
| if headers is not None: |
| _headers.update(headers) |
| |
| if Options.HEADERS is not None: |
| _headers.update(Options.HEADERS) |
| |
| director = build_opener(*handler_chain) |
| if request_type in post_req: |
| _data = bytes(data) |
| req = Request(url, headers=_headers, data=_data) |
| else: |
| req = Request(url, headers=_headers) |
| |
| req.get_method = lambda: request_type |
| |
| if print_url: |
| if write_only_print: |
| if request_type in post_req: |
| Options.logger.info(url) |
| if data is not None: |
| Options.logger.info("POST Data: \n" + str(data)) |
| else: |
| Options.logger.info(url) |
| if request_type in post_req and data is not None: |
| Options.logger.info("POST Data: \n" + str(data)) |
| |
| code = 200 |
| if not (print_url and request_type in post_req): |
| try: |
| resp = director.open(req) |
| out = resp.read() |
| if isinstance(out, bytes): |
| out = out.decode("utf-8") |
| code = resp.code |
| except URLError as e: |
| Options.logger.error(str(e)) |
| if isinstance(e, HTTPError): |
| raise e |
| else: |
| raise FatalException(-1, str(e)) |
| else: |
| if not print_url: |
| Options.logger.info(url) |
| out = "{}" |
| |
| if validate and not print_url and (code > 299 or code < 200): |
| if soft_validation: |
| Options.logger.warning("Response validation failed, please check previous action result manually.") |
| else: |
| raise FatalException(code, "Response validation failed, please check previous action result manually.") |
| |
| if parse: |
| return json.loads(out) |
| else: |
| return out |
| |
| |
| def configuration_item_diff(collection_name, catalog, actual_properties_list): |
| """ |
| Merge catalog item with actual config item on the server |
| Diff item response: |
| { |
| "property" : name, |
| "catalog_item": value, |
| "catalog_value": value, |
| "actual_value": value |
| } |
| :param collection_name: |
| :param catalog: |
| :param actual_properties_list |
| :return: |
| """ |
| |
| verified_catalog = [] |
| catalog_properties = dict(catalog) |
|   actual_properties = dict(actual_properties_list) if actual_properties_list is not None else None |
| |
| if actual_properties is None: |
| verified_catalog = map(lambda x: { |
| "property": x, |
| "catalog_item": catalog_properties[x], |
| "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG], |
| "actual_value": None |
| }, catalog_properties.keys()) |
| else: |
| # build list of properties according to actual properties |
| verified_catalog = map(lambda x: { |
| "property": x, |
| "catalog_item": catalog_properties[x] if x in catalog_properties else None, |
| "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if x in catalog_properties else None, |
| "actual_value": actual_properties[x] |
| }, actual_properties.keys()) |
| |
| # build list of properties according to catalog properties |
| verified_catalog_catalog = map(lambda x: { |
| "property": x, |
| "catalog_item": catalog_properties[x], |
| "catalog_value": catalog_properties[x][CatConst.PROPERTY_VALUE_TAG] if CatConst.PROPERTY_VALUE_TAG in |
| catalog_properties[x] else None, |
| "actual_value": actual_properties[x] if x in actual_properties else None, |
| }, catalog_properties.keys()) |
| |
|   # append properties which are listed in the catalog but not present in the actual configuration |
| verified_catalog += filter(lambda x: x["property"] not in actual_properties, verified_catalog_catalog) |
| |
| return verified_catalog |
| |
| |
| def configuration_diff_analyze(diff_list): |
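|   """ |
|   Classify every property diff as ok (values match or the property was |
|   properly removed), skipped (template values or server-only properties) or |
|   fail (missing or mismatched value), counted per config type. |
|   :rtype dict |
|   """ |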
| report = {} |
| for item_key in diff_list.keys(): |
| property_diff_list = diff_list[item_key] |
| item_stat = { |
| "skipped": {"count": 0, "items": []}, |
| "ok": {"count": 0, "items": []}, |
| "fail": {"count": 0, "items": []}, |
| "total": {"count": len(property_diff_list), "items": []} |
| } |
| |
| def push_status(status, _property_item): |
| item_stat[status]["count"] += 1 |
| item_stat[status]["items"].append(_property_item) |
| |
| for property_item in property_diff_list: |
| # process properties which can be absent |
| |
|     # item was removed from actual configs according to catalog instructions |
| if property_item["actual_value"] is None \ |
| and CatConst.PROPERTY_REMOVE_TAG in property_item["catalog_item"] \ |
| and property_item["catalog_item"][CatConst.PROPERTY_REMOVE_TAG] == CatConst.TRUE_TAG: |
| |
| push_status("ok", property_item) |
| |
|     # currently skip values with the template tag, as there is no filter implemented |
| # ToDo: implement possibility to filter values without filter handler, |
| # ToDo: currently filtering is possible only on update-configs stage |
| elif property_item["actual_value"] is not None and property_item["catalog_value"] is not None \ |
| and CatConst.VALUE_TEMPLATE_TAG in property_item["catalog_item"] \ |
| and property_item["catalog_item"][CatConst.VALUE_TEMPLATE_TAG] == CatConst.TRUE_TAG: |
| |
| push_status("skipped", property_item) |
| |
| # item not present in actual config, but present in catalog and no remove tag is present |
| elif property_item["actual_value"] is None and property_item["catalog_value"] is not None: |
| push_status("fail", property_item) |
| |
| # property exists in actual configuration, but not described in catalog configuration |
| elif property_item["actual_value"] is not None and property_item["catalog_value"] is None: |
| push_status("skipped", property_item) |
| |
| # actual and catalog properties are equal |
| elif property_item["catalog_value"] == property_item["actual_value"]: |
| push_status("ok", property_item) |
| elif property_item["catalog_value"] != property_item["actual_value"]: |
| push_status("fail", property_item) |
| |
| report[item_key] = item_stat |
| return report |
| |
| |
| def verify_configuration(): |
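|   """ |
|   Compare the server configuration against the upgrade catalog, log a short |
|   summary per config type and write failing properties to the report file. |
|   """ |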
| diff_list = {} |
| |
| if len(Options.ARGS) > 1: |
| config_type = Options.ARGS[1] |
| else: |
| config_type = None |
| |
| catalog_farm = UpgradeCatalogFactory(Options.OPTIONS.upgrade_json) # Load upgrade catalog |
| catalog = catalog_farm.get_catalog(Options.OPTIONS.from_stack, |
| Options.OPTIONS.to_stack) # get desired version of catalog |
| server_configs = ServerConfigFactory() |
| |
| if catalog is None: |
| raise FatalException(1, "Upgrade catalog for version %s-%s not found" |
| % (Options.OPTIONS.from_stack, Options.OPTIONS.to_stack)) |
| |
| if config_type is not None and config_type not in catalog.items.keys() and config_type not in server_configs.items(): |
|     raise FatalException(1, "Config type %s does not exist" % config_type) |
| |
|   # fetch all options from the server at one time and filter only desired versions |
| |
| if config_type is not None: |
| diff_list[config_type] = configuration_item_diff(config_type, catalog.items[config_type], server_configs.get_config(config_type).properties) |
| else: |
| for collection_name in catalog.items.keys(): |
| diff_list[collection_name] = configuration_item_diff(collection_name, catalog.items[collection_name], server_configs.get_config(collection_name).properties) |
| |
| analyzed_list = configuration_diff_analyze(diff_list) |
| |
| report_file = None |
| if Options.REPORT_FILE is not None: |
| try: |
| report_file = open(Options.REPORT_FILE, "w") |
| except IOError as e: |
| Options.logger.error("Report file open error: %s" % e.message) |
| |
| for config_item in analyzed_list: |
| if analyzed_list[config_item]["fail"]["count"] != 0: |
| Options.logger.info( |
|         "%s: %s missing or mismatched configuration(s) - see the report file for details" % ( |
| config_item, analyzed_list[config_item]["fail"]["count"] |
| ) |
| ) |
| if report_file is not None: |
| report_formatter(report_file, config_item, analyzed_list[config_item]) |
| else: |
| Options.logger.info("%s: verified" % config_item) |
| |
| if report_file is not None: |
| try: |
| report_file.close() |
| except IOError as e: |
| Options.logger.error("Report file close error: %s" % e.message) |
| |
| |
| def report_formatter(report_file, config_item, analyzed_list_item): |
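|   """ |
|   Write one line per failing property of the given config type to the |
|   report file. |
|   """ |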
| prefix = "Configuration item %s" % config_item |
| if analyzed_list_item["fail"]["count"] > 0: |
| for item in analyzed_list_item["fail"]["items"]: |
| report_file.write("%s: property \"%s\" is set to \"%s\", but should be set to \"%s\"\n" % ( |
| prefix, item["property"], item["actual_value"], item["catalog_value"] |
| )) |
| |
| |
| def main(): |
| action_list = { # list of supported actions |
| Options.GET_MR_MAPPING_ACTION: get_mr1_mapping, |
| Options.DELETE_MR_ACTION: delete_mr, |
| Options.ADD_YARN_MR2_ACTION: add_services, |
| Options.MODIFY_CONFIG_ACTION: modify_configs, |
| Options.INSTALL_YARN_MR2_ACTION: install_services, |
| Options.BACKUP_CONFIG_ACTION: backup_configs, |
| Options.VERIFY_ACTION: verify_configuration |
| } |
| |
| parser = optparse.OptionParser(usage="usage: %prog [options] action\n Valid actions: " |
| + ", ".join(action_list.keys()) |
|                                  + "\n update-configs accepts an optional config type, e.g. hdfs-site, to update only that config") |
| |
| parser.add_option("-n", "--printonly", |
| action="store_true", dest="printonly", default=False, |
| help="Prints all the curl commands to be executed (no post/update request will be performed)") |
| parser.add_option("-w", "--writeonly", |
| action="store_true", dest="writeonly", default=False, |
|                     help="In combination with --printonly, print only post/update requests") |
| parser.add_option("-o", "--log", dest="logfile", default=None, |
| help="Log file") |
| parser.add_option("--report", dest="report", default=None, |
| help="Report file output location") |
| |
| parser.add_option('--upgradeCatalog', default=None, help="Upgrade Catalog file full path", dest="upgrade_json") |
| parser.add_option('--fromStack', default=None, help="stack version to upgrade from", dest="from_stack") |
| parser.add_option('--toStack', default=None, help="stack version to upgrade to", dest="to_stack") |
| |
| parser.add_option('--hostname', default=None, help="Hostname for Ambari server", dest="hostname") |
| parser.add_option('--port', default='8080', help="Port number for Ambari server", dest="port") |
| parser.add_option('--https', default=False, action="store_true", dest="https", help="Use https protocol for connection to the server") |
| parser.add_option('--user', default=None, help="Ambari admin user", dest="user") |
| parser.add_option('--password', default=None, help="Ambari admin password", dest="password") |
| parser.add_option('--clustername', default=None, help="Cluster name", dest="clustername") |
| |
| (options, args) = parser.parse_args() |
| Options.initialize_logger(options.logfile) |
| options.warnings = [] |
| |
| if len(args) == 0: |
| parser.error("No action entered") |
| |
| if options.user is None: |
| options.warnings.append("User name must be provided (e.g. admin)") |
| if options.hostname is None: |
| options.warnings.append("Ambari server host name must be provided") |
| if options.clustername is None: |
| options.warnings.append("Cluster name must be provided") |
| if options.password is None: |
| options.password = getpass.getpass("Please enter Ambari admin password: ") |
| if options.password == "": |
|       options.warnings.append("Ambari admin user's password must be provided (e.g. admin)") |
| |
| if options.https: |
| Options.API_PROTOCOL = "https" |
| |
| if options.port: |
| Options.API_PORT = str(options.port) |
| |
| action = args[0] |
| |
| # check params according to executed action |
| if action == Options.MODIFY_CONFIG_ACTION or action == Options.VERIFY_ACTION: |
|     if options.upgrade_json is None: |
|       options.warnings.append("Upgrade catalog option must be provided") |
|     if options.from_stack is None: |
|       options.warnings.append("The fromStack option must be provided") |
|     if options.to_stack is None: |
|       options.warnings.append("The toStack option must be provided") |
| |
| if action == Options.VERIFY_ACTION: |
| if options.report is None: |
|       options.warnings.append("The report option must be provided") |
| |
| if len(options.warnings) != 0: |
|     parser.print_help() |
| for warning in options.warnings: |
| Options.logger.warn(warning) |
|     raise FatalException(1, "Not all required options were set") |
| |
| options.exit_message = "Upgrade action '%s' completed successfully." % action |
| if options.printonly: |
| Options.CURL_PRINT_ONLY = "yes" |
|     options.exit_message = "Simulated execution of action '%s'. Verify the list of edit calls." % action |
| if options.writeonly: |
| Options.CURL_WRITE_ONLY = "yes" |
| |
| Options.ARGS = args |
| Options.OPTIONS = options |
| Options.HOST = options.hostname |
| Options.CLUSTER_NAME = options.clustername |
| Options.API_TOKENS = { |
| "user": options.user, |
| "pass": options.password |
| } |
| Options.REPORT_FILE = options.report |
| |
| if action in action_list: |
| Options.initialize() |
| action_list[action]() |
| else: |
| parser.error("Invalid action") |
| |
| if options.exit_message is not None: |
| Options.logger.info(options.exit_message) |
| |
| if __name__ == "__main__": |
| try: |
| main() |
| except (KeyboardInterrupt, EOFError): |
| print("\nAborting ... Keyboard Interrupt.") |
| sys.exit(1) |
| except HTTPError as e: |
| print("\nResponse error, " + str(e)) |
| sys.exit(1) |
| except FatalException as e: |
| if e.reason is not None: |
| error = "Exiting with exit code {0}. Reason: {1}".format(e.code, e.reason) |
| if Options.logger is not None: |
| Options.logger.error(error) |
| sys.exit(e.code) |