| #!/usr/bin/env python |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import os, sys, re, tempfile, shutil, pickle, getpass, subprocess, time, glob, ConfigParser |
| from xml.dom import minidom |
| |
| try: |
| from optparse import Option, OptionParser |
| from gppylib.gpparseopts import OptParser, OptChecker |
| from gppylib.gplog import get_default_logger, setup_tool_logging |
| from gppylib.commands.unix import getLocalHostname, getUserName, SYSTEM |
| from gppylib.commands.base import WorkerPool, Command, REMOTE |
| from gppylib.gpcheckutil import HostType, hosttype_str |
| from hawqpylib.hawqlib import remote_ssh_output |
| from pygresql.pgdb import DatabaseError |
| from pygresql import pg |
| import stat |
| |
| except ImportError, e: |
| sys.exit('Cannot import modules. Please check that you have sourced greenplum_path.sh. Detail: ' + str(e)) |
| |
| |
class GpCheckError(Exception):
    """Fatal gpcheck error; the top-level driver reports the message and exits."""
| |
| |
class GpCheckInfo:
    """Container for everything gpcheck collects during one run."""
    def __init__(self):
        # True when the tool runs with root privileges (euid 0)
        self.is_root = (os.geteuid() == 0)
        # platform type; set by checkPlatform()
        self.host_type = HostType.GPCHECK_HOSTTYPE_UNDEFINED
        self.appliance_version = None

        # record gpcheck_hostdump data for each host
        self.hosts = dict() # hostname => GpCheckHost obj

        # record HAWQ configuration
        # NOTE(review): readHAWQConfiguration() actually stores a single int per
        # GUC, not a (master_value, segment_value) pair — verify intended shape
        self.hawq_gucs = dict() # guc name => (master_value, segment_value)
        self.hawq_segment_configuration = None
        self.hawq_collected_ok = False # if successfully collect HAWQ gucs

        # timestamps bracketing the remote collection; used in NTPD testing
        self.collection_start_time = 0 # used in NTPD testing
        self.collection_end_time = 0 # used in NTPD testing
| |
| |
class GpCheckHost:
    """Per-host record: where the hostdump file lives and what it contained."""

    def __init__(self, name, is_namenode=False):
        self.hostname = name
        # path of the pickle file produced on the remote host
        self.datafile = None
        # unpickled `gpcheck_hostdump` data, filled in by readDataFiles()
        self.data = None
        self.is_namenode = is_namenode

    def __str__(self):
        parts = ["%s datafile(%s)" % (self.hostname, self.datafile)]
        if self.is_namenode:
            parts.append("namenode")
        return " ".join(parts)
| |
| |
class GpCheckConfig:
    """Expected-value settings parsed from the gpcheck config file (gpcheck.cnf).

    Holds built-in defaults for the OS, disk, HDFS, YARN and HAWQ checks;
    readConfigFile() overrides and extends them from an INI-style file.
    """

    def __init__(self):
        self.parser = ConfigParser.RawConfigParser()
        self.gpcheck_config_version = 0

        # mount points whose mount options should be checked
        self.mount_points = set()
        # sysctl parameter name => expected value (kept as the raw string)
        self.sysctl_expected = dict()

        # default values for limits.conf checks, keyed by (limit type, item)
        self.limits_expected = {
            ("soft", "nofile"): 2900000,
            ("hard", "nofile"): 2900000,
            ("soft", "nproc"): 131072,
            ("hard", "nproc"): 131072}

        # mounts to monitor for disk usage; empty list means "all mounts"
        self.diskusage_mounts = []
        self.diskusage_usagemax = 90  # max disk usage percentage

        # default value for HDFS configuration
        self.hdfs_expected = {
            "dfs.mem.namenode.heap": 8192,
            "dfs.mem.datanode.heap": 8192}
        self.hdfs_non_expected = {}          # extras for non-HA, non-kerberos
        self.hdfs_ha_expected = {}
        self.hdfs_kerberos_expected = {}
        self.hdfs_ha_kerberos_expected = {}

        self.yarn_expected = {}
        self.yarn_non_expected = {}
        self.yarn_ha_expected = {}
        self.yarn_kerberos_expected = {}
        self.yarn_ha_kerberos_expected = {}

        self.hawq_expected = {}
        self.hawq_kerberos_expected = {}
        self.hawq_yarn_expected = {}

    def _read_section_into(self, section, target):
        """Copy every option of `section` into dict `target` (section must exist)."""
        for opt in self.parser.options(section):
            target[opt] = self.parser.get(section, opt)

    def _convert_heap_settings(self):
        """Convert the two HDFS heap-size expectations to int, with a clear error."""
        try:
            self.hdfs_expected["dfs.mem.namenode.heap"] = int(self.hdfs_expected["dfs.mem.namenode.heap"])
            self.hdfs_expected["dfs.mem.datanode.heap"] = int(self.hdfs_expected["dfs.mem.datanode.heap"])
        except ValueError as e:
            # fix: the message used to repeat 'dfs.mem.namenode.heap' twice
            raise GpCheckError("'dfs.mem.namenode.heap' or 'dfs.mem.datanode.heap' should be a number: %s" % e)

    def readConfigFile(self, config_file):
        """Parse `config_file` and populate the expected-value dictionaries.

        Raises GpCheckError when the file cannot be read, a required section
        is missing, or an entry is malformed.
        """
        parsed_list = self.parser.read(config_file)
        if len(parsed_list) != 1:
            raise GpCheckError("cannot read config file '%s'" % config_file)

        if not self.parser.has_section("linux.sysctl"):
            raise GpCheckError("require section 'linux.sysctl'")

        section = "global"
        if self.parser.has_option(section, "configfile_version"):
            self.gpcheck_config_version = self.parser.getint(section, "configfile_version")

        section = "linux.mount"
        if self.parser.has_option(section, "mount.points"):
            for p in self.parser.get(section, "mount.points").split(","):
                self.mount_points.add(p.strip())

        # options look like "sysctl.<parameter name>"
        section = 'linux.sysctl'
        for opt in self.parser.options(section):
            if re.match(r'sysctl\.', opt):
                fields = opt.split('sysctl.')
                if len(fields) != 2:
                    raise GpCheckError("Bad config line entry '%s'" % opt)
                self.sysctl_expected[fields[1]] = self.parser.get(section, opt)

        # options look like "<type>.<item>", e.g. "soft.nofile"
        section = "linux.limits"
        for opt in self.parser.options(section):
            key = tuple(opt.split("."))
            self.limits_expected[key] = self.parser.getint(section, opt)

        section = "linux.diskusage"
        if self.parser.has_option(section, "diskusage.monitor.mounts"):
            self.diskusage_mounts = [m.strip() for m in self.parser.get(section, "diskusage.monitor.mounts").split(",")]
        if self.parser.has_option(section, "diskusage.monitor.usagemax"):
            self.diskusage_usagemax = self.parser.get(section, "diskusage.monitor.usagemax")
            try:
                # accept either "90" or "90%"
                if self.diskusage_usagemax[-1] == "%":
                    self.diskusage_usagemax = int(self.diskusage_usagemax[:-1])
                else:
                    self.diskusage_usagemax = int(self.diskusage_usagemax)
            except Exception as e:
                raise GpCheckError("Bad config entry value '%s' for 'diskusage.monitor.usagemax': %s" %
                                   (self.diskusage_usagemax, e))

        if not self.parser.has_section('hdfs.base'):
            # legacy layout: one flat 'hdfs' section, no yarn/hawq sections
            if not self.parser.has_section("hdfs"):
                raise GpCheckError("require section 'hdfs'")
            self._read_section_into('hdfs', self.hdfs_expected)
            self._convert_heap_settings()
        else:
            self._read_section_into('hdfs.base', self.hdfs_expected)
            self._convert_heap_settings()

            self._read_section_into('hdfs.non', self.hdfs_non_expected)
            self._read_section_into('hdfs.ha', self.hdfs_ha_expected)
            self._read_section_into('hdfs.kerberos', self.hdfs_kerberos_expected)
            self._read_section_into('hdfs.ha.kerberos', self.hdfs_ha_kerberos_expected)

            self._read_section_into('yarn.base', self.yarn_expected)
            self._read_section_into('yarn.non', self.yarn_non_expected)
            self._read_section_into('yarn.ha', self.yarn_ha_expected)
            self._read_section_into('yarn.kerberos', self.yarn_kerberos_expected)
            self._read_section_into('yarn.ha.kerberos', self.yarn_ha_kerberos_expected)

            self._read_section_into('hawq.base', self.hawq_expected)
            self._read_section_into('hawq.kerberos', self.hawq_kerberos_expected)
            self._read_section_into('hawq.yarn', self.hawq_yarn_expected)
| |
###### Global Variables #############
logger = get_default_logger()
EXECNAME = os.path.split(__file__)[-1]
setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())

# populated by parseargs()
options = None
GPHOME = None
GPCHECK_CONFIG_FILE = None
HADOOP_HOME = None

gpcheck_info = GpCheckInfo()        # everything collected during this run
gpcheck_config = GpCheckConfig()    # expected values from gpcheck.cnf
pool = WorkerPool()                 # worker pool for local/remote commands
tmpdir = None                       # scratch directory for copied hostdump files
found_errors = 0                    # incremented by checkFailed()

# name of the HAWQ memory-overcommit GUC checked by testHAWQGUC()
HAWQ_GUC_MEMORY = "hawq_re_memory_overcommit_max"
| |
| |
def checkPlatform():
    """Record the local platform in gpcheck_info.host_type.

    Raises GpCheckError when gpcheck has no test suite for this platform.
    """
    platform_name = SYSTEM.getName()
    if platform_name == "linux":
        gpcheck_info.host_type = HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX
    elif platform_name == "sunos":
        gpcheck_info.host_type = HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS
    else:
        raise GpCheckError("No tests exists for this platform in gpcheck")
    logger.info("Detected platform: %s" % hosttype_str(gpcheck_info.host_type))
| |
| |
def parse_host_list_file(host_file):
    """Read a host file and return the list of hostnames.

    Everything after a '#' on a line is treated as a comment; blank lines
    (and lines that become blank after comment stripping) are skipped.
    """
    hosts = []
    with open(host_file) as fp:
        for raw_line in fp:
            entry = raw_line.split("#", 1)[0].strip()
            if entry:
                hosts.append(entry)
    return hosts
| |
def parseargs():
    """Parse command-line arguments into module globals.

    Sets `options` plus the derived globals GPHOME, GPCHECK_CONFIG_FILE and
    HADOOP_HOME.  Raises GpCheckError when GPHOME cannot be resolved or when
    mutually-exclusive options are combined.
    """
    global options, GPHOME, HADOOP_HOME, GPCHECK_CONFIG_FILE

    parser = OptParser(option_class=OptChecker, version='%prog version $Revision: #1 $')
    parser.remove_option('-h')  # the default help short flag is reused below for --host
    parser.add_option('-?', '--help', action='help')
    parser.add_option('--verbose', action='store_true')
    parser.add_option('--stdout', action='store_true')
    parser.add_option('--zipout', action='store_true')
    parser.add_option('--zipin', type='string')
    parser.add_option('--gphome', type='string')
    # for HDFS xml and memory check
    parser.add_option('--hadoop', '--hadoop-home', type='string')
    parser.add_option('--hdfs', action='store_true')
    parser.add_option('--hdfs-ha', dest="hdfs_ha", action='store_true')
    parser.add_option('--yarn', action='store_true')
    parser.add_option('--yarn-ha', dest="yarn_ha", action='store_true')
    parser.add_option('--kerberos', action='store_true')

    parser.add_option('-c', '--config', type='string') # optional: gpcheck config file path
    parser.add_option('-f', '--file', type='string')   # host file, for testing a list of hosts
    parser.add_option('-h', '--host', type='string')   # test a single host

    (options, args) = parser.parse_args()
    if len(args) > 0:
        # `gpcheck help` behaves like --help
        if args[0] == 'help':
            parser.print_help(sys.stderr)
            sys.exit(0)

    # GPHOME must be found
    GPHOME = options.gphome if options.gphome else os.environ.get("GPHOME")
    if not GPHOME:
        raise GpCheckError("GPHOME not set, must be specified in --gphome")
    GPCHECK_CONFIG_FILE = options.config if options.config else "%s/etc/gpcheck.cnf" % GPHOME
    logger.info("Checks uses config file: %s", GPCHECK_CONFIG_FILE)

    # HADOOP_HOME is optional; without it the HDFS/YARN checks are skipped
    HADOOP_HOME = options.hadoop if options.hadoop else os.environ.get("HADOOP_HOME")

    if not HADOOP_HOME:
        checkFailed(None, "utility will SKIP HDFS configuration check because HADOOP_HOME is not specified in environment variable or --hadoop")

    if options.yarn and not HADOOP_HOME:
        options.yarn = False
        checkFailed(None, "utility will SKIP YARN configuration check because HADOOP_HOME is not specified in environment variable or --hadoop")

    # params check
    if not options.file and not options.host and not options.zipin:
        raise GpCheckError(" --file or --host or --zipin must be specified")

    if options.file and options.host:
        raise GpCheckError(" You can specify either --file or --host, but not both")

    if options.stdout and options.zipout:
        raise GpCheckError(" You can specify either --stdout or --zipout, but not both")
| |
| |
def readConfigFile():
    """Load the expected-values config into gpcheck_config.

    Wraps any parse error in a GpCheckError naming the file.
    """
    try:
        gpcheck_config.readConfigFile(GPCHECK_CONFIG_FILE)

    except Exception as e:
        # fix: message used to read "Field to read ..." instead of "Failed"
        raise GpCheckError("Failed to read gpcheck config file '%s':\n%s" % (GPCHECK_CONFIG_FILE, e))
| |
| |
def checkFailed(host, msg):
    """Log one failed check and bump the global error counter.

    `host` may be falsy (None/'') for cluster-wide failures, in which case
    the message is logged without a host prefix.
    """
    global found_errors
    found_errors = found_errors + 1
    if not host:
        logger.error(msg)
    else:
        logger.error("host(%s): %s", host, msg)
| |
| |
def getHDFSNamenodeHost():
    """Detect the HDFS namenode's canonical hostname from the Hadoop config.

    Reads fs.default.name / fs.defaultFS from core-site.xml.  A host:port
    URL means HA is disabled and the URL host is the namenode.  Otherwise
    the first alias from dfs.ha.namenodes.* is resolved through
    dfs.namenode.rpc-address.<nameservice>.<alias> in hdfs-site.xml.
    Finally `hostname` is run remotely on the detected address to obtain
    the canonical name.

    Returns the namenode hostname string; raises GpCheckError when the
    namenode cannot be determined.
    """
    core_site_file = os.path.join(HADOOP_HOME, "etc/hadoop/core-site.xml")
    hdfs_site_file = os.path.join(HADOOP_HOME, "etc/hadoop/hdfs-site.xml")
    logger.info("try to detect namenode from %s" % core_site_file)

    # for processing property xml: <property><name>..</name><value>..</value>
    getPropName = lambda node: node.getElementsByTagName('name')[0].childNodes[0].data
    getPropValue = lambda node: node.getElementsByTagName('value')[0].childNodes[0].data

    # read namenode address from core-site.xml
    namenode_addr = ''
    # fix: initialize the alias so a core-site.xml without fs.defaultFS leads
    # to the GpCheckError below instead of a NameError further down
    namenode_list_alias = ''
    with open(core_site_file) as f:
        xmldoc = minidom.parse(f)
        for node in xmldoc.getElementsByTagName('property'):
            if getPropName(node) in ('fs.default.name', 'fs.defaultFS'):
                fsurl = getPropValue(node).strip()
                namenode_list_alias = re.search(r"//([^:/]*)", fsurl).group(1)
                # an explicit port means HA is disabled: the URL host is the namenode
                if re.search(".*:[0-9]+$", fsurl):
                    namenode_addr = namenode_list_alias
                else:
                    namenode_addr = ''
                break

    if namenode_addr == '':
        # HA setup: resolve the first namenode alias via hdfs-site.xml
        ha_namenode_list = ''
        default_namenode_alias = ''
        with open(hdfs_site_file) as f:
            xmldoc = minidom.parse(f)
            for node in xmldoc.getElementsByTagName('property'):
                if re.search('dfs.ha.namenodes.*', getPropName(node).strip()):
                    ha_namenode_list = getPropValue(node).strip()
                    default_namenode_alias = ha_namenode_list.split(',')[0].strip()
                    break

        if ha_namenode_list == '':
            logger.error("cannot detect namenode from %s" % core_site_file)
            raise GpCheckError("cannot detect namenode from %s" % core_site_file)
        else:
            with open(hdfs_site_file) as f:
                xmldoc = minidom.parse(f)
                for node in xmldoc.getElementsByTagName('property'):
                    namenode_rpc_address = "dfs.namenode.rpc-address.%s.%s" % (namenode_list_alias,
                                                                               default_namenode_alias)
                    if getPropName(node) == namenode_rpc_address:
                        default_namenode_rpc_address = getPropValue(node).strip()
                        namenode_addr = default_namenode_rpc_address.split(':')[0].strip()

    if namenode_addr == '':
        raise GpCheckError("cannot detect namenode from %s" % core_site_file)

    # run hostname command on remote to get actual hostname
    cmd = Command(namenode_addr, "hostname", REMOTE, namenode_addr)
    pool.addCommand(cmd)
    pool.join()
    items = pool.getCompletedItems()
    for i in items:
        if i.results.rc or i.results.halt or not i.results.completed:
            raise Exception("error running 'hostname' command: %s" % i.results.stderr.strip())
        namenode_host = i.results.stdout.strip()
    logger.info("detect namenode hostname to be %s" % namenode_host)
    return namenode_host
| |
| |
def createHostList():
    """Build the deduplicated host table gpcheck_info.hosts.

    Host candidates come from --file (one per line) or --host.  Each
    candidate is resolved by running `hostname` remotely, so aliases of
    the same machine collapse to a single entry.  When HADOOP_HOME is set,
    the HDFS namenode host (if present in the list) is re-registered with
    is_namenode=True so namenode-specific checks run on it.
    Raises GpCheckError when the host file is unreadable or a host fails.
    """
    if options.verbose:
        logger.info("trying to deduplicate hosts...")

    hostlist = []
    # read the host file if present
    if options.file:
        try:
            with open(options.file, "r") as f:
                hostlist = [line.strip() for line in f.readlines() if line.strip()]

        except IOError, e:
            raise GpCheckError("error reading host file '%s': %s" % (options.file, str(e)))
    else:
        hostlist.append(options.host)

    # get actual hostname and deduplicate
    try:
        for hostname in hostlist:
            cmd = Command(hostname, "hostname", REMOTE, hostname)
            pool.addCommand(cmd)

        pool.join()
        items = pool.getCompletedItems()
        for i in items:
            if i.results.rc or i.results.halt or not i.results.completed:
                raise Exception("error running 'hostname' on host '%s': %s" % (i.remoteHost, i.results.stderr.strip()))

            actualHostname = i.results.stdout.strip()
            if actualHostname not in gpcheck_info.hosts:
                gpcheck_info.hosts[actualHostname] = GpCheckHost(actualHostname)

    except Exception, e:
        raise GpCheckError("failed to collect 'hostname' on servers: %s" % str(e))

    if options.verbose:
        logger.info("trying to deduplicate hosts [success]")

    if HADOOP_HOME:
        try:
            namenode_host = getHDFSNamenodeHost()
            # NOTE(review): namenode_host is compared against the raw hostlist,
            # not the resolved hostnames collected above — confirm that aliased
            # namenode entries behave as intended
            if namenode_host in hostlist:
                gpcheck_info.hosts[namenode_host] = GpCheckHost(namenode_host, is_namenode=True)
            else:
                logger.warning("utility will skip HDFS namenode check since it's not in current host list.")

        except Exception, e:
            checkFailed(None, "utility will SKIP HDFS namenode check: %s" % str(e))
| |
| |
def runCollections():
    """Collect configuration from every host: dump remotely, copy back, clean up.

    The three steps must run in this order; each raises GpCheckError on failure.
    """
    logger.info("trying to collect server configuration...")

    # run gpcheck_hostdump on each server
    runCollectionOnServers()
    # copy hostdump file to master
    copyFilesLocally()
    # delete hostdump file on remote servers
    deleteRemoteFiles()

    logger.info("trying to collect server configuration [success]")
| |
| |
def runCollectionOnServers():
    """Run gpcheck_hostdump remotely on every host and record each dump path.

    Timestamps the collection window (used later by the NTPD check) and
    raises GpCheckError when any host fails to produce a dump.
    """
    gpcheck_info.collection_start_time = time.time()

    def build_dump_command():
        # choose the platform flag for gpcheck_hostdump
        flag_by_type = {
            HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX: "--linux",
            HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS: "--solaris"}
        if gpcheck_info.host_type not in flag_by_type:
            raise GpCheckError("unsupported host type")

        pieces = ["%s/sbin/gpcheck_hostdump --hawq %s" % (GPHOME, flag_by_type[gpcheck_info.host_type])]
        pieces.append("--sysctl %s" % ",".join(gpcheck_config.sysctl_expected.keys()))
        if HADOOP_HOME:
            pieces.append("--hadoop %s" % HADOOP_HOME)
        if options.yarn or options.yarn_ha:
            pieces.append("--yarn")
        return " ".join(pieces)

    try:
        dump_cmd = build_dump_command()
        for hostname in gpcheck_info.hosts:
            if options.verbose:
                logger.info("collect data on host: %s" % hostname)
            pool.addCommand(Command(hostname, dump_cmd, REMOTE, hostname))

        pool.join()
        for item in pool.getCompletedItems():
            if item.results.rc or item.results.halt or not item.results.completed:
                raise Exception("error running gpcheck_hostdump on '%s': %s" % (item.remoteHost, item.results.stderr.strip()))
            # remote tool prints the path of the pickle file it wrote
            gpcheck_info.hosts[item.remoteHost].datafile = item.results.stdout.strip()

    except Exception as e:
        raise GpCheckError("Failed to collect data from servers:\n%s" % e)

    gpcheck_info.collection_end_time = time.time()
| |
| |
def copyFilesLocally():
    """Fetch every host's hostdump file into tmpdir as <hostname>.data via scp."""
    if options.verbose:
        logger.info("copy hostdump files from remote servers to master")

    try:
        for hostname in gpcheck_info.hosts:
            scp_cmd = "scp %s:%s %s/%s.data" % (hostname, gpcheck_info.hosts[hostname].datafile, tmpdir, hostname)
            if options.verbose:
                logger.info(scp_cmd)
            pool.addCommand(Command(hostname, scp_cmd))

        pool.join()
        for item in pool.getCompletedItems():
            results = item.results
            if results.rc or results.halt or not results.completed:
                raise Exception("error running command %s: %s" % (item.cmdStr, results.stderr.strip()))

    except Exception as e:
        raise GpCheckError("Failed to scp remote hostdump file to master:\n%s" % e)
| |
| |
def deleteRemoteFiles():
    """Remove the hostdump file left behind on each remote host."""
    if options.verbose:
        logger.info("delete hostdump files on remote servers")

    try:
        for hostname in gpcheck_info.hosts:
            rm_cmd = "rm -f %s" % gpcheck_info.hosts[hostname].datafile
            if options.verbose:
                logger.info(rm_cmd)
            pool.addCommand(Command(hostname, rm_cmd, REMOTE, hostname))

        pool.join()
        for item in pool.getCompletedItems():
            results = item.results
            if results.rc or results.halt or not results.completed:
                raise Exception("error running command %s: %s" % (item.cmdStr, results.stderr.strip()))

    except Exception as e:
        raise GpCheckError("Failed to delete remote hostdump file:\n%s" % e)
| |
| |
def readDataFiles():
    """Unpickle each copied hostdump file into its GpCheckHost record.

    Raises GpCheckError when a file is missing or cannot be unpickled.
    """
    for hostname in gpcheck_info.hosts:
        datafile = "%s/%s.data" % (tmpdir, hostname)
        try:
            with open(datafile, "rb") as fp:
                gpcheck_info.hosts[hostname].data = pickle.load(fp)

        except Exception as e:
            raise GpCheckError("Failed to load pickle file '%s': %s" % (datafile, e))
| |
| |
def readHAWQConfiguration():
    """Collect HAWQ cluster configuration needed by the CPU/memory checks.

    Loads gp_segment_configuration over a libpq connection (PGDATABASE, or
    template1 by default) and reads the hawq_re_memory_overcommit_max GUC
    via the `hawqconfig -s` command-line tool.  Sets
    gpcheck_info.hawq_collected_ok only when both steps succeed; failures
    are reported through checkFailed() and leave the flag False.
    """
    if options.verbose:
        logger.info("trying to collect HAWQ configuration...")

    dbname = os.environ.get('PGDATABASE', 'template1')
    try:
        db = pg.connect(dbname=dbname)
    except pg.InternalError, ex:
        checkFailed(None, "utility cannot perform HAWQ CPU and Memory check because failed to connect to HAWQ")
        return

    # read segment configurations
    gpcheck_info.hawq_segment_configuration = db.query("select * from gp_segment_configuration").dictresult()
    db.close()

    # read Memory GUC using hawqconfig
    command = "hawqconfig -s %s" % HAWQ_GUC_MEMORY
    p = subprocess.Popen(command, shell = True,
                         stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    result = p.communicate()
    # hawqconfig prints "Value : <n>"; the first match is taken as the value
    match_master = re.search(r'Value : (\d+)', result[0])

    if match_master:
        # NOTE(review): stores a plain int — the parentheses below do not build
        # a tuple, despite the "(master_value, segment_value)" comment on
        # GpCheckInfo.hawq_gucs; testHAWQGUC() reads it as a scalar
        gpcheck_info.hawq_gucs[HAWQ_GUC_MEMORY] = (int(match_master.group(1)))
    else:
        checkFailed(None, "utility cannot perform HAWQ Memory check because failed to get GUC value using '%s'" % command)
        return

    gpcheck_info.hawq_collected_ok = True
    if options.verbose:
        logger.info("trying to collect HAWQ configuration [success]")
| |
| |
| def testConnectEmc(host): |
| if not host.is_a_master: |
| return |
| |
| expected = "Running" |
| |
| if host.data.connectemc.output != expected: |
| checkFailed(host.hostname, "Connect EMC is not running on master (try /etc/init.d/connectemc status)") |
| |
| |
| |
| def testSolarisEtcSystem(host): |
| requiredValues = { 'rlim_fd_cur' : '65536', |
| 'zfs:zfs_arc_max' : '0x600000000', |
| 'pcplusmp:apic_panic_on_nmi' : '1', |
| 'nopanicdebug' : '1' } |
| |
| results = dict() |
| |
| for k in requiredValues.keys(): |
| results[k] = 0 |
| |
| for key in host.data.etc_system.parameters.keys(): |
| |
| if key not in requiredValues: |
| continue |
| |
| foundValue = host.data.etc_system.parameters[key] |
| if foundValue == requiredValues[key]: |
| results[key] = 1 |
| |
| for k in results.keys(): |
| |
| if results[k]: |
| continue |
| |
| checkFailed(host.hostname, "/etc/system is missing expected line 'set %s=%s'" % (k, requiredValues[k])) |
| |
| |
| def testSolarisEtcProject(host): |
| |
| requiredValues = { 'default:3::::project.max-sem-ids=(priv,1024,deny);process.max-file-descriptor=(priv,252144,deny)' : 0 } |
| |
| unexpectedValues = set(['default:3::::']) |
| |
| for line in host.data.etc_project.lines: |
| if line in unexpectedValues: |
| checkFailed(host.hostname, "unexpected line in /etc/project: '%s'" % line) |
| continue |
| |
| if line in requiredValues: |
| requiredValues[line] = 1 |
| |
| for line in requiredValues.keys(): |
| if requiredValues[line]: |
| continue |
| |
| checkFailed(host.hostname, "/etc/project is missing expected line '%s'" % line) |
| |
| |
| def testSolarisEtcUserAttr(host): |
| |
| requiredValues = { 'gpadmin::::defaultpriv=basic,dtrace_user,dtrace_proc' : 0 } |
| |
| for line in host.data.etc_user_attr.lines: |
| if line in requiredValues: |
| requiredValues[line] = 1 |
| |
| for line in requiredValues.keys(): |
| if requiredValues[line]: |
| continue |
| |
| checkFailed(host.hostname, "/etc/user_attr is missing expected line '%s'" % line) |
| |
| |
def testHAWQGUC(host):
    """Check HAWQ memory GUC settings against the host's physical memory.

    Skipped entirely unless readHAWQConfiguration() succeeded.  Master and
    segment hosts get slightly different checks; both compare physical
    memory with hawq_re_memory_overcommit_max and expect the GUC to be 8192.
    """
    if not gpcheck_info.hawq_collected_ok:
        return

    if options.verbose:
        logger.info("-- test HAWQ CPU/Memory Guc Settings")

    c = gpcheck_info.hawq_segment_configuration
    # NOTE(review): relies on py2 semantics — filter()/map() return lists here
    master_hostname = filter(lambda x: x['role'] == 'm', c)[0]['hostname']

    if host.hostname not in map(lambda x: x['hostname'], c):
        logger.warning("host '%s' is not in HAWQ array" % host.hostname)
        return

    actual_total_memory = host.data.machine.memory_in_MB

    guc_vmemsize_master = gpcheck_info.hawq_gucs[HAWQ_GUC_MEMORY]
    # segment count on this host
    num_segments = len(filter(lambda x: x['hostname'] == host.hostname, c))

    if host.hostname == master_hostname:
        if num_segments > 1:
            checkFailed(host.hostname, "HAWQ master host has segments configured")

        # insufficient physical memory: report and skip the GUC-value check
        if actual_total_memory < guc_vmemsize_master:
            checkFailed(host.hostname, "HAWQ master host memory size '%s' is less than the '%s' size '%s'" % (
                actual_total_memory, HAWQ_GUC_MEMORY, guc_vmemsize_master))
            return

        # check HAWQ master's memory size
        expected_vmemory_size = 8192
        if guc_vmemsize_master != expected_vmemory_size:
            checkFailed(host.hostname, "HAWQ master's %s GUC value is %s, expected %s" % (
                HAWQ_GUC_MEMORY, guc_vmemsize_master, expected_vmemory_size))

    else:
        datanode_mem = gpcheck_config.hdfs_expected["dfs.mem.datanode.heap"]

        # check HAWQ memory size
        if actual_total_memory < datanode_mem:
            checkFailed(host.hostname, "HAWQ segment's host memory size '%s' is less than the expected data node memory size '%s'" % (
                actual_total_memory, datanode_mem))
            logger.warning("please change the expected data node memory 'dfs.mem.datanode.heap' in gpcheck.cnf file")
            logger.warning("SKIP '%s' check" %(HAWQ_GUC_MEMORY))
            return
        # NOTE(review): the segment-side check compares the single collected
        # (master) GUC value, not a per-segment value — confirm intended
        expect_vmemsize_per_segment = 8192
        if guc_vmemsize_master != expect_vmemsize_per_segment:
            checkFailed(host.hostname, "HAWQ segment's %s GUC value on this host is %s, expected %s" % (
                HAWQ_GUC_MEMORY, guc_vmemsize_master, expect_vmemsize_per_segment))
| |
| |
def testDiskCapacity(host):
    """Warn when any monitored mount exceeds the configured usage threshold.

    NOTE(review): an identical testDiskCapacity definition appears again
    later in this file and silently shadows this one — one copy should be
    removed.
    """
    if options.verbose:
        logger.info("-- test Disk Capacity")

    # an empty diskusage_mounts list means "monitor every mount"
    for line in host.data.diskusage.lines:
        if len(gpcheck_config.diskusage_mounts) == 0 or line.mount in gpcheck_config.diskusage_mounts:
            actual_usage = int(line.used_percent[:-1])  # strip the trailing '%'
            if actual_usage > gpcheck_config.diskusage_usagemax:
                checkFailed(host.hostname,
                        "potential disk full risk: %s mounted on %s has used %s space" % (
                        line.fs, line.mount, line.used_percent))
    return
| |
| |
def testHAWQconfig(host):
    """Validate the host's collected HAWQ site config against expectations.

    Compares hawq-site settings with the gpcheck.cnf expectations (plus
    kerberos/yarn extras when those options are enabled), cross-checks
    short-circuit read settings against the HDFS site config, and verifies
    the domain socket path exists with R/W access on datanode hosts.
    """
    hawq = host.data.hawq
    hdfs = host.data.hdfs
    if hawq is None:
        return # skip HAWQ test when hawq is None

    if hdfs is None:
        return # skip HAWQ test when hdfs is None

    if options.verbose:
        logger.info("-- test HAWQ config")

    if hawq.errormsg:
        checkFailed(host.hostname, "collect HAWQ configuration error: %s" % hawq.errormsg)
        return

    # determine whether this host is an HDFS datanode (listed in the slaves file)
    datanode_list = list()
    if HADOOP_HOME:
        datanode_list = parse_host_list_file("%s/etc/hadoop/slaves" % HADOOP_HOME)
    is_datanode = False
    if host.hostname in datanode_list:
        is_datanode = True

    expect_config = gpcheck_config.hawq_expected

    # NOTE(review): these update() calls mutate the shared
    # gpcheck_config.hawq_expected dict in place — confirm this is intended
    # when the function runs for multiple hosts
    if options.kerberos:
        expect_config.update(gpcheck_config.hawq_kerberos_expected)

    if options.yarn or options.yarn_ha:
        expect_config.update(gpcheck_config.hawq_yarn_expected)

    actual_config = hawq.site_config
    hdfs_actual_config = hdfs.site_config

    for exp_key, exp_val in expect_config.items():
        if exp_key not in actual_config:
            checkFailed(host.hostname, "HAWQ configuration missing: '%s' needs to be set to '%s'" % (exp_key, exp_val))

        else:
            actual_val = actual_config[exp_key]
            et = (exp_key, exp_val, actual_val)

            if exp_key == "dfs.block.local-path-access.user":
                # value is a comma-separated user list; expected user must be a member
                if exp_val not in actual_val.split(','):
                    checkFailed(host.hostname, "HDFS configuration: '%s' should include user '%s', actual value is '%s'" % et)
            elif exp_key == "dfs.namenode.handler.count":
                # numeric lower bound rather than an exact match
                if int(exp_val) > int(actual_val):
                    checkFailed(host.hostname, "HDFS configuration: '%s' should be at least '%s', actual value is '%s'" % et)
            else:
                if exp_val != actual_val:
                    checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % et)

    # without kerberos, authentication must be 'simple' in both configs
    if not options.kerberos:
        if 'hadoop.security.authentication' in actual_config:
            if actual_config['hadoop.security.authentication'] != 'simple':
                checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % ('simple', 'hadoop.security.authentication', actual_config['hadoop.security.authentication']))

        if 'hadoop.security.authentication' in hdfs_actual_config:
            if hdfs_actual_config['hadoop.security.authentication'] != 'simple':
                checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % ('simple', 'hadoop.security.authentication', hdfs_actual_config['hadoop.security.authentication']))

    if options.yarn or options.yarn_ha:
        hawq_yarn_property_exist_list = ['hawq_rm_yarn_address', 'hawq_rm_yarn_scheduler_address', 'hawq_rm_yarn_app_name']
        for item in hawq_yarn_property_exist_list:
            # NOTE(review): both messages always name yarn.resourcemanager.address
            # even though three different properties are inspected — confirm intended
            if item in actual_config:
                if not actual_config[item]:
                    checkFailed(host.hostname, "HAWQ configuration: yarn.resourcemanager.address is empty")
            else:
                checkFailed(host.hostname, "HAWQ configuration: yarn.resourcemanager.address not defined")

    if 'dfs.client.read.shortcircuit' not in actual_config:
        checkFailed(host.hostname, "HAWQ configuration dfs.client.read.shortcircuit not defined")

    if 'dfs.client.read.shortcircuit' not in hdfs_actual_config:
        checkFailed(host.hostname, "HAWQ configuration dfs.client.read.shortcircuit not defined")

    if 'dfs.domain.socket.path' not in actual_config:
        checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path not defined")

    if 'dfs.domain.socket.path' not in hdfs_actual_config:
        checkFailed(host.hostname, "HDFS configuration dfs.domain.socket.path not defined")

    # datanodes must share one socket path between HAWQ and HDFS, and the
    # path must exist with R/W permission for both sides
    if is_datanode and 'dfs.domain.socket.path' in actual_config and 'dfs.domain.socket.path' in hdfs_actual_config:
        if actual_config['dfs.domain.socket.path'] != hdfs_actual_config['dfs.domain.socket.path']:
            checkFailed(host.hostname, "HAWQ configuration: dfs.domain.socket.path expect to have the same value with HDFS configuration")
        else:
            cmd = "ls -l %s" % actual_config['dfs.domain.socket.path']
            (result, output, errmsg) = remote_ssh_output(cmd, host.hostname, '')
            if result == 0:
                # chars [7:9] of the `ls -l` mode string are the "other" r/w bits
                if output.split(' ')[0][7:9] != 'rw':
                    checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path: %s should have R/W access for both hawq and HDFS on %s" % (actual_config['dfs.domain.socket.path'], host.hostname))
            else:
                checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path: %s, does not exist on %s" % (actual_config['dfs.domain.socket.path'], host.hostname))

    # replace-datanode-on-failure should be false on small (<4 datanode) clusters
    if 'output.replace-datanode-on-failure' in actual_config and len(datanode_list) > 0:
        if len(datanode_list) < 4:
            if actual_config['output.replace-datanode-on-failure'] == 'true':
                checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure expect false, current is true")
        else:
            if actual_config['output.replace-datanode-on-failure'] == 'false':
                checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure expect true, current is false")
    else:
        # NOTE(review): also reached when the key IS defined but the datanode
        # list is empty, so the "not defined" message can be misleading
        checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure not defined")
| |
| |
def testDiskCapacity(host):
    """Warn when any monitored mount exceeds the configured usage threshold.

    NOTE(review): this is a verbatim redefinition of the testDiskCapacity
    defined earlier in this file; this later definition is the one that
    takes effect — one copy should be removed.
    """
    if options.verbose:
        logger.info("-- test Disk Capacity")

    # an empty diskusage_mounts list means "monitor every mount"
    for line in host.data.diskusage.lines:
        if len(gpcheck_config.diskusage_mounts) == 0 or line.mount in gpcheck_config.diskusage_mounts:
            actual_usage = int(line.used_percent[:-1])  # strip the trailing '%'
            if actual_usage > gpcheck_config.diskusage_usagemax:
                checkFailed(host.hostname,
                        "potential disk full risk: %s mounted on %s has used %s space" % (
                        line.fs, line.mount, line.used_percent))
    return
| |
| |
def testHDFSConfig(host):
    """Validate the HDFS (and optionally YARN) configuration on one host.

    Builds the set of expected hdfs-site/yarn-site properties from gpcheck.conf
    according to the --hdfs-ha / --kerberos / --yarn / --yarn-ha options, then
    compares it with the values collected from the host, reporting every
    mismatch via checkFailed().  Also validates namenode/datanode Java heap
    sizes and, for YARN datanodes, the nodemanager directories and
    resourcemanager / kerberos properties.
    """
    hdfs = host.data.hdfs
    if hdfs is None:
        return # skip HDFS test when hdfs is None

    if options.verbose:
        logger.info("-- test HDFS config")

    if hdfs.errormsg:
        checkFailed(host.hostname, "collect HDFS configuration error: %s" % hdfs.errormsg)
        return

    # Bug fix: work on a copy.  The previous code aliased
    # gpcheck_config.hdfs_expected directly, so the update() calls below leaked
    # mode-specific expectations into the shared config object and polluted the
    # expected values used for every subsequently tested host.
    expect_config = dict(gpcheck_config.hdfs_expected)

    if not options.hdfs_ha and not options.kerberos:
        expect_config.update(gpcheck_config.hdfs_non_expected)

    if options.hdfs_ha and not options.kerberos:
        expect_config.update(gpcheck_config.hdfs_ha_expected)

    if options.kerberos and not options.hdfs_ha:
        expect_config.update(gpcheck_config.hdfs_kerberos_expected)

    if options.kerberos and options.hdfs_ha:
        expect_config.update(gpcheck_config.hdfs_ha_kerberos_expected)

    if options.yarn or options.yarn_ha:
        expect_config.update(gpcheck_config.yarn_expected)
        if not options.yarn_ha and not options.kerberos:
            expect_config.update(gpcheck_config.yarn_non_expected)

    if options.yarn_ha:
        expect_config.update(gpcheck_config.yarn_ha_expected)
    if options.kerberos:
        expect_config.update(gpcheck_config.yarn_kerberos_expected)

    actual_config = hdfs.site_config
    actual_heap_size = hdfs.namenode_heap_size if host.is_namenode else hdfs.datanode_heap_size

    if host.data.machine.memory_in_MB < actual_heap_size:
        checkFailed(host.hostname, "host memory size '%s' is less than the java max heap size '%s'" % (host.data.machine.memory_in_MB, actual_heap_size))

    # test hdfs_site.xml setting
    for exp_key, exp_val in expect_config.items():
        if exp_key.startswith("dfs.mem"):
            continue # these options belong to the memory tests below

        if exp_key not in actual_config:
            checkFailed(host.hostname, "HDFS configuration missing: '%s' needs to be set to '%s'" % (exp_key, exp_val))

        else:
            actual_val = actual_config[exp_key]
            et = (exp_key, exp_val, actual_val)

            if exp_key == "dfs.block.local-path-access.user":
                # value is a comma-separated user list; expected user must be a member
                if exp_val not in actual_val.split(','):
                    checkFailed(host.hostname, "HDFS configuration: '%s' should include user '%s', actual value is '%s'" % et)

            elif exp_key == "dfs.namenode.handler.count":
                # expected value is a lower bound, not an exact match
                if int(exp_val) > int(actual_val):
                    checkFailed(host.hostname, "HDFS configuration: '%s' should be at least '%s', actual value is '%s'" % et)

            else:
                if exp_val != actual_val:
                    checkFailed(host.hostname, "HDFS configuration: expected '%s' for '%s', actual value is '%s'" % et)

    # test hadoop memory setting
    # NOTE(review): assumes gpcheck.conf always defines dfs.mem.namenode.heap
    # and dfs.mem.datanode.heap -- a KeyError is raised otherwise; confirm.
    expect_namenode_heap = expect_config["dfs.mem.namenode.heap"]
    expect_datanode_heap = expect_config["dfs.mem.datanode.heap"]

    if host.is_namenode and actual_heap_size < expect_namenode_heap:
        checkFailed(host.hostname, "Namenode Java heap size is only %sM, we recommends at least %sM" %
                (actual_heap_size, expect_namenode_heap))

    if not host.is_namenode and actual_heap_size < expect_datanode_heap:
        checkFailed(host.hostname, "Datanode Java heap size is only %sM, expect value is %sM" %
                (actual_heap_size, expect_datanode_heap))

    # Check if nodemanager directories exist
    directory_check_list = []
    datanode_list = list()
    if HADOOP_HOME:
        # the Hadoop slaves file lists one datanode hostname per line
        datanode_list = parse_host_list_file("%s/etc/hadoop/slaves" % HADOOP_HOME)
    is_datanode = host.hostname in datanode_list

    yarn_enabled = bool(options.yarn or options.yarn_ha)

    if yarn_enabled and is_datanode:
        if 'yarn.nodemanager.local-dirs' in actual_config:
            directory_check_list += actual_config['yarn.nodemanager.local-dirs'].split(',')
        else:
            checkFailed(host.hostname, "YARN configuration: yarn.nodemanager.local-dirs not defined")

        if 'yarn.nodemanager.log-dirs' in actual_config:
            directory_check_list += actual_config['yarn.nodemanager.log-dirs'].split(',')
        else:
            checkFailed(host.hostname, "YARN configuration: yarn.nodemanager.log-dirs not defined")

    # directory_check_list is non-empty only for YARN-enabled datanodes
    for directory in directory_check_list:
        cmd = "test -e %s" % directory
        (result, output, errmsg) = remote_ssh_output(cmd, host.hostname, '')
        if result != 0:
            checkFailed(host.hostname, "YARN nodemanager directory %s does not exist" % directory)

    # Check if resource manager property exists
    if options.yarn:
        yarn_property_exist_list = ['yarn.resourcemanager.address', 'yarn.resourcemanager.scheduler.address']

    if options.yarn_ha:
        yarn_property_exist_list = ['yarn.resourcemanager.hostname.rm1', 'yarn.resourcemanager.hostname.rm2']

    if yarn_enabled:
        for item in yarn_property_exist_list:
            if item in actual_config:
                if not actual_config[item]:
                    checkFailed(host.hostname, "YARN configuration: %s is empty" % item)
            else:
                checkFailed(host.hostname, "YARN configuration: %s not defined" % item)

    # Check yarn kerberos properties
    if yarn_enabled and options.kerberos:
        yarn_kerberos_check_list = ['yarn.nodemanager.keytab', 'yarn.nodemanager.principal',
                                    'yarn.resourcemanager.keytab', 'yarn.resourcemanager.principal']
        for item in yarn_kerberos_check_list:
            if item in actual_config:
                if not actual_config[item]:
                    checkFailed(host.hostname, "YARN configuration: %s is empty, expected non-empty" % item)
            else:
                checkFailed(host.hostname, "YARN configuration missing: %s" % item)
| |
| |
def testIOSchedulers(host):
    """Check that every collected block device uses the 'deadline' IO scheduler."""
    if options.verbose:
        logger.info("-- test IO scheduler")

    if host.data.ioschedulers.errormsg:
        checkFailed(host.hostname, "collect IO scheduler data error: %s" % host.data.ioschedulers.errormsg)
        return

    expectedScheduler = "deadline"
    for dev, actual in host.data.ioschedulers.devices.items():
        if actual != expectedScheduler:
            checkFailed(host.hostname,
                    "on device (%s) IO scheduler '%s' does not match expected value '%s'" % (dev, actual, expectedScheduler))
| |
| |
# this test is performed only when gpcheck runs as root
def testBlockdev(host):
    """Check each block device's readahead setting against the expected value.

    host.data.blockdev is None when readahead data was not collected
    (it is gathered only for root runs), in which case the test is skipped.
    """
    if host.data.blockdev is None:
        return

    if options.verbose:
        logger.info("-- test block device readahead value")

    expectedReadAhead = "16384"
    for dev, readahead in host.data.blockdev.ra.items():
        if readahead != expectedReadAhead:
            checkFailed(host.hostname,
                    "on device (%s) blockdev readahead value '%s' does not match expected value '%s'" % (dev, readahead, expectedReadAhead))
| |
| |
def testSysctl(host):
    """Compare the host's collected sysctl values with the gpcheck.conf expectations.

    Parameters listed in params_with_lowerbound are treated as minimums;
    all other parameters must match the expected value exactly.
    """
    if options.verbose:
        logger.info("-- test sysctl value")

    if host.data.sysctl.errormsg:
        checkFailed(host.hostname, "collect sysctl params error: %s" % host.data.sysctl.errormsg)
        return

    expected_values = gpcheck_config.sysctl_expected
    real_values = host.data.sysctl.variables

    # Params in this set are lower bounds (actual value may be larger).
    # Currently empty, so every expected param is compared for exact equality.
    params_with_lowerbound = set()

    for k in expected_values:
        # Bug fix: a sysctl param that was expected but not collected on the
        # host used to raise a KeyError and abort the whole run; report it as
        # a failed check instead so the remaining params are still verified.
        if k not in real_values:
            checkFailed(host.hostname,
                    "sysctl value for key '%s' was not collected on this host, expects '%s'" % (k, expected_values[k]))
            continue

        if k in params_with_lowerbound:
            if int(real_values[k]) < int(expected_values[k]):
                checkFailed(host.hostname,
                        "sysctl value for key '%s' has value '%s', but we expect at least '%s'" % (k, real_values[k], expected_values[k]))

        elif real_values[k] != expected_values[k]: # for other params, we expect the actual value to be the same value
            checkFailed(host.hostname,
                    "sysctl value for key '%s' has value '%s' and expects '%s'" % (k, real_values[k], expected_values[k]))
| |
| |
def testLimitsConf(host):
    """Check /etc/security/limits.conf entries against the expected lower bounds.

    Only entries whose domain is 'gpadmin' or '*' are considered.  Expected
    keys that are absent from the host's limits.conf are reported as failures;
    present keys must be numerically >= the expected value.
    """
    if options.verbose:
        logger.info("-- test /etc/security/limits.conf")

    if host.data.limitsconf.errormsg:
        checkFailed(host.hostname, "collect limits.conf data error: %s" % host.data.limitsconf.errormsg)
        return

    # both dicts have the form: (type, item) => value
    expect_data = gpcheck_config.limits_expected
    actual_data = dict([((e.type, e.item), e.value) for e in host.data.limitsconf.lines if e.domain in ("gpadmin", "*")])
    expect_keyset = set(expect_data.keys())
    actual_keyset = set(actual_data.keys())

    for key in expect_keyset.intersection(actual_keyset):
        expect_val = int(expect_data[key])
        # Bug fix: limits.conf allows non-numeric values such as "unlimited",
        # which used to crash gpcheck with a ValueError.  A non-numeric value
        # cannot be numerically below the bound, so skip the comparison.
        try:
            actual_val = int(actual_data[key])
        except ValueError:
            continue
        if actual_val < expect_val:
            checkFailed(host.hostname,
                    "%s in /etc/security/limits.conf has value %d lower than expected value %d" % (
                        " ".join(key), actual_val, expect_val))

    for key in expect_keyset.difference(actual_keyset):
        checkFailed(host.hostname,
                "%s not found in /etc/security/limits.conf" % " ".join(key))
| |
| |
def testLinuxMounts(host):
    """Verify that every mount point listed in gpcheck.conf is mounted on the host."""
    if options.verbose:
        logger.info("-- test mount points")

    expected_mount_points = gpcheck_config.mount_points
    actual_mount_points = set(m.dir for m in host.data.mounts.entries.values())

    if not expected_mount_points:
        if options.verbose:
            logger.info("-- you didn't specify any mount points to be check in %s, ignore this test" % GPCHECK_CONFIG_FILE)
        return

    # the difference is empty exactly when every expected mount is present
    for missing_mount in expected_mount_points.difference(actual_mount_points):
        checkFailed(host.hostname, "%s is not mounted" % missing_mount)
| |
| |
def testNtp(host):
    """Check the host clock against gpcheck's collection window and that ntpd runs.

    A host time earlier than collection start or later than collection end
    (with one second of slack) indicates clock drift / a missing NTP sync.
    """
    if host.data.ntp.currenttime < (gpcheck_info.collection_start_time - 1):
        checkFailed(host.hostname, "potential NTPD issue. gpcheck start time (%s) time on machine (%s)" % (time.ctime(gpcheck_info.collection_start_time), time.ctime(host.data.ntp.currenttime)))
    if host.data.ntp.currenttime > (gpcheck_info.collection_end_time + 1):
        # Bug fix: this message used to print the collection *start* time
        # while claiming to report the end time.
        checkFailed(host.hostname, "potential NTPD issue. gpcheck end time (%s) time on machine (%s)" % (time.ctime(gpcheck_info.collection_end_time), time.ctime(host.data.ntp.currenttime)))
    if not host.data.ntp.running:
        checkFailed(host.hostname, "ntpd not detected on machine")
| |
| |
def testGenericLinuxHost(host):
    """Run the full battery of per-host Linux checks.

    Namenodes skip the IO-scheduler check; the relative order of the
    disk-capacity and HDFS-config checks also differs between the two roles,
    matching the original report ordering.
    """
    logger.info("test on host: %s" % host.hostname)

    testHAWQGUC(host)
    testHAWQconfig(host)
    if host.is_namenode:
        testHDFSConfig(host)
        testDiskCapacity(host)
    else:
        testDiskCapacity(host)
        testHDFSConfig(host)
        testIOSchedulers(host)
    testSysctl(host)
    testLimitsConf(host)
    testLinuxMounts(host)
    testNtp(host)
| |
| |
def testGenericSolarisHost(host):
    """Run the Solaris host-level checks (/etc/system, project, user_attr)."""
    for solaris_check in (testSolarisEtcSystem, testSolarisEtcProject, testSolarisEtcUserAttr):
        solaris_check(host)
| |
def testUnameConsistency():
    """Verify that 'uname -r' output is identical across all collected hosts.

    The first host seen is the reference; every later host is compared to it.
    """
    logger.info("test uname consistency")
    firstUname = None
    firstHost = None
    for _, host in gpcheck_info.hosts.items():
        uname = host.data.uname.output
        if firstUname:
            if firstUname != uname:
                # Bug fix: this used to call checkFailed(h, ...) with an
                # undefined name 'h', raising a NameError exactly when a
                # mismatch was detected.
                checkFailed(host.hostname, "uname -r output different among hosts: %s : %s != %s : %s" % (firstHost, firstUname, host.hostname, uname))
        else:
            firstUname = uname
            firstHost = host.hostname
| |
| |
def testGenericLinuxCluster():
    """Run per-host Linux checks on every host, then cross-host consistency checks."""
    for host in gpcheck_info.hosts.values():
        testGenericLinuxHost(host)
    testUnameConsistency()
| |
def testGenericLinuxClusterBlockDev():
    """Check blockdev readahead on every non-namenode host (root-only data)."""
    for host in gpcheck_info.hosts.values():
        if host.is_namenode:
            continue
        testBlockdev(host)
| |
def testGenericSolarisCluster():
    """Run per-host Solaris checks on every host, then cross-host consistency checks."""
    for host in gpcheck_info.hosts.values():
        testGenericSolarisHost(host)
    testUnameConsistency()
| |
| |
def runTests():
    """Dispatch the platform-specific test suite and log the overall result.

    Raises GpCheckError for platforms with no test suite.
    """
    host_type = gpcheck_info.host_type
    if host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX:
        testGenericLinuxCluster()
        # readahead data is only collected (and checkable) when run as root
        if gpcheck_info.is_root:
            testGenericLinuxClusterBlockDev()
    elif host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS:
        testGenericSolarisCluster()
    else:
        raise GpCheckError("No tests exist for this platform in gpcheck")

    # summarize how many checks failed across all hosts
    logger.info("GPCHECK Result:")
    logger.info("---------------------------------------")
    if found_errors:
        logger.info("check failed!\tfound %s error(s)" % found_errors)
    else:
        logger.info("all check succeed!")
    logger.info("---------------------------------------")
| |
| |
def readZip():
    """Load a previously dumped gpcheck data archive (--zipin) into gpcheck_info.

    Untars the .tar.gz archive, moves the extracted pickle file into the temp
    directory, then unpickles it into the module-level gpcheck_info.

    Raises GpCheckError on a malformed file name or any step failure.
    """
    logger.info("trying to read zip file '%s'..." % options.zipin)

    words = options.zipin.split(".tar.gz")
    if len(words) != 2:
        raise GpCheckError("--zipin file needs to be a .tar.gz file")
    fname = words[0]

    def run_pool_cmd(tag, cmdStr):
        # Run one shell command via the worker pool, raising on any failure.
        if options.verbose:
            logger.info(cmdStr)
        pool.addCommand(Command(tag, cmdStr))
        pool.join()
        for item in pool.getCompletedItems():
            if item.results.rc or item.results.halt or not item.results.completed:
                raise Exception("error running command '%s'" % cmdStr)

    # untar the archive in the current directory
    try:
        run_pool_cmd("tarcmd", "tar xfz %s" % (options.zipin))
    except Exception as e:
        raise GpCheckError("Failed to extract tar file '%s': %s" % (options.zipin, e))

    # move the extracted pickle file into the temp directory
    newfname = "%s/%s" % (tmpdir, fname)
    try:
        run_pool_cmd("mvcmd", "mv %s %s" % (fname, newfname))
    except Exception as e:
        raise GpCheckError("Failed to move file '%s' to temp directory: %s" % (fname, e))

    # NOTE(review): pickle.load on an externally supplied file can execute
    # arbitrary code -- only load archives produced by gpcheck itself.
    global gpcheck_info
    try:
        with open(newfname, "rb") as f:
            gpcheck_info = pickle.load(f)
    except Exception as e:
        raise GpCheckError("Failed to load pickle file '%s': %s" % (newfname, e))

    logger.info("trying to read zip file '%s' [success]" % options.zipin)
| |
| |
def doZip(fname):
    """Dump gpcheck_info to <fname> via pickle, tar it into <fname>.tar.gz,
    then delete the intermediate pickle file.

    Raises GpCheckError if any step fails.
    """
    logger.info("dump gpcheck data into a zip file '%s.tar.gz'..." % fname)

    # serialize the collected data to a pickle file
    try:
        with open(fname, "wb") as f:
            pickle.dump(gpcheck_info, f)
    except Exception as e:
        raise GpCheckError("Failed to dump pickle file '%s':\n%s" % (fname, e))

    def run_pool_cmd(tag, cmdStr):
        # Run one shell command via the worker pool; raise with stderr on failure.
        if options.verbose:
            logger.info(cmdStr)
        pool.addCommand(Command(tag, cmdStr))
        pool.join()
        for item in pool.getCompletedItems():
            if item.results.rc or item.results.halt or not item.results.completed:
                raise Exception("error running command '%s': %s" % (cmdStr, item.results.stderr.strip()))

    # wrap the pickle file into a gzipped tarball
    try:
        run_pool_cmd("tarcmd", "tar cfz %s.tar.gz %s" % (fname, fname))
    except Exception as e:
        raise GpCheckError("Failed to dump gpcheck data into a zip file:\n%s" % e)

    # remove the now-redundant pickle file
    try:
        run_pool_cmd("rmcmd", "rm -rf %s" % fname)
    except Exception as e:
        raise GpCheckError("Failed to delete pickle file '%s':\n%s" % (fname, e))

    logger.info("dump gpcheck data into a zip file '%s.tar.gz' [success]" % fname)
| |
| |
def doPrint():
    """Print the raw collected data per host, plus HAWQ GUC values if collected."""
    for hostname in sorted(gpcheck_info.hosts):
        print("HOST: %s" % hostname)
        print(gpcheck_info.hosts[hostname].data)
        print("----------------------------------------------------------------------\n")

    if gpcheck_info.hawq_collected_ok:
        print("HAWQ guc settings:")
        for guc_name, guc_val in gpcheck_info.hawq_gucs.items():
            # guc_val is a (master value, segment value) pair
            print("GUC : %s\nMaster value: %s\nSegment value: %s\n" % (guc_name, guc_val[0], guc_val[1]))
| |
| |
if __name__ == '__main__':

    # NOTE(review): gpcheck_info, logger, pool, tmpdir and options are
    # module-level globals created earlier in this file (outside this excerpt).
    if gpcheck_info.is_root:
        logger.info("gpcheck will perform block device's readahead checks when run as root")
    try:
        try:
            # validate the platform, parse CLI options, load gpcheck.conf
            checkPlatform()
            parseargs()
            readConfigFile()

        except GpCheckError, e:
            logger.error(str(e))
            sys.exit(1)
        # NOTE(review): this drains and halts the worker pool on the success
        # path, before any collection work below -- presumably the pool used
        # during option parsing/config reading is finished here; confirm a
        # fresh pool (or a restartable one) backs the later phases.
        if pool:
            pool.join()
            pool.haltWork()
            pool.joinWorkers()

        # scratch directory for collected data files; removed in the finally
        try:
            tmpdir = tempfile.mkdtemp(prefix='gpcheck')
        except Exception, e:
            logger.error("Error creating tmp dir on master: %s" % e)
            sys.exit(1)

        try:
            # Phase 1: collect input
            if options.zipin:
                readZip() # load information into gpcheck_info from zip
            else:
                # read host info into gpcheck_info.hosts from --file or --host
                createHostList()
                # collect each server's system environment configuration
                runCollections()
                # read collected data into gpcheck_info
                readDataFiles()
                # read HAWQ configuration
                readHAWQConfiguration()

            # Phase 2: generate output
            # --stdout dumps raw data, --zipout archives it (timestamped file
            # name), otherwise run the actual checks and exit 1 on failures
            if options.stdout:
                doPrint()
            elif options.zipout:
                doZip("./gpcheck_%s" % time.time())
            else:
                runTests()
                if found_errors:
                    sys.exit(1)

        except GpCheckError, e:
            logger.error(str(e))
            sys.exit(1)

    finally:
        # best-effort cleanup: remove the temp dir, then always shut the pool down
        logger.info("Clean up...")
        try:
            if tmpdir:
                shutil.rmtree(tmpdir)
        except Exception, e:
            logger.error("error removing tempdir during job cleanup: %s" % e)
        finally:
            if pool:
                pool.join()
                pool.haltWork()
                pool.joinWorkers()