#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os, sys, re, tempfile, shutil, pickle, getpass, subprocess, time, glob, ConfigParser
from xml.dom import minidom
try:
from optparse import Option, OptionParser
from gppylib.gpparseopts import OptParser, OptChecker
from gppylib.gplog import get_default_logger, setup_tool_logging
from gppylib.commands.unix import getLocalHostname, getUserName, SYSTEM
from gppylib.commands.base import WorkerPool, Command, REMOTE
from gppylib.gpcheckutil import HostType, hosttype_str
from hawqpylib.hawqlib import remote_ssh_output, check_return_code
from pygresql.pgdb import DatabaseError
from pygresql import pg
import stat
from gppylib.gpsqlUtil import GpsqlUtil
import yaml
import xml
import calendar
except ImportError, e:
sys.exit('Cannot import modules. Please check that you have sourced greenplum_path.sh. Detail: ' + str(e))
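# Exception type used to signal fatal gpcheck errors.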
class GpCheckError(Exception):
pass
class GpCheckInfo:
def __init__(self):
self.is_root = (os.geteuid() == 0)
self.host_type = HostType.GPCHECK_HOSTTYPE_UNDEFINED
self.appliance_version = None
# record gpcheck_hostdump data for each host
self.hosts = dict() # hostname => GpCheckHost obj
# record HAWQ configuration
self.hawq_gucs = dict() # guc name => value read on the master via hawqconfig
self.hawq_segment_configuration = None
self.hawq_collected_ok = False # whether HAWQ GUCs were collected successfully
self.collection_start_time = 0 # used in NTPD testing
self.collection_end_time = 0 # used in NTPD testing
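# Per-host record of collected data: the path of the gpcheck_hostdump pickle written on the
# remote host and, once copied back and loaded, the unpickled data itself.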
class GpCheckHost:
def __init__(self, name, is_namenode=False):
self.hostname = name
self.datafile = None # pickle file on each host
self.data = None # `gpcheck_hostdump` collected data for each host
self.is_namenode = is_namenode
def __str__(self):
s = "%s datafile(%s)" % (self.hostname, self.datafile)
if self.is_namenode:
s += " namenode"
return s
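# Expected values used by the checks below, with built-in defaults that can be overridden
# by the gpcheck config file (parsed in readConfigFile). An illustrative, non-authoritative
# gpcheck.cnf fragment showing the section/key layout this parser expects:
#
#   [global]
#   configfile_version = 4
#
#   [linux.mount]
#   mount.points = /data1,/data2
#
#   [linux.sysctl]
#   sysctl.vm.overcommit_memory = 2
#
#   [linux.limits]
#   soft.nofile = 2900000
#   hard.nofile = 2900000
#
#   [linux.diskusage]
#   diskusage.monitor.mounts = /data1,/data2
#   diskusage.monitor.usagemax = 90%
#   disk.space.total.GB = 1000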
class GpCheckConfig:
def __init__(self):
self.parser = ConfigParser.RawConfigParser()
self.gpcheck_config_version = 0
self.mount_points = set()
self.sysctl_expected = dict()
self.limits_expected = { # default value for limits.conf
("soft", "nofile"): 2900000,
("hard", "nofile"): 2900000,
("soft", "nproc") : 131072,
("hard", "nproc") : 131072 }
self.diskusage_mounts = []
self.diskusage_usagemax = 90 # max disk usage percentage
self.disk_space_total = 1000.0
self.hdfs_expected = { # default value for HDFS configuration
"dfs.mem.namenode.heap": 8192,
"dfs.mem.datanode.heap": 8192 }
self.hdfs_non_expected = {}
self.hdfs_ha_expected = {}
self.hdfs_kerberos_expected = {}
self.hdfs_ha_kerberos_expected = {}
self.yarn_expected = {}
self.yarn_non_expected = {}
self.yarn_ha_expected = {}
self.yarn_kerberos_expected = {}
self.yarn_ha_kerberos_expected = {}
self.hawq_expected = {}
self.hawq_kerberos_expected = {}
self.hawq_yarn_expected = {}
self.default_hash_table_bucket_number = 8
self.hawq_rm_nvseg_perquery_limit = 512
self.hawq_rm_nvseg_perquery_perseg_limit = 8
self.magma_shm_limit_per_block = 1
self.disk_write_min_bandwidth = 200.0
self.disk_read_min_bandwidth = 700.0
self.stream_min_bandwidth = 10000.0
self.network_min_bandwidth = 1000.0
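# Parse the gpcheck config file; 'linux.sysctl' is required, and the remaining sections
# populate or override the expected values initialized above.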
def readConfigFile(self, config_file):
parsed_list = self.parser.read(config_file)
if len(parsed_list) != 1:
raise GpCheckError("cannot open file!")
if not self.parser.has_section("linux.sysctl"):
raise GpCheckError("require section 'linux.sysctl'")
section = "global"
if self.parser.has_option(section, "configfile_version"):
self.gpcheck_config_version = self.parser.getint(section, "configfile_version")
section = "linux.mount"
if self.parser.has_option(section, "mount.points"):
for p in self.parser.get(section, "mount.points").split(","):
self.mount_points.add(p.strip())
section = 'linux.sysctl'
for opt in self.parser.options(section):
if re.match('sysctl\.', opt):
fields = opt.split('sysctl.')
if len(fields) != 2:
raise GpCheckError("Bad config line entry '%s'" % opt)
self.sysctl_expected[fields[1]] = self.parser.get(section, opt)
section = "linux.limits"
for opt in self.parser.options(section):
key = tuple(opt.split("."))
self.limits_expected[key] = self.parser.getint(section, opt)
section = "linux.diskusage"
if self.parser.has_option(section, "diskusage.monitor.mounts"):
self.diskusage_mounts = [m.strip() for m in self.parser.get(section, "diskusage.monitor.mounts").split(",")]
if self.parser.has_option(section, "diskusage.monitor.usagemax"):
self.diskusage_usagemax = self.parser.get(section, "diskusage.monitor.usagemax")
try:
if self.diskusage_usagemax[-1] == "%":
self.diskusage_usagemax = int(self.diskusage_usagemax[:-1])
else:
self.diskusage_usagemax = int(self.diskusage_usagemax)
except Exception, e:
raise GpCheckError("Bad config entry value '%s' for 'diskusage.monitor.usagemax': %s" %
(self.diskusage_usagemax, e))
if self.parser.has_option(section, "disk.space.total.GB"):
try:
self.disk_space_total = float(self.parser.get(section, "disk.space.total.GB"))
except Exception as e:
raise GpCheckError("Failed to read section 'linux.diskusage': %s" % (str(e)))
if not self.parser.has_section('hdfs.base'):
if not self.parser.has_section("hdfs"):
raise GpCheckError("require section 'hdfs'")
section = 'hdfs'
for opt in self.parser.options(section):
self.hdfs_expected[opt] = self.parser.get(section, opt)
try:
self.hdfs_expected["dfs.mem.namenode.heap"] = int(self.hdfs_expected["dfs.mem.namenode.heap"])
self.hdfs_expected["dfs.mem.datanode.heap"] = int(self.hdfs_expected["dfs.mem.datanode.heap"])
except ValueError, e:
raise GpCheckError("'dfs.mem.namenode.heap' or 'dfs.mem.namenode.heap' should be a number: %s" % e)
else:
section = 'hdfs.base'
for opt in self.parser.options(section):
self.hdfs_expected[opt] = self.parser.get(section, opt)
try:
self.hdfs_expected["dfs.mem.namenode.heap"] = int(self.hdfs_expected["dfs.mem.namenode.heap"])
self.hdfs_expected["dfs.mem.datanode.heap"] = int(self.hdfs_expected["dfs.mem.datanode.heap"])
except ValueError, e:
raise GpCheckError("'dfs.mem.namenode.heap' or 'dfs.mem.namenode.heap' should be a number: %s" % e)
section = 'hdfs.non'
for opt in self.parser.options(section):
self.hdfs_non_expected[opt] = self.parser.get(section, opt)
section = 'hdfs.ha'
for opt in self.parser.options(section):
self.hdfs_ha_expected[opt] = self.parser.get(section, opt)
section = 'hdfs.kerberos'
for opt in self.parser.options(section):
self.hdfs_kerberos_expected[opt] = self.parser.get(section, opt)
section = 'hdfs.ha.kerberos'
for opt in self.parser.options(section):
self.hdfs_ha_kerberos_expected[opt] = self.parser.get(section, opt)
section = 'yarn.base'
for opt in self.parser.options(section):
self.yarn_expected[opt] = self.parser.get(section, opt)
section = 'yarn.non'
for opt in self.parser.options(section):
self.yarn_non_expected[opt] = self.parser.get(section, opt)
section = 'yarn.ha'
for opt in self.parser.options(section):
self.yarn_ha_expected[opt] = self.parser.get(section, opt)
section = 'yarn.kerberos'
for opt in self.parser.options(section):
self.yarn_kerberos_expected[opt] = self.parser.get(section, opt)
section = 'yarn.ha.kerberos'
for opt in self.parser.options(section):
self.yarn_ha_kerberos_expected[opt] = self.parser.get(section, opt)
section = 'hawq.base'
for opt in self.parser.options(section):
self.hawq_expected[opt] = self.parser.get(section, opt)
section = 'hawq.kerberos'
for opt in self.parser.options(section):
self.hawq_kerberos_expected[opt] = self.parser.get(section, opt)
section = 'hawq.yarn'
for opt in self.parser.options(section):
self.hawq_yarn_expected[opt] = self.parser.get(section, opt)
section = "hawq.guc"
if self.parser.has_option(section, "default.hash.table.bucket.number"):
try:
self.default_hash_table_bucket_number = int(self.parser.get(section, "default.hash.table.bucket.number"))
except Exception as e:
raise GpCheckError("Failed to read section 'hawq.guc': %s" % (str(e)))
if self.parser.has_option(section, "hawq.rm.nvseg.perquery.limit"):
try:
self.hawq_rm_nvseg_perquery_limit = int(self.parser.get(section, "hawq.rm.nvseg.perquery.limit"))
except Exception as e:
raise GpCheckError("Failed to read section 'hawq.guc': %s" % (str(e)))
if self.parser.has_option(section, "hawq.rm.nvseg.perquery.perseg.limit"):
try:
self.hawq_rm_nvseg_perquery_perseg_limit = int(self.parser.get(section, "hawq.rm.nvseg.perquery.perseg.limit"))
except Exception as e:
raise GpCheckError("Failed to read section 'hawq.guc': %s" % (str(e)))
if self.parser.has_option(section, "magma.shm.limit.per.block"):
try:
self.magma_shm_limit_per_block = int(self.parser.get(section, "magma.shm.limit.per.block"))
except Exception as e:
raise GpCheckError("Failed to read section 'hawq.guc': %s" % (str(e)))
section = "hardware.performance"
if self.parser.has_option(section, "disk.write.min.bandwidth.MBs"):
try:
self.disk_write_min_bandwidth = float(self.parser.get(section, "disk.write.min.bandwidth.MBs"))
except Exception as e:
raise GpCheckError("Failed to read section 'hardware.performance': %s" % (str(e)))
if self.parser.has_option(section, "disk.read.min.bandwidth.MBs"):
try:
self.disk_read_min_bandwidth = float(self.parser.get(section, "disk.read.min.bandwidth.MBs"))
except Exception as e:
raise GpCheckError("Failed to read section 'hardware.performance': %s" % (str(e)))
if self.parser.has_option(section, "stream.min.bandwidth.MBs"):
try:
self.stream_min_bandwidth = float(self.parser.get(section, "stream.min.bandwidth.MBs"))
except Exception as e:
raise GpCheckError("Failed to read section 'hardware.performance': %s" % (str(e)))
if self.parser.has_option(section, "network.min.bandwidth.MBs"):
try:
self.network_min_bandwidth = float(self.parser.get(section, "network.min.bandwidth.MBs"))
except Exception as e:
raise GpCheckError("Failed to read section 'hardware.performance': %s" % (str(e)))
###### Global Variables #############
logger = get_default_logger()
EXECNAME = os.path.split(__file__)[-1]
setup_tool_logging(EXECNAME,getLocalHostname(),getUserName())
options = None
GPHOME = None
GPCHECK_CONFIG_FILE = None
HADOOP_HOME = None
gpcheck_info = GpCheckInfo()
gpcheck_config = GpCheckConfig()
pool = WorkerPool()
tmpdir = None
found_errors = 0
HAWQ_GUC_MEMORY = "hawq_re_memory_overcommit_max"
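# Map the local platform reported by SYSTEM.getName() to a gpcheck host type; only generic
# Linux and Solaris are supported.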
def checkPlatform():
host_type_map = { "linux": HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX,
"sunos": HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS }
try:
gpcheck_info.host_type = host_type_map[SYSTEM.getName()]
logger.info("Detected platform: %s" % hosttype_str(gpcheck_info.host_type))
except KeyError:
raise GpCheckError("No tests exists for this platform in gpcheck")
def parse_host_list_file(host_file):
host_list = list()
with open(host_file) as f:
hosts = f.readlines()
for host in hosts:
host = host.split("#",1)[0].strip()
if host:
host_list.append(host)
return host_list
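# Parse command-line options and populate the global options, GPHOME, GPCHECK_CONFIG_FILE
# and HADOOP_HOME settings.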
def parseargs():
global options, GPHOME, HADOOP_HOME, GPCHECK_CONFIG_FILE
parser = OptParser(option_class=OptChecker, version='%prog version $Revision: #1 $')
parser.remove_option('-h')
parser.add_option('-?', '--help', action='help')
parser.add_option('--verbose', action='store_true')
parser.add_option('--stdout', action='store_true')
parser.add_option('--zipout', action='store_true')
parser.add_option('--zipin', type='string')
parser.add_option('--gphome', type='string')
# for HDFS xml and memory check
parser.add_option('--hadoop', '--hadoop-home', type='string')
parser.add_option('--hdfs', action='store_true')
parser.add_option('--hdfs-ha', dest="hdfs_ha", action='store_true')
parser.add_option('--yarn', action='store_true')
parser.add_option('--yarn-ha', dest="yarn_ha", action='store_true')
parser.add_option('--kerberos', action='store_true')
parser.add_option('-c', '--config', type='string') # optional: gpcheck config file path
parser.add_option('-f', '--file', type='string') # host file, for testing a list of hosts
parser.add_option('-h', '--host', type='string') # test a single host
# Adding new options: postSetup postUpgrade weeklyExam
parser.add_option('--postSetup', action='store_true')
parser.add_option('--postUpgrade', action='store_true')
parser.add_option('--weeklyExamine', action='store_true')
parser.add_option('-t', '--table', type='string') # optional: table to analyze
(options, args) = parser.parse_args()
if len(args) > 0:
if args[0] == 'help':
parser.print_help(sys.stderr)
sys.exit(0)
# GPHOME must be found
GPHOME = options.gphome if options.gphome else os.environ.get("GPHOME")
if not GPHOME:
raise GpCheckError("GPHOME not set, must be specified in --gphome")
GPCHECK_CONFIG_FILE = options.config if options.config else "%s/etc/gpcheck.cnf" % GPHOME
logger.info("Checks uses config file: %s", GPCHECK_CONFIG_FILE)
HADOOP_HOME = options.hadoop if options.hadoop else os.environ.get("HADOOP_HOME")
if not HADOOP_HOME:
checkFailed(None, "utility will SKIP HDFS configuration check because HADOOP_HOME is not specified in environment variable or --hadoop")
if options.yarn and not HADOOP_HOME:
options.yarn = False
checkFailed(None, "utility will SKIP YARN configuration check because HADOOP_HOME is not specified in environment variable or --hadoop")
# params check
if not options.file and not options.host and not options.zipin:
raise GpCheckError(" --file or --host or --zipin must be specified")
if options.file and options.host:
raise GpCheckError(" You can specify either --file or --host, but not both")
if options.stdout and options.zipout:
raise GpCheckError(" You can specify either --stdout or --zipout, but not both")
def readConfigFile():
try:
gpcheck_config.readConfigFile(GPCHECK_CONFIG_FILE)
except Exception, e:
raise GpCheckError("Field to read gpcheck config file '%s':\n%s" % (GPCHECK_CONFIG_FILE, e))
def checkFailed(host, msg):
global found_errors
found_errors += 1
if host:
logger.error("host(%s): %s", host, msg)
else:
logger.error(msg)
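# Determine the HDFS namenode host. The address is read from fs.defaultFS/fs.default.name in
# core-site.xml; for HA setups the first alias in dfs.ha.namenodes.* and its rpc-address are
# resolved from hdfs-site.xml. The real hostname is then obtained by running 'hostname' on
# that node through the worker pool.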
def getHDFSNamenodeHost():
core_site_file = os.path.join(HADOOP_HOME, "etc/hadoop/core-site.xml")
hdfs_site_file = os.path.join(HADOOP_HOME, "etc/hadoop/hdfs-site.xml")
logger.info("try to detect namenode from %s" % core_site_file)
# for processing property xml
getPropName = lambda node: node.getElementsByTagName('name')[0].childNodes[0].data
getPropValue = lambda node: node.getElementsByTagName('value')[0].childNodes[0].data
# read namenode address from core-site.xml
with open(core_site_file) as f:
xmldoc = minidom.parse(f)
namenode_addr = ''
for node in xmldoc.getElementsByTagName('property'):
if getPropName(node) == 'fs.default.name' or getPropName(node) == 'fs.defaultFS':
fsurl = getPropValue(node).strip()
namenode_list_alias = re.search(r"//([^:/]*)", fsurl).group(1)
if_ha_disabled = re.search(".*:[0-9]+$", fsurl)
if if_ha_disabled:
namenode_addr = namenode_list_alias
else:
namenode_addr = ''
break
# run hostname command on remote to get actual hostname
if namenode_addr == '':
ha_namenode_list = ''
default_namenode_alias = ''
with open(hdfs_site_file) as f:
xmldoc = minidom.parse(f)
for node in xmldoc.getElementsByTagName('property'):
if re.search('dfs.ha.namenodes.*', getPropName(node).strip()):
ha_namenode_list = getPropValue(node).strip()
default_namenode_alias = ha_namenode_list.split(',')[0].strip()
break
if ha_namenode_list == '':
logger.error("cannot detect namenode from %s" % core_site_file)
raise GpCheckError("cannot detect namenode from %s" % core_site_file)
#sys.exit(1)
else:
with open(hdfs_site_file) as f:
xmldoc = minidom.parse(f)
for node in xmldoc.getElementsByTagName('property'):
namenode_rpc_address = "dfs.namenode.rpc-address.%s.%s" % (namenode_list_alias,
default_namenode_alias)
if getPropName(node) == namenode_rpc_address:
default_namenode_rpc_address = getPropValue(node).strip()
namenode_addr = default_namenode_rpc_address.split(':')[0].strip()
if namenode_addr == '':
raise GpCheckError("cannot detect namenode from %s" % core_site_file)
else:
cmd = Command(namenode_addr, "hostname", REMOTE, namenode_addr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running 'hostname' command: %s" % i.results.stderr.strip())
namenode_host = i.results.stdout.strip()
logger.info("detect namenode hostname to be %s" % namenode_host)
return namenode_host
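# Build the set of hosts to check from --file or --host, resolving each entry to its actual
# hostname over SSH to deduplicate, and flag the HDFS namenode host when HADOOP_HOME is set.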
def createHostList():
if options.verbose:
logger.info("trying to deduplicate hosts...")
hostlist = []
# read the host file if present
if options.file:
try:
with open(options.file, "r") as f:
hostlist = [line.strip() for line in f.readlines() if line.strip()]
except IOError, e:
raise GpCheckError("error reading host file '%s': %s" % (options.file, str(e)))
else:
hostlist.append(options.host)
# get actual hostname and deduplicate
try:
for hostname in hostlist:
cmd = Command(hostname, "hostname", REMOTE, hostname)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running 'hostname' on host '%s': %s" % (i.remoteHost, i.results.stderr.strip()))
actualHostname = i.results.stdout.strip()
if actualHostname not in gpcheck_info.hosts:
gpcheck_info.hosts[actualHostname] = GpCheckHost(actualHostname)
except Exception, e:
raise GpCheckError("failed to collect 'hostname' on servers: %s" % str(e))
if options.verbose:
logger.info("trying to deduplicate hosts [success]")
if HADOOP_HOME:
try:
namenode_host = getHDFSNamenodeHost()
if namenode_host in hostlist:
gpcheck_info.hosts[namenode_host] = GpCheckHost(namenode_host, is_namenode=True)
else:
logger.warning("utility will skip HDFS namenode check since it's not in current host list.")
except Exception, e:
checkFailed(None, "utility will SKIP HDFS namenode check: %s" % str(e))
def runCollections():
logger.info("trying to collect server configuration...")
# run gpcheck_hostdump on each server
runCollectionOnServers()
# copy hostdump file to master
copyFilesLocally()
# delete hostdump file on remote servers
deleteRemoteFiles()
logger.info("trying to collect server configuration [success]")
def runCollectionOnServers():
gpcheck_info.collection_start_time = time.time()
def getDumpCommand():
if gpcheck_info.host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX:
host_type_cl = "--linux"
elif gpcheck_info.host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS:
host_type_cl = "--solaris"
else:
raise GpCheckError("unsupported host type")
cmd = "%s/sbin/gpcheck_hostdump --hawq %s" % (GPHOME, host_type_cl)
cmd += " --sysctl %s" % ",".join(gpcheck_config.sysctl_expected.keys())
if HADOOP_HOME:
cmd += " --hadoop %s" % HADOOP_HOME
if options.yarn or options.yarn_ha:
cmd += " --yarn"
return cmd
try:
cmdStr = getDumpCommand()
for host in gpcheck_info.hosts:
if options.verbose:
logger.info("collect data on host: %s" % host)
cmd = Command(host, cmdStr, REMOTE, host)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running gpcheck_hostdump on '%s': %s" % (i.remoteHost, i.results.stderr.strip()))
gpcheck_info.hosts[i.remoteHost].datafile = i.results.stdout.strip()
except Exception, e:
raise GpCheckError("Failed to collect data from servers:\n%s" % e)
gpcheck_info.collection_end_time = time.time()
def copyFilesLocally():
if options.verbose:
logger.info("copy hostdump files from remote servers to master")
try:
for host in gpcheck_info.hosts:
cmdStr = "scp %s:%s %s/%s.data" % (host, gpcheck_info.hosts[host].datafile, tmpdir, host)
if options.verbose:
logger.info(cmdStr)
cmd = Command(host, cmdStr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command %s: %s" % (i.cmdStr, i.results.stderr.strip()))
except Exception, e:
raise GpCheckError("Failed to scp remote hostdump file to master:\n%s" % e)
def deleteRemoteFiles():
if options.verbose:
logger.info("delete hostdump files on remote servers")
try:
for host in gpcheck_info.hosts:
cmdStr = "rm -f %s" % gpcheck_info.hosts[host].datafile
if options.verbose:
logger.info(cmdStr)
cmd = Command(host, cmdStr, REMOTE, host)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command %s: %s" % (i.cmdStr, i.results.stderr.strip()))
except Exception, e:
raise GpCheckError("Failed to delete remote hostdump file:\n%s" % e)
def readDataFiles():
for host in gpcheck_info.hosts:
fname = "%s/%s.data" % (tmpdir, host)
try:
with open(fname, "rb") as f:
gpcheck_info.hosts[host].data = pickle.load(f)
except Exception, e:
raise GpCheckError("Failed to load pickle file '%s': %s" % (fname, e))
def readHAWQConfiguration():
'''
Collect HAWQ cluster configuration: the gp_segment_configuration table and the
hawq_re_memory_overcommit_max GUC, which sets the maximum memory overcommit quota (in MB)
per physical segment for resource enforcement, i.e. memory that can be used beyond the
quota dynamically assigned by the resource manager.
'''
if options.verbose:
logger.info("trying to collect HAWQ configuration...")
try:
_sqlUtil = GpsqlUtil()
gpcheck_info.hawq_segment_configuration = _sqlUtil.performQuery("select * from gp_segment_configuration;")
except Exception as e:
logger.error("utility cannot perform the HAWQ CPU and Memory check because it failed to connect to HAWQ")
logger.error(str(e))
# read Memory GUC using hawqconfig
command = "hawqconfig -s %s" % HAWQ_GUC_MEMORY
p = subprocess.Popen(command, shell = True,
stdout = subprocess.PIPE, stderr = subprocess.PIPE)
result = p.communicate()
match_master = re.search(r'Value : (\d+)', result[0])
if match_master:
gpcheck_info.hawq_gucs[HAWQ_GUC_MEMORY] = (int(match_master.group(1)))
else:
checkFailed(None, "utility cannot perform HAWQ Memory check because failed to get GUC value using '%s'" % command)
return
gpcheck_info.hawq_collected_ok = True
if options.verbose:
logger.info("trying to collect HAWQ configuration [success]")
def testConnectEmc(host):
if not host.is_a_master:
return
expected = "Running"
if host.data.connectemc.output != expected:
checkFailed(host.hostname, "Connect EMC is not running on master (try /etc/init.d/connectemc status)")
def testSolarisEtcSystem(host):
requiredValues = { 'rlim_fd_cur' : '65536',
'zfs:zfs_arc_max' : '0x600000000',
'pcplusmp:apic_panic_on_nmi' : '1',
'nopanicdebug' : '1' }
results = dict()
for k in requiredValues.keys():
results[k] = 0
for key in host.data.etc_system.parameters.keys():
if key not in requiredValues:
continue
foundValue = host.data.etc_system.parameters[key]
if foundValue == requiredValues[key]:
results[key] = 1
for k in results.keys():
if results[k]:
continue
checkFailed(host.hostname, "/etc/system is missing expected line 'set %s=%s'" % (k, requiredValues[k]))
def testSolarisEtcProject(host):
requiredValues = { 'default:3::::project.max-sem-ids=(priv,1024,deny);process.max-file-descriptor=(priv,252144,deny)' : 0 }
unexpectedValues = set(['default:3::::'])
for line in host.data.etc_project.lines:
if line in unexpectedValues:
checkFailed(host.hostname, "unexpected line in /etc/project: '%s'" % line)
continue
if line in requiredValues:
requiredValues[line] = 1
for line in requiredValues.keys():
if requiredValues[line]:
continue
checkFailed(host.hostname, "/etc/project is missing expected line '%s'" % line)
def testSolarisEtcUserAttr(host):
requiredValues = { 'gpadmin::::defaultpriv=basic,dtrace_user,dtrace_proc' : 0 }
for line in host.data.etc_user_attr.lines:
if line in requiredValues:
requiredValues[line] = 1
for line in requiredValues.keys():
if requiredValues[line]:
continue
checkFailed(host.hostname, "/etc/user_attr is missing expected line '%s'" % line)
def testHAWQGUC(host):
if not gpcheck_info.hawq_collected_ok:
return
if options.verbose:
logger.info("-- test HAWQ CPU/Memory Guc Settings")
c = gpcheck_info.hawq_segment_configuration
master_hostname = filter(lambda x: x['role'] == 'm', c)[0]['hostname']
if host.hostname not in map(lambda x: x['hostname'], c):
logger.warning("host '%s' is not in HAWQ array" % host.hostname)
return
actual_total_memory = host.data.machine.memory_in_MB
guc_vmemsize_master = gpcheck_info.hawq_gucs[HAWQ_GUC_MEMORY]
# segment count on this host
num_segments = len(filter(lambda x: x['hostname'] == host.hostname, c))
if host.hostname == master_hostname:
if num_segments > 1:
checkFailed(host.hostname, "HAWQ master host has segments configured")
if actual_total_memory < guc_vmemsize_master:
checkFailed(host.hostname, "HAWQ master host memory size '%s' is less than the '%s' size '%s'" % (
actual_total_memory, HAWQ_GUC_MEMORY, guc_vmemsize_master))
return
# check HAWQ master's memory size
expected_vmemory_size = 8192
if guc_vmemsize_master != expected_vmemory_size:
checkFailed(host.hostname, "HAWQ master's %s GUC value is %s, expected %s" % (
HAWQ_GUC_MEMORY, guc_vmemsize_master, expected_vmemory_size))
else:
datanode_mem = gpcheck_config.hdfs_expected["dfs.mem.datanode.heap"]
# check HAWQ memory size
if actual_total_memory < datanode_mem:
checkFailed(host.hostname, "HAWQ segment's host memory size '%s' is less than the expected data node memory size '%s'" % (
actual_total_memory, datanode_mem))
logger.warning("please change the expected data node memory 'dfs.mem.datanode.heap' in gpcheck.cnf file")
logger.warning("SKIP '%s' check" %(HAWQ_GUC_MEMORY))
return
expect_vmemsize_per_segment = 8192
if guc_vmemsize_master != expect_vmemsize_per_segment:
checkFailed(host.hostname, "HAWQ segment's %s GUC value on this host is %s, expected %s" % (
HAWQ_GUC_MEMORY, guc_vmemsize_master, expect_vmemsize_per_segment))
def testDiskCapacity(host):
if options.verbose:
logger.info("-- test Disk Capacity")
for line in host.data.diskusage.lines:
if len(gpcheck_config.diskusage_mounts) == 0 or line.mount in gpcheck_config.diskusage_mounts:
actual_usage = int(line.used_percent[:-1])
if actual_usage > gpcheck_config.diskusage_usagemax:
checkFailed(host.hostname,
"potential disk full risk: %s mounted on %s has used %s space" % (
line.fs, line.mount, line.used_percent))
return
def testHAWQconfig(host):
hawq = host.data.hawq
hdfs = host.data.hdfs
if hawq is None:
return # skip HAWQ test when hawq is None
if hdfs is None:
return # skip HAWQ test when hdfs is None
if options.verbose:
logger.info("-- test HAWQ config")
if hawq.errormsg:
checkFailed(host.hostname, "collect HAWQ configuration error: %s" % hawq.errormsg)
return
datanode_list = list()
if HADOOP_HOME:
datanode_list = parse_host_list_file("%s/etc/hadoop/slaves" % HADOOP_HOME)
is_datanode = False
if host.hostname in datanode_list:
is_datanode = True
expect_config = gpcheck_config.hawq_expected
if options.kerberos:
expect_config.update(gpcheck_config.hawq_kerberos_expected)
if options.yarn or options.yarn_ha:
expect_config.update(gpcheck_config.hawq_yarn_expected)
actual_config = hawq.site_config
hdfs_actual_config = hdfs.site_config
for exp_key, exp_val in expect_config.items():
if exp_key not in actual_config:
checkFailed(host.hostname, "HAWQ configuration missing: '%s' needs to be set to '%s'" % (exp_key, exp_val))
else:
actual_val = actual_config[exp_key]
et = (exp_key, exp_val, actual_val)
if exp_key == "dfs.block.local-path-access.user":
if exp_val not in actual_val.split(','):
checkFailed(host.hostname, "HDFS configuration: '%s' should include user '%s', actual value is '%s'" % et)
elif exp_key == "dfs.namenode.handler.count":
if int(exp_val) > int(actual_val):
checkFailed(host.hostname, "HDFS configuration: '%s' should be at least '%s', actual value is '%s'" % et)
else:
if exp_val != actual_val:
checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % et)
if not options.kerberos:
if 'hadoop.security.authentication' in actual_config:
if actual_config['hadoop.security.authentication'] != 'simple':
checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % ('simple', 'hadoop.security.authentication', actual_config['hadoop.security.authentication']))
if 'hadoop.security.authentication' in hdfs_actual_config:
if hdfs_actual_config['hadoop.security.authentication'] != 'simple':
checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % ('simple', 'hadoop.security.authentication', hdfs_actual_config['hadoop.security.authentication']))
if options.yarn or options.yarn_ha:
hawq_yarn_property_exist_list = ['hawq_rm_yarn_address', 'hawq_rm_yarn_scheduler_address', 'hawq_rm_yarn_app_name']
for item in hawq_yarn_property_exist_list:
if item in actual_config:
if not actual_config[item]:
checkFailed(host.hostname, "HAWQ configuration: yarn.resourcemanager.address is empty")
else:
checkFailed(host.hostname, "HAWQ configuration: yarn.resourcemanager.address not defined")
if 'dfs.client.read.shortcircuit' not in actual_config:
checkFailed(host.hostname, "HAWQ configuration dfs.client.read.shortcircuit not defined")
if 'dfs.client.read.shortcircuit' not in hdfs_actual_config:
checkFailed(host.hostname, "HAWQ configuration dfs.client.read.shortcircuit not defined")
if 'dfs.domain.socket.path' not in actual_config:
checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path not defined")
if 'dfs.domain.socket.path' not in hdfs_actual_config:
checkFailed(host.hostname, "HDFS configuration dfs.domain.socket.path not defined")
if is_datanode and 'dfs.domain.socket.path' in actual_config and 'dfs.domain.socket.path' in hdfs_actual_config:
if actual_config['dfs.domain.socket.path'] != hdfs_actual_config['dfs.domain.socket.path']:
checkFailed(host.hostname, "HAWQ configuration: dfs.domain.socket.path expect to have the same value with HDFS configuration")
else:
cmd = "ls -l %s" % actual_config['dfs.domain.socket.path']
(result, output, errmsg) = remote_ssh_output(cmd, host.hostname, '')
if result == 0:
if output.split(' ')[0][7:9] != 'rw':
checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path: %s should have R/W access for both hawq and HDFS on %s" % (actual_config['dfs.domain.socket.path'], host.hostname))
else:
checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path: %s, does not exist on %s" % (actual_config['dfs.domain.socket.path'], host.hostname))
if 'output.replace-datanode-on-failure' in actual_config and len(datanode_list) > 0:
if len(datanode_list) < 4:
if actual_config['output.replace-datanode-on-failure'] == 'true':
checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure expect false, current is true")
else:
if actual_config['output.replace-datanode-on-failure'] == 'false':
checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure expect true, current is false")
else:
checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure not defined")
def testHDFSConfig(host):
hdfs = host.data.hdfs
if hdfs is None:
return # skip HDFS test when hdfs is None
if options.verbose:
logger.info("-- test HDFS config")
if hdfs.errormsg:
checkFailed(host.hostname, "collect HDFS configuration error: %s" % hdfs.errormsg)
return
expect_config = gpcheck_config.hdfs_expected
if not options.hdfs_ha and not options.kerberos:
expect_config.update(gpcheck_config.hdfs_non_expected)
if options.hdfs_ha and not options.kerberos:
expect_config.update(gpcheck_config.hdfs_ha_expected)
if options.kerberos and not options.hdfs_ha:
expect_config.update(gpcheck_config.hdfs_kerberos_expected)
if options.kerberos and options.hdfs_ha:
expect_config.update(gpcheck_config.hdfs_ha_kerberos_expected)
if options.yarn or options.yarn_ha:
expect_config.update(gpcheck_config.yarn_expected)
if not options.yarn_ha and not options.kerberos:
expect_config.update(gpcheck_config.yarn_non_expected)
if options.yarn_ha:
expect_config.update(gpcheck_config.yarn_ha_expected)
if options.kerberos:
expect_config.update(gpcheck_config.yarn_kerberos_expected)
actual_config = hdfs.site_config
actual_heap_size = hdfs.namenode_heap_size if host.is_namenode else hdfs.datanode_heap_size
if host.data.machine.memory_in_MB < actual_heap_size:
checkFailed(host.hostname, "host memory size '%s' is less than the java max heap size '%s'" % (host.data.machine.memory_in_MB, actual_heap_size))
# test hdfs_site.xml setting
for exp_key, exp_val in expect_config.items():
if exp_key.startswith("dfs.mem"):
continue # these options belong to the memory tests
if exp_key not in actual_config:
checkFailed(host.hostname, "HDFS configuration missing: '%s' needs to be set to '%s'" % (exp_key, exp_val))
else:
actual_val = actual_config[exp_key]
et = (exp_key, exp_val, actual_val)
if exp_key == "dfs.block.local-path-access.user":
if exp_val not in actual_val.split(','):
checkFailed(host.hostname, "HDFS configuration: '%s' should include user '%s', actual value is '%s'" % et)
elif exp_key == "dfs.namenode.handler.count":
if int(exp_val) > int(actual_val):
checkFailed(host.hostname, "HDFS configuration: '%s' should be at least '%s', actual value is '%s'" % et)
else:
if exp_val != actual_val:
checkFailed(host.hostname, "HDFS configuration: expected '%s' for '%s', actual value is '%s'" % et)
# test hadoop memory setting
expect_namenode_heap = expect_config["dfs.mem.namenode.heap"]
expect_datanode_heap = expect_config["dfs.mem.datanode.heap"]
if host.is_namenode and actual_heap_size < expect_namenode_heap:
checkFailed(host.hostname, "Namenode Java heap size is only %sM, we recommends at least %sM" %
(actual_heap_size, expect_namenode_heap))
if not host.is_namenode and actual_heap_size < expect_datanode_heap:
checkFailed(host.hostname, "Datanode Java heap size is only %sM, expect value is %sM" %
(actual_heap_size, expect_datanode_heap))
# Check that the nodemanager directories exist
directory_check_list = []
datanode_list = list()
if HADOOP_HOME:
datanode_list = parse_host_list_file("%s/etc/hadoop/slaves" % HADOOP_HOME)
is_datanode = False
if host.hostname in datanode_list:
is_datanode = True
if options.yarn or options.yarn_ha:
yarn_enabled = True
else:
yarn_enabled = False
if yarn_enabled and is_datanode:
if 'yarn.nodemanager.local-dirs' in actual_config:
directory_check_list += actual_config['yarn.nodemanager.local-dirs'].split(',')
else:
checkFailed(host.hostname, "YARN configuration: yarn.nodemanager.local-dirs not defined")
if 'yarn.nodemanager.log-dirs' in actual_config:
directory_check_list += actual_config['yarn.nodemanager.log-dirs'].split(',')
else:
checkFailed(host.hostname, "YARN configuration: yarn.nodemanager.log-dirs not defined")
for directory in directory_check_list:
cmd = "test -e %s" % directory
(result, output, errmsg) = remote_ssh_output(cmd, host.hostname, '')
if result != 0:
checkFailed(host.hostname, "YARN nodemanager directory %s does not exist" % directory)
# Check if resource manager property exists
if options.yarn:
yarn_property_exist_list = ['yarn.resourcemanager.address', 'yarn.resourcemanager.scheduler.address']
if options.yarn_ha:
yarn_property_exist_list = ['yarn.resourcemanager.hostname.rm1', 'yarn.resourcemanager.hostname.rm2']
if yarn_enabled:
for item in yarn_property_exist_list:
if item in actual_config:
if not actual_config[item]:
checkFailed(host.hostname, "YARN configuration: %s is empty" % item)
else:
checkFailed(host.hostname, "YARN configuration: %s not defined" % item)
# Check yarn kerberos properties
if yarn_enabled and options.kerberos:
yarn_kerberos_check_list = ['yarn.nodemanager.keytab', 'yarn.nodemanager.principal', \
'yarn.resourcemanager.keytab', 'yarn.resourcemanager.principal']
for item in yarn_kerberos_check_list:
if item in actual_config:
if not actual_config[item]:
checkFailed(host.hostname, "YARN configuration: %s is empty, expected non-empty" % item)
else:
checkFailed(host.hostname, "YARN configuration missing: %s" % item)
def testIOSchedulers(host):
if options.verbose:
logger.info("-- test IO scheduler")
if host.data.ioschedulers.errormsg:
checkFailed(host.hostname, "collect IO scheduler data error: %s" % host.data.ioschedulers.errormsg)
return
expectedScheduler = "deadline"
for dev in host.data.ioschedulers.devices:
scheduler = host.data.ioschedulers.devices[dev]
if scheduler != expectedScheduler:
checkFailed(host.hostname,
"on device (%s) IO scheduler '%s' does not match expected value '%s'" % (dev, scheduler, expectedScheduler))
# perform this test only when gpcheck is run as root
def testBlockdev(host):
if host.data.blockdev is None:
return
if options.verbose:
logger.info("-- test block device readahead value")
expectedReadAhead = "16384"
for dev in host.data.blockdev.ra:
ra = host.data.blockdev.ra[dev]
if ra != expectedReadAhead:
checkFailed(host.hostname,
"on device (%s) blockdev readahead value '%s' does not match expected value '%s'" % (dev, ra, expectedReadAhead))
def testSysctl(host):
if options.verbose:
logger.info("-- test sysctl value")
if host.data.sysctl.errormsg:
checkFailed(host.hostname, "collect sysctl params error: %s" % host.data.sysctl.errormsg)
return
expected_values = gpcheck_config.sysctl_expected
real_values = host.data.sysctl.variables
# params in this set are treated as lower bounds (the actual value may be larger);
# all other sysctl params must match the expected value exactly
params_with_lowerbound = set()
for k in expected_values:
if k in params_with_lowerbound:
if int(real_values[k]) < int(expected_values[k]):
checkFailed(host.hostname,
"sysctl value for key '%s' has value '%s', but we expect at least '%s'" % (k, real_values[k], expected_values[k]))
elif real_values[k] != expected_values[k]: # for other params, we expect the actual value to be the same value
checkFailed(host.hostname,
"sysctl value for key '%s' has value '%s' and expects '%s'" % (k, real_values[k], expected_values[k]))
def testLimitsConf(host):
if options.verbose:
logger.info("-- test /etc/security/limits.conf")
if host.data.limitsconf.errormsg:
checkFailed(host.hostname, "collect limits.conf data error: %s" % host.data.limitsconf.errormsg)
return
# both dict has the form: (type, item) => value
expect_data = gpcheck_config.limits_expected
actual_data = dict([((e.type, e.item), e.value) for e in host.data.limitsconf.lines if e.domain in ("gpadmin", "*")])
expect_keyset = set(expect_data.keys())
actual_keyset = set(actual_data.keys())
for key in expect_keyset.intersection(actual_keyset):
expect_val = int(expect_data[key])
actual_val = int(actual_data[key])
if actual_val < expect_val:
checkFailed(host.hostname,
"%s in /etc/security/limits.conf has value %d lower than expected value %d" % (
" ".join(key), actual_val, expect_val))
for key in expect_keyset.difference(actual_keyset):
checkFailed(host.hostname,
"%s not found in /etc/security/limits.conf" % " ".join(key))
def testLinuxMounts(host):
if options.verbose:
logger.info("-- test mount points")
expected_mount_points = gpcheck_config.mount_points
actual_mount_points = set([m.dir for m in host.data.mounts.entries.values()])
if len(expected_mount_points) == 0:
if options.verbose:
logger.info("-- you didn't specify any mount points to be check in %s, ignore this test" % GPCHECK_CONFIG_FILE)
return
if not actual_mount_points.issuperset(expected_mount_points):
for failed_mount in expected_mount_points.difference(actual_mount_points):
checkFailed(host.hostname, "%s is not mounted" % failed_mount)
def testNtp(host):
if options.verbose:
logger.info("-- test NTP")
if host.data.ntp.currenttime < (gpcheck_info.collection_start_time - 1):
checkFailed(host.hostname, "potential NTPD issue. gpcheck start time (%s) is later than the time on the machine (%s)" % (time.ctime(gpcheck_info.collection_start_time), time.ctime(host.data.ntp.currenttime)))
if host.data.ntp.currenttime > (gpcheck_info.collection_end_time + 1):
checkFailed(host.hostname, "potential NTPD issue. gpcheck end time (%s) is earlier than the time on the machine (%s)" % (time.ctime(gpcheck_info.collection_end_time), time.ctime(host.data.ntp.currenttime)))
if not host.data.ntp.running:
checkFailed(host.hostname, "ntpd not detected on machine")
def testGenericLinuxHost(host):
logger.info("test on host: %s" % host.hostname)
if host.is_namenode:
testHAWQGUC(host)
testHAWQconfig(host)
testHDFSConfig(host)
testDiskCapacity(host)
testSysctl(host)
testLimitsConf(host)
testLinuxMounts(host)
testNtp(host)
else:
testHAWQGUC(host)
testHAWQconfig(host)
testDiskCapacity(host)
testHDFSConfig(host)
testIOSchedulers(host)
testSysctl(host)
testLimitsConf(host)
testLinuxMounts(host)
testNtp(host)
def testGenericSolarisHost(host):
testSolarisEtcSystem(host)
testSolarisEtcProject(host)
testSolarisEtcUserAttr(host)
def testUnameConsistency():
logger.info("test uname consistency")
firstUname = None
firstHost = None
for _, host in gpcheck_info.hosts.items():
uname = host.data.uname.output
if firstUname:
if firstUname != uname:
checkFailed(h, "uname -r output different among hosts: %s : %s != %s : %s" % (firstHost, firstUname, host.hostname, uname))
else:
firstUname = uname
firstHost = host.hostname
def testGenericLinuxCluster():
for _, host in gpcheck_info.hosts.items():
testGenericLinuxHost(host)
testUnameConsistency()
def testGenericLinuxClusterBlockDev():
for _, host in gpcheck_info.hosts.items():
if not host.is_namenode:
testBlockdev(host)
def testGenericSolarisCluster():
for _, host in gpcheck_info.hosts.items():
testGenericSolarisHost(host)
testUnameConsistency()
def runTests():
if gpcheck_info.host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX:
testGenericLinuxCluster()
if gpcheck_info.is_root:
testGenericLinuxClusterBlockDev()
elif gpcheck_info.host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS:
testGenericSolarisCluster()
else:
raise GpCheckError("No tests exist for this platform in gpcheck")
# report checks result
logger.info("GPCHECK Result:")
logger.info("---------------------------------------")
if found_errors:
logger.info("check failed!\tfound %s error(s)" % found_errors)
else:
logger.info("all check succeed!")
logger.info("---------------------------------------")
def readZip():
logger.info("trying to read zip file '%s'..." % options.zipin)
words = options.zipin.split(".tar.gz")
if len(words) != 2:
raise GpCheckError("--zipin file needs to be a .tar.gz file")
fname = words[0]
# untar
cmdStr = "tar xfz %s" % (options.zipin)
if options.verbose:
logger.info(cmdStr)
try:
cmd = Command("tarcmd", cmdStr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command '%s'" % cmdStr)
except Exception, e:
raise GpCheckError("Failed to extract tar file '%s': %s" % (options.zipin, e))
# move extracted file to temp directory
newfname = "%s/%s" % (tmpdir, fname)
cmdStr = "mv %s %s" % (fname, newfname)
if options.verbose:
logger.info(cmdStr)
try:
cmd = Command("mvcmd", cmdStr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command '%s'" % cmdStr)
except Exception, e:
raise GpCheckError("Failed to move file '%s' to temp directory: %s" % (fname, e))
# load pickle file
global gpcheck_info
try:
with open(newfname, "rb") as f:
gpcheck_info = pickle.load(f)
except Exception, e:
raise GpCheckError("Failed to load pickle file '%s': %s" % (newfname, e))
logger.info("trying to read zip file '%s' [success]" % options.zipin)
def doZip(fname):
logger.info("dump gpcheck data into a zip file '%s.tar.gz'..." % fname)
# dump to pickle file
try:
with open(fname, "wb") as f:
pickle.dump(gpcheck_info, f)
except Exception, e:
raise GpCheckError("Failed to dump pickle file '%s':\n%s" % (fname, e))
# make a tar ball
cmdStr = "tar cfz %s.tar.gz %s" % (fname, fname)
if options.verbose:
logger.info(cmdStr)
try:
cmd = Command("tarcmd", cmdStr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command '%s': %s" % (cmdStr, i.results.stderr.strip()))
except Exception, e:
raise GpCheckError("Failed to dump gpcheck data into a zip file:\n%s" % e)
# delete pickle file
cmdStr = "rm -rf %s" % fname
if options.verbose:
logger.info(cmdStr)
try:
cmd = Command("rmcmd", cmdStr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command '%s': %s" % (cmdStr, i.results.stderr.strip()))
except Exception, e:
raise GpCheckError("Failed to delete pickle file '%s':\n%s" % (fname, e))
logger.info("dump gpcheck data into a zip file '%s.tar.gz' [success]" % fname)
def doPrint():
for h in sorted(gpcheck_info.hosts):
print "HOST: %s" % h
print gpcheck_info.hosts[h].data
print "----------------------------------------------------------------------\n"
if gpcheck_info.hawq_collected_ok:
print "HAWQ guc settings:"
for guc_name, guc_val in gpcheck_info.hawq_gucs.items():
print "GUC : %s\nMaster value: %s\nSegment value: %s\n" % (guc_name, guc_val[0], guc_val[1])
def checkHostsFile():
'''
Return the content of /etc/hosts with any uncommented IPv6 ("::") entries commented out; the file itself is not modified.
'''
rtn_output = subprocess.check_output("cat /etc/hosts", shell=True)
hostsContent = rtn_output.decode("utf8", "ignore")
newHostsContent = ""
# Comment out any uncommented line that contains an IPv6 address ("::")
for hostName in hostsContent.splitlines():
hostName = hostName.replace('\n', '')
if hostName.find("::") > -1 and hostName.find("#") < 0:
newHostsContent += "# " + hostName + "\n"
else:
newHostsContent += hostName + "\n"
return newHostsContent
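# Run each shell command locally and validate its exit status with check_return_code
# (expected to come from hawqpylib.hawqlib, see the imports above).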
def local_run(cmd):
'''Execute each shell command on local machine.'''
for elem in cmd:
result = subprocess.Popen(elem, shell=True).wait()
check_return_code(result)
def checkHardwarePerf():
"""
This function is used to check hardware performance of each node, which include disk IO, memory and network
"""
global options
hawq_home = os.getenv('GPHOME')
if not hawq_home:
print("HAWQ home directory is not defined, please check GPHOME settings.")
sys.exit(1)
source_hawq_env = ". %s/greenplum_path.sh" % hawq_home
cmd = ""
arguments = "-f %s -r dsN -D -S 8589934592 -d /tmp" % (options.file)
cmd = "%s; gpcheckperf %s" % (source_hawq_env, arguments)
rtn_output = subprocess.check_output(cmd, shell=True)
rtnContent = rtn_output.decode("utf8", "ignore")
# Display test result
print(rtn_output)
expected_disk_write_min_bandwidth = gpcheck_config.disk_write_min_bandwidth
expected_disk_read_min_bandwidth = gpcheck_config.disk_read_min_bandwidth
expected_stream_min_bandwidth = gpcheck_config.stream_min_bandwidth
expected_network_min_bandwidth = gpcheck_config.network_min_bandwidth
# Retrieve test result
hostname_real_disk_write_min_bandwidth = ""
real_disk_write_min_bandwidth = 0.0
hostname_real_disk_read_min_bandwidth = ""
real_disk_read_min_bandwidth = 0.0
hostname_real_stream_min_bandwidth = ""
real_stream_min_bandwidth = 0.0
real_network_min_bandwidth = 0.0
rtnContent = rtnContent.splitlines()
for rowItem in rtnContent:
if "disk write min bandwidth" in rowItem:
value_hostname = rowItem.split(": ")[1]
hostname_real_disk_write_min_bandwidth = value_hostname.split(" ")[1].strip()
real_disk_write_min_bandwidth = (float)(value_hostname.split(" ")[0].strip())
elif "disk read min bandwidth" in rowItem:
value_hostname = rowItem.split(": ")[1]
hostname_real_disk_read_min_bandwidth = value_hostname.split(" ")[1].strip()
real_disk_read_min_bandwidth = (float)(value_hostname.split(" ")[0].strip())
elif "stream min bandwidth" in rowItem:
value_hostname = rowItem.split(": ")[1]
hostname_real_stream_min_bandwidth = value_hostname.split(" ")[1].strip()
real_stream_min_bandwidth = (float)(value_hostname.split(" ")[0].strip())
elif "min = " in rowItem:
value_unit = rowItem.split("= ")[1]
real_network_min_bandwidth = (float)(value_unit.split(" ")[0].strip())
elif "[Warning] single host only - abandon netperf test" in rowItem:
real_network_min_bandwidth = -1.0
if real_disk_write_min_bandwidth >= expected_disk_write_min_bandwidth:
logger.info("Pass: Disk write test")
else:
logger.error("Fail: Disk min write bandwidth %.2f (MB/s) is lower than expected value %.2f (MB/s). Node: %s." % (real_disk_write_min_bandwidth, expected_disk_write_min_bandwidth, hostname_real_disk_write_min_bandwidth))
if real_disk_read_min_bandwidth >= expected_disk_read_min_bandwidth:
logger.info("Pass: Disk read test")
else:
logger.error("Fail: Disk min read bandwidth %.2f (MB/s) is lower than expected value %.2f (MB/s). Node: %s." % (real_disk_read_min_bandwidth, expected_disk_read_min_bandwidth, hostname_real_disk_read_min_bandwidth))
if real_stream_min_bandwidth >= expected_stream_min_bandwidth:
logger.info("Pass: Stream test")
else:
logger.error("Fail: Stream min bandwidth %.2f (MB/s) is lower than expected value %.2f (MB/s). Node %s." % (real_stream_min_bandwidth, expected_stream_min_bandwidth, hostname_real_stream_min_bandwidth))
if real_network_min_bandwidth < 0:
logger.info("Warning: Single host only - abandon netperf test")
elif real_network_min_bandwidth >= expected_network_min_bandwidth:
logger.info("Pass: Network test")
else:
logger.error("Fail: Min network bandwidth %.2f (MB/s) is lower than expected value %.2f (MB/s)." % (real_network_min_bandwidth, expected_network_min_bandwidth))
def retrieve_DB_Version():
'''
This function is used to retrieve database version and RPM version.
'''
_sqlUtil = GpsqlUtil()
db_version = _sqlUtil.performQuery("select version();")
rtn_output = subprocess.check_output("sudo rpm -qa | grep hawq", shell=True)
rpm_version = rtn_output.decode("utf8", "ignore")
rpm_version = rpm_version.splitlines()[0].replace("\n", "")
logger.info("Database Version is %s" % db_version[0]["version"])
logger.info("RPM Version is %s" % rpm_version)
def retrieve_DB_process():
'''
This function is used to retrieve database process info.
'''
_sqlUtil = GpsqlUtil()
db_process = _sqlUtil.performQuery("SELECT * FROM gp_segment_configuration;")
if db_process is not None:
logger.info("Database Process Information:")
for listElem in db_process:
if listElem["status"].lower() == "u":
logger.info("Database node: %s is up." % listElem["hostname"])
else:
logger.error("Database node: %s is NOT up." % listElem["hostname"])
else:
logger.error("Fail to retrieve database process information.")
def check_DB_basic_function():
'''
This function will perform create table, insert data into table, select data from table
and drop table operations to test if database is functioning correctly.
'''
_sqlUtil = GpsqlUtil()
table_name = "hawqTest_" + str(calendar.timegm(time.gmtime()))
rowCount = 100000
try:
# Create table
createTable = _sqlUtil.performQuery("CREATE TABLE %s (id INT);" % table_name, 0)
if createTable != "FAIL":
logger.info("Pass: Create table is successful.")
except Exception as e:
logger.error("Fail to create table: %s" % (str(e)))
return None
try:
# Insert data into database
insertData = _sqlUtil.performQuery("INSERT INTO %s SELECT generate_series(1, %s);" % (table_name, str(rowCount)), 0)
rtnData = _sqlUtil.performQuery("SELECT count(*) FROM %s;" % (table_name))
if rtnData[0]["count"] == rowCount:
logger.info("Pass: Insert data into table is successful.")
logger.info("Pass: Select data from table is successful.")
else:
logger.error("Fail to insert data into table.")
except Exception as e:
logger.error("Fail to insert data into table.")
dropTable = _sqlUtil.performQuery("DROP TABLE %s;" % table_name)
return None
try:
# Drop table
dropTable = _sqlUtil.performQuery("DROP TABLE %s;" % table_name, 0)
if dropTable != "FAIL":
logger.info("Pass: Drop table is successful.")
except Exception as e:
logger.error("Fail to drop table %s. Please perform drop table operation manually." % table_name)
return None
def check_GUC_configuration():
'''
This function will perform GUC configuration check:
default_hash_table_bucket_number; hawq_rm_nvseg_perquery_limit;
hawq_rm_nvseg_perquery_perseg_limit;
magma_shm_limit_per_block
'''
expected_default_hash_table_bucket_number = gpcheck_config.default_hash_table_bucket_number
expected_hawq_rm_nvseg_perquery_limit = gpcheck_config.hawq_rm_nvseg_perquery_limit
expected_hawq_rm_nvseg_perquery_perseg_limit = gpcheck_config.hawq_rm_nvseg_perquery_perseg_limit
expected_magma_shm_limit_per_block = gpcheck_config.magma_shm_limit_per_block
# Connect to DB to retrieve GUC config values
_sqlUtil = GpsqlUtil()
actual_default_hash_table_bucket_number = None
actual_hawq_rm_nvseg_perquery_limit = None
actual_hawq_rm_nvseg_perquery_perseg_limit = None
actual_magma_shm_limit_per_block = None
try:
querySQL = "show default_hash_table_bucket_number;"
actual_default_hash_table_bucket_number = _sqlUtil.performQuery(querySQL)
actual_default_hash_table_bucket_number = actual_default_hash_table_bucket_number[0]["default_hash_table_bucket_number"]
if expected_default_hash_table_bucket_number > int(actual_default_hash_table_bucket_number):
logger.error("Fail: Default hash table bucket number %s is less than expected value %s." % (actual_default_hash_table_bucket_number, expected_default_hash_table_bucket_number))
else:
logger.info("Pass: Default hash table bucket number %s is NOT less than expected value %s." % (actual_default_hash_table_bucket_number, expected_default_hash_table_bucket_number))
querySQL = "show hawq_rm_nvseg_perquery_limit;"
actual_hawq_rm_nvseg_perquery_limit = _sqlUtil.performQuery(querySQL)
actual_hawq_rm_nvseg_perquery_limit = actual_hawq_rm_nvseg_perquery_limit[0]["hawq_rm_nvseg_perquery_limit"]
if expected_hawq_rm_nvseg_perquery_limit > int(actual_hawq_rm_nvseg_perquery_limit):
logger.error("Fail: Hawq rm nvseg perquery limit %s is less than expected value %s." % (actual_hawq_rm_nvseg_perquery_limit, expected_hawq_rm_nvseg_perquery_limit))
else:
logger.info("Pass: Hawq rm nvseg perquery limit %s is NOT less than expected value %s." % (actual_hawq_rm_nvseg_perquery_limit, expected_hawq_rm_nvseg_perquery_limit))
querySQL = "show hawq_rm_nvseg_perquery_perseg_limit;"
actual_hawq_rm_nvseg_perquery_perseg_limit = _sqlUtil.performQuery(querySQL)
actual_hawq_rm_nvseg_perquery_perseg_limit = actual_hawq_rm_nvseg_perquery_perseg_limit[0]["hawq_rm_nvseg_perquery_perseg_limit"]
if expected_hawq_rm_nvseg_perquery_perseg_limit > int(actual_hawq_rm_nvseg_perquery_perseg_limit):
logger.error("Fail: Hawq rm nvseg perquery perseg limit %s is less than expected value %s." % (actual_hawq_rm_nvseg_perquery_perseg_limit, expected_hawq_rm_nvseg_perquery_perseg_limit))
else:
logger.info("Pass: Hawq rm nvseg perquery perseg limit %s is NOT less than expected value %s." % (actual_hawq_rm_nvseg_perquery_perseg_limit, expected_hawq_rm_nvseg_perquery_perseg_limit))
querySQL = "show magma_shm_limit_per_block;"
actual_magma_shm_limit_per_block = _sqlUtil.performQuery(querySQL)
actual_magma_shm_limit_per_block = actual_magma_shm_limit_per_block[0]["magma_shm_limit_per_block"].replace("MB", "")
if expected_magma_shm_limit_per_block > int(actual_magma_shm_limit_per_block):
logger.error("Fail: Magma shm limit per block %sMB is less than expected value %sMB." % (actual_magma_shm_limit_per_block, expected_magma_shm_limit_per_block))
else:
logger.info("Pass: Magma shm limit per block %sMB is NOT less than expected value %sMB." % (actual_magma_shm_limit_per_block, expected_magma_shm_limit_per_block))
except Exception as e:
logger.error("Fail to execute %s." % (querySQL))
logger.error("Error Message: %s" % (str(e)))
def checkDiskUsage():
'''
This function checks the total disk usage of each node and compares the total disk space
and the available-space ratio against the expected values from the gpcheck config file
'''
global options
rtn_output = subprocess.check_output("cat %s" % (options.file), shell=True)
hostsContent = rtn_output.decode("utf8", "ignore")
expected_disk_space_total = (float)(gpcheck_config.disk_space_total)
expected_disk_space_available = (float)(gpcheck_config.diskusage_usagemax)
# Check disk status of each host
for elem in hostsContent.splitlines():
hostName = elem.replace("\n", "")
hostName = hostName.replace(" ", "")
if hostName != "":
cmd = []
print("---------- Node Name: %s" % (hostName))
cmd.append("ssh %s df -h --total | grep -E 'Filesystem|^total'" % (hostName))
local_run(cmd)
rtn_output = subprocess.check_output("ssh %s df -h -P --total -BG | grep total" % (hostName), shell=True)
rtnContent = rtn_output.decode("utf8", "ignore")
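# The total line of `df -h -P --total -BG` looks roughly like (illustrative example):
#   total 1862G 523G 1339G 29% -
# so field 1 is the total size and field 3 is the available space, both in GB.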
rtnVal = ' '.join(rtnContent.splitlines()[0].split())
rtnVal = rtnVal.split(" ")
totalDiskSpace = float(rtnVal[1].replace("G", ""))
diskAvailableSpace = float(rtnVal[3].replace("G", ""))
diskAvailableRatio = (diskAvailableSpace / totalDiskSpace) * 100.0
# Compare real value with expected value
if expected_disk_space_total > totalDiskSpace:
logger.error("Fail: Disk total space %.1fGB is less than expected value %.1fGB." % (totalDiskSpace, expected_disk_space_total))
else:
logger.info("Pass: Disk total space %.1fGB is NOT less than expected value %.1fGB." % (totalDiskSpace, expected_disk_space_total))
if expected_disk_space_available > diskAvailableRatio:
logger.error("Fail: Disk available space ratio %.1f%% is less than expected value %.1f%%." % (diskAvailableRatio, expected_disk_space_available))
else:
logger.info("Pass: Disk available space ratio %.1f%% is NOT less than expected value %.1f%%." % (diskAvailableRatio, expected_disk_space_available))
def checkMasterStandbySync():
'''
This function is used to check if master and standby are synchronized
'''
_sqlUtil = GpsqlUtil()
gp_master_mirroring = _sqlUtil.performQuery("select * from gp_master_mirroring;")
gp_master_mirroring_rtn = gp_master_mirroring[0]["summary_state"].lower()
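# summary_state in gp_master_mirroring is typically 'Synchronized' or 'Not Configured';
# it is lowercased above for comparison.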
# Check if there is standby node
gp_segment_configuration = _sqlUtil.performQuery("select * from gp_segment_configuration where role = 's';")
standby_flag = False
if len(gp_segment_configuration) > 0:
standby_flag = True
if standby_flag:
if gp_master_mirroring_rtn == "synchronized":
logger.info("Pass: Master and Standby are synchronized.")
elif gp_master_mirroring_rtn == "not configured":
logger.error("Fail: Standby is found in gp_segment_configuration but master mirroring is NOT configured.")
else:
logger.error("Fail: Master and Standby are NOT synchronized.")
else:
if gp_master_mirroring_rtn == "synchronized":
logger.error("Fail: Inconsistent state: no standby is registered but master mirroring reports 'synchronized'. Please check gp_segment_configuration and gp_master_mirroring tables.")
elif gp_master_mirroring_rtn == "not configured":
logger.info("Pass: Standby is NOT configured.")
else:
logger.error("Fail: Unexpected master mirroring state. Please check gp_segment_configuration and gp_master_mirroring tables.")
def checkTableAnalyzeStatus():
'''
This function is used to check if all tables are analyzed within three days
'''
_sqlUtil = GpsqlUtil()
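# pg_stat_last_operation keeps the time of the most recent operations (such as CREATE, ANALYZE, VACUUM)
# per object; statime is that operation's timestamp and is compared against the three-day window below.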
analyzeInfo = _sqlUtil.performQuery("SELECT classid, objid, staactionname, stasysid, stausename, stasubtype, statime FROM pg_stat_last_operation pslo, pg_class pc WHERE pc.oid = pslo.objid and (now() - statime) > interval '3 days';")
if len(analyzeInfo) > 0:
logger.error("Fail: There are %d tables that have NOT been analyzed within the last three days. Please check pg_stat_last_operation and pg_class tables for details." % (len(analyzeInfo)))
else:
logger.info("Pass: All tables are analyzed within three days.")
def check_kerberos_enablement():
'''
Read /usr/local/hawq/etc/hdfs-client.xml and /usr/local/hawq/etc/yarn-client.xml to check if kerberos is enabled.
'''
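# A kerberos-enabled client config usually carries a property such as (example layout only):
#   <property><name>hadoop.security.authentication</name><value>kerberos</value></property>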
# Open hdfs-client.xml and get data
hdfs_client_Msg = subprocess.check_output("cat /usr/local/hawq/etc/hdfs-client.xml", shell=True)
import xml.etree.ElementTree  # the top-level `import xml` alone does not load the ElementTree submodule
root = xml.etree.ElementTree.fromstring(hdfs_client_Msg)
# Check if kerberos in hdfs-client.xml is enabled
flag = True
for elem in root.findall("property"):
# kerberos is enabled when a property value (typically hadoop.security.authentication) is "kerberos"
elemValue = elem.find('value').text
if elemValue is not None and elemValue.strip().lower() == "kerberos":
flag = False
break
if flag:
logger.error("Kerberos in hdfs-client.xml is NOT enabled.")
else:
logger.info("Kerberos in hdfs-client.xml is enabled.")
# Open yarn-client.xml and get data
yarn_client_Msg = subprocess.check_output("cat /usr/local/hawq/etc/yarn-client.xml", shell=True)
root = xml.etree.ElementTree.fromstring(yarn_client_Msg)
# Check if kerberos in yarn-client.xml is enabled
flag = True
for elem in root.findall("property"):
# kerberos is enabled when a property value (typically hadoop.security.authentication) is "kerberos"
elemValue = elem.find('value').text
if elemValue is not None and elemValue.strip().lower() == "kerberos":
flag = False
break
if flag:
logger.error("Kerberos in yarn-client.xml is NOT enabled.")
else:
logger.info("Kerberos in yarn-client.xml is enabled.")
def check_active_namenode():
'''
This function reads /usr/local/hawq/etc/hdfs-client.xml to check whether the first namenode is the active namenode.
'''
hdfs_client_Msg = subprocess.check_output("cat /usr/local/hawq/etc/hdfs-client.xml", shell=True)
import xml.etree.ElementTree  # ensure the ElementTree submodule is loaded
root = xml.etree.ElementTree.fromstring(hdfs_client_Msg)
# Check Active Namenode in hdfs-client.xml
flag = True
firstNamenode = ""
nameservices = ""
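# In an HA setup, dfs.nameservices holds the logical nameservice ID and
# dfs.ha.namenodes.<nameservice> lists the namenode IDs comma-separated;
# the first ID is treated as the "first namenode" below.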
for elem in root.findall("property"):
if elem.find("name").text == "dfs.nameservices":
nameservices = elem.find("value").text
break
for elem in root.findall("property"):
if elem.find("name").text == "dfs.ha.namenodes." + nameservices:
firstNamenode = elem.find("value").text.split(",")[0]
flag = False
break
if flag:
logger.error("Warning: Failed to retrieve the namenode list from hdfs-client.xml. Please check your HDFS configuration in hdfs-client.xml.")
return None
try:
# Check if the first namenode is active; run the haadmin command as the hdfs user.
# `su - hdfs -c ...` runs only this one command as hdfs, so the script's own user
# is unchanged and no "switch back to root" step is needed afterwards.
command = "su - hdfs -c 'hdfs haadmin -getServiceState %s'" % (firstNamenode)
check_active_namenode_Msg = subprocess.check_output(command, shell=True).strip()
if check_active_namenode_Msg == "active":
logger.info("Pass: First namenode %s is the active namenode." % (firstNamenode))
elif check_active_namenode_Msg == "standby":
logger.error("Fail: First namenode %s is NOT the active namenode." % (firstNamenode))
else:
logger.error("Fail: Failed to detect the active namenode.")
except Exception as e:
logger.error("Fail: HDFS Error: %s" % (str(e)))
def check_hdfs_local_read():
'''
Read /usr/local/hawq/etc/hdfs-client.xml and /etc/hadoop/conf/hdfs-site.xml and look up <name>dfs.domain.socket.path</name>.
If both files point to the same socket path and its permissions are at least 666, the hawq user can use it.
'''
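# dfs.domain.socket.path is the UNIX domain socket used for HDFS short-circuit local reads;
# HAWQ's hdfs-client.xml and the DataNode's hdfs-site.xml must agree on it for local reads to work.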
# Check <name>dfs.domain.socket.path</name> in hdfs-client.xml
hdfs_client_Msg = subprocess.check_output("cat /usr/local/hawq/etc/hdfs-client.xml", shell=True)
import xml.etree.ElementTree  # ensure the ElementTree submodule is loaded
root = xml.etree.ElementTree.fromstring(hdfs_client_Msg)
flag = True
socket_hdfs_client = ""
for elem in root.findall("property"):
elemValue = elem.find('name').text
if elemValue.lower() == "dfs.domain.socket.path":
socket_hdfs_client = elem.find("value").text
flag = False
break
if flag:
logger.error("Fail: Cannot find dfs.domain.socket.path in hdfs-client.xml.")
return None
# Check <name>dfs.domain.socket.path</name> in hdfs-site.xml
hdfs_client_Msg = subprocess.check_output("cat /etc/hadoop/conf/hdfs-site.xml", shell=True)
root = xml.etree.ElementTree.fromstring(hdfs_client_Msg)
flag = True
socket_hdfs_site = ""
for elem in root.findall("property"):
elemValue = elem.find('name').text
if elemValue.lower() == "dfs.domain.socket.path":
socket_hdfs_site = elem.find("value").text
flag = False
break
if flag:
logger.error("Fail: Cannot find dfs.domain.socket.path in hdfs-site.xml.")
return None
# Check file path
if socket_hdfs_client == socket_hdfs_site:
try:
file_permission_Msg = subprocess.check_output("ls -l %s" % (socket_hdfs_site), shell=True)
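# `ls -l` output starts with the file-type character, so slices [1:4], [4:7] and [7:10] are the
# owner, group and other permission triads; the checks below require both 'r' and 'w' in each triad.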
file_permission1 = file_permission_Msg[1:4]
file_permission2 = file_permission_Msg[4:7]
file_permission3 = file_permission_Msg[7:10]
if file_permission1.find("-") < 0 or file_permission1.find("-") > 1:
if file_permission2.find("-") < 0 or file_permission2.find("-") > 1:
if file_permission3.find("-") < 0 or file_permission3.find("-") > 1:
logger.info("Pass: HDFS local read is configured correctly.")
else:
logger.error("Fail: HDFS local read is NOT configured correctly.")
else:
logger.error("Fail: HDFS local read is NOT configured correctly.")
else:
logger.error("Fail: HDFS local read is NOT configured correctly.")
except Exception as e:
logger.error("Fail: HDFS local read is NOT configured correctly. \n Error Message is: %s" % (e))
else:
logger.error("Fail: dfs.domain.socket.path differs between hdfs-client.xml (%s) and hdfs-site.xml (%s)." % (socket_hdfs_client, socket_hdfs_site))
def check_return_code(result, logger = None, error_msg = None, info_msg = None, exit_true = False):
'''Check shell command exit code.'''
if result != 0:
if error_msg and logger:
logger.error(error_msg)
else:
if info_msg and logger:
logger.info(info_msg)
if exit_true:
sys.exit(0)
return result
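# Illustrative usage (hypothetical command):
#   rc = subprocess.call("ls /tmp/nonexistent", shell=True)
#   check_return_code(rc, logger, error_msg="listing failed", info_msg="listing succeeded")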
def overwrite_etc_hosts(filecontent = "", hostfile = ""):
'''
This function is used to overwrite the /etc/hosts file and scp it to all nodes in the cluster
'''
cmd =[]
hawq_home = os.getenv('GPHOME')
if not hawq_home:
print("HAWQ home directory is not defined, please check GPHOME settings.")
sys.exit(1)
source_hawq_env = ". %s/greenplum_path.sh" % hawq_home
cmd.append(source_hawq_env)
argument = "echo \'" + filecontent + "\' > /etc/hosts"
cmd.append(argument)
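# gpscp copies the freshly written /etc/hosts to every host in the host file; `=:` expands to each target host.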
argument = "gpscp -f %s /etc/hosts =:/etc/hosts" % (hostfile)
cmd.append(argument)
local_run(cmd)
def enable_coredump(hostfile = ""):
'''
This function is used to enable core dump on each node
'''
cmd =[]
hawq_home = os.getenv('GPHOME')
if not hawq_home:
print("HAWQ home directory is not defined, please check GPHOME settings.")
sys.exit(1)
source_hawq_env = ". %s/greenplum_path.sh" % hawq_home
cmd.append(source_hawq_env)
argument = "gpssh -f %s ulimit -c unlimited" % (hostfile)
cmd.append(argument)
local_run(cmd)
def install_ntp(hostfile = ""):
'''
This function is used to install and enable ntp on each node
'''
cmd =[]
hawq_home = os.getenv('GPHOME')
if not hawq_home:
print("HAWQ home directory is not defined, please check GPHOME settings.")
sys.exit(1)
source_hawq_env = ". %s/greenplum_path.sh" % hawq_home
cmd.append(source_hawq_env)
argument = "gpssh -f %s -e 'yum install -y ntp'" % (hostfile)
cmd.append(argument)
argument = "gpssh -f %s -e 'systemctl enable ntpd'" % (hostfile)
cmd.append(argument)
argument = "gpssh -f %s -e 'systemctl start ntpd'" % (hostfile)
cmd.append(argument)
local_run(cmd)
def disable_firewall(hostfile = ""):
'''
This function is used to disable firewall on each node
'''
cmd =[]
hawq_home = os.getenv('GPHOME')
if not hawq_home:
print("HAWQ home directory is not defined, please check GPHOME settings.")
sys.exit(1)
source_hawq_env = ". %s/greenplum_path.sh" % hawq_home
cmd.append(source_hawq_env)
argument = "gpssh -f %s -e 'systemctl stop iptables'" % (hostfile)
cmd.append(argument)
argument = "gpssh -f %s -e 'systemctl disable iptables'" % (hostfile)
cmd.append(argument)
argument = "gpssh -f %s -e 'systemctl stop firewalld'" % (hostfile)
cmd.append(argument)
argument = "gpssh -f %s -e 'systemctl disable firewalld'" % (hostfile)
cmd.append(argument)
local_run(cmd)
def disable_selinux(hostfile = ""):
'''
This function is used to disable selinux on each node
'''
cmd =[]
hawq_home = os.getenv('GPHOME')
if not hawq_home:
print("HAWQ home directory is not defined, please check GPHOME settings.")
sys.exit(1)
source_hawq_env = ". %s/greenplum_path.sh" % hawq_home
cmd.append(source_hawq_env)
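# The sed below rewrites SELINUX=enforcing to SELINUX=disabled in /etc/selinux/config (takes effect on reboot);
# `setenforce 0` switches SELinux to permissive mode immediately.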
argument = "gpssh -f %s -e 'sed -i \"s/^SELINUX\=enforcing/SELINUX\=disabled/g\" /etc/selinux/config'" % (hostfile)
cmd.append(argument)
argument = "gpssh -f %s -e 'setenforce 0'" % (hostfile)
cmd.append(argument)
local_run(cmd)
function_dispatcher = {} # Dispatch table for all hawq check --postSetup, --postUpgrade and --weeklyExamine functions
function_dispatcher.update({"overwrite_etc_hosts" : overwrite_etc_hosts})
function_dispatcher.update({"enable_coredump" : enable_coredump})
function_dispatcher.update({"install_ntp" : install_ntp})
function_dispatcher.update({"disable_firewall" : disable_firewall})
function_dispatcher.update({"disable_selinux" : disable_selinux})
function_dispatcher.update({"checkHardwarePerf" : checkHardwarePerf})
function_dispatcher.update({"Check config in /etc/sysctl.conf && /etc/security/limits.conf" : runTests})
function_dispatcher.update({"retrieve_DB_Version" : retrieve_DB_Version})
function_dispatcher.update({"retrieve_DB_process" : retrieve_DB_process})
function_dispatcher.update({"check_DB_basic_function" : check_DB_basic_function})
function_dispatcher.update({"check_GUC_configuration" : check_GUC_configuration})
function_dispatcher.update({"check_kerberos_enablement" : check_kerberos_enablement})
function_dispatcher.update({"check_active_namenode" : check_active_namenode})
function_dispatcher.update({"check_hdfs_local_read" : check_hdfs_local_read})
function_dispatcher.update({"checkDiskUsage" : checkDiskUsage})
function_dispatcher.update({"checkMasterStandbySync" : checkMasterStandbySync})
function_dispatcher.update({"checkTableAnalyzeStatus" : checkTableAnalyzeStatus})
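# Checks are looked up and invoked by name, e.g. function_dispatcher["checkDiskUsage"]() runs the weekly disk check.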
if __name__ == '__main__':
if gpcheck_info.is_root:
logger.info("gpcheck will perform block device readahead checks when run as root")
try:
try:
checkPlatform()
parseargs()
readConfigFile()
except GpCheckError, e:
logger.error(str(e))
sys.exit(1)
if pool:
pool.join()
pool.haltWork()
pool.joinWorkers()
try:
tmpdir = tempfile.mkdtemp(prefix='gpcheck')
except Exception, e:
logger.error("Error creating tmp dir on master: %s" % e)
sys.exit(1)
try:
# Phase 1: collect input
if options.zipin:
readZip() # load information into gpcheck_info from zip
else:
# read host info into gpcheck_info.hosts from --file or --host
createHostList()
# collect each server's system environment configuration
runCollections()
# read collected data into gpcheck_info
readDataFiles()
# read HAWQ configuration
readHAWQConfiguration()
# Phase 2: generate output
if options.postSetup:
cmd = []
counter = 1
logger.info("-------------- Execute post-setup check --------------\n")
# Step 1: Comment out ::1 localhost in /etc/hosts
logger.info("-------------- Step %d: Comment out ::1 localhost in /etc/hosts" % counter)
file_content = checkHostsFile()
function_dispatcher["overwrite_etc_hosts"](file_content, options.file)
counter += 1
print("")
# Step 2: Check hardware performance
logger.info("-------------- Step %d: Check hardware performance (This step will take a little bit more time to perform)" % counter)
function_dispatcher["checkHardwarePerf"]()
counter += 1
print("")
# Step 3: Enable core dump
logger.info("-------------- Step %d: Enable core dump" % counter)
function_dispatcher["enable_coredump"](options.file)
counter += 1
print("")
# Step 4: Install NTP
logger.info("-------------- Step %d: Install NTP" % counter)
function_dispatcher["install_ntp"](options.file)
counter += 1
print("")
# Step 5: Disable Firewall
logger.info("-------------- Step %d: Disable Firewall" % counter)
function_dispatcher["disable_firewall"](options.file)
counter += 1
print("")
# Step 6: Disable Selinux
logger.info("-------------- Step %d: Disable Selinux" % counter)
function_dispatcher["disable_selinux"](options.file)
counter += 1
print("")
# Step 7: Check configuration in /etc/sysctl.conf and /etc/security/limits.conf
logger.info("-------------- Step %d: Check configuration in /etc/sysctl.conf and /etc/security/limits.conf" % counter)
function_dispatcher["Check config in /etc/sysctl.conf && /etc/security/limits.conf"]()
counter += 1
print("")
# Step 8: Retrieve Database Version
logger.info("-------------- Step %d: Retrieve Database Version" % counter)
function_dispatcher["retrieve_DB_Version"]()
counter += 1
print("")
# Step 9: Retrieve Database Process
logger.info("-------------- Step %d: Retrieve Database Process" % counter)
function_dispatcher["retrieve_DB_process"]()
counter += 1
print("")
# Step 10: Check Database Basic Functions
logger.info("-------------- Step %d: Check Database Basic Functions" % counter)
function_dispatcher["check_DB_basic_function"]()
counter += 1
print("")
# Step 11: Check GUC Configuration
logger.info("-------------- Step %d: Check GUC Configuration" % counter)
function_dispatcher["check_GUC_configuration"]()
counter += 1
print("")
# Step 12: Check Kerberos Enablement
logger.info("-------------- Step %d: Check Kerberos Enablement" % counter)
function_dispatcher["check_kerberos_enablement"]()
counter += 1
print("")
# Step 13: Check Active Namenode
logger.info("-------------- Step %d: Check Active Namenode" % counter)
function_dispatcher["check_active_namenode"]()
counter += 1
print("")
# Step 14: Check HDFS Local Read
logger.info("-------------- Step %d: Check HDFS Local Read" % counter)
function_dispatcher["check_hdfs_local_read"]()
counter += 1
print("")
elif options.postUpgrade:
counter = 1
logger.info("-------------- Execute post-upgrade check --------------\n")
# Step 1: Retrieve Database Process
logger.info("-------------- Step %d: Retrieve Database Process" % counter)
function_dispatcher["retrieve_DB_process"]()
counter += 1
print("")
# Step 2: Retrieve Database Version
logger.info("-------------- Step %d: Retrieve Database Version" % counter)
function_dispatcher["retrieve_DB_Version"]()
counter += 1
print("")
# Step 3: Check Database Basic Functions
logger.info("-------------- Step %d: Check Database Basic Functions" % counter)
function_dispatcher["check_DB_basic_function"]()
counter += 1
print("")
elif options.weeklyExamine:
cmd = []
counter = 1
logger.info("-------------- Execute weekly-examine check --------------\n")
# Step 1: Check Disk Usage
logger.info("-------------- Step %d: Check Disk Usage" % counter)
function_dispatcher["checkDiskUsage"]()
counter += 1
print("")
# Step 2: Check if Master and Standby are synchronized
logger.info("-------------- Step %d: Check if Master and Standby are synchronized" % counter)
function_dispatcher["checkMasterStandbySync"]()
counter += 1
print("")
# Step 3: Check if all tables are analyzed within the last three days
logger.info("-------------- Step %d: Check if all tables are analyzed within the last three days" % counter)
function_dispatcher["checkTableAnalyzeStatus"]()
counter += 1
print("")
elif options.stdout:
doPrint()
elif options.zipout:
doZip("./gpcheck_%s" % time.time())
else:
runTests()
if found_errors:
sys.exit(1)
except GpCheckError, e:
logger.error(str(e))
sys.exit(1)
finally:
logger.info("Clean up...")
try:
if tmpdir:
shutil.rmtree(tmpdir)
except Exception, e:
logger.error("error removing tempdir during job cleanup: %s" % e)
finally:
if pool:
pool.join()
pool.haltWork()
pool.joinWorkers()