blob: 8a77c48faf4ab080abac82ce4d074f953e764b6c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "OpenWireMessageListenerRedeliveryTest.h"
#include <cms/ConnectionFactory.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <activemq/core/policies/DefaultRedeliveryPolicy.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <decaf/lang/Pointer.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/ArrayList.h>
#include <decaf/util/concurrent/TimeUnit.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <memory>
using namespace cms;
using namespace std;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::atomic;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::test;
using namespace activemq::test::openwire;
////////////////////////////////////////////////////////////////////////////////
namespace {
// Name of the string property that the DLQ tests in this file read from a
// dead-lettered message to learn why its delivery failed.  Declared const
// because it is only ever read.
static const std::string DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = "dlqDeliveryFailureCause";
/**
 * Creates a connection to the given broker using a factory whose redelivery
 * policy is configured for these tests: no delay before the first
 * redelivery, a redelivery delay of 1000, exponential back-off with a
 * multiplier of 2 and at most 3 redeliveries.
 *
 * @param brokerUri
 *      URI of the broker to connect to.
 *
 * @return the connection produced by the factory; the callers in this file
 *         hold it in a std::auto_ptr.
 */
cms::Connection* createConnection(const std::string& brokerUri) {

    ActiveMQConnectionFactory connectionFactory(brokerUri);

    // How many times a message may be redelivered.
    connectionFactory.getRedeliveryPolicy()->setMaximumRedeliveries(3);

    // Timing of the redeliveries.
    connectionFactory.getRedeliveryPolicy()->setInitialRedeliveryDelay(0);
    connectionFactory.getRedeliveryPolicy()->setRedeliveryDelay(1000);

    // Growth of the delay between successive redeliveries.
    connectionFactory.getRedeliveryPolicy()->setUseExponentialBackOff(true);
    connectionFactory.getRedeliveryPolicy()->setBackOffMultiplier(static_cast<short>(2));

    return connectionFactory.createConnection();
}
/**
 * Listener for a transacted session that rolls the session back for the
 * first four deliveries it sees and, from the fifth delivery on,
 * acknowledges the message and commits the session.
 *
 * NOTE(review): counter is written in onMessage() and read through
 * getCounter() without any synchronization; presumably the two run on
 * different threads - confirm whether that matters for the assertions.
 */
class TestMessageListener : public cms::MessageListener {
private:

    // Number of deliveries seen so far (including redeliveries).
    int counter;

    // Session that is rolled back or committed; not owned by this listener.
    cms::Session* session;

public:

    /**
     * @param session
     *      the session whose transaction this listener rolls back or commits.
     */
    explicit TestMessageListener(cms::Session* session) : counter(0), session(session) {}

    /**
     * @return the number of times onMessage() has been invoked.
     */
    int getCounter() {
        return counter;
    }

    /**
     * Counts the delivery, then rolls back (deliveries 1-4) or acknowledges
     * and commits (delivery 5 and later).
     */
    virtual void onMessage(const cms::Message* message) {
        try {
            counter++;
            if (counter <= 4) {
                session->rollback();
            } else {
                message->acknowledge();
                session->commit();
            }
        } catch (cms::CMSException& e) {
            // Report the failure instead of hiding it, matching the other
            // listeners in this file; the delivery is still counted.
            e.printStackTrace();
        }
    }
};
/**
 * Listener that records the text of every delivery it receives and throws
 * a RuntimeException from onMessage() until the current message has been
 * delivered maxDeliveries times.  Once a message reaches that count the
 * per-message counter is reset and the completion latch is counted down.
 */
class ExceptionMessageListener : public cms::MessageListener {
private:

    // Counted down once for each message that reached maxDeliveries.
    CountDownLatch doneLatch;

    // Text of every delivery, in arrival order (redeliveries included).
    ArrayList<std::string> received;

    // Used as the prefix of the exception text.
    std::string testName;

    // Delivery number at which a message is finally accepted.
    int maxDeliveries;

    // Deliveries seen so far for the message currently being processed.
    int count;

public:

    /**
     * @param testName
     *      name used in the text of the thrown exception.
     * @param messageCount
     *      number of messages that must be accepted before await() succeeds.
     * @param maxDeliveries
     *      delivery number at which a message stops being rejected.
     */
    ExceptionMessageListener(const std::string& testName, int messageCount, int maxDeliveries) :
        doneLatch(messageCount),
        received(),
        testName(testName),
        maxDeliveries(maxDeliveries),
        count(0) {
    }

    /**
     * @return deliveries counted so far for the message currently in progress.
     */
    int getCount() {
        return count;
    }

    /**
     * @return the recorded message texts, in delivery order.
     */
    ArrayList<std::string>& getReceived() {
        return received;
    }

    /**
     * Waits on the completion latch.
     *
     * @return the result of CountDownLatch::await for the given timeout.
     */
    bool await(long long timeout, const TimeUnit& unit) {
        return doneLatch.await(timeout, unit);
    }

    /**
     * Records the message text, then either throws to force a redelivery or
     * accepts the message and counts the latch down.
     *
     * @throws decaf::lang::exceptions::RuntimeException
     *      while the message has been delivered fewer than maxDeliveries times.
     */
    virtual void onMessage(const cms::Message* message) {
        try {
            const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*>(message);
            // The cast yields NULL for anything that is not a TextMessage;
            // do not dereference it in that case.
            if (textMessage != NULL) {
                received.add(textMessage->getText());
            }
        } catch (cms::CMSException& e) {
            e.printStackTrace();
        }

        if (++count < maxDeliveries) {
            // Build the text in a temporary: appending to testName itself
            // would make the member grow on every forced redelivery.
            const std::string reason = testName + " forced a redelivery";
            throw decaf::lang::exceptions::RuntimeException(
                __FILE__, __LINE__, reason.c_str());
        }

        // This message is done; start counting afresh for the next one.
        count = 0;
        doneLatch.countDown();
    }
};
/**
 * Listener that stores a clone of every message it is handed and counts a
 * latch down once per stored message, so a test can wait until the expected
 * number of messages has arrived and then inspect them.
 */
class TrackingMessageListener : public cms::MessageListener {
private:

    // Released after the expected number of messages has been stored.
    CountDownLatch doneLatch;

    // Clones of the delivered messages, in arrival order.
    ArrayList< Pointer<cms::Message> > received;

    // Name of the owning test; stored but not read by this class.
    std::string testName;

    // Initialised to zero and otherwise not used by this class.
    int count;

public:

    /**
     * Creates a listener that waits for a single message.
     */
    TrackingMessageListener(const std::string& testName) :
        doneLatch(1), received(), testName(testName), count(0) {
    }

    /**
     * Creates a listener that waits for the given number of messages.
     */
    TrackingMessageListener(const std::string& testName, int expected) :
        doneLatch(expected), received(), testName(testName), count(0) {
    }

    /**
     * @return the number of messages stored so far.
     */
    int getCount() {
        return received.size();
    }

    /**
     * @return the stored message clones.
     */
    ArrayList< Pointer<cms::Message> >& getReceived() {
        return received;
    }

    /**
     * Waits on the latch.
     *
     * @return the result of CountDownLatch::await for the given timeout.
     */
    bool await(long long timeout, const TimeUnit& unit) {
        return doneLatch.await(timeout, unit);
    }

    /**
     * Stores a clone of the delivered message and counts the latch down.
     * A CMSException raised while doing so is printed and not rethrown.
     */
    virtual void onMessage(const cms::Message* message) {
        try {
            received.add(Pointer<cms::Message>(message->clone()));
            doneLatch.countDown();
        } catch (cms::CMSException& ex) {
            ex.printStackTrace();
        }
    }
};
/**
 * Listener that never accepts a message: each delivery counts the latch
 * down, rolls the session back when it is transacted, and then throws a
 * RuntimeException.
 */
class FailingMessageListener : public cms::MessageListener {
private:

    // Counted down once per delivery attempt.
    CountDownLatch doneLatch;

    // Session to roll back when transacted; not owned by this listener.
    cms::Session* session;

    // Prefix of the exception text.
    std::string testName;

public:

    /**
     * @param session
     *      session queried for its transacted state and rolled back if so.
     * @param testName
     *      name used in the text of the thrown exception.
     * @param expected
     *      number of deliveries after which await() succeeds.
     */
    FailingMessageListener(cms::Session* session, const std::string& testName, int expected) :
        doneLatch(expected), session(session), testName(testName) {
    }

    /**
     * Waits on the latch.
     *
     * @return the result of CountDownLatch::await for the given timeout.
     */
    bool await(long long timeout, const TimeUnit& unit) {
        return doneLatch.await(timeout, unit);
    }

    /**
     * Counts the delivery and always fails it.
     *
     * @throws decaf::lang::exceptions::RuntimeException on every call.
     */
    virtual void onMessage(const cms::Message* message) {

        try {
            doneLatch.countDown();

            const bool transacted = session->isTransacted();
            if (transacted) {
                session->rollback();
            }
        } catch (cms::CMSException& ex) {
            ex.printStackTrace();
        }

        // Reached whether or not the block above raised a CMSException.
        const std::string reason = testName + " forced a redelivery";
        throw decaf::lang::exceptions::RuntimeException(
            __FILE__, __LINE__, reason.c_str());
    }
};
}
////////////////////////////////////////////////////////////////////////////////
// Default constructor; its body performs no work.
OpenWireMessageListenerRedeliveryTest::OpenWireMessageListenerRedeliveryTest() {}
////////////////////////////////////////////////////////////////////////////////
// Destructor; its body performs no work.
OpenWireMessageListenerRedeliveryTest::~OpenWireMessageListenerRedeliveryTest() {}
////////////////////////////////////////////////////////////////////////////////
/**
 * Sends one persistent message to a transacted session whose listener rolls
 * back the first four deliveries, and checks via timed sleeps that the
 * delivery count grows to 2, 3 and then 4.  A second message is then sent;
 * it is the fifth delivery, which the listener commits, so the count must
 * reach 5 and stay there.
 */
void OpenWireMessageListenerRedeliveryTest::testQueueRollbackConsumerListener() {

    std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL()));
    connection->start();

    std::auto_ptr<Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED));
    std::auto_ptr<Queue> queue(session->createQueue("testQueueRollbackConsumerListener"));
    std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
    std::auto_ptr<Message> message(session->createTextMessage("Hello"));

    // Fail the test cleanly instead of dereferencing NULL if the connection
    // is not the expected concrete type.
    ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get());
    CPPUNIT_ASSERT_MESSAGE("connection is an ActiveMQConnection", amqConnection != NULL);
    amqConnection->destroyDestination(queue.get());

    producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);
    producer->send(message.get());
    session->commit();

    // The listener is declared before the consumer so that, if an assertion
    // below throws, the consumer is destroyed before the listener it uses.
    TestMessageListener listener(session.get());
    std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
    consumer->setMessageListener(&listener);

    TimeUnit::MILLISECONDS.sleep(500);

    // first try.. should get 2 since there is no delay on the first redelivery.
    CPPUNIT_ASSERT_EQUAL(2, listener.getCounter());

    TimeUnit::MILLISECONDS.sleep(1000);

    // 2nd redeliver (redelivery after 1 sec)
    CPPUNIT_ASSERT_EQUAL(3, listener.getCounter());

    TimeUnit::MILLISECONDS.sleep(2000);

    // 3rd redeliver (redelivery after 2 seconds) - it should give up after that
    CPPUNIT_ASSERT_EQUAL(4, listener.getCounter());

    // create new message
    std::auto_ptr<Message> secondMessage(session->createTextMessage("Hello 2"));
    producer->send(secondMessage.get());
    session->commit();

    TimeUnit::MILLISECONDS.sleep(500);

    // it should be committed, so no redelivery
    CPPUNIT_ASSERT_EQUAL(5, listener.getCounter());

    TimeUnit::MILLISECONDS.sleep(1500);

    // no redelivery, counter should still be 5
    CPPUNIT_ASSERT_EQUAL(5, listener.getCounter());

    connection->close();
}
////////////////////////////////////////////////////////////////////////////////
void OpenWireMessageListenerRedeliveryTest::testQueueSessionListenerExceptionRetry() {
std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL()));
connection->start();
std::auto_ptr<Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
std::auto_ptr<Queue> queue(session->createQueue("testQueueSessionListenerExceptionRetry"));
std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
std::auto_ptr<Message> message1(session->createTextMessage("1"));
std::auto_ptr<Message> message2(session->createTextMessage("2"));
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get());
amqConnection->destroyDestination(queue.get());
producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);
producer->send(message1.get());
producer->send(message2.get());
std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
int maxDeliveries = amqConnection->getRedeliveryPolicy()->getMaximumRedeliveries();
ExceptionMessageListener listener("testQueueSessionListenerExceptionRetry", 2, maxDeliveries);
consumer->setMessageListener(&listener);
CPPUNIT_ASSERT_MESSAGE("got message before retry expiry", listener.await(30, TimeUnit::SECONDS));
for (int i = 0; i < maxDeliveries; i++) {
CPPUNIT_ASSERT_EQUAL_MESSAGE("got first redelivered: " + Integer::toString(i),
std::string("1"), listener.getReceived().get(i));
}
for (int i = maxDeliveries; i < maxDeliveries * 2; i++) {
CPPUNIT_ASSERT_EQUAL_MESSAGE("got first redelivered: " + Integer::toString(i),
std::string("2"), listener.getReceived().get(i));
}
connection->close();
}
////////////////////////////////////////////////////////////////////////////////
void OpenWireMessageListenerRedeliveryTest::testQueueSessionListenerExceptionDlq() {
const std::string TEST_NAME = "testQueueSessionListenerExceptionDlq";
std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL()));
connection->start();
std::auto_ptr<Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
std::auto_ptr<Queue> queue(session->createQueue(TEST_NAME));
std::auto_ptr<Queue> dlq(session->createQueue("ActiveMQ.DLQ"));
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get());
amqConnection->destroyDestination(queue.get());
amqConnection->destroyDestination(dlq.get());
std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
std::auto_ptr<Message> message(session->createTextMessage("1"));
producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);
producer->send(message.get());
// Track messages going to DLQ
TrackingMessageListener dlqListener(TEST_NAME);
std::auto_ptr<MessageConsumer> dlqConsumer(session->createConsumer(dlq.get()));
dlqConsumer->setMessageListener(&dlqListener);
// Receive and throw
int maxDeliveries = amqConnection->getRedeliveryPolicy()->getMaximumRedeliveries();
FailingMessageListener listener(session.get(), TEST_NAME, maxDeliveries);
std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
consumer->setMessageListener(&listener);
CPPUNIT_ASSERT_MESSAGE("got message before retry expiry", listener.await(20, TimeUnit::SECONDS));
// check DLQ
CPPUNIT_ASSERT_MESSAGE("got dlq message", dlqListener.await(30, TimeUnit::SECONDS));
// check DLQ message cause is captured
Pointer<cms::Message> dlqMessage = dlqListener.getReceived().get(0);
CPPUNIT_ASSERT_MESSAGE("dlq message captured", dlqMessage != NULL);
String cause = dlqMessage->getStringProperty(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
CPPUNIT_ASSERT_MESSAGE("cause 'cause' exception is remembered", cause.contains("Exception"));
CPPUNIT_ASSERT_MESSAGE("is correct exception", cause.contains(TEST_NAME));
CPPUNIT_ASSERT_MESSAGE("cause exception is remembered", cause.contains("JMSException"));
CPPUNIT_ASSERT_MESSAGE("cause policy is remembered", cause.contains("RedeliveryPolicy"));
connection->close();
}
////////////////////////////////////////////////////////////////////////////////
void OpenWireMessageListenerRedeliveryTest::testTransactedQueueSessionListenerExceptionDlq() {
const std::string TEST_NAME = "testTransactedQueueSessionListenerExceptionDlq";
std::auto_ptr<cms::Connection> connection(createConnection(getBrokerURL()));
connection->start();
std::auto_ptr<Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED));
std::auto_ptr<Queue> queue(session->createQueue(TEST_NAME));
std::auto_ptr<Queue> dlq(session->createQueue("ActiveMQ.DLQ"));
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get());
amqConnection->destroyDestination(queue.get());
amqConnection->destroyDestination(dlq.get());
std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
std::auto_ptr<Message> message(session->createTextMessage("1"));
producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);
producer->send(message.get());
session->commit();
// Track messages going to DLQ
TrackingMessageListener dlqListener(TEST_NAME);
std::auto_ptr<MessageConsumer> dlqConsumer(session->createConsumer(dlq.get()));
dlqConsumer->setMessageListener(&dlqListener);
// Receive and throw
int maxDeliveries = amqConnection->getRedeliveryPolicy()->getMaximumRedeliveries();
FailingMessageListener listener(session.get(), TEST_NAME, maxDeliveries);
std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
consumer->setMessageListener(&listener);
CPPUNIT_ASSERT_MESSAGE("got message before retry expiry", listener.await(20, TimeUnit::SECONDS));
// check DLQ
CPPUNIT_ASSERT_MESSAGE("got dlq message", dlqListener.await(30, TimeUnit::SECONDS));
// check DLQ message cause is captured
Pointer<cms::Message> dlqMessage = dlqListener.getReceived().get(0);
CPPUNIT_ASSERT_MESSAGE("dlq message captured", dlqMessage != NULL);
String cause = dlqMessage->getStringProperty(DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
CPPUNIT_ASSERT_MESSAGE("cause 'cause' exception is remembered", cause.contains("Exception"));
CPPUNIT_ASSERT_MESSAGE("is correct exception", cause.contains(TEST_NAME));
CPPUNIT_ASSERT_MESSAGE("cause exception is remembered", cause.contains("JMSException"));
CPPUNIT_ASSERT_MESSAGE("cause policy is remembered", cause.contains("RedeliveryPolicy"));
connection->close();
}