| #!/usr/bin/env python |
| # ----------------------------------------------------------------------- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # ----------------------------------------------------------------------- |
| |
| import os |
| import sys |
| |
| from ducc_util import DuccUtil |
| from properties import Properties |
| from properties import Property |
| from ducc_util_out import * |
| |
| from optparse import OptionParser |
| from shutil import copy2 |
| |
| from tempfile import mkstemp |
| from shutil import move |
| from os import remove |
| |
# Timestamp captured once, at import time.  It is shared by backup_file()
# (backup file suffix) and update_site_ducc_properties() ("# moved" comment),
# so everything produced by a single run carries the same value.
# NOTE(review): get_timestamp is not defined in this file; presumably it is
# supplied by the wildcard import of ducc_util_out above - confirm.
common_timestamp = get_timestamp()

from ducc_logger import DuccLogger
# Module-wide logger used by every function and method below.
logger = DuccLogger()
| |
| # ----------------------------------------------------------------------- |
| # Extend OptionParser class |
class ExtendedOptionParser(OptionParser):
    """OptionParser variant that prints its epilog verbatim in --help output."""

    def format_epilog(self, formatter):
        """Return the epilog exactly as supplied, keeping its newlines.

        The stock implementation hands the epilog to the help formatter,
        which re-wraps the text and thereby deletes the newlines.

        formatter -- the active help formatter; intentionally unused.

        Returns the epilog string, or '' when no epilog was configured.
        format_help() joins the pieces it collects as strings, so returning
        None here would make --help fail with a TypeError.
        """
        return self.epilog or ''
| # ----------------------------------------------------------------------- |
| |
| # epilog for --help |
def get_epilog():
    """Return the multi-line epilog text shown at the end of --help output."""
    # Each entry is one output line; the 'Operation' paragraph is a single
    # long line assembled from adjacent string literals.
    lines = [
        '',
        'Run this command on the host which will become the new DUCC head node.',
        '',
        'Prerequisites:',
        '1. the current ducc.head node in site.ducc.properties is up',
        '2. the head daemons (broker, database, or, pm, rm, sm, ws) on the ducc.head node are down (e.g. stop_ducc -c head)',
        '',
        'Operation:',
        'To the extent possible, the cluster will be checked to see if '
        'it is safe to edit the site.ducc.properties file, and if so '
        'then a backup of the original file is made then the '
        'requisite changes are made to realize the head node move. '
        'Likewise, the appropriate ducc.nodes file is backed-up and updated as necessary.',
    ]
    # A trailing newline terminates the final line.
    return '\n'.join(lines)+'\n'
| |
class MoveDucc(DuccUtil):
    """Re-point the DUCC configuration at a new head node.

    main() runs the steps in order: parse the command line, load
    site.ducc.properties, validate the target, check the current head
    node, check nodepool compatibility, back up the properties file and
    finally rewrite the properties file and the head's node file.

    Any failed check logs a warning, sets the exit code to 1 and ends the
    process through abort().
    """

    # exit code
    code = 0

    # if current ducc host is offline (ie unreachable):
    #   when True  proceed with move anyway
    #   when False abort the move (the default)
    offline = False

    def exit(self):
        """Log the current exit code and terminate the process with it."""
        text = 'exit code='+str(self.code)
        logger.error(text)
        sys.exit(self.code)

    def abort(self):
        """Log that the move was not performed, then exit (does not return)."""
        message = 'move not performed'
        logger.error(message)
        self.exit()

    def dump_properties(self,name,props):
        """Write every key=value pair of props to the debug log.

        name  -- label logged ahead of the listing
        props -- properties object providing get_keys() and get(key)
        """
        logger.debug(name+':')
        for key in props.get_keys():
            # str() keeps the dump from failing on a non-string value
            logger.debug(key+'='+str(props.get(key)))

    def initialize(self):
        """Locate and load site.ducc.properties from DUCC_HOME/resources."""
        self.site_root = self.DUCC_HOME+'/resources/'
        self.site_stem = 'site.ducc.properties'
        self.site_path = self.site_root+self.site_stem
        self.site_props = Properties()
        self.site_props.load(self.site_path)
        self.dump_properties(self.site_stem, self.site_props)

    def parse_cmdline(self):
        """Parse the command line and record the offline flag and target.

        Also switches on debug and informational logging as requested.
        The target is reduced to its short host name (text before the
        first '.').
        """
        parser = ExtendedOptionParser(epilog=get_epilog())
        width = 45
        parser.formatter.help_position = width
        parser.formatter.max_help_position = width
        parser.add_option('-d','--debug', action='store_true', dest='flag_debug', default=False,
            help='display debugging messages')
        parser.add_option('-o','--offline', action='store_true', dest='flag_offline', default=False,
            help='indicate current DUCC head node is offline, Note: USE THIS OPTION WITH EXTREME CAUTION else risk corrupting database')
        parser.add_option('-q','--quiet', action='store_true', dest='flag_quiet', default=False,
            help='do not display informational messages')
        #parser.add_option('-t','--target', action='store', dest='target', default=self.localhost,
        #    help='the desired new DUCC head node, default='+self.localhost)
        (options, args) = parser.parse_args()
        if options.flag_debug:
            debug_on()
        if not options.flag_quiet:
            info_on()
        self.offline = options.flag_offline
        # While --target stays disabled above, options has no 'target'
        # attribute and the local host becomes the target.  Only that one
        # condition is caught so unrelated errors are not hidden.
        try:
            self.target = options.target
        except AttributeError:
            self.target = self.localhost
        self.target = self.target.split('.')[0]

    def get_required_prop(self,props,stem,key):
        """Return the value of key from props, aborting when it is missing.

        props -- properties object providing get(key)
        stem  -- file name used in the warning message
        key   -- property name to fetch
        """
        value = props.get(key)
        if value is None:
            message = key+' not found in '+stem
            logger.warn(message)
            self.code = 1
            self.abort()
        return value

    def vette_target(self):
        """Abort unless the target is a legal new head node.

        The target must differ from the current ducc.head and must appear
        in ducc.head.failover.
        """
        props = self.site_props
        stem = self.site_stem
        key = 'ducc.head'
        value = self.get_required_prop(props,stem,key)
        if value == self.target:
            message = 'target '+self.target+' is already ducc.head'
            logger.warn(message)
            self.code = 1
            self.abort()
        key = 'ducc.head.failover'
        value = self.get_required_prop(props,stem,key)
        # NOTE(review): if value is a plain string this is a substring
        # match, so a target that is a prefix of a listed host also passes.
        if self.target not in value:
            message = 'target '+self.target+' not found in ducc.head.failover='+str(value)
            logger.warn(message)
            self.code = 1
            self.abort()

    def vette_daemon(self, node, user, tuples, daemon):
        """Flag an error for each running instance of daemon owned by user.

        node   -- host name, used in the warning only
        user   -- only processes owned by this user are considered
        tuples -- process entries indexed as (daemon name, pid, user)
        daemon -- daemon name to look for
        """
        for t in tuples:
            if t[2] == user and t[0] == daemon:
                message = 'node='+node+' pid='+t[1]+' user='+t[2]+' '+daemon+'=up'
                logger.warn(message)
                self.code = 1

    def vette_broker(self):
        """Flag an error when ActiveMQ is found to be active."""
        if self.is_amq_active():
            message = 'ActiveMQ listening at ' +self.broker_protocol + "://" + self.broker_host + ':' + self.broker_port
            logger.warn(message)
            self.code = 1
        else:
            message = 'ActiveMQ down'
            logger.debug(message)

    def vette_db(self):
        """Flag an error when the database is found to be alive."""
        retry = 1
        verbose = False
        if self.db_alive(retry,verbose):
            message = 'database alive'
            logger.warn(message)
            # must be the instance attribute: vette_head() tests self.code
            # to decide whether the move has to be aborted
            self.code = 1
        else:
            message = 'database down'
            logger.debug(message)

    def abort_unreachable(self):
        """Abort because the head node is unreachable, hinting at --offline."""
        hint = 'hint: use flag "-o or --offline"'
        logger.info(hint)
        self.code = 1
        self.abort()

    def is_offline_honored(self):
        """Warn when --offline was given although the head node is reachable."""
        if self.offline:
            message = '--offline request not honored'
            logger.warn(message)

    def vette_head(self):
        """Assure the head node daemons are down before the move.

        When the current head node answers ssh, each head daemon plus the
        broker and the database are checked and the move is aborted if any
        is up.  When it does not answer, the move is aborted unless
        --offline was specified.  Records the current head in self.head.
        """
        props = self.site_props
        key = 'ducc.head'
        prop = props.get_property(key)
        node = prop.v
        self.head = node
        message = 'node='+node+' '+'checking ducc head node daemons status'
        logger.info(message)
        if self.ssh_operational(node):
            self.is_offline_honored()
            hint = 'hint: run "stop_ducc -c head" or "check_ducc -k"'
            user = os.environ['LOGNAME']
            # first element of the result is not used here
            (found, tuples) = self.find_ducc_process(node)
            logger.debug('ducc processes:'+str(tuples))
            for daemon in ('broker','database','orchestrator','pm','rm','sm','ws'):
                self.vette_daemon(node, user, tuples, daemon)
            self.vette_broker()
            self.vette_db()
            if self.code > 0:
                logger.info(hint)
                self.abort()
            else:
                message = 'node='+node+' '+'ducc head node daemons are down'
                logger.info(message)
        else:
            message = 'node='+node+' '+'not reachable'
            if not self.offline:
                logger.warn(message)
                self.abort_unreachable()
            else:
                logger.info(message)

    def vette_nodepool(self):
        """Abort when the head and target nodepools are incompatible.

        A head in a named nodepool with a target in the default nodepool
        is accepted and logged as a swap; any other difference aborts.
        """
        default = '--default--'
        np_head = str(self.get_nodepool(self.head,default))
        logger.debug('nodepool[head]='+np_head)
        np_target = str(self.get_nodepool(self.target,default))
        logger.debug('nodepool[target]='+np_target)
        if np_head != default and np_target == default:
            logger.info('node='+self.head+' nodepool='+np_head)
            logger.info('node='+self.target+' nodepool='+np_target)
            logger.debug('nodepool swap')
        elif np_head != np_target:
            logger.warn('node='+self.head+' nodepool='+np_head)
            logger.warn('node='+self.target+' nodepool='+np_target)
            logger.warn('nodepools do not match')
            self.code = 1
            self.abort()

    def backup_file(self,root,stem):
        """Copy root+stem to root+stem.<timestamp>.

        root -- directory, including its trailing '/'
        stem -- file name inside root
        """
        # blanks in the run's timestamp become '@' in the file name
        timestamp = str(common_timestamp).replace(' ','@')
        back = stem+'.'+timestamp
        message = 'creating backup file='+back
        logger.info(message)
        f_src = root+stem
        f_tgt = root+back
        copy2(f_src,f_tgt)

    def config_backup(self):
        """Back up the existing site.ducc.properties file."""
        self.backup_file(self.site_root,self.site_stem)

    def tell_change(self,key,old,new):
        """Debug-log that property key changed from old to new."""
        message = key+'='+old+'->'+new
        logger.debug(message)

    def tell_no_change(self,key,old):
        """Warn that property key kept its value old."""
        message = key+'='+old+'->'+'no change'
        logger.warn(message)

    def update_site_ducc_properties(self):
        """Rewrite site.ducc.properties for the new head node.

        ducc.head is always set to the target.  ducc.database.host is set
        to the target only when it equals the short name of the old head.
        ducc.cluster.name has the old head name replaced by the target
        when it contains it.  The file is written only when the old head
        and old database host agree; otherwise the move is aborted with
        exit code 1.
        """
        comment = [ '# moved '+str(common_timestamp) ]
        path = self.site_path
        props = self.site_props
        changes = 0
        # ducc.head
        key = 'ducc.head'
        prop = props.get_property(key)
        old = prop.v
        new = self.target
        prop.v = new
        prop.c = comment
        self.tell_change(key,old,new)
        changes = changes + 1
        self.orig_head = old.split('.')[0]
        # ducc.database.host
        key = 'ducc.database.host'
        prop = props.get_property(key)
        old = prop.v
        # NOTE(review): the unshortened value is compared with the short
        # head name, so a fully qualified database host is left unchanged
        # even though the write check below compares short names - confirm.
        if old == self.orig_head:
            new = self.target
            prop.v = new
            prop.c = comment
            self.tell_change(key,old,new)
            changes = changes + 1
        else:
            self.tell_no_change(key,old)
        self.orig_database = old.split('.')[0]
        # ducc.cluster.name
        key = 'ducc.cluster.name'
        prop = props.get_property(key)
        old = prop.v
        if self.orig_head in old:
            new = old.replace(self.orig_head,self.target)
            prop.v = new
            prop.c = comment
            self.tell_change(key,old,new)
            changes = changes + 1
        # write file
        if self.orig_head == self.orig_database:
            props.write(path)
            message = self.site_stem+' '+'updates='+str(changes)
            logger.info(message)
        else:
            message = 'head:'+self.orig_head+' does not match '+'database:'+self.orig_database
            logger.error(message)
            # must be the instance attribute so that the process exits
            # with a non-zero status
            self.code = 1
            self.abort()

    def edit_nodefile(self,source_file_path,before,after):
        """Replace each line equal to before with after in the given file.

        source_file_path -- path of the node file to rewrite
        before           -- text a stripped line must equal to be replaced
        after            -- replacement text, written with a newline

        The result is built in a temporary file which then replaces the
        original.
        """
        from shutil import copymode
        fh, target_file_path = mkstemp()
        # os.fdopen takes over the descriptor returned by mkstemp so that
        # it is closed together with the file object instead of leaking
        with os.fdopen(fh, 'w') as target_file:
            with open(source_file_path, 'r') as source_file:
                for line in source_file:
                    if line.strip() == before:
                        target_file.write(after+'\n')
                        logger.info(before+' -> '+after)
                    else:
                        target_file.write(line)
        # mkstemp creates an owner-only file; carry over the permission
        # bits of the original so the replacement stays equally readable
        copymode(source_file_path, target_file_path)
        remove(source_file_path)
        move(target_file_path, source_file_path)

    def update_nodefile(self):
        """Back up the head's node file and replace the head with the target.

        The node file found for the target is only logged, not modified.
        """
        default = ''
        np_head = str(self.get_nodepool_file(self.head,default))
        np_target = str(self.get_nodepool_file(self.target,default))
        logger.debug('file[head]='+np_head)
        # an empty result (the default passed above) is treated like
        # 'null': there is no node file to back up or edit
        if np_head and np_head != 'null':
            np_head_file = os.path.basename(np_head)
            # NOTE(review): the backup is taken from site_root, which
            # presumes the node file lives in that directory - confirm.
            self.backup_file(self.site_root,np_head_file)
            logger.info('updating '+np_head_file)
            self.edit_nodefile(np_head,self.head,self.target)
        logger.debug('file[target]='+np_target)

    def config_update(self):
        """Perform the move and report success."""
        self.update_site_ducc_properties()
        self.update_nodefile()
        message = 'move completed'
        logger.info(message)

    def main(self, argv):
        """Run the complete head node move.

        argv -- accepted for the caller's convenience but not used;
                parse_cmdline() has the option parser read the process
                arguments itself.
        """
        self.parse_cmdline()
        self.initialize()
        self.vette_target()
        self.vette_head()
        self.vette_nodepool()
        self.config_backup()
        self.config_update()
| |
if __name__ == '__main__':
    # Script entry point: run the move with the arguments that follow the
    # program name.
    MoveDucc().main(sys.argv[1:])