blob: 1fb39adfad8904ffb09b85d8027bc6251ca7eda7 [file]
#!/usr/bin/env python3
#
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
#
#------------------------------------------------------------------------
# The gpmemwatcher utility is a daemon that runs on all the servers of a GPDB
# cluster. It records the memory usage of every process on the host (including
# all GPDB processes) by collecting `ps` output once a minute (by default). It
# is a low-impact process that consumes only about 4 MB of memory; expect it to
# generate approximately 30 MB of data over a 24-hour period.
#
# Reference this Knowledge Base article for usage information
# https://community.pivotal.io/s/article/How-to-Deploy-Start-and-Stop-memwatcher-utility
#------------------------------------------------------------------------
import gzip
import os
import shlex
import signal
import subprocess
import sys
import time
from datetime import datetime
from optparse import OptionParser

from gppylib.commands import gp
# Name of the file, inside the daemon's work directory, holding the daemon's PID
# (written by lauchAsDaemon, read by `kill $(cat ...)` in stopProcesses).
pidfile = 'reswatch.pid'
# Gzip-compressed file, inside the daemon's work directory, that accumulates the
# timestamped ps snapshots (written by runPSProcess, fetched by stopProcesses).
ps_file = 'ps.out.gz'
# Files opened by runPSProcess; closed by the SIGINT/SIGTERM handler cleanupLogFiles.
open_files = []
# Install directory taken from $GPHOME; None when the variable is not set.
gphome = os.getenv('GPHOME', None)
# Version text shown by --version: the fourth and later whitespace-separated
# words of the local GpVersion string. Evaluated at import time.
# NOTE(review): behavior of GpVersion.local() when gphome is None is not visible here — confirm.
gpverstr = " ".join(gp.GpVersion.local(None, gphome).split()[3:])
# Daemonization follows the classic recipe from W. Richard Stevens,
# "Advanced Programming in the UNIX Environment".
def createDaemon():
    """Detach the current process from its terminal and continue as a daemon.

    Performs the double fork. The original process and the intermediate
    session leader both exit via ``sys.exit(0)``, so only the final grandchild
    returns from this function. On return, the process:

    * is in a new session but is not its leader;
    * has a umask of 0o022;
    * has stdin/stdout/stderr redirected to /dev/null.

    The working directory is left unchanged, so callers can still resolve
    relative paths after the call.

    Raises:
        Exception: if either ``fork()`` fails. The message is
            ``"<strerror> [<errno>]"`` and the ``OSError`` is chained.
    """
    try:
        pid = os.fork()
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno)) from e
    if pid > 0:
        # Original (controlling) parent: exit so whoever started us returns.
        sys.exit(0)

    # First child: create a new session and process group with no controlling
    # terminal. This process becomes the session and group leader.
    os.setsid()

    # Fork a second time. A session leader *might* still acquire a controlling
    # terminal by opening a tty; the second child is not a session leader, so
    # it never can. This is the deliberately paranoid half of the recipe.
    try:
        pid = os.fork()
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno)) from e
    if pid > 0:
        # Session leader: exit so only the non-leader grandchild keeps running.
        sys.exit(0)

    # Grandchild: the daemon proper.
    # Use a restrictive umask rather than 0. Python creates files with mode
    # 0o666, so umask(0) would leave the pid file world-writable, and --stop
    # later passes that file's contents to `kill`.
    os.umask(0o022)

    # Detach stdio from the launching session so the daemon does not keep the
    # ssh session's terminal or pipes open after the launcher has exited.
    devnull = os.open(os.devnull, os.O_RDWR)
    try:
        for fd in (0, 1, 2):
            os.dup2(devnull, fd)
    finally:
        if devnull > 2:
            os.close(devnull)
def lauchAsDaemon(workdir):
    """Daemonize, record our PID in *workdir*, and start collecting ps output.

    Args:
        workdir: directory that must already exist and be writable. The
            daemon chdirs into it and writes ``pidfile`` and ``ps_file`` there.

    Raises:
        Exception: if *workdir* is not an existing, writable directory. This
            is checked before forking so the error reaches the invoking shell.

    Under normal operation this does not return. ``runPSProcess`` loops until
    the process is signalled (SIGINT/SIGTERM -> ``cleanupLogFiles``) or an
    error propagates out of it.
    """
    # Validate before forking so a failure is reported to the caller's terminal.
    if not (os.path.isdir(workdir) and os.access(workdir, os.W_OK)):
        # Raise a real exception instance. Raising a (class, message) tuple
        # is a TypeError in Python 3 and hides the actual problem.
        raise Exception("Unable to write to work directory: %s" % (workdir))
    createDaemon()
    # Now running as the daemon. createDaemon leaves the cwd unchanged, so a
    # relative workdir still resolves correctly here.
    os.chdir(workdir)
    # Record our PID so that --stop can signal us.
    with open(pidfile, 'w') as f:
        f.write(str(os.getpid()))
    # Register for SIGINT/SIGTERM so output files are closed before exiting.
    # atexit handlers are not run when the process is killed by a signal.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, cleanupLogFiles)
    runPSProcess()
def cleanupLogFiles(signum, frame):
    """SIGINT/SIGTERM handler: close every file in ``open_files``, then exit 0.

    Args:
        signum: number of the signal being handled (unused).
        frame: interrupted stack frame (unused).
    """
    for tracked_file in open_files:
        tracked_file.close()
    # Raises SystemExit in the interrupted code. Enclosing ``finally`` blocks
    # (such as the one in runPSProcess) still run on the way out.
    sys.exit(0)
def runPSProcess(sleepInt=60):
    """Append a timestamped ``ps`` snapshot to ``ps_file`` every *sleepInt* seconds.

    Runs forever. The gzip stream is registered in ``open_files`` so the
    SIGINT/SIGTERM handler can close it. It is also closed here if the loop is
    left through an exception, including the ``SystemExit`` raised by that
    handler. Closing it twice is harmless.

    Args:
        sleepInt: seconds to sleep between snapshots.
    """
    # Argument list rather than a shell string: the command is constant, so no
    # shell is needed.
    ps_cmd = ['ps', '-ewwopid,ppid,rss,vsz,pmem,pcpu,time,etime,start_time,wchan,stat,psr,args']
    # Open outside the try-block. If gzip.open itself fails there is nothing
    # to close, and the real error must not be masked by an UnboundLocalError
    # raised from the cleanup code.
    outfile = gzip.open(ps_file, 'wt')
    open_files.append(outfile)
    try:
        while True:
            outfile.write(datetime.now().strftime("\n\n>>>%y:%m:%d:%H:%M:%S<<<\n"))
            # The context manager closes the pipe and waits for ps. Each child
            # is therefore reaped immediately, not whenever the Popen object
            # happens to be garbage-collected.
            with subprocess.Popen(ps_cmd, stdout=subprocess.PIPE, universal_newlines=True) as proc:
                outfile.writelines(proc.stdout)
            outfile.flush()
            time.sleep(sleepInt)
    finally:
        outfile.close()
def parseArgs():
    """Parse the gpmemwatcher command line.

    Options:
        -f/--host_file:   file listing ``host:workdir`` entries.
        -d/--work_dir:    work directory used in --daemon mode (default /data1).
        -r/--results_dir: results directory (default /data).
        --stop:           stop watchers and collect their output.
        --daemon:         run as the per-host collector daemon.

    Returns:
        ``(options, args)`` as produced by ``OptionParser.parse_args``. Any
        positional argument is rejected via ``parser.error``, which exits.
    """
    parser = OptionParser(version='%prog version ' + gpverstr)

    # Value-taking options, registered in their original order so --help is unchanged.
    valued_options = (
        ('-f', '--host_file', 'hostfile'),
        ('-d', '--work_dir', 'work_dir'),
        ('-r', '--results_dir', 'results_dir'),
    )
    for short_opt, long_opt, dest_name in valued_options:
        parser.add_option(short_opt, long_opt, action='store', type='string', dest=dest_name)

    # Boolean switches.
    for long_opt, dest_name in (('--stop', 'stop'), ('--daemon', 'daemon')):
        parser.add_option(long_opt, action='store_true', dest=dest_name)

    parser.set_defaults(work_dir='/data1', results_dir='/data', stop=False, daemon=False)

    parsed, leftovers = parser.parse_args()
    if leftovers:
        parser.error("Invalid arguments, Please check the usage by running 'gpmemwatcher -h'.")
    return parsed, leftovers
def launchProcess(host, workdir):
    """Start a gpmemwatcher daemon on *host* in ``<workdir>/gpsupport_reswatch``.

    Runs three remote steps over ssh:

    1. a Python version probe;
    2. creation of the work directory;
    3. launching ``$GPHOME/bin/gpmemwatcher --daemon``.

    If any step fails, an error is printed to stderr and the program exits
    with status 1.

    Args:
        host: remote host name passed to ssh.
        workdir: directory on *host*. ``gpsupport_reswatch`` is created inside
            it with a plain ``mkdir``, so it must not already exist.

    Raises:
        Exception: if the GPHOME environment variable is not set locally.
    """
    dest_dir = os.path.join(workdir, 'gpsupport_reswatch')
    # The remote environment is prepared by sourcing $GPHOME/cloudberry-env.sh,
    # so GPHOME is required.
    if not gphome:
        raise Exception('Environment Variable GPHOME not set')
    # ssh is invoked with an argument list (no local shell), and every path
    # embedded in a remote command is shell-quoted. Directories containing
    # spaces or shell metacharacters therefore reach the remote side intact.
    py_string = 'source ' + shlex.quote(os.path.join(gphome, 'cloudberry-env.sh')) + '; '

    # Check that the remote python is >= 2.6.
    version_probe = 'import sys; sys.exit(1) if sys.hexversion < 0x020600f0 else 0'
    try:
        subprocess.check_call(['ssh', '-T', host, py_string + 'python -c ' + shlex.quote(version_probe)])
    except subprocess.CalledProcessError:
        print('Python version on host %s is < 2.6.0. Aborting' % (host), file=sys.stderr)
        sys.exit(1)

    try:
        subprocess.check_call(['ssh', '-T', host, 'mkdir ' + shlex.quote(dest_dir)])
    except subprocess.CalledProcessError as e:
        err = "Error when trying to create directory: " + dest_dir + " on host: " + host
        print(err, file=sys.stderr)
        print(e, file=sys.stderr)
        sys.exit(1)

    daemon_cmd = '%s%s --daemon -d %s' % (
        py_string,
        shlex.quote(os.path.join(gphome, 'bin', 'gpmemwatcher')),
        shlex.quote(dest_dir),
    )
    try:
        # Force a pseudo-TTY (-tt). Without one, ssh has been observed to hang
        # after the remote process forks into the background.
        subprocess.check_call(['ssh', '-qtt', host, daemon_cmd])
    except subprocess.CalledProcessError:
        err = 'Error when trying to launch resource watcher on host %s, aborting' % (host)
        print(err, file=sys.stderr)
        sys.exit(1)
def stopProcesses(host, workdir):
    """Stop the watcher on *host*, fetch its ps output, and remove its work directory.

    Each step runs over ssh/rsync. The first failure prints an error to stderr
    and returns without attempting the remaining steps:

    1. ``kill`` the PID recorded in ``<workdir>/gpsupport_reswatch/<pidfile>``.
    2. Copy ``<workdir>/gpsupport_reswatch/<ps_file>`` to ``./<host>.<ps_file>``.
    3. ``rm -rf`` the remote ``<workdir>/gpsupport_reswatch``.

    Args:
        host: remote host name passed to ssh/rsync.
        workdir: the same directory that was used when launching on *host*.
    """
    dest_dir = os.path.join(workdir, 'gpsupport_reswatch')
    # ssh is invoked with an argument list (no local shell), and remote paths
    # are shell-quoted. This matters most for `rm -rf` below: an unquoted path
    # containing a space would delete the wrong directories.
    remote_pidfile = os.path.join(dest_dir, pidfile)
    try:
        subprocess.check_call(['ssh', '-T', host, 'kill $(cat %s)' % shlex.quote(remote_pidfile)])
    except subprocess.CalledProcessError as e:
        print('Error stopping process on host: ' + host, file=sys.stderr)
        print(e, file=sys.stderr)
        return

    # NOTE(review): `kill` only delivers SIGTERM and returns immediately. If
    # the daemon has not finished closing the gzip file yet, the copy may be
    # truncated. Consider waiting for the process to exit first.
    remote_psfile = '%s:%s' % (host, os.path.join(dest_dir, ps_file))
    try:
        subprocess.check_call(['rsync', '-q', remote_psfile, './%s.%s' % (host, ps_file)])
    except subprocess.CalledProcessError as e:
        print('Error retrieving data from host: ' + host, file=sys.stderr)
        print(e, file=sys.stderr)
        return

    try:
        subprocess.check_call(['ssh', '-T', host, 'rm -rf ' + shlex.quote(dest_dir)])
    except subprocess.CalledProcessError as e:
        print('Error removing work directory on host: ' + host, file=sys.stderr)
        print(e, file=sys.stderr)
def main():
    """Command-line entry point.

    With ``--daemon``, becomes the per-host collector in ``--work_dir``.
    Otherwise it reads ``--host_file``, which holds one ``host:workdir`` entry
    per line. It then launches the watcher on every listed host or, with
    ``--stop``, stops it and collects its output.

    Blank lines and lines starting with ``#`` in the host file are ignored.
    Any other line that is not exactly ``host:workdir`` aborts with exit
    status 1 before any host is contacted.
    """
    (options, args) = parseArgs()
    if options.daemon:
        lauchAsDaemon(options.work_dir)
    if not options.hostfile:
        print("Error: host_file must be provided as an argument. Please check the usage by running 'gpmemwatcher -h'.\n")
        return

    # Parse and validate the whole file first, so a bad line cannot leave
    # some hosts started (or stopped) and others not.
    hostmap = []
    with open(options.hostfile, 'r') as f:
        for lineno, line in enumerate(f, 1):
            entry = line.strip()
            if not entry or entry.startswith('#'):
                continue
            fields = tuple(entry.split(':'))
            if len(fields) != 2:
                print("Error: %s line %d: expected 'host:workdir', got %r"
                      % (options.hostfile, lineno, entry), file=sys.stderr)
                sys.exit(1)
            hostmap.append(fields)

    action = stopProcesses if options.stop else launchProcess
    for mapping in hostmap:
        action(*mapping)


if __name__ == '__main__':
    main()