blob: f6d70cb3c6685e5344563d865470fa3098c07482 [file] [log] [blame]
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from qpidstore import jerr, jrnl, janal
import optparse, os, sys
#== class StoreChk ============================================================
class StoreChk(object):
"""
This class:
1. Reads a journal jinf file, and from its info:
2. Analyzes the journal data files to determine which is the last to be written, then
3. Reads and analyzes all the records in the journal files.
The only public method is run() which kicks off the analysis.
"""
def __init__(self):
"""Constructor"""
# params
self.opts = None
self._jdir = None
# recovery analysis objects
# self._jrnl_info = None
# self.jrnl_rdr = None
self._process_args()
self._jrnl_info = jrnl.JrnlInfo(self._jdir, self.opts.bfn)
# FIXME: This is a hack... find an elegant way of getting the file size to jrec!
jrnl.JRNL_FILE_SIZE = self._jrnl_info.get_jrnl_file_size_bytes()
self.jrnl_anal = janal.JrnlAnalyzer(self._jrnl_info)
self.jrnl_rdr = janal.JrnlReader(self._jrnl_info, self.jrnl_anal, self.opts.qflag, self.opts.rflag,
self.opts.vflag)
def run(self):
"""Run the store check"""
if not self.opts.qflag:
print self._jrnl_info
print self.jrnl_anal
self.jrnl_rdr.run()
self._report()
def _report(self):
"""Print the results of the store check"""
if not self.opts.qflag:
print
print " === REPORT ===="
print
print "Records: %8d non-transactional" % \
(self.jrnl_rdr.get_msg_cnt() - self.jrnl_rdr.get_txn_msg_cnt())
print " %8d transactional" % self.jrnl_rdr.get_txn_msg_cnt()
print " %8d total" % self.jrnl_rdr.get_msg_cnt()
print
print "Transactions: %8d aborts" % self.jrnl_rdr.get_abort_cnt()
print " %8d commits" % self.jrnl_rdr.get_commit_cnt()
print " %8d total" % (self.jrnl_rdr.get_abort_cnt() + self.jrnl_rdr.get_commit_cnt())
print
if self.jrnl_rdr.emap().size() > 0:
print "Remaining enqueued records (sorted by rid): "
rid_list = self.jrnl_rdr.emap().rids()
rid_list.sort()
for rid in rid_list:
l = self.jrnl_rdr.emap().get(rid)
locked = ""
if l[2]:
locked += " (locked)"
print " fid=%d %s%s" % (l[0], l[1], locked)
print "WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain." % self.jrnl_rdr.emap().size()
else:
print "No remaining enqueued records found (emap empty)."
print
if self.jrnl_rdr.tmap().size() > 0:
txn_rec_cnt = 0
print "Incomplete transactions: "
for xid in self.jrnl_rdr.tmap().xids():
jrnl.Utils.format_xid(xid)
recs = self.jrnl_rdr.tmap().get(xid)
for l in recs:
print " fid=%d %s" % (l[0], l[1])
print " Total: %d records for %s" % (len(recs), jrnl.Utils.format_xid(xid))
print
txn_rec_cnt += len(recs)
print "WARNING: Incomplete transactions found, %d xids remain containing a total of %d records." % \
(self.jrnl_rdr.tmap().size(), txn_rec_cnt)
else:
print "No incomplete transactions found (tmap empty)."
print
print "%d enqueues, %d journal records processed." % \
(self.jrnl_rdr.get_msg_cnt(), self.jrnl_rdr.get_rec_cnt())
def _process_args(self):
"""Process the command-line arguments"""
opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
opt.add_option("-b", "--base-filename",
action="store", dest="bfn", default="JournalData",
help="Base filename for old journal files")
opt.add_option("-q", "--quiet",
action="store_true", dest="qflag",
help="Quiet (suppress all non-error output)")
opt.add_option("-r", "--records",
action="store_true", dest="rflag",
help="Print all records and transactions (including consumed/closed)")
opt.add_option("-v", "--verbose",
action="store_true", dest="vflag",
help="Verbose output")
(self.opts, args) = opt.parse_args()
if len(args) == 0:
opt.error("No journal directory argument")
elif len(args) > 1:
opt.error("Too many positional arguments: %s" % args)
if self.opts.qflag and self.opts.rflag:
opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
if self.opts.qflag and self.opts.vflag:
opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
self._jdir = args[0]
if not os.path.exists(self._jdir):
opt.error("Journal path \"%s\" does not exist" % self._jdir)
#== class CsvStoreChk =========================================================
class CsvStoreChk(StoreChk):
"""
This class, in addition to analyzing a journal, can compare the journal footprint (ie enqueued/dequeued/transaction
record counts) to expected values from a CSV file. This can be used for additional automated testing, and is
currently in use in the long store tests for journal encode testing.
"""
# CSV file cols
TEST_NUM_COL = 0
NUM_MSGS_COL = 5
MIN_MSG_SIZE_COL = 7
MAX_MSG_SIZE_COL = 8
MIN_XID_SIZE_COL = 9
MAX_XID_SIZE_COL = 10
AUTO_DEQ_COL = 11
TRANSIENT_COL = 12
EXTERN_COL = 13
COMMENT_COL = 20
def __init__(self):
"""Constructor"""
StoreChk.__init__(self)
# csv params
self.num_msgs = None
self.msg_len = None
self.auto_deq = None
self.xid_len = None
self.transient = None
self.extern = None
self._warning = []
self.jrnl_rdr.set_callbacks(self, CsvStoreChk._csv_pre_run_chk, CsvStoreChk._csv_enq_chk,
CsvStoreChk._csv_deq_chk, CsvStoreChk._csv_txn_chk, CsvStoreChk._csv_post_run_chk)
self._get_csv_test()
def _get_csv_test(self):
"""Get a test from the CSV reader"""
if self.opts.csvfn != None and self.opts.tnum != None:
tparams = self._read_csv_file(self.opts.csvfn, self.opts.tnum)
if tparams == None:
print "ERROR: Test %d not found in CSV file \"%s\"" % (self.opts.tnum, self.opts.csvfn)
sys.exit(1)
self.num_msgs = tparams["num_msgs"]
if tparams["min_size"] == tparams["max_size"]:
self.msg_len = tparams["max_size"]
else:
self.msg_len = 0
self.auto_deq = tparams["auto_deq"]
if tparams["xid_min_size"] == tparams["xid_max_size"]:
self.xid_len = tparams["xid_max_size"]
else:
self.xid_len = 0
self.transient = tparams["transient"]
self.extern = tparams["extern"]
def _read_csv_file(self, filename, tnum):
"""Read the CSV test parameter file"""
try:
csvf = open(filename, "r")
except IOError:
print "ERROR: Unable to open CSV file \"%s\"" % filename
sys.exit(1)
for line in csvf:
str_list = line.strip().split(",")
if len(str_list[0]) > 0 and str_list[0][0] != "\"":
try:
if (int(str_list[self.TEST_NUM_COL]) == tnum):
return { "num_msgs": int(str_list[self.NUM_MSGS_COL]),
"min_size": int(str_list[self.MIN_MSG_SIZE_COL]),
"max_size": int(str_list[self.MAX_MSG_SIZE_COL]),
"auto_deq": not (str_list[self.AUTO_DEQ_COL] == "FALSE" or
str_list[self.AUTO_DEQ_COL] == "0"),
"xid_min_size": int(str_list[self.MIN_XID_SIZE_COL]),
"xid_max_size": int(str_list[self.MAX_XID_SIZE_COL]),
"transient": not (str_list[self.TRANSIENT_COL] == "FALSE" or
str_list[self.TRANSIENT_COL] == "0"),
"extern": not (str_list[self.EXTERN_COL] == "FALSE" or
str_list[self.EXTERN_COL] == "0"),
"comment": str_list[self.COMMENT_COL] }
except Exception:
pass
return None
def _process_args(self):
"""Process command-line arguments"""
opt = optparse.OptionParser(usage="%prog [options] DIR", version="%prog 1.0")
opt.add_option("-b", "--base-filename",
action="store", dest="bfn", default="JournalData",
help="Base filename for old journal files")
opt.add_option("-c", "--csv-filename",
action="store", dest="csvfn",
help="CSV filename containing test parameters")
opt.add_option("-q", "--quiet",
action="store_true", dest="qflag",
help="Quiet (suppress all non-error output)")
opt.add_option("-r", "--records",
action="store_true", dest="rflag",
help="Print all records and transactions (including consumed/closed)")
opt.add_option("-t", "--test-num",
action="store", type="int", dest="tnum",
help="Test number from CSV file - only valid if CSV file named")
opt.add_option("-v", "--verbose",
action="store_true", dest="vflag",
help="Verbose output")
(self.opts, args) = opt.parse_args()
if len(args) == 0:
opt.error("No journal directory argument")
elif len(args) > 1:
opt.error("Too many positional arguments: %s" % args)
if self.opts.qflag and self.opts.rflag:
opt.error("Quiet (-q/--quiet) and record (-r/--records) options are mutually exclusive")
if self.opts.qflag and self.opts.vflag:
opt.error("Quiet (-q/--quiet) and verbose (-v/--verbose) options are mutually exclusive")
self._jdir = args[0]
if not os.path.exists(self._jdir):
opt.error("Journal path \"%s\" does not exist" % self._jdir)
# Callbacks for checking against CSV test parameters. Return False if ok, True to raise error.
#@staticmethod
def _csv_pre_run_chk(csv_store_chk):
"""Check performed before a test runs"""
if csv_store_chk.num_msgs == None:
return
if csv_store_chk.jrnl_anal.is_empty() and csv_store_chk.num_msgs > 0:
raise jerr.AllJrnlFilesEmptyCsvError(csv_store_chk.get_opts().tnum, csv_store_chk.num_msgs)
return False
_csv_pre_run_chk = staticmethod(_csv_pre_run_chk)
#@staticmethod
def _csv_enq_chk(csv_store_chk, hdr):
"""Check performed before each enqueue operation"""
#if csv_store_chk.num_msgs == None: return
#
if csv_store_chk.extern != None:
if csv_store_chk.extern != hdr.extern:
raise jerr.ExternFlagCsvError(csv_store_chk.opts.tnum, csv_store_chk.extern)
if hdr.extern and hdr.data != None:
raise jerr.ExternFlagWithDataCsvError(csv_store_chk.opts.tnum)
if csv_store_chk.msg_len != None and csv_store_chk.msg_len > 0 and hdr.data != None and \
len(hdr.data) != csv_store_chk.msg_len:
raise jerr.MessageLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.msg_len, len(hdr.data))
if csv_store_chk.xid_len != None and csv_store_chk.xid_len > 0 and len(hdr.xid) != csv_store_chk.xid_len:
raise jerr.XidLengthCsvError(csv_store_chk.opts.tnum, csv_store_chk.xid_len, len(hdr.xid))
if csv_store_chk.transient != None and hdr.transient != csv_store_chk.transient:
raise jerr.TransactionCsvError(csv_store_chk.opts.tnum, csv_store_chk.transient)
return False
_csv_enq_chk = staticmethod(_csv_enq_chk)
#@staticmethod
def _csv_deq_chk(csv_store_chk, hdr):
"""Check performed before each dequeue operation"""
if csv_store_chk.auto_deq != None and not csv_store_chk.auto_deq:
raise jerr.JWarning("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." %
(csv_store_chk.opts.tnum, hdr.rid))
#self._warning.append("[CSV %d] WARNING: Dequeue record rid=%d found in non-dequeue test - ignoring." %
# (csv_store_chk.opts.tnum, hdr.rid))
return False
_csv_deq_chk = staticmethod(_csv_deq_chk)
#@staticmethod
def _csv_txn_chk(csv_store_chk, hdr):
"""Check performed before each transaction commit/abort"""
return False
_csv_txn_chk = staticmethod(_csv_txn_chk)
#@staticmethod
def _csv_post_run_chk(csv_store_chk):
"""Cehck performed after the completion of the test"""
# Exclude this check if lastFileFlag is set - the count may be less than the number of msgs sent because
# of journal overwriting
if csv_store_chk.num_msgs != None and not csv_store_chk.jrnl_rdr.is_last_file() and \
csv_store_chk.num_msgs != csv_store_chk.jrnl_rdr.get_msg_cnt():
raise jerr.NumMsgsCsvError(csv_store_chk.opts.tnum, csv_store_chk.num_msgs,
csv_store_chk.jrnl_rdr.get_msg_cnt())
return False
_csv_post_run_chk = staticmethod(_csv_post_run_chk)
#==============================================================================
# main program
#==============================================================================
if __name__ == "__main__":
M = CsvStoreChk()
try:
M.run()
except Exception, e:
sys.exit(e)