blob: 39e1d609ed307e767074db62fd4b319253dbd056 [file] [log] [blame]
#! /usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# +
# + ducc_watcher
# +
# + purpose: send e-mail when a DUCC daemon overall state changes
# +
# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
import ast
import datetime
import getpass
import json
#import logging
import logging.handlers
import os
import smtplib
import socket
import string
import sys
import time
import urllib2
#from optparse import HelpFormatter
#from optparse import OptionGroup
from optparse import OptionParser
# ----------------------------------------------
# Extend OptionParser class
class ExtendedOptionParser(OptionParser):
    """OptionParser that emits its epilog verbatim in --help output.

    The stock epilog formatter would delete the newlines of the epilog;
    this override returns the text unchanged instead.
    """
    def format_epilog(self, formatter):
        # optparse's format_help() "".join()s this value, so it must be a
        # string: fall back to '' when no epilog was configured (None)
        return self.epilog or ''
# ----------------------------------------------
# --- static configuration ---
name = 'ducc_watcher'
version = 2.0
webserver = 'Webserver'
# daemons that run on the DUCC head node
head_daemons = [ 'Orchestrator', 'ResourceManager', 'Database', 'Broker', 'ProcessManager', 'ServiceManager', webserver ]
# pseudo-daemon key used to record job driver allocation state
jda = 'JobDriverAllocation'
port = '42133'
url_timeout = 30
# --- settings adjusted from the command line ---
flag_info = True
flag_trace = False
flag_agents = False
flag_verbose = False
mail_host = 'localhost'
email_list = None
path = None
log_file = None
state_file = None
job_driver_allocation = None
target = None
ducc_url_base = None
ducc_url_servlet_system_daemons_data = None
ducc_url_servlet_reservations_data = None
# --- run-time state ---
logger = None
webserver_lifetime_millis = 0
list_errors = []
list_warns = []
state_dict_previous = {}
state_dict_current = {}
summary_previous = {}
summary_current = {}
# produce a time stamp
def get_timestamp():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# get the host running this script
def get_host():
    """Return the name of the host running this script."""
    return socket.gethostname()
# get the user running this script
def get_user():
    """Return the login name of the user running this script."""
    return getpass.getuser()
# make directories, if not already existing
def mkdirs(path):
    """Create directory *path* (and parents) unless it already is a directory.

    Failures are not raised; they are recorded via exception() (which logs
    an error and appends it to list_errors).
    """
    import errno
    debug('mkdirs: path='+path)
    if(os.path.isdir(path)):
        return
    try:
        os.makedirs(path)
    except Exception as e:
        # another process (e.g. a concurrent watcher for a different target)
        # may have created the directory between the check and makedirs()
        if(getattr(e, 'errno', None) == errno.EEXIST and os.path.isdir(path)):
            return
        # anything else, including path existing as a regular file, is an error
        exception(e)
# info message to log
def info(text):
    """Log *text* at INFO level with a '<timestamp> <user>@<host> I' prefix.

    Returns the formatted line that was logged.
    """
    global logger
    who = get_user()+'@'+get_host()
    line = ' '.join([get_timestamp(), who, 'I', text]) if False else None
    line = get_timestamp()+' '+who+' '+'I'+' '+text
    logger.info(line)
    return line
# trace message to log
def trace(text):
    """Format *text* with a '<timestamp> <user>@<host> T' prefix.

    The line is logged at DEBUG level only when flag_trace is set; it is
    returned in either case.
    """
    global logger
    global flag_trace
    stamp = get_timestamp()
    who = get_user()+'@'+get_host()
    line = stamp+' '+who+' T '+text
    if not flag_trace:
        return line
    logger.debug(line)
    return line
# debug message to log
def debug(text):
    """Log *text* at DEBUG level with a '<timestamp> <user>@<host> D' prefix.

    Returns the formatted line.
    """
    global logger
    stamp = get_timestamp()
    who = get_user()+'@'+get_host()
    line = stamp+' '+who+' D '+text
    logger.debug(line)
    return line
# error message to log
def error(text):
    """Log *text* at ERROR level and remember it in list_errors.

    The line is prefixed with '<timestamp> <user>@<host> E'. The
    accumulated list_errors is later prepended to notification e-mails.
    Returns the formatted line.
    """
    global logger
    global list_errors
    stamp = get_timestamp()
    who = get_user()+'@'+get_host()
    line = stamp+' '+who+' E '+text
    logger.error(line)
    list_errors.append(line)
    return line
# warn message to log
def warn(text):
    """Log *text* at WARNING level and remember it in list_warns.

    The line is prefixed with '<timestamp> <user>@<host> W'. The
    accumulated list_warns is later prepended to notification e-mails.
    Returns the formatted line.
    """
    global logger
    global list_warns
    level = 'W'
    line = get_timestamp()+' '+get_user()+'@'+get_host()+' '+level+' '+text
    # Logger.warn is a deprecated alias; warning() is the documented method
    logger.warning(line)
    list_warns.append(line)
    return line
# _exit
def _exit(code):
    """Record a fatal condition, notify the e-mail list, and exit with *code*.

    The exit is logged as an error first, so the notification body (all
    errors collected so far, via get_errors()) includes it. Recipients are
    whatever email_list holds at call time; it may still be None when this
    is reached early in parse_cmdline(), in which case email_to_list() only
    logs that the list is empty.
    """
    text = 'exit code='+str(code)
    error(text)
    sender = get_user()+'@'+get_host()
    subject = name+' '+text
    # previously email(text) was called with 1 of its 5 required arguments,
    # raising TypeError so the requested exit code was never returned
    email_to_list(mail_host, subject, email_list, sender, get_errors())
    sys.exit(code)
# exception
def exception(e):
    """Log exception *e* (as its str()) through error(); return the line."""
    return error(str(e))
# epilog for --help
def get_epilog():
    """Return the text shown after the option list in --help (currently none)."""
    return ''
# debug is normally not set
def validate_debug(options):
    """Set the logger level: DEBUG with --debug, otherwise INFO."""
    global logger
    level = logging.DEBUG if options.flag_debug else logging.INFO
    logger.setLevel(level)
# consider head node daemons only
# unless --agents is specified
def validate_agents(options):
    """Turn on flag_agents when --agents was given (never turns it off)."""
    global flag_agents
    if not options.flag_agents:
        return
    flag_agents = True
# don't reduce noise
def validate_verbose(options):
    """Turn on flag_verbose when --verbose was given (never turns it off)."""
    global flag_verbose
    if not options.flag_verbose:
        return
    flag_verbose = True
# ignore job driver allocation
# unless --job-driver-allocation is specified
def validate_job_driver_allocation(options):
    """Store the --job-driver-allocation class name (None when not given)."""
    global job_driver_allocation
    requested_class = options.job_driver_allocation
    job_driver_allocation = requested_class
# use /tmp/<userid> as log+state directory
# unless --path is specified
def validate_path(options):
    """Default options.path to /tmp/<user> and make sure the directory exists."""
    if options.path is None:
        options.path = '/tmp/' + get_user()
    mkdirs(options.path)
# setup rotating log file handler with
# 8 versions of 8M bytes with base name
# ducc_watcher.<target>.log
def setup_log_file(options):
    """Attach a rotating log file <path>/<name>.<target>.log to the logger.

    The file rotates at 8 MiB and keeps 8 backups.
    """
    global name
    global target
    global logger
    directory = options.path
    separator = '' if directory.endswith('/') else '/'
    log_file = directory+separator+name+'.'+target+'.log'
    rotating = logging.handlers.RotatingFileHandler(
        log_file, maxBytes=8*1024*1024, backupCount=8)
    logger.addHandler(rotating)
    debug('log_file: '+log_file)
# ducc_watcher.<target>.state
def setup_state_file(options):
    """Set the global state_file to <path>/<name>.<target>.state."""
    global name
    global target
    global state_file
    directory = options.path
    separator = '' if directory.endswith('/') else '/'
    state_file = directory+separator+name+'.'+target+'.state'
    debug('state_file: '+state_file)
# must specify --target host:port of WS for fetching
# of daemons status
def validate_target(options):
    """Derive the WS target and servlet URLs from --target.

    --target may be <host>, <host>:<port>, optionally prefixed with http://
    and/or followed by '/'. The global port is appended when no port is
    given. Sets target (host:port), ducc_url_base and the two servlet URLs.
    Exits with code 1 (via _exit) when --target is missing.
    """
    global port
    global target
    global ducc_url_base
    global ducc_url_servlet_system_daemons_data
    global ducc_url_servlet_reservations_data
    protocol = 'http://'
    if(options.target is None):
        error('required "target" not specified')
        _exit(1)
    target = options.target
    # strip the protocol BEFORE looking for ':' - otherwise the ':' in
    # 'http://' is mistaken for a host:port separator and no port is added
    if(target.startswith(protocol)):
        target = target[len(protocol):]
    # a trailing '/' would otherwise end up in the servlet URLs and in the
    # log/state file names derived from target
    target = target.rstrip('/')
    if(':' not in target):
        target = target+':'+str(port)
    ducc_url_base = protocol+target
    #
    servlet = '/ducc-servlet/json-format-aaData-daemons'
    ducc_url_servlet_system_daemons_data = protocol+target+servlet
    debug('target: '+ducc_url_servlet_system_daemons_data)
    #
    servlet = '/ducc-servlet/json-format-reservations'+'?maxRecords=1024&stateType=Active'
    ducc_url_servlet_reservations_data = protocol+target+servlet
    debug('target: '+ducc_url_servlet_reservations_data)
# mail host, if any
def validate_mail_host(options):
    """Override the global mail_host with --mail-host, if given."""
    global mail_host
    requested = options.mail_host
    if requested is not None:
        mail_host = requested
    debug('mail-host: '+str(mail_host))
# list of e-mail recipients, if any
def validate_email_list(options):
    """Split the blank-separated --email-list into the global email_list."""
    global email_list
    raw = options.email_list
    if raw is not None:
        email_list = raw.split()
    debug('email-list: '+str(email_list))
# parse command line
def parse_cmdline():
    """Parse sys.argv and apply every option to the module globals.

    Validation runs in a fixed order because later steps depend on earlier
    ones (e.g. the log and state file names need target and path).
    """
    global name
    global mail_host
    parser = ExtendedOptionParser(epilog=get_epilog())
    help_width = 45
    parser.formatter.help_position = help_width
    parser.formatter.max_help_position = help_width
    # (short, long, action, dest, default, help) in --help display order
    option_specs = [
        ('-a', '--agents', 'store_true', 'flag_agents', False,
            'include agents'),
        ('-d', '--debug', 'store_true', 'flag_debug', False,
            'display debugging messages'),
        ('-e', '--email-list', 'store', 'email_list', None,
            'blank separated list of email addresses to receive down + error notifications'),
        ('-j', '--job-driver-allocation', 'store', 'job_driver_allocation', None,
            'check job driver allocation for specified class'),
        ('-m', '--mail-host', 'store', 'mail_host', None,
            'mail host (default='+mail_host+')'),
        ('-p', '--path', 'store', 'path', None,
            'path to directory where log and state information are written, default is /tmp'+'/'+get_user()),
        ('-t', '--target', 'store', 'target', None,
            '[REQUIRED] <host> with default port of '+port+' or <host>:<port>'),
        ('-v', '--verbose', 'store_true', 'flag_verbose', False,
            'do not reduce noise (in log file)'),
        ]
    for short_opt, long_opt, action, dest, default, help_text in option_specs:
        parser.add_option(short_opt, long_opt, action=action, dest=dest,
                          default=default, help=help_text)
    options, args = parser.parse_args()
    debug(str(options))
    debug(str(args))
    # order matters: -d, -t, -e, -m, -p, then files that depend on -t/-p,
    # then -a, -v, -j
    steps = (validate_debug,
             validate_target,
             validate_email_list,
             validate_mail_host,
             validate_path,
             setup_log_file,
             setup_state_file,
             validate_agents,
             validate_verbose,
             validate_job_driver_allocation)
    for step in steps:
        step(options)
# determine if named daemon is one of the head node ones
def is_head(key):
    """Return True when *key* names one of the head-node daemons."""
    global head_daemons
    return key in head_daemons
def is_jda(key):
    """Return True when *key* is the job-driver-allocation pseudo-daemon."""
    global jda
    return key == jda
def is_key(key):
    """Return True for head-node daemons and the JDA pseudo-daemon.

    Everything else (e.g. 'Agent@<host>') is treated as an agent.
    """
    return is_head(key) or is_jda(key)
# get rid of noise. remove if
# 1. state unknown
# 2. if agent and 'up' and not verbose mode
# 3. if agent and agents are not wanted
def filter_state(state_dict):
    """Return a copy of *state_dict* with noise removed.

    Dropped entries: any state 'unknown'; agents that are 'up' unless
    flag_verbose; all agents unless flag_agents. Head daemons and the JDA
    entry (see is_key) are otherwise kept.
    """
    global flag_agents
    global flag_verbose
    kept = {}
    for key, value in state_dict.items():
        head = is_key(key)
        if value == 'unknown':
            continue
        if not head:
            if value == 'up' and not flag_verbose:
                continue
            if not flag_agents:
                continue
        kept[key] = value
    return kept
# ensure the previous daemons state file exists (create it holding an empty state if it cannot be read)
def init_state_previous():
    """Ensure state_file exists; create it holding an empty state '{}' if not.

    Only I/O failures on opening the file for reading trigger the
    (re)creation; a failure to create it propagates to the caller.
    """
    global state_file
    try:
        # opening for reading is enough to confirm the file is accessible
        with open(state_file, 'r'):
            pass
    except (IOError, OSError):
        # 'w' mode already truncates, so no seek/truncate is needed
        with open(state_file, 'w') as f:
            f.write('{}'+'\n')
# read previous daemons state
def read_state_previous():
    """Load and filter state_dict_previous from state_file.

    The file holds a Python dict literal (as written by
    write_state_current) and is parsed with ast.literal_eval. On any
    failure the error is recorded and state_dict_previous is left as {}.
    """
    global state_dict_previous
    global state_file
    state_dict_previous = {}
    try:
        with open(state_file, 'r') as handle:
            content = handle.read()
            parsed = ast.literal_eval(content)
        state_dict_previous = parsed
        debug('state_previous(read): '+str(state_dict_previous))
        state_dict_previous = filter_state(state_dict_previous)
        debug('state_previous(filter): '+str(state_dict_previous))
    except Exception as e:
        error('unable to read state from '+state_file)
        exception(e)
# current becomes previous daemons state
def write_state_current():
    """Persist state_dict_current to state_file as a dict literal.

    This becomes the "previous" state for the next run. Failures are
    recorded via error()/exception() rather than raised.
    """
    global state_dict_current
    global state_file
    try:
        serialized = str(state_dict_current)+'\n'
        # 'w' mode truncates the file before writing
        with open(state_file, 'w') as handle:
            handle.write(serialized)
        debug('state_previous(write): '+str(state_dict_current))
    except Exception as e:
        error('unable to write state to '+state_file)
        exception(e)
# remove <span></span> html decorations
def _undecorate(text):
retVal = text
if(text != None):
item = text
index = item.find('</span>')
if(index > 0):
item = item[:index]
index = item.find('>')
if(index > 0):
item = item[index+1:]
retVal = item
return retVal
def toMillis(dts):
    """Convert a local date-time string to milliseconds since the epoch.

    The first two blank-separated fields of *dts* must match
    '%Y.%m.%d %H:%M:%S'; any further fields are ignored. The result has
    whole-second resolution.
    """
    debug(dts)
    fields = dts.split(' ')
    d = fields[0]
    t = fields[1]
    dt = datetime.datetime.strptime(d+' '+t, '%Y.%m.%d %H:%M:%S')
    # strftime('%s') is a glibc-only extension; time.mktime() performs the
    # same local-time -> epoch conversion portably
    ms = int(time.mktime(dt.timetuple()))*1000
    debug(str(ms))
    return ms
def elapsedMillis(dts):
    """Return milliseconds elapsed from local date-time *dts* until now.

    *dts* uses the format accepted by toMillis(). The result is a float.
    """
    started = toMillis(dts)
    elapsed = 1000*time.time() - started
    debug(str(dts)+' '+str(elapsed))
    return elapsed
# fetch daemons state
# col[0] = Status
# col[1] = Daemon Name
# col[2] = Boot Time
# col[3] = Host IP
# col[4] = Host Name
def fetch_state_daemons():
    """Fetch daemon states from the WS daemons servlet into state_dict_current.

    Rows of the JSON 'aaData' array with more than 4 columns are used:
    col[0] Status (HTML-decorated), col[1] Daemon Name, col[2] Boot Time,
    col[4] Host Name. Agent rows are keyed 'Agent@<host name>'. The
    Webserver row's boot time updates webserver_lifetime_millis. The
    collected states are passed through filter_state(). On any failure the
    Webserver is recorded as 'unreachable' and the error is logged.
    """
    global flag_agents
    global state_dict_current
    global ducc_url_servlet_system_daemons_data
    global webserver
    global webserver_lifetime_millis
    global url_timeout
    state_dict_current = {}
    daemons = {}
    try:
        opener = urllib2.build_opener()
        if(flag_agents):
            # cookie presumably asks the WS to include agent rows
            opener.addheaders.append(('Cookie', 'DUCCagents=show'))
        response = opener.open(ducc_url_servlet_system_daemons_data, timeout=url_timeout)
        try:
            data = response.read()
        finally:
            # release the HTTP connection even when the read fails
            response.close()
        jdata = json.loads(data)['aaData']
        for row in jdata:
            if(len(row) > 4):
                status = _undecorate(row[0])
                daemon = row[1]
                date = row[2]
                if(daemon == webserver):
                    webserver_lifetime_millis = elapsedMillis(date)
                if(daemon == 'Agent'):
                    daemon = daemon+'@'+row[4]
                daemons[daemon] = status
        for daemon in daemons:
            status = daemons[daemon]
            debug(daemon+':'+' '+status+' ')
            state_dict_current[daemon] = status
        debug('state_current(read): '+str(state_dict_current))
        state_dict_current = filter_state(state_dict_current)
        debug('state_current(filter): '+str(state_dict_current))
    except Exception as e:
        # force WS status to unreachable whenever contact fails
        daemon = webserver
        status = 'unreachable'
        state_dict_current[daemon] = status
        error('unable to fetch data from '+ducc_url_servlet_system_daemons_data)
        exception(e)
    debug('state_current: '+str(state_dict_current))
# check if RM is 'up'
def rm_up():
    """Return True when state_dict_current records the ResourceManager as 'up'."""
    global state_dict_current
    debug(str(state_dict_current))
    if state_dict_current is None:
        return False
    return get_state(state_dict_current,'ResourceManager') == 'up'
# fetch job driver allocation
def fetch_state_job_driver_allocation():
    """Record whether the requested job driver class has an assigned reservation.

    Only runs when --job-driver-allocation was given and the
    ResourceManager is 'up'. Fetches active reservations from the WS and
    sets state_dict_current[jda] to 'assigned' if any reservation of class
    job_driver_allocation is in state 'Assigned', else 'not assigned'.
    On any failure the Webserver is recorded as 'unreachable'.
    """
    global state_dict_current
    global jda
    global job_driver_allocation
    global ducc_url_servlet_reservations_data
    global webserver
    global url_timeout
    try:
        if(job_driver_allocation is not None and rm_up()):
            opener = urllib2.build_opener()
            debug(ducc_url_servlet_reservations_data)
            response = opener.open(ducc_url_servlet_reservations_data, timeout=url_timeout)
            try:
                data = response.read()
            finally:
                # release the HTTP connection even when the read fails
                response.close()
            debug(data)
            json_data = json.loads(data)
            count = 0
            if(json_data is not None):
                # assumes the servlet returns a list of reservation dicts
                debug(str(len(json_data)))
                for reservation in json_data:
                    res_id = reservation['id']
                    rclass = reservation['rclass']
                    state = reservation['state']
                    if(rclass == job_driver_allocation and state == 'Assigned'):
                        count = count+1
                    # %s formatting tolerates non-string field values
                    debug('id:%s class:%s state:%s' % (res_id, rclass, state))
            if(count == 0):
                state_dict_current[jda] = 'not assigned'
            else:
                state_dict_current[jda] = 'assigned'
            debug(state_dict_current[jda])
    except Exception as e:
        # force WS status to unreachable whenever contact fails
        daemon = webserver
        status = 'unreachable'
        state_dict_current[daemon] = status
        error('unable to fetch data from '+ducc_url_servlet_reservations_data)
        exception(e)
# fetch current state
def fetch_state_current():
    """Populate state_dict_current: daemons first, then job driver allocation.

    The order matters because the JDA check consults the ResourceManager
    state gathered by the daemons fetch.
    """
    for fetch in (fetch_state_daemons, fetch_state_job_driver_allocation):
        fetch()
# determine state summary overall:
# { "up", "up, JD allocation pending", "down", "unknown", "no data" },
# and list the daemons in each state
def summarize_state(state_dict):
    """Classify daemons by state and derive an overall status.

    Returns a dict with keys 'overall', 'up', 'down', 'unknown' and 'jd'.
    'unreachable' counts as down. 'overall' is the first that applies of:
    'down', 'unknown', 'up, JD allocation pending...', 'up', 'no data'.
    Unrecognised states are reported through warn().
    """
    up = []
    down = []
    unknown = []
    jd = []
    buckets = {
        'unreachable': down,
        'down': down,
        'up': up,
        'unknown': unknown,
        }
    for key in state_dict:
        state = state_dict.get(key, '?')
        if state in buckets:
            buckets[state].append(key)
        elif state == 'not assigned':
            jd = [ 'not assigned' ]
        elif state == 'assigned':
            jd = []
        else:
            warn(key+'='+state)
    if down:
        overall = 'down'
    elif unknown:
        overall = 'unknown'
    elif jd:
        overall = 'up, JD allocation pending...'
    elif up:
        overall = 'up'
    else:
        overall = 'no data'
    summary = { 'overall':overall, 'up': up, 'down':down, 'unknown':unknown, 'jd':jd }
    debug(str(summary))
    return summary
# summarize state previous and current
def summarize():
    """Compute and log summary_previous and summary_current from the state dicts."""
    global state_dict_current
    global state_dict_previous
    global summary_current
    global summary_previous
    summary_previous = summarize_state(state_dict_previous)
    info('previous: '+str(summary_previous))
    summary_current = summarize_state(state_dict_current)
    info('current: '+str(summary_current))
# send email
def email(HOST, SUBJECT, TO, FROM, TEXT):
    """Send one plain-text message to a single recipient via SMTP.

    HOST is the SMTP server; SUBJECT, TO and FROM become header lines;
    TEXT is the body. Success is logged with info(); any failure is logged
    with error()/exception() (and so collected in list_errors) rather than
    raised.
    """
    try:
        # header lines, a blank line, then the body, joined with CRLF.
        # str.join replaces string.join, which does not exist in Python 3
        BODY = "\r\n".join((
            "From: %s" % FROM,
            "To: %s" % TO,
            "Subject: %s" % SUBJECT,
            "",
            TEXT
            ))
        server = smtplib.SMTP(HOST)
        try:
            server.sendmail(FROM, [TO], BODY)
            server.quit()
        finally:
            # close the connection even when sendmail()/quit() fails
            server.close()
        info('sent: ['+TO+'] '+TEXT)
    except Exception as e:
        error('not sent: ['+TO+'] '+TEXT)
        exception(e)
# send email to each address in a list
def email_to_list(HOST, SUBJECT, TO_LIST, FROM, TEXT):
    """Send the same message to every address in TO_LIST via email().

    When TO_LIST is None nothing is sent and that fact is logged.
    """
    if TO_LIST is None:
        info('e-mail list empty')
        return
    for recipient in TO_LIST:
        email(HOST, SUBJECT, recipient, FROM, TEXT)
# wait for WS to be up > 60 seconds before complaining about overall state 'unknown'
def is_reportable(overall_current):
    """Return False only for overall 'unknown' while the WS is under 60 s old.

    A freshly started WS may not yet know every daemon's state, so an
    'unknown' overall state is suppressed during its first minute.
    """
    global webserver_lifetime_millis
    # webserver_lifetime_millis is always numeric (0 at module level, then
    # the result of elapsedMillis), so the former bare 'except: pass'
    # guarding this comparison could only have hidden real bugs
    ws_young = webserver_lifetime_millis < 60*1000
    retVal = not (ws_young and overall_current == 'unknown')
    debug('reportable: '+str(retVal))
    return retVal
# e-mail message subject
def get_subject(status):
    """Return the e-mail subject 'DUCC status=<status> <ducc_url_base>'."""
    global ducc_url_base
    return ' '.join(['DUCC', 'status='+status, ducc_url_base])
def get_lines(LIST):
    """Return the items of LIST concatenated, each followed by a newline."""
    return ''.join(item+'\n' for item in LIST)
def get_errors():
    """Return all recorded error lines, one per line."""
    return get_lines(list_errors)
def get_warnings():
    """Return all recorded warning lines, one per line."""
    return get_lines(list_warns)
def toString(LIST):
    """Return the items of LIST concatenated, each followed by one blank."""
    return ''.join(item+' ' for item in LIST)
def add_details(key):
    """Return '<key>: <names...>\\n' for summary_current[key], or '' if empty/missing."""
    global summary_current
    names = get_state(summary_current,key)
    debug(key+'='+str(names))
    if not names:
        return ''
    return key+': '+toString(names)+'\n'
# e-mail message body
def get_body(text):
    """Build the notification body: a header naming sender, reporter and WS,
    the new state *text*, then the 'down' and 'unknown' daemon lists."""
    global name
    global ducc_url_base
    sender = get_user()+'@'+get_host()
    header = ' '.join(['['+sender+']', name, 'reports', ducc_url_base, 'state change:', text])
    return header+'\n\n'+add_details('down')+add_details('unknown')
def get_state(state_dict,key):
    """Return state_dict[key], or None when it cannot be looked up.

    A missing key, an unsuitable key, or a None/non-container state_dict
    all yield None. The result is logged at debug level.
    """
    try:
        state = state_dict[key]
    except (KeyError, IndexError, TypeError):
        # narrowed from a bare 'except:' so that e.g. KeyboardInterrupt
        # is no longer swallowed
        state = None
    debug(str(state))
    return state
# e-mail state changes, if any
def email_state_changes():
    """E-mail the recipient list when the overall state changed and is reportable.

    The message body is: collected errors, collected warnings, then a
    timestamped description from get_body().
    """
    global summary_current
    global summary_previous
    global mail_host
    global email_list
    key = 'overall'
    overall_previous = get_state(summary_previous,key)
    info('previous: '+key+'='+overall_previous)
    overall_current = get_state(summary_current,key)
    info('current: '+key+'='+overall_current)
    if overall_previous == overall_current:
        debug('no state change')
        return
    if not is_reportable(overall_current):
        debug('not reportable')
        return
    stamp = get_timestamp()
    subject = get_subject(overall_current)
    sender = get_user()+'@'+get_host()
    body = stamp+' '+get_body(str(overall_current))
    warnings_text = get_warnings()
    errors_text = get_errors()
    message = errors_text+warnings_text+body
    email_to_list(mail_host, subject, email_list, sender, message)
# assemble console display info comprising problems + rc
def eval():
    """Collect non-'up' daemons and derive the process return code.

    NOTE: this name shadows the builtin eval() within this module.
    Returns (rc, problems): problems maps each non-'up' daemon (the JDA
    entry excluded) to its state; rc is 2 if any head daemon is affected,
    1 if only other daemons are, else 0.
    """
    global state_dict_current
    global head_daemons
    problems = {}
    rc = 0
    for key, value in state_dict_current.items():
        if is_jda(key) or value == 'up':
            continue
        severity = 2 if key in head_daemons else 1
        rc = max(rc, severity)
        problems[key] = value
    return rc, problems
# check for DUCC daemon status changes
def main(argv):
global logger
try:
logger = logging.getLogger('logger')
#handler = logging.StreamHandler(sys.stdout)
handler = logging.FileHandler('/dev/null')
logger.addHandler(handler)
parse_cmdline()
init_state_previous()
read_state_previous()
fetch_state_current()
summarize()
write_state_current()
email_state_changes()
rc, problems = eval()
ts = get_timestamp()
for daemon, status in problems.iteritems():
print ts, daemon, status
print ts, 'rc='+str(rc)
sys.exit(rc)
except Exception as e:
error('exception in main')
exception(e)
rc = -1
sys.exit(rc)
if __name__ == '__main__':
    # script entry point; main() exits the process itself
    cli_args = sys.argv[1:]
    main(cli_args)