| #!/usr/bin/env python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from qpidstore import jerr, jrnl, janal |
| import glob, optparse, os, sys, time |
| |
| |
| #== class Resize ============================================================== |
| |
class Resize(object):
    """
    Creates a new store journal and copies records from old journal to new. The new journal may be of
    different size from the old one. The records are packed into the new journal (ie only remaining
    enqueued records and associated transactions - if any - are copied over without spaces between them).

    The default action is to push the old journal down into a 'bak' sub-directory and then create a
    new journal and pack it with the records from the old. However, it is possible to suppress the
    pushdown (using --no-pushdown), in which case either a new journal id (using --new-base-filename)
    or an old journal id (using --old-base-filename) must be supplied. In the former case, a new journal
    will be created using the new base file name alongside the old one. In the latter case, the old
    journal will be renamed to the supplied name, and the new one will take the default. Note that both
    can be specified together with the --no-pushdown option.

    The size of the new journal is set by the optional --num-jfiles and/or --jfile-size-pgs parameters
    (which default to 8 files of 24 64kiB blocks each). These should be large enough to write all the
    records or an error will result. If the size is large enough to write all records, but too small to
    keep below the enqueue threshold, a warning will be raised. Note that as any valid size will be
    accepted, a journal can also be shrunk, as long as it is sufficiently big to accept the transferred
    records.
    """

    BAK_DIR = "bak"
    JFILE_SIZE_PGS_MIN = 1
    JFILE_SIZE_PGS_MAX = 32768
    NUM_JFILES_MIN = 4
    NUM_JFILES_MAX = 64
    # Fill level (in percent) of the new journal above which _chk_free() raises a warning
    FULL_WARN_PERCENT = 80.0

    def __init__(self):
        """Constructor: process the command line, then create the journal info, analyzer and reader objects."""
        self._opts = None
        self._jdir = None
        self._fname = None
        self._fnum = None
        self._file = None
        self._file_rec_wr_cnt = None
        self._filler_wr_cnt = None
        self._last_rec_fid = None
        self._last_rec_offs = None
        self._rec_wr_cnt = None

        self._jrnl_info = None
        self._jrnl_analysis = None
        self._jrnl_reader = None

        self._process_args()
        self._jrnl_info = jrnl.JrnlInfo(self._jdir, self._opts.bfn)
        # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
        jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
        self._jrnl_analysis = janal.JrnlAnalyzer(self._jrnl_info)
        self._jrnl_reader = janal.JrnlReader(self._jrnl_info, self._jrnl_analysis, self._opts.qflag,
                                             self._opts.rflag, self._opts.vflag)

    def run(self):
        """
        Perform the action of resizing the journal: run the journal reader, move/rename the old
        files, write the new files and finally check the fill level of the new journal.
        """
        quiet = self._opts.qflag
        if not quiet:
            print(self._jrnl_analysis)
        self._jrnl_reader.run()
        if self._opts.vflag:
            print(self._jrnl_info)
        if not quiet:
            print(self._jrnl_reader.report(self._opts.vflag, self._opts.rflag))
        self._handle_old_files()
        self._create_new_files()
        if not quiet:
            print("Transferred %d records to new journal." % self._rec_wr_cnt)
        self._chk_free()

    def _chk_free(self):
        """
        Check if sufficient space is available in resized journal to be able to enqueue.

        Does nothing when no last-record position has been recorded. Raises jerr.JWarning when the
        position of the last written record is beyond FULL_WARN_PERCENT of the total data capacity.
        """
        if self._last_rec_fid is None or self._last_rec_offs is None:
            return
        wr_capacity_bytes = self._last_rec_fid * self._jrnl_info.get_jrnl_data_size_bytes() + self._last_rec_offs
        tot_capacity_bytes = self._jrnl_info.get_tot_jrnl_data_size_bytes()
        percent_full = 100.0 * wr_capacity_bytes / tot_capacity_bytes
        if percent_full > self.FULL_WARN_PERCENT:
            raise jerr.JWarning("WARNING: Journal %s is %2.1f%% full and will likely not allow enqueuing of new records"
                                " until some existing records are dequeued." %
                                (self._jrnl_info.get_jrnl_id(), percent_full))

    def _create_new_files(self):
        """
        Create new journal files: assemble the records to transfer, write the new .jinf file, write
        the records, then create the remaining (empty) journal files.
        """
        master_record_list = self._assemble_records()

        # The new journal takes the new base filename if one was supplied, otherwise the current one
        bfn = self._opts.bfn
        if self._opts.nbfn is not None:
            bfn = self._opts.nbfn

        # write jinf file
        self._jrnl_info.resize(self._opts.njf, self._opts.jfs)
        self._jrnl_info.write(self._jdir, bfn)

        # write records
        if self._opts.vflag:
            print("* Transferring records to new journal files")
        self._write_records(master_record_list)
        self._fill_file(add_filler_recs=True)
        # Create every journal file that has not been opened yet; _rotate_file() returns False after the last
        while self._rotate_file():
            pass

    def _assemble_records(self):
        """Return a dict (record id -> header) of all emap and tmap records to be transferred."""
        master_record_list = {}
        txn_record_list = self._jrnl_reader.txn_obj_list()
        emap = self._jrnl_reader.emap()
        if self._opts.vflag and emap.size() > 0:
            print("* Assembling %d records from emap" % emap.size())
        for tup in emap.get_rec_list():
            hdr = tup[1]
            hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
            master_record_list[int(hdr.rid)] = hdr
            if hdr.xidsize > 0 and hdr.xid in txn_record_list:
                # Take the matching transaction record out of the txn list and transfer it as well
                txn_hdr = txn_record_list.pop(hdr.xid)
                txn_hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
                master_record_list[int(txn_hdr.rid)] = txn_hdr
        tmap = self._jrnl_reader.tmap()
        if self._opts.vflag and tmap.size() > 0:
            print("* Assembling %d records from tmap" % tmap.size())
        for xid in tmap.xids():
            for entry in tmap.get(xid):
                hdr = entry[1]
                hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi
                master_record_list[int(hdr.rid)] = hdr
        return master_record_list

    def _write_records(self, master_record_list):
        """
        Write the records of master_record_list in ascending record id order into the new journal
        files, rotating to the next file whenever the current one is full. A record which does not
        fit into the remainder of a file is split and continued in the next file.

        Raises jerr.JournalSpaceExceededError if the files are exhausted before all records are written.
        """
        # These sizes do not change while writing, so read them once
        file_size = self._jrnl_info.get_jrnl_file_size_bytes()
        data_size = self._jrnl_info.get_jrnl_data_size_bytes()
        sblk_size = self._jrnl_info.get_jrnl_sblk_size_bytes()
        dblk_size = self._jrnl_info.get_jrnl_dblk_size_bytes()

        rid_list = sorted(master_record_list)
        last_index = len(rid_list) - 1
        # First-record offset that is passed to the header of the next file to be opened
        fro = sblk_size
        for index, this_rid in enumerate(rid_list):
            hdr = master_record_list[this_rid]
            rec = hdr.encode()
            rec_len = len(rec)
            pos = 0
            while pos < rec_len:
                if self._file is None or self._file.tell() >= file_size:
                    # rid for the new file header: the current record for the very first file,
                    # otherwise the record following the current one (0 if there is none)
                    if self._file is None:
                        rid = hdr.rid
                    elif index == last_index:
                        rid = 0
                    else:
                        rid = rid_list[index + 1]
                    if not self._rotate_file(rid, fro):
                        raise jerr.JournalSpaceExceededError()
                space = file_size - self._file.tell()
                if rec_len - pos <= space:
                    # Remainder of record fits: write it and pad to the next dblk boundary
                    self._file.write(rec[pos:])
                    self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(), dblk_size))
                    pos = rec_len
                    fro = sblk_size
                else:
                    # Record must be split: fill this file and work out where the first new
                    # record will start in the next one (0 if the rest takes up the whole file)
                    self._file.write(rec[pos:pos + space])
                    pos += space
                    rem = rec_len - pos
                    if rem <= data_size:
                        fro = jrnl.Utils.size_in_bytes_to_blk(sblk_size + rem, dblk_size)
                    else:
                        fro = 0
            self._rec_wr_cnt += 1
            self._file_rec_wr_cnt += 1

    def _fill_file(self, to_posn = None, add_filler_recs = False):
        """
        Fill a file with null bytes to a known offset.

        to_posn: offset to fill to; None fills to the end of the journal file.
        add_filler_recs: if True, first write filler records ("RHMx", each padded to a dblk boundary)
            and record the resulting position as the position of the last record.

        Raises jerr.FillExceedsFileSizeError if to_posn is beyond the journal file size and
        jerr.FillSizeError if the file position after filling is not to_posn.
        """
        if self._file is None:
            return
        if add_filler_recs:
            nfr = int(jrnl.Utils.rem_bytes_in_blk(self._file, self._jrnl_info.get_jrnl_sblk_size_bytes()) /
                      self._jrnl_info.get_jrnl_dblk_size_bytes())
            if nfr > 0:
                self._filler_wr_cnt = nfr
                for _ in range(nfr):
                    self._file.write(b"RHMx")
                    self._fill_file(jrnl.Utils.size_in_bytes_to_blk(self._file.tell(),
                                                                    self._jrnl_info.get_jrnl_dblk_size_bytes()))
            # Remembered for the fill level check in _chk_free()
            self._last_rec_fid = self._fnum
            self._last_rec_offs = self._file.tell()
        if to_posn is None:
            to_posn = self._jrnl_info.get_jrnl_file_size_bytes()
        elif to_posn > self._jrnl_info.get_jrnl_file_size_bytes():
            raise jerr.FillExceedsFileSizeError(to_posn, self._jrnl_info.get_jrnl_file_size_bytes())
        diff = to_posn - self._file.tell()
        # A non-positive diff writes nothing; a position mismatch is then caught by the check below
        self._file.write(b"\0" * diff)
        #DEBUG
        if self._file.tell() != to_posn:
            raise jerr.FillSizeError(self._file.tell(), to_posn)

    def _rotate_file(self, rid = None, fro = None):
        """
        Switch to the next logical file.

        Closes the current file (if any) and opens the next one. If both rid and fro are given, a
        file header is written and padded to the sblk size; otherwise the whole file is null-filled.

        Returns False (without opening a file) if the current file is already the last journal
        file, True otherwise.
        """
        if self._file is not None:
            self._file.close()
            if self._opts.vflag:
                # Completes the "* Opening file ..." line that was started when this file was opened
                if self._file_rec_wr_cnt == 0:
                    print(" (empty)")
                elif self._filler_wr_cnt is None:
                    print(" (%d records)" % self._file_rec_wr_cnt)
                else:
                    print(" (%d records + %d filler(s))" % (self._file_rec_wr_cnt, self._filler_wr_cnt))
        if self._fnum is None:
            self._fnum = 0
            self._rec_wr_cnt = 0
        elif self._fnum == self._jrnl_info.get_num_jrnl_files() - 1:
            return False
        else:
            self._fnum += 1
        self._file_rec_wr_cnt = 0
        self._fname = os.path.join(self._jrnl_info.get_jrnl_dir(), "%s.%04x.jdat" %
                                   (self._jrnl_info.get_jrnl_base_name(), self._fnum))
        if self._opts.vflag:
            # No newline: the record count is appended when the file is closed. The trailing space
            # stands in for the separator the Python 2 trailing-comma print statement would emit.
            sys.stdout.write("* Opening file %s " % self._fname)
        # Binary mode: encoded records and null padding must be written without newline translation
        self._file = open(self._fname, "wb")
        if rid is None or fro is None:
            self._fill_file()
        else:
            now = time.time()
            fhdr = jrnl.FileHdr(0, "RHMf", jrnl.Hdr.HDR_VER, int(jrnl.Hdr.BIG_ENDIAN), 0, rid)
            fhdr.init(self._file, 0, self._fnum, self._fnum, fro, int(now), 1000000000*(now - int(now)))
            self._file.write(fhdr.encode())
            self._fill_file(self._jrnl_info.get_jrnl_sblk_size_bytes())
        return True

    def _handle_old_files(self):
        """
        Push old journal down into a backup directory and/or rename it.

        Unless --no-pushdown was given, the old .jdat files are moved into the BAK_DIR
        sub-directory (which is created, or emptied if it already exists). If an old base filename
        was given, the files are renamed to it while being moved. With --no-pushdown and no old
        base filename the old files are left untouched.
        """
        opts = self._opts
        target_dir = self._jdir
        if not opts.npd:
            target_dir = os.path.join(self._jdir, self.BAK_DIR)
            if os.path.exists(target_dir):
                if opts.vflag:
                    print("* Pushdown directory %s exists, deleting content" % target_dir)
                for fname in glob.glob(os.path.join(target_dir, "*")):
                    os.unlink(fname)
            else:
                if opts.vflag:
                    print("* Creating new pushdown directory %s" % target_dir)
                os.mkdir(target_dir)

        if opts.npd and opts.obfn is None:
            # Neither moving nor renaming was requested
            return

        if opts.obfn is not None and opts.vflag:
            print("* Renaming old journal files using base name %s" % opts.obfn)
        # .jdat files
        for fname in glob.glob(os.path.join(self._jdir, "%s.*.jdat" % opts.bfn)):
            tbfn = os.path.basename(fname)
            if opts.obfn is not None:
                # Replace everything before the last two dot-separated fields by the old base filename
                fields = tbfn.rsplit(".", 2)
                if len(fields) == 3:
                    tbfn = "%s.%s.%s" % (opts.obfn, fields[1], fields[2])
            os.rename(fname, os.path.join(target_dir, tbfn))
        # .jinf file
        self._jrnl_info.write(target_dir, opts.obfn)
        os.unlink(os.path.join(self._jdir, "%s.jinf" % opts.bfn))

    def _print_options(self):
        """Print program options (only if the verbose flag is set)"""
        if not self._opts.vflag:
            return
        print("Journal dir: %s" % self._jdir)
        print("Options: Base filename: %s" % self._opts.bfn)
        print(" New base filename: %s" % self._opts.nbfn)
        print(" Old base filename: %s" % self._opts.obfn)
        # npd is the *no*-pushdown flag, so pushdown is active when it is not set
        print(" Pushdown: %s" % (not self._opts.npd))
        print(" No. journal files: %d" % self._opts.njf)
        print(" Journal file size: %d 64kiB blocks" % self._opts.jfs)
        print(" Show records flag: %s" % self._opts.rflag)
        print(" Verbose flag: %s" % True)
        print("")

    def _process_args(self):
        """
        Process the command-line arguments.

        Sets self._opts and self._jdir and prints the options if verbose. Invalid or inconsistent
        arguments are reported through the option parser's error() method.
        """
        opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
        opt.add_option("-b", "--base-filename",
                       action="store", dest="bfn", default="JournalData",
                       help="Base filename for old journal files")
        opt.add_option("-B", "--new-base-filename",
                       action="store", dest="nbfn",
                       help="Base filename for new journal files")
        opt.add_option("-n", "--no-pushdown",
                       action="store_true", dest="npd",
                       help="Suppress pushdown of old files into \"bak\" dir; old files will remain in existing dir")
        # NOTE(review): because of the defaults below (and for -s), njf/jfs are never None, so a
        # journal is always resized to 8 x 24 blocks unless sizes are given - confirm this is intended.
        opt.add_option("-N", "--num-jfiles",
                       action="store", type="int", dest="njf", default=8,
                       help="Number of files for new journal (%d-%d)" % (self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
        opt.add_option("-o", "--old-base-filename",
                       action="store", dest="obfn",
                       help="Base filename to which the old journal files are renamed")
        opt.add_option("-q", "--quiet",
                       action="store_true", dest="qflag",
                       help="Quiet (suppress all non-error output)")
        opt.add_option("-r", "--records",
                       action="store_true", dest="rflag",
                       help="Print remaining records and transactions")
        opt.add_option("-s", "--jfile-size-pgs",
                       action="store", type="int", dest="jfs", default=24,
                       help="Size of each new journal file in 64kiB blocks (%d-%d)" %
                       (self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
        opt.add_option("-v", "--verbose",
                       action="store_true", dest="vflag",
                       help="Verbose output")
        (self._opts, args) = opt.parse_args()
        opts = self._opts
        if len(args) == 0:
            opt.error("No journal directory argument")
        elif len(args) > 1:
            opt.error("Too many positional arguments: %s" % args)
        if opts.qflag and opts.rflag:
            opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
        if opts.qflag and opts.vflag:
            opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
        if opts.njf is not None and not (self.NUM_JFILES_MIN <= opts.njf <= self.NUM_JFILES_MAX):
            opt.error("Number of files (%d) is out of range (%d-%d)" %
                      (opts.njf, self.NUM_JFILES_MIN, self.NUM_JFILES_MAX))
        if opts.jfs is not None and not (self.JFILE_SIZE_PGS_MIN <= opts.jfs <= self.JFILE_SIZE_PGS_MAX):
            opt.error("File size (%d) is out of range (%d-%d)" %
                      (opts.jfs, self.JFILE_SIZE_PGS_MIN, self.JFILE_SIZE_PGS_MAX))
        # Without pushdown, old and new journal would clash unless at least one of them is renamed
        if opts.npd and opts.nbfn is None and opts.obfn is None:
            opt.error("If (-n/--no-pushdown) is used, then at least one of (-B/--new-base-filename) and"
                      " (-o/--old-base-filename) must be used.")
        self._jdir = args[0]
        if not os.path.exists(self._jdir):
            opt.error("Journal path \"%s\" does not exist" % self._jdir)
        self._print_options()
| |
| #============================================================================== |
| # main program |
| #============================================================================== |
| |
if __name__ == "__main__":
    # The Resize constructor already parses the arguments and creates the journal info, analyzer
    # and reader objects, so it is created inside the try block: a failure there is then reported
    # in the same way as a failure while running.
    try:
        R = Resize()
        R.run()
    except Exception as err:
        # sys.exit() with a non-integer argument prints it to stderr and exits with status 1
        sys.exit(err)