| #!/usr/bin/env python |
| # ----------------------------------------------------------------------- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # ----------------------------------------------------------------------- |
| |
| import glob |
| import os |
| import socket |
| import subprocess |
| import sys |
| import tarfile |
| |
| from optparse import HelpFormatter |
| from optparse import OptionGroup |
| from optparse import OptionParser |
| |
| import logging |
| |
| from ducc_util import DuccUtil |
| |
| # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| # + |
| # + db_tool |
| # + |
| # + purpose: save & restore snapshots of DUCC's Cassandra database |
| # + |
| # +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
| |
| # Extend OptionParser class |
class DbToolOptionParser(OptionParser):
    """OptionParser whose epilog is printed verbatim.

    The stock implementation re-wraps the epilog text, which deletes the
    newlines of the multi-line usage example; this subclass returns the
    text untouched instead.
    """

    def format_epilog(self, formatter):
        """Return the epilog exactly as supplied.

        formatter is accepted for interface compatibility and is not used.
        An unset epilog yields '' rather than None, because format_help()
        joins the returned value with the rest of the help text.
        """
        return self.epilog or ''
| |
| # A DUCC tool for creating/restoring Cassandra database backups. |
| # See --help |
| |
class Logger(object):
    """Line-oriented console logger honouring the quiet/verbose options.

    Every message is split into lines; each line is written to stdout via
    the root logger, prefixed with a timestamp and a one-letter severity
    tag ('I', 'D' or 'W').
    """

    def __init__(self, options):
        """Attach a stdout handler to the root logger.

        options must provide the booleans var_quiet and var_verbose.
        """
        self.options = options

        # The root logger defaults to WARNING; open it up completely and
        # let the handler level decide what is shown.
        root = logging.getLogger()
        root.setLevel(logging.NOTSET)

        handler = logging.StreamHandler(sys.stdout)
        # Debug records must pass the handler when verbose output was
        # requested; a fixed INFO level would silently discard them.
        if options.var_verbose:
            handler.setLevel(logging.DEBUG)
        else:
            handler.setLevel(logging.INFO)
        handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
        root.addHandler(handler)

    def _emit(self, emit, tag, text):
        """Send each line of text to emit, prefixed with tag; skip None/''."""
        if not text:
            return
        for line in text.splitlines():
            emit(tag+line)

    def info(self, text):
        """Log text at INFO level unless quiet mode is active."""
        if self.options.var_quiet:
            return
        self._emit(logging.info, 'I ', text)

    def debug(self, text):
        """Log text at DEBUG level, only when verbose mode is active."""
        if not self.options.var_verbose:
            return
        self._emit(logging.debug, 'D ', text)

    def warn(self, text):
        """Log text at WARNING level (never suppressed by quiet mode)."""
        # logging.warn is a deprecated alias; logging.warning is the
        # supported spelling.
        self._emit(logging.warning, 'W ', text)
| |
class DbTool(DuccUtil):
    """Save and restore snapshots of DUCC's Cassandra database.

    --save / --save-overwrite take a nodetool snapshot of the running
    database and pack the snapshot files into a tar.gz archive.
    --restore / --restore-overwrite unpack such an archive into the
    database data directory while the database is down.
    """

    # constants
    subdir_state = '/state'
    subdir_database = subdir_state+'/database'
    subdir_database_data = subdir_database+'/data'
    subdir_cassandra = '/cassandra-server'
    subdir_cassandra_bin = subdir_cassandra+'/bin'
    cmd_name_nodetool = 'nodetool'
    snapshot_name = 'SNAPSHOT'

    # keywords
    kw_save = 'save'
    kw_save_overwrite = 'save-overwrite'
    kw_restore = 'restore'
    kw_restore_overwrite = 'restore-overwrite'
    kw_verbose = 'verbose'
    kw_quiet = 'quiet'

    def __init__(self):
        """Derive the nodetool and database paths from DUCC_HOME."""
        # DUCC_HOME is read from self; presumably set by DuccUtil.__init__.
        DuccUtil.__init__(self)
        self.cmd_nodetool = self.DUCC_HOME+self.subdir_cassandra_bin+'/'+self.cmd_name_nodetool
        self.ducc_database = self.DUCC_HOME+self.subdir_database
        self.ducc_database_data = self.DUCC_HOME+self.subdir_database_data

    def terminate(self):
        """Log a failure notice and exit with status 1."""
        self.logger.warn('unable to continue, processing terminated')
        raise SystemExit(1)

    def complete(self):
        """Log a success notice and exit with status 0."""
        self.logger.info('processing completed successfully')
        raise SystemExit(0)

    def get_epilog(self):
        """Return the multi-line usage example appended to --help output."""
        lines = [
            'Example:',
            'bash-4.1$ ./db_tool --save-overwrite /backups/ducc/ducc-db.tar.gz',
            '2016-09-30 14:05:25,938 I save-overwrite',
            '2016-09-30 14:05:28,570 I Uptime (seconds) : 506',
            '2016-09-30 14:05:28,571 I database is up',
            '2016-09-30 14:05:28,571 I remove snapshot',
            '2016-09-30 14:05:31,152 I create snapshot',
            '2016-09-30 14:05:34,002 I remove /home/degenaro/ducc-db.tar.gz',
            '2016-09-30 14:05:34,015 I create /home/degenaro/ducc-db.tar.gz',
            '2016-09-30 14:05:34,499 I count[files]=303',
            '2016-09-30 14:05:34,499 I size[bytes]=177844',
            '2016-09-30 14:05:34,499 I processing completed successfully',
        ]
        return '\n'+'\n'.join(lines)+'\n'

    # parse command line
    def parse(self, argv):
        """Parse argv into self.options / self.args and validate it.

        argv is the argument list without the program name; None makes
        optparse fall back to sys.argv[1:]. Invalid combinations are
        reported through parser.error(), which exits the process.
        """
        self.parser = DbToolOptionParser(epilog=self.get_epilog())
        self.parser.formatter.help_position = 36
        self.parser.formatter.max_help_position = 36
        self.parser.add_option('--'+self.kw_save, action='store', dest='var_save', default=None, metavar='PATH.tar.gz',
            help='create snapshot of database and save to specified tar.gz file; requires database to be up')
        self.parser.add_option('--'+self.kw_save_overwrite, action='store', dest='var_save_overwrite', default=None, metavar='PATH.tar.gz',
            help='create snapshot of database and save to specified tar.gz file, overwriting previous file if one exists; requires database to be up')
        self.parser.add_option('--'+self.kw_restore, action='store', dest='var_restore', default=None, metavar='PATH.tar.gz',
            help='extract snapshot from specified tar.gz file and restore database; requires database to be down')
        self.parser.add_option('--'+self.kw_restore_overwrite, action='store', dest='var_restore_overwrite', default=None, metavar='PATH.tar.gz',
            help='extract snapshot from specified tar.gz file and restore database, overwriting previous database if one exists; requires database to be down')
        self.parser.add_option('--'+self.kw_quiet, action='store_true', dest='var_quiet', default=False,
            help='print no informational messages')
        self.parser.add_option('--'+self.kw_verbose, action='store_true', dest='var_verbose', default=False,
            help='print extra debug messages')
        # Parse the caller-supplied list rather than implicitly re-reading
        # sys.argv.
        (self.options, self.args) = self.parser.parse_args(argv)
        actions = [
            (self.kw_save, self.options.var_save),
            (self.kw_save_overwrite, self.options.var_save_overwrite),
            (self.kw_restore, self.options.var_restore),
            (self.kw_restore_overwrite, self.options.var_restore_overwrite),
        ]
        chosen = [keyword for keyword, value in actions if value is not None]
        # at least one of { save, save-overwrite, restore, restore-overwrite }
        if not chosen:
            self.parser.error('option --'+self.kw_save+
                              ' or --'+self.kw_save_overwrite+
                              ' or --'+self.kw_restore+
                              ' or --'+self.kw_restore_overwrite+
                              ' must be specified')
        # exactly one of { save, save-overwrite, restore, restore-overwrite };
        # the first two given options (in declaration order) are reported
        if len(chosen) > 1:
            self.parser.error('options --'+chosen[0]+' and --'+chosen[1]+' are mutually exclusive')
        # at most one of { quiet, verbose }
        if self.options.var_quiet and self.options.var_verbose:
            self.parser.error('options --'+self.kw_quiet+' and --'+self.kw_verbose+' are mutually exclusive')

    def _run(self, command):
        """Run command (an argument list) and return (stdout, stderr) as text.

        A command that cannot be started is reported through the returned
        stderr text instead of an unhandled OSError.
        """
        try:
            # universal_newlines=True yields text rather than bytes, so the
            # string handling below behaves the same on Python 2 and 3.
            process = subprocess.Popen(command,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE,
                                       universal_newlines=True)
        except OSError as error:
            return '', 'unable to run '+command[0]+': '+str(error)
        return process.communicate()

    def _run_or_terminate(self, command, failure_text):
        """Run command; on any stderr output log it and terminate.

        Returns the command's stdout text. Failure is detected from stderr
        output only; the exit status is not consulted.
        """
        out, err = self._run(command)
        self.logger.debug(out)
        if err:
            self.logger.warn(err)
            self.logger.warn(failure_text)
            self.terminate()
        return out

    def db_status(self):
        """Return 'up', 'down' or 'unknown' based on 'nodetool info'.

        'down' requires stderr to start with nodetool's connect-failure
        message; 'up' requires empty stderr and an 'Uptime' line on stdout.
        """
        status = 'unknown'
        out, err = self._run([self.cmd_nodetool, 'info'])
        if err:
            self.logger.debug('err: '+err)
            if err.startswith(self.cmd_name_nodetool+': Failed to connect'):
                status = 'down'
        elif out:
            self.logger.debug('out: '+out)
            for line in out.splitlines():
                if line.startswith('Uptime'):
                    # collapse runs of whitespace before logging
                    self.logger.info(' '.join(line.split()))
                    status = 'up'
                    break
        return status

    def assure_database_up(self):
        """Terminate unless the database reports status 'up'."""
        if self.db_status() != 'up':
            self.logger.warn('database is not up')
            self.terminate()
        self.logger.info('database is up')

    def assure_database_down(self):
        """Terminate unless the database reports status 'down'."""
        if self.db_status() != 'down':
            self.logger.warn('database is not down')
            self.terminate()
        self.logger.info('database is down')

    # remove snapshot
    def remove_snapshot(self):
        """Clear any existing snapshot named snapshot_name via nodetool."""
        self.logger.info('remove snapshot')
        self._run_or_terminate(
            [self.cmd_nodetool, 'clearsnapshot', '-t', self.snapshot_name],
            'remove snapshot failure')

    # create snapshot
    def create_snapshot(self):
        """Create a snapshot named snapshot_name via nodetool."""
        self.logger.info('create snapshot')
        self._run_or_terminate(
            [self.cmd_nodetool, 'snapshot', '-t', self.snapshot_name],
            'create snapshot failure')

    # remove database directory
    def remove_directory(self):
        """Recursively delete the database directory, if it exists."""
        path = self.ducc_database
        if not os.path.exists(path):
            return
        self.logger.info('remove '+path)
        self._run_or_terminate(['/bin/rm', '-fr', path], 'remove '+path+' failure')

    # create database directory
    def create_directory(self):
        """Create the database data directory, if it does not exist."""
        path = self.ducc_database_data
        if os.path.exists(path):
            return
        self.logger.info('create '+path)
        os.makedirs(path)

    # remove tar.gz
    def remove_targz(self):
        """Delete the target archive, if it exists."""
        if not os.path.exists(self.targz):
            return
        self.logger.info('remove '+self.targz)
        self._run_or_terminate(['/bin/rm', self.targz], 'remove '+self.targz+' failure')

    # create targz directory
    def makedirs_targz(self):
        """Create the directory that will hold the archive, if needed."""
        path = os.path.dirname(self.targz)
        # A bare file name has no directory component: nothing to create
        # (and the archive's own name must not be turned into a directory).
        if not path or os.path.exists(path):
            return
        self.logger.info('makedirs '+path)
        os.makedirs(path)

    # create tar.gz
    def create_targz(self):
        """Pack the snapshot files into the archive and log count and size.

        Archive member names are the paths relative to the data directory
        with the '/snapshots/<snapshot_name>' component removed, so that
        extracting into the data directory puts files where they belong.
        """
        self.logger.info('create '+self.targz)
        source = self.ducc_database_data
        marker = '/snapshots/'+self.snapshot_name
        # Without recursive globbing '**' matches like '*': one path level.
        matches = sorted(glob.glob(source+'/*/**'+marker+'/**'))
        archive = tarfile.open(self.targz, 'w:gz')
        count = 0
        try:
            for path in matches:
                relative_name = path[len(source)+1:]
                head, _, tail = relative_name.partition(marker)
                restore_name = head+tail
                archive.add(path, restore_name)
                self.logger.debug('add: '+restore_name)
                count = count + 1
        finally:
            archive.close()
        self.logger.info('count[files]='+str(count))
        self.logger.info('size[bytes]='+str(os.path.getsize(self.targz)))

    # check tar.gz
    def check_targz(self):
        """Verify the archive is listable by tar and log its size."""
        self.logger.info('check '+self.targz)
        self._run_or_terminate(['/bin/tar', '-tzf', self.targz],
                               'check '+self.targz+' failure')
        self.logger.info('size[bytes]='+str(os.path.getsize(self.targz)))

    # extract tar.gz
    def extract_targz(self):
        """Extract the archive into the data directory and log file count."""
        self.logger.info('extract '+self.targz)
        out = self._run_or_terminate(
            ['/bin/tar', '-xvzf', self.targz, '-C', self.ducc_database_data],
            'extract '+self.targz+' failure')
        count = 0
        # tar -v prints one line per extracted member
        for line in out.splitlines():
            self.logger.debug("extract: "+line)
            count = count + 1
        self.logger.info('count[files]='+str(count))

    # steps for --save
    def save_virgin(self):
        """Handle --save: refuse to proceed if the archive already exists."""
        self.logger.info('save')
        self.targz = self.options.var_save
        if os.path.exists(self.targz):
            self.logger.warn(self.targz+' '+'exists')
            self.terminate()
        self.save()

    # steps for save-overwrite
    def save_overwrite(self):
        """Handle --save-overwrite: an existing target must be a writable file."""
        self.logger.info(self.kw_save_overwrite)
        self.targz = self.options.var_save_overwrite
        if os.path.exists(self.targz):
            if not os.path.isfile(self.targz):
                self.logger.warn(self.targz+' '+'not a file')
                self.terminate()
            if not os.access(self.targz, os.W_OK):
                self.logger.warn(self.targz+' '+'not writable')
                self.terminate()
        self.save()

    # common steps for --save and --save-overwrite
    def save(self):
        """Snapshot the running database and write the archive; then exit."""
        self.assure_database_up()
        self.remove_snapshot()
        self.create_snapshot()
        self.remove_targz()
        self.makedirs_targz()
        self.create_targz()
        self.complete()

    # steps for --restore
    def restore_virgin(self):
        """Handle --restore: refuse to proceed if a database directory exists."""
        self.logger.info('restore')
        self.targz = self.options.var_restore
        path = self.ducc_database
        if os.path.exists(path):
            self.logger.warn(path+' '+'exists')
            self.terminate()
        self.restore()

    # steps for --restore-overwrite
    def restore_overwrite(self):
        """Handle --restore-overwrite: an existing target must be a writable directory."""
        self.logger.info(self.kw_restore_overwrite)
        self.targz = self.options.var_restore_overwrite
        path = self.ducc_database
        if os.path.exists(path):
            if os.path.isfile(path):
                self.logger.warn(path+' '+'not a directory')
                self.terminate()
            if not os.access(path, os.W_OK):
                self.logger.warn(path+' '+'not writable')
                self.terminate()
        self.restore()

    # common steps for --restore and --restore-overwrite
    def restore(self):
        """Replace the database directory with the archive contents; then exit."""
        self.logger.debug(self.kw_restore)
        self.assure_database_down()
        self.check_targz()
        self.remove_directory()
        self.create_directory()
        self.extract_targz()
        self.complete()

    # record hostname where invoked
    def hostname(self):
        """Log the name of the host the tool is running on."""
        name = socket.gethostname()
        try:
            name = socket.gethostbyaddr(name)[0]
        except socket.error as error:
            # The lookup is informational only; keep the local name rather
            # than aborting when name resolution is unavailable.
            self.logger.debug('host lookup failed: '+str(error))
        self.logger.info('host='+name)

    # --save or --save-overwrite or --restore or --restore-overwrite
    def main(self, argv):
        """Parse argv, set up logging and run the requested operation."""
        self.parse(argv)
        self.logger = Logger(self.options)
        self.hostname()
        if self.options.var_save is not None:
            self.save_virgin()
        elif self.options.var_save_overwrite is not None:
            self.save_overwrite()
        elif self.options.var_restore is not None:
            self.restore_virgin()
        elif self.options.var_restore_overwrite is not None:
            self.restore_overwrite()
        else:
            # not expected: parse() rejects a command line without an action
            self.logger.warn('huh?')
| |
if __name__ == '__main__':
    # Script entry point: hand the command-line arguments (without the
    # program name) to a fresh DbTool.
    cli_arguments = sys.argv[1:]
    instance = DbTool()
    instance.main(cli_arguments)