| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #include "qpid/legacystore/TxnCtxt.h" |
| |
| #include <sstream> |
| |
| #include "qpid/legacystore/jrnl/jexception.h" |
| #include "qpid/legacystore/StoreException.h" |
| |
| namespace mrg { |
| namespace msgstore { |
| |
| void TxnCtxt::completeTxn(bool commit) { |
| sync(); |
| for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) { |
| commitTxn(static_cast<JournalImpl*>(*i), commit); |
| } |
| impactedQueues.clear(); |
| if (preparedXidStorePtr) |
| commitTxn(preparedXidStorePtr, commit); |
| } |
| |
| void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) { |
| if (jc && loggedtx) { /* if using journal */ |
| boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl); |
| dtokp->addRef(); |
| dtokp->set_external_rid(true); |
| dtokp->set_rid(loggedtx->next()); |
| try { |
| if (commit) { |
| jc->txn_commit(dtokp.get(), getXid()); |
| sync(); |
| } else { |
| jc->txn_abort(dtokp.get(), getXid()); |
| } |
| } catch (const journal::jexception& e) { |
| THROW_STORE_EXCEPTION(std::string("Error commit") + e.what()); |
| } |
| } |
| } |
| |
| // static |
| uuid_t TxnCtxt::uuid; |
| |
| // static |
| IdSequence TxnCtxt::uuidSeq; |
| |
| // static |
| bool TxnCtxt::staticInit = TxnCtxt::setUuid(); |
| |
| // static |
| bool TxnCtxt::setUuid() { |
| ::uuid_generate(uuid); |
| return true; |
| } |
| |
| TxnCtxt::TxnCtxt(IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), txn(0) { |
| if (loggedtx) { |
| // // Human-readable tid: 53 bytes |
| // // uuit_t is a char[16] |
| // tid.reserve(53); |
| // u_int64_t* u1 = (u_int64_t*)uuid; |
| // u_int64_t* u2 = (u_int64_t*)(uuid + sizeof(u_int64_t)); |
| // std::stringstream s; |
| // s << "tid:" << std::hex << std::setfill('0') << std::setw(16) << uuidSeq.next() << ":" << std::setw(16) << *u1 << std::setw(16) << *u2; |
| // tid.assign(s.str()); |
| |
| // Binary tid: 24 bytes |
| tid.reserve(24); |
| u_int64_t c = uuidSeq.next(); |
| tid.append((char*)&c, sizeof(c)); |
| tid.append((char*)&uuid, sizeof(uuid)); |
| } |
| } |
| |
| TxnCtxt::TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {} |
| |
| TxnCtxt::~TxnCtxt() { abort(); } |
| |
| void TxnCtxt::sync() { |
| if (loggedtx) { |
| try { |
| for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) |
| jrnl_flush(static_cast<JournalImpl*>(*i)); |
| if (preparedXidStorePtr) |
| jrnl_flush(preparedXidStorePtr); |
| for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) |
| jrnl_sync(static_cast<JournalImpl*>(*i), &journal::jcntl::_aio_cmpl_timeout); |
| if (preparedXidStorePtr) |
| jrnl_sync(preparedXidStorePtr, &journal::jcntl::_aio_cmpl_timeout); |
| } catch (const journal::jexception& e) { |
| THROW_STORE_EXCEPTION(std::string("Error during txn sync: ") + e.what()); |
| } |
| } |
| } |
| |
| void TxnCtxt::jrnl_flush(JournalImpl* jc) { |
| if (jc && !(jc->is_txn_synced(getXid()))) |
| jc->flush(); |
| } |
| |
| void TxnCtxt::jrnl_sync(JournalImpl* jc, timespec* timeout) { |
| if (!jc || jc->is_txn_synced(getXid())) |
| return; |
| while (jc->get_wr_aio_evt_rem()) { |
| if (jc->get_wr_events(timeout) == journal::jerrno::AIO_TIMEOUT && timeout) |
| THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::jrnl_sync()")); |
| } |
| } |
| |
| void TxnCtxt::begin(DbEnv* env, bool sync) { |
| int err; |
| try { err = env->txn_begin(0, &txn, 0); } |
| catch (const DbException&) { txn = 0; throw; } |
| if (err != 0) { |
| std::ostringstream oss; |
| oss << "Error: Env::txn_begin() returned error code: " << err; |
| THROW_STORE_EXCEPTION(oss.str()); |
| } |
| if (sync) |
| globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser)); |
| } |
| |
| void TxnCtxt::commit() { |
| if (txn) { |
| txn->commit(0); |
| txn = 0; |
| globalHolder.reset(); |
| } |
| } |
| |
| void TxnCtxt::abort(){ |
| if (txn) { |
| txn->abort(); |
| txn = 0; |
| globalHolder.reset(); |
| } |
| } |
| |
| DbTxn* TxnCtxt::get() { return txn; } |
| |
| bool TxnCtxt::isTPC() { return false; } |
| |
| const std::string& TxnCtxt::getXid() { return tid; } |
| |
| void TxnCtxt::addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); } |
| |
| void TxnCtxt::complete(bool commit) { completeTxn(commit); } |
| |
| bool TxnCtxt::impactedQueuesEmpty() { return impactedQueues.empty(); } |
| |
| DataTokenImpl* TxnCtxt::getDtok() { return dtokp.get(); } |
| |
| void TxnCtxt::incrDtokRef() { dtokp->addRef(); } |
| |
| void TxnCtxt::recoverDtok(const u_int64_t rid, const std::string xid) { |
| dtokp->set_rid(rid); |
| dtokp->set_wstate(DataTokenImpl::ENQ); |
| dtokp->set_xid(xid); |
| dtokp->set_external_rid(true); |
| } |
| |
| TPCTxnCtxt::TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {} |
| |
| }} |