QPID-5362: Linearstore: No store tools exist for examining the journals - Bugfix and update for new partition and directory structure change from QPID-5671 and QPID-6303

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1652490 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/tools/src/py/qlslibs/anal.py b/qpid/tools/src/py/qlslibs/analyze.py
similarity index 95%
rename from qpid/tools/src/py/qlslibs/anal.py
rename to qpid/tools/src/py/qlslibs/analyze.py
index df51c1b..a67e17e 100644
--- a/qpid/tools/src/py/qlslibs/anal.py
+++ b/qpid/tools/src/py/qlslibs/analyze.py
@@ -18,7 +18,7 @@
 #
 
 """
-Module: qlslibs.anal
+Module: qlslibs.analyze
 
 Classes for recovery and analysis of a Qpid Linear Store (QLS).
 """
@@ -41,8 +41,8 @@
         return self.num
 
 class JournalRecoveryManager(object):
-    TPL_DIR_NAME = 'tpl'
-    JRNL_DIR_NAME = 'jrnl'
+    TPL_DIR_NAME = 'tpl2'
+    JRNL_DIR_NAME = 'jrnl2'
     def __init__(self, directory, args):
         if not os.path.exists(directory):
             raise qlslibs.err.InvalidQlsDirectoryNameError(directory)
@@ -55,15 +55,15 @@
     def report(self, print_stats_flag):
         self._reconcile_transactions(self.prepared_list, self.args.txn)
         if self.tpl is not None:
-            self.tpl.report(print_stats_flag)
+            self.tpl.report(print_stats_flag, self.args.show_recovered_recs)
         for queue_name in sorted(self.journals.keys()):
-            self.journals[queue_name].report(print_stats_flag)
+            self.journals[queue_name].report(print_stats_flag, self.args.show_recovered_recs)
     def run(self):
         tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
         if os.path.exists(tpl_dir):
             self.tpl = Journal(tpl_dir, None, self.args)
             self.tpl.recover(self.high_rid_counter)
-            if self.args.show_recs or self.args.show_all_recs:
+            if self.args.show_recovery_recs or self.args.show_all_recs:
                 print
         jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME)
         self.prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {}
@@ -72,8 +72,9 @@
                 jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.prepared_list, self.args)
                 jrnl.recover(self.high_rid_counter)
                 self.journals[jrnl.get_queue_name()] = jrnl
-                if self.args.show_recs or self.args.show_all_recs:
+                if self.args.show_recovery_recs or self.args.show_all_recs:
                     print
+        print
     def _reconcile_transactions(self, prepared_list, txn_flag):
         print 'Transaction reconciliation report:'
         print '=================================='
@@ -315,6 +316,7 @@
     """
     Instance of a Qpid Linear Store (QLS) journal.
     """
+    JRNL_SUFFIX = 'jrnl'
     def __init__(self, directory, xid_prepared_list, args):
         self.directory = directory
         self.queue_name = os.path.basename(directory)
@@ -350,7 +352,7 @@
     def get_queue_name(self):
         return self.queue_name
     def recover(self, high_rid_counter):
-        print 'Recovering', self.queue_name
+        print 'Recovering %s...' % self.queue_name,
         self._analyze_files()
         try:
             while self._get_next_record(high_rid_counter):
@@ -362,6 +364,7 @@
             print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \
             (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(),
              err.get_expected_fro())
+        print 'done'
     def reconcile_transactions(self, prepared_list, txn_flag):
         xid_list = self.txn_map.get_xid_list()
         if len(xid_list) > 0:
@@ -385,13 +388,13 @@
                 print '  ', qlslibs.utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
                 if txn_flag:
                     self.txn_map.abort(xid)
-    def report(self, print_stats_flag):
+    def report(self, print_stats_flag, show_recovered_records):
         print 'Journal "%s":' % self.queue_name
         print '=' * (11 + len(self.queue_name))
         if print_stats_flag:
             print str(self.statistics)
-        print self.enq_map.report_str(True, True)
-        print self.txn_map.report_str(True, True)
+        print self.enq_map.report_str(True, show_recovered_records)
+        print self.txn_map.report_str(True, show_recovered_records)
         JournalFile.report_header()
         for file_num in sorted(self.files.keys()):
             self.files[file_num].report()
@@ -405,7 +408,7 @@
     def _analyze_files(self):
         for dir_entry in os.listdir(self.directory):
             dir_entry_bits = dir_entry.split('.')
-            if len(dir_entry_bits) == 2 and dir_entry_bits[1] == JournalRecoveryManager.JRNL_DIR_NAME:
+            if len(dir_entry_bits) == 2 and dir_entry_bits[1] == Journal.JRNL_SUFFIX:
                 fq_file_name = os.path.join(self.directory, dir_entry)
                 file_handle = open(fq_file_name)
                 args = qlslibs.utils.load_args(file_handle, qlslibs.jrnl.RecordHeader)
@@ -413,7 +416,7 @@
                 file_hdr.init(file_handle, *qlslibs.utils.load_args(file_handle, qlslibs.jrnl.FileHeader))
                 if file_hdr.is_header_valid(file_hdr):
                     file_hdr.load(file_handle)
-                    if file_hdr.is_valid():
+                    if file_hdr.is_valid(False):
                         qlslibs.utils.skip(file_handle,
                                            file_hdr.file_header_size_sblks * qlslibs.utils.DEFAULT_SBLK_SIZE)
                         self.files[file_hdr.file_num] = JournalFile(file_hdr)
@@ -430,7 +433,7 @@
                                                qlslibs.utils.DEFAULT_DBLK_SIZE
             self.fill_to_offset = self.last_record_offset + \
                                   (self.num_filler_records_required * qlslibs.utils.DEFAULT_DBLK_SIZE)
-            if self.args.show_recs or self.args.show_all_recs:
+            if self.args.show_recovery_recs or self.args.show_all_recs:
                 print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
                       (self.current_journal_file.file_header.file_num, self.last_record_offset,
                        self.num_filler_records_required, self.fill_to_offset)
@@ -460,7 +463,7 @@
             return False
         self.current_journal_file = self.files[file_num]
         self.first_rec_flag = True
-        if self.args.show_recs or self.args.show_all_recs:
+        if self.args.show_recovery_recs or self.args.show_all_recs:
             file_header = self.current_journal_file.file_header
             print '0x%x:%s' % (file_header.file_num, file_header.to_string())
         return True
@@ -480,18 +483,18 @@
         if isinstance(this_record, qlslibs.jrnl.EnqueueRecord):
             ok_flag = self._handle_enqueue_record(this_record, start_journal_file)
             high_rid_counter.check(this_record.record_id)
-            if self.args.show_recs or self.args.show_all_recs:
+            if self.args.show_recovery_recs or self.args.show_all_recs:
                 print '0x%x:%s' % (start_journal_file.file_header.file_num, \
                                    this_record.to_string(self.args.show_xids, self.args.show_data))
         elif isinstance(this_record, qlslibs.jrnl.DequeueRecord):
             ok_flag = self._handle_dequeue_record(this_record, start_journal_file)
             high_rid_counter.check(this_record.record_id)
-            if self.args.show_recs or self.args.show_all_recs:
+            if self.args.show_recovery_recs or self.args.show_all_recs:
                 print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
         elif isinstance(this_record, qlslibs.jrnl.TransactionRecord):
             ok_flag = self._handle_transaction_record(this_record, start_journal_file)
             high_rid_counter.check(this_record.record_id)
-            if self.args.show_recs or self.args.show_all_recs:
+            if self.args.show_recovery_recs or self.args.show_all_recs:
                 print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record.to_string(self.args.show_xids))
         else:
             self.statistics.filler_record_count += 1
diff --git a/qpid/tools/src/py/qlslibs/efp.py b/qpid/tools/src/py/qlslibs/efp.py
index 0fa27b3..1c751c3 100644
--- a/qpid/tools/src/py/qlslibs/efp.py
+++ b/qpid/tools/src/py/qlslibs/efp.py
@@ -85,15 +85,16 @@
         self.efp_partitions.remove(self.current_efp_partition)
         shutil.rmtree(os.path.join(self.current_efp_partition.efp_directory, dir_name))
     def report(self):
-        print 'Empty File Pool (EFP) report:'
-        print '============================='
-        print 'Found', len(self.efp_partitions), 'partition(s).'
+        print 'Empty File Pool (EFP) report'
+        print '============================'
+        print 'Found', len(self.efp_partitions), 'partition(s)'
         if (len(self.efp_partitions)) > 0:
+            sorted_efp_partitions = sorted(self.efp_partitions, key=lambda x: x.partition_number)
             EfpPartition.print_report_table_header()
-            for ptn in self.efp_partitions:
+            for ptn in sorted_efp_partitions:
                 ptn.print_report_table_line()
             print
-            for ptn in self.efp_partitions:
+            for ptn in sorted_efp_partitions:
                 ptn.report()
     def run(self, arg_tup):
         self._analyze_efp()
@@ -182,9 +183,12 @@
                                        self.tot_file_size_kb, self.directory)
     def report(self):
         print 'Partition %s:' % os.path.basename(self.directory)
-        EmptyFilePool.print_report_table_header()
-        for dir_name in self.efp_pools.keys():
-            self.efp_pools[dir_name].print_report_table_line()
+        if len(self.efp_pools) > 0:
+            EmptyFilePool.print_report_table_header()
+            for dir_name in self.efp_pools.keys():
+                self.efp_pools[dir_name].print_report_table_line()
+        else:
+            print '<empty - no EFPs found in this partition>'
         print
     def scan(self):
         if os.path.exists(self.directory):
@@ -217,13 +221,16 @@
     """
     EFP_DIR_SUFFIX = 'k'
     EFP_JRNL_EXTENTION = '.jrnl'
+    EFP_INUSE_DIRNAME = 'in_use'
+    EFP_RETURNED_DIRNAME = 'returned'
     def __init__(self, directory, partition_number):
         self.base_dir_name = os.path.basename(directory)
         self.directory = directory
         self.partition_number = partition_number
         self.data_size_kb = None
-        self.files = []
-        self.tot_file_size_kb = 0
+        self.efp_files = []
+        self.in_use_files = []
+        self.returned_files = []
         self._validate_efp_directory()
     def create_new_efp_files(self, num_files):
         """ Create one or more new empty journal files of the prescribed size for this EFP """
@@ -238,24 +245,37 @@
         """ Static function to create an EFP directory name from the size of the files it contains """
         return '%dk' % file_size_kb
     def get_tot_file_count(self):
-        return len(self.files)
+        return len(self.efp_files)
     def get_tot_file_size_kb(self):
-        return self.data_size_kb * len(self.files)
+        return self.data_size_kb * len(self.efp_files)
     @staticmethod
     def print_report_table_header():
-        print 'data_size_kb file_count tot_file_size_kb efp_directory'
-        print '------------ ---------- ---------------- -------------'
+        print '             ---------- efp ------------ --------- in_use ---------- -------- returned ---------'
+        print 'data_size_kb file_count tot_file_size_kb file_count tot_file_size_kb file_count tot_file_size_kb efp_directory'
+        print '------------ ---------- ---------------- ---------- ---------------- ---------- ---------------- -------------'
     def print_report_table_line(self):
-        print '%12d %10d %16d %s' % (self.data_size_kb, self.get_tot_file_count(),
-                                     self.get_tot_file_size_kb(), self.get_directory())
+        print '%12d %10d %16d %10d %16d %10d %16d %s' % (self.data_size_kb, len(self.efp_files),
+                                                         self.data_size_kb * len(self.efp_files),
+                                                         len(self.in_use_files),
+                                                         self.data_size_kb * len(self.in_use_files),
+                                                         len(self.returned_files),
+                                                         self.data_size_kb * len(self.returned_files),
+                                                         self.get_directory())
     def scan(self):
         for efp_file in os.listdir(self.directory):
+            if efp_file == self.EFP_INUSE_DIRNAME:
+                for in_use_file in os.listdir(os.path.join(self.directory, self.EFP_INUSE_DIRNAME)):
+                    self.in_use_files.append(in_use_file)
+                continue
+            if efp_file == self.EFP_RETURNED_DIRNAME:
+                for returned_file in os.listdir(os.path.join(self.directory, self.EFP_RETURNED_DIRNAME)):
+                    self.returned_files.append(returned_file)
+                continue
             if self._validate_efp_file(os.path.join(self.directory, efp_file)):
-                self.files.append(efp_file)
+                self.efp_files.append(efp_file)
     def _add_efp_file(self, efp_file_name):
         """ Add a single journal file of the appropriate size to this EFP. No file size check is made here. """
-        self.files.append(efp_file_name)
-        self.tot_file_size_kb += os.path.getsize(efp_file_name)
+        self.efp_files.append(efp_file_name)
     def _create_new_efp_file(self):
         """ Create a single new empty journal file of the prescribed size for this EFP """
         file_name = str(uuid.uuid4()) + EmptyFilePool.EFP_JRNL_EXTENTION
@@ -296,7 +316,7 @@
             return False
         file_hdr.load(file_handle)
         file_handle.close()
-        if not file_hdr.is_valid():
+        if not file_hdr.is_valid(True):
             return False
         return True
 
diff --git a/qpid/tools/src/py/qlslibs/jrnl.py b/qpid/tools/src/py/qlslibs/jrnl.py
index 555cf6a..ee25015 100644
--- a/qpid/tools/src/py/qlslibs/jrnl.py
+++ b/qpid/tools/src/py/qlslibs/jrnl.py
@@ -155,19 +155,24 @@
         self.queue_name = file_handle.read(self.queue_name_len)
     def is_end_of_file(self):
         return self.file_handle.tell() >= self.get_file_size()
-    def is_valid(self):
+    def is_valid(self, is_empty):
         if not RecordHeader.is_header_valid(self, self):
             return False
         if self.file_handle is None or self.file_header_size_sblks == 0 or self.partition_num == 0 or \
-           self.efp_data_size_kb == 0 or self.first_record_offset == 0 or self.timestamp_sec == 0 or \
-           self.timestamp_ns == 0 or self.file_num == 0:
+           self.efp_data_size_kb == 0:
             return False
-        if self.queue_name_len == 0:
-            return False
-        if self.queue_name is None:
-            return False
-        if len(self.queue_name) != self.queue_name_len:
-            return False
+        if is_empty:
+            if self.first_record_offset != 0 or self.timestamp_sec != 0 or self.timestamp_ns != 0 or \
+               self.file_num != 0 or self.queue_name_len != 0:
+                return False
+        else:
+            if self.first_record_offset == 0 or self.timestamp_sec == 0 or self.timestamp_ns == 0 or \
+               self.file_num == 0 or self.queue_name_len == 0:
+                return False
+            if self.queue_name is None:
+                return False
+            if len(self.queue_name) != self.queue_name_len:
+                return False
         return True
     def timestamp_str(self):
         """Get the timestamp of this record in string format"""
diff --git a/qpid/tools/src/py/qpid-qls-analyze b/qpid/tools/src/py/qpid-qls-analyze
index 5c7d04b..35b370f 100755
--- a/qpid/tools/src/py/qpid-qls-analyze
+++ b/qpid/tools/src/py/qpid-qls-analyze
@@ -28,11 +28,11 @@
 
 default = os.path.normpath('/usr/share/qpid-tools')
 home = os.environ.get('QPID_TOOLS_HOME', default)
-sys.path.append(os.path.join(home,'python'))
+sys.path.append(os.path.join(home, 'python'))
 
 import argparse
 import os
-import qlslibs.anal
+import qlslibs.analyze
 import qlslibs.efp
 
 class QlsAnalyzerArgParser(argparse.ArgumentParser):
@@ -45,7 +45,9 @@
                           help='Qpid Linear Store (QLS) directory to be analyzed')
         self.add_argument('--efp', action='store_true',
                           help='Analyze the Emtpy File Pool (EFP) and show stats')
-        self.add_argument('--show-recs', action='store_true',
+        self.add_argument('--show-recovered-recs', action='store_true',
+                          help='Show only recovered records')
+        self.add_argument('--show-recovery-recs', action='store_true',
                           help='Show material records found during recovery')
         self.add_argument('--show-all-recs', action='store_true',
                           help='Show all records (including fillers) found during recovery')
@@ -72,13 +74,13 @@
     * The Linear Store
     * The Transaction Prepared List (TPL)
     """
-    QLS_ANALYZE_VERSION = '0.1'
+    QLS_ANALYZE_VERSION = '1.0'
     def __init__(self):
         self.args = None
         self._process_args()
         self.qls_dir = os.path.abspath(self.args.qls_dir)
         self.efp_manager = qlslibs.efp.EfpManager(self.qls_dir, None)
-        self.jrnl_recovery_mgr = qlslibs.anal.JournalRecoveryManager(self.qls_dir, self.args)
+        self.jrnl_recovery_mgr = qlslibs.analyze.JournalRecoveryManager(self.qls_dir, self.args)
     def _process_args(self):
         """ Create arg parser and process args """
         parser = QlsAnalyzerArgParser()
@@ -101,10 +103,6 @@
 #==============================================================================
 
 if __name__ == "__main__":
-    # TODO: Remove this in due course
-    print 'WARNING: This program is still a work in progress and is largely untested.'
-    print '* USE AT YOUR OWN RISK *'
-    print
     M = QqpdLinearStoreAnalyzer()
     M.run()
     M.report()