#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
import os
import sys
import subprocess
import datetime
import getopt
import time
import db_util
from ducc_util import DuccUtil
class DuccRmQProcesses(DuccUtil):
    """Command-line query tool for the ducc.processes table.

    Builds a CQL select statement from the command-line options, runs it
    through the cqlsh binary found under DUCC_HOME, and prints the rows
    with db_util.
    """

    def usage(self, *msg):
        """Print an optional message followed by the help text, then exit.

        Parameters:
            msg: optional strings, joined with spaces and printed before the
                 help text.  Pass nothing (or None) to print only the help.

        Exits with status 1 when a message was supplied (the call reports an
        error) and with status 0 when only help was requested.
        """
        # Guard the index: usage() with no arguments must not raise IndexError.
        failed = bool(msg) and msg[0] is not None
        if failed:
            print(' '.join(msg))

        help_text = (
            'Usage:',
            ' q_processes options',
            '',
            'Where options include:',
            ' -j --job jobid sho all processes for this job',
            ' -n --node show all processes on a node',
            ' -f --from date include only process since this date',
            ' -t --to date include processes only up to this date',
            '',
            'Notes:',
            ' Omit -f and -t to get all processes.',
            '',
            ' If -f OR -t is specified, you MUST specifiy a node (-n) as well.',
            '',
            '',
            ' Date formats:',
            ' mm/dd/yy Use this to specify everything on a given day',
            ' mm/dd/yy.hh:mm Use this to specify a specific hour and minute.',
            ' Hours use the military (24-hour) clock.',
            '',
            'Examples:',
            '',
            ' Show all work on bluejbob on Feb 15 2015 between noon and 4PM',
            ' q_processes --node bluejbob --from 2/14/15.12:00 --to 2/14/15.16:00',
            '',
            ' Show all history for job 1234',
            ' q_processes --job 1234',
            '',
            ' Show history for job 1234 on node bluebob',
            ' q_processes --job 1234 --node bluebob',
        )
        for text in help_text:
            print(text)
        sys.exit(1 if failed else 0)

    def parse_date(self, dat):
        """Convert a command-line date string to epoch milliseconds.

        Accepts 'mm/dd/yy' or 'mm/dd/yy.hh:mm' (24-hour clock); a '.' in the
        string selects the longer format.  The date is interpreted in the
        local timezone (time.mktime).

        Raises:
            ValueError: if dat does not match the selected format.
        """
        fmt = '%m/%d/%y.%H:%M' if '.' in dat else '%m/%d/%y'
        parsed = datetime.datetime.strptime(dat, fmt)
        return int(time.mktime(parsed.timetuple())) * 1000

    def get_date(self, dat):
        """Return the local-time datetime for dat, a timestamp in seconds."""
        return datetime.datetime.fromtimestamp(dat)

    def _date_arg(self, dat):
        """Parse a date option; a malformed date prints usage and exits."""
        try:
            return self.parse_date(dat)
        except ValueError:
            self.usage("Invalid date: " + dat)

    def _build_query(self, node, fromt, tot, jobid):
        """Assemble the CQL select statement for the given filters.

        Any filter may be None to leave it out.  node is embedded as a quoted
        CQL string, fromt and tot are epoch milliseconds, and jobid is a
        string of digits.
        """
        clauses = []
        if node is not None:
            # Double embedded single quotes so the value cannot terminate
            # the CQL string literal early.
            clauses.append("node='" + node.replace("'", "''") + "'")
        if fromt is not None:
            clauses.append("start > " + str(fromt))
        if jobid is not None:
            clauses.append("job_id = " + jobid)
        if tot is not None:
            clauses.append("stop < " + str(tot))

        query = ['select * from ducc.processes']
        conjunction = 'WHERE'
        for clause in clauses:
            query.append(conjunction)
            query.append(clause)
            conjunction = 'AND'
        if fromt is not None:
            query.append("ALLOW FILTERING")
        return ' '.join(query)

    def main(self, argv):
        """Parse argv, run the query through cqlsh, and print the result.

        Parameters:
            argv: command-line arguments without the program name.

        Invalid options, malformed dates, a non-numeric job id, or an
        inconsistent combination of options print the usage text and exit.
        """
        node = None
        fromt = None
        tot = None
        jobid = None

        print('argv ' + str(argv))
        try:
            opts, _unused = getopt.getopt(argv, 'f:j:n:t:h?', ['from=', 'to=', 'job=', 'node=', 'help', ])
        except getopt.GetoptError:
            self.usage("Invalid arguments " + ' '.join(argv))

        for (o, a) in opts:
            if o in ('-n', '--node'):
                node = a
            elif o in ('-f', '--from'):
                fromt = self._date_arg(a)
            elif o in ('-t', '--to'):
                tot = self._date_arg(a)
            elif o in ('-j', '--job'):
                jobid = a
            elif o in ('-h', '-?', '--help'):
                self.usage(None)

        # The job id is spliced into the query unquoted, so accept digits only.
        if jobid is not None and not jobid.isdigit():
            self.usage("Job id must be a number: " + jobid)

        # Per the help text, either end of a date range requires a node.
        if (fromt is not None or tot is not None) and node is None:
            self.usage("Node must be specified when a date range is specified.")

        if tot is not None and fromt is None:
            self.usage("--from must be specified if --to is also specified.")

        query = self._build_query(node, fromt, tot, jobid)

        DH = self.DUCC_HOME
        dbn = self.ducc_properties.get('ducc.database.host')
        cmd = [DH + '/cassandra-server/bin/cqlsh', dbn, '-u', 'guest', '-p', 'guest', '-e', query]
        # Echo the command in the same form as before (query shown in quotes).
        print(' '.join(cmd[:-1] + ['"' + query + '"']))

        # Pass an argument list and no shell, so option values cannot be
        # interpreted as shell syntax.
        proc = subprocess.Popen(cmd, bufsize=0, stdout=subprocess.PIPE)
        lines = list(proc.stdout)
        proc.stdout.close()
        proc.wait()

        header, lines = db_util.parse(lines, 'job_id')
        for line in lines:
            # start/stop are stored in milliseconds; get_date takes seconds.
            line['start'] = str(self.get_date(int(line['start']) // 1000))
            line['stop'] = str(self.get_date(int(line['stop']) // 1000))
        db_util.format(header, lines)
        return
if __name__ == "__main__":
    # Script entry point: hand the command-line arguments (program name
    # dropped) to the query tool.
    query_tool = DuccRmQProcesses()
    query_tool.main(sys.argv[1:])