blob: bb1bc029652a0cca38e6c1140452060c46ef5a2a [file] [log] [blame]
#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
import os
import sys
from ducc_util import DuccUtil
from properties import Properties
from properties import Property
from ducc_util_out import *
from optparse import OptionParser
from shutil import copy2
from tempfile import mkstemp
from shutil import move
from os import remove
# One timestamp captured at import time. It is reused for the backup file
# suffix and for the "# moved ..." property comment, so every artifact of a
# single run carries the same stamp.
# NOTE(review): get_timestamp is not defined in this file; presumably it is
# supplied by the "from ducc_util_out import *" star import - confirm.
common_timestamp = get_timestamp()
from ducc_logger import DuccLogger
# Logger instance shared by the functions and methods of this script.
logger = DuccLogger()
# -----------------------------------------------------------------------
# Extend OptionParser class
class ExtendedOptionParser(OptionParser):
    """OptionParser whose epilog is emitted verbatim in --help output.

    The stock implementation hands the epilog to the help formatter, which
    re-wraps the text and thereby drops embedded newlines.  This subclass
    bypasses the formatter so that a pre-formatted, multi-line epilog is
    shown exactly as written.
    """

    def format_epilog(self, formatter):
        """Return the epilog text unchanged.

        formatter is accepted for signature compatibility and is ignored.
        An unset epilog (None) yields an empty string, because
        OptionParser.format_help() joins the pieces it collects with
        "".join() and would raise TypeError on None.
        """
        if self.epilog is None:
            return ''
        return self.epilog
# -----------------------------------------------------------------------
# epilog for --help
def get_epilog():
    """Build the multi-line epilog text shown at the end of --help.

    The text starts with a blank line, followed by a usage sentence, a
    "Prerequisites:" list and an "Operation:" paragraph, and ends with a
    newline.
    """
    # The operation paragraph is one long line made of adjacent literals.
    operation = (
        'To the extent possible, the cluster will be checked to see if '
        'it is safe to edit the site.ducc.properties file, and if so '
        'then a backup of the original file is made then the '
        'requisite changes are made to realize the head node move. '
        'Likewise, the appropriate ducc.nodes file is backed-up and updated as necessary.'
    )
    # Empty entries become blank lines; the trailing one supplies the
    # final newline once everything is joined.
    lines = [
        '',
        'Run this command on the host which will become the new DUCC head node.',
        '',
        'Prerequisites:',
        '1. the current ducc.head node in site.ducc.properties is up',
        '2. the head daemons (broker, database, or, pm, rm, sm, ws) on the ducc.head node are down (e.g. stop_ducc -c head)',
        '',
        'Operation:',
        operation,
        '',
    ]
    return '\n'.join(lines)
class MoveDucc(DuccUtil):
    """Re-point the DUCC configuration at a new head node.

    The flow driven by main() is: parse the command line, load
    site.ducc.properties, validate the target, verify the current head
    node's daemons are down (or that --offline was given when the head is
    unreachable), check nodepool compatibility, back up the properties
    file, then rewrite the properties file and the head's nodepool file.

    Any failed check sets self.code to 1 and calls abort(), which ends the
    process through sys.exit().

    NOTE(review): DUCC_HOME, localhost, ssh_operational, find_ducc_process,
    is_amq_active, db_alive, get_nodepool, get_nodepool_file and the
    broker_* attributes are not defined in this class; presumably they are
    inherited from DuccUtil - confirm.
    """

    # exit code
    code = 0

    # if current ducc host is offline (ie unreachable):
    # when True proceed with move anyway
    # when False abort the move (the default)
    offline = False

    # exit
    def exit(self):
        """Log the exit code and terminate the process with it."""
        text = 'exit code='+str(self.code)
        logger.error(text)
        sys.exit(self.code)

    # abort
    def abort(self):
        """Log that the move was not performed, then exit()."""
        message = 'move not performed'
        logger.error(message)
        self.exit()

    # display by way of debug site.ducc.properties
    def dump_properties(self,name,props):
        """Debug-log name followed by every key=value pair in props."""
        logger.debug(name+':')
        keys = props.get_keys()
        for key in keys:
            value = props.get(key)
            logger.debug(key+'='+value)

    # initialize
    def initialize(self):
        """Locate and load <DUCC_HOME>/resources/site.ducc.properties."""
        self.site_root = self.DUCC_HOME+'/resources/'
        self.site_stem = 'site.ducc.properties'
        self.site_path = self.site_root+self.site_stem
        self.site_props = Properties()
        self.site_props.load(self.site_path)
        self.dump_properties(self.site_stem, self.site_props)

    # parse command line
    def parse_cmdline(self):
        """Parse the options and set self.offline and self.target.

        self.target ends up as the part of the host name before the first
        dot.
        """
        parser = ExtendedOptionParser(epilog=get_epilog())
        width = 45
        parser.formatter.help_position = width
        parser.formatter.max_help_position = width
        parser.add_option('-d','--debug', action='store_true', dest='flag_debug', default=False,
            help='display debugging messages')
        parser.add_option('-o','--offline', action='store_true', dest='flag_offline', default=False,
            help='indicate current DUCC head node is offline, Note: USE THIS OPTION WITH EXTREME CAUTION else risk corrupting database')
        parser.add_option('-q','--quiet', action='store_true', dest='flag_quiet', default=False,
            help='do not display informational messages')
        #parser.add_option('-t','--target', action='store', dest='target', default=self.localhost,
        #    help='the desired new DUCC head node, default='+self.localhost)
        (options, args) = parser.parse_args()
        if(options.flag_debug):
            debug_on()
        if(not options.flag_quiet):
            info_on()
        self.offline = options.flag_offline
        # While the --target option above stays disabled, options has no
        # "target" attribute and the local host is used.  Only that
        # specific failure is caught so unrelated errors are not hidden.
        try:
            self.target = options.target
        except AttributeError:
            self.target = self.localhost
        self.target = self.target.split('.')[0]

    # fetch required property from site.ducc.properties
    def get_required_prop(self,props,stem,key):
        """Return props.get(key); abort with exit code 1 when it is None.

        stem is only used to name the properties file in the warning.
        """
        value = props.get(key)
        if(value is None):
            message = key+' not found in '+stem
            logger.warn(message)
            self.code = 1
            self.abort()
        return value

    # assure target is legal
    def vette_target(self):
        """Abort unless the target is a failover node that is not yet head.

        NOTE(review): the head comparison uses the raw ducc.head value while
        self.target is a short name, and the failover test is a substring
        match on the property value - confirm both are intended.
        """
        key = 'ducc.head'
        props = self.site_props
        stem = self.site_stem
        value = self.get_required_prop(props,stem,key)
        if(value == self.target):
            message = 'target '+self.target+' is already ducc.head'
            logger.warn(message)
            self.code = 1
            self.abort()
        key = 'ducc.head.failover'
        value = self.get_required_prop(props,stem,key)
        if(self.target not in value):
            message = 'target '+self.target+' not found in ducc.head.failover='+str(value)
            logger.warn(message)
            self.code = 1
            self.abort()

    # assure daemon is down
    def vette_daemon(self, node, user, tuples, daemon):
        """Warn and set exit code 1 for each running instance of daemon.

        Each entry of tuples is indexed here as [0]=daemon name, [1]=pid,
        [2]=owning user; only entries owned by user are considered.
        """
        for t in tuples:
            if(t[2] == user and t[0] == daemon):
                message = 'node='+node+' pid='+t[1]+' user='+t[2]+' '+daemon+'=up'
                logger.warn(message)
                self.code = 1

    # assure broker is down
    def vette_broker(self):
        """Set exit code 1 when is_amq_active() reports a live broker."""
        if(self.is_amq_active()):
            message = 'ActiveMQ listening at ' +self.broker_protocol + "://" + self.broker_host + ':' + self.broker_port
            logger.warn(message)
            self.code = 1
        else:
            message = 'ActiveMQ down'
            logger.debug(message)

    # assure database is down
    def vette_db(self):
        """Set exit code 1 when db_alive() reports a live database."""
        retry = 1
        verbose = False
        if(self.db_alive(retry,verbose)):
            message = 'database alive'
            logger.warn(message)
            # Must be recorded on the instance: vette_head() inspects
            # self.code to decide whether the move has to be aborted.
            self.code = 1
        else:
            message = 'database down'
            logger.debug(message)

    # head node is not reachable
    def abort_unreachable(self):
        """Suggest --offline, then abort with exit code 1."""
        hint = 'hint: use flag "-o or --offline"'
        logger.info(hint)
        self.code = 1
        self.abort()

    # warn if --offline flag is ignored
    def is_offline_honored(self):
        """Warn that --offline was requested but is being ignored.

        Called when the head node turned out to be reachable.
        """
        if(self.offline):
            message = '--offline request not honored'
            logger.warn(message)

    # assure head node daemons are down or
    # require --offline flag when head node is unreachable
    def vette_head(self):
        """Verify that it is safe to move away from the current head node.

        Records the current head in self.head.  When the head answers ssh,
        every head daemon, the broker and the database must be down, else
        the move is aborted.  When it does not answer, the move continues
        only if --offline was given.
        """
        props = self.site_props
        key = 'ducc.head'
        prop = props.get_property(key)
        node = prop.v
        self.head = node
        message = 'node='+node+' '+'checking ducc head node daemons status'
        logger.info(message)
        operational = self.ssh_operational(node)
        if(operational):
            self.is_offline_honored()
            hint = 'hint: run "stop_ducc -c head" or "check_ducc -k"'
            user = os.environ['LOGNAME']
            # the first element of the result is not needed here
            (_found, tuples) = self.find_ducc_process(node)
            logger.debug('ducc processes:'+str(tuples))
            daemons = ('broker', 'database', 'orchestrator', 'pm', 'rm', 'sm', 'ws')
            for daemon in daemons:
                self.vette_daemon(node, user, tuples, daemon)
            self.vette_broker()
            self.vette_db()
            if(self.code > 0):
                logger.info(hint)
                self.abort()
            else:
                message = 'node='+node+' '+'ducc head node daemons are down'
                logger.info(message)
        else:
            message = 'node='+node+' '+'not reachable'
            if(not self.offline):
                logger.warn(message)
                self.abort_unreachable()
            else:
                logger.info(message)

    # assure node pool compatibility
    def vette_nodepool(self):
        """Abort when head and target sit in incompatible nodepools.

        Accepted cases: both nodes report the same nodepool, or the head is
        in a non-default nodepool while the target is in the default one
        (logged as a "nodepool swap").
        """
        default = '--default--'
        node = self.head
        np_head = str(self.get_nodepool(node,default))
        message = 'nodepool[head]='+np_head
        logger.debug(message)
        node = self.target
        np_target = str(self.get_nodepool(node,default))
        message = 'nodepool[target]='+np_target
        logger.debug(message)
        if((not np_head == default) and (np_target == default)):
            message = 'node='+self.head+' nodepool='+np_head
            logger.info(message)
            message = 'node='+self.target+' nodepool='+np_target
            logger.info(message)
            message = 'nodepool swap'
            logger.debug(message)
        elif(not np_head == np_target):
            message = 'node='+self.head+' nodepool='+np_head
            logger.warn(message)
            message = 'node='+self.target+' nodepool='+np_target
            logger.warn(message)
            message = 'nodepools do not match'
            logger.warn(message)
            self.code = 1
            self.abort()

    # backup file
    def backup_file(self,root,stem):
        """Copy root+stem to root+stem+'.'+timestamp.

        The timestamp is the module-wide common_timestamp with spaces
        replaced by '@'.
        """
        timestamp = str(common_timestamp).replace(' ','@')
        back = stem+'.'+timestamp
        message = 'creating backup file='+back
        logger.info(message)
        f_src = root+stem
        f_tgt = root+back
        copy2(f_src,f_tgt)

    # backup existing site.ducc.properties file
    def config_backup(self):
        """Back up site.ducc.properties next to the original."""
        self.backup_file(self.site_root,self.site_stem)

    # debug property changed
    def tell_change(self,key,old,new):
        """Debug-log a property change as key=old->new."""
        message = key+'='+old+'->'+new
        logger.debug(message)

    # warn property not changed
    def tell_no_change(self,key,old):
        """Warn that property key kept its old value."""
        message = key+'='+old+'->'+'no change'
        logger.warn(message)

    # update properties
    def update_site_ducc_properties(self):
        """Rewrite the head-related entries of site.ducc.properties.

        ducc.head is set to the target.  ducc.database.host is set to the
        target when it names the old head.  Occurrences of the old head's
        short name inside ducc.cluster.name are replaced by the target.
        The file is written only when the database host was the old head;
        otherwise the move is aborted with exit code 1.
        """
        comment = [ '# moved '+common_timestamp ]
        path = self.site_path
        props = self.site_props
        changes = 0
        # ducc.head
        key = 'ducc.head'
        prop = props.get_property(key)
        old = prop.v
        new = self.target
        prop.v = new
        prop.c = comment
        self.tell_change(key,old,new)
        changes = changes + 1
        self.orig_head = old.split('.')[0]
        # ducc.database.host
        key = 'ducc.database.host'
        prop = props.get_property(key)
        old = prop.v
        self.orig_database = old.split('.')[0]
        # Compare short host names, exactly as the write check below does;
        # otherwise a fully-qualified database host that is the old head
        # would be left pointing at it while the file is still written.
        if(self.orig_database == self.orig_head):
            new = self.target
            prop.v = new
            prop.c = comment
            self.tell_change(key,old,new)
            changes = changes + 1
        else:
            self.tell_no_change(key,old)
        # name
        key = 'ducc.cluster.name'
        prop = props.get_property(key)
        old = prop.v
        if(self.orig_head in old):
            new = old.replace(self.orig_head,self.target)
            prop.v = new
            prop.c = comment
            self.tell_change(key,old,new)
            changes = changes + 1
        # write file
        if(self.orig_head == self.orig_database):
            props.write(path)
            message = self.site_stem+' '+'updates='+str(changes)
            logger.info(message)
        else:
            message = 'head:'+self.orig_head+' does not match '+'database:'+self.orig_database
            logger.error(message)
            # recorded on the instance so the process exits non-zero
            self.code = 1
            self.abort()

    # edit nodefile
    def edit_nodefile(self,source_file_path,before,after):
        """Replace each line equal to before (ignoring surrounding
        whitespace) with after in the file at source_file_path.

        The result is written to a temporary file which then replaces the
        original.
        """
        fd, target_file_path = mkstemp()
        # mkstemp() hands back an already-open descriptor; wrap it so that
        # it is closed with the file instead of being leaked.
        with os.fdopen(fd, 'w') as target_file:
            with open(source_file_path, 'r') as source_file:
                for line in source_file:
                    text = line.strip()
                    if(text == before):
                        target_file.write(after+'\n')
                        logger.info(before+' -> '+after)
                    else:
                        target_file.write(line)
        remove(source_file_path)
        move(target_file_path, source_file_path)

    # update nodefile
    def update_nodefile(self):
        """Back up the head's nodepool file and swap head for target in it.

        Nothing is edited when the head's nodepool file is reported as the
        string 'null'.  The target's nodepool file is only debug-logged.

        NOTE(review): the backup is taken from self.site_root using the
        file's base name, which assumes the nodepool file lives in that
        directory - confirm.
        """
        default = ''
        np_head = str(self.get_nodepool_file(self.head,default))
        np_target = str(self.get_nodepool_file(self.target,default))
        message = 'file[head]='+np_head
        logger.debug(message)
        if(not np_head == 'null'):
            np_head_file = np_head.rsplit('/',1)[1]
            self.backup_file(self.site_root,np_head_file)
            message = 'updating '+np_head_file
            logger.info(message)
            self.edit_nodefile(np_head,self.head,self.target)
        message = 'file[target]='+np_target
        logger.debug(message)
        #if(not np_target == 'null'):
        #    np_target_file = np_target.rsplit('/',1)[1]
        #    self.backup_file(self.site_root,np_target_file)
        #message = 'nodepool updated'
        #logger.info(message)

    # perform move and notify of success
    def config_update(self):
        """Apply the property and nodefile updates, then report success."""
        self.update_site_ducc_properties()
        self.update_nodefile()
        message = 'move completed'
        logger.info(message)

    def main(self, argv):
        """Run the complete move sequence.

        argv is accepted but not used; options are read by parse_cmdline().
        """
        self.parse_cmdline()
        self.initialize()
        self.vette_target()
        self.vette_head()
        self.vette_nodepool()
        self.config_backup()
        self.config_update()
# Script entry point: build the mover and pass it the command-line arguments
# (without the program name).
if __name__ == '__main__':
    MoveDucc().main(sys.argv[1:])