#!/usr/bin/env python
# -----------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# -----------------------------------------------------------------------
import glob
import os
import socket
import subprocess
import sys
import tarfile
from optparse import HelpFormatter
from optparse import OptionGroup
from optparse import OptionParser
import logging
from ducc_util import DuccUtil
# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# +
# + db_tool
# +
# + purpose: save & restore snapshots of DUCC's Cassandra database
# +
# +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# Extend OptionParser class
class DbToolOptionParser(OptionParser):
    """OptionParser variant that prints its epilog verbatim.

    The stock implementation hands the epilog to the help formatter;
    this subclass returns it untouched so that a pre-formatted,
    multi-line example keeps its newlines.
    """

    def format_epilog(self, formatter):
        """Return the epilog text exactly as supplied.

        formatter is accepted for signature compatibility and is unused.
        An unset epilog yields '' rather than None, because format_help()
        joins the pieces it collects as strings.
        """
        return self.epilog or ''
# A DUCC tool for creating/restoring Cassandra database backups.
# See --help
class Logger():
    """Console logger honouring the --quiet and --verbose options.

    Every message is split into lines and each line is written to stdout
    through the root logger, prefixed with a one-letter severity tag
    ('I', 'D' or 'W').
    """

    def __init__(self, options):
        """Attach a timestamping stdout handler to the root logger.

        options must expose the booleans var_quiet and var_verbose.
        """
        self.options = options
        # Get the root logger
        logger = logging.getLogger()
        # Have to set the root logger level, it defaults to logging.WARNING
        logger.setLevel(logging.NOTSET)
        formatter = logging.Formatter('%(asctime)s %(message)s')
        logging_handler_out = logging.StreamHandler(sys.stdout)
        # DEBUG records rank below INFO, so the handler must be opened up
        # for them or --verbose would never print anything extra.
        if options.var_verbose:
            logging_handler_out.setLevel(logging.DEBUG)
        else:
            logging_handler_out.setLevel(logging.INFO)
        logging_handler_out.setFormatter(formatter)
        logger.addHandler(logging_handler_out)

    def _emit(self, emit, tag, text):
        """Send each line of text to emit, prefixed with tag; skip None/empty."""
        if not text:
            return
        # Subprocess output may arrive as bytes on Python 3; on Python 2
        # bytes is str and this branch is never taken.
        if isinstance(text, bytes) and not isinstance(text, str):
            text = text.decode('utf-8', 'replace')
        for line in text.splitlines():
            emit(tag + line)

    def info(self, text):
        """Log text at INFO level unless --quiet was given."""
        if self.options.var_quiet:
            return
        self._emit(logging.info, 'I ', text)

    def debug(self, text):
        """Log text at DEBUG level, only when --verbose was given."""
        if not self.options.var_verbose:
            return
        self._emit(logging.debug, 'D ', text)

    def warn(self, text):
        """Log text at WARNING level; never suppressed."""
        # logging.warn is a deprecated alias of logging.warning
        self._emit(logging.warning, 'W ', text)
class DbTool(DuccUtil):
# Command-line tool that saves and restores snapshots of the database
# as tar.gz archives. The paths below are suffixes; __init__ appends
# them to DUCC_HOME.
# constants
# state directory, and the database directory beneath it
subdir_state = '/state'
subdir_database = subdir_state+'/database'
# data directory: scanned for snapshot files on save, extraction target on restore
subdir_database_data = subdir_database+'/data'
# location of the bundled Cassandra server and its bin directory
subdir_cassandra = '/cassandra-server'
subdir_cassandra_bin = subdir_cassandra+'/bin'
# executable used to query status and to create/clear snapshots
cmd_name_nodetool = 'nodetool'
# tag passed to nodetool with -t when creating/clearing the snapshot
snapshot_name = 'SNAPSHOT'
# keywords
# long option names (without the leading '--') accepted on the command line
kw_save = 'save'
kw_save_overwrite = 'save-overwrite'
kw_restore = 'restore'
kw_restore_overwrite = 'restore-overwrite'
kw_verbose = 'verbose'
kw_quiet = 'quiet'
def __init__(self):
    """Initialise the DuccUtil base and derive the absolute paths used later.

    DUCC_HOME is read from self; it is presumably provided by DuccUtil,
    which is not visible in this file.
    """
    DuccUtil.__init__(self)
    home = self.DUCC_HOME
    # nodetool lives in the Cassandra bin directory under DUCC_HOME
    self.cmd_nodetool = '/'.join([home+self.subdir_cassandra_bin, self.cmd_name_nodetool])
    self.ducc_database = home+self.subdir_database
    self.ducc_database_data = home+self.subdir_database_data
def terminate(self):
    """Warn that processing cannot continue, then exit with status 1."""
    self.logger.warn('unable to continue, processing terminated')
    sys.exit(1)
def complete(self):
    """Report successful completion, then exit with status 0."""
    self.logger.info('processing completed successfully')
    sys.exit(0)
def get_epilog(self):
    """Return the sample session printed after the option list in --help.

    The text starts and ends with a newline and has one sample line per
    row in between.
    """
    sample = (
        'Example:',
        'bash-4.1$ ./db_tool --save-overwrite /backups/ducc/ducc-db.tar.gz',
        '2016-09-30 14:05:25,938 I save-overwrite',
        '2016-09-30 14:05:28,570 I Uptime (seconds) : 506',
        '2016-09-30 14:05:28,571 I database is up',
        '2016-09-30 14:05:28,571 I remove snapshot',
        '2016-09-30 14:05:31,152 I create snapshot',
        '2016-09-30 14:05:34,002 I remove /home/degenaro/ducc-db.tar.gz',
        '2016-09-30 14:05:34,015 I create /home/degenaro/ducc-db.tar.gz',
        '2016-09-30 14:05:34,499 I count[files]=303',
        '2016-09-30 14:05:34,499 I size[bytes]=177844',
        '2016-09-30 14:05:34,499 I processing completed successfully',
    )
    return '\n' + '\n'.join(sample) + '\n'
# parse command line
def parse(self, argv):
    """Parse and validate the command line.

    argv is the argument list without the program name; None makes
    optparse fall back to sys.argv[1:]. On return self.parser,
    self.options and self.args are set. Invalid combinations are
    reported through parser.error(), which prints usage and exits.
    """
    self.parser = DbToolOptionParser(epilog=self.get_epilog())
    # widen the option column so the long option names fit on one line
    self.parser.formatter.help_position = 36
    self.parser.formatter.max_help_position = 36
    self.parser.add_option('--'+self.kw_save, action='store', dest='var_save', default=None, metavar='PATH.tar.gz',
        help='create snapshot of database and save to specified tar.gz file; requires database to be up')
    self.parser.add_option('--'+self.kw_save_overwrite, action='store', dest='var_save_overwrite', default=None, metavar='PATH.tar.gz',
        help='create snapshot of database and save to specified tar.gz file, overwriting previous file if one exists; requires database to be up')
    self.parser.add_option('--'+self.kw_restore, action='store', dest='var_restore', default=None, metavar='PATH.tar.gz',
        help='extract snapshot from specified tar.gz file and restore database; requires database to be down')
    self.parser.add_option('--'+self.kw_restore_overwrite, action='store', dest='var_restore_overwrite', default=None, metavar='PATH.tar.gz',
        help='extract snapshot from specified tar.gz file and restore database, overwriting previous database if one exists; requires database to be down')
    self.parser.add_option('--'+self.kw_quiet, action='store_true', dest='var_quiet', default=False,
        help='print no informational messages')
    self.parser.add_option('--'+self.kw_verbose, action='store_true', dest='var_verbose', default=False,
        help='print extra debug messages')
    # honour the caller-supplied argument list instead of ignoring it
    (self.options, self.args) = self.parser.parse_args(argv)
    # exactly one of { save, save-overwrite, restore, restore-overwrite }
    actions = [
        (self.kw_save, self.options.var_save),
        (self.kw_save_overwrite, self.options.var_save_overwrite),
        (self.kw_restore, self.options.var_restore),
        (self.kw_restore_overwrite, self.options.var_restore_overwrite),
    ]
    chosen = [keyword for (keyword, value) in actions if value is not None]
    if not chosen:
        keywords = [keyword for (keyword, value) in actions]
        self.parser.error('option --'+' or --'.join(keywords)+' must be specified')
    if len(chosen) > 1:
        # the first two given, in declaration order, name the conflict
        self.parser.error('options --'+chosen[0]+' and --'+chosen[1]+' are mutually exclusive')
    # at most one of { quiet, verbose }
    if self.options.var_quiet and self.options.var_verbose:
        self.parser.error('options --'+self.kw_quiet+' and --'+self.kw_verbose+' are mutually exclusive')
def db_status(self):
    """Run 'nodetool info' and classify the database state.

    Returns 'down' when stderr starts with nodetool's "Failed to connect"
    message, 'up' when stdout contains a line starting with 'Uptime'
    (that line is logged with its whitespace collapsed), and 'unknown'
    otherwise.
    """
    status = 'unknown'
    # universal_newlines gives text rather than bytes on Python 3, so the
    # string comparisons and concatenations below work on both versions
    p = subprocess.Popen([self.cmd_nodetool, 'info'], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=True)
    out, err = p.communicate()
    if err:
        # any stderr output takes precedence over stdout
        self.logger.debug('err: '+err)
        if err.startswith(self.cmd_name_nodetool+': Failed to connect'):
            status = 'down'
    elif out:
        self.logger.debug('out: '+out)
        for line in out.splitlines():
            if line.startswith('Uptime'):
                self.logger.info(' '.join(line.split()))
                status = 'up'
                break
    return status
def assure_database_up(self):
    """Terminate with a warning unless db_status() reports 'up'."""
    running = (self.db_status() == 'up')
    if not running:
        self.logger.warn('database is not up')
        self.terminate()
    self.logger.info('database is up')
def assure_database_down(self):
    """Terminate with a warning unless db_status() reports 'down'."""
    stopped = (self.db_status() == 'down')
    if not stopped:
        self.logger.warn('database is not down')
        self.terminate()
    self.logger.info('database is down')
# remove snapshot
def remove_snapshot(self):
    """Clear the snapshot tagged snapshot_name via 'nodetool clearsnapshot'.

    Terminates when the command writes to stderr or exits non-zero.
    """
    self.logger.info('remove snapshot')
    name = self.snapshot_name
    # universal_newlines: get text, not bytes, on Python 3
    p = subprocess.Popen([self.cmd_nodetool, 'clearsnapshot', '-t', name], stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    out, err = p.communicate()
    self.logger.debug(out)
    # a failing command may exit non-zero without printing anything
    if err or p.returncode != 0:
        self.logger.warn(err)
        self.logger.warn('remove snapshot failure')
        self.terminate()
# create snapshot
def create_snapshot(self):
    """Create a snapshot tagged snapshot_name via 'nodetool snapshot'.

    Terminates when the command writes to stderr or exits non-zero.
    """
    self.logger.info('create snapshot')
    name = self.snapshot_name
    # universal_newlines: get text, not bytes, on Python 3
    p = subprocess.Popen([self.cmd_nodetool, 'snapshot', '-t', name], stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    out, err = p.communicate()
    self.logger.debug(out)
    # a failing command may exit non-zero without printing anything
    if err or p.returncode != 0:
        self.logger.warn(err)
        self.logger.warn('create snapshot failure')
        self.terminate()
# remove database directory
def remove_directory(self):
    """Recursively delete the database directory, if it exists.

    Terminates when /bin/rm writes to stderr or exits non-zero.
    """
    path = self.ducc_database
    if not os.path.exists(path):
        return
    self.logger.info('remove '+path)
    # universal_newlines: get text, not bytes, on Python 3
    p = subprocess.Popen(['/bin/rm', '-fr', path], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=True)
    out, err = p.communicate()
    self.logger.debug(out)
    if err or p.returncode != 0:
        self.logger.warn(err)
        self.logger.warn('remove '+path+' failure')
        self.terminate()
# create database directory
def create_directory(self):
    """Create the database data directory (with parents) unless it already exists."""
    target = self.ducc_database_data
    if not os.path.exists(target):
        self.logger.info('create '+target)
        os.makedirs(target)
# remove tar.gz
def remove_targz(self):
    """Delete the target archive self.targz, if it exists.

    Terminates when /bin/rm writes to stderr or exits non-zero.
    """
    if not os.path.exists(self.targz):
        return
    self.logger.info('remove '+self.targz)
    # universal_newlines: get text, not bytes, on Python 3
    p = subprocess.Popen(['/bin/rm', self.targz], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=True)
    out, err = p.communicate()
    self.logger.debug(out)
    if err or p.returncode != 0:
        self.logger.warn(err)
        self.logger.warn('remove '+self.targz+' failure')
        self.terminate()
# create targz directory
def makedirs_targz(self):
    """Create the directory that will hold the archive self.targz, if missing.

    A bare filename has no directory component, so there is nothing to
    create; the archive then lands in the current directory.
    """
    # dirname yields '' for a bare filename and '/' for a root-level file;
    # splitting on '/' by hand would return the filename itself, or an
    # empty string, and hand the wrong path to makedirs
    path = os.path.dirname(self.targz)
    if not path or os.path.exists(path):
        return
    self.logger.info('makedirs '+path)
    os.makedirs(path)
# create tar.gz
def create_targz(self):
    """Pack the snapshot files under the data directory into self.targz.

    Entries matching <data>/*/*/snapshots/<snapshot_name>/* are added with
    the '/snapshots/<snapshot_name>' segment removed from their archive
    name, so extracting into a data directory puts them back in place.
    Logs the number of entries added and the archive size.
    """
    self.logger.info('create '+self.targz)
    source = self.ducc_database_data
    target = self.targz
    # use the same tag that create_snapshot passes to nodetool
    snapshot_dir = '/snapshots/'+self.snapshot_name
    # sorted only to make the archive order deterministic
    filelist = sorted(glob.glob(source+'/*/**'+snapshot_dir+'/**'))
    tarf = tarfile.open(target, 'w:gz')
    count = 0
    try:
        for path in filelist:
            # every match begins with source+'/', strip exactly that prefix
            relative_name = path[len(source)+1:]
            restore_name = relative_name.replace(snapshot_dir, '', 1)
            tarf.add(path, restore_name)
            self.logger.debug('add: '+restore_name)
            count = count + 1
    finally:
        tarf.close()
    self.logger.info('count[files]='+str(count))
    size = os.path.getsize(self.targz)
    self.logger.info('size[bytes]='+str(size))
# check tar.gz
def check_targz(self):
    """Verify that self.targz can be listed by tar, then log its size.

    Terminates when '/bin/tar -tzf' writes to stderr or exits non-zero.
    """
    self.logger.info('check '+self.targz)
    target = self.targz
    # universal_newlines: get text, not bytes, on Python 3
    p = subprocess.Popen(['/bin/tar', '-tzf', target], stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=True)
    out, err = p.communicate()
    self.logger.debug(out)
    if err or p.returncode != 0:
        self.logger.warn(err)
        self.logger.warn('check '+self.targz+' failure')
        self.terminate()
    size = os.path.getsize(self.targz)
    self.logger.info('size[bytes]='+str(size))
# extract tar.gz
def extract_targz(self):
    """Extract self.targz into the database data directory.

    Terminates when '/bin/tar -xvzf' writes to stderr or exits non-zero;
    otherwise logs one debug line per extracted entry and the total count.
    """
    self.logger.info('extract '+self.targz)
    source = self.targz
    target = self.ducc_database_data
    # universal_newlines: get text, not bytes, on Python 3
    p = subprocess.Popen(['/bin/tar', '-xvzf', source, '-C', target], stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    out, err = p.communicate()
    self.logger.debug(out)
    if err or p.returncode != 0:
        self.logger.warn(err)
        self.logger.warn('extract '+self.targz+' failure')
        self.terminate()
    # -v makes tar print one line per entry, which is what gets counted
    count = 0
    for line in out.splitlines():
        self.logger.debug("extract: "+line)
        count = count + 1
    self.logger.info('count[files]='+str(count))
# steps for --save
def save_virgin(self):
    """Handle --save: refuse to proceed if the target archive already exists."""
    self.logger.info('save')
    self.targz = self.options.var_save
    already_there = os.path.exists(self.targz)
    if already_there:
        self.logger.warn('%s %s' % (self.targz, 'exists'))
        self.terminate()
    self.save()
# steps for save-overwrite
def save_overwrite(self):
    """Handle --save-overwrite: an existing target must be a writable regular file."""
    self.logger.info(self.kw_save_overwrite)
    self.targz = self.options.var_save_overwrite
    if os.path.exists(self.targz):
        # each entry: (predicate that flags a problem, text appended to the path)
        problems = (
            (lambda p: not os.path.isfile(p), 'not a file'),
            (lambda p: not os.access(p, os.W_OK), 'not writable'),
        )
        for is_bad, complaint in problems:
            if is_bad(self.targz):
                self.logger.warn('%s %s' % (self.targz, complaint))
                self.terminate()
    self.save()
# common steps for --save and --save-overwrite
def save(self):
    """Run the save pipeline shared by --save and --save-overwrite, in order."""
    steps = (
        'assure_database_up',
        'remove_snapshot',
        'create_snapshot',
        'remove_targz',
        'makedirs_targz',
        'create_targz',
        'complete',
    )
    for step in steps:
        getattr(self, step)()
# steps for --restore
def restore_virgin(self):
    """Handle --restore: refuse to proceed if the database directory already exists."""
    self.logger.info('restore')
    self.targz = self.options.var_restore
    database = self.ducc_database
    if os.path.exists(database):
        self.logger.warn('%s %s' % (database, 'exists'))
        self.terminate()
    self.restore()
# steps for --restore-overwrite
def restore_overwrite(self):
    """Handle --restore-overwrite: an existing database path must be a writable non-file."""
    self.logger.info(self.kw_restore_overwrite)
    self.targz = self.options.var_restore_overwrite
    database = self.ducc_database
    if os.path.exists(database):
        # each entry: (predicate that flags a problem, text appended to the path)
        problems = (
            (os.path.isfile, 'not a directory'),
            (lambda p: not os.access(p, os.W_OK), 'not writable'),
        )
        for is_bad, complaint in problems:
            if is_bad(database):
                self.logger.warn('%s %s' % (database, complaint))
                self.terminate()
    self.restore()
# common steps for --restore and --restore-overwrite
def restore(self):
    """Run the restore pipeline shared by --restore and --restore-overwrite, in order."""
    self.logger.debug(self.kw_restore)
    steps = (
        'assure_database_down',
        'check_targz',
        'remove_directory',
        'create_directory',
        'extract_targz',
        'complete',
    )
    for step in steps:
        getattr(self, step)()
# record hostname where invoked
def hostname(self):
    """Log the name of the host the tool was invoked on.

    The name returned by a lookup of the local hostname is preferred;
    if that lookup fails, the plain local hostname is logged instead so
    that a name-resolution problem does not abort the whole run.
    """
    name = socket.gethostname()
    try:
        name = socket.gethostbyaddr(name)[0]
    except socket.error:
        # herror/gaierror derive from socket.error; keep the plain name
        pass
    self.logger.info('host='+name)
# --save or --save-overwrite or --restore or --restore-overwrite
def main(self, argv):
    """Parse argv, set up logging, log the host, and run the requested action.

    The first option found set, in the order save, save-overwrite,
    restore, restore-overwrite, decides which handler runs.
    """
    self.parse(argv)
    self.logger = Logger(self.options)
    self.hostname()
    dispatch = (
        ('var_save', 'save_virgin'),
        ('var_save_overwrite', 'save_overwrite'),
        ('var_restore', 'restore_virgin'),
        ('var_restore_overwrite', 'restore_overwrite'),
    )
    for option, handler in dispatch:
        if getattr(self.options, option) is not None:
            getattr(self, handler)()
            break
    else:
        # parse() rejects a command line with no action, so this is a safety net
        self.logger.warn('huh?')
# Script entry point: build the tool and run it on the command-line
# arguments with the program name stripped.
if __name__ == '__main__':
instance = DbTool()
instance.main(sys.argv[1:])