blob: 453864feceae28e6ca362d16e9a882ce4bbaaa16 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "QueueBrowserTest.h"
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/util/CMSListener.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <cms/Message.h>
#include <cms/TextMessage.h>
#include <cms/Session.h>
#include <decaf/lang/Integer.h>
#include <memory>
using namespace activemq;
using namespace activemq::test;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
////////////////////////////////////////////////////////////////////////////////
QueueBrowserTest::QueueBrowserTest() {
}
////////////////////////////////////////////////////////////////////////////////
QueueBrowserTest::~QueueBrowserTest() {
}
////////////////////////////////////////////////////////////////////////////////
void QueueBrowserTest::testReceiveBrowseReceive() {
cms::Session* session(cmsProvider->getSession());
std::auto_ptr<cms::Queue> queue(session->createQueue("testReceiveBrowseReceive"));
std::auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get()));
std::auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
std::auto_ptr<cms::TextMessage> message1(session->createTextMessage("First Message"));
std::auto_ptr<cms::TextMessage> message2(session->createTextMessage("Second Message"));
std::auto_ptr<cms::TextMessage> message3(session->createTextMessage("Third Message"));
// lets consume any outstanding messages from previous test runs
cms::Message* message;
while ((message = consumer->receive(1000)) != NULL) {
delete message;
}
producer->send(message1.get());
producer->send(message2.get());
producer->send(message3.get());
// Get the first.
std::auto_ptr<cms::TextMessage> inbound(dynamic_cast<cms::TextMessage*>(consumer->receive(1000)));
CPPUNIT_ASSERT(inbound.get() != NULL);
CPPUNIT_ASSERT_EQUAL(message1->getText(), inbound->getText());
consumer->close();
std::auto_ptr<cms::QueueBrowser> browser(session->createBrowser(queue.get()));
cms::MessageEnumeration* enumeration = browser->getEnumeration();
// browse the second
CPPUNIT_ASSERT_MESSAGE("should have received the second message", enumeration->hasMoreMessages());
inbound.reset(dynamic_cast<cms::TextMessage*>(enumeration->nextMessage()));
CPPUNIT_ASSERT(inbound.get() != NULL);
CPPUNIT_ASSERT_EQUAL(message2->getText(), inbound->getText());
// browse the third.
CPPUNIT_ASSERT_MESSAGE("should have received the third message", enumeration->hasMoreMessages());
inbound.reset(dynamic_cast<cms::TextMessage*>(enumeration->nextMessage()));
CPPUNIT_ASSERT(inbound.get() != NULL);
CPPUNIT_ASSERT_EQUAL(message3->getText(), inbound->getText());
// There should be no more.
bool tooMany = false;
while (enumeration->hasMoreMessages()) {
tooMany = true;
}
CPPUNIT_ASSERT_MESSAGE("Should not have browsed any more messages", !tooMany);
browser->close();
// Re-open the consumer
consumer.reset(session->createConsumer(queue.get()));
// Receive the second.
inbound.reset(dynamic_cast<cms::TextMessage*>(consumer->receive(1000)));
CPPUNIT_ASSERT(inbound.get() != NULL);
CPPUNIT_ASSERT_EQUAL(message2->getText(), inbound->getText());
// Receive the third.
inbound.reset(dynamic_cast<cms::TextMessage*>(consumer->receive(1000)));
CPPUNIT_ASSERT(inbound.get() != NULL);
CPPUNIT_ASSERT_EQUAL(message3->getText(), inbound->getText());
consumer->close();
browser->close();
producer->close();
cmsProvider->close();
}
////////////////////////////////////////////////////////////////////////////////
void QueueBrowserTest::testBrowseReceive() {
std::auto_ptr<cms::TextMessage> inbound;
cms::Session* session(cmsProvider->getSession());
std::auto_ptr<cms::Queue> queue(session->createQueue("testBrowseReceive"));
std::auto_ptr<cms::TextMessage> message1(session->createTextMessage("First Message"));
std::auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
producer->send(message1.get());
// create browser first
std::auto_ptr<cms::QueueBrowser> browser(session->createBrowser(queue.get()));
cms::MessageEnumeration* enumeration = browser->getEnumeration();
// create consumer
std::auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get()));
// browse the first message
CPPUNIT_ASSERT_MESSAGE("should have received the first message", enumeration->hasMoreMessages());
inbound.reset(dynamic_cast<cms::TextMessage*>(enumeration->nextMessage()));
CPPUNIT_ASSERT(inbound.get() != NULL);
CPPUNIT_ASSERT_EQUAL(message1->getText(), inbound->getText());
// Receive the first message.
inbound.reset(dynamic_cast<cms::TextMessage*>(consumer->receive(1000)));
CPPUNIT_ASSERT(inbound.get() != NULL);
CPPUNIT_ASSERT_EQUAL(message1->getText(), inbound->getText());
consumer->close();
browser->close();
producer->close();
}
////////////////////////////////////////////////////////////////////////////////
void QueueBrowserTest::testQueueBrowserWith2Consumers() {
static const int numMessages = 100;
ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
CPPUNIT_ASSERT(connection != NULL);
connection->setAlwaysSyncSend(false);
std::auto_ptr<cms::Session> session(connection->createSession(cms::Session::CLIENT_ACKNOWLEDGE));
std::auto_ptr<cms::Queue> queue(session->createQueue("testQueueBrowserWith2Consumers"));
std::auto_ptr<cms::Queue> queuePrefetch10(
session->createQueue("testQueueBrowserWith2Consumers?consumer.prefetchSize=10"));
std::auto_ptr<cms::Queue> queuePrefetch1(
session->createQueue("testQueueBrowserWith2Consumers?consumer.prefetchSize=1"));
std::auto_ptr<ActiveMQConnectionFactory> factory(new ActiveMQConnectionFactory(cmsProvider->getBrokerURL()));
std::auto_ptr<ActiveMQConnection> connection2(dynamic_cast<ActiveMQConnection*>(factory->createConnection()));
connection2->start();
std::auto_ptr<cms::Session> session2(connection2->createSession(cms::Session::AUTO_ACKNOWLEDGE));
std::auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
std::auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queuePrefetch10.get()));
producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
for (int i = 0; i < numMessages; i++) {
std::auto_ptr<cms::TextMessage> message(
session->createTextMessage(std::string("Message: ") + Integer::toString(i)));
producer->send(message.get());
}
std::auto_ptr<cms::QueueBrowser> browser(session2->createBrowser(queuePrefetch1.get()));
cms::MessageEnumeration* browserView = browser->getEnumeration();
std::vector<cms::Message*> messages;
for (int i = 0; i < numMessages; i++) {
cms::Message* m1 = consumer->receive(5000);
CPPUNIT_ASSERT_MESSAGE(std::string("m1 is null for index: ") + Integer::toString(i), m1 != NULL);
messages.push_back(m1);
}
for (int i = 0; i < numMessages && browserView->hasMoreMessages(); i++) {
cms::Message* m1 = messages[i];
cms::Message* m2 = browserView->nextMessage();
CPPUNIT_ASSERT_MESSAGE(std::string("m2 is null for index: ") + Integer::toString(i), m2 != NULL);
CPPUNIT_ASSERT(m1->getCMSMessageID() == m2->getCMSMessageID());
delete m2;
}
CPPUNIT_ASSERT_MESSAGE("nothing left in the browser", !browserView->hasMoreMessages());
CPPUNIT_ASSERT_MESSAGE("consumer finished", consumer->receiveNoWait() == NULL);
for (std::size_t ix = 0; ix < messages.size(); ++ix) {
cms::Message* msg = messages[ix];
msg->acknowledge();
delete msg;
}
}
////////////////////////////////////////////////////////////////////////////////
void QueueBrowserTest::testRepeatedQueueBrowserCreateDestroy() {
ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
CPPUNIT_ASSERT(connection != NULL);
std::auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED));
std::auto_ptr<cms::Queue> queue(session->createTemporaryQueue());
std::auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
std::auto_ptr<cms::TextMessage> textMessage(session->createTextMessage("Test"));
producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
producer->send(textMessage.get());
session->commit();
connection->start();
std::auto_ptr<cms::QueueBrowser> browser(session->createBrowser(queue.get()));
for (int i = 0; i < 200; i++) {
browser.reset(session->createBrowser(queue.get()));
cms::MessageEnumeration* browserView = browser->getEnumeration();
if (browserView->hasMoreMessages()) {
std::auto_ptr<cms::Message> message(browserView->nextMessage());
CPPUNIT_ASSERT(message.get() != NULL);
}
browser.reset(NULL);
}
}
////////////////////////////////////////////////////////////////////////////////
void QueueBrowserTest::testRepeatedQueueBrowserCreateDestroyWithMessageInQueue() {
ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
CPPUNIT_ASSERT(connection != NULL);
std::auto_ptr<cms::Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
std::auto_ptr<cms::Queue> queue(session->createTemporaryQueue());
std::auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
std::auto_ptr<cms::TextMessage> textMessage(session->createTextMessage("Test"));
producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
for (int i = 0 ; i < 10; ++i) {
producer->send(textMessage.get());
}
connection->start();
std::auto_ptr<cms::QueueBrowser> browser(session->createBrowser(queue.get()));
for (int i = 0; i < 200; i++) {
browser.reset(session->createBrowser(queue.get()));
cms::MessageEnumeration* browserView = browser->getEnumeration();
if (browserView->hasMoreMessages()) {
std::auto_ptr<cms::Message> message(browserView->nextMessage());
CPPUNIT_ASSERT(message.get() != NULL);
}
browser.reset(NULL);
}
}
////////////////////////////////////////////////////////////////////////////////
void QueueBrowserTest::testBrowsingExpirationIsIgnored() {
const int MESSAGES_TO_SEND = 50;
ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
CPPUNIT_ASSERT(connection != NULL);
std::auto_ptr<cms::Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
std::auto_ptr<cms::Queue> queue(session->createTemporaryQueue());
std::auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get()));
producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
producer->setTimeToLive(1000);
// Load the Queue with messages set to expire.
for (int i = 1; i <= MESSAGES_TO_SEND; i++) {
std::auto_ptr<cms::TextMessage> textMessage(session->createTextMessage("Message: " + Integer::toString(i)));
producer->send(textMessage.get());
}
std::auto_ptr<cms::QueueBrowser> browser(session->createBrowser(queue.get()));
cms::MessageEnumeration* enumeration = browser->getEnumeration();
int browsed = 0;
Thread::sleep(1000);
while (enumeration->hasMoreMessages()) {
std::auto_ptr<cms::Message> message(enumeration->nextMessage());
CPPUNIT_ASSERT(message.get() != NULL);
browsed++;
}
CPPUNIT_ASSERT_EQUAL_MESSAGE("Should have browsed all", MESSAGES_TO_SEND, browsed);
browser->close();
}