blob: 0133f2b7d00044d004a2dee6937be9940d8f36aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TransactionTest.h"
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/util/CMSListener.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <stdexcept>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::test;
using namespace activemq::util;
using namespace activemq::exceptions;
////////////////////////////////////////////////////////////////////////////////
// Default constructor. The body is empty and there is no initialiser list,
// so nothing is set up explicitly at this point.
TransactionTest::TransactionTest() {}
////////////////////////////////////////////////////////////////////////////////
// Destructor. The body is empty; no explicit cleanup is performed here.
TransactionTest::~TransactionTest() {}
////////////////////////////////////////////////////////////////////////////////
void TransactionTest::testSendReceiveTransactedBatches() {
// Create CMS Object for Comms
cms::Session* session = cmsProvider->getSession();
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
for (int j = 0; j < batchCount - 8; j++) {
auto_ptr<TextMessage> message(session->createTextMessage("Batch Message"));
for (int i = 0; i < batchSize; i++) {
CPPUNIT_ASSERT_NO_THROW_MESSAGE("Send should not throw an exception here.", producer->send(message.get()));
}
CPPUNIT_ASSERT_NO_THROW_MESSAGE("Session Commit should not throw an exception here:", session->commit());
for (int i = 0; i < batchSize; i++) {
CPPUNIT_ASSERT_NO_THROW_MESSAGE("Receive Shouldn't throw a Message here:",
message.reset(dynamic_cast<TextMessage*>(consumer->receive(1000 * 5))));
CPPUNIT_ASSERT_MESSAGE("Failed to receive all messages in batch", message.get() != NULL);
CPPUNIT_ASSERT(string("Batch Message") == message->getText());
}
CPPUNIT_ASSERT_NO_THROW_MESSAGE("Session Commit should not throw an exception here:", session->commit());
}
}
////////////////////////////////////////////////////////////////////////////////
void TransactionTest::testSendRollback() {
// Create CMS Object for Comms
cms::Session* session = cmsProvider->getSession();
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<TextMessage> outbound1(session->createTextMessage("First Message"));
auto_ptr<TextMessage> outbound2(session->createTextMessage("Second Message"));
// sends a message
producer->send(outbound1.get());
session->commit();
// sends a message that gets rollbacked
auto_ptr<Message> rollback(session->createTextMessage("I'm going to get rolled back."));
producer->send(rollback.get());
session->rollback();
// sends a message
producer->send(outbound2.get());
session->commit();
// receives the first message
auto_ptr<TextMessage> inbound1(dynamic_cast<TextMessage*>(consumer->receive(1500)));
// receives the second message
auto_ptr<TextMessage> inbound2(dynamic_cast<TextMessage*>(consumer->receive(4000)));
// validates that the rollbacked was not consumed
session->commit();
CPPUNIT_ASSERT(outbound1->getText() == inbound1->getText());
CPPUNIT_ASSERT(outbound2->getText() == inbound2->getText());
}
////////////////////////////////////////////////////////////////////////////////
void TransactionTest::testSendRollbackCommitRollback() {
// Create CMS Object for Comms
cms::Session* session = cmsProvider->getSession();
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<TextMessage> outbound1(session->createTextMessage("First Message"));
auto_ptr<TextMessage> outbound2(session->createTextMessage("Second Message"));
// sends them and then rolls back.
producer->send(outbound1.get());
producer->send(outbound2.get());
session->rollback();
// Send one and commit.
producer->send(outbound1.get());
session->commit();
// receives the first message
auto_ptr<TextMessage> inbound1(dynamic_cast<TextMessage*>(consumer->receive(1500)));
CPPUNIT_ASSERT(NULL == consumer->receive( 1500 ));
CPPUNIT_ASSERT(outbound1->getText() == inbound1->getText());
session->rollback();
inbound1.reset(dynamic_cast<TextMessage*>(consumer->receive(1500)));
CPPUNIT_ASSERT(NULL == consumer->receive( 1500 ));
CPPUNIT_ASSERT(outbound1->getText() == inbound1->getText());
// validates that the rollbacked was not consumed
session->commit();
}
////////////////////////////////////////////////////////////////////////////////
// Checks that a message sent in a session that is closed without a commit is
// not delivered: one message is committed, another is sent and the session is
// closed, then a fresh session commits a second message. Only the two
// committed messages are expected to be received, in order.
//
// Uses its own connection and the queue "testSendSessionClose", which is
// destroyed at the end of the test.
void TransactionTest::testSendSessionClose() {

    Pointer<ActiveMQConnectionFactory> connectionFactory(
        new ActiveMQConnectionFactory(getBrokerURL()));
    Pointer<Connection> connection(connectionFactory->createConnection());
    // ActiveMQ-specific view of the connection, needed for destroyDestination().
    Pointer<ActiveMQConnection> amqConnection = connection.dynamicCast<ActiveMQConnection>();
    connection->start();

    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
    Pointer<Queue> destination(session->createQueue("testSendSessionClose"));

    // Create the messages used for this test
    auto_ptr<TextMessage> outbound1(session->createTextMessage("First Message"));
    auto_ptr<TextMessage> outbound2(session->createTextMessage("Second Message"));

    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
    Pointer<MessageProducer> producer(session->createProducer(destination.get()));

    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

    // Send Message #1
    producer->send(outbound1.get());
    session->commit();

    // Send a message but never commit it; closing the session that owns the
    // resources is expected to roll it back. The message has its own text so
    // that, if it were delivered anyway, the comparisons below would fail
    // rather than mistaking it for the committed second message.
    auto_ptr<cms::Message> rollback(session->createTextMessage("I'm going to get rolled back."));
    producer->send(rollback.get());
    session->close();

    // Rebuild the session and its resources on the same connection.
    session.reset(connection->createSession(Session::SESSION_TRANSACTED));
    destination.reset(session->createQueue("testSendSessionClose"));
    consumer.reset(session->createConsumer(destination.get()));
    producer.reset(session->createProducer(destination.get()));
    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);

    // Send Message #2
    producer->send(outbound2.get());
    session->commit();

    // Receive the two committed messages.
    auto_ptr<TextMessage> inbound1(dynamic_cast<TextMessage*>(consumer->receive(1500)));
    auto_ptr<TextMessage> inbound2(dynamic_cast<TextMessage*>(consumer->receive(4000)));

    // Commit the receipt before inspecting the results.
    session->commit();

    CPPUNIT_ASSERT_MESSAGE("Failed to receive first message", inbound1.get() != NULL);
    CPPUNIT_ASSERT_MESSAGE("Failed to receive second message", inbound2.get() != NULL);

    CPPUNIT_ASSERT_MESSAGE("First messages aren't equal", outbound1->getText() == inbound1->getText());
    CPPUNIT_ASSERT_MESSAGE("Second messages aren't equal", outbound2->getText() == inbound2->getText());

    session->close();
    amqConnection->destroyDestination(destination.get());
}
////////////////////////////////////////////////////////////////////////////////
void TransactionTest::testWithTTLSet() {
cmsProvider->getProducer()->setDeliveryMode(DeliveryMode::PERSISTENT);
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
auto_ptr<TextMessage> outbound1(cmsProvider->getSession()->createTextMessage("First Message"));
const std::size_t NUM_MESSAGES = 50;
// sends a message
for (std::size_t i = 0; i < NUM_MESSAGES; ++i) {
cmsProvider->getProducer()->send(outbound1.get(), cms::DeliveryMode::PERSISTENT,
cmsProvider->getProducer()->getPriority(), 120 * 1000);
}
cmsProvider->getSession()->commit();
for (std::size_t i = 0; i < NUM_MESSAGES; ++i) {
// receives the second message
auto_ptr<TextMessage> inbound1(dynamic_cast<TextMessage*>(consumer->receive(600000)));
CPPUNIT_ASSERT(inbound1.get() != NULL);
CPPUNIT_ASSERT(outbound1->getText() == inbound1->getText());
}
cmsProvider->getSession()->commit();
}
////////////////////////////////////////////////////////////////////////////////
void TransactionTest::testSessionCommitAfterConsumerClosed() {
ActiveMQConnectionFactory factory(getBrokerURL());
auto_ptr<cms::Connection> connection(factory.createConnection());
{
auto_ptr<cms::Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
auto_ptr<cms::Queue> queue(session->createQueue("testSessionCommitAfterConsumerClosed"));
auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
auto_ptr<cms::Message> message(session->createTextMessage("Hello"));
producer->send(message.get());
producer->close();
session->close();
}
auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED));
auto_ptr<cms::Queue> queue(session->createQueue("testSessionCommitAfterConsumerClosed"));
auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get()));
connection->start();
auto_ptr<cms::Message> message(consumer->receive(5000));
CPPUNIT_ASSERT(message.get() != NULL);
consumer->close();
session->commit();
}