blob: 291c50ff613f422b7d3ef961fcae91d7286c9436 [file]
#!/usr/bin/env python3
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import tempfile
__all__ = ["Script"]
import re
import os
import sys
import ssl
import logging
import distro as platform
import inspect
import tarfile
import traceback
import time
from optparse import OptionParser
import resource_management
from ambari_commons import OSCheck, OSConst
from ambari_commons.constants import UPGRADE_TYPE_NON_ROLLING
from ambari_commons.constants import UPGRADE_TYPE_ROLLING
from ambari_commons.constants import UPGRADE_TYPE_HOST_ORDERED
from ambari_commons.network import reconfigure_urllib2_opener
from ambari_commons.inet_utils import resolve_address, ensure_ssl_using_protocol
from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl
from resource_management.libraries.resources import XmlConfig
from resource_management.libraries.resources import PropertiesFile
from resource_management.core import sudo
from resource_management.core.resources import File, Directory
from resource_management.core.source import InlineTemplate
from resource_management.core.environment import Environment
from resource_management.core.logger import Logger
from resource_management.core.exceptions import (
Fail,
ClientComponentHasNoStatus,
ComponentIsNotRunning,
)
from resource_management.core.resources.packaging import Package
from resource_management.libraries.functions import version_select_util
from resource_management.libraries.functions.version import compare_versions
from resource_management.libraries.functions.version import format_stack_version
from resource_management.libraries.functions import stack_tools
from resource_management.libraries.functions.constants import Direction
from resource_management.libraries.script.config_dictionary import (
ConfigDictionary,
UnknownConfiguration,
)
from resource_management.libraries.functions.repository_util import (
CommandRepository,
RepositoryUtil,
)
from resource_management.core.resources.system import Execute
from contextlib import closing
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions.constants import StackFeature
from resource_management.libraries.functions.show_logs import show_logs
from resource_management.libraries.execution_command.execution_command import (
ExecutionCommand,
)
from resource_management.libraries.functions.fcntl_based_process_lock import (
FcntlBasedProcessLock,
)
import ambari_simplejson as json # simplejson is much faster comparing to Python 2.6 json module and has the same functions set.
if OSCheck.is_windows_family():
from resource_management.libraries.functions.install_windows_msi import (
install_windows_msi,
)
from resource_management.libraries.functions.reload_windows_env import (
reload_windows_env,
)
from resource_management.libraries.functions.zip_archive import archive_dir
from resource_management.libraries.resources import Msi
else:
from resource_management.libraries.functions.tar_archive import archive_dir
# Printed (with the script name substituted for {0}) when Script.execute() is
# given fewer than 6 positional arguments.
USAGE = """Usage: {0} <COMMAND> <JSON_CONFIG> <BASEDIR> <STROUTPUT> <LOGGING_LEVEL> <TMP_DIR> [PROTOCOL] [CA_CERT_FILE]
<COMMAND> command type (INSTALL/CONFIGURE/START/STOP/SERVICE_CHECK...)
<JSON_CONFIG> path to command json file. Ex: /var/lib/ambari-agent/data/command-2.json
<BASEDIR> path to service metadata dir. Ex: /var/lib/ambari-agent/cache/common-services/HDFS/2.1.0.2.0/package
<STROUTPUT> path to file with structured command output (file will be created). Ex:/tmp/my.txt
<LOGGING_LEVEL> log level for stdout. Ex:DEBUG,INFO
<TMP_DIR> temporary directory for executable scripts. Ex: /var/lib/ambari-agent/tmp
[PROTOCOL] optional protocol to use during https connections. Ex: see python ssl.PROTOCOL_<PROTO> variables, default PROTOCOL_TLSv1_2
[CA_CERT_FILE] optional path to the file with trusted CA certificates for https connections (only read when [PROTOCOL] is also given)
"""
# Maps the config path of a user name to the config path of that user's
# password; execute() uses it to fill Script.passwords.
_PASSWORD_MAP = {
    "/configurations/cluster-env/hadoop.user.name": "/configurations/cluster-env/hadoop.user.password",
}
# Placeholder in package names that is replaced by the concrete stack version.
STACK_VERSION_PLACEHOLDER = "${stack_version}"
# Line count handed to show_logs() by pre_start() for each *.out file.
COUNT_OF_LAST_LINES_OF_OUT_FILES_LOGGED = 100
# File mask of the service *.out files logged by pre_start().
OUT_FILES_MASK = "*.out"
# Path of the ambari-agent tasks log file.
AGENT_TASKS_LOG_FILE = "/var/log/ambari-agent/agent_tasks.log"
def get_path_from_configuration(name, configuration):
    """
    Walk ``configuration`` along the "/"-separated path ``name`` and return the
    value found there.

    Empty path segments are ignored, so "/a/b", "a/b" and "a//b" are
    equivalent, and an empty path returns ``configuration`` itself.

    :param name: path such as "/configurations/cluster-env/hadoop.user.name"
    :param configuration: nested mapping (e.g. the command ConfigDictionary)
    :return: the value at ``name``, or None when a segment is missing or an
        intermediate value is not a mapping
    """
    from collections.abc import Mapping

    for key in (segment for segment in name.split("/") if segment):
        # Never descend into scalars: for a string value `key in value` is a
        # substring test and `value[key]` would raise TypeError.
        if not isinstance(configuration, Mapping) or key not in configuration:
            return None
        configuration = configuration[key]
    return configuration
def get_config_lock_file():
    """Return the path of the config-linking lock file inside the script tmp dir."""
    lock_file_name = "link_configs_lock_file"
    tmp_dir = Script.get_tmp_dir()
    return os.path.join(tmp_dir, lock_file_name)
class Script(object):
    """
    Executes a command for a custom service.

    Script instances share configuration as class attributes and therefore
    different Script instances can not be used from different threads at the
    same time within a single python process.

    Accepted positional command line arguments (see USAGE):
      1 command type (START/STOP/...)
      2 path to command json file
      3 path to service metadata dir (directory "package" inside service directory)
      4 path to file with structured command output (file will be created)
      5 logging level for stdout
      6 temporary directory for executable scripts
      7 optional https protocol name (an ssl.PROTOCOL_<PROTO> name)
      8 optional path to the file with trusted CA certificates
    """

    instance = None

    # Command-wide state shared by all Script instances; execute() fills the
    # config / execution_command / *_settings attributes from the command JSON.
    config = None
    execution_command = None
    module_configs = None
    cluster_settings = None
    stack_settings = None
    # Cached by get_stack_version_before_packages_installed().
    stack_version_from_distro_select = None
    structuredOut = {}
    command_data_file = ""
    basedir = ""
    stroutfile = ""
    logging_level = ""

    # Class variable (read through get_tmp_dir()); set by execute().
    tmp_dir = ""
    # Default https protocol name, overridable by the 7th positional argument.
    force_https_protocol = (
        "PROTOCOL_TLSv1_2" if hasattr(ssl, "PROTOCOL_TLSv1_2") else "PROTOCOL_TLSv1"
    )
    # Trusted CA certificates file, overridable by the 8th positional argument.
    ca_cert_file_path = None
def load_structured_out(self):
    """
    Load the structured output left in ``self.stroutfile`` by a previous run
    into ``Script.structuredOut`` and drop the keys that must not carry over.

    A missing, empty or unparsable file, or one whose JSON is not an object,
    leaves ``Script.structuredOut`` as an empty dict. Parse failures are logged.
    """
    Script.structuredOut = {}
    if os.path.exists(self.stroutfile) and os.path.getsize(self.stroutfile) > 0:
        with open(self.stroutfile, "r") as fp:
            try:
                loaded = json.load(fp)
            except Exception:
                errMsg = "Unable to read structured output from " + self.stroutfile
                Logger.logger.exception(errMsg)
            else:
                # put_structured_out() calls .update(), so only a JSON object
                # can serve as the structured output.
                if isinstance(loaded, dict):
                    Script.structuredOut = loaded
                else:
                    Logger.logger.warning(
                        "Ignoring structured output in " + self.stroutfile
                        + ": content is not a JSON object"
                    )

    # version is only set in a specific way and should not be carried;
    # security issues and errors found on previous runs are reset.
    for stale_key in ("version", "securityIssuesFound", "securityStateErrorInfo"):
        Script.structuredOut.pop(stale_key, None)
def put_structured_out(self, sout):
    """
    Merge ``sout`` into the shared structured output and persist the whole
    dictionary as JSON to ``self.stroutfile``.

    A write failure is not raised. Instead, an ``errMsg`` entry is added to the
    in-memory structured output.
    """
    shared_out = Script.structuredOut
    shared_out.update(sout)
    try:
        with open(self.stroutfile, "w") as out_file:
            json.dump(shared_out, out_file)
    except IOError:
        shared_out.update({"errMsg": "Unable to write to " + self.stroutfile})
def get_config_dir_during_stack_upgrade(self, env, base_dir, conf_select_name):
    """
    Return the directory that new client configs should be written to during
    a stack upgrade, or None if the target version has no versioned configs.

    Because this is called during a Rolling Upgrade, the new configs have
    already been saved, so configure() must only be called on the directory
    that belongs to the new version.

    :param env: environment; the service ``params`` module is bound to it
    :param base_dir: not used by this implementation
    :param conf_select_name: directory name under ``<stack_root>/<version>``
    :return: the real path of ``<stack_root>/<version>/<conf_select_name>/conf``
        when the version supports CONFIG_VERSIONING, otherwise None
    :raises Fail: if ``params`` lacks stack_name, stack_root or version
    """
    import params

    env.set_params(params)

    for required in ("stack_name", "stack_root", "version"):
        if hasattr(params, required):
            continue
        raise Fail(
            "Failed in function 'stack_upgrade_save_new_config' because params was missing variable %s."
            % required
        )

    Logger.info(
        "stack_upgrade_save_new_config(): Checking if can write new client configs to new config version folder."
    )
    if not check_stack_feature(StackFeature.CONFIG_VERSIONING, params.version):
        return None

    # Even though hdp-select has not yet been called, write new configs to the new config directory.
    versioned_conf_dir = os.path.join(
        params.stack_root, params.version, conf_select_name, "conf"
    )
    return os.path.realpath(versioned_conf_dir)
def save_component_version_to_structured_out(self, command_name):
"""
Saves the version of the component for this command to the structured out file. If the
command is an install command and the repository is trusted, then it will use the version of
the repository. Otherwise, it will consult the stack-select tool to read the symlink version.
Under rare circumstances, a component may have a bug which prevents it from reporting a
version back after being installed. This is most likely due to the stack-select tool not being
invoked by the package's installer. In these rare cases, we try to see if the component
should have reported a version and we try to fallback to the "<stack-select> versions" command.
When a version is found it is written through put_structured_out() as {"version": ...}, plus
{"repository_version_id": ...} when /repositoryFile/repoVersionId is set. When no version is
found, an error is logged unless this script is a hook.
NOTE(review): this copy of the method lost its indentation, so it cannot be confirmed which of
the steps below are nested under the "if stack_select_package_name and stack_name" check;
verify against the original file before restructuring.
:param command_name: command name
:return: None
"""
from resource_management.libraries.functions.default import default
from resource_management.libraries.functions import stack_select
# "trusted" repository == repositoryFile/resolved is set in the command JSON
repository_resolved = default("repositoryFile/resolved", False)
repository_version = default("repositoryFile/repoVersion", None)
is_install_command = command_name is not None and command_name.lower() == "install"
# start out with no version
component_version = None
# install command + trusted repo means use the repo version and don't consult stack-select
# this is needed in cases where an existing symlink is on the system and stack-select can't
# change it on installation (because it's scared to in order to support parallel installs)
if is_install_command and repository_resolved and repository_version is not None:
Logger.info(
f"The repository with version {repository_version} for this command has been marked as resolved."
" It will be used to report the version of the component which was installed"
)
component_version = repository_version
stack_name = Script.get_stack_name()
stack_select_package_name = stack_select.get_package_name()
if stack_select_package_name and stack_name:
# only query for the component version from stack-select if we can't trust the repository yet
if component_version is None:
component_version = version_select_util.get_component_version_from_symlink(
stack_name, stack_select_package_name
)
# last ditch effort - should cover the edge case where the package failed to setup its
# link and we have to try to see if <stack-select> can help
if component_version is None:
# only the third element (the list of installed versions) is used; output/code are ignored
output, code, versions = stack_select.unsafe_get_stack_versions()
# a single installed version is unambiguous, so it is reported (with an error log)
if len(versions) == 1:
component_version = versions[0]
Logger.error(
f"The '{stack_select_package_name}' component did not advertise a version. This may indicate a problem with the component packaging. "
f"However, the stack-select tool was able to report a single version installed ({component_version}). "
"This is the version that will be reported."
)
if component_version:
self.put_structured_out({"version": component_version})
# if repository_version_id is passed, pass it back with the version
# NOTE: re-imports `default`, which is already imported at the top of this method
from resource_management.libraries.functions.default import default
repo_version_id = default("/repositoryFile/repoVersionId", None)
if repo_version_id:
self.put_structured_out({"repository_version_id": repo_version_id})
else:
# hooks are not expected to advertise a component version, so stay quiet for them
if not self.is_hook():
Logger.error(
f"The '{stack_select_package_name}' component did not advertise a version. This may indicate a problem with the component packaging."
)
def should_expose_component_version(self, command_name):
    """
    Decide whether the stack version should be written to structured out for
    this command.

    The version is only exposed when the cluster stack version (from
    /clusterLevelParams/stack_version) supports the ROLLING_UPGRADE stack
    feature. In that case "get_version" always exposes it, and otherwise only
    the base commands start, install and restart do.

    :param command_name: command name (case-insensitive)
    :return: True or False
    """
    from resource_management.libraries.functions.default import default

    raw_stack_version = str(default("/clusterLevelParams/stack_version", ""))
    stack_version_formatted = format_stack_version(raw_stack_version)
    if not stack_version_formatted:
        return False
    if not check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted):
        return False

    exposing_commands = ("get_version", "start", "install", "restart")
    return command_name.lower() in exposing_commands
def execute(self):
    """
    Entry point of a command script. Sets up logging and SSL, loads the
    command JSON and runs the method named after the command, wrapped by the
    optional ``pre_<command>`` / ``post_<command>`` hooks (hooks excluded).

    Positional arguments (see USAGE), after options such as ``-o`` are removed:
      0 command name, 1 command JSON path, 2 service package dir,
      3 structured-output file, 4 logging level, 5 tmp dir,
      6 optional forced https protocol name, 7 optional CA certificates file.

    Exits with status 1 on too few arguments, an unreadable command JSON, or
    ComponentIsNotRunning / ClientComponentHasNoStatus from the command.
    """
    parser = OptionParser()
    parser.add_option(
        "-o",
        "--out-files-logging",
        dest="log_out_files",
        action="store_true",
        help="use this option to enable outputting *.out files of the service pre-start",
    )
    (self.options, args) = parser.parse_args()
    self.log_out_files = self.options.log_out_files

    # Positionals come from the parsed `args`, not raw sys.argv. OptionParser
    # strips options such as -o wherever they appear, whereas sys.argv indices
    # shift when an option precedes or sits among the positional arguments.
    if len(args) < 6:
        print("Script expects at least 6 arguments")
        print(USAGE.format(os.path.basename(sys.argv[0])))  # print to stdout
        sys.exit(1)

    self.command_name = args[0].lower()
    self.command_data_file = args[1]
    self.basedir = args[2]
    self.stroutfile = args[3]
    self.load_structured_out()
    self.logging_level = args[4]
    Script.tmp_dir = args[5]

    # optional script arguments for forcing https protocol and ca_certs file
    if len(args) >= 7:
        Script.force_https_protocol = args[6]
    if len(args) >= 8:
        Script.ca_cert_file_path = args[7]

    Logger.initialize_logger(__name__, logging_level=self.logging_level)

    # on windows we need to reload some of env variables manually because there is no default paths for configs(like
    # /etc/something/conf on linux. When this env vars created by one of the Script execution, they can not be updated
    # in agent, so other Script executions will not be able to access to new env variables
    if OSCheck.is_windows_family():
        reload_windows_env()

    # !!! status commands re-use structured output files; if the status command doesn't update the
    # the file (because it doesn't have to) then we must ensure that the file is reset to prevent
    # old, stale structured output from a prior status command from being used
    if self.command_name == "status":
        Script.structuredOut = {}
        self.put_structured_out({})

    # make sure that script has forced https protocol and ca_certs file passed from agent
    ensure_ssl_using_protocol(
        Script.get_force_https_protocol_name(), Script.get_ca_cert_file_path()
    )

    try:
        with open(self.command_data_file) as command_file:
            Script.config = ConfigDictionary(json.load(command_file))
        Script.execution_command = ExecutionCommand(Script.config)
        Script.module_configs = Script.execution_command.get_module_configs()
        Script.cluster_settings = Script.execution_command.get_cluster_settings()
        Script.stack_settings = Script.execution_command.get_stack_settings()

        # load passwords here(used on windows to impersonate different users)
        Script.passwords = {}
        for user_path, password_path in _PASSWORD_MAP.items():
            user = get_path_from_configuration(user_path, Script.config)
            password = get_path_from_configuration(password_path, Script.config)
            if user and password:
                Script.passwords[user] = password
    except IOError:
        Logger.logger.exception("Can not read json file with command parameters: ")
        sys.exit(1)

    Script.repository_util = RepositoryUtil(Script.config)

    # Run class method depending on a command type
    try:
        method = self.choose_method_to_execute(self.command_name)
        with Environment(self.basedir, tmp_dir=Script.tmp_dir) as env:
            env.config.download_path = Script.tmp_dir
            if not self.is_hook():
                self.execute_prefix_function(self.command_name, "pre", env)
            method(env)
            if not self.is_hook():
                self.execute_prefix_function(self.command_name, "post", env)
    # catch this to avoid unhandled exception logs in /var/log/messages
    except (ComponentIsNotRunning, ClientComponentHasNoStatus):
        traceback.print_exc()
        sys.exit(1)
    except Fail as ex:
        ex.pre_raise()
        raise
    finally:
        # Version reporting is best-effort, but it must not swallow
        # KeyboardInterrupt/SystemExit the way a bare `except:` would.
        try:
            if self.should_expose_component_version(self.command_name):
                self.save_component_version_to_structured_out(self.command_name)
        except Exception:
            Logger.exception("Reporting component version failed")
def get_version(self, env):
    """
    Handler for the "get_version" command. The base implementation does
    nothing itself. For that command, execute() reports the version
    afterwards through save_component_version_to_structured_out() when
    should_expose_component_version() allows it.
    """
def execute_prefix_function(self, command_name, afix, env):
    """
    Run the optional hook ``<afix>_<command_name>`` if this script defines it.

    Example: command_name="start", afix="pre" calls ``self.pre_start(env)``
    when that method exists; otherwise only a debug message is logged.

    :param command_name: command being executed, e.g. "start"
    :param afix: hook kind; execute() passes "pre" or "post"
    :param env: environment passed through to the hook
    """
    hook_name = f"{afix}_{command_name}"
    if hook_name in dir(self):
        Logger.logger.debug(f"Execute action afix: {hook_name}")
        hook = getattr(self, hook_name)
        hook(env)
    else:
        Logger.logger.debug(f"Action afix '{hook_name}' not present")
def is_hook(self):
    """
    Return True when this script's class lists Hook among its direct base
    classes (the rest of the MRO is not inspected).
    """
    from resource_management.libraries.script.hook import Hook

    direct_bases = self.__class__.__bases__
    return Hook in direct_bases
def get_log_folder(self):
    """
    Return the component's log folder. The base implementation returns "",
    which pre_start() treats as "not defined".
    """
    no_folder = ""
    return no_folder
def get_user(self):
    """
    Return the user owning the component's log files. The base implementation
    returns "", which pre_start() treats as "not defined".
    """
    no_user = ""
    return no_user
def get_pid_files(self):
    """
    Return the pid files checked by post_start(). The base implementation
    returns a new empty list, which post_start() treats as "not defined".
    """
    pid_files = list()
    return pid_files
def pre_start(self, env=None):
    """
    Executed before any start method. When *.out logging was requested (the
    -o command line option), passes the component's log folder and user to
    show_logs() so the relevant *.out files are posted to the command log.

    Logs a warning and does nothing if the log folder or user is undefined.
    """
    if not self.log_out_files:
        return

    log_folder = self.get_log_folder()
    user = self.get_user()
    if log_folder == "":
        Logger.logger.warning("Log folder for current script is not defined")
        return
    if user == "":
        Logger.logger.warning("User for current script is not defined")
        return

    show_logs(
        log_folder,
        user,
        lines_count=COUNT_OF_LAST_LINES_OF_OUT_FILES_LOGGED,
        mask=OUT_FILES_MASK,
    )
def post_start(self, env=None):
    """
    Run by execute() after the start command. Checks that every pid file from
    get_pid_files() exists and logs the pid(s) they contain.

    An empty (or None) pid file collection only logs a warning.

    :raises Fail: if a pid file does not exist after the component started
    """
    pid_files = self.get_pid_files()
    # `not` also covers None and empty tuples returned by overriding scripts.
    if not pid_files:
        Logger.logger.warning("Pid files for current script are not defined")
        return

    pids = []
    for pid_file in pid_files:
        if not sudo.path_exists(pid_file):
            raise Fail(
                f"Pid file {pid_file} doesn't exist after starting of the component."
            )
        pid = sudo.read_file(pid_file).strip()
        # read_file's content may be bytes; decode so the pids can be joined.
        if isinstance(pid, bytes):
            pid = pid.decode("utf-8")
        pids.append(pid)

    Logger.info(f"Component has started with pid(s): {', '.join(pids)}")
def post_stop(self, env):
    """
    Executed after completion of every stop method. Waits until the component
    is actually stopped by calling its status() method every 0.1s until it
    raises ComponentIsNotRunning or ClientComponentHasNoStatus.

    Scripts without a status() method have nothing to poll, so this returns
    immediately for them.
    """
    if "status" not in dir(self):
        # Nothing to wait on; getattr below would raise AttributeError.
        return

    status_method = getattr(self, "status")
    counter = 0
    while True:
        try:
            if counter % 100 == 0:
                Logger.logger.info("Waiting for actual component stop")
            status_method(env)
        except ComponentIsNotRunning:
            Logger.logger.debug("'status' reports ComponentIsNotRunning")
            return
        except ClientComponentHasNoStatus:
            Logger.logger.debug("Client component has no status")
            return
        time.sleep(0.1)
        counter += 1
def choose_method_to_execute(self, command_name):
    """
    Return the bound method of this script named ``command_name``.

    :param command_name: command name (execute() passes it lower-cased)
    :return: the callable to invoke with the environment
    :raises Fail: if the script has no attribute with that name
    """
    if command_name in dir(self):
        return getattr(self, command_name)
    raise Fail(f"Script '{sys.argv[0]}' has no method '{command_name}'")
def get_stack_version_before_packages_installed(self):
    """
    Return the stack version, including the build number (e.g. 2.3.4.0-1234),
    to use before packages are installed.

    The value is cached in Script.stack_version_from_distro_select. Set that
    attribute to None to force a recalculation, which is slow and best avoided.

    :return: stack version including the build number
    """
    from resource_management.libraries.functions import stack_select
    from ambari_commons.repo_manager import ManagerFactory

    # Preferred source: the actual selected version of the current component.
    package_name = stack_select.get_package_name()
    if package_name and not Script.stack_version_from_distro_select:
        Script.stack_version_from_distro_select = (
            stack_select.get_stack_version_before_install(package_name)
        )

    # Nothing selected yet (e.g. first install), or a wildcard version: fall
    # back to the installed version of the <stack-selector-tool> package.
    current = Script.stack_version_from_distro_select
    if not current or "*" in current:
        # FIXME: this method is not reliable to get stack-selector-version
        # as if there are multiple versions installed with different <stack-selector-tool>, we won't detect the older one (if needed).
        selector_package = None
        pkg_provider = ManagerFactory.get()
        selector_package = stack_tools.get_stack_tool_package(
            stack_tools.STACK_SELECTOR_NAME
        )
        Script.stack_version_from_distro_select = (
            pkg_provider.get_installed_package_version(selector_package)
        )

    return Script.stack_version_from_distro_select
def get_package_from_available(self, name, available_packages_in_repos=None):
    """
    Resolve a package name containing the ``${stack_version}`` placeholder to a
    concrete package available in the Ambari-managed repositories.

    Names without the placeholder are returned unchanged.

    :param name: package name, possibly containing ``${stack_version}``
    :param available_packages_in_repos: candidate package names; when not
        supplied (or empty) they are obtained from load_available_packages()
    :return: the first candidate whose name matches
    :raises Fail: if no candidate matches
    """
    if STACK_VERSION_PLACEHOLDER not in name:
        return name

    if not available_packages_in_repos:
        available_packages_in_repos = self.load_available_packages()

    from resource_management.libraries.functions.default import default

    package_delimiter = "-" if OSCheck.is_ubuntu_family() else "_"
    # The placeholder becomes a run of digits/delimiters; raw string so `\d`
    # reaches the regex engine instead of being an invalid string escape.
    package_regex = re.compile(
        name.replace(STACK_VERSION_PLACEHOLDER, rf"(\d|{package_delimiter})+") + "$"
    )

    # Only used to give a more helpful error message below.
    repo = default("/repositoryFile", None)
    name_with_version = None
    if repo:
        command_repo = CommandRepository(repo)
        version_str = command_repo.version_string.replace(
            ".", package_delimiter
        ).replace("-", package_delimiter)
        name_with_version = name.replace(STACK_VERSION_PLACEHOLDER, version_str)

    for package in available_packages_in_repos:
        if package_regex.match(package):
            return package

    if name_with_version:
        raise Fail(f"No package found for {name}(expected name: {name_with_version})")
    # Report the list that was actually searched. It may have been passed in by
    # the caller, in which case self.available_packages_in_repos may be unset.
    raise Fail(
        f"Cannot match package for regexp name {name}. Available packages: {available_packages_in_repos}"
    )
def format_package_name(self, name):
    """
    Replace the ``${stack_version}`` placeholder in a package name with the
    actual version. A package version passed from the server is used as the
    absolute truth.

    Version sources, in order: repositoryFile/repoVersion, then the legacy
    roleParams/package_version and hostLevelParams/package_version. Without a
    build number (no "-"), the name is resolved against the available packages
    when a repositoryFile is present. For INSTALL, commandParams/version
    replaces a missing or wildcard version. As a last resort, the version is
    taken from get_stack_version_before_packages_installed().

    :param name: package name, possibly containing ``${stack_version}``
    :return: the package name with the placeholder substituted; ``name``
        unchanged when it contains no placeholder
    :raises Fail: if no version at all can be determined
    """
    from resource_management.libraries.functions.default import default

    if STACK_VERSION_PLACEHOLDER not in name:
        return name

    package_delimiter = "-" if OSCheck.is_ubuntu_family() else "_"

    def _to_package_form(version):
        # W.X.Y.Z-nnnn -> W_X_Y_Z_nnnn (W-X-Y-Z-nnnn on Ubuntu)
        return version.replace(".", package_delimiter).replace("-", package_delimiter)

    # repositoryFile is the truth
    package_version = default("repositoryFile/repoVersion", None)
    # TODO remove legacy checks
    if package_version is None:
        package_version = default("roleParams/package_version", None)
    # TODO remove legacy checks
    if package_version is None:
        package_version = default("hostLevelParams/package_version", None)

    if (package_version is None or "-" not in package_version) and default(
        "/repositoryFile", None
    ):
        return self.get_package_from_available(name)

    if package_version is not None:
        package_version = _to_package_form(package_version)

    # The cluster effective version comes down when the version is known after the initial
    # install. In that case we should not be guessing which version when invoking INSTALL, but
    # use the supplied version to build the package_version
    effective_version = default("commandParams/version", None)
    role_command = default("roleCommand", None)
    if (
        (package_version is None or "*" in package_version)
        and effective_version is not None
        and role_command == "INSTALL"
    ):
        package_version = _to_package_form(effective_version)
        Logger.info(
            f"Version {effective_version} was provided as effective cluster version. Using package version {package_version}"
        )

    stack_version_package_formatted = ""
    if package_version:
        stack_version_package_formatted = package_version
        if OSCheck.is_ubuntu_family():
            stack_version_package_formatted = package_version.replace(
                "_", package_delimiter
            )

    # Wildcards cause a lot of troubles with installing packages, if the version contains wildcards we try to specify it.
    if not package_version or "*" in package_version:
        repo_version = self.get_stack_version_before_packages_installed()
        if repo_version is None:
            raise Fail(
                f"Cannot determine the stack version to substitute into package name {name}"
            )
        stack_version_package_formatted = _to_package_form(repo_version)

    return name.replace(STACK_VERSION_PLACEHOLDER, stack_version_package_formatted)
@staticmethod
def get_config():
    """
    Return the command configuration loaded by execute().

    HACK: kept in a class attribute as a workaround for the circular
    dependency that would arise from importing params.py and passing it a
    configuration instance.
    """
    loaded_config = Script.config
    return loaded_config
@staticmethod
def get_execution_command():
    """
    Return the ExecutionCommand wrapper around command.json built by execute().

    :return: the shared ExecutionCommand, or None before execute() ran
    """
    command = Script.execution_command
    return command
@staticmethod
def get_module_configs():
    """
    Return the configurations block of command.json (service configurations),
    lazily fetching it from the execution command on first use.

    Like get_cluster_settings()/get_stack_settings(), this returns the cached
    value (possibly None) instead of raising when no execution command has
    been loaded yet.

    :return: module_configs object, or None before execute() ran
    """
    if not Script.module_configs and Script.execution_command:
        Script.module_configs = Script.execution_command.get_module_configs()
    return Script.module_configs
@staticmethod
def get_cluster_settings():
    """
    Return the cluster_settings block of command.json (cluster
    configurations), fetching it from the execution command when not cached.

    :return: cluster_settings object, or None if unavailable
    """
    needs_fetch = not Script.cluster_settings and Script.execution_command
    if needs_fetch:
        Script.cluster_settings = Script.execution_command.get_cluster_settings()
    return Script.cluster_settings
@staticmethod
def get_stack_settings():
    """
    Return the stack_settings block of command.json (stack configurations),
    fetching it from the execution command when not cached.

    :return: stack_settings object, or None if unavailable
    """
    needs_fetch = not Script.stack_settings and Script.execution_command
    if needs_fetch:
        Script.stack_settings = Script.execution_command.get_stack_settings()
    return Script.stack_settings
@staticmethod
def get_password(user):
    """
    Return the password execute() loaded for ``user`` from the command JSON
    (per _PASSWORD_MAP). Raises KeyError for a user with no stored password.
    """
    known_passwords = Script.passwords
    return known_passwords[user]
@staticmethod
def get_tmp_dir():
    """
    Return the temporary directory passed on the command line.

    HACK: stored in a class attribute to avoid a circular dependency when
    params.py is imported.
    """
    tmp_directory = Script.tmp_dir
    return tmp_directory
@staticmethod
def get_force_https_protocol_name():
    """
    Return the forced https protocol name, e.g. "PROTOCOL_TLSv1_2".

    :return: protocol name (PROTOCOL_TLSv1_2 when ssl provides it and no
        override was passed on the command line)
    """
    protocol_name = Script.force_https_protocol
    return protocol_name
@staticmethod
def get_force_https_protocol_value():
    """
    Return the ``ssl`` module constant that corresponds to the forced https
    protocol name. Raises AttributeError if ``ssl`` has no such constant.

    :return: protocol value
    """
    protocol_name = Script.get_force_https_protocol_name()
    return getattr(ssl, protocol_name)
@staticmethod
def get_ca_cert_file_path():
    """
    Return the path of the trusted CA certificates file passed on the command
    line, or None if none was given.

    :return: trusted certificates file path
    """
    cert_path = Script.ca_cert_file_path
    return cert_path
@staticmethod
def get_component_from_role(role_directory_map, default_role):
    """
    Return the <stack-root>/current/<component> component name for the
    command's /role (e.g. DATANODE or HBASE_MASTER), falling back to the
    entry for ``default_role`` when the role is not in the map.

    :return: the component name, such as hbase-master
    """
    from resource_management.libraries.functions.default import default

    command_role = default("/role", default_role)
    chosen_role = command_role if command_role in role_directory_map else default_role
    return role_directory_map[chosen_role]
@staticmethod
def get_stack_name():
    """
    Return the stack name from /clusterLevelParams/stack_name, falling back to
    /configurations/cluster-env/stack_name and finally to "HDP".

    :return: a stack name
    """
    from resource_management.libraries.functions.default import default

    cluster_level_name = default("/clusterLevelParams/stack_name", None)
    if cluster_level_name is not None:
        return cluster_level_name
    return default("/configurations/cluster-env/stack_name", "HDP")
@staticmethod
def get_stack_root():
    """
    Return the stack-specific install root directory.

    cluster-env/stack_root is a JSON object keyed by stack name. When it is
    absent, or has no entry for the current stack (which is logged as a
    warning), "/usr/<lower-cased stack name>" is returned.

    :return: stack_root
    """
    from resource_management.libraries.functions.default import default

    stack_name = Script.get_stack_name()
    stack_root_json = default("/configurations/cluster-env/stack_root", None)
    if stack_root_json is not None:
        roots_by_stack = json.loads(stack_root_json)
        if stack_name in roots_by_stack:
            return roots_by_stack[stack_name]
        Logger.warning(f"Cannot determine stack root for stack named {stack_name}")
    return f"/usr/{stack_name.lower()}"
@staticmethod
def get_stack_version():
    """
    Return clusterLevelParams/stack_version normalized by
    format_stack_version(), or None when it is absent or empty.

    :return: a normalized stack version or None
    """
    config = Script.get_config()
    if "clusterLevelParams" not in config:
        return None
    cluster_level_params = config["clusterLevelParams"]
    if "stack_version" not in cluster_level_params:
        return None

    raw_version = str(cluster_level_params["stack_version"])
    if raw_version == "":
        return None
    return format_stack_version(raw_version)
@staticmethod
def in_stack_upgrade():
    """
    Return True when /commandParams/upgrade_direction is Direction.UPGRADE or
    Direction.DOWNGRADE, and False otherwise (including when it is unset).
    """
    from resource_management.libraries.functions.default import default

    upgrade_direction = default("/commandParams/upgrade_direction", None)
    if upgrade_direction is None:
        return False
    return upgrade_direction in (Direction.UPGRADE, Direction.DOWNGRADE)
@staticmethod
def is_stack_greater(stack_version_formatted, compare_to_version):
"""
Gets whether the provided stack_version_formatted (normalized)
is greater than the specified stack version
:param stack_version_formatted: the version of stack to compare
:param compare_to_version: the version of stack to compare to
:return: True if the command's stack is greater than the specified version
"""
if stack_version_formatted is None or stack_version_formatted == "":
return False
return compare_versions(stack_version_formatted, compare_to_version) > 0
@staticmethod
def is_stack_greater_or_equal(compare_to_version):
    """
    Return whether the command's normalized clusterLevelParams/stack_version
    (see get_stack_version()) is greater than or equal to
    ``compare_to_version``.

    :param compare_to_version: the version to compare to
    :return: True if the command's stack is greater than or equal the specified version
    """
    current_version = Script.get_stack_version()
    return Script.is_stack_greater_or_equal_to(current_version, compare_to_version)
@staticmethod
def is_stack_greater_or_equal_to(stack_version_formatted, compare_to_version):
"""
Gets whether the provided stack_version_formatted (normalized)
is greater than or equal to the specified stack version
:param stack_version_formatted: the version of stack to compare
:param compare_to_version: the version of stack to compare to
:return: True if the command's stack is greater than or equal to the specified version
"""
if stack_version_formatted is None or stack_version_formatted == "":
return False
return compare_versions(stack_version_formatted, compare_to_version) >= 0
@staticmethod
def is_stack_less_than(compare_to_version):
    """
    Gets whether the clusterLevelParams/stack_version (as returned by
    get_stack_version()), after being normalized, is less than the specified
    stack version.

    :param compare_to_version: the version to compare to
    :return: True if the command's stack is less than the specified version;
        False when no stack version is available
    """
    stack_version_formatted = Script.get_stack_version()
    # Same guard as is_stack_greater()/is_stack_greater_or_equal_to(): an
    # empty normalized version is treated as unknown rather than compared.
    if stack_version_formatted is None or stack_version_formatted == "":
        return False
    return compare_versions(stack_version_formatted, compare_to_version) < 0
def install(self, env):
    """
    Default install command: install all packages from the list received
    from the server by delegating to install_packages().

    Override install() for custom behavior; calling install_packages()
    from the override usually still makes sense.
    """
    self.install_packages(env)
def load_available_packages(self):
    """
    Return the package names available in the repositories that apply to this
    command's service. The package manager is only queried when
    ``self.available_packages_in_repos`` is empty.

    Repositories come from the command's repositoryFile. A repository with an
    empty ``applicable_services`` list applies to every service. If the query
    raises, the error is logged and an empty list is stored, so a later call
    queries again.
    """
    from ambari_commons.repo_manager import ManagerFactory

    if self.available_packages_in_repos:
        return self.available_packages_in_repos

    config = self.get_config()
    service_name = config["serviceName"] if "serviceName" in config else None
    repos = CommandRepository(config["repositoryFile"])
    Logger.info(
        "Command repositories: " + ", ".join([repo.repo_id for repo in repos.items])
    )

    def _applies_to_service(repo):
        return not repo.applicable_services or service_name in repo.applicable_services

    repos.items = [repo for repo in repos.items if _applies_to_service(repo)]
    Logger.info(
        "Applicable repositories: "
        + ", ".join([repo.repo_id for repo in repos.items])
    )

    pkg_provider = ManagerFactory.get()
    try:
        found_packages = pkg_provider.get_available_packages_in_repos(repos)
    except Exception:
        Logger.exception("Unable to load available packages")
        found_packages = []
    self.available_packages_in_repos = found_packages
    return self.available_packages_in_repos
def install_packages(self, env):
    """
    Install the packages the server listed for this command.

    The package list arrives as a JSON string in commandParams/package_list.
    An entry is installed only if check_package_condition() accepts it. Its
    name is passed through format_package_name() before installation. Retry
    behaviour comes from ambariLevelParams/agent_stack_retry_on_unavailability
    and ambariLevelParams/agent_stack_retry_count.

    Nothing is installed when ambariLevelParams/host_sys_prepped is True.

    On Windows, only packages whose formatted name contains "ambari-metrics"
    are installed through Package. Afterwards the HDP MSI bundle is installed
    for the cluster-env hadoop user, and the Windows environment is reloaded.

    A KeyError raised while reading the command parameters or processing the
    package list is caught and its traceback printed. Any packages not yet
    processed at that point are skipped.
    """
    config = self.get_config()
    ambari_params = config["ambariLevelParams"]
    # Do not install anything on a sys-prepped host.
    if "host_sys_prepped" in ambari_params and ambari_params["host_sys_prepped"] is True:
        Logger.info("Node has all packages pre-installed. Skipping.")
        return

    is_windows = OSCheck.is_windows_family()
    try:
        package_list_str = config["commandParams"]["package_list"]
        retry_on_unavailability = bool(
            ambari_params["agent_stack_retry_on_unavailability"]
        )
        retry_count = int(ambari_params["agent_stack_retry_count"])
        if isinstance(package_list_str, str) and package_list_str:
            for package in json.loads(package_list_str):
                if not self.check_package_condition(package):
                    continue
                name = self.format_package_name(package["name"])
                # HACK: On Windows, only install ambari-metrics packages using Choco Package Installer
                # TODO: Update this once choco packages for hadoop are created. This is because, service metainfo.xml support
                # <osFamily>any<osFamily> which would cause installation failure on Windows.
                if is_windows:
                    if "ambari-metrics" in name:
                        Package(name)
                else:
                    Package(
                        name,
                        retry_on_repo_unavailability=retry_on_unavailability,
                        retry_count=retry_count,
                    )
    except KeyError:
        # Missing parameters are tolerated. NOTE(review): a KeyError from a
        # malformed package entry (e.g. one without "condition") also lands
        # here and silently skips the remaining packages.
        traceback.print_exc()

    if is_windows:
        # TODO hacky install of windows msi, remove it or move to old(2.1) stack definition when component based install will be implemented
        hadoop_user = config["configurations"]["cluster-env"]["hadoop.user.name"]
        install_windows_msi(
            config["ambariLevelParams"]["jdk_location"],
            config["agentLevelParams"]["agentCacheDir"],
            ["hdp-2.3.0.0.winpkg.msi", "hdp-2.3.0.0.cab", "hdp-2.3.0.0-01.cab"],
            hadoop_user,
            self.get_password(hadoop_user),
            str(config["clusterLevelParams"]["stack_version"]),
        )
        reload_windows_env()
def check_package_condition(self, package):
    """
    Decide whether a package entry from the server should be installed.

    An entry with an empty/falsy "condition" is always installed. Otherwise
    the decision is delegated to should_install_package().
    """
    if package["condition"]:
        return self.should_install_package(package)
    return True
def should_install_package(self, package):
    """
    Evaluate the named package condition for a package entry.

    package["condition"] names a function in
    resource_management.libraries.functions.package_conditions. That function
    is called with no arguments, and its result is returned.

    :param package: package entry with a "condition" key (and a "name" key,
                    which is used only in the error message)
    :raises Fail: if package_conditions has no attribute with that name
    """
    from resource_management.libraries.functions import package_conditions

    condition = package["condition"]
    try:
        chooser_method = getattr(package_conditions, condition)
    except AttributeError:
        name = package["name"]
        # The AttributeError adds nothing beyond this message, so drop it from the chain.
        raise Fail(
            f"Condition with name '{condition}' not found when installing package {name}. Please check package_conditions.py."
        ) from None
    return chooser_method()
@staticmethod
def matches_any_regexp(string, regexp_list):
    """
    Tell whether *string* matches any of the simple package patterns.

    Patterns are not Python regexes. Only "*" and ".*" (any run of
    characters) and "?" (exactly one character) are wildcards. Every other
    character matches literally, and the pattern is anchored at both ends.

    :return: True on the first matching pattern, otherwise False
    """

    def _pattern_to_regex(pattern):
        # Escape everything, then re-enable only the supported wildcards.
        # ".*" must be restored before the lone "*" so it is not doubled.
        escaped = re.escape(pattern)
        escaped = escaped.replace("\\.\\*", ".*")
        escaped = escaped.replace("\\?", ".")
        escaped = escaped.replace("\\*", ".*")
        return "^" + escaped + "$"

    return any(re.match(_pattern_to_regex(p), string) for p in regexp_list)
@staticmethod
def fail_with_error(message):
    """
    Report *message* as an error and terminate with exit status 1.

    The text "Error: <message>" is printed to stdout (with a newline) and
    written to stderr (without one).
    """
    error_text = "Error: " + message
    print(error_text)
    sys.stderr.write(error_text)
    sys.exit(1)
def start(self, env, upgrade_type=None):
    """
    Start the component. Subclasses must override this method.

    The base implementation reports the missing implementation through
    fail_with_error().
    """
    not_implemented = "start method isn't implemented"
    self.fail_with_error(not_implemented)
def stop(self, env, upgrade_type=None):
    """
    Stop the component. Subclasses must override this method.

    The base implementation reports the missing implementation through
    fail_with_error().
    """
    not_implemented = "stop method isn't implemented"
    self.fail_with_error(not_implemented)
# TODO, remove after all services have switched to pre_upgrade_restart
def pre_rolling_restart(self, env):
    """
    Legacy pre-restart hook; a no-op unless a subclass overrides it.

    restart() falls back to this hook during a stack upgrade when the
    instance has no pre_upgrade_restart attribute.
    """
def disable_security(self, env):
    """
    Hook for custom work needed when security is disabled (dekerberization).

    This is a no-op by default. Subclasses override it when they need a
    custom action, e.g. removing ZooKeeper ACLs.
    """
def restart(self, env):
    """
    Default implementation of the restart command. Subclasses may override it.

    Client components (roleParams/component_category equal to "CLIENT",
    compared case-insensitively after stripping whitespace) are re-installed
    via install() instead of being stopped and started. All other components
    are stopped, checked with status() (which must report them as not
    running), and started again through pre_start()/start()/post_start().

    During a stack upgrade (commandParams/upgrade_type maps to a known type
    via Script.get_upgrade_type()):
    - the pre-upgrade hook runs before the re-install or start;
    - post_upgrade_restart() runs at the end;
    - the upgrade type and direction are written to Script.structuredOut
      before any work is done, so a failure still reports them.

    :param env: environment passed through to every lifecycle hook
    :raises Fail: if status() does not raise after stop(), i.e. the process
                  is still running
    """
    config = self.get_config()

    component_category = None
    upgrade_type_command_param = ""
    direction = None
    is_rolling_restart = None
    # Read every optional parameter under the same None guard
    # (previously roleParams was read before the guard).
    if config is not None:
        try:
            component_category = config["roleParams"]["component_category"]
        except KeyError:
            pass
        command_params = config["commandParams"] if "commandParams" in config else None
        if command_params is not None:
            upgrade_type_command_param = (
                command_params["upgrade_type"] if "upgrade_type" in command_params else ""
            )
            direction = (
                command_params["upgrade_direction"]
                if "upgrade_direction" in command_params
                else None
            )
            is_rolling_restart = (
                command_params["rolling_restart"]
                if "rolling_restart" in command_params
                else None
            )

    upgrade_type = Script.get_upgrade_type(upgrade_type_command_param)
    is_stack_upgrade = upgrade_type is not None

    # need this before actually executing so that failures still report upgrade info
    if is_stack_upgrade:
        upgrade_info = {"upgrade_type": upgrade_type_command_param}
        if direction is not None:
            upgrade_info["direction"] = direction.upper()
        Script.structuredOut.update(upgrade_info)

    if component_category and component_category.strip().lower() == "client":
        if is_stack_upgrade:
            self._restart_run_pre_upgrade(env, upgrade_type)
        self.install(env)
    else:
        self._restart_call_lifecycle(self.stop, env, upgrade_type, is_stack_upgrade)
        if is_stack_upgrade:
            self._restart_run_pre_upgrade(env, upgrade_type)

        service_name = (
            config["serviceName"]
            if config is not None and "serviceName" in config
            else None
        )
        try:
            # TODO Once the logic for pid is available from Ranger and Ranger KMS code, will remove the below if block.
            if service_name in ("RANGER", "RANGER_KMS"):
                Logger.info(
                    f"Temporarily skipping status check for {service_name} service only."
                )
            elif is_stack_upgrade:
                Logger.info(
                    f"Skipping status check for {service_name} service during upgrade"
                )
            else:
                # After a successful stop, status() is expected to raise one of
                # the exceptions handled below; returning normally means the
                # process is still up.
                self.status(env)
                raise Fail("Stop command finished but process keep running.")
        except (ComponentIsNotRunning, ClientComponentHasNoStatus):
            pass  # expected

        self.pre_start(env)
        self._restart_call_lifecycle(self.start, env, upgrade_type, is_stack_upgrade)
        self.post_start(env)

        if is_rolling_restart:
            self.post_rolling_restart(env)

    if is_stack_upgrade:
        self.post_upgrade_restart(env, upgrade_type=upgrade_type)

    if self.should_expose_component_version("restart"):
        self.save_component_version_to_structured_out("restart")

def _restart_run_pre_upgrade(self, env, upgrade_type):
    """Run the pre-upgrade hook used by restart() during a stack upgrade."""
    # Remain backward compatible with the rest of the services that haven't switched to using
    # the pre_upgrade_restart method. Once done, remove the else-block.
    if "pre_upgrade_restart" in dir(self):
        self.pre_upgrade_restart(env, upgrade_type=upgrade_type)
    else:
        self.pre_rolling_restart(env)

def _restart_call_lifecycle(self, method, env, upgrade_type, is_stack_upgrade):
    """Call stop()/start() (passed as *method*) with the upgrade argument it accepts."""
    # To remain backward compatible with older stacks, only pass upgrade_type if available.
    # TODO, remove checking the signature for "upgrade_type" once all of the services support that optional param.
    if "upgrade_type" in inspect.signature(method).parameters:
        method(env, upgrade_type=upgrade_type)
    elif is_stack_upgrade:
        method(env, rolling_restart=(upgrade_type == UPGRADE_TYPE_ROLLING))
    else:
        method(env)
def post_upgrade_restart(self, env, upgrade_type=None):
    """
    Hook run by restart() after a stack-upgrade restart.

    This is a no-op by default; subclasses override it.
    """
# TODO, remove after all services have switched to post_upgrade_restart
def post_rolling_restart(self, env):
    """
    Legacy post-restart hook, run by restart() when commandParams/rolling_restart is set.

    By default it delegates to post_upgrade_restart() with
    UPGRADE_TYPE_ROLLING, because the actions are usually the same. Override
    this method when they differ.
    """
    rolling_type = UPGRADE_TYPE_ROLLING
    self.post_upgrade_restart(env, rolling_type)
def configure(self, env, upgrade_type=None, config_dir=None):
    """
    Write the component's configuration. Subclasses must override this method.

    Overrides may invoke save_configs(). The base implementation reports the
    missing implementation through fail_with_error().

    :param upgrade_type: only set during a rolling/express upgrade, otherwise None
    :param config_dir: for some clients during a rolling upgrade, the
                       directory to save configs to; otherwise None
    """
    not_implemented = "configure method isn't implemented"
    self.fail_with_error(not_implemented)
def save_configs(self, env):
    """
    Create or update the component's configuration files.

    Subclasses must override this method. The base implementation reports
    the missing implementation through fail_with_error().
    """
    not_implemented = "save_configs method isn't implemented"
    self.fail_with_error(not_implemented)
def reconfigure(self, env):
    """
    Default RECONFIGURE handler; subclasses may override it.

    Rewrites the configuration files via save_configs(). If
    commandParams/reconfigureAction is present and not None, the method it
    names is resolved with choose_method_to_execute() and called with *env*.
    """
    Logger.info("Refresh config files ...")
    self.save_configs(env)

    command_params = self.get_config()["commandParams"]
    if "reconfigureAction" not in command_params:
        return
    action_name = command_params["reconfigureAction"]
    if action_name is None:
        return

    Logger.info(f"Call {action_name}")
    handler = self.choose_method_to_execute(action_name)
    handler(env)
def generate_configs_get_template_file_content(self, filename, dicts):
    """
    Concatenate the "content" templates of several config types.

    :param filename: target file name (not used by the default implementation)
    :param dicts: comma-separated config type names; surrounding whitespace
                  is ignored and names absent from config["configurations"]
                  are skipped
    :return: the concatenated content, "" when nothing matched
    """
    configurations = self.get_config()["configurations"]
    content = ""
    for raw_name in dicts.split(","):
        section = raw_name.strip()
        if section not in configurations:
            continue
        try:
            content += configurations[section]["content"]
        except Fail:
            # 'content' section not available in the component client configuration
            pass
    return content
def generate_configs_get_xml_file_content(self, filename, dict):
    """
    Build the keyword arguments XmlConfig needs for one config type.

    :param filename: target file name (not used by the default implementation)
    :param dict: config type name (the parameter name is kept for callers)
    :return: {"configurations": ..., "configuration_attributes": ...}
    """
    config_type = dict
    config = self.get_config()
    values = config["configurations"][config_type]
    attributes = config["configurationAttributes"][config_type]
    return {"configurations": values, "configuration_attributes": attributes}
def generate_configs_get_xml_file_dict(self, filename, dict):
    """
    Return the raw properties of one config type (used for properties files).

    :param filename: target file name (not used by the default implementation)
    :param dict: config type name (the parameter name is kept for callers)
    """
    configurations = self.get_config()["configurations"]
    return configurations[dict]
def generate_configs(self, env):
    """
    Render the requested config files and archive them in the tmp dir.

    commandParams supplies three lists of {filename: config type} mappings:
    - xml_configs_list: written with XmlConfig;
    - env_configs_list: written with File from the InlineTemplate of the
      concatenated "content" of the comma-separated config types;
    - properties_configs_list: written with PropertiesFile.

    Every file gets mode 0644 and goes into a fresh temporary directory
    (chmod 0700) under get_tmp_dir(). That directory is packed into a
    gzip-compressed tarball at <tmp dir>/<commandParams/output_file>
    (chmod 0600). The temporary directory is deleted afterwards, even when
    rendering or archiving fails.
    """
    import params

    env.set_params(params)
    config = self.get_config()
    command_params = config["commandParams"]
    xml_configs_list = command_params["xml_configs_list"]
    env_configs_list = command_params["env_configs_list"]
    properties_configs_list = command_params["properties_configs_list"]

    tmp_dir = self.get_tmp_dir()
    Directory(tmp_dir, create_parents=True)
    conf_tmp_dir = tempfile.mkdtemp(dir=tmp_dir)
    os.chmod(conf_tmp_dir, 0o700)
    output_filename = os.path.join(tmp_dir, command_params["output_file"])

    try:
        for file_dict in xml_configs_list:
            for filename, config_type in file_dict.items():
                XmlConfig(
                    filename,
                    conf_dir=conf_tmp_dir,
                    mode=0o644,
                    **self.generate_configs_get_xml_file_content(filename, config_type),
                )
        for file_dict in env_configs_list:
            for filename, config_types in file_dict.items():
                File(
                    os.path.join(conf_tmp_dir, filename),
                    mode=0o644,
                    content=InlineTemplate(
                        self.generate_configs_get_template_file_content(
                            filename, config_types
                        )
                    ),
                )
        for file_dict in properties_configs_list:
            for filename, config_type in file_dict.items():
                PropertiesFile(
                    os.path.join(conf_tmp_dir, filename),
                    mode=0o644,
                    properties=self.generate_configs_get_xml_file_dict(
                        filename, config_type
                    ),
                )
        # TarFile's own context manager closes the archive exactly once. If
        # tar.add() fails it does not append an end-of-archive trailer to the
        # partial file. The previous closing() + explicit close() did both.
        with tarfile.open(output_filename, "w:gz") as tar:
            # Restrict the archive to its owner as soon as it exists.
            os.chmod(output_filename, 0o600)
            # Store the directory's contents at the archive root.
            tar.add(conf_tmp_dir, arcname=".")
    finally:
        Directory(conf_tmp_dir, action="delete")
@staticmethod
def get_instance():
    """
    Return the process-wide Script singleton, creating it on first use.

    On creation, the urllib2 opener is reconfigured to ignore system proxies
    if agentLevelParams/agentConfigParams/agent/use_system_proxy_settings is
    falsy (it defaults to True).
    """
    if Script.instance is not None:
        return Script.instance

    from resource_management.libraries.functions.default import default

    proxy_setting_path = "/agentLevelParams/agentConfigParams/agent/use_system_proxy_settings"
    if not default(proxy_setting_path, True):
        reconfigure_urllib2_opener(ignore_system_proxy=True)
    Script.instance = Script()
    return Script.instance
@staticmethod
def get_upgrade_type(upgrade_type_command_param):
    """
    Map a commandParams/upgrade_type value to an upgrade-type constant.

    Matching is case-insensitive:
    - "rolling_upgrade" -> UPGRADE_TYPE_ROLLING
    - "nonrolling_upgrade" -> UPGRADE_TYPE_NON_ROLLING
    - "host_ordered_upgrade" -> UPGRADE_TYPE_HOST_ORDERED

    Any other string yields None.
    """
    known_upgrade_types = {
        "rolling_upgrade": UPGRADE_TYPE_ROLLING,
        "nonrolling_upgrade": UPGRADE_TYPE_NON_ROLLING,
        "host_ordered_upgrade": UPGRADE_TYPE_HOST_ORDERED,
    }
    return known_upgrade_types.get(upgrade_type_command_param.lower())
def __init__(self):
self.available_packages_in_repos = []
if Script.instance is not None:
raise Fail("An instantiation already exists! Use, get_instance() method.")