# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Line too long - pylint: disable=C0301
from contextlib import closing
import os
import platform
import shutil
import sys
import tarfile
try:
from gppylib import gplog
from gppylib.commands import gp
from gppylib.commands.base import Command, REMOTE, WorkerPool, ExecutionError
from gppylib.commands.unix import Scp
from gppylib.gpversion import GpVersion
from gppylib.mainUtils import ExceptionNoStackTraceNeeded
from gppylib.operations import Operation
from gppylib.operations.utils import RemoteOperation, ParallelOperation
from gppylib.operations.unix import CheckFile, CheckDir, MakeDir, RemoveFile, RemoveRemoteTree, RemoveRemoteFile, CheckRemoteDir, MakeRemoteDir, CheckRemoteFile, ListRemoteFilesByPattern, ListFiles, ListFilesByPattern
from gppylib.utils import TableLogger
import yaml
from yaml.scanner import ScannerError
except ImportError, ex:
sys.exit('Operation: Cannot import modules. Please check that you have sourced greenplum_path.sh. Detail: ' + str(ex))
logger = gplog.get_default_logger()
def dereference_symlink(path):
"""
MPP-15429: rpm is funky with symlinks...
During an rpm -e invocation, rpm mucks with the /usr/local/greenplum-db symlink.
From strace output, it appears that rpm tries to rmdir any directories it may have created during
package installation. And, in the case of our GPHOME symlink, rpm will actually try to unlink it.
To avoid this scenario, we perform all rpm actions against the "symlink dereferenced" $GPHOME.
"""
path = os.path.normpath(path)
if not os.path.islink(path):
return path
link = os.path.normpath(os.readlink(path))
if os.path.isabs(link):
return link
return os.path.join(os.path.dirname(path), link)
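# For illustration (hypothetical paths): if /usr/local/greenplum-db is a
# relative symlink to ./greenplum-db-4.2.1.0, the join above resolves it
# to /usr/local/greenplum-db-4.2.1.0.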
GPHOME = dereference_symlink(gp.get_gphome())
GPPKG_EXTENSION = '.gppkg'
SPECFILE_NAME = 'gppkg_spec.yml'
SPECFILE_REQUIRED_TAGS = ['pkgname', 'version', 'architecture', 'os', 'description', 'gpdbversion']
SPECFILE_OPTIONAL_TAGS = ['preinstall', 'postinstall', 'preuninstall', 'postuninstall']
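# Illustrative gppkg_spec.yml contents; the values here are hypothetical, but
# the tag set mirrors the constants above and the hook shape mirrors what
# PerformHooks consumes below:
#
#   PkgName: sample
#   Version: 1.0
#   Architecture: x86_64
#   OS: rhel5
#   GPDBVersion: 4.2
#   Description: A sample package.
#   PostInstall:
#   - Master: "echo 'installed on master'"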
# TODO: AK: Our interactions with the internal RPM database could benefit from an abstraction layer
# that hides the underlying commands used for installation, uninstallation, queries, etc.
RPM_DATABASE_PATH = 'share/packages/database'
RPM_DATABASE = os.path.join(GPHOME, RPM_DATABASE_PATH)
RPM_INSTALLATION_PATH = GPHOME
# TODO: AK: Our interactions with the archive could benefit from an abstraction layer
# that hides the implementations of archival, unarchival, queries, etc.
# That is, consider the query "is this package already archived?" Currently, this is implemented
# with a CheckFile. Rather, it should be a call to Archive.contains(package), where package
# is instanceof Gppkg.
ARCHIVE_PATH = 'share/packages/archive'
GPPKG_ARCHIVE_PATH = os.path.join(GPHOME, ARCHIVE_PATH)
# TODO: AK: Shouldn't this be "$GPHOME/.tmp"?
# i.e. what if remote host has its $GPHOME elsewhere?
TEMP_EXTRACTION_PATH = GPHOME + '/.tmp'
DEPS_DIR = 'deps'
class GpdbVersionError(Exception):
'''
Exception to notify that the gpdb version
does not match
'''
pass
class AlreadyInstalledError(Exception):
def __init__(self, package_name):
Exception.__init__(self, '%s is already installed.' % package_name)
class NotInstalledError(Exception):
def __init__(self, package_name):
Exception.__init__(self, '%s is not installed.' % package_name)
class BuildPkgError(Exception):
'''
Exception to notify that there was an error during
the building of a gppkg
'''
pass
class MissingDependencyError(Exception):
'''
Exception to catch missing dependency
'''
def __init__(self, value):
Exception.__init__(self, 'Dependency %s is missing' % value)
class OSCompatibilityError(Exception):
'''
Exception to notify that OS does not meet the
requirement
'''
def __init__(self, requiredos, foundos):
Exception.__init__(self, '%s OS required. %s OS found' % (requiredos, foundos))
class ArchCompatibilityError(Exception):
'''
Exception to notify that architecture does not meet
the requirement
'''
def __init__(self, requiredarch, foundarch):
Exception.__init__(self, '%s Arch required. %s Arch found' % (requiredarch, foundarch))
class RequiredDependencyError(Exception):
'''
Exception to notify that the package being uninstalled
is a dependency for another package
'''
pass
class Gppkg:
'''
This class stores all the information about a gppkg
'''
def __init__(self, pkg, pkgname, main_rpm, version, architecture, os, gpdbversion, description, abspath, preinstall, postinstall, preuninstall, postuninstall, dependencies, file_list):
'''
The constructor takes the following arguments
pkg The complete package name, e.g. pgcrypto-1.0-Darwin-i386.gppkg TODO: AK: This is an awful variable name. Change to "package_filename".
pkgname The name of the package as specified in the spec file
main_rpm The name of the main rpm, e.g. PL/R, PostGIS, etc.
version The version of the gppkg
architecture The architecture for which the package is built
os The operating system for which the package is built
gpdbversion The Greenplum Database version for which the package is built
description A short description for the package
abspath This is the absolute path where the package sits on the host
preinstall The cluster level preinstallation hooks
postinstall The cluster level postinstallation hooks
preuninstall The cluster level preuninstallation hooks
postuninstall The cluster level postuninstallation hooks
dependencies The dependencies of the package, e.g. Geos and Proj in the case of PostGIS
file_list The list of files present in the package
'''
logger.debug('Gppkg Constructor')
self.pkg = pkg
self.pkgname = pkgname
self.main_rpm = main_rpm
self.version = version
self.architecture = architecture
self.os = os
self.gpdbversion = gpdbversion
self.description = description
self.abspath = abspath
self.preinstall = preinstall
self.postinstall = postinstall
self.preuninstall = preuninstall
self.postuninstall = postuninstall
self.dependencies = dependencies
self.file_list = file_list
@staticmethod
def from_package_path(pkg_path):
'''
This method takes a package path as the argument,
obtains all the information about the package
(name, arch, OS, version, description, dependencies,
and the list of files present in the package),
and returns a Gppkg object
'''
logger.debug('from_package_path')
if not os.path.exists(pkg_path):
logger.error('Cannot find package %s' % pkg_path)
raise IOError
#We check for a directory first because
#is_tarfile does not accept directories as path names
if os.path.isdir(pkg_path):
logger.error('%s is a directory!' % pkg_path)
raise IOError
if not tarfile.is_tarfile(pkg_path) or not pkg_path.endswith(GPPKG_EXTENSION):
logger.error('%s is not a valid package' % pkg_path)
raise IOError
if os.path.getsize(pkg_path) == 0:
logger.error('Package is empty')
raise IOError
pkg = {}
# XXX: AK: It's pure coincidence that the optional tags are lists.
for tag in SPECFILE_REQUIRED_TAGS:
pkg[tag] = ''
for tag in SPECFILE_OPTIONAL_TAGS:
pkg[tag] = []
pkg['file_list'] = []
pkg['dependencies'] = []
with closing(tarfile.open(pkg_path, 'r:gz')) as tarinfo:
#store the list of all files present in the archive
archive_list = tarinfo.getnames()
pkg["file_list"] = archive_list
#The spec file has to be called gppkg_spec
#so there will only be one such file,
#so we don't need to worry about the loop
#overwriting the 'specfile' variable with different values
for cur_file in archive_list:
if cur_file.endswith(SPECFILE_NAME):
specfile = tarinfo.extractfile(cur_file)
yamlfile = yaml.load(specfile)
keys = yamlfile.keys()
#store all the tags
for key in keys:
pkg[key.lower()] = yamlfile[key]
#store the package file name
pkg['pkg'] = os.path.split(pkg_path)[-1]
#store the version as a string
pkg['version'] = str(pkg['version'])
#Convert the required version to a GpVersion
pkg['gpdbversion'] = GpVersion(str(pkg['gpdbversion']))
#update the absolute path
pkg['abspath'] = pkg_path
#store all the dependencies of the gppkg
for cur_file in archive_list:
if cur_file.find('deps/') != -1 and cur_file.endswith('.rpm'):
pkg['dependencies'].append(cur_file[cur_file.rfind('/') + 1:])
#store the main rpm
for cur_file in archive_list:
if cur_file.find('deps/') == -1 and cur_file.endswith('.rpm'):
pkg['main_rpm'] = cur_file
gppkg = Gppkg(**pkg)
return gppkg
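# Typical use (hypothetical path):
#   gppkg = Gppkg.from_package_path('/home/gpadmin/pgcrypto-1.0-Darwin-i386.gppkg')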
class LocalCommand(Operation):
'''
DEPRECATED
TODO: AK: Eliminate this. Replace invocations with Command(...).run(validateAfter = True)
'''
def __init__(self, cmd_str, echo = False):
self.cmd_str = cmd_str
self.echo = echo
def execute(self):
logger.debug(self.cmd_str)
cmd = Command(name = 'LocalCommand', cmdStr = self.cmd_str)
cmd.run(validateAfter = True)
if self.echo:
echo_str = cmd.get_results().stdout.strip()
if echo_str:
logger.info(echo_str)
return cmd.get_results()
class RemoteCommand(Operation):
"""
DEPRECATED
TODO: AK: Rename as GpSsh, like GpScp below.
"""
def __init__(self, cmd_str, host_list):
self.cmd_str = cmd_str
self.host_list = host_list
def execute(self):
logger.debug(self.cmd_str)
# Create Worker pool
# and add commands to it
pool = WorkerPool()
for host in self.host_list:
cmd = Command(name = 'Remote Command', cmdStr = self.cmd_str, ctxt = REMOTE, remoteHost = host)
pool.addCommand(cmd)
pool.join()
pool.haltWork()
#This will raise an ExecutionError if even a single command fails
pool.check_results()
class ListPackages(Operation):
'''
Lists all the packages present in
$GPHOME/share/packages/archive
'''
def __init__(self):
pass
def execute(self):
# Ensure archive path exists
# TODO: AK: In hindsight, this should've been named MakeDirP,
# to reflect that it won't blow up if the path already exists.
MakeDir(GPPKG_ARCHIVE_PATH).run()
package_list = ListFilesByPattern(GPPKG_ARCHIVE_PATH, '*' + GPPKG_EXTENSION).run()
package_name_list = []
for pkg in package_list:
pkg_name = pkg.split('/')[-1]
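# The listed name keeps everything up to the second '-'; for illustration,
# 'pgcrypto-1.0-Darwin-i386.gppkg' yields 'pgcrypto-1.0' here.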
package_name_list.append(pkg_name[:pkg_name.index('-', pkg_name.index('-') + 1)])
return package_name_list
class CleanupDir(Operation):
'''
Cleans up the given directory
Returns True if the directory is already gone
or if we were able to remove it successfully,
False for other errors
'''
def __init__(self, dir_path):
self.dir_path = dir_path
def execute(self):
dir_path = self.dir_path
logger.debug('Cleaning up %s' % dir_path)
#If the path does not exist, there is nothing to remove,
#so we return True
if not os.path.exists(dir_path):
return True
if os.path.isdir(dir_path):
shutil.rmtree(dir_path)
else:
return False
return True
class IsVersionCompatible(Operation):
'''
Returns True if the gppkg is compatible
with the gpdb version that has been installed
'''
def __init__(self, gppkg):
self.gppkg = gppkg
def execute(self):
gppkg = self.gppkg
gpdb_version = self._get_hawq_version()
required_gpdb_version = gppkg.gpdbversion
logger.debug('Greenplum Database Version = %s' % gpdb_version)
logger.debug('Required Greenplum Database version = %s' % required_gpdb_version)
if not gpdb_version:
logger.error('Could not determine Greenplum Database version')
return False
print "Installed GPDB Version: {0}".format(gpdb_version)
if not required_gpdb_version.isVersionRelease(gpdb_version):
logger.error('%s requires Greenplum Database version %s' % (gppkg.pkgname, required_gpdb_version))
return False
return True
def _get_gpdb_version(self):
'''
Get the version of the current GPDB
Returns a GpVersion object representing the
installed release (currently unused; execute() uses _get_hawq_version)
'''
logger.debug('_get_gpdb_version')
self.gphome = gp.get_gphome()
version = gp.GpVersion.local('local GP software version check', self.gphome)
gpdb_version = GpVersion(version.strip())
return gpdb_version
def _get_hawq_version(self):
binary = os.path.join(GPHOME, 'bin', 'pg_ctl')
cmdstr = '{0} --hawq-version'.format(binary)
cmd = Command('Get hawq version', cmdstr)
cmd.run(validateAfter = True)
return cmd.results.stdout.strip()
class ValidateInstallPackage(Operation):
"""
Ensure that the given rpms can be installed safely. This is accomplished mainly
through use of rpm --test, which will have one of a few outcomes:
1) A return code of 0, indicating the installation should proceed smoothly
2) A non-zero return code, and stderr indicating some of the rpms are already installed.
We simply omit such rpms from the returned list of rpms, indicating to the caller
that to be successful, installation should only be attempted on the filtered list of rpms.
3) A non-zero return code, and stderr indicating that a failed dependency issue will arise.
This scenario must result in a MissingDependencyError.
Note: install and update share this code, because there is extensive commonality with regard
to the version, OS, and architecture checks, in addition to the 3 code paths enumerated just above.
Lastly, for an edge case: if we determine that all of the relevant rpms are currently installed
*and* the archive package already exists, we declare the package already installed.
TODO: This depends on ExtractPackage having put the dependencies in this same directory.
TODO: Use regexes for more reliable string matching. CR-2865#c20112
"""
def __init__(self, gppkg, is_update = False):
self.gppkg = gppkg
self.is_update = is_update
def execute(self):
#Check the GPDB requirements
if not IsVersionCompatible(self.gppkg).run():
raise GpdbVersionError
# TODO: AK: I've changed our use of the OS tag from 'Linux' to 'rhel5' or 'suse10'.
# So, the two lines below will not work properly.
#if self.gppkg.os.lower() != platform.system().lower():
# raise OSCompatibilityError(self.gppkg.os, platform.system().lower())
#architecture compatibility
if self.gppkg.architecture.lower() != platform.machine().lower():
raise ArchCompatibilityError(self.gppkg.architecture, platform.machine().lower())
rpm_set = set([self.gppkg.main_rpm] + self.gppkg.dependencies)
rpm_install_string = ' '.join([os.path.join(TEMP_EXTRACTION_PATH, rpm) for rpm in rpm_set])
if self.is_update:
rpm_install_command = 'rpm --test --nodeps -U %s --dbpath %s --prefix %s' % (rpm_install_string, RPM_DATABASE, RPM_INSTALLATION_PATH)
else:
rpm_install_command = 'rpm --test --nodeps -i %s --dbpath %s --prefix %s' % (rpm_install_string, RPM_DATABASE, RPM_INSTALLATION_PATH)
cmd = Command('Validating rpm installation', rpm_install_command)
logger.info(cmd) # TODO: AK: This should be debug(), but RMI cannot propagate a log level.
try:
cmd.run(validateAfter = True)
except ExecutionError, e:
lines = e.cmd.get_results().stderr.splitlines()
# Forking between code paths 2 and 3 depends on some meaningful stderr
# Without such stderr, we must bubble up the ExecutionError.
if len(lines) == 0:
raise
if 'failed dependencies' in lines[0].lower():
# Code path 3 (see docstring)
# example stderr:
# error: Failed dependencies:
# geos-3.2.2-1.x86_64.rpm is needed by postgis-1.0-1.x86_64
# TODO: AK: Dependencies should be parsed out here and used to initialize
# this MissingDependencyError. However, this exception does not support
# multiple missing dependencies. Some refactoring work is needed in both places.
logger.error(e.cmd.get_results().stderr)
raise MissingDependencyError('')
# Code path 2, possibly (see docstring)
# example stderr:
# package geos-3.2.2-1.x86_64 is already installed
# package proj-4.7.0-1.x86_64 is already installed
# package postgis-1.0-1.x86_64 is already installed
for line in lines:
if 'already installed' in line.lower():
package_name = line.split()[1]
rpm_name = "%s.rpm" % package_name
rpm_set.remove(rpm_name)
else:
# This is unexpected, so bubble up the ExecutionError.
raise
# MPP-14359 - installation and uninstallation prechecks must also consider
# the archive. That is, if a partial installation had added all rpms
# but failed to add the archive package, then for our purposes, we consider
# the package not yet installed and still in need of InstallPackageLocally.
archive_package_exists = CheckFile(os.path.join(GPPKG_ARCHIVE_PATH, self.gppkg.pkg)).run()
package_already_installed = (not rpm_set) and archive_package_exists
if package_already_installed:
raise AlreadyInstalledError(self.gppkg.pkg)
# Code path 1 (See docstring)
return rpm_set
class ValidateUninstallPackage(Operation):
"""
Ensure that the given rpms can be uninstalled safely. This is accomplished mainly
through use of rpm --test, which will have one of a few outcomes:
1) A return code of 0, indicating the uninstallation should proceed smoothly
2) A non-zero return code, and stderr indicating some of the rpms are already uninstalled.
We simply omit such rpms from the returned list of rpms, indicating to the caller
that to be successful, uninstallation should only be attempted on the filtered list of rpms.
3) A non-zero return code, and stderr indicating that dependencies remain.
Lastly, for an edge case, if we determine that none of the relevant rpms are currently installed
*and* the archive package does not exist, we declare the package is not installed.
TODO: Use regexes for more reliable string matching.
"""
def __init__(self, gppkg):
self.gppkg = gppkg
def execute(self):
rpm_list = [self.gppkg.main_rpm] + self.gppkg.dependencies
def strip_extension_and_arch(filename):
# expecting filename of form %{name}-%{version}-%{release}.%{arch}.rpm
rest, ext = os.path.splitext(filename)
rest, arch = os.path.splitext(rest)
return rest
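# For illustration: strip_extension_and_arch('geos-3.2.2-1.x86_64.rpm')
# returns 'geos-3.2.2-1'.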
rpm_set = set([strip_extension_and_arch(rpm) for rpm in rpm_list])
rpm_uninstall_string = ' '.join(rpm_set)
rpm_uninstall_command = 'rpm --test -e %s --dbpath %s' % (rpm_uninstall_string, RPM_DATABASE)
cmd = Command('Validating rpm uninstallation', rpm_uninstall_command)
logger.info(cmd) # TODO: AK: This should be debug(), but RMI cannot propagate a log level.
try:
cmd.run(validateAfter = True)
except ExecutionError, e:
lines = e.cmd.get_results().stderr.splitlines()
# Forking between code paths 2 and 3 depends on some meaningful stderr
# Without such stderr, we must bubble up the ExecutionError.
if len(lines) == 0:
raise
if 'failed dependencies' in lines[0].lower():
# Code path 3 (see docstring)
# example stderr:
# error: Failed dependencies:
# jre = 1.6.0_26 is needed by (installed) gphdfs-1.1-1.x86_64
self.resolve_shared_dependencies(rpm_set, lines[1:])
else:
# Code path 2, possibly (see docstring)
# example stderr:
# error: package postgis-1.0-1.x86_64 is not installed
# error: package proj-4.7.0-1.x86_64 is not installed
# error: package geos-3.2.2-1.x86_64 is not installed
for line in lines:
if 'not installed' in line.lower():
package_name = line.split()[2]
rpm_set.remove(package_name)
else:
# This is unexpected, so bubble up the ExecutionError.
raise
# MPP-14359 - installation and uninstallation prechecks must also consider
# the archive. That is, if a partial uninstallation had removed all rpms
# but failed to remove the archive package, then for our purposes, we consider
# the package installed and still in need of UninstallPackageLocally.
archive_package_exists = CheckFile(os.path.join(GPPKG_ARCHIVE_PATH, self.gppkg.pkg)).run()
package_not_installed = (not rpm_set) and (not archive_package_exists)
if package_not_installed:
raise NotInstalledError(self.gppkg.pkg)
# Code path 1 (See docstring)
return rpm_set
def resolve_shared_dependencies(self, rpm_set, dependency_lines):
"""
This is a very naive resolution to shared dependencies. (See code path #3 in ValidateUninstallPackage.execute)
Among the rpms we attempt to remove from the system, a subset cannot be
removed during this particular gppkg uninstallation, because their removal would violate
the dependency constraints of other rpms that remain in the system; we simply leave these culprit rpm(s) behind.
More specifically, the preceding rpm --test -e command has given us the violated *capabilities*. For each *capability*,
we query the rpm database with --whatprovides to discern the culprit rpm(s).
In simpler terms, consider this example:
pljava depends on jre, which its gppkg contains
gphdfs depends on jre, which its gppkg contains
install the gppkgs for both pljava and gphdfs
uninstall pljava gppkg
we internally attempt to "rpm -e" the jre rpm, hitting the gphdfs dependency error here involving "jre = 1.6"
we determine that the jre rpm is responsible for *providing* "jre = 1.6"
so, we ultimately omit the jre rpm from our "rpm -e" and move on
TODO: AK: A more robust version of this function would ensure that the remaining
rpms are, in fact, bound by a remaining gppkg. We defer this responsibility for now because gppkgs
should not have external dependencies. That is, no package should have requirements on rpms
not contained in its own gppkg distro. So, it's safe to assume that if foo is a culprit rpm, there exists
some gppkg bar that internally contains foo. (I realize that, with time, this will not be a scalable requirement
for gppkgs... hence the TODO.)
@type rpm_set: set
@param rpm_set: rpms being uninstalled, among which there exists an rpm
whose removal violates the dependencies of remaining rpms
@type dependency_lines: list
@param dependency_lines: lines produced from the stderr in
code path #3 in ValidateUninstallPackage.execute
ex: [" jre >= 1.6.0_26 is needed by (installed) gphdfs-1.1-1.x86_64"]
"""
for dependency_line in dependency_lines:
violated_capability = dependency_line.split()[0] # e.g. "jre"
cmd = Command('Discerning culprit rpms for %s' % violated_capability,
'rpm -q --whatprovides %s --dbpath %s' % (violated_capability, RPM_DATABASE))
cmd.run(validateAfter = True)
culprit_rpms = set(cmd.get_results().stdout.splitlines())
rpm_set -= culprit_rpms
class ExtractPackage(Operation):
"""
Extract the contents of the package into the temp folder
TODO: AK: Extraction should be implemented as a context manager.
"""
def __init__(self, gppkg):
self.gppkg = gppkg
def execute(self):
#clean up tmp extraction folder
if os.path.exists(TEMP_EXTRACTION_PATH) and not CleanupDir(TEMP_EXTRACTION_PATH).run():
logger.error('Could not clean temp folder')
raise IOError
#untar the package into tmp folder
with closing(tarfile.open(self.gppkg.abspath)) as tarinfo:
tarinfo.extractall(TEMP_EXTRACTION_PATH)
#move all the deps into same folder as the main rpm
path = os.path.join(TEMP_EXTRACTION_PATH, DEPS_DIR)
if os.path.exists(path):
for cur_file in os.listdir(path):
shutil.move(os.path.join(TEMP_EXTRACTION_PATH, DEPS_DIR, cur_file), TEMP_EXTRACTION_PATH)
class InstallPackageLocally(Operation):
"""
Installs a package on the local host
This operation must take a slew of starting conditions and drive the state
of the local machine towards the ending state, in which the given package is successfully
installed, the rpm database is sane, and the package resides in the designated archive.
To that end, we indiscriminately squash AlreadyInstalledErrors arising from ValidateInstallPackage,
because in this context, it's not an exception, but rather an indication of our desired ending
conditions.
We must consider the following scenarios and more: package was deleted from archive,
the main comprising rpm was uninstalled, dependent rpms were removed, the rpm database was
corrupted, etc.
Again, much like ValidateInstallPackage, we make cheap reuse of this code for the purposes
of an --update as there is considerable commonality.
"""
def __init__(self, package_path, is_update = False):
self.package_path = package_path
self.is_update = is_update
def execute(self):
current_package_location = self.package_path
package_name = os.path.basename(current_package_location)
logger.info('Installing %s locally' % package_name)
final_package_location = os.path.join(GPPKG_ARCHIVE_PATH, package_name)
gppkg = Gppkg.from_package_path(current_package_location)
ExtractPackage(gppkg).run()
# squash AlreadyInstalledError here: the caller doesn't ever need to
# know that we didn't have to do anything here
try:
rpm_set = ValidateInstallPackage(gppkg, is_update = self.is_update).run()
except AlreadyInstalledError, e:
logger.info(e)
return
if rpm_set:
if self.is_update:
rpm_install_command = 'rpm -U --nodeps %s --dbpath %s --prefix=%s'
else:
rpm_install_command = 'rpm -i --nodeps %s --dbpath %s --prefix=%s'
rpm_install_command = rpm_install_command % \
(" ".join([os.path.join(TEMP_EXTRACTION_PATH, rpm) for rpm in rpm_set]),
RPM_DATABASE,
RPM_INSTALLATION_PATH)
cmd = Command('Installing rpms', rpm_install_command)
logger.info(cmd)
cmd.run(validateAfter = True)
# TODO: AK: MPP-15568
# TODO: AK: abstraction layer for archive interactions... to hide use of shutil.copy, RemoveFile, etc.
MakeDir(GPPKG_ARCHIVE_PATH).run()
shutil.copy(current_package_location, final_package_location)
logger.info("Completed local installation of %s." % package_name)
class UninstallPackageLocally(Operation):
"""
Uninstalls a package on the local host
This operation must take a slew of starting conditions and drive the state
of the local machine towards the ending state, in which the given package is successfully
uninstalled, the rpm database is sane, and the package is removed from the archive.
To that end, we indiscriminately squash NotInstalledErrors arising from ValidateUninstallPackage,
because in this context, it's not an exception, but rather an indication of our desired ending
conditions.
We must consider the following scenarios and more: package was deleted from archive,
the main comprising rpm was uninstalled, dependent rpms were removed, the rpm database was
corrupted, etc.
"""
def __init__(self, package_name):
self.package_name = package_name
def execute(self):
# TODO: AK: MPP-15737 - we're entirely dependent on the package residing in the archive
current_package_location = os.path.join(GPPKG_ARCHIVE_PATH, self.package_name)
gppkg = Gppkg.from_package_path(current_package_location)
# squash NotInstalledError here: the caller doesn't ever need to
# know that we didn't have to do anything here
try:
rpm_set = ValidateUninstallPackage(gppkg).run()
except NotInstalledError, e:
logger.info(e)
return
if rpm_set:
rpm_uninstall_command = 'rpm -e %s --dbpath %s' % (" ".join(rpm_set), RPM_DATABASE)
cmd = Command('Uninstalling rpms', rpm_uninstall_command)
logger.info(cmd)
cmd.run(validateAfter = True)
# TODO: AK: abstraction layer for archive interactions... to hide use of shutil.copy, RemoveFile, etc.
MakeDir(GPPKG_ARCHIVE_PATH).run()
RemoveFile(current_package_location).run()
logger.info("Completed local uninstallation of %s." % self.package_name)
class SyncPackages(Operation):
"""
Synchronizes packages from master to a remote host
TODO: AK: MPP-15568
"""
def __init__(self, host):
self.host = host
def execute(self):
if not CheckDir(GPPKG_ARCHIVE_PATH).run():
MakeDir(GPPKG_ARCHIVE_PATH).run()
if not CheckRemoteDir(GPPKG_ARCHIVE_PATH, self.host).run():
MakeRemoteDir(GPPKG_ARCHIVE_PATH, self.host).run()
# set of packages on the master
master_package_set = set(ListFilesByPattern(GPPKG_ARCHIVE_PATH, '*' + GPPKG_EXTENSION).run())
# set of packages on the remote host
remote_package_set = set(ListRemoteFilesByPattern(GPPKG_ARCHIVE_PATH, '*' + GPPKG_EXTENSION, self.host).run())
# packages to be uninstalled on the remote host
uninstall_package_set = remote_package_set - master_package_set
# packages to be installed on the remote host
install_package_set = master_package_set - remote_package_set
if not install_package_set and not uninstall_package_set:
logger.info('The packages on %s are consistent.' % self.host)
return
if install_package_set:
logger.info('The following packages will be installed on %s: %s' % (self.host, ', '.join(install_package_set)))
for package in install_package_set:
logger.debug('copying %s to %s' % (package, self.host))
dstFile = os.path.join(GPHOME, package)
Scp(name = 'copying %s to %s' % (package, self.host),
srcFile = os.path.join(GPPKG_ARCHIVE_PATH, package),
dstFile = dstFile,
dstHost = self.host).run(validateAfter = True)
RemoteOperation(InstallPackageLocally(dstFile), self.host).run()
RemoveRemoteFile(dstFile, self.host).run()
if uninstall_package_set:
logger.info('The following packages will be uninstalled on %s: %s' % (self.host, ', '.join(uninstall_package_set)))
for package in uninstall_package_set:
RemoteOperation(UninstallPackageLocally(package), self.host).run()
class InstallPackage(Operation):
def __init__(self, gppkg, master_host, standby_host, segment_host_list):
self.gppkg = gppkg
self.master_host = master_host
self.standby_host = standby_host
self.segment_host_list = segment_host_list
def execute(self):
logger.info('Installing package %s' % self.gppkg.pkg)
# TODO: AK: MPP-15736 - precheck package state on master
ExtractPackage(self.gppkg).run()
ValidateInstallPackage(self.gppkg).run()
# perform any pre-installation steps
PerformHooks(hooks = self.gppkg.preinstall,
master_host = self.master_host,
standby_host = self.standby_host,
segment_host_list = self.segment_host_list).run()
# distribute package to segments
srcFile = self.gppkg.abspath
dstFile = os.path.join(GPHOME, self.gppkg.pkg)
GpScp(srcFile, dstFile, self.segment_host_list).run()
# install package on segments
HostOperation(InstallPackageLocally(dstFile), self.segment_host_list).run()
# install package on standby
if self.standby_host:
Scp(name = 'copying %s to %s' % (srcFile, self.standby_host),
srcFile = srcFile,
dstFile = dstFile,
dstHost = self.standby_host).run(validateAfter = True)
RemoteOperation(InstallPackageLocally(dstFile), self.standby_host).run()
# install package on master
InstallPackageLocally(srcFile).run()
# perform any post-installation steps
PerformHooks(hooks = self.gppkg.postinstall,
master_host = self.master_host,
standby_host = self.standby_host,
segment_host_list = self.segment_host_list).run()
logger.info('%s successfully installed.' % (self.gppkg.pkg))
class PerformHooks(Operation):
def __init__(self, hooks, master_host, standby_host, segment_host_list):
"""
Performs steps that have been specified in the yaml file for a particular
stage of gppkg execution
TODO: AK: A packager may have added commands to their hooks, with the
assumption that the current working directory would be that which contains
the spec file, rpms, and other artifacts (external scripts, perhaps.) To support
this, these commands should be prefixed with a "cd".
TODO: AK: I'm adding master_host for consistency.
But, why would we ever need master_host? We're on the master host!
"""
self.hooks = hooks
self.master_host = master_host
self.standby_host = standby_host
self.segment_host_list = segment_host_list
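# After YAML parsing, each hook is expected to be a single-entry dict mapping
# a scope ('Master' or 'Segment', matched case-insensitively below) to a shell
# command string, e.g. {'Master': "echo 'done'"} (illustrative value).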
def execute(self):
if self.hooks is None:
return
for hook in self.hooks:
keys = hook.keys()
#dict.keys() never returns None; skip a malformed (empty) hook
#and move on rather than aborting the remaining hooks
if not keys:
continue
key_str = keys[0]
if key_str.lower() == 'master':
if self.standby_host:
RemoteCommand(hook[key_str], [self.standby_host]).run()
LocalCommand(hook[key_str], True).run()
elif key_str.lower() == 'segment':
RemoteCommand(hook[key_str], self.segment_host_list).run()
class UninstallPackage(Operation):
def __init__(self, gppkg, master_host, standby_host, segment_host_list):
self.gppkg = gppkg
self.master_host = master_host
self.standby_host = standby_host
self.segment_host_list = segment_host_list
def execute(self):
logger.info('Uninstalling package %s' % self.gppkg.pkg)
# TODO: AK: MPP-15736 - precheck package state on master
ExtractPackage(self.gppkg).run()
ValidateUninstallPackage(self.gppkg).run()
# perform any pre-uninstallation steps
PerformHooks(hooks = self.gppkg.preuninstall,
master_host = self.master_host,
standby_host = self.standby_host,
segment_host_list = self.segment_host_list).run()
# uninstall on segments
HostOperation(UninstallPackageLocally(self.gppkg.pkg), self.segment_host_list).run()
if self.standby_host:
RemoteOperation(UninstallPackageLocally(self.gppkg.pkg), self.standby_host).run()
UninstallPackageLocally(self.gppkg.pkg).run()
# perform any post-uninstallation steps
PerformHooks(hooks = self.gppkg.postuninstall,
master_host = self.master_host,
standby_host = self.standby_host,
segment_host_list = self.segment_host_list).run()
logger.info('%s successfully uninstalled.' % self.gppkg.pkg)
class QueryPackage(Operation):
INFO, LIST, ALL = range(3)
def __init__(self, query_type, package_path):
self.query_type = query_type
self.package_path = package_path
def execute(self):
if self.query_type == QueryPackage.INFO:
def package_details(p):
yield 'Name', p.pkgname
yield 'Version', p.version
yield 'Architecture', p.architecture
yield 'OS', p.os
yield 'GPDBVersion', str(p.gpdbversion)
yield 'Description', p.description
def print_package_info(package):
tabLog = TableLogger()
for name, value in package_details(package):
tabLog.info([name, value])
tabLog.outputTable()
package = Gppkg.from_package_path(self.package_path)
print_package_info(package)
elif self.query_type == QueryPackage.LIST:
package = Gppkg.from_package_path(self.package_path)
for cur_file in package.file_list:
print cur_file
elif self.query_type == QueryPackage.ALL:
package_name_list = ListPackages().run()
for package_name in package_name_list:
print package_name
else:
package = Gppkg.from_package_path(self.package_path)
try:
ExtractPackage(package).run()
ValidateInstallPackage(package).run()
except AlreadyInstalledError:
print '%s is installed.' % package.pkgname
else:
print '%s is not installed.' % package.pkgname
class BuildGppkg(Operation):
'''
Builds a gppkg given a directory containing
the spec file, rpms and any pre/post installation scripts
'''
def __init__(self, directory):
self.directory = directory
def execute(self):
directory = self.directory
logger.info('Building gppkg')
#Check if the directory is valid
if not os.path.exists(directory) or not os.path.isdir(directory):
logger.error('%s is not a valid directory' % directory)
raise BuildPkgError
filelist = os.listdir(directory)
#Check for the spec file
specfile = os.path.join(directory, SPECFILE_NAME)
if not os.path.exists(specfile):
logger.error('Spec file does not exist')
raise BuildPkgError
#parse the spec file and get the name, version and arch
#this is used to name the gppkg
pkg_path_details = self._get_package_name_details(specfile)
if pkg_path_details is None:
raise BuildPkgError
#If the target gppkg already exists, replace it with the newly built one
pkg = pkg_path_details['pkgname'] + '-' + str(pkg_path_details['version']) + '-' + pkg_path_details['os'] + '-' + pkg_path_details['architecture'] + GPPKG_EXTENSION
if os.path.exists(pkg):
os.remove(pkg)
#Verify the spec file
if not self._verify_specfile(specfile, directory):
raise BuildPkgError
#tar and gzip the directory
#rename the file with .gppkg extension
with closing(tarfile.open(pkg, 'w:gz')) as tarinfo:
for cur_file in filelist:
tarinfo.add(name = os.path.join(directory, cur_file),
arcname = cur_file)
logger.info('Completed building gppkg')
def _get_package_name_details(self, specfile):
'''
Get details about the name, version, operating system, architecture
of the package. The final gppkg that is created
will be named <name>-<version>-<os>-<arch>.gppkg
'''
logger.debug('_get_package_name_details')
with open(specfile) as cur_file:
yamlfile = yaml.load(cur_file)
tags = yamlfile.keys()
pkg_path_details = {}
#return all the required tags as a dict
for tag in tags:
if tag.lower() in SPECFILE_REQUIRED_TAGS:
pkg_path_details[tag.lower()] = yamlfile[tag]
return pkg_path_details
def _verify_specfile(self, specfile, directory):
'''
Reads the spec file and makes sure that the tags are correct.
'''
logger.debug('_verify_specfile')
try:
with open(specfile) as cur_file:
yamlfile = yaml.load(cur_file)
if not self._verify_tags(yamlfile):
return False
return True
except ScannerError, ex:
logger.error('Error parsing spec file: %s' % ex)
return False
def _verify_tags(self, yamlfile):
'''
Verify that the tags are valid.
Returns True if all tags are valid,
False otherwise
'''
logger.debug('_verify_tags')
tags = yamlfile.keys()
tags = [tag.lower() for tag in tags]
#check required tags
for required_tag in SPECFILE_REQUIRED_TAGS:
if required_tag not in tags:
logger.error('Required tag %s missing in spec file' % required_tag)
return False
#check for invalid tags
for tag in tags:
if tag not in SPECFILE_OPTIONAL_TAGS and tag not in SPECFILE_REQUIRED_TAGS:
logger.error('Invalid tag %s in spec file' % tag)
return False
return True
class UpdatePackage(Operation):
""" TODO: AK: Enforce gppkg version is higher than currently installed version """
def __init__(self, gppkg, master_host, standby_host, segment_host_list):
self.gppkg = gppkg
self.master_host = master_host
self.standby_host = standby_host
self.segment_host_list = segment_host_list
def execute(self):
logger.info('Updating package %s' % self.gppkg.pkg)
ExtractPackage(self.gppkg).run()
ValidateInstallPackage(self.gppkg, is_update = True).run()
# distribute package to segments
srcFile = self.gppkg.abspath
dstFile = os.path.join(GPHOME, self.gppkg.pkg)
GpScp(srcFile, dstFile, self.segment_host_list).run()
# update package on segments
HostOperation(UpdatePackageLocally(dstFile), self.segment_host_list).run()
# update package on standby
if self.standby_host:
Scp(name = 'copying %s to %s' % (srcFile, self.standby_host),
srcFile = srcFile,
dstFile = dstFile,
dstHost = self.standby_host).run(validateAfter = True)
RemoteOperation(UpdatePackageLocally(dstFile), self.standby_host).run()
# update package on master
UpdatePackageLocally(srcFile).run()
logger.info('%s successfully updated.' % (self.gppkg.pkg))
class UpdatePackageLocally(Operation):
"""
Updates a package on the local host
We make cheap reuse of InstallPackageLocally by propagating is_update = True, which
effectively changes the rpm invocations to use -U instead of -i. Beyond the invocation of
InstallPackageLocally, here, we also clean up the archive directory to remove other (ideally, older)
versions of the updated package.
"""
def __init__(self, package_path):
self.package_path = package_path
def execute(self):
InstallPackageLocally(self.package_path, is_update = True).run()
# Remove other versions of the package from archive.
# Note: Do not rely on filename format to discern such packages.
# Rather, interrogate a package only through the Gppkg class interface.
current_package = Gppkg.from_package_path(self.package_path)
MakeDir(GPPKG_ARCHIVE_PATH).run()
archived_package_paths = ListFiles(GPPKG_ARCHIVE_PATH).run()
for archived_package_path in archived_package_paths:
temp_package = Gppkg.from_package_path(os.path.join(GPPKG_ARCHIVE_PATH, archived_package_path))
if temp_package.pkgname == current_package.pkgname and temp_package.version != current_package.version:
RemoveFile(os.path.join(GPPKG_ARCHIVE_PATH, archived_package_path)).run()
class CleanGppkg(Operation):
'''
Cleans up the Gppkg from the cluster in case of partial
installation or removal. This might not be required if
we can make the install and uninstall options idempotent.
This operation is exactly the same as remove but we don't
check on each host to see if the rpm is installed or not.
'''
def __init__(self, standby_host, segment_host_list):
self.standby_host = standby_host
self.segment_host_list = segment_host_list
def execute(self):
operations = [SyncPackages(host) for host in self.segment_host_list]
if self.standby_host:
operations.append(SyncPackages(self.standby_host))
ParallelOperation(operations).run()
for operation in operations:
try:
operation.get_ret()
except Exception, e:
raise ExceptionNoStackTraceNeeded('SyncPackages failed: ' + str(e))
logger.info('Successfully cleaned the cluster')
class MigratePackages(Operation):
"""
Migrates packages from another $GPHOME to this one
This functionality is meant to facilitate minor version upgrade, whereby old packages
need to be brought over from the older $GPHOME to the newer $GPHOME.
Presumably, this could also be used to migrate packages across arbitrary choices
of $GPHOMEs. However, the migration will only succeed if the packages being migrated
are actually compatible with the target GPDB.
"""
def __init__(self, from_gphome, to_gphome):
self.from_gphome, self.to_gphome = from_gphome, to_gphome
def execute(self):
if not os.path.samefile(self.to_gphome, GPHOME):
raise ExceptionNoStackTraceNeeded('The target GPHOME, %s, must match the current $GPHOME used to launch gppkg.' % self.to_gphome)
if os.path.samefile(self.to_gphome, self.from_gphome):
raise ExceptionNoStackTraceNeeded('The source and target GPHOMEs, %s => %s, must differ for packages to be migrated.' % (self.from_gphome, self.to_gphome))
# TODO: AK: Given an invalid from_gphome, we'll end up creating a 'share/packages' subdirectory within it.
old_archive_path = os.path.join(self.from_gphome, ARCHIVE_PATH)
MakeDir(old_archive_path).run()
packages = ListFilesByPattern(old_archive_path, '*' + GPPKG_EXTENSION).run()
if not packages:
logger.info('There are no packages to migrate from %s.' % self.from_gphome)
return
logger.info('The following packages will be migrated: %s' % ', '.join(packages))
for package in packages:
package_path = os.path.join(old_archive_path, package)
try:
InstallPackageLocally(package_path).run()
except AlreadyInstalledError:
logger.info("%s is already installed." % package)
except Exception:
logger.exception("Failed to migrate %s from %s" % (old_archive_path, package))
logger.info('The package migration has completed.')
class GpScp(Operation):
"""
TODO: AK: This obviously does not belong here. My preference would be that it remain here until
the following problem is solved.
MPP-15270 - Improve performance of file transfer across large clusters
I suggest:
We consume an extra parameter 'fanout'. We partition the host_list into a number of buckets
given by 'fanout'. For each bucket, we scp the artifact to the first host in the bucket, and then
we recursively invoke GpScp on that machine for the remaining hosts in its bucket.
GpScp := ParallelOperation([ A(i) for i in range(0, n) ])
A := SerialOperation(B, C)
B := scp source_path target_path @ host_i
where host_i := the first host in the ith bucket
C := RemoteOperation(GpScp(target_path, target_path, host_list_i))
where host_list_i := the remaining hosts in the ith bucket
"""
def __init__(self, source_path, target_path, host_list):
self.source_path = source_path
self.target_path = target_path
self.host_list = host_list
def execute(self):
pool = WorkerPool()
for host in self.host_list:
pool.addCommand(Scp(name = 'copying %s to %s' % (self.source_path, host),
srcFile = self.source_path,
dstFile = self.target_path,
dstHost = host))
pool.join()
pool.haltWork()
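# A minimal sketch (not part of the current API) of the bucket partitioning
# that the fanout proposal in the GpScp docstring above would need; the
# 'fanout' parameter and this helper are hypothetical.
def _partition_hosts_for_fanout(host_list, fanout):
    """
    Split host_list into at most 'fanout' contiguous buckets, e.g.
    _partition_hosts_for_fanout(['h1', 'h2', 'h3', 'h4', 'h5'], 2)
    returns [['h1', 'h2', 'h3'], ['h4', 'h5']]
    """
    if not host_list:
        return []
    # ceiling division; lengths and fanout are ints under Python 2
    bucket_size = (len(host_list) + fanout - 1) / fanout
    return [host_list[i:i + bucket_size] for i in range(0, len(host_list), bucket_size)]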
class HostOperation(Operation):
"""
TODO: AK: This obviously does not belong here. My preference would be to move it to gppylib.operations.utils
when another consumer becomes clear.
TODO: AK: For generality, the underlying operation should inherit/implement NestedHostOperation so that
it may be initialized with information about the host to which it's been bound. This is fortunately not necessary
for our purposes here, so it's deferrable.
TODO: AK: Build a SegHostOperation that wraps this and is driven by GpArray content.
TODO: AK: Implement something similar for a SegmentOperation + NestedSegmentOperation.
TODO: AK: This (as well as ParallelOperation) would benefit from an appropriate choice of return value. The likely
choice would be: [op.get_ret() for op in self.operations]
"""
def __init__(self, operation, host_list):
self.operation = operation
self.host_list = host_list
def execute(self):
operations = []
for host in self.host_list:
operations.append(RemoteOperation(self.operation, host))
ParallelOperation(operations).run()
for operation in operations:
operation.get_ret()