QPID-6551: [C++ broker]: linearstore raising JERR_LFCR_SEQNUMNOTFOUND after sending many DTX transactions

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1680861 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
index e6e07dc..08d565c 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
@@ -114,15 +114,15 @@
     }
 }
 
-uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+uint32_t LinearFileController::getEnqueuedRecordCount(const uint64_t fileSeqNumber) {
     return find(fileSeqNumber)->getEnqueuedRecordCount();
 }
 
-uint32_t LinearFileController::incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+uint32_t LinearFileController::incrEnqueuedRecordCount(const uint64_t fileSeqNumber) {
     return find(fileSeqNumber)->incrEnqueuedRecordCount();
 }
 
-uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
+uint32_t LinearFileController::decrEnqueuedRecordCount(const uint64_t fileSeqNumber) {
     uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount();
 
     // TODO: Re-evaluate after testing and profiling
@@ -136,11 +136,11 @@
     return r;
 }
 
-uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
+uint32_t LinearFileController::addWriteCompletedDblkCount(const uint64_t fileSeqNumber, const uint32_t a) {
     return find(fileSeqNumber)->addCompletedDblkCount(a);
 }
 
-uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
+uint16_t LinearFileController::decrOutstandingAioOperationCount(const uint64_t fileSeqNumber) {
     return find(fileSeqNumber)->decrOutstandingAioOperationCount();
 }
 
@@ -199,9 +199,9 @@
 
 void LinearFileController::addJournalFile(const std::string& fileName,
                                           const efpIdentity_t& efpIdentity,
-                                          const uint64_t fileNumber,
+                                          const uint64_t fileSeqNumber,
                                           const uint32_t completedDblkCount) {
-    JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber, jcntlRef_.id());
+    JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileSeqNumber, jcntlRef_.id());
     addJournalFile(jfp, completedDblkCount, true);
 }
 
@@ -215,7 +215,7 @@
     return currentJournalFilePtr_ != 0;
 }
 
-JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
+JournalFile* LinearFileController::find(const uint64_t fileSeqNumber) {
     if (currentJournalFilePtr_ && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
         return currentJournalFilePtr_;
 
diff --git a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
index 05f0814..3cdfb72 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
@@ -74,12 +74,12 @@
     void purgeEmptyFilesToEfp();
 
     // Functions for manipulating counts of non-current JournalFile instances in journalFileList_
-    uint32_t getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
-    uint32_t incrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
-    uint32_t decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber);
-    uint32_t addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber,
+    uint32_t getEnqueuedRecordCount(const uint64_t fileSeqNumber);
+    uint32_t incrEnqueuedRecordCount(const uint64_t fileSeqNumber);
+    uint32_t decrEnqueuedRecordCount(const uint64_t fileSeqNumber);
+    uint32_t addWriteCompletedDblkCount(const uint64_t fileSeqNumber,
                                         const uint32_t a);
-    uint16_t decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber);
+    uint16_t decrOutstandingAioOperationCount(const uint64_t fileSeqNumber);
 
     // Pass-through functions for current JournalFile class
     void asyncFileHeaderWrite(io_context_t ioContextPtr,
@@ -101,11 +101,11 @@
 protected:
     void addJournalFile(const std::string& fileName,
                         const efpIdentity_t& efpIdentity,
-                        const uint64_t fileNumber,
+                        const uint64_t fileSeqNumber,
                         const uint32_t completedDblkCount);
     void assertCurrentJournalFileValid(const char* const functionName) const;
     bool checkCurrentJournalFileValid() const;
-    JournalFile* find(const efpFileCount_t fileSeqNumber);
+    JournalFile* find(const uint64_t fileSeqNumber);
     uint64_t getNextFileSeqNum();
     void pullEmptyFileFromEfp();
 };
diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
index 73a16f0..254566e 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
@@ -148,7 +148,7 @@
                     // Unlock any affected enqueues in emap
                     for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) {
                         if (i->enq_flag_) { // enq op - decrement enqueue count
-                            fileNumberMap_[i->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
+                            fileNumberMap_[i->fid_]->journalFilePtr_->decrEnqueuedRecordCount();
                         } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
                             if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail
                                 // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
@@ -738,7 +738,7 @@
                 txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
                 for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
                     if (itr->enq_flag_) {
-                        fileNumberMap_[itr->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
+                        fileNumberMap_[itr->fid_]->journalFilePtr_->decrEnqueuedRecordCount();
                     } else {
                         enqueueMapRef_.unlock(itr->drid_); // ignore not found error
                     }
@@ -765,11 +765,11 @@
                 txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
                 for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
                     if (itr->enq_flag_) { // txn enqueue
-//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
-                        if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
+//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->fid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
+                        if (enqueueMapRef_.insert_pfid(itr->rid_, itr->fid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
                             // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
                             std::ostringstream oss;
-                            oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_;
+                            oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->fid_;
                             throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "RecoveryManager", "getNextRecordHeader");
                         }
                     } else { // txn dequeue
@@ -854,7 +854,7 @@
         qpid::linearstore::journal::txn_data_list_t tdsl = transactionMapRef_.get_tdata_list(*j);
         for (qpid::linearstore::journal::tdl_itr_t k=tdsl.begin(); k!=tdsl.end(); ++k) {
             if (k->enq_flag_) {
-                recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->pfid_, k->foffs_, true));
+                recordIdList_.push_back(RecoveredRecordData_t(k->rid_, k->fid_, k->foffs_, true));
             }
         }
     }
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
index 3ada3d2..8336d36 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
@@ -36,14 +36,14 @@
 
 txn_data_t::txn_data_t(const uint64_t rid,
                        const uint64_t drid,
-                       const uint16_t pfid,
+                       const uint64_t fid,
                        const uint64_t foffs,
                        const bool enq_flag,
                        const bool tpc_flag,
                        const bool commit_flag):
         rid_(rid),
         drid_(drid),
-        pfid_(pfid),
+        fid_(fid),
         foffs_(foffs),
         enq_flag_(enq_flag),
         tpc_flag_(tpc_flag),
diff --git a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
index 996d54b..e79c052 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
+++ b/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
@@ -39,7 +39,7 @@
     {
         uint64_t rid_;      ///< Record id for this operation
         uint64_t drid_;     ///< Dequeue record id for this operation
-        uint16_t pfid_;     ///< Physical file id, to be used when transferring to emap on commit
+        uint64_t fid_;      ///< File seq number, to be used when transferring to emap on commit
         uint64_t foffs_;    ///< Offset in file for this record
         bool enq_flag_;     ///< If true, enq op, otherwise deq op
         bool tpc_flag_;     ///< 2PC transaction if true
@@ -47,7 +47,7 @@
         bool aio_compl_;    ///< Initially false, set to true when record AIO returns
         txn_data_t(const uint64_t rid,
                    const uint64_t drid,
-                   const uint16_t pfid,
+                   const uint64_t fid,
                    const uint64_t foffs,
                    const bool enq_flag,
                    const bool tpc_flag,
diff --git a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
index 7dc8f1d..1ff18da 100644
--- a/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
+++ b/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
@@ -307,7 +307,7 @@
                 for (tdl_const_itr_t i=tdl.begin(); i!=tdl.end() && !found; ++i) {
                     if (i->rid_ == dtokp->dequeue_rid()) {
                         found = true;
-                        dtokp->set_fid(i->pfid_);
+                        dtokp->set_fid(i->fid_);
                         break;
                     }
                 }
@@ -451,7 +451,7 @@
 				if (!itr->enq_flag_)
 				    _emap.unlock(itr->drid_); // ignore rid not found error
                 if (itr->enq_flag_) {
-                    fidl.push_back(itr->pfid_);
+                    fidl.push_back(itr->fid_);
                 }
             }
             std::pair<pending_txn_map_itr_t, bool> res = _txn_pending_map.insert(std::pair<std::string, fidl_t>(xid, fidl));
@@ -551,11 +551,11 @@
             {
                 if (itr->enq_flag_) // txn enqueue
                 {
-                    if (_emap.insert_pfid(itr->rid_, itr->pfid_, 0) < enq_map::EMAP_OK) // fail
+                    if (_emap.insert_pfid(itr->rid_, itr->fid_, 0) < enq_map::EMAP_OK) // fail
                     {
                         // The only error code emap::insert_pfid() returns is enq_map::EMAP_DUP_RID.
                         std::ostringstream oss;
-                        oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->pfid_;
+                        oss << std::hex << "rid=0x" << itr->rid_ << " _pfid=0x" << itr->fid_;
                         throw jexception(jerrno::JERR_MAP_DUPLICATE, oss.str(), "wmgr", "commit");
                     }
                 }