| #!/usr/bin/env python |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| from qpidstore import jerr, jrnl, janal |
| import optparse, os, sys |
| |
| |
| #== class StoreChk ============================================================ |
| |
class StoreChk(object):
    """
    Journal store checker.

    This class:
    1. Reads a journal jinf file, and from its info:
    2. Analyzes the journal data files to determine which is the last to be written, then
    3. Reads and analyzes all the records in the journal files.
    The only public method is run() which kicks off the analysis.

    All output uses the single-argument, parenthesised form of print so that the same
    source parses both where print is a statement and where it is a function.
    """

    def __init__(self):
        """
        Constructor.

        Parses the command line through _process_args() (which sets self.opts and
        self._jdir), then creates the journal info, analyzer and reader objects
        used by run().
        """
        # params
        self.opts = None
        self._jdir = None

        self._process_args()

        # recovery analysis objects
        self._jrnl_info = jrnl.JrnlInfo(self._jdir, self.opts.bfn)
        # FIXME: This is a hack... find an elegant way of getting the file size to jrec!
        jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
        self.jrnl_anal = janal.JrnlAnalyzer(self._jrnl_info)
        self.jrnl_rdr = janal.JrnlReader(self._jrnl_info, self.jrnl_anal, self.opts.qflag, self.opts.rflag,
                                         self.opts.vflag)

    def run(self):
        """Run the store check: print the journal summary (unless quiet), read the journal, then report"""
        if not self.opts.qflag:
            print(self._jrnl_info)
            print(self.jrnl_anal)
        self.jrnl_rdr.run()
        self._report()

    def _report(self):
        """
        Print the results of the store check.

        Nothing is printed when the quiet option (self.opts.qflag) is set.
        Otherwise prints the record and transaction counters taken from the
        journal reader, followed by any records still enqueued and any
        transactions left incomplete.
        """
        if self.opts.qflag:
            return

        rdr = self.jrnl_rdr
        msg_cnt = rdr.get_msg_cnt()
        txn_msg_cnt = rdr.get_txn_msg_cnt()
        abort_cnt = rdr.get_abort_cnt()
        commit_cnt = rdr.get_commit_cnt()

        print("")
        print(" === REPORT ====")
        print("")
        print("Records: %8d non-transactional" % (msg_cnt - txn_msg_cnt))
        print(" %8d transactional" % txn_msg_cnt)
        print(" %8d total" % msg_cnt)
        print("")
        print("Transactions: %8d aborts" % abort_cnt)
        print(" %8d commits" % commit_cnt)
        print(" %8d total" % (abort_cnt + commit_cnt))
        print("")
        self._report_enqueued()
        print("")
        self._report_transactions()
        print("")
        print("%d enqueues, %d journal records processed." % (msg_cnt, rdr.get_rec_cnt()))

    def _report_enqueued(self):
        """Print the records remaining in the reader's enqueue map, ordered by record id"""
        emap = self.jrnl_rdr.emap()
        if emap.size() > 0:
            print("Remaining enqueued records (sorted by rid): ")
            # Sort a copy so that the sequence handed out by the map is left untouched
            rid_list = list(emap.rids())
            rid_list.sort()
            for rid in rid_list:
                # Each entry is indexed as [0]=file id, [1]=printable record, [2]=locked flag
                rec = emap.get(rid)
                locked = ""
                if rec[2]:
                    locked += " (locked)"
                print(" fid=%d %s%s" % (rec[0], rec[1], locked))
            print("WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain." % emap.size())
        else:
            print("No remaining enqueued records found (emap empty).")

    def _report_transactions(self):
        """Print the transactions remaining in the reader's transaction map, with their records"""
        tmap = self.jrnl_rdr.tmap()
        if tmap.size() > 0:
            txn_rec_cnt = 0
            print("Incomplete transactions: ")
            for xid in tmap.xids():
                # Format the xid once and reuse it for the per-transaction summary line
                xid_str = jrnl.Utils.format_xid(xid)
                recs = tmap.get(xid)
                for rec in recs:
                    print(" fid=%d %s" % (rec[0], rec[1]))
                print(" Total: %d records for %s" % (len(recs), xid_str))
                print("")
                txn_rec_cnt += len(recs)
            print("WARNING: Incomplete transactions found, %d xids remain containing a total of %d records." %
                  (tmap.size(), txn_rec_cnt))
        else:
            print("No incomplete transactions found (tmap empty).")

    def _process_args(self):
        """
        Process the command-line arguments.

        Sets self.opts to the parsed options and self._jdir to the single
        positional argument (the journal directory). Any usage error is
        reported through OptionParser.error(), which prints the message and
        exits the program.
        """
        opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
        opt.add_option("-b", "--base-filename",
                       action="store", dest="bfn", default="JournalData",
                       help="Base filename for old journal files")
        opt.add_option("-q", "--quiet",
                       action="store_true", dest="qflag",
                       help="Quiet (suppress all non-error output)")
        opt.add_option("-r", "--records",
                       action="store_true", dest="rflag",
                       help="Print all records and transactions (including consumed/closed)")
        opt.add_option("-v", "--verbose",
                       action="store_true", dest="vflag",
                       help="Verbose output")
        (self.opts, args) = opt.parse_args()

        # Exactly one positional argument is accepted
        if len(args) == 0:
            opt.error("No journal directory argument")
        elif len(args) > 1:
            opt.error("Too many positional arguments: %s" % args)

        # Quiet cannot be combined with either of the options that add output
        if self.opts.qflag and self.opts.rflag:
            opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
        if self.opts.qflag and self.opts.vflag:
            opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")

        self._jdir = args[0]
        if not os.path.exists(self._jdir):
            opt.error("Journal path \"%s\" does not exist" % self._jdir)
| |
| |
| #== class CsvStoreChk ========================================================= |
| |
| class CsvStoreChk(StoreChk): |
| """ |
| This class, in addition to analyzing a journal, can compare the journal footprint (ie enqueued/dequeued/transaction |
| record counts) to expected values from a CSV file. This can be used for additional automated testing, and is |
| currently in use in the long store tests for journal encode testing. |
| """ |
| |
| # CSV file cols |
| TEST_NUM_COL = 0 |
| NUM_MSGS_COL = 5 |
| MIN_MSG_SIZE_COL = 7 |
| MAX_MSG_SIZE_COL = 8 |
| MIN_XID_SIZE_COL = 9 |
| MAX_XID_SIZE_COL = 10 |
| AUTO_DEQ_COL = 11 |
| TRANSIENT_COL = 12 |
| EXTERN_COL = 13 |
| COMMENT_COL = 20 |
| |
| def __init__(self): |
| """Constructor""" |
| StoreChk.__init__(self) |
| |
| # csv params |
| self.num_msgs = None |
| self.msg_len = None |
| self.auto_deq = None |
| self.xid_len = None |
| self.transient = None |
| self.extern = None |
| |
| self._warning = [] |
| |
| self.jrnl_rdr.set_callbacks(self, CsvStoreChk._csv_pre_run_chk, CsvStoreChk._csv_enq_chk, |
| CsvStoreChk._csv_deq_chk, CsvStoreChk._csv_txn_chk, CsvStoreChk._csv_post_run_chk) |
| self._get_csv_test() |
| |
| def _get_csv_test(self): |
| """Get a test from the CSV reader""" |
| if self.opts.csvfn != None and self.opts.tnum != None: |
| tparams = self._read_csv_file(self.opts.csvfn, self.opts.tnum) |
| if tparams == None: |
| print "ERROR: Test %d not found in CSV file \"%s\"" % (self.opts.tnum, self.opts.csvfn) |
| sys.exit(1) |
| self.num_msgs = tparams["num_msgs"] |
| if tparams["min_size"] == tparams["max_size"]: |
| self.msg_len = tparams["max_size"] |
| else: |
| self.msg_len = 0 |
| self.auto_deq = tparams["auto_deq"] |
| if tparams["xid_min_size"] == tparams["xid_max_size"]: |
| self.xid_len = tparams["xid_max_size"] |
| else: |
| self.xid_len = 0 |
| self.transient = tparams["transient"] |
| self.extern = tparams["extern"] |
| |
| def _read_csv_file(self, filename, tnum): |
| """Read the CSV test parameter file""" |
| try: |
| csvf = open(filename, "r") |
| except IOError: |
| print "ERROR: Unable to open CSV file \"%s\"" % filename |
| sys.exit(1) |
| for line in csvf: |
| str_list = line.strip().split(",") |
| if len(str_list[0]) > 0 and str_list[0][0] != "\"": |
| try: |
| if (int(str_list[self.TEST_NUM_COL]) == tnum): |
| return { "num_msgs": int(str_list[self.NUM_MSGS_COL]), |
| "min_size": int(str_list[self.MIN_MSG_SIZE_COL]), |
| "max_size": int(str_list[self.MAX_MSG_SIZE_COL]), |
| "auto_deq": not (str_list[self.AUTO_DEQ_COL] == "FALSE" or |
| str_list[self.AUTO_DEQ_COL] == "0"), |
| "xid_min_size": int(str_list[self.MIN_XID_SIZE_COL]), |
| "xid_max_size": int(str_list[self.MAX_XID_SIZE_COL]), |
| "transient": not (str_list[self.TRANSIENT_COL] == "FALSE" or |
| str_list[self.TRANSIENT_COL] == "0"), |
| "extern": not (str_list[self.EXTERN_COL] == "FALSE" or |
| str_list[self.EXTERN_COL] == "0"), |
| "comment": str_list[self.COMMENT_COL] } |
| except Exception: |
| pass |
| return None |
| |
| def _process_args(self): |
| """Process command-line arguments""" |
| opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0") |
| opt.add_option("-b", "--base-filename", |
| action="store", dest="bfn", default="JournalData", |
| help="Base filename for old journal files") |
| opt.add_option("-c", "--csv-filename", |
| action="store", dest="csvfn", |
| help="CSV filename containing test parameters") |
| opt.add_option("-q", "--quiet", |
| action="store_true", dest="qflag", |
| help="Quiet (suppress all non-error output)") |
| opt.add_option("-r", "--records", |
| action="store_true", dest="rflag", |
| help="Print all records and transactions (including consumed/closed)") |
| opt.add_option("-t", "--test-num", |
| action="store", type="int", dest="tnum", |
| help="Test number from CSV file - only valid if CSV file named") |
| opt.add_option("-v", "--verbose", |
| action="store_true", dest="vflag", |
| help="Verbose output") |
| (self.opts, args) = opt.parse_args() |
| if len(args) == 0: |
| opt.error("No journal directory argument") |
| elif len(args) > 1: |
| opt.error("Too many positional arguments: %s" % args) |
| if self.opts.qflag and self.opts.rflag: |
| opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive") |
| if self.opts.qflag and self.opts.vflag: |
| opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive") |
| self._jdir = args[0] |
| if not os.path.exists(self._jdir): |
| opt.error("Journal path \"%s\" does not exist" % self._jdir) |
| |
| # Callbacks for checking against CSV test parameters. Return False if ok, True to raise error. |
| |
| #@staticmethod |
| def _csv_pre_run_chk(csv_store_chk): |
| """Check performed before a test runs""" |
| if csv_store_chk.num_msgs == None: |
| return |
| if csv_store_chk.jrnl_anal.is_empty() and csv_store_chk.num_msgs > 0: |
| raise jerr.AllJrnlFilesEmptyCsvError(csv_store_chk.get_opts().tnum, csv_store_chk.num_msgs) |
| return False |
| _csv_pre_run_chk = staticmethod(_csv_pre_run_chk) |
| |
| #@staticmethod |
| def _csv_enq_chk(csv_store_chk, hdr): |
| """Check performed before each enqueue operation""" |
| #if csv_store_chk.num_msgs == None: return |
| # |
| if csv_store_chk.extern != None: |
| if csv_store_chk.extern != hdr.extern: |
| raise jerr.ExternFlagCsvError(csv_store_chk.opts.tnum, csv_store_chk.extern) |
| if hdr.extern and hdr.data != None: |
| raise jerr.ExternFlagWithDataCsvError(csv_store_chk.opts.tnum) |
| if csv_store_chk.msg_len != None and csv_store_chk.msg_len > 0 and hdr.data != None and \ |
| len(hdr.data) != csv_store_chk.msg_len: |
| raise jerr.MessageLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.msg_len, len(hdr.data)) |
| if csv_store_chk.xid_len != None and csv_store_chk.xid_len > 0 and len(hdr.xid) != csv_store_chk.xid_len: |
| raise jerr.XidLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.xid_len, len(hdr.xid)) |
| if csv_store_chk.transient != None and hdr.transient != csv_store_chk.transient: |
| raise jerr.TransactionCsvError(csv_store_chk.opts.tnum, csv_store_chk.transient) |
| return False |
| _csv_enq_chk = staticmethod(_csv_enq_chk) |
| |
| #@staticmethod |
| def _csv_deq_chk(csv_store_chk, hdr): |
| """Check performed before each dequeue operation""" |
| if csv_store_chk.auto_deq != None and not csv_store_chk.auto_deq: |
| raise jerr.JWarning("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." % |
| (csv_store_chk.opts.tnum, hdr.rid)) |
| #self._warning.append("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." % |
| # (csv_store_chk.opts.tnum, hdr.rid)) |
| return False |
| _csv_deq_chk = staticmethod(_csv_deq_chk) |
| |
| #@staticmethod |
| def _csv_txn_chk(csv_store_chk, hdr): |
| """Check performed before each transaction commit/abort""" |
| return False |
| _csv_txn_chk = staticmethod(_csv_txn_chk) |
| |
| #@staticmethod |
| def _csv_post_run_chk(csv_store_chk): |
| """Cehck performed after the completion of the test""" |
| # Exclude this check if lastFileFlag is set - the count may be less than the number of msgs sent because |
| # of journal overwriting |
| if csv_store_chk.num_msgs != None and not csv_store_chk.jrnl_rdr.is_last_file() and \ |
| csv_store_chk.num_msgs != csv_store_chk.jrnl_rdr.get_msg_cnt(): |
| raise jerr.NumMsgsCsvError(csv_store_chk.opts.tnum, csv_store_chk.num_msgs, |
| csv_store_chk.jrnl_rdr.get_msg_cnt()) |
| return False |
| _csv_post_run_chk = staticmethod(_csv_post_run_chk) |
| |
| #============================================================================== |
| # main program |
| #============================================================================== |
| |
if __name__ == "__main__":
    # Command-line parsing and creation of the journal objects happen in the constructor
    M = CsvStoreChk()
    try:
        M.run()
    except Exception:
        # Fetch the active exception through sys.exc_info() instead of the
        # "except Exception, e" form, which only parses where print is a statement.
        # Passing the exception to sys.exit() prints its message to stderr and
        # exits with status 1.
        sys.exit(sys.exc_info()[1])