#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Ambari Agent
"""
import os
import signal
import re
import os.path
import ambari_simplejson as json  # simplejson is much faster than the Python 2.6 json module and provides the same set of functions.
from resource_management import *
import resource_management
from resource_management.libraries.functions.list_ambari_managed_repos import list_ambari_managed_repos
from ambari_commons.os_check import OSCheck, OSConst
from ambari_commons.str_utils import cbool, cint
from resource_management.libraries.functions.packages_analyzer import allInstalledPackages, verifyDependencies
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import stack_tools
from resource_management.libraries.functions.stack_select import get_stack_versions
from resource_management.libraries.functions.version import format_stack_version
from resource_management.libraries.functions.repo_version_history \
import read_actual_version_from_history_file, write_actual_version_to_history_file, REPO_VERSION_HISTORY_FILE
from resource_management.libraries.script.script import Script
from resource_management.core.resources.system import Execute
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.functions import StackFeature
from resource_management.core.logger import Logger
class InstallPackages(Script):
"""
  This script is part of the Rolling Upgrade workflow and is described in the
  corresponding design doc.
  It installs repositories on the node and then installs packages.
  For now, repositories are installed into individual files.
"""
UBUNTU_REPO_COMPONENTS_POSTFIX = ["main"]
def actionexecute(self, env):
num_errors = 0
# Parse parameters
config = Script.get_config()
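    # Pick the repository file template that matches the host OS family:
    # one template is shared by the RHEL and SUSE families, another one is used for Ubuntu.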
repo_rhel_suse = config['configurations']['cluster-env']['repo_suse_rhel_template']
repo_ubuntu = config['configurations']['cluster-env']['repo_ubuntu_template']
template = repo_rhel_suse if OSCheck.is_redhat_family() or OSCheck.is_suse_family() else repo_ubuntu
# Handle a SIGTERM and SIGINT gracefully
signal.signal(signal.SIGTERM, self.abort_handler)
signal.signal(signal.SIGINT, self.abort_handler)
self.repository_version_id = None
# Select dict that contains parameters
try:
self.repository_version = config['roleParams']['repository_version']
base_urls = json.loads(config['roleParams']['base_urls'])
package_list = json.loads(config['roleParams']['package_list'])
stack_id = config['roleParams']['stack_id']
if 'repository_version_id' in config['roleParams']:
self.repository_version_id = config['roleParams']['repository_version_id']
except KeyError:
      # Fall back to commandParams when roleParams does not carry the values
self.repository_version = config['commandParams']['repository_version']
base_urls = json.loads(config['commandParams']['base_urls'])
package_list = json.loads(config['commandParams']['package_list'])
stack_id = config['commandParams']['stack_id']
if 'repository_version_id' in config['commandParams']:
self.repository_version_id = config['commandParams']['repository_version_id']
# current stack information
self.current_stack_version_formatted = None
if 'stack_version' in config['hostLevelParams']:
current_stack_version_unformatted = str(config['hostLevelParams']['stack_version'])
self.current_stack_version_formatted = format_stack_version(current_stack_version_unformatted)
self.stack_name = Script.get_stack_name()
if self.stack_name is None:
raise Fail("Cannot determine the stack name")
self.stack_root_folder = Script.get_stack_root()
if self.stack_root_folder is None:
raise Fail("Cannot determine the stack's root directory")
if self.repository_version is None:
raise Fail("Cannot determine the repository version to install")
self.repository_version = self.repository_version.strip()
# Install/update repositories
installed_repositories = []
self.current_repositories = []
self.current_repo_files = set()
# Enable base system repositories
    # This is not needed for the RHEL family, because all repos are left enabled
    # except the disabled HDP* ones
if OSCheck.is_suse_family():
self.current_repositories.append('base')
elif OSCheck.is_ubuntu_family():
self.current_repo_files.add('base')
Logger.info("Will install packages for repository version {0}".format(self.repository_version))
if 0 == len(base_urls):
Logger.info("Repository list is empty. Ambari may not be managing the repositories for {0}.".format(self.repository_version))
try:
append_to_file = False
for url_info in base_urls:
repo_name, repo_file = self.install_repository(url_info, append_to_file, template)
self.current_repositories.append(repo_name)
self.current_repo_files.add(repo_file)
append_to_file = True
installed_repositories = list_ambari_managed_repos(self.stack_name)
    except Exception as err:
Logger.logger.exception("Cannot distribute repositories. Error: {0}".format(str(err)))
num_errors += 1
# Build structured output with initial values
self.structured_output = {
'ambari_repositories': installed_repositories,
'installed_repository_version': self.repository_version,
'stack_id': stack_id,
'package_installation_result': 'FAIL'
}
if self.repository_version_id is not None:
self.structured_output['repository_version_id'] = self.repository_version_id
self.put_structured_out(self.structured_output)
if num_errors > 0:
raise Fail("Failed to distribute repositories/install packages")
# Initial list of versions, used to compute the new version installed
self.old_versions = get_stack_versions(self.stack_root_folder)
try:
is_package_install_successful = False
ret_code = self.install_packages(package_list)
if ret_code == 0:
self.structured_output['package_installation_result'] = 'SUCCESS'
self.put_structured_out(self.structured_output)
is_package_install_successful = True
else:
num_errors += 1
    except Exception as err:
num_errors += 1
Logger.logger.exception("Could not install packages. Error: {0}".format(str(err)))
# Provide correct exit code
if num_errors > 0:
raise Fail("Failed to distribute repositories/install packages")
    # if the version being installed requires configuration symlinks, create them
if is_package_install_successful and 'actual_version' in self.structured_output:
self._create_config_links_if_necessary(stack_id, self.structured_output['actual_version'])
def _create_config_links_if_necessary(self, stack_id, stack_version):
"""
    Sets up the required structure for /etc/<component>/conf symlinks and <stack-root>/current
    configuration symlinks, but only if the current stack is older than HDP 2.3 and the new stack
    is HDP 2.3 or later.
    stack_id: stack id, e.g. HDP-2.3
    stack_version: version to set, e.g. 2.3.0.0-1234
"""
if stack_id is None:
Logger.info("Cannot create config links when stack_id is not defined")
return
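    # A stack id such as HDP-2.3 splits into the stack name and the target stack version.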
args = stack_id.upper().split('-')
if len(args) != 2:
Logger.info("Unrecognized stack id {0}, cannot create config links".format(stack_id))
return
target_stack_version = args[1]
if not (target_stack_version and check_stack_feature(StackFeature.CONFIG_VERSIONING, target_stack_version)):
Logger.info("Configuration symlinks are not needed for {0}".format(stack_version))
return
for package_name, directories in conf_select.get_package_dirs().iteritems():
# if already on HDP 2.3, then we should skip making conf.backup folders
if self.current_stack_version_formatted and check_stack_feature(StackFeature.CONFIG_VERSIONING, self.current_stack_version_formatted):
conf_selector_name = stack_tools.get_stack_tool_name(stack_tools.CONF_SELECTOR_NAME)
Logger.info("The current cluster stack of {0} does not require backing up configurations; "
"only {1} versioned config directories will be created.".format(stack_version, conf_selector_name))
# only link configs for all known packages
conf_select.select(self.stack_name, package_name, stack_version, ignore_errors = True)
else:
# link configs and create conf.backup folders for all known packages
# this will also call conf-select select
conf_select.convert_conf_directories_to_symlinks(package_name, stack_version, directories,
skip_existing_links = False, link_to = "backup")
def compute_actual_version(self):
"""
After packages are installed, determine what the new actual version is.
"""
    # If the repo version contains a build number, optimistically assume it is the actual_version.
    # It will be corrected later if it turns out not to be.
self.actual_version = None
self.repo_version_with_build_number = None
if self.repository_version:
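      # A repository version that already carries a build number looks like 2.3.0.0-1234.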
      m = re.search(r"[\d\.]+-\d+", self.repository_version)
if m:
# Contains a build number
self.repo_version_with_build_number = self.repository_version
self.structured_output['actual_version'] = self.repo_version_with_build_number # This is the best value known so far.
self.put_structured_out(self.structured_output)
Logger.info("Attempting to determine actual version with build number.")
Logger.info("Old versions: {0}".format(self.old_versions))
new_versions = get_stack_versions(self.stack_root_folder)
Logger.info("New versions: {0}".format(new_versions))
deltas = set(new_versions) - set(self.old_versions)
Logger.info("Deltas: {0}".format(deltas))
# Get version without build number
normalized_repo_version = self.repository_version.split('-')[0]
if 1 == len(deltas):
self.actual_version = next(iter(deltas)).strip()
self.structured_output['actual_version'] = self.actual_version
self.put_structured_out(self.structured_output)
write_actual_version_to_history_file(normalized_repo_version, self.actual_version)
Logger.info(
"Found actual version {0} by checking the delta between versions before and after installing packages".format(
self.actual_version))
else:
      # If the first install attempt does a partial install and is unable to report this to the server,
      # then a subsequent attempt will report an empty delta. For this reason, search for the best-fit
      # version for the repo version.
      Logger.info("Cannot determine the actual version installed by checking the delta between versions "
                  "before and after installing packages")
      Logger.info("Will try to find the actual version by searching for the best possible match in the list of installed versions")
self.actual_version = self.find_best_fit_version(new_versions, self.repository_version)
if self.actual_version is not None:
self.actual_version = self.actual_version.strip()
self.structured_output['actual_version'] = self.actual_version
self.put_structured_out(self.structured_output)
Logger.info("Found actual version {0} by searching for best possible match".format(self.actual_version))
else:
msg = "Could not determine actual version installed. Try reinstalling packages again."
raise Fail(msg)
def check_partial_install(self):
"""
If an installation did not complete successfully, check if installation was partially complete and
log the partially completed version to REPO_VERSION_HISTORY_FILE.
:return:
"""
Logger.info("Installation of packages failed. Checking if installation was partially complete")
Logger.info("Old versions: {0}".format(self.old_versions))
new_versions = get_stack_versions(self.stack_root_folder)
Logger.info("New versions: {0}".format(new_versions))
deltas = set(new_versions) - set(self.old_versions)
Logger.info("Deltas: {0}".format(deltas))
# Get version without build number
normalized_repo_version = self.repository_version.split('-')[0]
if 1 == len(deltas):
# Some packages were installed successfully. Log this version to REPO_VERSION_HISTORY_FILE
partial_install_version = next(iter(deltas)).strip()
write_actual_version_to_history_file(normalized_repo_version, partial_install_version)
Logger.info("Version {0} was partially installed. ".format(partial_install_version))
def find_best_fit_version(self, versions, repo_version):
"""
    Given a list of installed versions and a repo version, search for the version that best fits the repo version.
    If the repo version is found in the list of installed versions, return the repo version itself.
    If the repo version is not found in the list of installed versions,
    normalize the repo version and use REPO_VERSION_HISTORY_FILE to search the list.
:param versions: List of versions installed
:param repo_version: Repo version to search
:return: Matching version, None if no match was found.
"""
if versions is None or repo_version is None:
return None
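    # Check whether the repo version already carries a build number (e.g. 2.3.0.0-1234).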
    build_num_match = re.search(r"[\d\.]+-\d+", repo_version)
if build_num_match and repo_version in versions:
# If repo version has build number and is found in the list of versions, return it as the matching version
Logger.info("Best Fit Version: Resolved from repo version with valid build number: {0}".format(repo_version))
return repo_version
# Get version without build number
normalized_repo_version = repo_version.split('-')[0]
# Find all versions that match the normalized repo version
match_versions = filter(lambda x: x.startswith(normalized_repo_version), versions)
if match_versions:
if len(match_versions) == 1:
# Resolved without conflicts
Logger.info("Best Fit Version: Resolved from normalized repo version without conflicts: {0}".format(match_versions[0]))
return match_versions[0]
# Resolve conflicts using REPO_VERSION_HISTORY_FILE
history_version = read_actual_version_from_history_file(normalized_repo_version)
# Validate history version retrieved is valid
if history_version in match_versions:
Logger.info("Best Fit Version: Resolved from normalized repo version using {0}: {1}".format(REPO_VERSION_HISTORY_FILE, history_version))
return history_version
# No matching version
return None
def install_packages(self, package_list):
"""
Actually install the packages using the package manager.
:param package_list: List of package names to install
:return: Returns 0 if no errors were found, and 1 otherwise.
"""
ret_code = 0
config = self.get_config()
agent_stack_retry_on_unavailability = cbool(config['hostLevelParams']['agent_stack_retry_on_unavailability'])
agent_stack_retry_count = cint(config['hostLevelParams']['agent_stack_retry_count'])
# Install packages
packages_were_checked = False
stack_selector_package = stack_tools.get_stack_tool_package(stack_tools.STACK_SELECTOR_NAME)
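    # Upgrade the stack selector package first (for example, hdp-select on HDP stacks),
    # before the rest of the package list is processed.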
try:
Package(stack_selector_package,
action="upgrade",
retry_on_repo_unavailability=agent_stack_retry_on_unavailability,
retry_count=agent_stack_retry_count
)
packages_installed_before = []
allInstalledPackages(packages_installed_before)
packages_installed_before = [package[0] for package in packages_installed_before]
packages_were_checked = True
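      # Drop packages whose metainfo condition is not satisfied for this cluster (see filter_package_list).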
filtered_package_list = self.filter_package_list(package_list)
for package in filtered_package_list:
name = self.format_package_name(package['name'])
Package(name,
action="upgrade", # this enables upgrading non-versioned packages, despite the fact they exist. Needed by 'mahout' which is non-version but have to be updated
retry_on_repo_unavailability=agent_stack_retry_on_unavailability,
retry_count=agent_stack_retry_count
)
except Exception as err:
ret_code = 1
Logger.logger.exception("Package Manager failed to install packages. Error: {0}".format(str(err)))
      # On failure, roll back by removing the packages that this run installed
if packages_were_checked and packages_installed_before:
packages_installed_after = []
allInstalledPackages(packages_installed_after)
packages_installed_after = [package[0] for package in packages_installed_after]
packages_installed_before = set(packages_installed_before)
new_packages_installed = [package for package in packages_installed_after if package not in packages_installed_before]
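        # Installed package names embed the repository version with OS-specific separators:
        # dots become dashes on Ubuntu, while dots and dashes become underscores elsewhere.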
if OSCheck.is_ubuntu_family():
package_version_string = self.repository_version.replace('.', '-')
else:
package_version_string = self.repository_version.replace('-', '_')
package_version_string = package_version_string.replace('.', '_')
for package in new_packages_installed:
if package_version_string and (package_version_string in package):
Package(package, action="remove")
if not verifyDependencies():
ret_code = 1
Logger.logger.error("Failure while verifying dependencies")
Logger.logger.error("*******************************************************************************")
Logger.logger.error("Manually verify and fix package dependencies and then re-run install_packages")
Logger.logger.error("*******************************************************************************")
# Compute the actual version in order to save it in structured out
try:
if ret_code == 0:
self.compute_actual_version()
else:
self.check_partial_install()
except Fail as err:
ret_code = 1
Logger.logger.exception("Failure while computing actual version. Error: {0}".format(str(err)))
return ret_code
def install_repository(self, url_info, append_to_file, template):
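    """
    Create (or append to) the repository file for the given repository info and return
    a (repository name, repo file name) tuple.
    """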
repo = {
'repoName': "{0}-{1}".format(url_info['name'], self.repository_version)
}
    repo['baseurl'] = url_info.get('baseUrl', None)
    repo['mirrorsList'] = url_info.get('mirrorsList', None)
ubuntu_components = [url_info['name']] + self.UBUNTU_REPO_COMPONENTS_POSTFIX
file_name = self.stack_name + "-" + self.repository_version
Repository(repo['repoName'],
action = "create",
base_url = repo['baseurl'],
mirror_list = repo['mirrorsList'],
repo_file_name = file_name,
repo_template = template,
append_to_file = append_to_file,
components = ubuntu_components, # ubuntu specific
)
return repo['repoName'], file_name
def abort_handler(self, signum, frame):
Logger.error("Caught signal {0}, will handle it gracefully. Compute the actual version if possible before exiting.".format(signum))
self.check_partial_install()
def filter_package_list(self, package_list):
"""
    Note: metainfo.xml provides a skipUpgrade option to filter packages, as well as
    a condition option to filter them conditionally, so use this method only if,
    for some reason, the metainfo options cannot be used.
:param package_list: original list
:return: filtered package_list
"""
filtered_package_list = []
for package in package_list:
if Script.check_package_condition(package):
filtered_package_list.append(package)
return filtered_package_list
if __name__ == "__main__":
InstallPackages().execute()