# blob: 14524cc28ddab5499937f21e16026f5675a659fc [file] [log] [blame]
#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import re
import signal

import ambari_simplejson as json
from ambari_commons.os_check import OSCheck
# NOTE(review): the module path on this line was lost in extraction ("from import RepoCallContext");
# ambari_commons.shell is the presumed home of RepoCallContext - confirm against the agent tree.
from ambari_commons.shell import RepoCallContext
from ambari_commons.str_utils import cbool, cint
from ambari_commons.repo_manager import ManagerFactory
from resource_management.core.exceptions import Fail
from resource_management.core.logger import Logger
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import stack_tools
from resource_management.libraries.functions.stack_select import get_stack_versions
from resource_management.libraries.functions.repo_version_history \
    import read_actual_version_from_history_file, write_actual_version_to_history_file, REPO_VERSION_HISTORY_FILE
from resource_management.core.resources.system import Link
from resource_management.libraries.functions import StackFeature
from resource_management.libraries.functions.repository_util import CommandRepository
from resource_management.libraries.functions.stack_features import check_stack_feature
from resource_management.libraries.script.script import Script
from resource_management.core import sudo
from resource_management.libraries.functions import lzo_utils
class InstallPackages(Script):
    """
    This script is a part of Rolling Upgrade workflow and is described at
    appropriate design doc.
    It installs repositories to the node and then installs packages.
    For now, repositories are installed into individual files.
    """

    def __init__(self):
        """Create the package-manager facade and an empty repository-file registry."""
        super(InstallPackages, self).__init__()
        self.repo_mgr = ManagerFactory.get()
        # Filled by actionexecute() from create_repo_files(); install_packages()
        # looks repository ids up in this mapping to scope the stack-selector upgrade.
        self.repo_files = {}

    def actionexecute(self, env):
        """
        Entry point of the action: write repository files, install the packages
        and, when applicable, re-link the configuration directories.

        :param env: execution environment handed in by the caller (not used here)
        :raises Fail: when 'repositoryFile' is missing, when the stack name, stack
                      root or repository version cannot be determined, or when any
                      repository/package step reported an error
        """
        num_errors = 0

        # Parse parameters
        config = Script.get_config()

        try:
            command_repository = CommandRepository(config['repositoryFile'])
        except KeyError:
            raise Fail("The command repository indicated by 'repositoryFile' was not found")

        # Handle a SIGTERM and SIGINT gracefully
        signal.signal(signal.SIGTERM, self.abort_handler)
        signal.signal(signal.SIGINT, self.abort_handler)

        self.repository_version = command_repository.version_string

        # Select dict that contains parameters. Both names are pre-bound so that a
        # missing key does not surface later as a NameError.
        package_list = None
        stack_id = None
        try:
            package_list = json.loads(config['roleParams']['package_list'])
            stack_id = config['roleParams']['stack_id']
        except KeyError:
            pass

        self.stack_name = Script.get_stack_name()
        if self.stack_name is None:
            raise Fail("Cannot determine the stack name")

        self.stack_root_folder = Script.get_stack_root()
        if self.stack_root_folder is None:
            raise Fail("Cannot determine the stack's root directory")

        if self.repository_version is None:
            raise Fail("Cannot determine the repository version to install")

        self.repository_version = self.repository_version.strip()

        try:
            if not command_repository.items:
                Logger.warning(
                    "Repository list is empty. Ambari may not be managing the repositories for {0}.".format(
                        self.repository_version))
            else:
                Logger.info(
                    "Will install packages for repository version {0}".format(self.repository_version))
                new_repo_files = Script.repository_util.create_repo_files()
                # Remember the generated files; install_packages() reads self.repo_files.
                self.repo_files.update(new_repo_files)
        except Exception as err:
            Logger.logger.exception("Cannot install repository files. Error: {0}".format(str(err)))
            num_errors += 1

        # Build structured output with initial values
        self.structured_output = {
            'package_installation_result': 'FAIL',
            'repository_version_id': command_repository.version_id
        }
        self.put_structured_out(self.structured_output)

        try:
            # check package manager non-completed transactions
            if self.repo_mgr.check_uncompleted_transactions():
                num_errors += 1
        except Exception as e:  # we need to ignore any exception
            Logger.warning("Failed to check for uncompleted package manager transactions: " + str(e))

        if num_errors > 0:
            raise Fail("Failed to distribute repositories/install packages")

        # Initial list of versions, used to compute the new version installed
        self.old_versions = get_stack_versions(self.stack_root_folder)

        # Bound before the try block so the final check can never hit an unbound name.
        is_package_install_successful = False
        try:
            if package_list is None:
                raise Fail("The list of packages to install ('package_list') was not found in roleParams")

            ret_code = self.install_packages(package_list)
            if ret_code == 0:
                self.structured_output['package_installation_result'] = 'SUCCESS'
                self.put_structured_out(self.structured_output)
                is_package_install_successful = True
            else:
                num_errors += 1
        except Exception as err:
            num_errors += 1
            Logger.logger.exception("Could not install packages. Error: {0}".format(str(err)))

        try:
            lzo_utils.install_lzo_if_needed()
        except Exception as err:
            num_errors += 1
            Logger.logger.exception("Could not install LZO packages. Error: {0}".format(str(err)))

        # Provide correct exit code
        if num_errors > 0:
            raise Fail("Failed to distribute repositories/install packages")

        # if installing a version of HDP that needs some symlink love, then create them
        if is_package_install_successful and 'actual_version' in self.structured_output:
            self._relink_configurations_with_conf_select(stack_id, self.structured_output['actual_version'])

    def _fix_default_links_for_current(self):
        """
        If a prior version of Ambari did not correctly reverse the conf symlinks, then they would
        be put into a bad state when distributing a new stack. For example:

        /etc/component/conf (directory)
        <stack-root>/v1/component/conf -> /etc/component/conf

        When distributing v2, we'd detect the /etc/component/conf problems and would try to adjust it:
        /etc/component/conf -> <stack-root>/current/component/conf
        <stack-root>/v2/component/conf -> /etc/component/v2/0

        The problem is that v1 never gets changed (since the stack being distributed is v2), and
        we end up with a circular link:
        /etc/component/conf -> <stack-root>/current/component/conf
        <stack-root>/v1/component/conf -> /etc/component/conf

        :return: None
        """
        Logger.info("Attempting to fix any configuration symlinks which are not in the correct state")
        from resource_management.libraries.functions import stack_select

        restricted_packages = conf_select.get_restricted_packages()

        if 0 == len(restricted_packages):
            Logger.info("There are no restricted conf-select packages for this installation")
        else:
            Logger.info("Restricting conf-select packages to {0}".format(restricted_packages))

        for package_name, directories in conf_select.get_package_dirs().items():
            Logger.info("Attempting to fix the default conf links for {0}".format(package_name))
            Logger.info("The following directories will be fixed for {0}: {1}".format(package_name, str(directories)))

            # The last directory entry that names a component wins.
            component_name = None
            for directory_struct in directories:
                if "component" in directory_struct:
                    component_name = directory_struct["component"]

            if component_name:
                stack_version = stack_select.get_stack_version_before_install(component_name)
            else:
                Logger.warning("Unable to fix {0} since stack using outdated stack_packages.json".format(package_name))
                return

            if 0 == len(restricted_packages) or package_name in restricted_packages:
                if stack_version:
                    conf_select.convert_conf_directories_to_symlinks(package_name, stack_version, directories)
                else:
                    Logger.warning(
                        "Unable to fix {0} since there is no known installed version for this component".format(package_name))

    def _relink_configurations_with_conf_select(self, stack_id, stack_version):
        """
        Sets up the required structure for /etc/<component>/conf symlinks and <stack-root>/current
        configuration symlinks IFF the current stack is < HDP 2.3+ and the new stack is >= HDP 2.3

        stack_id:      stack id, ie HDP-2.3
        stack_version: version to set
        """
        if stack_id is None:
            Logger.info("Cannot create config links when stack_id is not defined")
            return

        args = stack_id.upper().split('-')
        if len(args) != 2:
            Logger.info("Unrecognized stack id {0}, cannot create config links".format(stack_id))
            return

        target_stack_version = args[1]
        if not (target_stack_version and check_stack_feature(StackFeature.CONFIG_VERSIONING, target_stack_version)):
            Logger.info("Configuration symlinks are not needed for {0}".format(stack_version))
            return

        # After upgrading hdf-select package from HDF-2.X to HDF-3.Y, we need to create this symlink
        if self.stack_name.upper() == "HDF" \
                and not sudo.path_exists("/usr/bin/conf-select") and sudo.path_exists("/usr/bin/hdfconf-select"):
            Link("/usr/bin/conf-select", to="/usr/bin/hdfconf-select")

        restricted_packages = conf_select.get_restricted_packages()

        if 0 == len(restricted_packages):
            Logger.info("There are no restricted conf-select packages for this installation")
        else:
            Logger.info("Restricting conf-select packages to {0}".format(restricted_packages))

        for package_name, directories in conf_select.get_package_dirs().items():
            if 0 == len(restricted_packages) or package_name in restricted_packages:
                conf_select.convert_conf_directories_to_symlinks(package_name, stack_version, directories)

    def compute_actual_version(self):
        """
        After packages are installed, determine what the new actual version is.

        The result is stored in self.actual_version and published under the
        'actual_version' key of the structured output.

        :raises Fail: when neither the before/after delta nor the best-fit search
                      yields a version
        """
        # If the repo contains a build number, optimistically assume it to be the actual_version. It will get changed
        # to correct value if it is not
        self.actual_version = None
        self.repo_version_with_build_number = None
        if self.repository_version:
            m = re.search(r"[\d\.]+-\d+", self.repository_version)
            if m:
                # Contains a build number
                self.repo_version_with_build_number = self.repository_version
                self.structured_output['actual_version'] = self.repo_version_with_build_number  # This is the best value known so far.
                self.put_structured_out(self.structured_output)

        Logger.info("Attempting to determine actual version with build number.")
        Logger.info("Old versions: {0}".format(self.old_versions))

        new_versions = get_stack_versions(self.stack_root_folder)
        Logger.info("New versions: {0}".format(new_versions))

        deltas = set(new_versions) - set(self.old_versions)
        Logger.info("Deltas: {0}".format(deltas))

        # Get version without build number
        normalized_repo_version = self.repository_version.split('-')[0]

        if 1 == len(deltas):
            self.actual_version = next(iter(deltas)).strip()
            self.structured_output['actual_version'] = self.actual_version
            self.put_structured_out(self.structured_output)
            write_actual_version_to_history_file(normalized_repo_version, self.actual_version)
            Logger.info(
                "Found actual version {0} by checking the delta between versions before and after installing packages".format(
                    self.actual_version))
        else:
            # If the first install attempt does a partial install and is unable to report this to the server,
            # then a subsequent attempt will report an empty delta. For this reason, we search for a best fit version for the repo version
            Logger.info("Cannot determine actual version installed by checking the delta between versions "
                        "before and after installing package")
            Logger.info("Will try to find for the actual version by searching for best possible match in the list of versions installed")
            self.actual_version = self.find_best_fit_version(new_versions, self.repository_version)
            if self.actual_version is not None:
                self.actual_version = self.actual_version.strip()
                self.structured_output['actual_version'] = self.actual_version
                self.put_structured_out(self.structured_output)
                Logger.info("Found actual version {0} by searching for best possible match".format(self.actual_version))
            else:
                msg = "Could not determine actual version installed. Try reinstalling packages again."
                raise Fail(msg)

    def check_partial_install(self):
        """
        If an installation did not complete successfully, check if installation was partially complete and
        log the partially completed version to REPO_VERSION_HISTORY_FILE.
        """
        Logger.info("Installation of packages failed. Checking if installation was partially complete")
        Logger.info("Old versions: {0}".format(self.old_versions))

        new_versions = get_stack_versions(self.stack_root_folder)
        Logger.info("New versions: {0}".format(new_versions))

        deltas = set(new_versions) - set(self.old_versions)
        Logger.info("Deltas: {0}".format(deltas))

        # Get version without build number
        normalized_repo_version = self.repository_version.split('-')[0]

        if 1 == len(deltas):
            # Some packages were installed successfully. Log this version to REPO_VERSION_HISTORY_FILE
            partial_install_version = next(iter(deltas)).strip()
            write_actual_version_to_history_file(normalized_repo_version, partial_install_version)
            Logger.info("Version {0} was partially installed. ".format(partial_install_version))

    def find_best_fit_version(self, versions, repo_version):
        """
        Given a list of installed versions and a repo version, search for a version that best fits the repo version
        If the repo version is found in the list of installed versions, return the repo version itself.
        If the repo version is not found in the list of installed versions
        normalize the repo version and use the REPO_VERSION_HISTORY_FILE file to search the list.

        :param versions: List of versions installed
        :param repo_version: Repo version to search
        :return: Matching version, None if no match was found.
        """
        if versions is None or repo_version is None:
            return None

        build_num_match = re.search(r"[\d\.]+-\d+", repo_version)
        if build_num_match and repo_version in versions:
            # If repo version has build number and is found in the list of versions, return it as the matching version
            Logger.info("Best Fit Version: Resolved from repo version with valid build number: {0}".format(repo_version))
            return repo_version

        # Get version without build number
        normalized_repo_version = repo_version.split('-')[0]

        # Find all versions that match the normalized repo version. A real list is
        # built (rather than filter()) because it is measured and indexed below.
        match_versions = [version for version in versions if version.startswith(normalized_repo_version)]
        if match_versions:
            if len(match_versions) == 1:
                # Resolved without conflicts
                Logger.info("Best Fit Version: Resolved from normalized repo version without conflicts: {0}".format(match_versions[0]))
                return match_versions[0]

            # Resolve conflicts using REPO_VERSION_HISTORY_FILE
            history_version = read_actual_version_from_history_file(normalized_repo_version)

            # Validate history version retrieved is valid
            if history_version in match_versions:
                Logger.info("Best Fit Version: Resolved from normalized repo version using {0}: {1}".format(REPO_VERSION_HISTORY_FILE, history_version))
                return history_version

        # No matching version
        return None

    def install_packages(self, package_list):
        """
        Actually install the packages using the package manager.

        On failure, packages that appeared during this run and whose name contains
        the repository version string are removed again.

        :param package_list: List of package names to install
        :return: Returns 0 if no errors were found, and 1 otherwise.
        """
        ret_code = 0

        config = self.get_config()
        agent_stack_retry_on_unavailability = cbool(config['ambariLevelParams']['agent_stack_retry_on_unavailability'])
        agent_stack_retry_count = cint(config['ambariLevelParams']['agent_stack_retry_count'])

        # Install packages
        packages_were_checked = False
        packages_installed_before = []
        stack_selector_package = stack_tools.get_stack_tool_package(stack_tools.STACK_SELECTOR_NAME)

        try:
            # install the stack-selector; we need to supply the action as "upgrade" here since the normal
            # install command will skip if the package is already installed in the system.
            # This is required for non-versioned components, like stack-select, since each version of
            # the stack comes with one. Also, scope the install by repository since we need to pick a
            # specific repo that the stack-select tools are coming out of in case there are multiple
            # patches installed
            repositories = config['repositoryFile']['repositories']
            command_repos = CommandRepository(config['repositoryFile'])
            # Keep only the repositories that are not tied to specific services.
            command_repos.items = [x for x in command_repos.items if not x.applicable_services]
            repository_ids = [repository['repoId'] for repository in repositories]

            repos_to_use = {}
            for repo_id in repository_ids:
                if repo_id in self.repo_files:
                    repos_to_use[repo_id] = self.repo_files[repo_id]

            self.repo_mgr.upgrade_package(stack_selector_package, RepoCallContext(
                ignore_errors=False,
                use_repos=repos_to_use,
                retry_on_repo_unavailability=agent_stack_retry_on_unavailability,
                retry_count=agent_stack_retry_count))

            # Snapshot of installed package names, used for cleanup if the install fails.
            packages_installed_before = self.repo_mgr.installed_packages()
            packages_installed_before = [package[0] for package in packages_installed_before]
            packages_were_checked = True
            filtered_package_list = self.filter_package_list(package_list)

            try:
                available_packages_in_repos = self.repo_mgr.get_available_packages_in_repos(command_repos)
            except Exception:
                available_packages_in_repos = []

            installation_context = RepoCallContext(
                ignore_errors=False,
                retry_on_repo_unavailability=agent_stack_retry_on_unavailability,
                retry_count=agent_stack_retry_count
            )

            for package in filtered_package_list:
                name = self.get_package_from_available(package['name'], available_packages_in_repos)

                # This enables upgrading non-versioned packages, despite the fact they exist.
                # Needed by 'mahout' which is non-version but have to be updated
                self.repo_mgr.upgrade_package(name, installation_context)
        except Exception as err:
            ret_code = 1
            Logger.logger.error("Package Manager failed to install packages: {0}".format(str(err)))

            # Remove already installed packages in case of fail
            if packages_were_checked and packages_installed_before:
                packages_installed_after = self.repo_mgr.installed_packages()
                packages_installed_after = [package[0] for package in packages_installed_after]
                packages_installed_before = set(packages_installed_before)
                new_packages_installed = [package for package in packages_installed_after if package not in packages_installed_before]

                # Package names embed the version with different separators per OS family.
                if OSCheck.is_ubuntu_family():
                    package_version_string = self.repository_version.replace('.', '-')
                else:
                    package_version_string = self.repository_version.replace('-', '_')
                    package_version_string = package_version_string.replace('.', '_')

                for package in new_packages_installed:
                    if package_version_string and (package_version_string in package):
                        self.repo_mgr.remove_package(package, RepoCallContext())

        if not self.repo_mgr.verify_dependencies():
            ret_code = 1
            Logger.logger.error("Failure while verifying dependencies")
            Logger.logger.error("Manually verify and fix package dependencies and then re-run install_packages")

        # Compute the actual version in order to save it in structured out
        try:
            if ret_code == 0:
                self.compute_actual_version()
        except Fail as err:
            ret_code = 1
            Logger.logger.exception("Failure while computing actual version. Error: {0}".format(str(err)))

        return ret_code

    def abort_handler(self, signum, frame):
        """
        Signal handler registered for SIGTERM and SIGINT by actionexecute().

        Logs the signal and records a partially installed version, if any.

        :param signum: number of the signal that was received
        :param frame: current stack frame supplied by the signal module (unused)
        """
        Logger.error("Caught signal {0}, will handle it gracefully. Compute the actual version if possible before exiting.".format(signum))
        # The handler is registered before the version snapshot is taken; without
        # the snapshot there is nothing to compare against.
        if not hasattr(self, 'old_versions'):
            return
        self.check_partial_install()

    def filter_package_list(self, package_list):
        """
        Note: that we have skipUpgrade option in metainfo.xml to filter packages,
        as well as condition option to filter them conditionally,
        so use this method only if, for some reason the metainfo option cannot be used.

        :param package_list: original list
        :return: filtered package_list
        """
        filtered_package_list = []
        for package in package_list:
            if self.check_package_condition(package):
                filtered_package_list.append(package)
        return filtered_package_list
if __name__ == "__main__":