| #!/usr/bin/env python |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import os, sys, re, tempfile, shutil, pickle, getpass, subprocess, time, glob, ConfigParser |
| from xml.dom import minidom |
| |
| try: |
| from optparse import Option, OptionParser |
| from gppylib.gpparseopts import OptParser, OptChecker |
| from gppylib.gplog import get_default_logger, setup_tool_logging |
| from gppylib.commands.unix import getLocalHostname, getUserName, SYSTEM |
| from gppylib.commands.base import WorkerPool, Command, REMOTE |
| from gppylib.gpcheckutil import HostType, hosttype_str |
| from hawqpylib.hawqlib import remote_ssh_output |
| from pygresql.pgdb import DatabaseError |
| from pygresql import pg |
| import stat |
| from gppylib.gpsqlUtil import GpsqlUtil |
| import yaml |
| import xml |
| import time |
| import calendar |
| |
| except ImportError, e: |
| sys.exit('Cannot import modules. Please check that you have sourced greenplum_path.sh. Detail: ' + str(e)) |
| |
| |
class GpCheckError(Exception):
    """Base exception raised for any failure inside the gpcheck utility."""
| |
| |
class GpCheckInfo:
    """Aggregated state for one gpcheck run: platform info, per-host dump
    data, and the HAWQ configuration snapshot used by the checks."""

    def __init__(self):
        # True when the tool is executed with root privileges.
        self.is_root = (os.geteuid() == 0)
        self.host_type = HostType.GPCHECK_HOSTTYPE_UNDEFINED
        self.appliance_version = None

        # hostname => GpCheckHost object holding that host's dump data
        self.hosts = {}

        # HAWQ configuration snapshot
        self.hawq_gucs = {}  # guc name => (master_value, segment_value)
        self.hawq_segment_configuration = None
        self.hawq_collected_ok = False  # True once HAWQ gucs were collected

        # Timestamps bracketing the remote collection; used by NTPD testing.
        self.collection_start_time = 0
        self.collection_end_time = 0
| |
| |
class GpCheckHost:
    """Collection state for a single host being checked."""

    def __init__(self, name, is_namenode=False):
        # name: the host's (deduplicated) hostname.
        # is_namenode: True when this host runs the HDFS namenode.
        self.hostname = name
        # Remote path of the pickle file written by gpcheck_hostdump.
        self.datafile = None
        # Unpickled gpcheck_hostdump payload, filled in by readDataFiles().
        self.data = None
        self.is_namenode = is_namenode

    def __str__(self):
        pieces = ["%s datafile(%s)" % (self.hostname, self.datafile)]
        if self.is_namenode:
            pieces.append(" namenode")
        return "".join(pieces)
| |
| |
class GpCheckConfig:
    """In-memory view of the gpcheck configuration file (gpcheck.cnf).

    Holds the expected values for every check (sysctl, limits, disk usage,
    HDFS/YARN/HAWQ site configuration, hardware bandwidth thresholds).
    The defaults assigned in __init__ apply whenever the config file omits
    the corresponding option or section value.

    BUG FIX: the original code used `except expression as e:` in nine
    places; `expression` is an undefined name, so any malformed value
    raised NameError instead of the intended GpCheckError.
    """

    def __init__(self):
        self.parser = ConfigParser.RawConfigParser()
        self.gpcheck_config_version = 0

        self.mount_points = set()
        self.sysctl_expected = dict()

        self.limits_expected = { # default value for limits.conf
                ("soft", "nofile"): 2900000,
                ("hard", "nofile"): 2900000,
                ("soft", "nproc") : 131072,
                ("hard", "nproc") : 131072 }

        self.diskusage_mounts = []
        self.diskusage_usagemax = 90 # max disk usage percentage
        self.disk_space_total = 1000.0

        self.hdfs_expected = { # default value for HDFS configuration
                "dfs.mem.namenode.heap": 8192,
                "dfs.mem.datanode.heap": 8192 }
        self.hdfs_non_expected = {}
        self.hdfs_ha_expected = {}
        self.hdfs_kerberos_expected = {}
        self.hdfs_ha_kerberos_expected = {}

        self.yarn_expected = {}
        self.yarn_non_expected = {}
        self.yarn_ha_expected = {}
        self.yarn_kerberos_expected = {}
        self.yarn_ha_kerberos_expected = {}

        self.hawq_expected = {}
        self.hawq_kerberos_expected = {}
        self.hawq_yarn_expected = {}

        self.default_hash_table_bucket_number = 8
        self.hawq_rm_nvseg_perquery_limit = 512
        self.hawq_rm_nvseg_perquery_perseg_limit = 8
        self.magma_shm_limit_per_block = 1

        self.disk_write_min_bandwidth = 200.0
        self.disk_read_min_bandwidth = 700.0
        self.stream_min_bandwidth = 10000.0
        self.network_min_bandwidth = 1000.0

    def _copy_section(self, section, target):
        """Copy every option of `section` into the dict `target`.

        Raises ConfigParser.NoSectionError when the section is absent,
        matching the original unconditional reads.
        """
        for opt in self.parser.options(section):
            target[opt] = self.parser.get(section, opt)

    def _get_converted(self, section, option, convert, current):
        """Return convert(value) of section/option, or `current` when the
        option is missing.

        Raises GpCheckError when the value cannot be converted (this
        replaces the broken `except expression as e` handlers).
        """
        if not self.parser.has_option(section, option):
            return current
        try:
            return convert(self.parser.get(section, option))
        except Exception as e:
            raise GpCheckError("Fail to read section '%s': %s" % (section, str(e)))

    def _convert_heap_options(self):
        """Coerce the HDFS namenode/datanode heap expectations to int."""
        try:
            self.hdfs_expected["dfs.mem.namenode.heap"] = int(self.hdfs_expected["dfs.mem.namenode.heap"])
            self.hdfs_expected["dfs.mem.datanode.heap"] = int(self.hdfs_expected["dfs.mem.datanode.heap"])
        except ValueError as e:
            # original message named "dfs.mem.namenode.heap" twice; the
            # second key is the datanode heap
            raise GpCheckError("'dfs.mem.namenode.heap' or 'dfs.mem.datanode.heap' should be a number: %s" % e)

    def _read_diskusage(self):
        """Read [linux.diskusage]: monitored mounts, usage limit, capacity."""
        section = "linux.diskusage"
        if self.parser.has_option(section, "diskusage.monitor.mounts"):
            self.diskusage_mounts = [m.strip() for m in self.parser.get(section, "diskusage.monitor.mounts").split(",")]
        if self.parser.has_option(section, "diskusage.monitor.usagemax"):
            self.diskusage_usagemax = self.parser.get(section, "diskusage.monitor.usagemax")
            try:
                # the limit may be written either as "90" or as "90%"
                if self.diskusage_usagemax[-1] == "%":
                    self.diskusage_usagemax = int(self.diskusage_usagemax[:-1])
                else:
                    self.diskusage_usagemax = int(self.diskusage_usagemax)
            except Exception as e:
                raise GpCheckError("Bad config entry value '%s' for 'diskusage.monitor.usagemax': %s" %
                        (self.diskusage_usagemax, e))
        self.disk_space_total = self._get_converted(
                section, "disk.space.total.GB", float, self.disk_space_total)

    def _read_hdfs(self):
        """Read HDFS expectations from [hdfs.base], falling back to the
        legacy [hdfs] section name when [hdfs.base] is absent."""
        if not self.parser.has_section('hdfs.base'):
            if not self.parser.has_section("hdfs"):
                raise GpCheckError("require section 'hdfs'")
            self._copy_section('hdfs', self.hdfs_expected)
        else:
            self._copy_section('hdfs.base', self.hdfs_expected)
        self._convert_heap_options()

    def readConfigFile(self, config_file):
        """Parse `config_file` and populate all expectation attributes.

        Raises GpCheckError when the file cannot be opened, a mandatory
        section is missing, or a value fails conversion.
        """
        parsed_list = self.parser.read(config_file)
        if len(parsed_list) != 1:
            raise GpCheckError("cannot open file!")

        if not self.parser.has_section("linux.sysctl"):
            raise GpCheckError("require section 'linux.sysctl'")

        section = "global"
        if self.parser.has_option(section, "configfile_version"):
            self.gpcheck_config_version = self.parser.getint(section, "configfile_version")

        section = "linux.mount"
        if self.parser.has_option(section, "mount.points"):
            for p in self.parser.get(section, "mount.points").split(","):
                self.mount_points.add(p.strip())

        section = 'linux.sysctl'
        for opt in self.parser.options(section):
            # only options of the form "sysctl.<kernel parameter>" count
            if re.match(r'sysctl\.', opt):
                fields = opt.split('sysctl.')
                if len(fields) != 2:
                    raise GpCheckError("Bad config line entry '%s'" % opt)
                self.sysctl_expected[fields[1]] = self.parser.get(section, opt)

        section = "linux.limits"
        for opt in self.parser.options(section):
            # option "soft.nofile" becomes key ("soft", "nofile")
            self.limits_expected[tuple(opt.split("."))] = self.parser.getint(section, opt)

        self._read_diskusage()
        self._read_hdfs()

        self._copy_section('hdfs.non', self.hdfs_non_expected)
        self._copy_section('hdfs.ha', self.hdfs_ha_expected)
        self._copy_section('hdfs.kerberos', self.hdfs_kerberos_expected)
        self._copy_section('hdfs.ha.kerberos', self.hdfs_ha_kerberos_expected)

        self._copy_section('yarn.base', self.yarn_expected)
        self._copy_section('yarn.non', self.yarn_non_expected)
        self._copy_section('yarn.ha', self.yarn_ha_expected)
        self._copy_section('yarn.kerberos', self.yarn_kerberos_expected)
        self._copy_section('yarn.ha.kerberos', self.yarn_ha_kerberos_expected)

        self._copy_section('hawq.base', self.hawq_expected)
        self._copy_section('hawq.kerberos', self.hawq_kerberos_expected)
        self._copy_section('hawq.yarn', self.hawq_yarn_expected)

        section = "hawq.guc"
        self.default_hash_table_bucket_number = self._get_converted(
                section, "default.hash.table.bucket.number", int, self.default_hash_table_bucket_number)
        self.hawq_rm_nvseg_perquery_limit = self._get_converted(
                section, "hawq.rm.nvseg.perquery.limit", int, self.hawq_rm_nvseg_perquery_limit)
        self.hawq_rm_nvseg_perquery_perseg_limit = self._get_converted(
                section, "hawq.rm.nvseg.perquery.perseg.limit", int, self.hawq_rm_nvseg_perquery_perseg_limit)
        self.magma_shm_limit_per_block = self._get_converted(
                section, "magma.shm.limit.per.block", int, self.magma_shm_limit_per_block)

        section = "hardware.performance"
        self.disk_write_min_bandwidth = self._get_converted(
                section, "disk.write.min.bandwidth.MBs", float, self.disk_write_min_bandwidth)
        self.disk_read_min_bandwidth = self._get_converted(
                section, "disk.read.min.bandwidth.MBs", float, self.disk_read_min_bandwidth)
        self.stream_min_bandwidth = self._get_converted(
                section, "stream.min.bandwidth.MBs", float, self.stream_min_bandwidth)
        self.network_min_bandwidth = self._get_converted(
                section, "network.min.bandwidth.MBs", float, self.network_min_bandwidth)
| |
###### Global Variables #############
logger = get_default_logger()
EXECNAME = os.path.split(__file__)[-1]  # this script's file name, used as the logging tag
setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())

# Populated by parseargs()
options = None                # parsed command line options
GPHOME = None                 # HAWQ installation directory
GPCHECK_CONFIG_FILE = None    # path to the gpcheck.cnf config file
HADOOP_HOME = None            # Hadoop installation directory (optional)

gpcheck_info = GpCheckInfo()        # aggregated per-host collection results
gpcheck_config = GpCheckConfig()    # expected values read from the config file
pool = WorkerPool()                 # worker pool for running remote commands
tmpdir = None                       # local temp dir holding copied hostdump files
found_errors = 0                    # global failure counter, bumped by checkFailed()

# name of the HAWQ GUC checked by the memory test
HAWQ_GUC_MEMORY = "hawq_re_memory_overcommit_max"
| |
| |
def checkPlatform():
    """Detect the OS type of this host and record it in gpcheck_info.

    Raises GpCheckError when the platform has no gpcheck test support.
    """
    known_platforms = {
        "linux": HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX,
        "sunos": HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS,
    }
    platform_name = SYSTEM.getName()
    if platform_name not in known_platforms:
        raise GpCheckError("No tests exists for this platform in gpcheck")
    gpcheck_info.host_type = known_platforms[platform_name]
    logger.info("Detected platform: %s" % hosttype_str(gpcheck_info.host_type))
| |
| |
def parse_host_list_file(host_file):
    """Read a host file and return the list of hostnames it names.

    Blank lines are skipped, and everything after a '#' is treated as a
    comment, so entries may carry trailing comments.
    """
    host_names = []
    with open(host_file) as fp:
        for raw_line in fp:
            entry = raw_line.split("#", 1)[0].strip()
            if entry:
                host_names.append(entry)
    return host_names
| |
def parseargs():
    """Parse command line arguments into the module-level `options` and
    derive GPHOME, GPCHECK_CONFIG_FILE and HADOOP_HOME from them (falling
    back to the environment).

    Raises GpCheckError on missing/conflicting argument combinations.
    """
    global options, GPHOME, HADOOP_HOME, GPCHECK_CONFIG_FILE

    parser = OptParser(option_class=OptChecker, version='%prog version $Revision: #1 $')
    parser.remove_option('-h')  # free up '-h', reused below for --host
    parser.add_option('-?', '--help', action='help')
    parser.add_option('--verbose', action='store_true')
    parser.add_option('--stdout', action='store_true')
    parser.add_option('--zipout', action='store_true')
    parser.add_option('--zipin', type='string')
    parser.add_option('--gphome', type='string')
    # for HDFS xml and memory check
    parser.add_option('--hadoop', '--hadoop-home', type='string')
    parser.add_option('--hdfs', action='store_true')
    parser.add_option('--hdfs-ha', dest="hdfs_ha", action='store_true')
    parser.add_option('--yarn', action='store_true')
    parser.add_option('--yarn-ha', dest="yarn_ha", action='store_true')
    parser.add_option('--kerberos', action='store_true')

    parser.add_option('-c', '--config', type='string') # optional: gpcheck config file path
    parser.add_option('-f', '--file', type='string') # host file, for testing a list of hosts
    parser.add_option('-h', '--host', type='string') # test a single host

    # Adding new options: postSetup postUpgrade weeklyExam
    parser.add_option('--postSetup', action='store_true')
    parser.add_option('--postUpgrade', action='store_true')
    parser.add_option('--weeklyExamine', action='store_true')
    parser.add_option('-t', '--table', type='string') # optional: table to analyze

    (options, args) = parser.parse_args()
    if len(args) > 0:
        if args[0] == 'help':
            parser.print_help(sys.stderr)
            sys.exit(0)

    # GPHOME must be found
    GPHOME = options.gphome if options.gphome else os.environ.get("GPHOME")
    if not GPHOME:
        raise GpCheckError("GPHOME not set, must be specified in --gphome")
    GPCHECK_CONFIG_FILE = options.config if options.config else "%s/etc/gpcheck.cnf" % GPHOME
    logger.info("Checks uses config file: %s", GPCHECK_CONFIG_FILE)

    HADOOP_HOME = options.hadoop if options.hadoop else os.environ.get("HADOOP_HOME")

    # Hadoop-dependent checks are best-effort: a missing HADOOP_HOME is
    # recorded as a failure but does not abort the run.
    if not HADOOP_HOME:
        checkFailed(None, "utility will SKIP HDFS configuration check because HADOOP_HOME is not specified in environment variable or --hadoop")

    if options.yarn and not HADOOP_HOME:
        options.yarn = False
        checkFailed(None, "utility will SKIP YARN configuration check because HADOOP_HOME is not specified in environment variable or --hadoop")

    # params check
    if not options.file and not options.host and not options.zipin:
        raise GpCheckError(" --file or --host or --zipin must be specified")

    if options.file and options.host:
        raise GpCheckError(" You can specify either --file or --host, but not both")

    if options.stdout and options.zipout:
        raise GpCheckError(" You can specify either --stdout or --zipout, but not both")
| |
| |
def readConfigFile():
    """Load GPCHECK_CONFIG_FILE into the global gpcheck_config object.

    Raises GpCheckError wrapping the underlying parse error on failure.
    """
    try:
        gpcheck_config.readConfigFile(GPCHECK_CONFIG_FILE)

    except Exception as e:
        # fixed typo in the error message: was "Field to read"
        raise GpCheckError("Failed to read gpcheck config file '%s':\n%s" % (GPCHECK_CONFIG_FILE, e))
| |
| |
def checkFailed(host, msg):
    """Record one failed check: bump the global error counter and log it.

    host may be None for failures not tied to a specific host.
    """
    global found_errors
    found_errors = found_errors + 1
    if not host:
        logger.error(msg)
    else:
        logger.error("host(%s): %s", host, msg)
| |
| |
def getHDFSNamenodeHost():
    """Determine the HDFS namenode's actual hostname.

    Reads fs.default.name / fs.defaultFS from core-site.xml; when the value
    carries an explicit port, that host is the namenode. Otherwise the value
    is treated as an HA nameservice alias and hdfs-site.xml is consulted for
    the first namenode's rpc-address. Finally, `hostname` is run remotely on
    the address to normalize it to the host's real name.

    Returns the namenode hostname. Raises GpCheckError when no namenode can
    be detected.
    """
    core_site_file = os.path.join(HADOOP_HOME, "etc/hadoop/core-site.xml")
    hdfs_site_file = os.path.join(HADOOP_HOME, "etc/hadoop/hdfs-site.xml")
    logger.info("try to detect namenode from %s" % core_site_file)

    # for processing property xml
    getPropName = lambda node: node.getElementsByTagName('name')[0].childNodes[0].data
    getPropValue = lambda node: node.getElementsByTagName('value')[0].childNodes[0].data

    # read namenode address from core-site.xml
    with open(core_site_file) as f:
        xmldoc = minidom.parse(f)
        namenode_addr = ''
        for node in xmldoc.getElementsByTagName('property'):
            if getPropName(node) == 'fs.default.name' or getPropName(node) == 'fs.defaultFS':
                fsurl = getPropValue(node).strip()
                # host (or nameservice alias) between "//" and ":" or "/"
                namenode_list_alias = re.search(r"//([^:/]*)", fsurl).group(1)
                # a trailing ":<port>" means HA is not in use
                if_ha_disabled = re.search(".*:[0-9]+$", fsurl)
                if if_ha_disabled:
                    namenode_addr = namenode_list_alias
                else:
                    namenode_addr = ''
                break

    # run hostname command on remote to get actual hostname
    if namenode_addr == '':
        # HA case: resolve the nameservice alias through hdfs-site.xml
        ha_namenode_list = ''
        default_namenode_alias = ''
        with open(hdfs_site_file) as f:
            xmldoc = minidom.parse(f)
            for node in xmldoc.getElementsByTagName('property'):
                if re.search('dfs.ha.namenodes.*', getPropName(node).strip()):
                    ha_namenode_list = getPropValue(node).strip()
                    # first listed namenode is used as the default
                    default_namenode_alias = ha_namenode_list.split(',')[0].strip()
                    break

        if ha_namenode_list == '':
            logger.error("cannot detect namenode from %s" % core_site_file)
            raise GpCheckError("cannot detect namenode from %s" % core_site_file)
            #sys.exit(1)
        else:
            # NOTE(review): namenode_list_alias is only bound if the
            # fs.default.name/fs.defaultFS loop above matched; if core-site
            # lacks that property this raises NameError — verify.
            with open(hdfs_site_file) as f:
                xmldoc = minidom.parse(f)
                for node in xmldoc.getElementsByTagName('property'):
                    namenode_rpc_address = "dfs.namenode.rpc-address.%s.%s" % (namenode_list_alias,
                            default_namenode_alias)
                    if getPropName(node) == namenode_rpc_address:
                        default_namenode_rpc_address = getPropValue(node).strip()
                        namenode_addr = default_namenode_rpc_address.split(':')[0].strip()

    if namenode_addr == '':
        raise GpCheckError("cannot detect namenode from %s" % core_site_file)
    else:
        # normalize the address to the actual hostname reported by the host
        cmd = Command(namenode_addr, "hostname", REMOTE, namenode_addr)
        pool.addCommand(cmd)
        pool.join()
        items = pool.getCompletedItems()
        for i in items:
            if i.results.rc or i.results.halt or not i.results.completed:
                raise Exception("error running 'hostname' command: %s" % i.results.stderr.strip())
            namenode_host = i.results.stdout.strip()
        logger.info("detect namenode hostname to be %s" % namenode_host)
        return namenode_host
| |
def createHostList():
    """Build gpcheck_info.hosts from --file/--host, deduplicated by the
    actual hostname each server reports, and tag the HDFS namenode host.

    Raises GpCheckError when the host file is unreadable or a remote
    'hostname' command fails.
    """
    if options.verbose:
        logger.info("trying to deduplicate hosts...")

    hostlist = []
    # read the host file if present
    if options.file:
        try:
            with open(options.file, "r") as f:
                hostlist = [line.strip() for line in f.readlines() if line.strip()]

        except IOError, e:
            raise GpCheckError("error reading host file '%s': %s" % (options.file, str(e)))
    else:
        hostlist.append(options.host)

    # get actual hostname and deduplicate
    try:
        for hostname in hostlist:
            cmd = Command(hostname, "hostname", REMOTE, hostname)
            pool.addCommand(cmd)

        pool.join()
        items = pool.getCompletedItems()
        for i in items:
            if i.results.rc or i.results.halt or not i.results.completed:
                raise Exception("error running 'hostname' on host '%s': %s" % (i.remoteHost, i.results.stderr.strip()))

            # two input aliases resolving to the same hostname get one entry
            actualHostname = i.results.stdout.strip()
            if actualHostname not in gpcheck_info.hosts:
                gpcheck_info.hosts[actualHostname] = GpCheckHost(actualHostname)

    except Exception, e:
        raise GpCheckError("failed to collect 'hostname' on servers: %s" % str(e))

    if options.verbose:
        logger.info("trying to deduplicate hosts [success]")

    if HADOOP_HOME:
        try:
            namenode_host = getHDFSNamenodeHost()
            # NOTE(review): namenode_host is the namenode's actual hostname
            # while hostlist holds the raw pre-dedup names from the input;
            # an alias mismatch would skip the namenode check — verify.
            if namenode_host in hostlist:
                gpcheck_info.hosts[namenode_host] = GpCheckHost(namenode_host, is_namenode=True)
            else:
                logger.warning("utility will skip HDFS namenode check since it's not in current host list.")

        except Exception, e:
            checkFailed(None, "utility will SKIP HDFS namenode check: %s" % str(e))
| |
| |
def runCollections():
    """Collect configuration from every host: dump remotely, copy the dump
    files to the master, then clean them up on the remote side."""
    logger.info("trying to collect server configuration...")

    # run gpcheck_hostdump on each server
    runCollectionOnServers()
    # copy hostdump file to master
    copyFilesLocally()
    # delete hostdump file on remote servers
    deleteRemoteFiles()

    logger.info("trying to collect server configuration [success]")
| |
| |
| def runCollectionOnServers(): |
| gpcheck_info.collection_start_time = time.time() |
| |
| def getDumpCommand(): |
| if gpcheck_info.host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX: |
| host_type_cl = "--linux" |
| elif gpcheck_info.host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS: |
| host_type_cl = "--solaris" |
| else: |
| raise GpCheckError("unsupported host type") |
| |
| cmd = "%s/sbin/gpcheck_hostdump --hawq %s" % (GPHOME, host_type_cl) |
| cmd += " --sysctl %s" % ",".join(gpcheck_config.sysctl_expected.keys()) |
| if HADOOP_HOME: |
| cmd += " --hadoop %s" % HADOOP_HOME |
| if options.yarn or options.yarn_ha: |
| cmd += " --yarn" |
| return cmd |
| |
| try: |
| cmdStr = getDumpCommand() |
| for host in gpcheck_info.hosts: |
| if options.verbose: |
| logger.info("collect data on host: %s" % host) |
| |
| cmd = Command(host, cmdStr, REMOTE, host) |
| pool.addCommand(cmd) |
| |
| pool.join() |
| items = pool.getCompletedItems() |
| for i in items: |
| if i.results.rc or i.results.halt or not i.results.completed: |
| raise Exception("error running gpcheck_hostdump on '%s': %s" % (i.remoteHost, i.results.stderr.strip())) |
| |
| gpcheck_info.hosts[i.remoteHost].datafile = i.results.stdout.strip() |
| |
| except Exception, e: |
| raise GpCheckError("Failed to collect data from servers:\n%s" % e) |
| |
| gpcheck_info.collection_end_time = time.time() |
| |
| |
| def copyFilesLocally(): |
| if options.verbose: |
| logger.info("copy hostdump files from remote servers to master") |
| |
| try: |
| for host in gpcheck_info.hosts: |
| cmdStr = "scp %s:%s %s/%s.data" % (host, gpcheck_info.hosts[host].datafile, tmpdir, host) |
| if options.verbose: |
| logger.info(cmdStr) |
| cmd = Command(host, cmdStr) |
| pool.addCommand(cmd) |
| |
| pool.join() |
| items = pool.getCompletedItems() |
| for i in items: |
| if i.results.rc or i.results.halt or not i.results.completed: |
| raise Exception("error running command %s: %s" % (i.cmdStr, i.results.stderr.strip())) |
| |
| except Exception, e: |
| raise GpCheckError("Failed to scp remote hostdump file to master:\n%s" % e) |
| |
| |
| def deleteRemoteFiles(): |
| if options.verbose: |
| logger.info("delete hostdump files on remote servers") |
| |
| try: |
| for host in gpcheck_info.hosts: |
| cmdStr = "rm -f %s" % gpcheck_info.hosts[host].datafile |
| if options.verbose: |
| logger.info(cmdStr) |
| cmd = Command(host, cmdStr, REMOTE, host) |
| pool.addCommand(cmd) |
| |
| pool.join() |
| items = pool.getCompletedItems() |
| for i in items: |
| if i.results.rc or i.results.halt or not i.results.completed: |
| raise Exception("error running command %s: %s" % (i.cmdStr, i.results.stderr.strip())) |
| |
| except Exception, e: |
| raise GpCheckError("Failed to delete remote hostdump file:\n%s" % e) |
| |
| |
| def readDataFiles(): |
| for host in gpcheck_info.hosts: |
| fname = "%s/%s.data" % (tmpdir, host) |
| try: |
| with open(fname, "rb") as f: |
| gpcheck_info.hosts[host].data = pickle.load(f) |
| |
| except Exception, e: |
| raise GpCheckError("Failed to load pickle file '%s': %s" % (fname, e)) |
| |
| |
def readHAWQConfiguration():
    '''
    Collect the HAWQ configuration used by later checks.

    Reads gp_segment_configuration through a local SQL connection and the
    hawq_re_memory_overcommit_max GUC via `hawqconfig -s`, which sets the
    maximum quota of memory overcommit (in MB) per physical segment for
    resource enforcement, beyond the quota dynamically assigned by the
    resource manager.

    Sets gpcheck_info.hawq_collected_ok = True only when the GUC value was
    obtained successfully.
    '''
    if options.verbose:
        logger.info("trying to collect HAWQ configuration...")

    try:
        _sqlUtil = GpsqlUtil()
        gpcheck_info.hawq_segment_configuration = _sqlUtil.performQuery("select * from gp_segment_configuration;")
    except Exception as e:
        # BUG FIX: was `except expression as identifier:` (NameError on
        # "expression") and logged an undefined name `e`.
        logger.error("utility cannot perform HAWQ CPU and Memory check because failed to connect to HAWQ")
        logger.error(str(e))

    # read Memory GUC using hawqconfig
    command = "hawqconfig -s %s" % HAWQ_GUC_MEMORY
    p = subprocess.Popen(command, shell = True,
                         stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    result = p.communicate()
    match_master = re.search(r'Value : (\d+)', result[0])

    if match_master:
        gpcheck_info.hawq_gucs[HAWQ_GUC_MEMORY] = (int(match_master.group(1)))
    else:
        checkFailed(None, "utility cannot perform HAWQ Memory check because failed to get GUC value using '%s'" % command)
        return

    gpcheck_info.hawq_collected_ok = True
    if options.verbose:
        logger.info("trying to collect HAWQ configuration [success]")
| |
| |
| def testConnectEmc(host): |
| if not host.is_a_master: |
| return |
| |
| expected = "Running" |
| |
| if host.data.connectemc.output != expected: |
| checkFailed(host.hostname, "Connect EMC is not running on master (try /etc/init.d/connectemc status)") |
| |
| |
| |
| def testSolarisEtcSystem(host): |
| requiredValues = { 'rlim_fd_cur' : '65536', |
| 'zfs:zfs_arc_max' : '0x600000000', |
| 'pcplusmp:apic_panic_on_nmi' : '1', |
| 'nopanicdebug' : '1' } |
| |
| results = dict() |
| |
| for k in requiredValues.keys(): |
| results[k] = 0 |
| |
| for key in host.data.etc_system.parameters.keys(): |
| |
| if key not in requiredValues: |
| continue |
| |
| foundValue = host.data.etc_system.parameters[key] |
| if foundValue == requiredValues[key]: |
| results[key] = 1 |
| |
| for k in results.keys(): |
| |
| if results[k]: |
| continue |
| |
| checkFailed(host.hostname, "/etc/system is missing expected line 'set %s=%s'" % (k, requiredValues[k])) |
| |
| |
| def testSolarisEtcProject(host): |
| |
| requiredValues = { 'default:3::::project.max-sem-ids=(priv,1024,deny);process.max-file-descriptor=(priv,252144,deny)' : 0 } |
| |
| unexpectedValues = set(['default:3::::']) |
| |
| for line in host.data.etc_project.lines: |
| if line in unexpectedValues: |
| checkFailed(host.hostname, "unexpected line in /etc/project: '%s'" % line) |
| continue |
| |
| if line in requiredValues: |
| requiredValues[line] = 1 |
| |
| for line in requiredValues.keys(): |
| if requiredValues[line]: |
| continue |
| |
| checkFailed(host.hostname, "/etc/project is missing expected line '%s'" % line) |
| |
| |
| def testSolarisEtcUserAttr(host): |
| |
| requiredValues = { 'gpadmin::::defaultpriv=basic,dtrace_user,dtrace_proc' : 0 } |
| |
| for line in host.data.etc_user_attr.lines: |
| if line in requiredValues: |
| requiredValues[line] = 1 |
| |
| for line in requiredValues.keys(): |
| if requiredValues[line]: |
| continue |
| |
| checkFailed(host.hostname, "/etc/user_attr is missing expected line '%s'" % line) |
| |
| |
def testHAWQGUC(host):
    """Check the HAWQ memory GUC settings for one host.

    Skipped entirely when HAWQ configuration collection failed earlier
    (hawq_collected_ok is False) or the host is not in the HAWQ array.
    """
    if not gpcheck_info.hawq_collected_ok:
        return

    if options.verbose:
        logger.info("-- test HAWQ CPU/Memory Guc Settings")

    c = gpcheck_info.hawq_segment_configuration
    # gp_segment_configuration rows: role 'm' marks the master entry
    master_hostname = filter(lambda x: x['role'] == 'm', c)[0]['hostname']

    if host.hostname not in map(lambda x: x['hostname'], c):
        logger.warning("host '%s' is not in HAWQ array" % host.hostname)
        return

    actual_total_memory = host.data.machine.memory_in_MB

    # NOTE(review): readHAWQConfiguration stores a single int here, despite
    # the (master_value, segment_value) tuple comment on GpCheckInfo.
    guc_vmemsize_master = gpcheck_info.hawq_gucs[HAWQ_GUC_MEMORY]
    # segment count on this host
    num_segments = len(filter(lambda x: x['hostname'] == host.hostname, c))

    if host.hostname == master_hostname:
        if num_segments > 1:
            checkFailed(host.hostname, "HAWQ master host has segments configured")

        if actual_total_memory < guc_vmemsize_master:
            checkFailed(host.hostname, "HAWQ master host memory size '%s' is less than the '%s' size '%s'" % (
                actual_total_memory, HAWQ_GUC_MEMORY, guc_vmemsize_master))
            # not enough physical memory to honor the GUC; skip the rest
            return

        # check HAWQ master's memory size
        expected_vmemory_size = 8192
        if guc_vmemsize_master != expected_vmemory_size:
            checkFailed(host.hostname, "HAWQ master's %s GUC value is %s, expected %s" % (
                HAWQ_GUC_MEMORY, guc_vmemsize_master, expected_vmemory_size))

    else:
        datanode_mem = gpcheck_config.hdfs_expected["dfs.mem.datanode.heap"]

        # check HAWQ memory size
        if actual_total_memory < datanode_mem:
            checkFailed(host.hostname, "HAWQ segment's host memory size '%s' is less than the expected data node memory size '%s'" % (
                actual_total_memory, datanode_mem))
            logger.warning("please change the expected data node memory 'dfs.mem.datanode.heap' in gpcheck.cnf file")
            logger.warning("SKIP '%s' check" %(HAWQ_GUC_MEMORY))
            return
        expect_vmemsize_per_segment = 8192
        if guc_vmemsize_master != expect_vmemsize_per_segment:
            checkFailed(host.hostname, "HAWQ segment's %s GUC value on this host is %s, expected %s" % (
                HAWQ_GUC_MEMORY, guc_vmemsize_master, expect_vmemsize_per_segment))
| |
| |
def testDiskCapacity(host):
    """Warn when any monitored mount point exceeds the configured usage
    limit. An empty diskusage_mounts list means every mount is checked."""
    if options.verbose:
        logger.info("-- test Disk Capacity")

    monitored = gpcheck_config.diskusage_mounts
    for entry in host.data.diskusage.lines:
        if monitored and entry.mount not in monitored:
            continue
        # used_percent is a string like "85%"
        usage = int(entry.used_percent[:-1])
        if usage > gpcheck_config.diskusage_usagemax:
            checkFailed(host.hostname,
                    "potential disk full risk: %s mounted on %s has used %s space" % (
                        entry.fs, entry.mount, entry.used_percent))
    return
| |
| |
| def testHAWQconfig(host): |
| hawq = host.data.hawq |
| hdfs = host.data.hdfs |
| if hawq is None: |
| return # skip HAWQ test when hawq is None |
| |
| if hdfs is None: |
| return # skip HAWQ test when hdfs is None |
| |
| if options.verbose: |
| logger.info("-- test HAWQ config") |
| |
| if hawq.errormsg: |
| checkFailed(host.hostname, "collect HAWQ configuration error: %s" % hawq.errormsg) |
| return |
| |
| datanode_list = list() |
| if HADOOP_HOME: |
| datanode_list = parse_host_list_file("%s/etc/hadoop/slaves" % HADOOP_HOME) |
| is_datanode = False |
| if host.hostname in datanode_list: |
| is_datanode = True |
| |
| expect_config = gpcheck_config.hawq_expected |
| |
| if options.kerberos: |
| expect_config.update(gpcheck_config.hawq_kerberos_expected) |
| |
| if options.yarn or options.yarn_ha: |
| expect_config.update(gpcheck_config.hawq_yarn_expected) |
| |
| actual_config = hawq.site_config |
| hdfs_actual_config = hdfs.site_config |
| |
| for exp_key, exp_val in expect_config.items(): |
| if exp_key not in actual_config: |
| checkFailed(host.hostname, "HAWQ configuration missing: '%s' needs to be set to '%s'" % (exp_key, exp_val)) |
| |
| else: |
| actual_val = actual_config[exp_key] |
| et = (exp_key, exp_val, actual_val) |
| |
| if exp_key == "dfs.block.local-path-access.user": |
| if exp_val not in actual_val.split(','): |
| checkFailed(host.hostname, "HDFS configuration: '%s' should include user '%s', actual value is '%s'" % et) |
| elif exp_key == "dfs.namenode.handler.count": |
| if int(exp_val) > int(actual_val): |
| checkFailed(host.hostname, "HDFS configuration: '%s' should be at least '%s', actual value is '%s'" % et) |
| else: |
| if exp_val != actual_val: |
| checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % et) |
| |
| if not options.kerberos: |
| if 'hadoop.security.authentication' in actual_config: |
| if actual_config['hadoop.security.authentication'] != 'simple': |
| checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % ('simple', 'hadoop.security.authentication', actual_config['hadoop.security.authentication'])) |
| |
| if 'hadoop.security.authentication' in hdfs_actual_config: |
| if hdfs_actual_config['hadoop.security.authentication'] != 'simple': |
| checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % ('simple', 'hadoop.security.authentication', hdfs_actual_config['hadoop.security.authentication'])) |
| |
| if options.yarn or options.yarn_ha: |
| hawq_yarn_property_exist_list = ['hawq_rm_yarn_address', 'hawq_rm_yarn_scheduler_address', 'hawq_rm_yarn_app_name'] |
| for item in hawq_yarn_property_exist_list: |
| if item in actual_config: |
| if not actual_config[item]: |
| checkFailed(host.hostname, "HAWQ configuration: yarn.resourcemanager.address is empty") |
| else: |
| checkFailed(host.hostname, "HAWQ configuration: yarn.resourcemanager.address not defined") |
| |
| if 'dfs.client.read.shortcircuit' not in actual_config: |
| checkFailed(host.hostname, "HAWQ configuration dfs.client.read.shortcircuit not defined") |
| |
| if 'dfs.client.read.shortcircuit' not in hdfs_actual_config: |
| checkFailed(host.hostname, "HAWQ configuration dfs.client.read.shortcircuit not defined") |
| |
| if 'dfs.domain.socket.path' not in actual_config: |
| checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path not defined") |
| |
| if 'dfs.domain.socket.path' not in hdfs_actual_config: |
| checkFailed(host.hostname, "HDFS configuration dfs.domain.socket.path not defined") |
| |
| if is_datanode and 'dfs.domain.socket.path' in actual_config and 'dfs.domain.socket.path' in hdfs_actual_config: |
| if actual_config['dfs.domain.socket.path'] != hdfs_actual_config['dfs.domain.socket.path']: |
| checkFailed(host.hostname, "HAWQ configuration: dfs.domain.socket.path expect to have the same value with HDFS configuration") |
| else: |
| cmd = "ls -l %s" % actual_config['dfs.domain.socket.path'] |
| (result, output, errmsg) = remote_ssh_output(cmd, host.hostname, '') |
| if result == 0: |
| if output.split(' ')[0][7:9] != 'rw': |
| checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path: %s should have R/W access for both hawq and HDFS on %s" % (actual_config['dfs.domain.socket.path'], host.hostname)) |
| else: |
| checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path: %s, does not exist on %s" % (actual_config['dfs.domain.socket.path'], host.hostname)) |
| |
| if 'output.replace-datanode-on-failure' in actual_config and len(datanode_list) > 0: |
| if len(datanode_list) < 4: |
| if actual_config['output.replace-datanode-on-failure'] == 'true': |
| checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure expect false, current is true") |
| else: |
| if actual_config['output.replace-datanode-on-failure'] == 'false': |
| checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure expect true, current is false") |
| else: |
| checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure not defined") |
| |
| |
def testDiskCapacity(host):
    """Warn when any monitored mount point exceeds the configured usage limit.

    Failures are reported through checkFailed(); nothing is returned.
    """
    if options.verbose:
        logger.info("-- test Disk Capacity")

    monitored = gpcheck_config.diskusage_mounts
    for entry in host.data.diskusage.lines:
        # An empty mount list in the config means "check every mount point".
        if monitored and entry.mount not in monitored:
            continue
        usage_pct = int(entry.used_percent[:-1])  # strip trailing '%'
        if usage_pct > gpcheck_config.diskusage_usagemax:
            checkFailed(host.hostname,
                    "potential disk full risk: %s mounted on %s has used %s space" % (
                        entry.fs, entry.mount, entry.used_percent))
    return
| |
| |
def testHDFSConfig(host):
    """Validate a host's HDFS/YARN site configuration and JVM heap sizes.

    Compares the collected hdfs-site/yarn-site settings against the
    expectations from gpcheck.conf (adjusted for the --hdfs-ha, --kerberos,
    --yarn and --yarn-ha options), checks namenode/datanode heap sizes, and
    verifies YARN nodemanager directories and resource-manager properties.
    All failures are reported via checkFailed(); nothing is returned.
    Skips silently when no HDFS data was collected for this host.
    """
    hdfs = host.data.hdfs
    if hdfs is None:
        return  # skip HDFS test when hdfs is None

    if options.verbose:
        logger.info("-- test HDFS config")

    if hdfs.errormsg:
        checkFailed(host.hostname, "collect HDFS configuration error: %s" % hdfs.errormsg)
        return

    # BUG FIX: work on a copy. The original code updated
    # gpcheck_config.hdfs_expected in place, permanently merging the
    # option-specific expectations into the shared config object for every
    # subsequent call of this function.
    expect_config = dict(gpcheck_config.hdfs_expected)

    if not options.hdfs_ha and not options.kerberos:
        expect_config.update(gpcheck_config.hdfs_non_expected)

    if options.hdfs_ha and not options.kerberos:
        expect_config.update(gpcheck_config.hdfs_ha_expected)

    if options.kerberos and not options.hdfs_ha:
        expect_config.update(gpcheck_config.hdfs_kerberos_expected)

    if options.kerberos and options.hdfs_ha:
        expect_config.update(gpcheck_config.hdfs_ha_kerberos_expected)

    if options.yarn or options.yarn_ha:
        expect_config.update(gpcheck_config.yarn_expected)
        if not options.yarn_ha and not options.kerberos:
            expect_config.update(gpcheck_config.yarn_non_expected)

    if options.yarn_ha:
        expect_config.update(gpcheck_config.yarn_ha_expected)
    if options.kerberos:
        expect_config.update(gpcheck_config.yarn_kerberos_expected)

    actual_config = hdfs.site_config
    actual_heap_size = hdfs.namenode_heap_size if host.is_namenode else hdfs.datanode_heap_size

    if host.data.machine.memory_in_MB < actual_heap_size:
        checkFailed(host.hostname, "host memory size '%s' is less than the java max heap size '%s'" % (host.data.machine.memory_in_MB, actual_heap_size))

    # test hdfs_site.xml setting
    for exp_key, exp_val in expect_config.items():
        if exp_key.startswith("dfs.mem"):
            continue  # these options belongs to memory tests

        if exp_key not in actual_config:
            checkFailed(host.hostname, "HDFS configuration missing: '%s' needs to be set to '%s'" % (exp_key, exp_val))
            continue

        actual_val = actual_config[exp_key]
        et = (exp_key, exp_val, actual_val)

        if exp_key == "dfs.block.local-path-access.user":
            # value is a comma-separated user list; expected user must be a member
            if exp_val not in actual_val.split(','):
                checkFailed(host.hostname, "HDFS configuration: '%s' should include user '%s', actual value is '%s'" % et)

        elif exp_key == "dfs.namenode.handler.count":
            # lower bound rather than exact match
            if int(exp_val) > int(actual_val):
                checkFailed(host.hostname, "HDFS configuration: '%s' should be at least '%s', actual value is '%s'" % et)

        else:
            if exp_val != actual_val:
                checkFailed(host.hostname, "HDFS configuration: expected '%s' for '%s', actual value is '%s'" % et)

    # test hadoop memory setting
    expect_namenode_heap = expect_config["dfs.mem.namenode.heap"]
    expect_datanode_heap = expect_config["dfs.mem.datanode.heap"]

    if host.is_namenode and actual_heap_size < expect_namenode_heap:
        checkFailed(host.hostname, "Namenode Java heap size is only %sM, we recommends at least %sM" %
                (actual_heap_size, expect_namenode_heap))

    if not host.is_namenode and actual_heap_size < expect_datanode_heap:
        checkFailed(host.hostname, "Datanode Java heap size is only %sM, expect value is %sM" %
                (actual_heap_size, expect_datanode_heap))

    # Check if nodemanager directories exist (only meaningful on datanodes
    # when YARN is enabled; slaves file lists the datanodes)
    yarn_enabled = bool(options.yarn or options.yarn_ha)

    datanode_list = list()
    if HADOOP_HOME:
        datanode_list = parse_host_list_file("%s/etc/hadoop/slaves" % HADOOP_HOME)
    is_datanode = host.hostname in datanode_list

    directory_check_list = []
    if yarn_enabled and is_datanode:
        if 'yarn.nodemanager.local-dirs' in actual_config:
            directory_check_list += actual_config['yarn.nodemanager.local-dirs'].split(',')
        else:
            checkFailed(host.hostname, "YARN configuration: yarn.nodemanager.local-dirs not defined")

        if 'yarn.nodemanager.log-dirs' in actual_config:
            directory_check_list += actual_config['yarn.nodemanager.log-dirs'].split(',')
        else:
            checkFailed(host.hostname, "YARN configuration: yarn.nodemanager.log-dirs not defined")

    for directory in directory_check_list:
        cmd = "test -e %s" % directory
        (result, output, errmsg) = remote_ssh_output(cmd, host.hostname, '')
        if result != 0:
            checkFailed(host.hostname, "YARN nodemanager directory %s does not exist" % directory)

    # Check if resource manager property exists
    if options.yarn:
        yarn_property_exist_list = ['yarn.resourcemanager.address', 'yarn.resourcemanager.scheduler.address']

    if options.yarn_ha:
        # HA mode looks for the per-RM hostname properties instead
        yarn_property_exist_list = ['yarn.resourcemanager.hostname.rm1', 'yarn.resourcemanager.hostname.rm2']

    if yarn_enabled:
        for item in yarn_property_exist_list:
            if item in actual_config:
                if not actual_config[item]:
                    checkFailed(host.hostname, "YARN configuration: %s is empty" % item)
            else:
                checkFailed(host.hostname, "YARN configuration: %s not defined" % item)

    # Check yarn kerberos properties
    if yarn_enabled and options.kerberos:
        yarn_kerberos_check_list = ['yarn.nodemanager.keytab', 'yarn.nodemanager.principal',
                                    'yarn.resourcemanager.keytab', 'yarn.resourcemanager.principal']
        for item in yarn_kerberos_check_list:
            if item in actual_config:
                if not actual_config[item]:
                    checkFailed(host.hostname, "YARN configuration: %s is empty, expected non-empty" % item)
            else:
                checkFailed(host.hostname, "YARN configuration missing: %s" % item)
| |
| |
def testIOSchedulers(host):
    """Verify every collected block device uses the 'deadline' IO scheduler."""
    if options.verbose:
        logger.info("-- test IO scheduler")

    sched_data = host.data.ioschedulers
    if sched_data.errormsg:
        checkFailed(host.hostname, "collect IO scheduler data error: %s" % sched_data.errormsg)
        return

    expectedScheduler = "deadline"
    for dev, scheduler in sched_data.devices.items():
        if scheduler != expectedScheduler:
            checkFailed(host.hostname,
                    "on device (%s) IO scheduler '%s' does not match expected value '%s'" % (dev, scheduler, expectedScheduler))
| |
| |
| # perform this test only run as root |
def testBlockdev(host):
    """Check block device readahead values against the expected 16384."""
    # blockdev data is only collected when gpcheck runs as root
    if host.data.blockdev is None:
        return

    if options.verbose:
        logger.info("-- test block device readahead value")

    expectedReadAhead = "16384"
    for dev, ra in host.data.blockdev.ra.items():
        if ra != expectedReadAhead:
            checkFailed(host.hostname,
                    "on device (%s) blockdev readahead value '%s' does not match expected value '%s'" % (dev, ra, expectedReadAhead))
| |
| |
def testSysctl(host):
    """Compare the host's collected sysctl values with gpcheck.conf expectations."""
    if options.verbose:
        logger.info("-- test sysctl value")

    if host.data.sysctl.errormsg:
        checkFailed(host.hostname, "collect sysctl params error: %s" % host.data.sysctl.errormsg)
        return

    expected_values = gpcheck_config.sysctl_expected
    real_values = host.data.sysctl.variables

    # Params in this set are treated as lower bounds (actual may be larger);
    # every other param must match exactly. NOTE(review): the set is
    # currently empty, so all params take the exact-match branch.
    params_with_lowerbound = set()

    for key in expected_values:
        if key in params_with_lowerbound:
            if int(real_values[key]) < int(expected_values[key]):
                checkFailed(host.hostname,
                        "sysctl value for key '%s' has value '%s', but we expect at least '%s'" % (key, real_values[key], expected_values[key]))
        elif real_values[key] != expected_values[key]:
            checkFailed(host.hostname,
                    "sysctl value for key '%s' has value '%s' and expects '%s'" % (key, real_values[key], expected_values[key]))
| |
| |
def testLimitsConf(host):
    """Check /etc/security/limits.conf entries against expected minimum values.

    Only entries whose domain is 'gpadmin' or '*' are considered; both the
    expected and actual mappings are keyed by (type, item).
    """
    if options.verbose:
        logger.info("-- test /etc/security/limits.conf")

    if host.data.limitsconf.errormsg:
        checkFailed(host.hostname, "collect limits.conf data error: %s" % host.data.limitsconf.errormsg)
        return

    expect_data = gpcheck_config.limits_expected
    actual_data = {}
    for entry in host.data.limitsconf.lines:
        if entry.domain in ("gpadmin", "*"):
            actual_data[(entry.type, entry.item)] = entry.value

    expect_keyset = set(expect_data)
    actual_keyset = set(actual_data)

    # present on the host: value must be at least the expected minimum
    for key in expect_keyset & actual_keyset:
        expect_val = int(expect_data[key])
        actual_val = int(actual_data[key])
        if actual_val < expect_val:
            checkFailed(host.hostname,
                    "%s in /etc/security/limits.conf has value %d lower than expected value %d" % (
                        " ".join(key), actual_val, expect_val))

    # expected but missing entirely
    for key in expect_keyset - actual_keyset:
        checkFailed(host.hostname,
                "%s not found in /etc/security/limits.conf" % " ".join(key))
| |
| |
def testLinuxMounts(host):
    """Verify that every mount point listed in gpcheck.conf is mounted on the host."""
    if options.verbose:
        logger.info("-- test mount points")

    expected_mount_points = gpcheck_config.mount_points
    if len(expected_mount_points) == 0:
        if options.verbose:
            logger.info("-- you didn't specify any mount points to be check in %s, ignore this test" % GPCHECK_CONFIG_FILE)
        return

    actual_mount_points = set(m.dir for m in host.data.mounts.entries.values())
    # difference() is empty when every expected mount is present
    for failed_mount in expected_mount_points.difference(actual_mount_points):
        checkFailed(host.hostname, "%s is not mounted" % failed_mount)
| |
| |
def testNtp(host):
    """Sanity-check the host clock against the gpcheck collection window.

    The host's timestamp must fall between the collection start and end
    times (with a 1 second tolerance) and ntpd must be running; any
    violation is reported via checkFailed().
    """
    if options.verbose:
        logger.info("-- test NTP")

    if host.data.ntp.currenttime < (gpcheck_info.collection_start_time - 1):
        checkFailed(host.hostname, "potential NTPD issue. gpcheck start time (%s) time on machine (%s)" % (time.ctime(gpcheck_info.collection_start_time), time.ctime(host.data.ntp.currenttime)))
    if host.data.ntp.currenttime > (gpcheck_info.collection_end_time + 1):
        # BUG FIX: this message previously printed collection_start_time even
        # though the *end* bound tripped; report the end time instead.
        checkFailed(host.hostname, "potential NTPD issue. gpcheck end time (%s) time on machine (%s)" % (time.ctime(gpcheck_info.collection_end_time), time.ctime(host.data.ntp.currenttime)))
    if not host.data.ntp.running:
        checkFailed(host.hostname, "ntpd not detected on machine")
| |
| |
def testGenericLinuxHost(host):
    """Run the per-host check suite for a generic Linux machine.

    The namenode and other hosts run the same checks except that the IO
    scheduler check is skipped on the namenode (and the disk/HDFS checks
    run in a slightly different order).
    """
    logger.info("test on host: %s" % host.hostname)
    testHAWQGUC(host)
    testHAWQconfig(host)
    if host.is_namenode:
        testHDFSConfig(host)
        testDiskCapacity(host)
    else:
        testDiskCapacity(host)
        testHDFSConfig(host)
        testIOSchedulers(host)
    testSysctl(host)
    testLimitsConf(host)
    testLinuxMounts(host)
    testNtp(host)
| |
| |
def testGenericSolarisHost(host):
    """Run every per-host check that applies to a Solaris machine."""
    for check in (testSolarisEtcSystem, testSolarisEtcProject, testSolarisEtcUserAttr):
        check(host)
| |
def testUnameConsistency():
    """Verify that 'uname -r' output is identical across all collected hosts.

    The first host seen becomes the reference; every later host that
    differs is reported via checkFailed().
    """
    logger.info("test uname consistency")
    firstUname = None
    firstHost = None
    for _, host in gpcheck_info.hosts.items():
        uname = host.data.uname.output
        if firstUname:
            if firstUname != uname:
                # BUG FIX: previously referenced the undefined name 'h',
                # raising NameError exactly when a mismatch was detected.
                checkFailed(host.hostname, "uname -r output different among hosts: %s : %s != %s : %s" % (firstHost, firstUname, host.hostname, uname))
        else:
            firstUname = uname
            firstHost = host.hostname
| |
| |
def testGenericLinuxCluster():
    """Run Linux per-host checks on every host, then the cross-host checks."""
    for host in gpcheck_info.hosts.values():
        testGenericLinuxHost(host)
    testUnameConsistency()
| |
def testGenericLinuxClusterBlockDev():
    """Run the blockdev readahead check on every non-namenode host (root only)."""
    for host in gpcheck_info.hosts.values():
        if host.is_namenode:
            continue
        testBlockdev(host)
| |
def testGenericSolarisCluster():
    """Run Solaris per-host checks on every host, then the cross-host checks."""
    for host in gpcheck_info.hosts.values():
        testGenericSolarisHost(host)
    testUnameConsistency()
| |
| |
def runTests():
    """Dispatch the platform-appropriate test suite and log a summary.

    Raises:
        GpCheckError: when the detected host platform has no tests.
    """
    host_type = gpcheck_info.host_type
    if host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX:
        testGenericLinuxCluster()
        # blockdev readahead can only be inspected with root privileges
        if gpcheck_info.is_root:
            testGenericLinuxClusterBlockDev()
    elif host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS:
        testGenericSolarisCluster()
    else:
        raise GpCheckError("No tests exist for this platform in gpcheck")

    # report checks result
    logger.info("GPCHECK Result:")
    logger.info("---------------------------------------")
    if found_errors:
        logger.info("check failed!\tfound %s error(s)" % found_errors)
    else:
        logger.info("all check succeed!")
    logger.info("---------------------------------------")
| |
| |
def readZip():
    """Load a previously collected gpcheck data dump from a .tar.gz file.

    Extracts the archive given by --zipin, moves the pickle file into the
    temp directory, and unpickles it into the global gpcheck_info.

    Raises:
        GpCheckError: on a bad file name or any extract/move/load failure.
    """
    logger.info("trying to read zip file '%s'..." % options.zipin)

    words = options.zipin.split(".tar.gz")
    if len(words) != 2:
        raise GpCheckError("--zipin file needs to be a .tar.gz file")
    fname = words[0]

    def run_local(tag, cmdStr):
        # run a shell command through the worker pool; raise on any failure
        if options.verbose:
            logger.info(cmdStr)
        cmd = Command(tag, cmdStr)
        pool.addCommand(cmd)
        pool.join()
        for item in pool.getCompletedItems():
            if item.results.rc or item.results.halt or not item.results.completed:
                raise Exception("error running command '%s'" % cmdStr)

    # untar
    try:
        run_local("tarcmd", "tar xfz %s" % (options.zipin))
    except Exception as e:
        raise GpCheckError("Failed to extract tar file '%s': %s" % (options.zipin, e))

    # move extracted file to temp directory
    newfname = "%s/%s" % (tmpdir, fname)
    try:
        run_local("mvcmd", "mv %s %s" % (fname, newfname))
    except Exception as e:
        raise GpCheckError("Failed to move file '%s' to temp directory: %s" % (fname, e))

    # load pickle file
    global gpcheck_info
    try:
        with open(newfname, "rb") as f:
            gpcheck_info = pickle.load(f)
    except Exception as e:
        raise GpCheckError("Failed to load pickle file '%s': %s" % (newfname, e))

    logger.info("trying to read zip file '%s' [success]" % options.zipin)
| |
def doZip(fname):
    """Serialize gpcheck_info to <fname> and package it as <fname>.tar.gz.

    The intermediate pickle file is removed once the tarball exists.

    Raises:
        GpCheckError: on pickle, tar, or cleanup failure.
    """
    logger.info("dump gpcheck data into a zip file '%s.tar.gz'..." % fname)

    # dump to pickle file
    try:
        with open(fname, "wb") as f:
            pickle.dump(gpcheck_info, f)
    except Exception as e:
        raise GpCheckError("Failed to dump pickle file '%s':\n%s" % (fname, e))

    def run_local(tag, cmdStr):
        # run a shell command through the worker pool; raise on any failure
        if options.verbose:
            logger.info(cmdStr)
        cmd = Command(tag, cmdStr)
        pool.addCommand(cmd)
        pool.join()
        for item in pool.getCompletedItems():
            if item.results.rc or item.results.halt or not item.results.completed:
                raise Exception("error running command '%s': %s" % (cmdStr, item.results.stderr.strip()))

    # make a tar ball
    try:
        run_local("tarcmd", "tar cfz %s.tar.gz %s" % (fname, fname))
    except Exception as e:
        raise GpCheckError("Failed to dump gpcheck data into a zip file:\n%s" % e)

    # delete pickle file
    try:
        run_local("rmcmd", "rm -rf %s" % fname)
    except Exception as e:
        raise GpCheckError("Failed to delete pickle file '%s':\n%s" % (fname, e))

    logger.info("dump gpcheck data into a zip file '%s.tar.gz' [success]" % fname)
| |
| |
| def doPrint(): |
| for h in sorted(gpcheck_info.hosts): |
| print "HOST: %s" % h |
| print gpcheck_info.hosts[h].data |
| print "----------------------------------------------------------------------\n" |
| |
| if gpcheck_info.hawq_collected_ok: |
| print "HAWQ guc settings:" |
| for guc_name, guc_val in gpcheck_info.hawq_gucs.items(): |
| print "GUC : %s\nMaster value: %s\nSegment value: %s\n" % (guc_name, guc_val[0], guc_val[1]) |
| |
def checkHostsFile():
    '''
    Return the content of /etc/hosts with IPv6 entries commented out.

    Any line containing "::" that is not already commented is prefixed
    with "# ". The file on disk is NOT modified; the rewritten content
    is returned as a string.
    '''
    # Read the file directly instead of shelling out to 'cat' through a
    # subprocess (useless use of cat + shell invocation).
    with open("/etc/hosts", "rb") as hosts_file:
        hostsContent = hosts_file.read().decode("utf8", "ignore")

    newHostsContent = ""
    # Trying to find if /etc/hosts contains ::1
    for hostName in hostsContent.splitlines():
        hostName = hostName.replace('\n', '')
        if hostName.find("::") > -1 and hostName.find("#") < 0:
            newHostsContent += "# " + hostName + "\n"
        else:
            newHostsContent += hostName + "\n"

    return newHostsContent
| |
def local_run(cmd):
    '''Execute each shell command on local machine.'''
    for command_line in cmd:
        # Popen(...).wait() yields the command's exit status
        exit_code = subprocess.Popen(command_line, shell=True).wait()
        check_return_code(exit_code)
| |
def checkHardwarePerf():
    """
    This function is used to check hardware performance of each node, which include disk IO, memory and network

    Runs gpcheckperf (disk, stream and network tests) over the host file
    given by --file, parses its text output for the minimum bandwidth
    figures, and logs Pass/Fail against the thresholds from gpcheck_config.
    """
    global options
    hawq_home = os.getenv('GPHOME')
    if not hawq_home:
        print("HAWQ home directory is not defined, please check GPHOME settings.")
        sys.exit(1)
    # gpcheckperf lives in the HAWQ install; source its environment first
    source_hawq_env = ". %s/greenplum_path.sh" % hawq_home
    cmd = ""
    # -r dsN: run disk, stream and network tests; -S 8589934592: 8 GiB file size
    arguments = "-f %s -r dsN -D -S 8589934592 -d /tmp" % (options.file)
    cmd = "%s; gpcheckperf %s" % (source_hawq_env, arguments)
    rtn_output = subprocess.check_output(cmd, shell=True)
    rtnContent = rtn_output.decode("utf8", "ignore")

    # Display test result
    print(rtn_output)

    # thresholds configured in gpcheck's config file (MB/s)
    expected_disk_write_min_bandwidth = gpcheck_config.disk_write_min_bandwidth
    expected_disk_read_min_bandwidth = gpcheck_config.disk_read_min_bandwidth
    expected_stream_min_bandwidth = gpcheck_config.stream_min_bandwidth
    expected_network_min_bandwidth = gpcheck_config.network_min_bandwidth

    # Retrieve test result
    hostname_real_disk_write_min_bandwidth = ""
    real_disk_write_min_bandwidth = 0.0

    hostname_real_disk_read_min_bandwidth = ""
    real_disk_read_min_bandwidth = 0.0

    hostname_real_stream_min_bandwidth = ""
    real_stream_min_bandwidth = 0.0

    real_network_min_bandwidth = 0.0
    # Parse gpcheckperf's textual report line by line. Lines look like
    # "... min bandwidth: <value> <hostname>"; the field after ': ' is
    # split into value and hostname. (Format assumed from this parsing
    # code -- verify against the gpcheckperf version in use.)
    rtnContent = rtnContent.splitlines()
    for rowItem in rtnContent:
        if "disk write min bandwidth" in rowItem:
            value_hostnane = rowItem.split(": ")[1]
            hostname_real_disk_write_min_bandwidth = value_hostnane.split(" ")[1].strip()
            real_disk_write_min_bandwidth = (float)(value_hostnane.split(" ")[0].strip())
        elif "disk read min bandwidth" in rowItem:
            value_hostnane = rowItem.split(": ")[1]
            hostname_real_disk_read_min_bandwidth = value_hostnane.split(" ")[1].strip()
            real_disk_read_min_bandwidth = (float)(value_hostnane.split(" ")[0].strip())
        elif "stream min bandwidth" in rowItem:
            value_hostnane = rowItem.split(": ")[1]
            hostname_real_stream_min_bandwidth = value_hostnane.split(" ")[1].strip()
            real_stream_min_bandwidth = (float)(value_hostnane.split(" ")[0].strip())
        elif "min = " in rowItem:
            # network section reports "min = <value> <unit>"
            value_unit = rowItem.split("= ")[1]
            real_network_min_bandwidth = (float)(value_unit.split(" ")[0].strip())
        elif "[Warning] single host only - abandon netperf test" in rowItem:
            # sentinel: negative value means the network test was skipped
            real_network_min_bandwidth = -1.0

    if real_disk_write_min_bandwidth >= expected_disk_write_min_bandwidth:
        logger.info("Pass: Disk write test")
    else:
        logger.error("Fail: Disk min write bandwidth %.2f (MB/s) is lower than expected value %.2f (MB/s). Node: %s." % (real_disk_write_min_bandwidth, expected_disk_write_min_bandwidth, hostname_real_disk_write_min_bandwidth))

    if real_disk_read_min_bandwidth >= expected_disk_read_min_bandwidth:
        logger.info("Pass: Disk read test")
    else:
        logger.error("Fail: Disk min read bandwidth %.2f (MB/s) is lower than expected value %.2f (MB/s). Node: %s." % (real_disk_read_min_bandwidth, expected_disk_read_min_bandwidth, hostname_real_disk_read_min_bandwidth))

    if real_stream_min_bandwidth >= expected_stream_min_bandwidth:
        logger.info("Pass: Stream test")
    else:
        logger.error("Fail: Stream min bandwidth %.2f (MB/s) is lower than expected value %.2f (MB/s). Node %s." % (real_stream_min_bandwidth, expected_stream_min_bandwidth, hostname_real_stream_min_bandwidth))

    if real_network_min_bandwidth < 0:
        logger.info("Warning: Single host only - abandon netperf test")
    elif real_network_min_bandwidth >= expected_network_min_bandwidth:
        logger.info("Pass: Network test")
    else:
        logger.error("Fail: Min network bandwidth %.2f (MB/s) is lower than expected value %.2f (MB/s)." % (real_network_min_bandwidth, expected_network_min_bandwidth))
| |
def retrieve_DB_Version():
    '''
    This function is used to retrieve database version and RPM version.
    '''
    sql_util = GpsqlUtil()
    db_version = sql_util.performQuery("select version();")

    # first line of 'rpm -qa | grep hawq' identifies the installed package
    raw_output = subprocess.check_output("sudo rpm -qa | grep hawq", shell=True)
    rpm_version = raw_output.decode("utf8", "ignore").splitlines()[0].replace("\n", "")

    logger.info("Database Version is %s" % db_version[0]["version"])
    logger.info("RPM Version is %s" % rpm_version)
| |
def retrieve_DB_process():
    '''
    This function is used to retrieve database process info.
    '''
    sql_util = GpsqlUtil()
    db_process = sql_util.performQuery("SELECT * FROM gp_segment_configuration;")

    if db_process is None:
        logger.error("Fail to retrieve database process information.")
        return

    logger.info("Database Process Information:")
    for row in db_process:
        # status 'u' means the segment is up
        if row["status"].lower() == "u":
            logger.info("Database node: %s is up." % row["hostname"])
        else:
            logger.error("Database node: %s is NOT up." % row["hostname"])
| |
def check_DB_basic_function():
    '''
    This function will perform create table, insert data into table, select data from table

    and drop table operations to test if database is functioning correctly.
    '''
    sql_util = GpsqlUtil()

    # unique table name derived from the current epoch second
    table_name = "hawqTest_" + str(calendar.timegm(time.gmtime()))
    row_count = 100000

    try:
        # Create table
        create_result = sql_util.performQuery("CREATE TABLE %s (id INT);" % table_name, 0)
        if create_result != "FAIL":
            logger.info("Pass: Create table is successful.")
    except Exception as exc:
        logger.error("Fail to create table: %s" % (str(exc)))
        return None

    try:
        # Insert data into database, then read the row count back
        insert_result = sql_util.performQuery("INSERT INTO %s SELECT generate_series(1, %s);" % (table_name, str(row_count)), 0)
        count_rows = sql_util.performQuery("SELECT count(*) FROM %s;" % (table_name))
        if count_rows[0]["count"] == row_count:
            logger.info("Pass: Insert data into table is successful.")
            logger.info("Pass: Select data from table is successful.")
        else:
            logger.error("Fail to insert data into table.")
    except Exception as exc:
        logger.error("Fail to insert data into table.")
        # best-effort cleanup before bailing out
        drop_result = sql_util.performQuery("DROP TABLE %s;" % table_name)
        return None

    try:
        # Drop table
        drop_result = sql_util.performQuery("DROP TABLE %s;" % table_name, 0)
        if drop_result != "FAIL":
            logger.info("Pass: Drop table is successful.")
    except Exception as exc:
        logger.error("Fail to drop table %s. Please perform drop table operation manually." % table_name)
        return None
| |
def check_GUC_configuration():
    '''
    This function will perform GUC configuration check:

    default_hash_table_bucket_number; hawq_rm_nvseg_perquery_limit;

    hawq_rm_nvseg_perquery_perseg_limit;

    magma_shm_limit_per_block
    '''
    # The four checks were previously four copy-pasted stanzas; they are
    # data-driven now. Each entry: (guc name, human-readable label, suffix
    # stripped from the actual value before int(), unit shown in messages).
    # The expected value is read from the gpcheck_config attribute of the
    # same name as the GUC.
    guc_checks = [
        ("default_hash_table_bucket_number", "Default hash table bucket number", "", ""),
        ("hawq_rm_nvseg_perquery_limit", "Hawq rm nvseg perquery limit", "", ""),
        ("hawq_rm_nvseg_perquery_perseg_limit", "Hawq rm nvseg perquery perseg limit", "", ""),
        ("magma_shm_limit_per_block", "Magma shm limit per block", "MB", "MB"),
    ]

    # Connect to DB to retrieve GUC config values
    _sqlUtil = GpsqlUtil()

    querySQL = None
    try:
        for guc_name, label, strip_suffix, unit in guc_checks:
            querySQL = "show %s;" % guc_name
            expected_value = getattr(gpcheck_config, guc_name)
            rows = _sqlUtil.performQuery(querySQL)
            actual_value = rows[0][guc_name]
            if strip_suffix:
                actual_value = actual_value.replace(strip_suffix, "")
            if expected_value > int(actual_value):
                logger.error("Fail: %s %s%s is less than expected value %s%s." % (label, actual_value, unit, expected_value, unit))
            else:
                logger.info("Pass: %s %s%s is NOT less than expected value %s%s." % (label, actual_value, unit, expected_value, unit))
    except Exception as e:
        # as before, the first failing query aborts the remaining checks
        logger.error("Fail to execute %s." % (querySQL))
        logger.error("Error Message: %s" % (str(e)))
| |
def checkDiskUsage():
    '''
    This function is used to check disk total usage data of each node,

    and compare total disk space and available ratio with expected value in gpconfig.yml file
    '''
    global options

    # Read the host list file directly instead of shelling out to
    # "cat %s" with shell=True (which interpolated the user-supplied
    # file name into a shell command line).
    with open(options.file, "rb") as host_file:
        hostsContent = host_file.read().decode("utf8", "ignore")

    expected_disk_space_total = (float)(gpcheck_config.disk_space_total)
    expected_disk_space_available = (float)(gpcheck_config.diskusage_usagemax)

    # Check disk status of each host
    for elem in hostsContent.splitlines():
        hostName = elem.replace("\n", "").replace(" ", "")
        if hostName == "":
            continue

        cmd = []
        print("---------- Node Name: %s" % (hostName))
        # human-readable df output shown to the user
        cmd.append("ssh %s df -h --total | grep Filesystem; df -h --total | grep total" % (hostName))
        local_run(cmd)

        # machine-parsed totals: -P forces POSIX format, -BG reports in GiB
        rtn_output = subprocess.check_output("ssh %s df -h -P --total -BG | grep total" % (hostName), shell=True)
        rtnContent = rtn_output.decode("utf8", "ignore")
        rtnVal = ' '.join(rtnContent.splitlines()[0].split())
        rtnVal = rtnVal.split(" ")
        totalDiskSpace = (float)(rtnVal[1].replace("G", ""))
        diskAvailableSpace = (float)(rtnVal[3].replace("G", ""))
        diskAvaiableRatio = (diskAvailableSpace / totalDiskSpace) * 100.0

        # Compare real value with expected value
        if expected_disk_space_total > totalDiskSpace:
            logger.error("Fail: Disk total space %.1fGB is less than expected value %.1fGB." % (totalDiskSpace, expected_disk_space_total))
        else:
            logger.info("Pass: Disk total space %.1fGB is NOT less than expected value %.1fGB." % (totalDiskSpace, expected_disk_space_total))

        if expected_disk_space_available > diskAvaiableRatio:
            logger.error("Fail: Disk available space ratio %.1f%% is less than expected value %.1f%%." % (diskAvaiableRatio, expected_disk_space_available))
        else:
            logger.info("Pass: Disk available space ratio %.1f%% is NOT less than expected value %.1f%%." % (diskAvaiableRatio, expected_disk_space_available))
| |
def checkMasterStandbySync():
    '''
    Check whether the master and standby are synchronized.

    Reads gp_master_mirroring for the sync summary state and
    gp_segment_configuration (role = 's') to determine whether a standby
    is configured, then logs a Pass/Fail line for each combination.
    '''
    _sqlUtil = GpsqlUtil()
    gp_master_mirroring = _sqlUtil.performQuery("select * from gp_master_mirroring;")
    gp_master_mirroring_rtn = gp_master_mirroring[0]["summary_state"].lower()

    # Check if there is standby node (any segment row with role 's')
    gp_segment_configuration = _sqlUtil.performQuery("select * from gp_segment_configuration where role = 's';")
    standby_flag = len(gp_segment_configuration) > 0

    if standby_flag:
        if gp_master_mirroring_rtn == "synchronized":
            logger.info("Pass: Master and Standby are synchronized.")
        elif gp_master_mirroring_rtn == "not configured":
            # Fixed: this failure was previously logged at info level.
            logger.error("Fail: Standby is NOT found.")
        else:
            logger.error("Fail: Master and Standby are NOT synchronized.")
    else:
        if gp_master_mirroring_rtn == "synchronized":
            # Fixed: this failure was previously logged at info level.
            logger.error("Fail: Unexpected accident is occurring. Please check gp_segment_configuration and gp_master_mirroring tables.")
        elif gp_master_mirroring_rtn == "not configured":
            logger.info("Pass: Standby are NOT configured.")
        else:
            logger.error("Fail: Unexpected accident is occurring. Please check gp_segment_configuration and gp_master_mirroring tables.")
| |
def checkTableAnalyzeStatus():
    '''
    Verify that all tables have been analyzed within the last three days.

    Queries pg_stat_last_operation joined with pg_class; any returned row
    represents a table whose last recorded operation is older than three
    days, which is reported as a failure.
    '''
    sql_util = GpsqlUtil()
    stale_tables = sql_util.performQuery("SELECT classid, objid, staactionname, stasysid, stausename, stasubtype, statime FROM pg_stat_last_operation pslo, pg_class pc WHERE pc.oid = pslo.objid and (now()-statime)>3;")
    if not stale_tables:
        logger.info("Pass: All tables are analyzed within three days.")
    else:
        logger.error("Fail: There are %d tables which are NOT analyzed within three days. Please check pg_stat_last_operation and pg_class tables to get details." % (len(stale_tables)))
| |
def check_kerberos_enablement():
    '''
    Read /usr/local/hawq/etc/hdfs-client.xml and /usr/local/hawq/etc/yarn-client.xml
    to check if kerberos is enabled, logging one Pass/Fail line per file.

    NOTE(review): this looks for a <property> whose *name* is "kerberos";
    kerberos is more commonly indicated by the *value* of
    hadoop.security.authentication -- confirm which form these files use.
    '''
    # `import xml` at file top does not load the etree submodule, so the
    # original `xml.etree.ElementTree` reference raised AttributeError.
    import xml.etree.ElementTree as ET

    def _kerberos_property_present(conf_path):
        # True if any <property><name> equals "kerberos" (case-insensitive).
        with open(conf_path) as f:
            root = ET.fromstring(f.read())
        for prop in root.findall("property"):
            if prop.find("name").text.lower() == "kerberos":
                return True
        return False

    checks = (
        ("/usr/local/hawq/etc/hdfs-client.xml", "hdfs-client.xml"),
        ("/usr/local/hawq/etc/yarn-client.xml", "yarn-client.xml"),
    )
    for conf_path, label in checks:
        if _kerberos_property_present(conf_path):
            logger.info("Kerberos in %s is enabled." % label)
        else:
            logger.error("Kerberos in %s is NOT enabled." % label)
| |
def check_active_namenode():
    '''
    Read /usr/local/hawq/etc/hdfs-client.xml to find the first configured
    namenode and verify via `hdfs haadmin -getServiceState` that it is the
    active one. Logs Pass/Fail; returns None when the namenode cannot be
    determined from the configuration.
    '''
    # `import xml` at file top does not load the etree submodule, so the
    # original `xml.etree.ElementTree` reference raised AttributeError.
    import xml.etree.ElementTree as ET

    with open("/usr/local/hawq/etc/hdfs-client.xml") as f:
        root = ET.fromstring(f.read())

    # Resolve dfs.nameservices, then the first entry of
    # dfs.ha.namenodes.<nameservices>.
    nameservices = ""
    for elem in root.findall("property"):
        if elem.find("name").text == "dfs.nameservices":
            nameservices = elem.find("value").text
            break

    firstNamenode = None
    for elem in root.findall("property"):
        if elem.find("name").text == "dfs.ha.namenodes." + nameservices:
            firstNamenode = elem.find("value").text.split(",")[0]
            break
    if firstNamenode is None:
        logger.error("Warning: Fail to retrieve namenode in hdfs-client.xml. Please check your HFDS configure in hdfs-client.xml.")
        return None

    try:
        # Run the state query as the hdfs user in a single shell invocation.
        # The original Popen("su - hdfs").wait() did not change the user of
        # later subprocesses (each subprocess is independent), and the
        # un-stripped check_output result ("active\n") never matched "active".
        command = "su - hdfs -c 'hdfs haadmin -getServiceState %s'" % firstNamenode
        state = subprocess.check_output(command, shell=True).strip()

        if state == "active":
            logger.info("Pass: First namenode %s is active name node." % (firstNamenode))
        elif state == "standby":
            logger.error("Fail: First namenode %s is NOT active name node." % (firstNamenode))
        else:
            logger.error("Fail: Fail to detect active name node.")
    except Exception as e:
        logger.error("Fail: HDFS Error: %s" % (str(e)))
| |
def check_hdfs_local_read():
    '''
    Check the HDFS short-circuit (local) read configuration.

    The dfs.domain.socket.path values in /usr/local/hawq/etc/hdfs-client.xml
    and /etc/hadoop/conf/hdfs-site.xml must match, and the socket file must
    be readable and writable by user, group and others (mode 666) so the
    hawq user can use it.
    '''
    # `import xml` at file top does not load the etree submodule, so the
    # original `xml.etree.ElementTree` reference raised AttributeError.
    import xml.etree.ElementTree as ET

    def _socket_path(conf_path):
        # Return the dfs.domain.socket.path value from conf_path, or None.
        with open(conf_path) as f:
            root = ET.fromstring(f.read())
        for prop in root.findall("property"):
            if prop.find("name").text.lower() == "dfs.domain.socket.path":
                return prop.find("value").text
        return None

    socket_hdfs_client = _socket_path("/usr/local/hawq/etc/hdfs-client.xml")
    if socket_hdfs_client is None:
        logger.error("Fail: Can Not find dfs.domain.socket.path in hdfs-client.xml.")
        return None

    socket_hdfs_site = _socket_path("/etc/hadoop/conf/hdfs-site.xml")
    if socket_hdfs_site is None:
        logger.error("Fail: Can Not find dfs.domain.socket.path in hdfs-site.xml.")
        return None

    # Both files must agree on the socket path.
    if socket_hdfs_client != socket_hdfs_site:
        logger.error("Fail: socket file path in %s and %s are different." % (socket_hdfs_client, socket_hdfs_site))
        return None

    try:
        # Require rw for user, group and others (execute bit irrelevant).
        # Replaces the original fragile slicing of `ls -l` output.
        mode = os.stat(socket_hdfs_site).st_mode
        if mode & 0o666 == 0o666:
            logger.info("Pass: HDFS local read is configured correctly.")
        else:
            logger.error("Fail: HDFS local read is NOT configured correctly.")
    except Exception as e:
        logger.error("Fail: HDFS local read is NOT configured correctly. \n Error Message is: %s" % (e))
| |
def check_return_code(result, logger = None, error_msg = None, info_msg = None, exit_true = False):
    '''Check a shell command's exit code.

    Logs error_msg when the code is non-zero and info_msg when it is zero
    (only when a logger is supplied), optionally exits the process with
    status 0 when exit_true is set, and returns the code unchanged.
    '''
    succeeded = (result == 0)
    if logger:
        if succeeded and info_msg:
            logger.info(info_msg)
        elif not succeeded and error_msg:
            logger.error(error_msg)
    if exit_true:
        sys.exit(0)
    return result
| |
def overwrite_etc_hosts(filecontent = "", hostfile = ""):
    '''
    Overwrite the local /etc/hosts with filecontent, then push the file to
    every node listed in hostfile via gpscp.
    '''
    hawq_home = os.getenv('GPHOME')
    if not hawq_home:
        print("HAWQ home directory is not defined, please check GPHOME settings.")
        sys.exit(1)

    commands = [
        ". %s/greenplum_path.sh" % hawq_home,
        "echo \'" + filecontent + "\' > /etc/hosts",
        "gpscp -f %s /etc/hosts =:/etc/hosts" % (hostfile),
    ]
    local_run(commands)
| |
def enable_coredump(hostfile = ""):
    '''
    Enable core dumps on each node listed in hostfile via gpssh.

    NOTE(review): `ulimit -c unlimited` only affects the gpssh subshell it
    runs in, not future sessions -- confirm this achieves the intended
    persistent setting.
    '''
    hawq_home = os.getenv('GPHOME')
    if not hawq_home:
        print("HAWQ home directory is not defined, please check GPHOME settings.")
        sys.exit(1)

    commands = [
        ". %s/greenplum_path.sh" % hawq_home,
        "gpssh -f %s ulimit -c unlimited" % (hostfile),
    ]
    local_run(commands)
| |
def install_ntp(hostfile = ""):
    '''
    Install the ntp package and enable/start the ntpd service on each node
    listed in hostfile via gpssh.
    '''
    hawq_home = os.getenv('GPHOME')
    if not hawq_home:
        print("HAWQ home directory is not defined, please check GPHOME settings.")
        sys.exit(1)

    commands = [". %s/greenplum_path.sh" % hawq_home]
    for remote_cmd in ("yum install -y ntp", "systemctl enable ntpd", "systemctl start ntpd"):
        commands.append("gpssh -f %s -e '%s'" % (hostfile, remote_cmd))
    local_run(commands)
| |
def disable_firewall(hostfile = ""):
    '''
    Stop and disable both iptables and firewalld on each node listed in
    hostfile via gpssh.
    '''
    hawq_home = os.getenv('GPHOME')
    if not hawq_home:
        print("HAWQ home directory is not defined, please check GPHOME settings.")
        sys.exit(1)

    commands = [". %s/greenplum_path.sh" % hawq_home]
    for remote_cmd in ("systemctl stop iptables",
                       "systemctl disable iptables",
                       "systemctl stop firewalld",
                       "systemctl disable firewalld"):
        commands.append("gpssh -f %s -e '%s'" % (hostfile, remote_cmd))
    local_run(commands)
| |
def disable_selinux(hostfile = ""):
    '''
    Disable SELinux on each node listed in hostfile via gpssh: rewrite
    /etc/selinux/config for future boots and setenforce 0 for the running
    system.
    '''
    hawq_home = os.getenv('GPHOME')
    if not hawq_home:
        print("HAWQ home directory is not defined, please check GPHOME settings.")
        sys.exit(1)

    commands = [". %s/greenplum_path.sh" % hawq_home]
    # The sed expression escapes '=' exactly as the remote shell expects.
    for remote_cmd in ('sed -i "s/^SELINUX\\=enforcing/SELINUX\\=disabled/g" /etc/selinux/config',
                       'setenforce 0'):
        commands.append("gpssh -f %s -e '%s'" % (hostfile, remote_cmd))
    local_run(commands)
| |
# Dispatch table mapping step names to the hawq check --postSetup,
# --postUpgrade and --weeklyExamine functions defined above.
function_dispatcher = {
    "overwrite_etc_hosts": overwrite_etc_hosts,
    "enable_coredump": enable_coredump,
    "install_ntp": install_ntp,
    "disable_firewall": disable_firewall,
    "disable_selinux": disable_selinux,
    "checkHardwarePerf": checkHardwarePerf,
    "Check config in /etc/sysctl.conf && /etc/security/limits.conf": runTests,
    "retrieve_DB_Version": retrieve_DB_Version,
    "retrieve_DB_process": retrieve_DB_process,
    "check_DB_basic_function": check_DB_basic_function,
    "check_GUC_configuration": check_GUC_configuration,
    "check_kerberos_enablement": check_kerberos_enablement,
    "check_active_namenode": check_active_namenode,
    "check_hdfs_local_read": check_hdfs_local_read,
    "checkDiskUsage": checkDiskUsage,
    "checkMasterStandbySync": checkMasterStandbySync,
    "checkTableAnalyzeStatus": checkTableAnalyzeStatus,
}
| |
if __name__ == '__main__':

    # Entry point: parse arguments and configuration, collect system data
    # from all hosts (or a previously saved zip), then run the check mode
    # selected on the command line (--postSetup / --postUpgrade /
    # --weeklyExamine / --stdout / --zipout / default tests).
    if gpcheck_info.is_root:
        logger.info("gpcheck will perform block device's readahead checks when run as root")
    try:
        try:
            checkPlatform()
            parseargs()
            readConfigFile()

        except GpCheckError, e:
            logger.error(str(e))
            sys.exit(1)
        # Shut down any worker pool started during the setup phase before
        # the main run (it is recreated/reused by later collection steps).
        if pool:
            pool.join()
            pool.haltWork()
            pool.joinWorkers()

        try:
            # Scratch directory for per-host collected data files;
            # removed in the outer finally block below.
            tmpdir = tempfile.mkdtemp(prefix='gpcheck')
        except Exception, e:
            logger.error("Error creating tmp dir on master: %s" % e)
            sys.exit(1)

        try:
            # Phase 1: collect input
            if options.zipin:
                readZip() # load information into gpcheck_info from zip
            else:
                # read host info into gpcheck_info.hosts from --file or --host
                createHostList()
                # collect each server's system environment configuration
                runCollections()
                # read collected data into gpcheck_info
                readDataFiles()
                # read HAWQ configuration
                readHAWQConfiguration()

            # Phase 2: generate output
            if options.postSetup:
                cmd = []
                counter = 1

                logger.info("-------------- Execute post-setup check --------------\n")

                # Step 1: Comment out ::1 localhost in /etc/hosts
                logger.info("-------------- Step %d: Comment out ::1 localhost in /etc/hosts" % counter)
                file_content = checkHostsFile()
                function_dispatcher["overwrite_etc_hosts"](file_content, options.file)
                counter += 1
                print("")

                # Step 2: Check hardware performance
                logger.info("-------------- Step %d: Check hardware performance (This step will take a little bit more time to perform)" % counter)
                function_dispatcher["checkHardwarePerf"]()
                counter += 1
                print("")

                # Step 3: Enable core dump
                logger.info("-------------- Step %d: Enable core dump" % counter)
                function_dispatcher["enable_coredump"](options.file)
                counter += 1
                print("")

                # Step 4: Install NTP
                logger.info("-------------- Step %d: Install NTP" % counter)
                function_dispatcher["install_ntp"](options.file)
                counter += 1
                print("")

                # Step 5: Disable Firewall
                logger.info("-------------- Step %d: Disable Firewall" % counter)
                function_dispatcher["disable_firewall"](options.file)
                counter += 1
                print("")

                # Step 6: Disable Selinux
                logger.info("-------------- Step %d: Disable Selinux" % counter)
                function_dispatcher["disable_selinux"](options.file)
                counter += 1
                print("")

                # Step 7: Check configuration in /etc/sysctl.conf and /etc/security/limits.conf
                logger.info("-------------- Step %d: Check configuration in /etc/sysctl.conf and /etc/security/limits.conf" % counter)
                function_dispatcher["Check config in /etc/sysctl.conf && /etc/security/limits.conf"]()
                counter += 1
                print("")

                # Step 8: Retrieve Database Version
                logger.info("-------------- Step %d: Retrieve Database Version" % counter)
                function_dispatcher["retrieve_DB_Version"]()
                counter += 1
                print("")

                # Step 9: Retrieve Database Process
                logger.info("-------------- Step %d: Retrieve Database Process" % counter)
                function_dispatcher["retrieve_DB_process"]()
                counter += 1
                print("")

                # Step 10: Check Database Basic Functions
                logger.info("-------------- Step %d: Check Database Basic Functions" % counter)
                function_dispatcher["check_DB_basic_function"]()
                counter += 1
                print("")

                # Step 11: Check GUC Configuration
                logger.info("-------------- Step %d: Check GUC Configuration" % counter)
                function_dispatcher["check_GUC_configuration"]()
                counter += 1
                print("")

                # Step 12: Check Kerberos Enablement
                logger.info("-------------- Step %d: Check Kerberos Enablement" % counter)
                function_dispatcher["check_kerberos_enablement"]()
                counter += 1
                print("")

                # Step 13: Check Active Namenode
                logger.info("-------------- Step %d: Check Active Namenode" % counter)
                function_dispatcher["check_active_namenode"]()
                counter += 1
                print("")

                # Step 14: Check HDFS Local Read
                logger.info("-------------- Step %d: Check HDFS Local Read" % counter)
                function_dispatcher["check_hdfs_local_read"]()
                counter += 1
                print("")
            elif options.postUpgrade:
                counter = 1

                logger.info("-------------- Execute post-upgrade check --------------\n")

                # Step 1: Retrieve Database Process
                logger.info("-------------- Step %d: Retrieve Database Process" % counter)
                function_dispatcher["retrieve_DB_process"]()
                counter += 1
                print("")

                # Step 2: Retrieve Database Version
                logger.info("-------------- Step %d: Retrieve Database Version" % counter)
                function_dispatcher["retrieve_DB_Version"]()
                counter += 1
                print("")

                # Step 3: Check Database Basic Functions
                logger.info("-------------- Step %d: Check Database Basic Functions" % counter)
                function_dispatcher["check_DB_basic_function"]()
                counter += 1
                print("")
            elif options.weeklyExamine:
                cmd = []
                counter = 1

                logger.info("-------------- Execute weekly-examine check --------------\n")

                # Step 1: Check Disk Usage
                logger.info("-------------- Step %d: Check Disk Usage" % counter)
                function_dispatcher["checkDiskUsage"]()
                counter += 1
                print("")

                # Step 2: Check if Master and Standby are synchronized
                logger.info("-------------- Step %d: Check if Master and Standby are synchronized" % counter)
                function_dispatcher["checkMasterStandbySync"]()
                counter += 1
                print("")

                # Step 3: Check if the specified table is analyzed within three days
                logger.info("-------------- Step %d: Check if the specified table is analyzed within three days" % counter)
                function_dispatcher["checkTableAnalyzeStatus"]()
                counter += 1
                print("")
            elif options.stdout:
                doPrint()
            elif options.zipout:
                doZip("./gpcheck_%s" % time.time())
            else:
                # Default mode: run the full system configuration test suite.
                runTests()
                if found_errors:
                    sys.exit(1)

        except GpCheckError, e:
            logger.error(str(e))
            sys.exit(1)

        finally:
            # Cleanup runs even when sys.exit() was called above (SystemExit
            # propagates through finally). NOTE(review): if an early exit
            # happened before tmpdir was assigned, the `if tmpdir` reference
            # raises NameError, which the inner except logs -- confirm this
            # is the intended best-effort behavior.
            logger.info("Clean up...")
            try:
                if tmpdir:
                    shutil.rmtree(tmpdir)
            except Exception, e:
                logger.error("error removing tempdir during job cleanup: %s" % e)
            finally:
                # Always tear down the worker pool last.
                if pool:
                    pool.join()
                    pool.haltWork()
                    pool.joinWorkers()