#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os, sys, re, tempfile, shutil, pickle, getpass, subprocess, time, glob, ConfigParser
from xml.dom import minidom
try:
from optparse import Option, OptionParser
from gppylib.gpparseopts import OptParser, OptChecker
from gppylib.gplog import get_default_logger, setup_tool_logging
from gppylib.commands.unix import getLocalHostname, getUserName, SYSTEM
from gppylib.commands.base import WorkerPool, Command, REMOTE
from gppylib.gpcheckutil import HostType, hosttype_str
from hawqpylib.hawqlib import remote_ssh_output
from pygresql.pgdb import DatabaseError
from pygresql import pg
import stat
except ImportError, e:
sys.exit('Cannot import modules. Please check that you have sourced greenplum_path.sh. Detail: ' + str(e))
class GpCheckError(Exception):
pass
class GpCheckInfo:
def __init__(self):
self.is_root = (os.geteuid() == 0)
self.host_type = HostType.GPCHECK_HOSTTYPE_UNDEFINED
self.appliance_version = None
# record gpcheck_hostdump data for each host
self.hosts = dict() # hostname => GpCheckHost obj
# record HAWQ configuration
        self.hawq_gucs = dict() # guc name => value collected with 'hawqconfig -s'
self.hawq_segment_configuration = None
self.hawq_collected_ok = False # if successfully collect HAWQ gucs
self.collection_start_time = 0 # used in NTPD testing
self.collection_end_time = 0 # used in NTPD testing
class GpCheckHost:
def __init__(self, name, is_namenode=False):
self.hostname = name
self.datafile = None # pickle file on each host
self.data = None # `gpcheck_hostdump` collected data for each host
self.is_namenode = is_namenode
def __str__(self):
s = "%s datafile(%s)" % (self.hostname, self.datafile)
if self.is_namenode:
s += " namenode"
return s
class GpCheckConfig:
def __init__(self):
self.parser = ConfigParser.RawConfigParser()
self.gpcheck_config_version = 0
self.mount_points = set()
self.sysctl_expected = dict()
self.limits_expected = { # default value for limits.conf
("soft", "nofile"): 2900000,
("hard", "nofile"): 2900000,
("soft", "nproc") : 131072,
("hard", "nproc") : 131072 }
self.diskusage_mounts = []
self.diskusage_usagemax = 90 # max disk usage percentage
self.hdfs_expected = { # default value for HDFS configuration
"dfs.mem.namenode.heap": 8192,
"dfs.mem.datanode.heap": 8192 }
self.hdfs_non_expected = {}
self.hdfs_ha_expected = {}
self.hdfs_kerberos_expected = {}
self.hdfs_ha_kerberos_expected = {}
self.yarn_expected = {}
self.yarn_non_expected = {}
self.yarn_ha_expected = {}
self.yarn_kerberos_expected = {}
self.yarn_ha_kerberos_expected = {}
self.hawq_expected = {}
self.hawq_kerberos_expected = {}
self.hawq_yarn_expected = {}
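    # Expected gpcheck.cnf layout (illustrative values; section and key names follow
    # what readConfigFile() below parses):
    #   [global]
    #   configfile_version = 4
    #   [linux.mount]
    #   mount.points = /data1,/data2
    #   [linux.sysctl]
    #   sysctl.kernel.shmmax = 500000000
    #   [linux.limits]
    #   soft.nofile = 2900000
    #   [linux.diskusage]
    #   diskusage.monitor.mounts = /data1
    #   diskusage.monitor.usagemax = 90%
    #   [hdfs.base]
    #   dfs.mem.namenode.heap = 8192
    #   dfs.mem.datanode.heap = 8192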
def readConfigFile(self, config_file):
parsed_list = self.parser.read(config_file)
if len(parsed_list) != 1:
raise GpCheckError("cannot open file!")
if not self.parser.has_section("linux.sysctl"):
raise GpCheckError("require section 'linux.sysctl'")
section = "global"
if self.parser.has_option(section, "configfile_version"):
self.gpcheck_config_version = self.parser.getint(section, "configfile_version")
section = "linux.mount"
if self.parser.has_option(section, "mount.points"):
for p in self.parser.get(section, "mount.points").split(","):
self.mount_points.add(p.strip())
section = 'linux.sysctl'
for opt in self.parser.options(section):
            if re.match(r'sysctl\.', opt):
fields = opt.split('sysctl.')
if len(fields) != 2:
raise GpCheckError("Bad config line entry '%s'" % opt)
self.sysctl_expected[fields[1]] = self.parser.get(section, opt)
section = "linux.limits"
for opt in self.parser.options(section):
key = tuple(opt.split("."))
self.limits_expected[key] = self.parser.getint(section, opt)
section = "linux.diskusage"
if self.parser.has_option(section, "diskusage.monitor.mounts"):
self.diskusage_mounts = [m.strip() for m in self.parser.get(section, "diskusage.monitor.mounts").split(",")]
if self.parser.has_option(section, "diskusage.monitor.usagemax"):
self.diskusage_usagemax = self.parser.get(section, "diskusage.monitor.usagemax")
try:
if self.diskusage_usagemax[-1] == "%":
self.diskusage_usagemax = int(self.diskusage_usagemax[:-1])
else:
self.diskusage_usagemax = int(self.diskusage_usagemax)
except Exception, e:
raise GpCheckError("Bad config entry value '%s' for 'diskusage.monitor.usagemax': %s" %
(self.diskusage_usagemax, e))
if not self.parser.has_section('hdfs.base'):
if not self.parser.has_section("hdfs"):
raise GpCheckError("require section 'hdfs'")
section = 'hdfs'
for opt in self.parser.options(section):
self.hdfs_expected[opt] = self.parser.get(section, opt)
try:
self.hdfs_expected["dfs.mem.namenode.heap"] = int(self.hdfs_expected["dfs.mem.namenode.heap"])
self.hdfs_expected["dfs.mem.datanode.heap"] = int(self.hdfs_expected["dfs.mem.datanode.heap"])
except ValueError, e:
raise GpCheckError("'dfs.mem.namenode.heap' or 'dfs.mem.namenode.heap' should be a number: %s" % e)
else:
section = 'hdfs.base'
for opt in self.parser.options(section):
self.hdfs_expected[opt] = self.parser.get(section, opt)
try:
self.hdfs_expected["dfs.mem.namenode.heap"] = int(self.hdfs_expected["dfs.mem.namenode.heap"])
self.hdfs_expected["dfs.mem.datanode.heap"] = int(self.hdfs_expected["dfs.mem.datanode.heap"])
except ValueError, e:
raise GpCheckError("'dfs.mem.namenode.heap' or 'dfs.mem.namenode.heap' should be a number: %s" % e)
section = 'hdfs.non'
for opt in self.parser.options(section):
self.hdfs_non_expected[opt] = self.parser.get(section, opt)
section = 'hdfs.ha'
for opt in self.parser.options(section):
self.hdfs_ha_expected[opt] = self.parser.get(section, opt)
section = 'hdfs.kerberos'
for opt in self.parser.options(section):
self.hdfs_kerberos_expected[opt] = self.parser.get(section, opt)
section = 'hdfs.ha.kerberos'
for opt in self.parser.options(section):
self.hdfs_ha_kerberos_expected[opt] = self.parser.get(section, opt)
section = 'yarn.base'
for opt in self.parser.options(section):
self.yarn_expected[opt] = self.parser.get(section, opt)
section = 'yarn.non'
for opt in self.parser.options(section):
self.yarn_non_expected[opt] = self.parser.get(section, opt)
section = 'yarn.ha'
for opt in self.parser.options(section):
self.yarn_ha_expected[opt] = self.parser.get(section, opt)
section = 'yarn.kerberos'
for opt in self.parser.options(section):
self.yarn_kerberos_expected[opt] = self.parser.get(section, opt)
section = 'yarn.ha.kerberos'
for opt in self.parser.options(section):
self.yarn_ha_kerberos_expected[opt] = self.parser.get(section, opt)
section = 'hawq.base'
for opt in self.parser.options(section):
self.hawq_expected[opt] = self.parser.get(section, opt)
section = 'hawq.kerberos'
for opt in self.parser.options(section):
self.hawq_kerberos_expected[opt] = self.parser.get(section, opt)
section = 'hawq.yarn'
for opt in self.parser.options(section):
self.hawq_yarn_expected[opt] = self.parser.get(section, opt)
###### Global Variables #############
logger = get_default_logger()
EXECNAME = os.path.split(__file__)[-1]
setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())
options = None
GPHOME = None
GPCHECK_CONFIG_FILE = None
HADOOP_HOME = None
gpcheck_info = GpCheckInfo()
gpcheck_config = GpCheckConfig()
pool = WorkerPool()
tmpdir = None
found_errors = 0
HAWQ_GUC_MEMORY = "hawq_re_memory_overcommit_max"
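# Detect the local platform via SYSTEM.getName() and record the matching gpcheck host type (Linux or Solaris).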
def checkPlatform():
host_type_map = { "linux": HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX,
"sunos": HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS }
try:
gpcheck_info.host_type = host_type_map[SYSTEM.getName()]
logger.info("Detected platform: %s" % hosttype_str(gpcheck_info.host_type))
except KeyError:
raise GpCheckError("No tests exists for this platform in gpcheck")
def parse_host_list_file(host_file):
host_list = list()
with open(host_file) as f:
hosts = f.readlines()
for host in hosts:
host = host.split("#",1)[0].strip()
if host:
host_list.append(host)
return host_list
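# Typical invocations (illustrative host names and paths; see the options defined below):
#   gpcheck -f hostfile --hadoop /usr/local/hadoop --yarn
#   gpcheck -h sdw1 --zipout
#   gpcheck --zipin gpcheck_1400000000.0.tar.gz --stdout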
def parseargs():
global options, GPHOME, HADOOP_HOME, GPCHECK_CONFIG_FILE
parser = OptParser(option_class=OptChecker, version='%prog version $Revision: #1 $')
parser.remove_option('-h')
parser.add_option('-?', '--help', action='help')
parser.add_option('--verbose', action='store_true')
parser.add_option('--stdout', action='store_true')
parser.add_option('--zipout', action='store_true')
parser.add_option('--zipin', type='string')
parser.add_option('--gphome', type='string')
# for HDFS xml and memory check
parser.add_option('--hadoop', '--hadoop-home', type='string')
parser.add_option('--hdfs', action='store_true')
parser.add_option('--hdfs-ha', dest="hdfs_ha", action='store_true')
parser.add_option('--yarn', action='store_true')
parser.add_option('--yarn-ha', dest="yarn_ha", action='store_true')
parser.add_option('--kerberos', action='store_true')
parser.add_option('-c', '--config', type='string') # optional: gpcheck config file path
parser.add_option('-f', '--file', type='string') # host file, for testing a list of hosts
parser.add_option('-h', '--host', type='string') # test a single host
(options, args) = parser.parse_args()
if len(args) > 0:
if args[0] == 'help':
parser.print_help(sys.stderr)
sys.exit(0)
# GPHOME must be found
GPHOME = options.gphome if options.gphome else os.environ.get("GPHOME")
if not GPHOME:
raise GpCheckError("GPHOME not set, must be specified in --gphome")
GPCHECK_CONFIG_FILE = options.config if options.config else "%s/etc/gpcheck.cnf" % GPHOME
logger.info("Checks uses config file: %s", GPCHECK_CONFIG_FILE)
HADOOP_HOME = options.hadoop if options.hadoop else os.environ.get("HADOOP_HOME")
if not HADOOP_HOME:
checkFailed(None, "utility will SKIP HDFS configuration check because HADOOP_HOME is not specified in environment variable or --hadoop")
if options.yarn and not HADOOP_HOME:
options.yarn = False
checkFailed(None, "utility will SKIP YARN configuration check because HADOOP_HOME is not specified in environment variable or --hadoop")
# params check
if not options.file and not options.host and not options.zipin:
raise GpCheckError(" --file or --host or --zipin must be specified")
if options.file and options.host:
raise GpCheckError(" You can specify either --file or --host, but not both")
if options.stdout and options.zipout:
raise GpCheckError(" You can specify either --stdout or --zipout, but not both")
def readConfigFile():
try:
gpcheck_config.readConfigFile(GPCHECK_CONFIG_FILE)
except Exception, e:
raise GpCheckError("Field to read gpcheck config file '%s':\n%s" % (GPCHECK_CONFIG_FILE, e))
def checkFailed(host, msg):
global found_errors
found_errors += 1
if host:
logger.error("host(%s): %s", host, msg)
else:
logger.error(msg)
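# Determine the HDFS namenode hostname: read fs.defaultFS/fs.default.name from core-site.xml;
# if HA is configured (no port in the URL), resolve the first alias in dfs.ha.namenodes.* through
# its dfs.namenode.rpc-address entry in hdfs-site.xml, then run 'hostname' remotely to get the
# canonical name.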
def getHDFSNamenodeHost():
core_site_file = os.path.join(HADOOP_HOME, "etc/hadoop/core-site.xml")
hdfs_site_file = os.path.join(HADOOP_HOME, "etc/hadoop/hdfs-site.xml")
logger.info("try to detect namenode from %s" % core_site_file)
# for processing property xml
getPropName = lambda node: node.getElementsByTagName('name')[0].childNodes[0].data
getPropValue = lambda node: node.getElementsByTagName('value')[0].childNodes[0].data
# read namenode address from core-site.xml
with open(core_site_file) as f:
xmldoc = minidom.parse(f)
namenode_addr = ''
for node in xmldoc.getElementsByTagName('property'):
if getPropName(node) == 'fs.default.name' or getPropName(node) == 'fs.defaultFS':
fsurl = getPropValue(node).strip()
namenode_list_alias = re.search(r"//([^:/]*)", fsurl).group(1)
if_ha_disabled = re.search(".*:[0-9]+$", fsurl)
if if_ha_disabled:
namenode_addr = namenode_list_alias
else:
namenode_addr = ''
break
# run hostname command on remote to get actual hostname
if namenode_addr == '':
ha_namenode_list = ''
default_namenode_alias = ''
with open(hdfs_site_file) as f:
xmldoc = minidom.parse(f)
for node in xmldoc.getElementsByTagName('property'):
if re.search('dfs.ha.namenodes.*', getPropName(node).strip()):
ha_namenode_list = getPropValue(node).strip()
default_namenode_alias = ha_namenode_list.split(',')[0].strip()
break
if ha_namenode_list == '':
logger.error("cannot detect namenode from %s" % core_site_file)
raise GpCheckError("cannot detect namenode from %s" % core_site_file)
#sys.exit(1)
else:
with open(hdfs_site_file) as f:
xmldoc = minidom.parse(f)
for node in xmldoc.getElementsByTagName('property'):
namenode_rpc_address = "dfs.namenode.rpc-address.%s.%s" % (namenode_list_alias,
default_namenode_alias)
if getPropName(node) == namenode_rpc_address:
default_namenode_rpc_address = getPropValue(node).strip()
namenode_addr = default_namenode_rpc_address.split(':')[0].strip()
if namenode_addr == '':
raise GpCheckError("cannot detect namenode from %s" % core_site_file)
else:
cmd = Command(namenode_addr, "hostname", REMOTE, namenode_addr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running 'hostname' command: %s" % i.results.stderr.strip())
namenode_host = i.results.stdout.strip()
logger.info("detect namenode hostname to be %s" % namenode_host)
return namenode_host
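# Build the list of hosts to check from --file or --host, resolve each entry to its actual
# hostname over SSH to deduplicate, and flag the HDFS namenode when HADOOP_HOME is available.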
def createHostList():
if options.verbose:
logger.info("trying to deduplicate hosts...")
hostlist = []
# read the host file if present
if options.file:
try:
with open(options.file, "r") as f:
hostlist = [line.strip() for line in f.readlines() if line.strip()]
except IOError, e:
raise GpCheckError("error reading host file '%s': %s" % (options.file, str(e)))
else:
hostlist.append(options.host)
# get actual hostname and deduplicate
try:
for hostname in hostlist:
cmd = Command(hostname, "hostname", REMOTE, hostname)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running 'hostname' on host '%s': %s" % (i.remoteHost, i.results.stderr.strip()))
actualHostname = i.results.stdout.strip()
if actualHostname not in gpcheck_info.hosts:
gpcheck_info.hosts[actualHostname] = GpCheckHost(actualHostname)
except Exception, e:
raise GpCheckError("failed to collect 'hostname' on servers: %s" % str(e))
if options.verbose:
logger.info("trying to deduplicate hosts [success]")
if HADOOP_HOME:
try:
namenode_host = getHDFSNamenodeHost()
if namenode_host in hostlist:
gpcheck_info.hosts[namenode_host] = GpCheckHost(namenode_host, is_namenode=True)
else:
logger.warning("utility will skip HDFS namenode check since it's not in current host list.")
except Exception, e:
checkFailed(None, "utility will SKIP HDFS namenode check: %s" % str(e))
def runCollections():
logger.info("trying to collect server configuration...")
# run gpcheck_hostdump on each server
runCollectionOnServers()
# copy hostdump file to master
copyFilesLocally()
# delete hostdump file on remote servers
deleteRemoteFiles()
logger.info("trying to collect server configuration [success]")
def runCollectionOnServers():
gpcheck_info.collection_start_time = time.time()
def getDumpCommand():
if gpcheck_info.host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX:
host_type_cl = "--linux"
elif gpcheck_info.host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS:
host_type_cl = "--solaris"
else:
raise GpCheckError("unsupported host type")
cmd = "%s/sbin/gpcheck_hostdump --hawq %s" % (GPHOME, host_type_cl)
cmd += " --sysctl %s" % ",".join(gpcheck_config.sysctl_expected.keys())
if HADOOP_HOME:
cmd += " --hadoop %s" % HADOOP_HOME
if options.yarn or options.yarn_ha:
cmd += " --yarn"
return cmd
try:
cmdStr = getDumpCommand()
for host in gpcheck_info.hosts:
if options.verbose:
logger.info("collect data on host: %s" % host)
cmd = Command(host, cmdStr, REMOTE, host)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running gpcheck_hostdump on '%s': %s" % (i.remoteHost, i.results.stderr.strip()))
gpcheck_info.hosts[i.remoteHost].datafile = i.results.stdout.strip()
except Exception, e:
raise GpCheckError("Failed to collect data from servers:\n%s" % e)
gpcheck_info.collection_end_time = time.time()
def copyFilesLocally():
if options.verbose:
logger.info("copy hostdump files from remote servers to master")
try:
for host in gpcheck_info.hosts:
cmdStr = "scp %s:%s %s/%s.data" % (host, gpcheck_info.hosts[host].datafile, tmpdir, host)
if options.verbose:
logger.info(cmdStr)
cmd = Command(host, cmdStr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command %s: %s" % (i.cmdStr, i.results.stderr.strip()))
except Exception, e:
raise GpCheckError("Failed to scp remote hostdump file to master:\n%s" % e)
def deleteRemoteFiles():
if options.verbose:
logger.info("delete hostdump files on remote servers")
try:
for host in gpcheck_info.hosts:
cmdStr = "rm -f %s" % gpcheck_info.hosts[host].datafile
if options.verbose:
logger.info(cmdStr)
cmd = Command(host, cmdStr, REMOTE, host)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command %s: %s" % (i.cmdStr, i.results.stderr.strip()))
except Exception, e:
raise GpCheckError("Failed to delete remote hostdump file:\n%s" % e)
def readDataFiles():
for host in gpcheck_info.hosts:
fname = "%s/%s.data" % (tmpdir, host)
try:
with open(fname, "rb") as f:
gpcheck_info.hosts[host].data = pickle.load(f)
except Exception, e:
raise GpCheckError("Failed to load pickle file '%s': %s" % (fname, e))
def readHAWQConfiguration():
if options.verbose:
logger.info("trying to collect HAWQ configuration...")
dbname = os.environ.get('PGDATABASE', 'template1')
try:
db = pg.connect(dbname=dbname)
except pg.InternalError, ex:
checkFailed(None, "utility cannot perform HAWQ CPU and Memory check because failed to connect to HAWQ")
return
# read segment configurations
gpcheck_info.hawq_segment_configuration = db.query("select * from gp_segment_configuration").dictresult()
db.close()
# read Memory GUC using hawqconfig
command = "hawqconfig -s %s" % HAWQ_GUC_MEMORY
p = subprocess.Popen(command, shell = True,
stdout = subprocess.PIPE, stderr = subprocess.PIPE)
result = p.communicate()
match_master = re.search(r'Value : (\d+)', result[0])
if match_master:
        gpcheck_info.hawq_gucs[HAWQ_GUC_MEMORY] = int(match_master.group(1))
else:
checkFailed(None, "utility cannot perform HAWQ Memory check because failed to get GUC value using '%s'" % command)
return
gpcheck_info.hawq_collected_ok = True
if options.verbose:
logger.info("trying to collect HAWQ configuration [success]")
def testConnectEmc(host):
if not host.is_a_master:
return
expected = "Running"
if host.data.connectemc.output != expected:
checkFailed(host.hostname, "Connect EMC is not running on master (try /etc/init.d/connectemc status)")
def testSolarisEtcSystem(host):
requiredValues = { 'rlim_fd_cur' : '65536',
'zfs:zfs_arc_max' : '0x600000000',
'pcplusmp:apic_panic_on_nmi' : '1',
'nopanicdebug' : '1' }
results = dict()
for k in requiredValues.keys():
results[k] = 0
for key in host.data.etc_system.parameters.keys():
if key not in requiredValues:
continue
foundValue = host.data.etc_system.parameters[key]
if foundValue == requiredValues[key]:
results[key] = 1
for k in results.keys():
if results[k]:
continue
checkFailed(host.hostname, "/etc/system is missing expected line 'set %s=%s'" % (k, requiredValues[k]))
def testSolarisEtcProject(host):
requiredValues = { 'default:3::::project.max-sem-ids=(priv,1024,deny);process.max-file-descriptor=(priv,252144,deny)' : 0 }
unexpectedValues = set(['default:3::::'])
for line in host.data.etc_project.lines:
if line in unexpectedValues:
checkFailed(host.hostname, "unexpected line in /etc/project: '%s'" % line)
continue
if line in requiredValues:
requiredValues[line] = 1
for line in requiredValues.keys():
if requiredValues[line]:
continue
checkFailed(host.hostname, "/etc/project is missing expected line '%s'" % line)
def testSolarisEtcUserAttr(host):
requiredValues = { 'gpadmin::::defaultpriv=basic,dtrace_user,dtrace_proc' : 0 }
for line in host.data.etc_user_attr.lines:
if line in requiredValues:
requiredValues[line] = 1
for line in requiredValues.keys():
if requiredValues[line]:
continue
checkFailed(host.hostname, "/etc/user_attr is missing expected line '%s'" % line)
def testHAWQGUC(host):
if not gpcheck_info.hawq_collected_ok:
return
if options.verbose:
logger.info("-- test HAWQ CPU/Memory Guc Settings")
c = gpcheck_info.hawq_segment_configuration
master_hostname = filter(lambda x: x['role'] == 'm', c)[0]['hostname']
if host.hostname not in map(lambda x: x['hostname'], c):
logger.warning("host '%s' is not in HAWQ array" % host.hostname)
return
actual_total_memory = host.data.machine.memory_in_MB
guc_vmemsize_master = gpcheck_info.hawq_gucs[HAWQ_GUC_MEMORY]
# segment count on this host
num_segments = len(filter(lambda x: x['hostname'] == host.hostname, c))
if host.hostname == master_hostname:
if num_segments > 1:
checkFailed(host.hostname, "HAWQ master host has segments configured")
if actual_total_memory < guc_vmemsize_master:
checkFailed(host.hostname, "HAWQ master host memory size '%s' is less than the '%s' size '%s'" % (
actual_total_memory, HAWQ_GUC_MEMORY, guc_vmemsize_master))
return
# check HAWQ master's memory size
expected_vmemory_size = 8192
if guc_vmemsize_master != expected_vmemory_size:
checkFailed(host.hostname, "HAWQ master's %s GUC value is %s, expected %s" % (
HAWQ_GUC_MEMORY, guc_vmemsize_master, expected_vmemory_size))
else:
datanode_mem = gpcheck_config.hdfs_expected["dfs.mem.datanode.heap"]
# check HAWQ memory size
if actual_total_memory < datanode_mem:
checkFailed(host.hostname, "HAWQ segment's host memory size '%s' is less than the expected data node memory size '%s'" % (
actual_total_memory, datanode_mem))
logger.warning("please change the expected data node memory 'dfs.mem.datanode.heap' in gpcheck.cnf file")
logger.warning("SKIP '%s' check" %(HAWQ_GUC_MEMORY))
return
expect_vmemsize_per_segment = 8192
if guc_vmemsize_master != expect_vmemsize_per_segment:
checkFailed(host.hostname, "HAWQ segment's %s GUC value on this host is %s, expected %s" % (
HAWQ_GUC_MEMORY, guc_vmemsize_master, expect_vmemsize_per_segment))
def testDiskCapacity(host):
if options.verbose:
logger.info("-- test Disk Capacity")
for line in host.data.diskusage.lines:
if len(gpcheck_config.diskusage_mounts) == 0 or line.mount in gpcheck_config.diskusage_mounts:
actual_usage = int(line.used_percent[:-1])
if actual_usage > gpcheck_config.diskusage_usagemax:
checkFailed(host.hostname,
"potential disk full risk: %s mounted on %s has used %s space" % (
line.fs, line.mount, line.used_percent))
return
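# Compare the HAWQ site configuration collected from this host against the expected values from
# gpcheck.cnf (including kerberos and YARN variants), and verify that the HDFS short-circuit read
# and domain socket settings are consistent between HAWQ and HDFS.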
def testHAWQconfig(host):
hawq = host.data.hawq
hdfs = host.data.hdfs
if hawq is None:
return # skip HAWQ test when hawq is None
if hdfs is None:
return # skip HAWQ test when hdfs is None
if options.verbose:
logger.info("-- test HAWQ config")
if hawq.errormsg:
checkFailed(host.hostname, "collect HAWQ configuration error: %s" % hawq.errormsg)
return
datanode_list = list()
if HADOOP_HOME:
datanode_list = parse_host_list_file("%s/etc/hadoop/slaves" % HADOOP_HOME)
is_datanode = False
if host.hostname in datanode_list:
is_datanode = True
expect_config = gpcheck_config.hawq_expected
if options.kerberos:
expect_config.update(gpcheck_config.hawq_kerberos_expected)
if options.yarn or options.yarn_ha:
expect_config.update(gpcheck_config.hawq_yarn_expected)
actual_config = hawq.site_config
hdfs_actual_config = hdfs.site_config
for exp_key, exp_val in expect_config.items():
if exp_key not in actual_config:
checkFailed(host.hostname, "HAWQ configuration missing: '%s' needs to be set to '%s'" % (exp_key, exp_val))
else:
actual_val = actual_config[exp_key]
et = (exp_key, exp_val, actual_val)
if exp_key == "dfs.block.local-path-access.user":
if exp_val not in actual_val.split(','):
checkFailed(host.hostname, "HDFS configuration: '%s' should include user '%s', actual value is '%s'" % et)
elif exp_key == "dfs.namenode.handler.count":
if int(exp_val) > int(actual_val):
checkFailed(host.hostname, "HDFS configuration: '%s' should be at least '%s', actual value is '%s'" % et)
else:
if exp_val != actual_val:
checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % et)
if not options.kerberos:
if 'hadoop.security.authentication' in actual_config:
if actual_config['hadoop.security.authentication'] != 'simple':
checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % ('simple', 'hadoop.security.authentication', actual_config['hadoop.security.authentication']))
if 'hadoop.security.authentication' in hdfs_actual_config:
if hdfs_actual_config['hadoop.security.authentication'] != 'simple':
checkFailed(host.hostname, "HAWQ configuration: expected '%s' for '%s', actual value is '%s'" % ('simple', 'hadoop.security.authentication', hdfs_actual_config['hadoop.security.authentication']))
if options.yarn or options.yarn_ha:
hawq_yarn_property_exist_list = ['hawq_rm_yarn_address', 'hawq_rm_yarn_scheduler_address', 'hawq_rm_yarn_app_name']
for item in hawq_yarn_property_exist_list:
if item in actual_config:
if not actual_config[item]:
checkFailed(host.hostname, "HAWQ configuration: yarn.resourcemanager.address is empty")
else:
checkFailed(host.hostname, "HAWQ configuration: yarn.resourcemanager.address not defined")
if 'dfs.client.read.shortcircuit' not in actual_config:
checkFailed(host.hostname, "HAWQ configuration dfs.client.read.shortcircuit not defined")
if 'dfs.client.read.shortcircuit' not in hdfs_actual_config:
checkFailed(host.hostname, "HAWQ configuration dfs.client.read.shortcircuit not defined")
if 'dfs.domain.socket.path' not in actual_config:
checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path not defined")
if 'dfs.domain.socket.path' not in hdfs_actual_config:
checkFailed(host.hostname, "HDFS configuration dfs.domain.socket.path not defined")
if is_datanode and 'dfs.domain.socket.path' in actual_config and 'dfs.domain.socket.path' in hdfs_actual_config:
if actual_config['dfs.domain.socket.path'] != hdfs_actual_config['dfs.domain.socket.path']:
checkFailed(host.hostname, "HAWQ configuration: dfs.domain.socket.path expect to have the same value with HDFS configuration")
else:
cmd = "ls -l %s" % actual_config['dfs.domain.socket.path']
(result, output, errmsg) = remote_ssh_output(cmd, host.hostname, '')
if result == 0:
if output.split(' ')[0][7:9] != 'rw':
checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path: %s should have R/W access for both hawq and HDFS on %s" % (actual_config['dfs.domain.socket.path'], host.hostname))
else:
checkFailed(host.hostname, "HAWQ configuration dfs.domain.socket.path: %s, does not exist on %s" % (actual_config['dfs.domain.socket.path'], host.hostname))
if 'output.replace-datanode-on-failure' in actual_config and len(datanode_list) > 0:
if len(datanode_list) < 4:
if actual_config['output.replace-datanode-on-failure'] == 'true':
checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure expect false, current is true")
else:
if actual_config['output.replace-datanode-on-failure'] == 'false':
checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure expect true, current is false")
else:
checkFailed(host.hostname, "HAWQ configuration: output.replace-datanode-on-failure not defined")
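# Compare the collected HDFS/YARN site configuration and Java heap sizes against the expected
# values from gpcheck.cnf, and verify YARN nodemanager directories and resource manager properties.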
def testHDFSConfig(host):
hdfs = host.data.hdfs
if hdfs is None:
return # skip HDFS test when hdfs is None
if options.verbose:
logger.info("-- test HDFS config")
if hdfs.errormsg:
checkFailed(host.hostname, "collect HDFS configuration error: %s" % hdfs.errormsg)
return
expect_config = gpcheck_config.hdfs_expected
if not options.hdfs_ha and not options.kerberos:
expect_config.update(gpcheck_config.hdfs_non_expected)
if options.hdfs_ha and not options.kerberos:
expect_config.update(gpcheck_config.hdfs_ha_expected)
if options.kerberos and not options.hdfs_ha:
expect_config.update(gpcheck_config.hdfs_kerberos_expected)
if options.kerberos and options.hdfs_ha:
expect_config.update(gpcheck_config.hdfs_ha_kerberos_expected)
if options.yarn or options.yarn_ha:
expect_config.update(gpcheck_config.yarn_expected)
if not options.yarn_ha and not options.kerberos:
expect_config.update(gpcheck_config.yarn_non_expected)
if options.yarn_ha:
expect_config.update(gpcheck_config.yarn_ha_expected)
if options.kerberos:
expect_config.update(gpcheck_config.yarn_kerberos_expected)
actual_config = hdfs.site_config
actual_heap_size = hdfs.namenode_heap_size if host.is_namenode else hdfs.datanode_heap_size
if host.data.machine.memory_in_MB < actual_heap_size:
checkFailed(host.hostname, "host memory size '%s' is less than the java max heap size '%s'" % (host.data.machine.memory_in_MB, actual_heap_size))
# test hdfs_site.xml setting
for exp_key, exp_val in expect_config.items():
if exp_key.startswith("dfs.mem"):
            continue # these options belong to the memory tests
if exp_key not in actual_config:
checkFailed(host.hostname, "HDFS configuration missing: '%s' needs to be set to '%s'" % (exp_key, exp_val))
else:
actual_val = actual_config[exp_key]
et = (exp_key, exp_val, actual_val)
if exp_key == "dfs.block.local-path-access.user":
if exp_val not in actual_val.split(','):
checkFailed(host.hostname, "HDFS configuration: '%s' should include user '%s', actual value is '%s'" % et)
elif exp_key == "dfs.namenode.handler.count":
if int(exp_val) > int(actual_val):
checkFailed(host.hostname, "HDFS configuration: '%s' should be at least '%s', actual value is '%s'" % et)
else:
if exp_val != actual_val:
checkFailed(host.hostname, "HDFS configuration: expected '%s' for '%s', actual value is '%s'" % et)
# test hadoop memory setting
expect_namenode_heap = expect_config["dfs.mem.namenode.heap"]
expect_datanode_heap = expect_config["dfs.mem.datanode.heap"]
if host.is_namenode and actual_heap_size < expect_namenode_heap:
checkFailed(host.hostname, "Namenode Java heap size is only %sM, we recommends at least %sM" %
(actual_heap_size, expect_namenode_heap))
if not host.is_namenode and actual_heap_size < expect_datanode_heap:
checkFailed(host.hostname, "Datanode Java heap size is only %sM, expect value is %sM" %
(actual_heap_size, expect_datanode_heap))
    # Check that the nodemanager directories exist
directory_check_list = []
datanode_list = list()
if HADOOP_HOME:
datanode_list = parse_host_list_file("%s/etc/hadoop/slaves" % HADOOP_HOME)
is_datanode = False
if host.hostname in datanode_list:
is_datanode = True
if options.yarn or options.yarn_ha:
yarn_enabled = True
else:
yarn_enabled = False
if yarn_enabled and is_datanode:
if 'yarn.nodemanager.local-dirs' in actual_config:
directory_check_list += actual_config['yarn.nodemanager.local-dirs'].split(',')
else:
checkFailed(host.hostname, "YARN configuration: yarn.nodemanager.local-dirs not defined")
if 'yarn.nodemanager.log-dirs' in actual_config:
directory_check_list += actual_config['yarn.nodemanager.log-dirs'].split(',')
else:
checkFailed(host.hostname, "YARN configuration: yarn.nodemanager.log-dirs not defined")
for directory in directory_check_list:
cmd = "test -e %s" % directory
(result, output, errmsg) = remote_ssh_output(cmd, host.hostname, '')
if result != 0:
checkFailed(host.hostname, "YARN nodemanager directory %s does not exist" % directory)
# Check if resource manager property exists
if options.yarn:
yarn_property_exist_list = ['yarn.resourcemanager.address', 'yarn.resourcemanager.scheduler.address']
if options.yarn_ha:
yarn_property_exist_list = ['yarn.resourcemanager.hostname.rm1', 'yarn.resourcemanager.hostname.rm2']
if yarn_enabled:
for item in yarn_property_exist_list:
if item in actual_config:
if not actual_config[item]:
checkFailed(host.hostname, "YARN configuration: %s is empty" % item)
else:
checkFailed(host.hostname, "YARN configuration: %s not defined" % item)
# Check yarn kerberos properties
if yarn_enabled and options.kerberos:
yarn_kerberos_check_list = ['yarn.nodemanager.keytab', 'yarn.nodemanager.principal', \
'yarn.resourcemanager.keytab', 'yarn.resourcemanager.principal']
for item in yarn_kerberos_check_list:
if item in actual_config:
if not actual_config[item]:
checkFailed(host.hostname, "YARN configuration: %s is empty, expected non-empty" % item)
else:
checkFailed(host.hostname, "YARN configuration missing: %s" % item)
def testIOSchedulers(host):
if options.verbose:
logger.info("-- test IO scheduler")
if host.data.ioschedulers.errormsg:
checkFailed(host.hostname, "collect IO scheduler data error: %s" % host.data.ioschedulers.errormsg)
return
expectedScheduler = "deadline"
for dev in host.data.ioschedulers.devices:
scheduler = host.data.ioschedulers.devices[dev]
if scheduler != expectedScheduler:
checkFailed(host.hostname,
"on device (%s) IO scheduler '%s' does not match expected value '%s'" % (dev, scheduler, expectedScheduler))
# this test is performed only when gpcheck is run as root
def testBlockdev(host):
if host.data.blockdev is None:
return
if options.verbose:
logger.info("-- test block device readahead value")
expectedReadAhead = "16384"
for dev in host.data.blockdev.ra:
ra = host.data.blockdev.ra[dev]
if ra != expectedReadAhead:
checkFailed(host.hostname,
"on device (%s) blockdev readahead value '%s' does not match expected value '%s'" % (dev, ra, expectedReadAhead))
def testSysctl(host):
if options.verbose:
logger.info("-- test sysctl value")
if host.data.sysctl.errormsg:
checkFailed(host.hostname, "collect sysctl params error: %s" % host.data.sysctl.errormsg)
return
expected_values = gpcheck_config.sysctl_expected
real_values = host.data.sysctl.variables
    # params in this set are treated as lower bounds (the actual value may be larger);
    # it is currently empty, so every sysctl param must match the expected value exactly
    params_with_lowerbound = set()
for k in expected_values:
if k in params_with_lowerbound:
if int(real_values[k]) < int(expected_values[k]):
checkFailed(host.hostname,
"sysctl value for key '%s' has value '%s', but we expect at least '%s'" % (k, real_values[k], expected_values[k]))
elif real_values[k] != expected_values[k]: # for other params, we expect the actual value to be the same value
checkFailed(host.hostname,
"sysctl value for key '%s' has value '%s' and expects '%s'" % (k, real_values[k], expected_values[k]))
def testLimitsConf(host):
if options.verbose:
logger.info("-- test /etc/security/limits.conf")
if host.data.limitsconf.errormsg:
checkFailed(host.hostname, "collect limits.conf data error: %s" % host.data.limitsconf.errormsg)
return
# both dict has the form: (type, item) => value
expect_data = gpcheck_config.limits_expected
actual_data = dict([((e.type, e.item), e.value) for e in host.data.limitsconf.lines if e.domain in ("gpadmin", "*")])
expect_keyset = set(expect_data.keys())
actual_keyset = set(actual_data.keys())
for key in expect_keyset.intersection(actual_keyset):
expect_val = int(expect_data[key])
actual_val = int(actual_data[key])
if actual_val < expect_val:
checkFailed(host.hostname,
"%s in /etc/security/limits.conf has value %d lower than expected value %d" % (
" ".join(key), actual_val, expect_val))
for key in expect_keyset.difference(actual_keyset):
checkFailed(host.hostname,
"%s not found in /etc/security/limits.conf" % " ".join(key))
def testLinuxMounts(host):
if options.verbose:
logger.info("-- test mount points")
expected_mount_points = gpcheck_config.mount_points
actual_mount_points = set([m.dir for m in host.data.mounts.entries.values()])
if len(expected_mount_points) == 0:
if options.verbose:
logger.info("-- you didn't specify any mount points to be check in %s, ignore this test" % GPCHECK_CONFIG_FILE)
return
if not actual_mount_points.issuperset(expected_mount_points):
for failed_mount in expected_mount_points.difference(actual_mount_points):
checkFailed(host.hostname, "%s is not mounted" % failed_mount)
def testNtp(host):
if options.verbose:
logger.info("-- test NTP")
if host.data.ntp.currenttime < (gpcheck_info.collection_start_time - 1):
checkFailed(host.hostname, "potential NTPD issue. gpcheck start time (%s) time on machine (%s)" % (time.ctime(gpcheck_info.collection_start_time), time.ctime(host.data.ntp.currenttime)))
if host.data.ntp.currenttime > (gpcheck_info.collection_end_time + 1):
checkFailed(host.hostname, "potential NTPD issue. gpcheck end time (%s) time on machine (%s)" % (time.ctime(gpcheck_info.collection_start_time), time.ctime(host.data.ntp.currenttime)))
if not host.data.ntp.running:
checkFailed(host.hostname, "ntpd not detected on machine")
def testGenericLinuxHost(host):
logger.info("test on host: %s" % host.hostname)
if host.is_namenode:
testHAWQGUC(host)
testHAWQconfig(host)
testHDFSConfig(host)
testDiskCapacity(host)
testSysctl(host)
testLimitsConf(host)
testLinuxMounts(host)
testNtp(host)
else:
testHAWQGUC(host)
testHAWQconfig(host)
testDiskCapacity(host)
testHDFSConfig(host)
testIOSchedulers(host)
testSysctl(host)
testLimitsConf(host)
testLinuxMounts(host)
testNtp(host)
def testGenericSolarisHost(host):
testSolarisEtcSystem(host)
testSolarisEtcProject(host)
testSolarisEtcUserAttr(host)
def testUnameConsistency():
logger.info("test uname consistency")
firstUname = None
firstHost = None
for _, host in gpcheck_info.hosts.items():
uname = host.data.uname.output
if firstUname:
if firstUname != uname:
checkFailed(h, "uname -r output different among hosts: %s : %s != %s : %s" % (firstHost, firstUname, host.hostname, uname))
else:
firstUname = uname
firstHost = host.hostname
def testGenericLinuxCluster():
for _, host in gpcheck_info.hosts.items():
testGenericLinuxHost(host)
testUnameConsistency()
def testGenericLinuxClusterBlockDev():
for _, host in gpcheck_info.hosts.items():
if not host.is_namenode:
testBlockdev(host)
def testGenericSolarisCluster():
for _, host in gpcheck_info.hosts.items():
testGenericSolarisHost(host)
testUnameConsistency()
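# Dispatch the per-platform test suite and print a summary with the total error count.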
def runTests():
if gpcheck_info.host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_LINUX:
testGenericLinuxCluster()
if gpcheck_info.is_root:
testGenericLinuxClusterBlockDev()
elif gpcheck_info.host_type == HostType.GPCHECK_HOSTTYPE_GENERIC_SOLARIS:
testGenericSolarisCluster()
else:
raise GpCheckError("No tests exist for this platform in gpcheck")
# report checks result
logger.info("GPCHECK Result:")
logger.info("---------------------------------------")
if found_errors:
logger.info("check failed!\tfound %s error(s)" % found_errors)
else:
logger.info("all check succeed!")
logger.info("---------------------------------------")
def readZip():
logger.info("trying to read zip file '%s'..." % options.zipin)
words = options.zipin.split(".tar.gz")
if len(words) != 2:
raise GpCheckError("--zipin file needs to be a .tar.gz file")
fname = words[0]
# untar
cmdStr = "tar xfz %s" % (options.zipin)
if options.verbose:
logger.info(cmdStr)
try:
cmd = Command("tarcmd", cmdStr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command '%s'" % cmdStr)
except Exception, e:
raise GpCheckError("Failed to extract tar file '%s': %s" % (options.zipin, e))
# move extracted file to temp directory
newfname = "%s/%s" % (tmpdir, fname)
cmdStr = "mv %s %s" % (fname, newfname)
if options.verbose:
logger.info(cmdStr)
try:
cmd = Command("mvcmd", cmdStr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command '%s'" % cmdStr)
except Exception, e:
raise GpCheckError("Failed to move file '%s' to temp directory: %s" % (fname, e))
# load pickle file
global gpcheck_info
try:
with open(newfname, "rb") as f:
gpcheck_info = pickle.load(f)
except Exception, e:
raise GpCheckError("Failed to load pickle file '%s': %s" % (newfname, e))
logger.info("trying to read zip file '%s' [success]" % options.zipin)
def doZip(fname):
logger.info("dump gpcheck data into a zip file '%s.tar.gz'..." % fname)
# dump to pickle file
try:
with open(fname, "wb") as f:
pickle.dump(gpcheck_info, f)
except Exception, e:
raise GpCheckError("Failed to dump pickle file '%s':\n%s" % (fname, e))
# make a tar ball
cmdStr = "tar cfz %s.tar.gz %s" % (fname, fname)
if options.verbose:
logger.info(cmdStr)
try:
cmd = Command("tarcmd", cmdStr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command '%s': %s" % (cmdStr, i.results.stderr.strip()))
except Exception, e:
raise GpCheckError("Failed to dump gpcheck data into a zip file:\n%s" % e)
# delete pickle file
cmdStr = "rm -rf %s" % fname
if options.verbose:
logger.info(cmdStr)
try:
cmd = Command("rmcmd", cmdStr)
pool.addCommand(cmd)
pool.join()
items = pool.getCompletedItems()
for i in items:
if i.results.rc or i.results.halt or not i.results.completed:
raise Exception("error running command '%s': %s" % (cmdStr, i.results.stderr.strip()))
except Exception, e:
raise GpCheckError("Failed to delete pickle file '%s':\n%s" % (fname, e))
logger.info("dump gpcheck data into a zip file '%s.tar.gz' [success]" % fname)
def doPrint():
for h in sorted(gpcheck_info.hosts):
print "HOST: %s" % h
print gpcheck_info.hosts[h].data
print "----------------------------------------------------------------------\n"
if gpcheck_info.hawq_collected_ok:
print "HAWQ guc settings:"
for guc_name, guc_val in gpcheck_info.hawq_gucs.items():
print "GUC : %s\nMaster value: %s\nSegment value: %s\n" % (guc_name, guc_val[0], guc_val[1])
if __name__ == '__main__':
if gpcheck_info.is_root:
logger.info("gpcheck will perform block device's readahead checks when run as root")
try:
try:
checkPlatform()
parseargs()
readConfigFile()
        except GpCheckError, e:
            logger.error(str(e))
            if pool:
                pool.join()
                pool.haltWork()
                pool.joinWorkers()
            sys.exit(1)
try:
tmpdir = tempfile.mkdtemp(prefix='gpcheck')
except Exception, e:
logger.error("Error creating tmp dir on master: %s" % e)
sys.exit(1)
try:
# Phase 1: collect input
if options.zipin:
readZip() # load information into gpcheck_info from zip
else:
# read host info into gpcheck_info.hosts from --file or --host
createHostList()
# collect each server's system environment configuration
runCollections()
# read collected data into gpcheck_info
readDataFiles()
# read HAWQ configuration
readHAWQConfiguration()
# Phase 2: generate output
if options.stdout:
doPrint()
elif options.zipout:
doZip("./gpcheck_%s" % time.time())
else:
runTests()
if found_errors:
sys.exit(1)
except GpCheckError, e:
logger.error(str(e))
sys.exit(1)
finally:
logger.info("Clean up...")
try:
if tmpdir:
shutil.rmtree(tmpdir)
except Exception, e:
logger.error("error removing tempdir during job cleanup: %s" % e)
finally:
if pool:
pool.join()
pool.haltWork()
pool.joinWorkers()