| #! /usr/bin/env python |
| # ----------------------------------------------------------------------- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # ----------------------------------------------------------------------- |
| |
| # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| # + |
| # + ducc_watcher |
| # + |
| # + purpose: send e-mail when a DUCC daemon overall state changes |
| # + |
| # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| |
| import ast |
| import datetime |
| import getpass |
| import json |
| #import logging |
| import logging.handlers |
| import os |
| import smtplib |
| import socket |
| import ssl |
| import string |
| import sys |
| import time |
| import urllib2 |
| |
| #from optparse import HelpFormatter |
| #from optparse import OptionGroup |
| from optparse import OptionParser |
| |
| # ---------------------------------------------- |
| |
class ExtendedOptionParser(OptionParser):
    """OptionParser that emits its epilog verbatim.

    The epilog is returned untouched instead of being passed through the
    help formatter, so that newlines in the epilog are not deleted.
    """

    def format_epilog(self, formatter):
        """Return the raw epilog text, or '' when no epilog was supplied.

        formatter is accepted for interface compatibility and is ignored.
        """
        # format_help() concatenates the pieces as strings, so never
        # hand back None
        if self.epilog is None:
            return ''
        return self.epilog
| |
| # ---------------------------------------------- |
| |
# script name; used in log/state file names and in e-mail bodies
name = 'ducc_watcher'

version = 2.0

# elapsed time since the Webserver boot time, in milliseconds;
# refreshed whenever the daemons data is fetched
webserver_lifetime_millis = 0

# daemons that are treated as head node daemons
webserver = 'Webserver'
head_daemons = [
    'Orchestrator',
    'ResourceManager',
    'Database',
    'Broker',
    'ProcessManager',
    'ServiceManager',
    webserver,
    ]

# key under which the job driver allocation state is recorded
jda = 'JobDriverAllocation'

flag_info = True
flag_trace = False
logger = None

# default port appended to --target when none is given
port = '42133'

path = None
log_file = None
state_file = None

flag_agents = False
flag_verbose = False

flag_ssl_bypass = False

mail_host = 'localhost'
email_list = None

# lines collected by error() and warn(); prepended to notification e-mails
list_errors = []
list_warns = []

# timeout, in seconds, passed to urlopen for each request
url_timeout = 30

# values assigned while the command line is processed; declared here so
# that every name used through a "global" statement exists at module scope
target = None
job_driver_allocation = None
ducc_url_base = None
ducc_url_servlet_system_daemons_data = None
ducc_url_servlet_reservations_data = None

# daemon name -> state, for the previous and the current run
state_dict_previous = {}
state_dict_current = {}

# results of summarize_state() for the previous and the current run
summary_previous = {}
summary_current = {}
| |
def get_timestamp():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    now = datetime.datetime.fromtimestamp(time.time())
    return now.strftime('%Y-%m-%d %H:%M:%S')
| |
def get_host():
    """Return the host name of the machine running this script."""
    return socket.gethostname()
| |
def get_user():
    """Return the login name of the user running this script."""
    return getpass.getuser()
| |
def mkdirs(path):
    """Create directory path, including missing parents, if it is absent.

    Nothing is done when path already exists.  A creation failure is
    logged through exception() rather than raised.
    """
    debug('mkdirs: path='+path)
    if os.path.exists(path):
        return
    try:
        os.makedirs(path)
    except Exception as e:
        # another process may have created the directory between the
        # exists() check and makedirs(); only report a real failure
        if not os.path.isdir(path):
            exception(e)
| |
def info(text):
    """Log text at INFO level and return the formatted line.

    The line is '<timestamp> <user>@<host> I <text>'.
    """
    global logger
    stamp = get_timestamp()
    origin = get_user()+'@'+get_host()
    line = ' '.join((stamp, origin, 'I', text))
    logger.info(line)
    return line
| |
def trace(text):
    """Format a trace line and return it; log it only when flag_trace is set.

    The line is '<timestamp> <user>@<host> T <text>' and is written at
    DEBUG level.
    """
    global logger
    global flag_trace
    stamp = get_timestamp()
    origin = get_user()+'@'+get_host()
    line = ' '.join((stamp, origin, 'T', text))
    if flag_trace:
        logger.debug(line)
    return line
| |
def debug(text):
    """Log text at DEBUG level and return the formatted line.

    The line is '<timestamp> <user>@<host> D <text>'.
    """
    global logger
    stamp = get_timestamp()
    origin = get_user()+'@'+get_host()
    line = ' '.join((stamp, origin, 'D', text))
    logger.debug(line)
    return line
| |
def error(text):
    """Log text at ERROR level, remember it in list_errors, and return it.

    The line is '<timestamp> <user>@<host> E <text>'.
    """
    global logger
    global list_errors
    stamp = get_timestamp()
    origin = get_user()+'@'+get_host()
    line = ' '.join((stamp, origin, 'E', text))
    logger.error(line)
    list_errors.append(line)
    return line
| |
def warn(text):
    """Log text at WARNING level, remember it in list_warns, and return it.

    The line is '<timestamp> <user>@<host> W <text>'.
    """
    global logger
    global list_warns
    level = 'W'
    line = get_timestamp()+' '+get_user()+'@'+get_host()+' '+level+' '+text
    # Logger.warn is a deprecated alias; Logger.warning is the
    # supported spelling
    logger.warning(line)
    list_warns.append(line)
    return line
| |
def _exit(code):
    """Log and e-mail the exit code, then terminate via sys.exit(code).

    The notification goes to the configured e-mail list; when no list
    has been configured, email_to_list() only logs that fact.
    """
    global name
    global mail_host
    global email_list
    text = 'exit code='+str(code)
    error(text)
    # email() needs host, subject, recipient, sender and text; go through
    # email_to_list() so that every configured recipient is notified.
    # The subject is built here rather than with get_subject() because the
    # target URL may not have been established yet.
    sender = get_user()+'@'+get_host()
    subject = name+' '+text
    email_to_list(mail_host, subject, email_list, sender, text)
    sys.exit(code)
| |
def exception(e):
    """Log str(e) through error() and return the formatted log line."""
    return error(str(e))
| |
def get_epilog():
    """Return the epilog text shown by --help (currently empty)."""
    return ''
| |
def validate_debug(options):
    """Set the logger level: DEBUG when --debug was given, else INFO."""
    global logger
    level = logging.DEBUG if options.flag_debug else logging.INFO
    logger.setLevel(level)
| |
def validate_agents(options):
    """Turn on flag_agents when --agents was given.

    Without --agents only head node daemons are considered.  The flag is
    never switched back off here.
    """
    global flag_agents
    if not options.flag_agents:
        return
    flag_agents = True
| |
def validate_verbose(options):
    """Turn on flag_verbose (do not reduce noise) when --verbose was given.

    The flag is never switched back off here.
    """
    global flag_verbose
    if not options.flag_verbose:
        return
    flag_verbose = True
| |
def validate_ssl_bypass(options):
    """Turn on flag_ssl_bypass when --ssl-bypass was given.

    With the flag set, SSL certificate verification is bypassed for https
    requests.  The flag is never switched back off here.
    """
    global flag_ssl_bypass
    if not options.flag_ssl_bypass:
        return
    flag_ssl_bypass = True
| |
def validate_job_driver_allocation(options):
    """Record the class given with --job-driver-allocation.

    The value stays None when the option was not specified, in which
    case the job driver allocation is ignored.
    """
    global job_driver_allocation
    requested_class = options.job_driver_allocation
    job_driver_allocation = requested_class
| |
def validate_path(options):
    """Settle the log+state directory and make sure it exists.

    When --path was not specified, options.path is set to /tmp/<userid>.
    """
    if options.path is None:
        options.path = '/'.join(('/tmp', get_user()))
    mkdirs(options.path)
| |
def setup_log_file(options):
    """Attach a rotating file handler for ducc_watcher.<target>.log.

    The file lives in options.path and rotation keeps 8 backups of at
    most 8M bytes each.  The resulting file name is also stored in the
    module-level log_file.
    """
    global name
    global target
    global logger
    # publish the name at module scope, as setup_state_file() does for
    # state_file; without this the module-level log_file is never assigned
    global log_file
    directory = options.path
    if not directory.endswith('/'):
        directory = directory + '/'
    log_file = directory + name + '.' + target + '.log'
    handler = logging.handlers.RotatingFileHandler(
        log_file, maxBytes=8*1024*1024, backupCount=8)
    logger.addHandler(handler)
    debug('log_file: '+log_file)
| |
def setup_state_file(options):
    """Set state_file to <options.path>/ducc_watcher.<target>.state."""
    global name, target, state_file
    directory = options.path
    prefix = directory if directory.endswith('/') else directory + '/'
    state_file = prefix + name + '.' + target + '.state'
    debug('state_file: '+state_file)
| |
def validate_target(options):
    """Derive the webserver URLs from the required --target option.

    Accepted forms are <host>, <host>:<port>, and either of those
    prefixed with http:// or https://.  The default port is appended when
    the host part carries none, and http is used when no protocol is
    given.  Sets target (host:port without protocol), ducc_url_base and
    the two servlet URLs.  Exits with code 1 when --target is missing.
    """
    global port
    global target
    global ducc_url_base
    global ducc_url_servlet_system_daemons_data
    global ducc_url_servlet_reservations_data
    protocol_https = 'https://'
    protocol_http = 'http://'
    if options.target is None:
        error('required "target" not specified')
        _exit(1)
    target = options.target
    protocol = protocol_http
    if target.startswith(protocol_https):
        protocol = protocol_https
        target = target[len(protocol_https):]
    elif target.startswith(protocol_http):
        target = target[len(protocol_http):]
    # look for a port only after the protocol has been removed; the ':'
    # in 'http://' must not be mistaken for a port separator
    if ':' not in target:
        target = target+':'+str(port)
    ducc_url_base = protocol+target
    #
    servlet = '/ducc-servlet/json-format-aaData-daemons'
    ducc_url_servlet_system_daemons_data = protocol+target+servlet
    debug('target: '+ducc_url_servlet_system_daemons_data)
    #
    servlet = '/ducc-servlet/json-format-reservations'+'?maxRecords=1024&stateType=Active'
    ducc_url_servlet_reservations_data = protocol+target+servlet
    debug('target: '+ducc_url_servlet_reservations_data)
| |
def validate_mail_host(options):
    """Override the default mail host when --mail-host was given."""
    global mail_host
    requested = options.mail_host
    if requested is not None:
        mail_host = requested
    debug('mail-host: '+str(mail_host))
| |
def validate_email_list(options):
    """Split the blank separated --email-list value into email_list.

    email_list is left unchanged when the option was not given.
    """
    global email_list
    requested = options.email_list
    if requested is not None:
        email_list = requested.split()
    debug('email-list: '+str(email_list))
| |
def parse_cmdline():
    """Parse the command line and apply every option to module state.

    The validation order matters: the target must be known before the
    log and state files are set up because their names contain it, and
    the path must be settled before those files are created.
    """
    global mail_host
    parser = ExtendedOptionParser(epilog=get_epilog())
    width = 45
    parser.formatter.help_position = width
    parser.formatter.max_help_position = width
    parser.add_option('-a','--agents', action='store_true', dest='flag_agents', default=False,
        help='include agents')
    parser.add_option('-d','--debug', action='store_true', dest='flag_debug', default=False,
        help='display debugging messages')
    parser.add_option('-e','--email-list', action='store', dest='email_list', default=None,
        help='blank separated list of email addresses to receive down + error notifications')
    parser.add_option('-j','--job-driver-allocation', action='store', dest='job_driver_allocation', default=None,
        help='check job driver allocation for specified class')
    parser.add_option('-m','--mail-host', action='store', dest='mail_host', default=None,
        help='mail host (default='+mail_host+')')
    parser.add_option('-p','--path', action='store', dest='path', default=None,
        help='path to directory where log and state information are written, default is /tmp'+'/'+get_user())
    parser.add_option('-s','--ssl-bypass', action='store_true', dest='flag_ssl_bypass', default=False,
        help='bypass SSL certificate verification (not recommended)')
    parser.add_option('-t','--target', action='store', dest='target', default=None,
        help='[REQUIRED] <host> with default port of '+port+' or <host>:<port> or http://<host>:<port> or https://<host>:<port>')
    parser.add_option('-v','--verbose', action='store_true', dest='flag_verbose', default=False,
        help='do not reduce noise (in log file)')

    (options, args) = parser.parse_args()
    # -d
    validate_debug(options)
    # -t
    validate_target(options)
    # -e
    validate_email_list(options)
    # -m
    validate_mail_host(options)
    # -p
    validate_path(options)
    # dependencies
    setup_log_file(options)
    setup_state_file(options)
    # record what was parsed only now: before this point neither the log
    # level nor the log file handler was in place, so the dump was lost
    debug(str(options))
    debug(str(args))
    # -a
    validate_agents(options)
    # -v
    validate_verbose(options)
    # -s
    validate_ssl_bypass(options)
    # -j
    validate_job_driver_allocation(options)
| |
def is_head(key):
    """Return True when key names one of the head node daemons."""
    global head_daemons
    return key in head_daemons
| |
def is_jda(key):
    """Return True when key is the job driver allocation key."""
    global jda
    return key == jda
| |
def is_key(key):
    """Return True for a head node daemon or the job driver allocation key."""
    return is_head(key) or is_jda(key)
| |
def filter_state(state_dict):
    """Return a copy of state_dict with the noise removed.

    An entry is dropped when
    1. its state is 'unknown', or
    2. it is an agent that is 'up' and verbose mode is off, or
    3. it is an agent and agents are not wanted.
    Every key that is_key() does not recognize counts as an agent.
    """
    global flag_agents, flag_verbose
    kept = {}
    for key in state_dict:
        value = state_dict[key]
        if value == 'unknown':
            continue
        if is_key(key):
            kept[key] = value
            continue
        # from here on the entry is an agent
        if value == 'up' and not flag_verbose:
            continue
        if flag_agents:
            kept[key] = value
    return kept
| |
def init_state_previous():
    """Make sure the state file exists and can be read.

    When the file cannot be opened or read it is (re)created holding an
    empty dictionary literal.
    """
    global state_file
    try:
        with open(state_file, 'r') as f:
            f.read()
    except (IOError, OSError):
        # missing or unreadable: start over with an empty state; mode 'w'
        # already truncates, so no explicit seek/truncate is needed
        with open(state_file, 'w') as f:
            f.write('{}'+'\n')
| |
def read_state_previous():
    """Load the previous daemons state from the state file.

    The file holds a Python dictionary literal.  The loaded dictionary is
    passed through filter_state() and stored in state_dict_previous.  On
    any failure the problem is logged and state_dict_previous is left as
    an empty dictionary.
    """
    global state_dict_previous
    global state_file
    state_dict_previous = {}
    try:
        with open(state_file, 'r') as f:
            loaded = ast.literal_eval(f.read())
        # a valid literal of another type (list, number, ...) must not be
        # accepted: later processing requires a dictionary
        if not isinstance(loaded, dict):
            raise ValueError('state is not a dictionary: '+repr(loaded))
        debug('state_previous(read): '+str(loaded))
        state_dict_previous = filter_state(loaded)
        debug('state_previous(filter): '+str(state_dict_previous))
    except Exception as e:
        # fall back to "no previous state" so that the run can continue
        state_dict_previous = {}
        error('unable to read state from '+state_file)
        exception(e)
| |
def write_state_current():
    """Persist state_dict_current so the next run sees it as previous.

    The dictionary is written as its str() form followed by a newline,
    replacing the file contents.  Failures are logged, not raised.
    """
    global state_dict_current, state_file
    try:
        with open(state_file, 'w') as out:
            out.write(str(state_dict_current)+'\n')
        debug('state_previous(write): '+str(state_dict_current))
    except Exception as e:
        error('unable to write state to '+state_file)
        exception(e)
| |
| # remove <span></span> html decorations |
| def _undecorate(text): |
| retVal = text |
| if(text != None): |
| item = text |
| index = item.find('</span>') |
| if(index > 0): |
| item = item[:index] |
| index = item.find('>') |
| if(index > 0): |
| item = item[index+1:] |
| retVal = item |
| return retVal |
| |
def toMillis(dts):
    """Convert a 'YYYY.MM.DD HH:MM:SS' local time string to epoch millis.

    Only the first two blank separated fields of dts (date and time) are
    used; anything after them is ignored.
    """
    debug(dts)
    fields = dts.split(' ')
    parsed = datetime.datetime.strptime(fields[0]+' '+fields[1], '%Y.%m.%d %H:%M:%S')
    # strftime('%s') is a platform-specific extension; mktime() yields
    # the same local-time epoch seconds portably
    ms = int(time.mktime(parsed.timetuple()))*1000
    debug(str(ms))
    return ms
| |
def elapsedMillis(dts):
    """Return the milliseconds elapsed between the time in dts and now."""
    started = toMillis(dts)
    elapsed = 1000*time.time() - started
    debug(str(dts)+' '+str(elapsed))
    return elapsed
| |
def open_url(url):
    """Open url with the configured timeout and return the response.

    When flag_ssl_bypass is set and url starts with 'https', an
    unverified SSL context is supplied.
    """
    global url_timeout, flag_ssl_bypass
    kwargs = {'timeout': url_timeout}
    if flag_ssl_bypass and url.startswith('https'):
        kwargs['context'] = ssl._create_unverified_context()
    return urllib2.urlopen(url, **kwargs)
| |
def fetch_state_daemons():
    """Fetch the daemons state from the webserver into state_dict_current.

    Each row of the servlet's 'aaData' is laid out as
      col[0] = Status
      col[1] = Daemon Name
      col[2] = Boot Time
      col[3] = Host IP
      col[4] = Host Name
    Rows with fewer than 5 columns are skipped.  Agents are keyed as
    'Agent@<host name>'.  The webserver's boot time refreshes
    webserver_lifetime_millis.  On any failure the webserver is recorded
    as 'unreachable' and the problem is logged.
    """
    global flag_agents
    global state_dict_current
    global ducc_url_servlet_system_daemons_data
    global webserver
    global webserver_lifetime_millis

    state_dict_current = {}
    daemons = {}
    try:
        opener = urllib2.build_opener()
        urllib2.install_opener(opener)
        if flag_agents:
            # presumably asks the servlet to include agent rows
            opener.addheaders.append(('Cookie', 'DUCCagents=show'))
        response = open_url(ducc_url_servlet_system_daemons_data)
        try:
            data = response.read()
        finally:
            # release the connection even when the read fails
            response.close()
        jdata = json.loads(data)['aaData']
        for row in jdata:
            if len(row) > 4:
                status = _undecorate(row[0])
                daemon = row[1]
                date = row[2]
                if daemon == webserver:
                    webserver_lifetime_millis = elapsedMillis(date)
                if daemon == 'Agent':
                    daemon = daemon+'@'+row[4]
                daemons[daemon] = status
        for daemon in daemons:
            status = daemons[daemon]
            debug(daemon+':'+' '+status+' ')
            state_dict_current[daemon] = status
        debug('state_current(read): '+str(state_dict_current))
        state_dict_current = filter_state(state_dict_current)
        debug('state_current(filter): '+str(state_dict_current))
    except Exception as e:
        # force WS status to down whenever contact fails
        daemon = webserver
        status = 'unreachable'
        state_dict_current[daemon] = status
        error('unable to fetch data from '+ducc_url_servlet_system_daemons_data)
        exception(e)
    debug('state_current: '+str(state_dict_current))
| |
def rm_up():
    """Return True when the current state shows ResourceManager as 'up'."""
    global state_dict_current
    debug(str(state_dict_current))
    if state_dict_current is None:
        return False
    return get_state(state_dict_current, 'ResourceManager') == 'up'
| |
def fetch_state_job_driver_allocation():
    """Record whether a job driver reservation is currently assigned.

    Nothing is done unless --job-driver-allocation named a class and the
    ResourceManager is up.  Otherwise the reservations are fetched and
    state_dict_current[jda] becomes 'assigned' when at least one
    reservation of that class is in state 'Assigned', else
    'not assigned'.  On any failure the webserver is recorded as
    'unreachable' and the problem is logged.
    """
    global state_dict_current
    global jda
    global job_driver_allocation
    global ducc_url_servlet_reservations_data
    global webserver
    try:
        if job_driver_allocation is None:
            return
        if not rm_up():
            return
        opener = urllib2.build_opener()
        urllib2.install_opener(opener)
        debug(ducc_url_servlet_reservations_data)
        response = open_url(ducc_url_servlet_reservations_data)
        try:
            data = response.read()
        finally:
            # release the connection even when the read fails
            response.close()
        debug(data)
        reservations_list = json.loads(data)
        count = 0
        if reservations_list is not None:
            debug(str(len(reservations_list)))
            for reservation in reservations_list:
                res_id = reservation['id']
                rclass = reservation['rclass']
                state = reservation['state']
                if rclass == job_driver_allocation and state == 'Assigned':
                    count = count+1
                text = 'id:'+str(res_id)+' '+'class:'+rclass+' '+'state:'+state
                debug(text)
        if count == 0:
            state_dict_current[jda] = 'not assigned'
        else:
            state_dict_current[jda] = 'assigned'
        debug(state_dict_current[jda])
    except Exception as e:
        # force WS status to down whenever contact fails
        daemon = webserver
        status = 'unreachable'
        state_dict_current[daemon] = status
        error('unable to fetch data from '+ducc_url_servlet_reservations_data)
        exception(e)
| |
def fetch_state_current():
    """Fetch the current state: daemons first, then job driver allocation.

    The order matters because the job driver allocation check reads the
    ResourceManager state that the daemons fetch has just stored.
    """
    fetch_state_daemons()
    fetch_state_job_driver_allocation()
| |
def summarize_state(state_dict):
    """Summarize state_dict into an overall state plus per-state lists.

    Returns a dictionary with keys 'overall', 'up', 'down', 'unknown' and
    'jd'.  'unreachable' counts as down.  The overall state is the first
    that applies of: 'down', 'unknown', 'up, JD allocation pending...',
    'up', 'no data'.  Unrecognized states are logged as warnings.
    """
    up = []
    down = []
    unknown = []
    jd = []
    # states that simply collect the daemon name into a list
    buckets = {
        'unreachable': down,
        'down': down,
        'up': up,
        'unknown': unknown,
        }
    for key in state_dict:
        state = state_dict.get(key, '?')
        if state in buckets:
            buckets[state].append(key)
        elif state == 'not assigned':
            jd = [ 'not assigned' ]
        elif state == 'assigned':
            jd = []
        else:
            warn(key+'='+state)
    if down:
        overall = 'down'
    elif unknown:
        overall = 'unknown'
    elif jd:
        overall = 'up, JD allocation pending...'
    elif up:
        overall = 'up'
    else:
        overall = 'no data'
    summary = { 'overall':overall, 'up': up, 'down':down, 'unknown':unknown, 'jd':jd }
    debug(str(summary))
    return summary
| |
def summarize():
    """Summarize the previous and the current state and log both."""
    global state_dict_current, state_dict_previous
    global summary_current, summary_previous
    summary_previous = summarize_state(state_dict_previous)
    info('previous: %s' % (summary_previous,))
    summary_current = summarize_state(state_dict_current)
    info('current: %s' % (summary_current,))
| |
def email(HOST, SUBJECT, TO, FROM, TEXT):
    """Send one plain-text e-mail to a single recipient.

    HOST is the SMTP host, TO a single address.  The outcome is logged
    (info on success, error on failure); nothing is raised.
    """
    try:
        # str.join replaces the Python-2-only string.join()
        BODY = "\r\n".join((
            "From: %s" % FROM,
            "To: %s" % TO,
            "Subject: %s" % SUBJECT,
            "",
            TEXT
            ))
        server = smtplib.SMTP(HOST)
        try:
            server.sendmail(FROM, [TO], BODY)
        finally:
            # end the SMTP session even when sendmail fails
            server.quit()
        info('sent: ['+TO+'] '+TEXT)
    except Exception as e:
        error('not sent: ['+TO+'] '+TEXT)
        exception(e)
| |
def email_to_list(HOST, SUBJECT, TO_LIST, FROM, TEXT):
    """Send the same e-mail to every address in TO_LIST, one at a time.

    When TO_LIST is None this is only noted in the log.
    """
    if TO_LIST is None:
        info('e-mail list empty')
        return
    for recipient in TO_LIST:
        email(HOST, SUBJECT, recipient, FROM, TEXT)
| |
def is_reportable(overall_current):
    """Return False only for 'unknown' while the webserver is young.

    An overall state of 'unknown' is not worth complaining about until
    the webserver has been up for at least 60 seconds; every other
    combination is reportable.
    """
    global webserver_lifetime_millis
    retVal = True
    try:
        if webserver_lifetime_millis < 60*1000 and overall_current == 'unknown':
            retVal = False
    except TypeError:
        # lifetime is not comparable to a number: report anyway
        pass
    debug('reportable: '+str(retVal))
    return retVal
| |
def get_subject(status):
    """Return the e-mail subject 'DUCC status=<status> <base url>'."""
    global ducc_url_base
    return ' '.join(('DUCC', 'status='+status, ducc_url_base))
| |
def get_lines(LIST):
    """Return the items of LIST concatenated, each followed by a newline."""
    return ''.join([entry+'\n' for entry in LIST])
| |
def get_errors():
    """Return every error line logged so far, one per line."""
    global list_errors
    collected = list_errors
    return get_lines(collected)
| |
def get_warnings():
    """Return every warning line logged so far, one per line."""
    global list_warns
    collected = list_warns
    return get_lines(collected)
| |
def toString(LIST):
    """Return the items of LIST concatenated, each followed by a blank."""
    return ''.join([entry+' ' for entry in LIST])
| |
def add_details(key):
    """Return a '<key>: <names> ' line for the current summary, or ''.

    The line lists the entries summary_current holds under key; an empty
    string is returned when the key is absent or its list is empty.
    """
    global summary_current
    result = get_state(summary_current, key)
    debug(key+'='+str(result))
    if result is not None and len(result) > 0:
        return key+':'+' '+toString(result)+'\n'
    return ''
| |
def get_body(text):
    """Return the e-mail body announcing a state change to text.

    The headline names the sender, this script and the base URL; it is
    followed by the lists of down and unknown daemons, if any.
    """
    global name, ducc_url_base
    sender = get_user()+'@'+get_host()
    headline = ' '.join((
        '['+sender+']', name, 'reports', ducc_url_base, 'state change:', text))
    body = headline+'\n\n'
    for section in ('down', 'unknown'):
        body = body+add_details(section)
    return body
| |
def get_state(state_dict, key):
    """Return state_dict[key], or None when the lookup is not possible."""
    try:
        state = state_dict[key]
    except (KeyError, IndexError, TypeError):
        # key is missing, or state_dict is None / not subscriptable
        state = None
    debug(str(state))
    return state
| |
def email_state_changes():
    """E-mail the configured recipients when the overall state changed.

    Nothing is sent when the overall state equals the previous one or
    when is_reportable() rejects the current one.  The message text is
    the logged errors, then the logged warnings, then a timestamped body.
    """
    global summary_current, summary_previous
    global mail_host, email_list
    key = 'overall'
    overall_previous = get_state(summary_previous, key)
    info('previous: '+key+'='+overall_previous)
    overall_current = get_state(summary_current, key)
    info('current: '+key+'='+overall_current)
    if overall_previous == overall_current:
        debug('no state change')
        return
    if not is_reportable(overall_current):
        debug('not reportable')
        return
    stamp = get_timestamp()
    subject = get_subject(overall_current)
    sender = get_user()+'@'+get_host()
    text = stamp+' '+get_body(str(overall_current))
    text = get_warnings()+text
    text = get_errors()+text
    email_to_list(mail_host, subject, email_list, sender, text)
| |
def eval():
    """Assemble the console display info: a return code and the problems.

    Returns (rc, problems) where problems maps each daemon that is not
    'up' to its state (the job driver allocation entry is ignored) and
    rc is 2 when a head node daemon has a problem, 1 when only other
    daemons do, and 0 otherwise.

    NOTE: the name shadows the builtin eval(); it is kept because main()
    calls it under this name.
    """
    global state_dict_current, head_daemons
    problems = {}
    rc = 0
    for key, value in state_dict_current.items():
        if is_jda(key) or value == 'up':
            continue
        severity = 2 if key in head_daemons else 1
        rc = max(rc, severity)
        problems[key] = value
    return rc, problems
| |
def main(argv):
    """Check for DUCC daemon status changes and exit with the result code.

    Prints one '<timestamp> <daemon> <status>' line per problem followed
    by '<timestamp> rc=<rc>', then exits with rc.  Any exception is
    logged and turned into exit code -1.  argv is not used; the command
    line is parsed by parse_cmdline().
    """
    global logger
    try:
        logger = logging.getLogger('logger')
        # handler that writes to /dev/null; the real log file handler is
        # attached by parse_cmdline()
        logger.addHandler(logging.FileHandler('/dev/null'))
        parse_cmdline()
        init_state_previous()
        read_state_previous()
        fetch_state_current()
        summarize()
        write_state_current()
        email_state_changes()
        rc, problems = eval()
        ts = get_timestamp()
        for daemon, status in problems.items():
            print('%s %s %s' % (ts, daemon, status))
        print('%s %s' % (ts, 'rc='+str(rc)))
        sys.exit(rc)
    except Exception as e:
        error('exception in main')
        exception(e)
        sys.exit(-1)


if __name__ == '__main__':
    main(sys.argv[1:])