blob: a413fe418d478cf30f27c2e0a43e746f9dd7c8ff [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpid/broker/DtxWorkRecord.h"
#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
#include <boost/mem_fn.hpp>
using boost::mem_fn;
using qpid::sys::Mutex;
using namespace qpid::broker;
using namespace qpid::framing;
/**
 * Creates a work record for the branch identified by _xid.
 *
 * @param _xid   identifier of the transaction branch this record tracks
 * @param _store store used for this branch's transaction operations
 *
 * All state flags (completed, rolledback, prepared, expired) start as false.
 */
DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store)
    : xid(_xid),
      store(_store),
      completed(false),
      rolledback(false),
      prepared(false),
      expired(false)
{
}
/**
 * Cancels the timeout attached to this record, if one is set.
 */
DtxWorkRecord::~DtxWorkRecord()
{
    // Nothing to cancel when no timeout has been attached.
    if (!timeout.get()) return;
    timeout->cancel();
}
/**
 * Runs the prepare phase for this branch while holding the record lock.
 *
 * If check() reports the work as rollback-only, the branch is aborted.
 * Otherwise a store transaction is begun for the xid and every buffer is
 * prepared against it; on success the store transaction is prepared and the
 * record is flagged as prepared, on failure the branch is aborted.
 *
 * @return the value of the prepared flag after the attempt
 */
bool DtxWorkRecord::prepare()
{
    Mutex::ScopedLock locker(lock);

    if (!check()) {
        //some part of the work has been marked rollback only
        abort();
        return prepared;
    }

    txn = store->begin(xid);
    const bool buffersPrepared = prepare(txn.get());
    if (buffersPrepared) {
        store->prepare(*txn);
        prepared = true;
    } else {
        abort();
        //TODO: this should probably be flagged as internal error
    }
    return prepared;
}
/**
 * Prepares each buffer in the work list against the given transaction
 * context, stopping at the first buffer whose prepare() returns false.
 *
 * @param _txn transaction context handed to every buffer's prepare()
 * @return true if every buffer prepared successfully (or the list is empty),
 *         false as soon as one fails
 */
bool DtxWorkRecord::prepare(TransactionContext* _txn)
{
    for (Work::iterator buffer = work.begin(); buffer != work.end(); ++buffer) {
        if (!(*buffer)->prepare(_txn)) {
            return false;
        }
    }
    return true;
}
/**
 * Commits this branch while holding the record lock.
 *
 * A prepared branch is committed through the stored transaction context
 * (two-phase); an unprepared branch is prepared and committed in a fresh
 * local store transaction (one-phase).
 *
 * @param onePhase true to request a one-phase commit; must be false for a
 *                 branch that has already been prepared and true for one
 *                 that has not
 * @return true if the work was committed; false if it was rolled back
 *         instead (rollback-only work, or a buffer failed to prepare)
 * @throws IllegalStateException if onePhase does not match the prepared state
 */
bool DtxWorkRecord::commit(bool onePhase)
{
    Mutex::ScopedLock locker(lock);

    if (!check()) {
        //some part of the work has been marked rollback only
        abort();
        return false;
    }

    if (prepared) {
        //already prepared i.e. 2pc
        if (onePhase) {
            throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has been prepared, one-phase option not valid!"));
        }
        store->commit(*txn);
        txn.reset();
        std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
        return true;
    }

    //1pc commit optimisation, don't need a 2pc transaction context:
    if (!onePhase) {
        throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
    }
    std::auto_ptr<TransactionContext> localtxn = store->begin();
    if (!prepare(localtxn.get())) {
        store->abort(*localtxn);
        abort();
        //TODO: this should probably be flagged as internal error
        return false;
    }
    store->commit(*localtxn);
    std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
    return true;
}
/**
 * Rolls back this branch while holding the record lock.
 *
 * check() is called for its side effects only (it throws if the branch has
 * expired or still has un-ended buffers, and marks the record completed);
 * its return value is ignored because the work is aborted either way.
 */
void DtxWorkRecord::rollback()
{
    Mutex::ScopedLock locker(lock);
    check();
    abort();
}
/**
 * Appends a buffer to this branch's work list while holding the record lock.
 *
 * @param ops buffer to append
 * @throws DtxTimeoutException     if the branch has expired
 * @throws CommandInvalidException if the branch has already been completed
 */
void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
{
    Mutex::ScopedLock locker(lock);

    // The expiry check comes first, so an expired branch reports a timeout
    // even if it is also completed.
    if (expired) {
        throw DtxTimeoutException(QPID_MSG("Branch with xid " << xid << " has timed out."));
    }
    if (completed) {
        throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!"));
    }

    work.push_back(ops);
}
bool DtxWorkRecord::check()
{
if (expired) {
throw DtxTimeoutException();
}
if (!completed) {
//iterate through all DtxBuffers and ensure they are all ended
for (Work::iterator i = work.begin(); i != work.end(); i++) {
if (!(*i)->isEnded()) {
throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " not completed!"));
} else if ((*i)->isRollbackOnly()) {
rolledback = true;
}
}
completed = true;
}
return !rolledback;
}
/**
 * Aborts the store transaction, if one is held, and rolls back every buffer
 * in the work list.
 *
 * Does not take the record lock itself.
 */
void DtxWorkRecord::abort()
{
    const bool haveStoreTxn = txn.get() != 0;
    if (haveStoreTxn) {
        store->abort(*txn);
        txn.reset();
    }
    std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));
}
/**
 * Restores this record to the prepared state from a recovered transaction.
 *
 * The buffer is added to the work list, the recovered transaction context is
 * adopted, the buffer is marked ended, and the record is flagged as both
 * completed and prepared.
 *
 * @param _txn recovered transaction context; ownership passes to this record
 * @param ops  buffer holding the recovered work
 * @throws DtxTimeoutException / CommandInvalidException propagated from add()
 *
 * NOTE(review): add() takes the record lock internally, but the assignments
 * that follow it run without the lock - confirm callers only use recover()
 * before the record is visible to other threads.
 */
void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer::shared_ptr ops)
{
    // add() must run first: it rejects expired or already-completed records
    // before any state is modified.
    add(ops);

    txn = _txn;
    ops->markEnded();

    completed = true;
    prepared = true;
}
void DtxWorkRecord::timedout()
{
Mutex::ScopedLock locker(lock);
expired = true;
rolledback = true;
if (!completed) {
for (Work::iterator i = work.begin(); i != work.end(); i++) {
if (!(*i)->isEnded()) {
(*i)->timedout();
}
}
}
abort();
}
/**
 * Finds the position of a buffer in the work list.
 *
 * @param buf buffer to look for
 * @return zero-based index of the first entry equal to buf
 * @throws NotFoundException if buf is not in the work list
 */
size_t DtxWorkRecord::indexOf(const DtxBuffer::shared_ptr& buf) {
    const Work::iterator pos = std::find(work.begin(), work.end(), buf);
    if (pos != work.end()) {
        return pos - work.begin();
    }
    throw NotFoundException(
        QPID_MSG("Can't find DTX buffer for xid: " << buf->getXid()));
}
/**
 * Returns the buffer stored at position i of the work list.
 *
 * @param i zero-based index into the work list
 * @return the buffer at that index
 * @throws NotFoundException if i is not a valid index (i >= work.size())
 */
DtxBuffer::shared_ptr DtxWorkRecord::operator[](size_t i) const {
    // Valid indices are [0, size). i == size must be rejected as well,
    // otherwise work[i] would read one element past the end.
    if (i >= work.size())
        throw NotFoundException(
            QPID_MSG("Can't find DTX buffer " << i << " for xid: " << xid));
    return work[i];
}