blob: e5e9fdb6a6a551846119f8d5cd0d2fc71a453c86 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SimpleTest.h"
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/util/CMSListener.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <decaf/lang/Thread.h>
#include <decaf/util/UUID.h>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::test;
using namespace activemq::util;
using namespace activemq::exceptions;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testAutoAck() {
// Create CMS Object for Comms
cms::Session* session(cmsProvider->getSession());
CMSListener listener(session);
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
consumer->setMessageListener(&listener);
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE"));
auto_ptr<cms::BytesMessage> bytesMessage(session->createBytesMessage());
for (unsigned int i = 0; i < IntegrationCommon::defaultMsgCount; ++i) {
producer->send(txtMessage.get());
}
for (unsigned int i = 0; i < IntegrationCommon::defaultMsgCount; ++i) {
producer->send(bytesMessage.get());
}
// Wait for the messages to get here
listener.asyncWaitForMessages(IntegrationCommon::defaultMsgCount * 2);
unsigned int numReceived = listener.getNumReceived();
CPPUNIT_ASSERT(numReceived == IntegrationCommon::defaultMsgCount * 2);
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testClientAck() {
cmsProvider->setAckMode(cms::Session::CLIENT_ACKNOWLEDGE);
cmsProvider->reconnectSession();
// Create CMS Object for Comms
cms::Session* session(cmsProvider->getSession());
CMSListener listener(session);
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
consumer->setMessageListener(&listener);
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE"));
auto_ptr<cms::BytesMessage> bytesMessage(session->createBytesMessage());
for (unsigned int i = 0; i < IntegrationCommon::defaultMsgCount; ++i) {
producer->send(txtMessage.get());
}
for (unsigned int i = 0; i < IntegrationCommon::defaultMsgCount; ++i) {
producer->send(bytesMessage.get());
}
// Wait for the messages to get here
listener.asyncWaitForMessages(IntegrationCommon::defaultMsgCount * 2);
unsigned int numReceived = listener.getNumReceived();
CPPUNIT_ASSERT(numReceived == IntegrationCommon::defaultMsgCount * 2);
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testProducerWithNullDestination() {
// Create CMS Object for Comms
cms::Session* session(cmsProvider->getSession());
CMSListener listener(session);
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
consumer->setMessageListener(&listener);
cms::MessageProducer* producer = cmsProvider->getNoDestProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE"));
producer->send(cmsProvider->getDestination(), txtMessage.get());
// Wait for the messages to get here
listener.asyncWaitForMessages(1);
unsigned int numReceived = listener.getNumReceived();
CPPUNIT_ASSERT(numReceived == 1);
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testProducerSendWithNullDestination() {
// Create CMS Object for Comms
cms::Session* session(cmsProvider->getSession());
CMSListener listener(session);
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE"));
CPPUNIT_ASSERT_THROW_MESSAGE("Should Throw an InvalidDestinationException",
producer->send( NULL, txtMessage.get() ), cms::InvalidDestinationException);
producer = cmsProvider->getNoDestProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
CPPUNIT_ASSERT_THROW_MESSAGE("Should Throw an UnsupportedOperationException",
producer->send( NULL, txtMessage.get() ), cms::UnsupportedOperationException);
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testProducerSendToNonDefaultDestination() {
// Create CMS Object for Comms
cms::Session* session(cmsProvider->getSession());
CMSListener listener(session);
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE"));
auto_ptr<cms::Destination> destination(session->createTemporaryTopic());
CPPUNIT_ASSERT_THROW_MESSAGE("Should Throw an UnsupportedOperationException",
producer->send(destination.get(), txtMessage.get()), cms::UnsupportedOperationException);
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testSyncReceive() {
// Create CMS Object for Comms
cms::Session* session(cmsProvider->getSession());
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE"));
// Send some text messages
producer->send(txtMessage.get());
auto_ptr<cms::Message> message(consumer->receive(2000));
CPPUNIT_ASSERT(message.get() != NULL);
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testSyncReceiveClientAck() {
cmsProvider->setAckMode(cms::Session::CLIENT_ACKNOWLEDGE);
cmsProvider->reconnectSession();
// Create CMS Object for Comms
cms::Session* session(cmsProvider->getSession());
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE"));
// Send some text messages
producer->send(txtMessage.get());
auto_ptr<cms::Message> message(consumer->receive(2000));
CPPUNIT_ASSERT(message.get() != NULL);
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testMultipleConnections() {
// Create CMS Object for Comms
auto_ptr<ActiveMQConnectionFactory> factory(new ActiveMQConnectionFactory(cmsProvider->getBrokerURL()));
auto_ptr<cms::Connection> connection1(factory->createConnection());
connection1->start();
auto_ptr<cms::Connection> connection2(factory->createConnection());
connection2->start();
CPPUNIT_ASSERT(connection1->getClientID() != connection2->getClientID());
auto_ptr<cms::Session> session1(connection1->createSession());
auto_ptr<cms::Session> session2(connection1->createSession());
auto_ptr<cms::Topic> topic(session1->createTopic(UUID::randomUUID().toString()));
auto_ptr<cms::MessageConsumer> consumer1(session1->createConsumer(topic.get()));
auto_ptr<cms::MessageConsumer> consumer2(session2->createConsumer(topic.get()));
auto_ptr<cms::MessageProducer> producer(session2->createProducer(topic.get()));
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::TextMessage> textMessage(session2->createTextMessage());
// Send some text messages
producer->send(textMessage.get());
auto_ptr<cms::Message> message(consumer1->receive(2000));
CPPUNIT_ASSERT(message.get() != NULL);
message.reset(consumer2->receive(2000));
CPPUNIT_ASSERT(message.get() != NULL);
// Clean up if we can
consumer1->close();
consumer2->close();
producer->close();
session1->close();
session2->close();
this->cmsProvider->destroyDestination(topic.get());
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testMultipleSessions() {
// Create CMS Object for Comms
auto_ptr<cms::Session> session1(cmsProvider->getConnection()->createSession());
auto_ptr<cms::Session> session2(cmsProvider->getConnection()->createSession());
auto_ptr<cms::Topic> topic(session1->createTopic(UUID::randomUUID().toString()));
auto_ptr<cms::MessageConsumer> consumer1(session1->createConsumer(topic.get()));
auto_ptr<cms::MessageConsumer> consumer2(session2->createConsumer(topic.get()));
auto_ptr<cms::MessageProducer> producer(session2->createProducer(topic.get()));
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::TextMessage> textMessage(session2->createTextMessage());
// Send some text messages
producer->send(textMessage.get());
auto_ptr<cms::Message> message(consumer1->receive(2000));
CPPUNIT_ASSERT(message.get() != NULL);
message.reset(consumer2->receive(2000));
CPPUNIT_ASSERT(message.get() != NULL);
// Clean up if we can
consumer1->close();
consumer2->close();
producer->close();
session1->close();
session2->close();
this->cmsProvider->destroyDestination(topic.get());
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testReceiveAlreadyInQueue() {
// Create CMS Object for Comms
auto_ptr<ActiveMQConnectionFactory> factory(new ActiveMQConnectionFactory(cmsProvider->getBrokerURL()));
auto_ptr<cms::Connection> connection(factory->createConnection());
auto_ptr<cms::Session> session(connection->createSession());
auto_ptr<cms::Topic> topic(session->createTopic(UUID::randomUUID().toString()));
auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(topic.get()));
auto_ptr<cms::MessageProducer> producer(session->createProducer(topic.get()));
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::TextMessage> textMessage(session->createTextMessage());
// Send some text messages
producer->send(textMessage.get());
Thread::sleep(250);
connection->start();
auto_ptr<cms::Message> message(consumer->receive(2000));
CPPUNIT_ASSERT(message.get() != NULL);
// Clean up if we can
consumer->close();
producer->close();
session->close();
this->cmsProvider->destroyDestination(topic.get());
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testQuickCreateAndDestroy() {
auto_ptr<ActiveMQConnectionFactory> factory(new ActiveMQConnectionFactory(cmsProvider->getBrokerURL()));
auto_ptr<cms::Connection> connection(factory->createConnection());
auto_ptr<cms::Session> session(connection->createSession());
session.reset( NULL);
connection.reset( NULL);
connection.reset(factory->createConnection());
session.reset(connection->createSession());
connection->start();
session.reset( NULL);
connection.reset( NULL);
for (int i = 0; i < 50; ++i) {
CMSProvider lcmsProvider(this->getBrokerURL());
lcmsProvider.getSession();
lcmsProvider.getConsumer();
lcmsProvider.getProducer();
}
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testBytesMessageSendRecv() {
// Create CMS Object for Comms
cms::Session* session(cmsProvider->getSession());
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::BytesMessage> bytesMessage(session->createBytesMessage());
bytesMessage->writeBoolean(true);
bytesMessage->writeByte(127);
bytesMessage->writeDouble(123456.789);
bytesMessage->writeInt(65537);
bytesMessage->writeString("TEST-STRING");
// Send some text messages
producer->send(bytesMessage.get());
auto_ptr<cms::Message> message(consumer->receive(2000));
CPPUNIT_ASSERT(message.get() != NULL);
CPPUNIT_ASSERT_THROW_MESSAGE("Should throw an ActiveMQExceptio", message->setStringProperty("FOO", "BAR"), cms::CMSException);
BytesMessage* bytesMessage2 = dynamic_cast<cms::BytesMessage*>(message.get());
CPPUNIT_ASSERT(bytesMessage2 != NULL);
CPPUNIT_ASSERT_THROW_MESSAGE("Should throw an ActiveMQExceptio", bytesMessage2->writeBoolean(false), cms::CMSException);
CPPUNIT_ASSERT(bytesMessage2->getBodyLength() > 0);
unsigned char* result = bytesMessage2->getBodyBytes();
CPPUNIT_ASSERT(result != NULL);
delete[] result;
bytesMessage2->reset();
CPPUNIT_ASSERT(bytesMessage2->readBoolean() == true);
CPPUNIT_ASSERT(bytesMessage2->readByte() == 127);
CPPUNIT_ASSERT(bytesMessage2->readDouble() == 123456.789);
CPPUNIT_ASSERT(bytesMessage2->readInt() == 65537);
CPPUNIT_ASSERT(bytesMessage2->readString() == "TEST-STRING");
}
////////////////////////////////////////////////////////////////////////////////
namespace {
class Listener: public cms::MessageListener {
private:
bool passed;
bool triggered;
public:
Listener() : MessageListener(), passed(false), triggered(false) {}
virtual ~Listener() {}
bool isPassed() {
return passed;
}
bool isTriggered() {
return triggered;
}
void onMessage(const cms::Message* message) {
try {
triggered = true;
const BytesMessage* bytesMessage = dynamic_cast<const cms::BytesMessage*>(message);
CPPUNIT_ASSERT(bytesMessage != NULL);
CPPUNIT_ASSERT(bytesMessage->getBodyLength() > 0);
unsigned char* result = bytesMessage->getBodyBytes();
CPPUNIT_ASSERT(result != NULL);
delete[] result;
passed = true;
} catch (...) {
passed = false;
}
}
};
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testBytesMessageSendRecvAsync() {
Listener listener;
// Create CMS Object for Comms
cms::Session* session(cmsProvider->getSession());
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
consumer->setMessageListener(&listener);
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
auto_ptr<cms::BytesMessage> bytesMessage(session->createBytesMessage());
bytesMessage->writeBoolean(true);
bytesMessage->writeByte(127);
bytesMessage->writeDouble(123456.789);
bytesMessage->writeInt(65537);
bytesMessage->writeString("TEST-STRING");
// Send some text messages
producer->send(bytesMessage.get());
int count = 0;
while (!listener.isTriggered() && count++ < 30) {
decaf::lang::Thread::sleep(100);
}
CPPUNIT_ASSERT(listener.isPassed());
}
////////////////////////////////////////////////////////////////////////////////
void SimpleTest::testLibraryInitShutdownInit() {
{
this->tearDown();
// Shutdown the ActiveMQ library
CPPUNIT_ASSERT_NO_THROW(activemq::library::ActiveMQCPP::shutdownLibrary());
}
{
// Initialize the ActiveMQ library
CPPUNIT_ASSERT_NO_THROW(activemq::library::ActiveMQCPP::initializeLibrary());
cmsProvider.reset(new util::CMSProvider(getBrokerURL()));
}
}