| #!/usr/bin/env python3 |
| # |
| # Copyright 2021 VMware, Inc. |
| # SPDX-License-Identifier: Apache-2.0 |
| # |
| #------------------------------------------------------------------------ |
| # The gpmemwatcher utility is a daemon that runs on all the servers of a GPDB |
| # cluster. It tracks the memory usage of each process on GPDB every minute (by |
| # default). It collects PS output every minute. It is a low impact process that |
| # consumes only 4 MB of memory. Expect that it will generate approximately 30 |
| # MB of data over a 24-hour period. |
| # |
| # Reference this Knowledge Base article for usage information |
| # https://community.pivotal.io/s/article/How-to-Deploy-Start-and-Stop-memwatcher-utility |
| #------------------------------------------------------------------------ |
| |
import gzip
import os
import shlex
import signal
import subprocess
import sys
import time
from datetime import datetime
from optparse import OptionParser

from gppylib.commands import gp
| |
# Name of the file (created inside the daemon's work directory) that records
# the daemon's PID.
pidfile = 'reswatch.pid'
# Name of the gzip-compressed file that accumulates the periodic ps snapshots.
ps_file = 'ps.out.gz'
# File objects that the SIGINT/SIGTERM handler closes before the daemon exits.
open_files = []
# Installation root taken from the environment; None when GPHOME is not set.
gphome = os.environ.get('GPHOME')
# Version text shown by --version: everything from the fourth whitespace
# separated token onwards of what GpVersion.local() returns.
gpverstr = " ".join(gp.GpVersion.local(None, gphome).split()[3:])
| |
| # Daemon code as per Stevens' "Advanced Programming in the UNIX Environment" |
def createDaemon():
    """Detach the calling process from its terminal with a double fork.

    Only the grandchild returns from this function; the original process and
    the intermediate child both leave through ``sys.exit(0)``.

    Raises:
        Exception: if either ``os.fork()`` call fails; the message carries the
            OS error string and errno.
    """
    def _fork():
        # Translate a failed fork into the generic Exception callers expect.
        try:
            return os.fork()
        except OSError as err:
            raise Exception("%s [%d]" % (err.strerror, err.errno))

    # First fork: the controlling parent exits so the child is not a process
    # group leader and may therefore call setsid().
    if _fork() > 0:
        sys.exit(0)

    # New session and process group with no controlling terminal; this child
    # is now the session leader.
    os.setsid()

    # Second fork (the paranoid step): a session leader could still acquire a
    # terminal, so it exits and the work continues in a grandchild that is
    # not a session leader.
    if _fork() > 0:
        sys.exit(0)

    # Running in the final daemon process: reset the file creation mask.
    os.umask(0)
| |
| |
def lauchAsDaemon(workdir):
    """Daemonize the current process and collect ps snapshots in ``workdir``.

    After a minimal sanity check of ``workdir`` the process forks into the
    background, changes into ``workdir``, records its PID in ``pidfile``,
    installs SIGINT/SIGTERM handlers and enters the sampling loop.

    Args:
        workdir: existing, writable directory that receives the PID file and
            the ps output.

    Raises:
        Exception: if ``workdir`` is not a directory or is not writable.
    """
    # Test to be sure that workdir exists and we can write to it.
    if not (os.path.isdir(workdir) and os.access(workdir, os.W_OK)):
        # Raise a real exception instance; raising a tuple is a TypeError on
        # Python 3 and would hide this message.
        raise Exception("Unable to write to work directory: %s" % (workdir))

    # OK - after minimal testing, fork off and run as a daemon.
    createDaemon()

    # Now that we are the daemon, record our PID inside the work directory.
    pid = str(os.getpid())
    os.chdir(workdir)
    with open(pidfile, 'w') as f:
        f.write(pid)

    # Register for SIGINT/SIGTERM so that the output file is closed properly
    # when the daemon is stopped.
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, cleanupLogFiles)
    runPSProcess()
| |
| |
def cleanupLogFiles(signum, frame):
    """Signal handler: close every file in ``open_files`` and exit with status 0.

    Args:
        signum: number of the delivered signal (unused).
        frame: interrupted stack frame (unused).
    """
    for handle in open_files:
        handle.close()
    # Equivalent to sys.exit(0): unwinds the interpreter with a zero status.
    raise SystemExit(0)
| |
| |
def runPSProcess(sleepInt=60):
    """Append a timestamped ``ps`` snapshot to ``ps_file`` forever.

    Each iteration writes a ``>>>yy:mm:dd:HH:MM:SS<<<`` marker followed by
    the output of ``ps``, flushes the gzip stream and sleeps. The loop only
    ends through an exception (including the ``SystemExit`` raised by the
    signal handler); the output file is closed on the way out.

    Args:
        sleepInt: seconds to sleep between two snapshots.

    Raises:
        OSError: if the output file cannot be opened or ``ps`` cannot be
            started.
    """
    ps_cmd = ['ps', '-ewwopid,ppid,rss,vsz,pmem,pcpu,time,etime,start_time,wchan,stat,psr,args']

    # Open the file before entering the try block: if opening fails there is
    # nothing to close, and the real error must not be masked by a failure in
    # the finally clause.
    outfile = gzip.open(ps_file, 'wt')
    open_files.append(outfile)
    try:
        while True:
            outfile.write(datetime.now().strftime("\n\n>>>%y:%m:%d:%H:%M:%S<<<\n"))
            # The context manager closes the pipe and waits for ps, so no
            # file descriptor or unreaped child is left behind each cycle.
            with subprocess.Popen(ps_cmd, stdout=subprocess.PIPE,
                                  universal_newlines=True) as proc:
                outfile.writelines(proc.stdout)
            outfile.flush()
            time.sleep(sleepInt)
    finally:
        outfile.close()
| |
| |
def parseArgs():
    """Parse the gpmemwatcher command line taken from ``sys.argv``.

    Returns:
        A ``(option, args)`` tuple as produced by ``OptionParser.parse_args``;
        ``args`` is always empty because positional arguments are rejected.
    """
    cli = OptionParser(version='%prog version ' + gpverstr)

    # (flags, keyword arguments) for every supported option, in help order.
    option_specs = (
        (('-f', '--host_file'),
         dict(action='store', type='string', dest='hostfile')),
        (('-d', '--work_dir'),
         dict(action='store', type='string', dest='work_dir', default='/data1')),
        (('-r', '--results_dir'),
         dict(action='store', type='string', dest='results_dir', default='/data')),
        (('--stop',),
         dict(action='store_true', dest='stop', default=False)),
        (('--daemon',),
         dict(action='store_true', dest='daemon', default=False)),
    )
    for flags, settings in option_specs:
        cli.add_option(*flags, **settings)

    parsed = cli.parse_args()
    leftovers = parsed[1]
    if leftovers:
        # parser.error() prints the usage and terminates the program.
        cli.error("Invalid arguments, Please check the usage by running 'gpmemwatcher -h'.")

    return parsed
| |
| |
def launchProcess(host, workdir):
    """Start the gpmemwatcher daemon on ``host`` over ssh.

    Three remote steps are run in order: a check that the remote ``python``
    is at least 2.6, creation of ``<workdir>/gpsupport_reswatch``, and the
    launch of ``$GPHOME/bin/gpmemwatcher --daemon`` in that directory. Any
    failing step prints a message to stderr and terminates the program with
    exit status 1.

    Args:
        host: name of the remote host, passed to ssh as a single argument.
        workdir: remote directory under which the work directory is created.

    Raises:
        Exception: if the GPHOME environment variable is not set.
    """
    dest_dir = os.path.join(workdir, 'gpsupport_reswatch')

    # The remote environment is set up from GPHOME, so it must be known.
    if not gphome:
        raise Exception('Environment Variable GPHOME not set')

    # Commands are passed to ssh as argument lists (no local shell) and every
    # interpolated path is quoted for the remote shell, so spaces or shell
    # metacharacters in the host file cannot alter the command.
    py_string = 'source ' + shlex.quote(os.path.join(gphome, 'cloudberry-env.sh')) + '; '

    # Quick check that the python version on the host is >= 2.6.
    version_check = py_string + (
        'python -c "import sys; sys.exit(1) if sys.hexversion < 0x020600f0 else 0"')
    try:
        subprocess.check_call(['ssh', '-T', host, version_check])
    except (subprocess.CalledProcessError, OSError):
        print('Python version on host %s is < 2.6.0. Aborting' % (host), file=sys.stderr)
        sys.exit(1)

    try:
        subprocess.check_call(['ssh', '-T', host, 'mkdir %s' % shlex.quote(dest_dir)])
    except (subprocess.CalledProcessError, OSError) as e:
        err = "Error when trying to create directory: " + dest_dir + " on host: " + host
        print(err, file=sys.stderr)
        print(e)
        sys.exit(1)

    launch_cmd = py_string + '%s --daemon -d %s' % (
        shlex.quote(gphome + '/bin/gpmemwatcher'), shlex.quote(dest_dir))
    try:
        # Without forcing a pseudo TTY (-tt) the ssh session hangs when the
        # remote process forks into the background.
        subprocess.check_call(['ssh', '-qtt', host, launch_cmd])
    except (subprocess.CalledProcessError, OSError):
        err = 'Error when trying to launch resource watcher on host %s, aborting' % (host)
        print(err, file=sys.stderr)
        sys.exit(1)
| |
| |
def stopProcesses(host, workdir):
    """Stop the daemon on ``host``, fetch its ps output and clean up.

    The steps are: kill the PID stored in the remote ``pidfile``, copy the
    remote ``ps_file`` to ``./<host>.<ps_file>`` with rsync, and remove the
    remote work directory. The first failing step prints an error and the
    remaining steps are skipped for this host; the function never aborts the
    program, so the caller can continue with other hosts.

    Args:
        host: name of the remote host.
        workdir: remote directory that contains ``gpsupport_reswatch``.
    """
    dest_dir = os.path.join(workdir, 'gpsupport_reswatch')

    # Argument lists avoid a local shell; paths are quoted for the remote
    # shell, while $(...) is deliberately left for the remote side to expand.
    remote_pidfile = shlex.quote('%s/%s' % (dest_dir, pidfile))
    try:
        subprocess.check_call(['ssh', '-T', host, 'kill $(cat %s)' % remote_pidfile])
    except (subprocess.CalledProcessError, OSError) as e:
        print('Error stopping process on host: ' + host, file=sys.stderr)
        print(e)
        return

    remote_ps = '%s:%s/%s' % (host, dest_dir, ps_file)
    local_ps = './%s.%s' % (host, ps_file)
    try:
        subprocess.check_call(['rsync', '-q', remote_ps, local_ps])
    except (subprocess.CalledProcessError, OSError) as e:
        print('Error retrieving data from host: ' + host, file=sys.stderr)
        print(e)
        return

    try:
        subprocess.check_call(['ssh', '-T', host, 'rm -rf %s' % shlex.quote(dest_dir)])
    except (subprocess.CalledProcessError, OSError) as e:
        print('Error removing work directory on host: ' + host, file=sys.stderr)
        print(e)
        return
| |
| |
def main():
    """Entry point: run as the daemon, or start/stop daemons on remote hosts.

    With ``--daemon`` the process daemonizes itself in the work directory.
    Otherwise the host file is read (one ``host:workdir`` entry per line)
    and each entry is passed to ``stopProcesses`` when ``--stop`` is given,
    or to ``launchProcess`` when it is not.
    """
    (options, args) = parseArgs()
    if options.daemon is True:
        # Runs the sampling loop until the process is signalled.
        lauchAsDaemon(options.work_dir)
        return

    if not options.hostfile:
        print("Error: host_file must be provided as an argument. Please check the usage by running 'gpmemwatcher -h'.\n")
        return

    # Build the list while the file is still open: map() is lazy on Python 3
    # and would otherwise read from the file after it has been closed.
    # Blank lines are skipped.
    with open(options.hostfile, 'r') as f:
        hostmap = [tuple(line.strip().split(':')) for line in f if line.strip()]

    action = stopProcesses if options.stop else launchProcess
    for mapping in hostmap:
        action(*mapping)
| |
| |
| if __name__ == '__main__': |
| main() |