| #!/usr/bin/env python |
| # ----------------------------------------------------------------------- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # ----------------------------------------------------------------------- |
| |
| |
| import os |
| import sys |
| import subprocess |
| import datetime |
| import getopt |
| import time |
| |
| import db_util |
| |
| from ducc_util import DuccUtil |
| |
class DuccRmQProcesses(DuccUtil):
    """Command-line query tool for the ``ducc.processes`` table.

    Builds a CQL ``select`` from the command-line filters (node, job id,
    start/stop time window), runs it through the ``cqlsh`` binary found under
    ``DUCC_HOME`` and prints the parsed result via ``db_util``.
    """

    def usage(self, *msg):
        """Print an optional error message and the help text, then exit.

        Parameters:
            *msg: message fragments to print before the help text.  Pass
                  nothing, or ``None`` as the first fragment, to print only
                  the help text.

        Exits the interpreter with status 1 when an error message was
        supplied and with status 0 when only help was requested.
        """
        # Guard against an empty *msg so that usage() cannot raise IndexError.
        is_error = bool(msg) and msg[0] is not None
        if is_error:
            print(' '.join([str(m) for m in msg]))

        help_lines = (
            'Usage:',
            ' q_processes options',
            '',
            'Where options include:',
            ' -j --job jobid show all processes for this job',
            ' -n --node show all processes on a node',
            ' -f --from date include only process since this date',
            ' -t --to date include processes only up to this date',
            '',
            'Notes:',
            ' Omit -f and -t to get all processes.',
            '',
            ' If -f OR -t is specified, you MUST specify a node (-n) as well.',
            '',
            '',
            ' Date formats:',
            ' mm/dd/yy Use this to specify everything on a given day',
            ' mm/dd/yy.hh:mm Use this to specify a specific hour and minute.',
            ' Hours use the military (24-hour) clock.',
            '',
            'Examples:',
            '',
            ' Show all work on bluejbob on Feb 14 2015 between noon and 4PM',
            ' q_processes --node bluejbob --from 2/14/15.12:00 --to 2/14/15.16:00',
            '',
            ' Show all history for job 1234',
            ' q_processes --job 1234',
            '',
            ' Show history for job 1234 on node bluebob',
            ' q_processes --job 1234 --node bluebob',
        )
        # Single-argument print(...) emits the same text under Python 2's
        # print statement as the original bare print did.
        for text in help_lines:
            print(text)

        # A non-zero status lets calling scripts detect bad invocations.
        sys.exit(1 if is_error else 0)

    def parse_date(self, dat):
        """Convert a command-line date string to epoch milliseconds.

        Parameters:
            dat: ``mm/dd/yy`` or ``mm/dd/yy.hh:mm`` (24-hour clock).

        Returns:
            Integer milliseconds since the epoch; the date is interpreted in
            the local time zone (``time.mktime``).

        Raises:
            ValueError: if ``dat`` matches neither format.
        """
        # A '.' separates the date from the optional hh:mm part.
        if '.' in dat:
            fmt = '%m/%d/%y.%H:%M'
        else:
            fmt = '%m/%d/%y'

        parsed = datetime.datetime.strptime(dat, fmt)
        return int(time.mktime(parsed.timetuple())) * 1000

    def get_date(self, dat):
        """Return the local ``datetime`` for ``dat`` seconds since the epoch."""
        return datetime.datetime.fromtimestamp(dat)

    def _format_millis(self, value):
        """Render an epoch-milliseconds cell as a date string.

        Cells that are not integers are returned unchanged instead of
        aborting the whole report.
        NOTE(review): assumes cqlsh may print non-numeric text (e.g. for a
        missing timestamp) in these columns -- confirm against real output.
        """
        try:
            millis = int(value)
        except (TypeError, ValueError):
            return value
        return str(self.get_date(millis // 1000))

    def _build_query(self, node, fromt, tot, jobid):
        """Assemble the CQL select statement for the requested filters.

        Each filter is skipped when its value is None.  ``fromt`` and ``tot``
        are epoch milliseconds; ``jobid`` must already be validated as an
        integer.  ``ALLOW FILTERING`` is appended only when ``fromt`` is set.
        """
        clauses = []
        if node is not None:
            # Double embedded single quotes so the value cannot terminate
            # the CQL string literal early.
            clauses.append("node='" + node.replace("'", "''") + "'")
        if fromt is not None:
            clauses.append("start > " + str(fromt))
        if jobid is not None:
            clauses.append("job_id = " + str(jobid))
        if tot is not None:
            clauses.append("stop < " + str(tot))

        query = 'select * from ducc.processes'
        if clauses:
            query += ' WHERE ' + ' AND '.join(clauses)
        if fromt is not None:
            query += ' ALLOW FILTERING'
        return query

    def _run_query(self, query):
        """Run ``query`` through cqlsh and return its stdout lines."""
        cqlsh = self.DUCC_HOME + '/cassandra-server/bin/cqlsh'
        dbn = self.ducc_properties.get('ducc.database.host')

        # NOTE(review): the guest/guest credentials are hard-coded here.
        cmd = [cqlsh, dbn, '-u', 'guest', '-p', 'guest', '-e', query]
        # Echo the command in the shell-quoted form, for diagnostics only.
        print(' '.join(cmd[:-1] + ['"' + query + '"']))

        # An argument list with no shell keeps quotes or metacharacters in
        # the node name from being interpreted by /bin/sh.
        proc = subprocess.Popen(cmd, bufsize=0, stdout=subprocess.PIPE)
        try:
            lines = [line for line in proc.stdout]
        finally:
            proc.stdout.close()
            proc.wait()     # reap the child so it does not linger as a zombie
        return lines

    def main(self, argv):
        """Parse ``argv``, query the process table and print the result.

        Parameters:
            argv: command-line arguments without the program name.

        Invalid options, dates or job ids are reported through ``usage``,
        which terminates the program.
        """
        node = None
        fromt = None
        tot = None
        jobid = None

        print('argv %s' % (argv,))
        try:
            opts, args = getopt.getopt(argv, 'f:j:n:t:h?', ['from=', 'to=', 'job=', 'node=', 'help'])
        except getopt.GetoptError:
            self.usage("Invalid arguments " + ' '.join(argv))

        for (o, a) in opts:
            if o in ('-n', '--node'):
                node = a
            elif o in ('-f', '--from', '-t', '--to'):
                try:
                    when = self.parse_date(a)
                except ValueError:
                    self.usage("Invalid date " + a + "; expected mm/dd/yy or mm/dd/yy.hh:mm")
                if o in ('-f', '--from'):
                    fromt = when
                else:
                    tot = when
            elif o in ('-j', '--job'):
                jobid = a
            elif o in ('-h', '-?', '--help'):
                self.usage(None)

        # NOTE(review): the help text says a node is required when -f OR -t
        # is given, but only the both-given case is enforced here; confirm
        # the intended rule before tightening it.
        if (fromt is not None) and (tot is not None) and (node is None):
            self.usage("Node must be specified when a date range is specified.")

        if (tot is not None) and (fromt is None):
            self.usage("--from must be specified if --to is also specified.")

        if jobid is not None:
            # The id is spliced unquoted into the CQL, so it must be numeric.
            try:
                jobid = str(int(jobid))
            except ValueError:
                self.usage("Job id must be numeric: " + jobid)

        query = self._build_query(node, fromt, tot, jobid)
        lines = self._run_query(query)

        header, rows = db_util.parse(lines, 'job_id')
        for row in rows:
            # start/stop hold epoch milliseconds; show them as local dates.
            row['start'] = self._format_millis(row['start'])
            row['stop'] = self._format_millis(row['stop'])

        db_util.format(header, rows)

        return
| |
| |
if __name__ == "__main__":
    # Script entry point: hand the command-line arguments (program name
    # stripped) to the query tool.
    DuccRmQProcesses().main(sys.argv[1:])
| |
| |