| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "qpid/linearstore/JournalImpl.h" |
| |
| #include "qpid/linearstore/DataTokenImpl.h" |
| #include "qpid/linearstore/JournalLogImpl.h" |
| #include "qpid/linearstore/journal/jexception.h" |
| #include "qpid/linearstore/StoreException.h" |
| #include "qpid/management/ManagementAgent.h" |
| |
| namespace qpid { |
| namespace linearstore { |
| |
| InactivityFireEvent::InactivityFireEvent(JournalImpl* p, |
| const ::qpid::sys::Duration timeout): |
| ::qpid::sys::TimerTask(timeout, p->id()), _parent(p), |
| _state(NOT_ADDED) |
| {} |
| |
| void InactivityFireEvent::reset(qpid::sys::Timer& timer) { |
| ::qpid::sys::Mutex::ScopedLock sl(_ifeStateLock); |
| switch (_state) { |
| case NOT_ADDED: |
| timer.add(this); |
| break; |
| case FIRED : |
| restart(); |
| timer.add(this); |
| break; |
| case FLUSHED: |
| restart(); |
| break; |
| case CANCELLED: |
| return; |
| default:; // ignore |
| } |
| _state = RUNNING; |
| } |
| |
| ::qpid::linearstore::journal::iores InactivityFireEvent::flush(bool blockFlag) { |
| ::qpid::sys::Mutex::ScopedLock sl(_ifeStateLock); |
| if (_state == RUNNING) { |
| ::qpid::linearstore::journal::iores res = _parent->do_flush(blockFlag); |
| _state = FLUSHED; |
| return res; |
| } |
| return ::qpid::linearstore::journal::RHM_IORES_SUCCESS; |
| } |
| |
| void InactivityFireEvent::fire() { |
| ::qpid::sys::Mutex::ScopedLock sl(_ifeStateLock); |
| switch (_state) { |
| case RUNNING: |
| _parent->do_flush(false); |
| _state = FIRED; |
| break; |
| case FLUSHED: |
| _state = FIRED; |
| break; |
| default:; // ignore |
| } |
| } |
| |
| void InactivityFireEvent::cancel() { |
| ::qpid::sys::TimerTask::cancel(); |
| { |
| ::qpid::sys::Mutex::ScopedLock sl(_ifeStateLock); |
| _state = CANCELLED; |
| } |
| } |
| |
| GetEventsFireEvent::GetEventsFireEvent(JournalImpl* p, |
| const ::qpid::sys::Duration timeout): |
| ::qpid::sys::TimerTask(timeout, "JournalGetEvents:"+p->id()), _parent(p) |
| {} |
| |
| void GetEventsFireEvent::fire() { |
| ::qpid::sys::Mutex::ScopedLock sl(_gefe_lock); |
| if (_parent) { |
| _parent->getEventsFire(); |
| } |
| } |
| |
| JournalImpl::JournalImpl(::qpid::sys::Timer& timer_, |
| const std::string& journalId, |
| const std::string& journalDirectory, |
| JournalLogImpl& journalLogRef, |
| const ::qpid::sys::Duration getEventsTimeout, |
| const ::qpid::sys::Duration flushTimeout, |
| ::qpid::management::ManagementAgent* a, |
| DeleteCallback onDelete): |
| jcntl(journalId, journalDirectory, journalLogRef), |
| timer(timer_), |
| _journalLogRef(journalLogRef), |
| getEventsTimerSetFlag(false), |
| deleteCallback(onDelete) |
| { |
| getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout); |
| inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout); |
| |
| initManagement(a); |
| |
| std::ostringstream oss; |
| oss << "Journal directory = \"" << journalDirectory << "\""; |
| QLS_LOG2(debug, _jid, oss.str()); |
| } |
| |
| JournalImpl::~JournalImpl() |
| { |
| if (deleteCallback) deleteCallback(*this); |
| if (_init_flag && !_stop_flag){ |
| try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete! |
| catch (const ::qpid::linearstore::journal::jexception& e) { QLS_LOG2(error, _jid, e.what()); } |
| } |
| getEventsFireEventsPtr->cancel(); |
| inactivityFireEventPtr->cancel(); |
| |
| if (_mgmtObject.get() != 0) { |
| _mgmtObject->resourceDestroy(); |
| _mgmtObject.reset(); |
| } |
| |
| QLS_LOG2(info, _jid, "Stopped"); |
| } |
| |
| void |
| JournalImpl::initManagement(::qpid::management::ManagementAgent* a) |
| { |
| _agent = a; |
| if (_agent != 0) |
| { |
| _mgmtObject = ::qmf::org::apache::qpid::linearstore::Journal::shared_ptr ( |
| new ::qmf::org::apache::qpid::linearstore::Journal(_agent, this, _jid)); |
| |
| _mgmtObject->set_directory(_jdir.dirname()); |
| // _mgmtObject->set_baseFileName(_base_filename); |
| // _mgmtObject->set_readPageSize(JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE); |
| // _mgmtObject->set_readPages(JRNL_RMGR_PAGES); |
| |
| // The following will be set on initialize(), but being properties, these must be set to 0 in the meantime |
| //_mgmtObject->set_initialFileCount(0); |
| //_mgmtObject->set_dataFileSize(0); |
| //_mgmtObject->set_currentFileCount(0); |
| _mgmtObject->set_writePageSize(0); |
| _mgmtObject->set_writePages(0); |
| |
| _agent->addObject(_mgmtObject, 0, true); |
| } |
| } |
| |
| |
| void |
| JournalImpl::initialize(::qpid::linearstore::journal::EmptyFilePool* efpp_, |
| const uint16_t wcache_num_pages, |
| const uint32_t wcache_pgsize_sblks, |
| ::qpid::linearstore::journal::aio_callback* const cbp, |
| const std::string& nonDefaultParamsMsg) |
| { |
| // efpp->createJournal(_jdir); |
| // QLS_LOG2(info, _jid, "Initialized"); |
| jcntl::initialize(efpp_, wcache_num_pages, wcache_pgsize_sblks, cbp); |
| if (nonDefaultParamsMsg.size() > 0) { |
| QLS_LOG2(info, _jid, "Created, parameters:" << nonDefaultParamsMsg); |
| } else { |
| QLS_LOG2(info, _jid, "Created"); |
| } |
| // TODO: replace for linearstore: _lpmgr |
| /* |
| if (_mgmtObject.get() != 0) |
| { |
| _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles()); |
| _mgmtObject->set_autoExpand(_lpmgr.is_ae()); |
| _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles()); |
| _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles()); |
| _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE); |
| _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE); |
| _mgmtObject->set_writePages(wcache_num_pages); |
| } |
| if (_agent != 0) |
| _agent->raiseEvent::(qmf::org::apache::qpid::linearstore::EventCreated(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles()), |
| qpid::management::ManagementAgent::SEV_NOTE); |
| */ |
| } |
| |
| void |
| JournalImpl::recover(boost::shared_ptr< ::qpid::linearstore::journal::EmptyFilePoolManager> efpm, |
| const uint16_t wcache_num_pages, |
| const uint32_t wcache_pgsize_sblks, |
| ::qpid::linearstore::journal::aio_callback* const cbp, |
| boost::ptr_list<PreparedTransaction>* prep_tx_list_ptr, |
| uint64_t& highest_rid, |
| uint64_t queue_id) |
| { |
| std::ostringstream oss1; |
| oss1 << "Recover;"; |
| oss1 << " queue_id = 0x" << std::hex << queue_id << std::dec; |
| oss1 << " wcache_pgsize_sblks=" << wcache_pgsize_sblks; |
| oss1 << " wcache_num_pages=" << wcache_num_pages; |
| QLS_LOG2(debug, _jid, oss1.str()); |
| // TODO: replace for linearstore: _lpmgr |
| /* |
| if (_mgmtObject.get() != 0) |
| { |
| _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles()); |
| _mgmtObject->set_autoExpand(_lpmgr.is_ae()); |
| _mgmtObject->set_currentFileCount(_lpmgr.num_jfiles()); |
| _mgmtObject->set_maxFileCount(_lpmgr.ae_max_jfiles()); |
| _mgmtObject->set_dataFileSize(_jfsize_sblks * JRNL_SBLK_SIZE); |
| _mgmtObject->set_writePageSize(wcache_pgsize_sblks * JRNL_SBLK_SIZE); |
| _mgmtObject->set_writePages(wcache_num_pages); |
| } |
| */ |
| |
| // TODO: This is ugly, find a way for RecoveryManager to use boost::ptr_list<PreparedTransaction>* directly |
| if (prep_tx_list_ptr) { |
| // Create list of prepared xids |
| std::vector<std::string> prep_xid_list; |
| for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) { |
| prep_xid_list.push_back(i->xid); |
| } |
| |
| jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, &prep_xid_list, highest_rid); |
| } else { |
| jcntl::recover(efpm.get(), wcache_num_pages, wcache_pgsize_sblks, cbp, 0, highest_rid); |
| } |
| |
| // Populate PreparedTransaction lists from _tmap |
| if (prep_tx_list_ptr) |
| { |
| for (PreparedTransaction::list::iterator i = prep_tx_list_ptr->begin(); i != prep_tx_list_ptr->end(); i++) { |
| ::qpid::linearstore::journal::txn_data_list_t tdl = _tmap.get_tdata_list(i->xid); // tdl will be empty if xid not found |
| for (::qpid::linearstore::journal::tdl_itr_t tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) { |
| if (tdl_itr->enq_flag_) { // enqueue op |
| i->enqueues->add(queue_id, tdl_itr->rid_); |
| } else { // dequeue op |
| i->dequeues->add(queue_id, tdl_itr->drid_); |
| } |
| } |
| } |
| } |
| std::ostringstream oss2; |
| oss2 << "Recover phase 1 complete; highest rid found = 0x" << std::hex << highest_rid; |
| oss2 << std::dec << "; emap.size=" << _emap.size() << "; tmap.size=" << _tmap.size(); |
| oss2 << "; journal now read-only."; |
| QLS_LOG2(debug, _jid, oss2.str()); |
| |
| if (_mgmtObject.get() != 0) |
| { |
| _mgmtObject->inc_recordDepth(_emap.size()); |
| _mgmtObject->inc_enqueues(_emap.size()); |
| _mgmtObject->inc_txn(_tmap.size()); |
| _mgmtObject->inc_txnEnqueues(_tmap.enq_cnt()); |
| _mgmtObject->inc_txnDequeues(_tmap.deq_cnt()); |
| } |
| } |
| |
| void |
| JournalImpl::recover_complete() |
| { |
| jcntl::recover_complete(); |
| QLS_LOG2(debug, _jid, "Recover phase 2 complete; journal now writable."); |
| // TODO: replace for linearstore: _lpmgr |
| /* |
| if (_agent != 0) |
| _agent->raiseEvent(qmf::org::apache::qpid::linearstore::EventRecovered(_jid, _jfsize_sblks * JRNL_SBLK_SIZE, _lpmgr.num_jfiles(), |
| _emap.size(), _tmap.size(), _tmap.enq_cnt(), _tmap.deq_cnt()), qpid::management::ManagementAgent::SEV_NOTE); |
| */ |
| } |
| |
| |
| void |
| JournalImpl::enqueue_data_record(const void* const data_buff, |
| const size_t tot_data_len, |
| const size_t this_data_len, |
| ::qpid::linearstore::journal::data_tok* dtokp, |
| const bool transient) |
| { |
| handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient)); |
| |
| if (_mgmtObject.get() != 0) |
| { |
| _mgmtObject->inc_enqueues(); |
| _mgmtObject->inc_recordDepth(); |
| } |
| } |
| |
| void |
| JournalImpl::enqueue_extern_data_record(const size_t tot_data_len, |
| ::qpid::linearstore::journal::data_tok* dtokp, |
| const bool transient) |
| { |
| handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient)); |
| |
| if (_mgmtObject.get() != 0) |
| { |
| _mgmtObject->inc_enqueues(); |
| _mgmtObject->inc_recordDepth(); |
| } |
| } |
| |
| void |
| JournalImpl::enqueue_txn_data_record(const void* const data_buff, |
| const size_t tot_data_len, |
| const size_t this_data_len, |
| ::qpid::linearstore::journal::data_tok* dtokp, |
| const std::string& xid, |
| const bool tpc_flag, |
| const bool transient) |
| { |
| bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; |
| |
| handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, tpc_flag, transient)); |
| |
| if (_mgmtObject.get() != 0) |
| { |
| if (!txn_incr) // If this xid was not in _tmap, it will be now... |
| _mgmtObject->inc_txn(); |
| _mgmtObject->inc_enqueues(); |
| _mgmtObject->inc_txnEnqueues(); |
| _mgmtObject->inc_recordDepth(); |
| } |
| } |
| |
| void |
| JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, |
| ::qpid::linearstore::journal::data_tok* dtokp, |
| const std::string& xid, |
| const bool tpc_flag, |
| const bool transient) |
| { |
| bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; |
| |
| handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, tpc_flag, transient)); |
| |
| if (_mgmtObject.get() != 0) |
| { |
| if (!txn_incr) // If this xid was not in _tmap, it will be now... |
| _mgmtObject->inc_txn(); |
| _mgmtObject->inc_enqueues(); |
| _mgmtObject->inc_txnEnqueues(); |
| _mgmtObject->inc_recordDepth(); |
| } |
| } |
| |
| void |
| JournalImpl::dequeue_data_record(::qpid::linearstore::journal::data_tok* const dtokp, |
| const bool txn_coml_commit) |
| { |
| handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit)); |
| |
| if (_mgmtObject.get() != 0) |
| { |
| _mgmtObject->inc_dequeues(); |
| _mgmtObject->inc_txnDequeues(); |
| _mgmtObject->dec_recordDepth(); |
| } |
| } |
| |
| void |
| JournalImpl::dequeue_txn_data_record(::qpid::linearstore::journal::data_tok* const dtokp, |
| const std::string& xid, |
| const bool tpc_flag, |
| const bool txn_coml_commit) |
| { |
| bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false; |
| |
| handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, tpc_flag, txn_coml_commit)); |
| |
| if (_mgmtObject.get() != 0) |
| { |
| if (!txn_incr) // If this xid was not in _tmap, it will be now... |
| _mgmtObject->inc_txn(); |
| _mgmtObject->inc_dequeues(); |
| _mgmtObject->inc_txnDequeues(); |
| _mgmtObject->dec_recordDepth(); |
| } |
| } |
| |
| void |
| JournalImpl::txn_abort(::qpid::linearstore::journal::data_tok* const dtokp, |
| const std::string& xid) |
| { |
| handleIoResult(jcntl::txn_abort(dtokp, xid)); |
| |
| if (_mgmtObject.get() != 0) |
| { |
| _mgmtObject->dec_txn(); |
| _mgmtObject->inc_txnAborts(); |
| } |
| } |
| |
| void |
| JournalImpl::txn_commit(::qpid::linearstore::journal::data_tok* const dtokp, |
| const std::string& xid) |
| { |
| handleIoResult(jcntl::txn_commit(dtokp, xid)); |
| |
| if (_mgmtObject.get() != 0) |
| { |
| _mgmtObject->dec_txn(); |
| _mgmtObject->inc_txnCommits(); |
| } |
| } |
| |
| void |
| JournalImpl::stop(bool block_till_aio_cmpl) |
| { |
| inactivityFireEventPtr->cancel(); |
| jcntl::stop(block_till_aio_cmpl); |
| |
| if (_mgmtObject.get() != 0) { |
| _mgmtObject->resourceDestroy(); |
| _mgmtObject.reset(); |
| } |
| } |
| |
| ::qpid::linearstore::journal::iores |
| JournalImpl::flush(const bool block_till_aio_cmpl) |
| { |
| return inactivityFireEventPtr->flush(block_till_aio_cmpl); |
| } |
| |
| // This flush call is accessed via the InactivityFireEvent::flush() and |
| // InactivityFireEvent::fire() calls only. |
| ::qpid::linearstore::journal::iores |
| JournalImpl::do_flush(const bool block_till_aio_cmpl) { |
| const ::qpid::linearstore::journal::iores res = jcntl::flush(block_till_aio_cmpl); |
| { |
| ::qpid::sys::Mutex::ScopedLock sl(_getf_lock); |
| if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { |
| setGetEventTimer(); |
| } |
| } |
| return res; |
| } |
| |
| void |
| JournalImpl::getEventsFire() |
| { |
| ::qpid::sys::Mutex::ScopedLock sl(_getf_lock); |
| getEventsTimerSetFlag = false; |
| if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(0); } |
| if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); } |
| } |
| |
| void |
| JournalImpl::wr_aio_cb(std::vector< ::qpid::linearstore::journal::data_tok*>& dtokl) |
| { |
| for (std::vector< ::qpid::linearstore::journal::data_tok*>::const_iterator i=dtokl.begin(); i!=dtokl.end(); i++) |
| { |
| DataTokenImpl* dtokp = static_cast<DataTokenImpl*>(*i); |
| if (/*!is_stopped() &&*/ dtokp->getSourceMessage()) |
| { |
| switch (dtokp->wstate()) |
| { |
| case ::qpid::linearstore::journal::data_tok::ENQ: |
| //std::cout << "<<<>>> JournalImpl::wr_aio_cb() ENQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG |
| dtokp->getSourceMessage()->enqueueComplete(); |
| break; |
| case ::qpid::linearstore::journal::data_tok::DEQ: |
| //std::cout << "<<<>>> JournalImpl::wr_aio_cb() DEQ dtokp rid=0x" << std::hex << dtokp->rid() << std::dec << std::endl << std::flush; // DEBUG |
| /* Don't need to signal until we have a way to ack completion of dequeue in AMQP |
| dtokp->getSourceMessage()->dequeueComplete(); |
| if ( dtokp->getSourceMessage()->isDequeueComplete() ) // clear id after last dequeue |
| dtokp->getSourceMessage()->setPersistenceId(0); |
| */ |
| break; |
| default: ; |
| } |
| } |
| dtokp->release(); |
| } |
| } |
| |
| void |
| JournalImpl::rd_aio_cb(std::vector<uint16_t>& /*pil*/) |
| {} |
| |
| void |
| JournalImpl::createStore() { |
| |
| } |
| |
| void |
| JournalImpl::handleIoResult(const ::qpid::linearstore::journal::iores r) |
| { |
| inactivityFireEventPtr->reset(timer); |
| switch (r) |
| { |
| case ::qpid::linearstore::journal::RHM_IORES_SUCCESS: |
| return; |
| default: |
| { |
| std::ostringstream oss; |
| oss << "Unexpected I/O response (" << ::qpid::linearstore::journal::iores_str(r) << ")."; |
| QLS_LOG2(error, _jid, oss.str()); |
| THROW_STORE_FULL_EXCEPTION(oss.str()); |
| } |
| } |
| } |
| |
| ::qpid::management::Manageable::status_t JournalImpl::ManagementMethod (uint32_t /*methodId*/, |
| ::qpid::management::Args& /*args*/, |
| std::string& /*text*/) |
| { |
| Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; |
| |
| /* |
| switch (methodId) |
| { |
| case _qmf::Journal::METHOD_EXPAND : |
| //_qmf::ArgsJournalExpand& eArgs = (_qmf::ArgsJournalExpand&) args; |
| |
| // Implement "expand" using eArgs.i_by (expand-by argument) |
| |
| status = Manageable::STATUS_NOT_IMPLEMENTED; |
| break; |
| } |
| */ |
| |
| return status; |
| } |
| |
| }} |