blob: 80ef494c219baaf910aa1872728183e6b4ac497c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "unit_test.h"
#include "test_tools.h"
#include "MessagingFixture.h"
#include "qpid/broker/BrokerObserver.h"
#include "qpid/broker/TransactionObserver.h"
#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/Queue.h"
#include "qpid/ha/types.h"
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <vector>
namespace qpid {
namespace tests {
using framing::SequenceSet;
using messaging::Message;
using boost::shared_ptr;
using namespace boost::assign;
using namespace boost;
using namespace broker;
using namespace std;
using namespace messaging;
using namespace types;
QPID_AUTO_TEST_SUITE(TransactionalObserverTest)
Message msg(string content) { return Message(content); }
struct MockTransactionObserver : public TransactionObserver {
bool prep;
vector<string> events;
MockTransactionObserver(bool prep_=true) : prep(prep_) {}
void record(const string& e) { events.push_back(e); }
void enqueue(const shared_ptr<Queue>& q, const broker::Message& m) {
record("enqueue "+q->getName()+" "+m.getContent());
}
void dequeue(const Queue::shared_ptr& q, SequenceNumber p, SequenceNumber r) {
record("dequeue "+q->getName()+" "+
lexical_cast<string>(p)+" "+lexical_cast<string>(r));
}
bool prepare() { record("prepare"); return prep; }
void commit() { record("commit"); }
void rollback() {record("rollback"); }
};
struct MockBrokerObserver : public BrokerObserver {
bool prep;
shared_ptr<MockTransactionObserver> tx;
MockBrokerObserver(bool prep_=true) : prep(prep_) {}
void startTx(const intrusive_ptr<TxBuffer>& buffer) {
if (!tx) { // Don't overwrite first tx with automatically started second tx.
tx.reset(new MockTransactionObserver(prep));
buffer->setObserver(tx);
}
}
};
Session simpleTxTransaction(MessagingFixture& fix) {
fix.session.createSender("q1;{create:always}").send(msg("foo")); // Not in TX
// Transaction with 1 enqueue and 1 dequeue.
Session txSession = fix.connection.createTransactionalSession();
BOOST_CHECK_EQUAL("foo", txSession.createReceiver("q1").fetch().getContent());
txSession.acknowledge();
txSession.createSender("q2;{create:always}").send(msg("bar"));
return txSession;
}
QPID_AUTO_TEST_CASE(testTxCommit) {
MessagingFixture fix;
shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver);
fix.broker->getBrokerObservers().add(brokerObserver);
Session txSession = simpleTxTransaction(fix);
txSession.commit();
// Note on ordering: observers see enqueues as they happen, but dequeues just
// before prepare.
BOOST_CHECK_EQUAL(
list_of<string>("enqueue q2 bar")("dequeue q1 1 0")("prepare")("commit"),
brokerObserver->tx->events
);
}
QPID_AUTO_TEST_CASE(testTxFail) {
MessagingFixture fix;
shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver(false));
fix.broker->getBrokerObservers().add(brokerObserver);
Session txSession = simpleTxTransaction(fix);
try {
ScopedSuppressLogging sl; // Suppress messages for expected error.
txSession.commit();
BOOST_FAIL("Expected exception");
} catch(...) {}
BOOST_CHECK_EQUAL(
list_of<string>("enqueue q2 bar")("dequeue q1 1 0")("prepare")("rollback"),
brokerObserver->tx->events
);
}
QPID_AUTO_TEST_CASE(testTxRollback) {
MessagingFixture fix;
shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver(false));
fix.broker->getBrokerObservers().add(brokerObserver);
Session txSession = simpleTxTransaction(fix);
txSession.rollback();
// Note: The dequeue does not appear here. This is because TxAccepts
// (i.e. dequeues) are not enlisted until SemanticState::commit and are
// never enlisted if the transaction is rolled back.
BOOST_CHECK_EQUAL(
list_of<string>("enqueue q2 bar")("rollback"),
brokerObserver->tx->events
);
}
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests