| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "OpenwireSimpleTest.h" |
| |
| #include <activemq/util/CMSListener.h> |
| #include <activemq/core/ActiveMQConnectionFactory.h> |
| #include <activemq/core/ActiveMQConnection.h> |
| #include <activemq/core/PrefetchPolicy.h> |
| #include <activemq/exceptions/ActiveMQException.h> |
| |
| #include <decaf/util/UUID.h> |
| #include <decaf/lang/Thread.h> |
| |
| using namespace std; |
| using namespace cms; |
| using namespace activemq; |
| using namespace activemq::core; |
| using namespace activemq::test; |
| using namespace activemq::test::openwire; |
| using namespace activemq::util; |
| using namespace activemq::exceptions; |
| using namespace decaf; |
| using namespace decaf::lang; |
| using namespace decaf::util; |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| OpenwireSimpleTest::OpenwireSimpleTest() { |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| OpenwireSimpleTest::~OpenwireSimpleTest() { |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::testWithZeroConsumerPrefetch() { |
| |
| cmsProvider->setTopic(false); |
| cmsProvider->setDestinationName(UUID::randomUUID().toString() + "?consumer.prefetchSize=0"); |
| |
| cmsProvider->reconnectSession(); |
| |
| // Create CMS Object for Comms |
| cms::Session* session(cmsProvider->getSession()); |
| cms::MessageConsumer* consumer = cmsProvider->getConsumer(); |
| cms::MessageProducer* producer = cmsProvider->getProducer(); |
| producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); |
| |
| auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE")); |
| |
| // Send some text messages |
| producer->send(txtMessage.get()); |
| |
| auto_ptr<cms::Message> message(consumer->receive(1000)); |
| CPPUNIT_ASSERT(message.get() != NULL); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::testWithZeroConsumerPrefetch2() { |
| |
| cmsProvider->setTopic(false); |
| ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection()); |
| amqConnection->getPrefetchPolicy()->setQueuePrefetch(0); |
| amqConnection->getPrefetchPolicy()->setTopicPrefetch(0); |
| cmsProvider->reconnectSession(); |
| |
| // Create CMS Object for Comms |
| cms::Session* session(cmsProvider->getSession()); |
| cms::MessageConsumer* consumer = cmsProvider->getConsumer(); |
| cms::MessageProducer* producer = cmsProvider->getProducer(); |
| producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); |
| |
| auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE")); |
| |
| // Send some text messages |
| producer->send(txtMessage.get()); |
| |
| auto_ptr<cms::Message> message(consumer->receive(1000)); |
| CPPUNIT_ASSERT(message.get() != NULL); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage() { |
| |
| cmsProvider->setTopic(false); |
| cmsProvider->setDestinationName(UUID::randomUUID().toString() + "?consumer.prefetchSize=0"); |
| |
| cmsProvider->reconnectSession(); |
| |
| // Create CMS Object for Comms |
| cms::Session* session(cmsProvider->getSession()); |
| cms::MessageConsumer* consumer = cmsProvider->getConsumer(); |
| |
| // Should be no message and no exceptions |
| auto_ptr<cms::Message> message(consumer->receiveNoWait()); |
| CPPUNIT_ASSERT(message.get() == NULL); |
| |
| // Should be no message and no exceptions |
| message.reset(consumer->receive(1000)); |
| CPPUNIT_ASSERT(message.get() == NULL); |
| |
| consumer->close(); |
| session->close(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage2() { |
| |
| cmsProvider->setTopic(false); |
| ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection()); |
| amqConnection->getPrefetchPolicy()->setQueuePrefetch(0); |
| amqConnection->getPrefetchPolicy()->setTopicPrefetch(0); |
| cmsProvider->reconnectSession(); |
| |
| // Create CMS Object for Comms |
| cms::Session* session(cmsProvider->getSession()); |
| cms::MessageConsumer* consumer = cmsProvider->getConsumer(); |
| |
| // Should be no message and no exceptions |
| auto_ptr<cms::Message> message(consumer->receiveNoWait()); |
| CPPUNIT_ASSERT(message.get() == NULL); |
| |
| // Should be no message and no exceptions |
| message.reset(consumer->receive(1000)); |
| CPPUNIT_ASSERT(message.get() == NULL); |
| |
| consumer->close(); |
| session->close(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::testMapMessageSendToQueue() { |
| |
| cmsProvider->setTopic(false); |
| cmsProvider->setDestinationName(UUID::randomUUID().toString() + "?consumer.prefetchSize=0"); |
| |
| cmsProvider->reconnectSession(); |
| |
| // Create CMS Object for Comms |
| cms::Session* session(cmsProvider->getSession()); |
| cms::MessageConsumer* consumer = cmsProvider->getConsumer(); |
| cms::MessageProducer* producer = cmsProvider->getProducer(); |
| producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); |
| |
| unsigned char byteValue = 'A'; |
| char charValue = 'B'; |
| bool booleanValue = true; |
| short shortValue = 2048; |
| int intValue = 655369; |
| long long longValue = 0xFFFFFFFF00000000ULL; |
| float floatValue = 45.6545f; |
| double doubleValue = 654564.654654; |
| std::string stringValue = "The test string"; |
| |
| auto_ptr<cms::MapMessage> mapMessage(session->createMapMessage()); |
| |
| mapMessage->setString("stringKey", stringValue); |
| mapMessage->setBoolean("boolKey", booleanValue); |
| mapMessage->setByte("byteKey", byteValue); |
| mapMessage->setChar("charKey", charValue); |
| mapMessage->setShort("shortKey", shortValue); |
| mapMessage->setInt("intKey", intValue); |
| mapMessage->setLong("longKey", longValue); |
| mapMessage->setFloat("floatKey", floatValue); |
| mapMessage->setDouble("doubleKey", doubleValue); |
| |
| std::vector<unsigned char> bytes; |
| bytes.push_back(65); |
| bytes.push_back(66); |
| bytes.push_back(67); |
| bytes.push_back(68); |
| bytes.push_back(69); |
| mapMessage->setBytes("bytesKey", bytes); |
| |
| // Send some text messages |
| producer->send(mapMessage.get()); |
| |
| auto_ptr<cms::Message> message(consumer->receive(2000)); |
| CPPUNIT_ASSERT(message.get() != NULL); |
| |
| cms::MapMessage* recvMapMessage = dynamic_cast<MapMessage*>(message.get()); |
| CPPUNIT_ASSERT(recvMapMessage != NULL); |
| CPPUNIT_ASSERT(recvMapMessage->getString("stringKey") == stringValue); |
| CPPUNIT_ASSERT(recvMapMessage->getBoolean("boolKey") == booleanValue); |
| CPPUNIT_ASSERT(recvMapMessage->getByte("byteKey") == byteValue); |
| CPPUNIT_ASSERT(recvMapMessage->getChar("charKey") == charValue); |
| CPPUNIT_ASSERT(recvMapMessage->getShort("shortKey") == shortValue); |
| CPPUNIT_ASSERT(recvMapMessage->getInt("intKey") == intValue); |
| CPPUNIT_ASSERT(recvMapMessage->getLong("longKey") == longValue); |
| CPPUNIT_ASSERT(recvMapMessage->getFloat("floatKey") == floatValue); |
| CPPUNIT_ASSERT(recvMapMessage->getDouble("doubleKey") == doubleValue); |
| CPPUNIT_ASSERT(recvMapMessage->getBytes("bytesKey") == bytes); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::testMapMessageSendToTopic() { |
| |
| // Create CMS Object for Comms |
| cms::Session* session(cmsProvider->getSession()); |
| cms::MessageConsumer* consumer = cmsProvider->getConsumer(); |
| cms::MessageProducer* producer = cmsProvider->getProducer(); |
| producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); |
| |
| unsigned char byteValue = 'A'; |
| char charValue = 'B'; |
| bool booleanValue = true; |
| short shortValue = 2048; |
| int intValue = 655369; |
| long long longValue = 0xFFFFFFFF00000000ULL; |
| float floatValue = 45.6545f; |
| double doubleValue = 654564.654654; |
| std::string stringValue = "The test string"; |
| |
| auto_ptr<cms::MapMessage> mapMessage(session->createMapMessage()); |
| |
| mapMessage->setString("stringKey", stringValue); |
| mapMessage->setBoolean("boolKey", booleanValue); |
| mapMessage->setByte("byteKey", byteValue); |
| mapMessage->setChar("charKey", charValue); |
| mapMessage->setShort("shortKey", shortValue); |
| mapMessage->setInt("intKey", intValue); |
| mapMessage->setLong("longKey", longValue); |
| mapMessage->setFloat("floatKey", floatValue); |
| mapMessage->setDouble("doubleKey", doubleValue); |
| |
| std::vector<unsigned char> bytes; |
| bytes.push_back(65); |
| bytes.push_back(66); |
| bytes.push_back(67); |
| bytes.push_back(68); |
| bytes.push_back(69); |
| mapMessage->setBytes("bytesKey", bytes); |
| |
| // Send some text messages |
| producer->send(mapMessage.get()); |
| |
| auto_ptr<cms::Message> message(consumer->receive(2000)); |
| CPPUNIT_ASSERT(message.get() != NULL); |
| |
| cms::MapMessage* recvMapMessage = dynamic_cast<MapMessage*>(message.get()); |
| CPPUNIT_ASSERT(recvMapMessage != NULL); |
| CPPUNIT_ASSERT(recvMapMessage->getString("stringKey") == stringValue); |
| CPPUNIT_ASSERT(recvMapMessage->getBoolean("boolKey") == booleanValue); |
| CPPUNIT_ASSERT(recvMapMessage->getByte("byteKey") == byteValue); |
| CPPUNIT_ASSERT(recvMapMessage->getChar("charKey") == charValue); |
| CPPUNIT_ASSERT(recvMapMessage->getShort("shortKey") == shortValue); |
| CPPUNIT_ASSERT(recvMapMessage->getInt("intKey") == intValue); |
| CPPUNIT_ASSERT(recvMapMessage->getLong("longKey") == longValue); |
| CPPUNIT_ASSERT(recvMapMessage->getFloat("floatKey") == floatValue); |
| CPPUNIT_ASSERT(recvMapMessage->getDouble("doubleKey") == doubleValue); |
| CPPUNIT_ASSERT(recvMapMessage->getBytes("bytesKey") == bytes); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::testDestroyDestination() { |
| |
| try { |
| |
| cmsProvider->setDestinationName("testDestroyDestination"); |
| cmsProvider->reconnectSession(); |
| |
| // Create CMS Object for Comms |
| cms::Session* session(cmsProvider->getSession()); |
| cms::MessageConsumer* consumer = cmsProvider->getConsumer(); |
| cms::MessageProducer* producer = cmsProvider->getProducer(); |
| producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); |
| |
| auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE")); |
| |
| // Send some text messages |
| producer->send(txtMessage.get()); |
| |
| auto_ptr<cms::Message> message(consumer->receive(1000)); |
| CPPUNIT_ASSERT(message.get() != NULL); |
| |
| ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection()); |
| |
| CPPUNIT_ASSERT(connection != NULL); |
| |
| try { |
| connection->destroyDestination(cmsProvider->getDestination()); |
| CPPUNIT_ASSERT_MESSAGE("Destination Should be in use.", false); |
| } catch (ActiveMQException& ex) { |
| } |
| |
| cmsProvider->reconnectSession(); |
| |
| connection->destroyDestination(cmsProvider->getDestination()); |
| |
| } catch (ActiveMQException& ex) { |
| ex.printStackTrace(); |
| CPPUNIT_ASSERT_MESSAGE("CAUGHT EXCEPTION", false); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::tesstStreamMessage() { |
| |
| // Create CMS Object for Comms |
| cms::Session* session(cmsProvider->getSession()); |
| cms::MessageConsumer* consumer = cmsProvider->getConsumer(); |
| cms::MessageProducer* producer = cmsProvider->getProducer(); |
| producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); |
| |
| unsigned char byteValue = 'A'; |
| char charValue = 'B'; |
| bool booleanValue = true; |
| short shortValue = 2048; |
| int intValue = 655369; |
| long long longValue = 0xFFFFFFFF00000000ULL; |
| float floatValue = 45.6545f; |
| double doubleValue = 654564.654654; |
| std::string stringValue = "The test string"; |
| |
| auto_ptr<cms::StreamMessage> streamMessage(session->createStreamMessage()); |
| |
| streamMessage->writeString(stringValue); |
| streamMessage->writeBoolean(booleanValue); |
| streamMessage->writeByte(byteValue); |
| streamMessage->writeChar(charValue); |
| streamMessage->writeShort(shortValue); |
| streamMessage->writeInt(intValue); |
| streamMessage->writeLong(longValue); |
| streamMessage->writeFloat(floatValue); |
| streamMessage->writeDouble(doubleValue); |
| |
| std::vector<unsigned char> bytes; |
| std::vector<unsigned char> readBytes(100); |
| bytes.push_back(65); |
| bytes.push_back(66); |
| bytes.push_back(67); |
| bytes.push_back(68); |
| bytes.push_back(69); |
| streamMessage->writeBytes(bytes); |
| |
| // Send some text messages |
| producer->send(streamMessage.get()); |
| |
| auto_ptr<cms::Message> message(consumer->receive(2000)); |
| CPPUNIT_ASSERT(message.get() != NULL); |
| |
| cms::StreamMessage* rcvStreamMessage = dynamic_cast<StreamMessage*>(message.get()); |
| CPPUNIT_ASSERT(rcvStreamMessage != NULL); |
| CPPUNIT_ASSERT(rcvStreamMessage->readString() == stringValue); |
| CPPUNIT_ASSERT(rcvStreamMessage->readBoolean() == booleanValue); |
| CPPUNIT_ASSERT(rcvStreamMessage->readByte() == byteValue); |
| CPPUNIT_ASSERT(rcvStreamMessage->readChar() == charValue); |
| CPPUNIT_ASSERT(rcvStreamMessage->readShort() == shortValue); |
| CPPUNIT_ASSERT(rcvStreamMessage->readInt() == intValue); |
| CPPUNIT_ASSERT(rcvStreamMessage->readLong() == longValue); |
| CPPUNIT_ASSERT(rcvStreamMessage->readFloat() == floatValue); |
| CPPUNIT_ASSERT(rcvStreamMessage->readDouble() == doubleValue); |
| CPPUNIT_ASSERT(rcvStreamMessage->readBytes(readBytes) == (int )bytes.size()); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::testMessageIdSetOnSend() { |
| |
| // Create CMS Object for Comms |
| cms::Session* session(cmsProvider->getSession()); |
| cms::MessageProducer* producer = cmsProvider->getProducer(); |
| producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); |
| |
| auto_ptr<cms::Message> message(session->createMessage()); |
| producer->send(message.get()); |
| |
| CPPUNIT_ASSERT(message->getCMSMessageID() != ""); |
| CPPUNIT_ASSERT(message->getCMSDestination() != NULL); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::testReceiveWithSessionSyncDispatch() { |
| |
| ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection()); |
| amqConnection->setAlwaysSessionAsync(false); |
| |
| cmsProvider->reconnectSession(); |
| |
| // Create CMS Object for Comms |
| cms::Session* session(cmsProvider->getSession()); |
| cms::MessageConsumer* consumer = cmsProvider->getConsumer(); |
| cms::MessageProducer* producer = cmsProvider->getProducer(); |
| producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); |
| |
| auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE")); |
| |
| // Send some text messages |
| producer->send(txtMessage.get()); |
| |
| auto_ptr<cms::Message> message(consumer->receive(1000)); |
| CPPUNIT_ASSERT(message.get() != NULL); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndZeroRedelivery() { |
| |
| ActiveMQConnectionFactory factory(getBrokerURL()); |
| auto_ptr<cms::Connection> connection(factory.createConnection()); |
| |
| connection->start(); |
| |
| { |
| auto_ptr<cms::Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE)); |
| auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchAndZeroRedelivery")); |
| auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get())); |
| |
| auto_ptr<cms::Message> message(session->createTextMessage("Hello")); |
| producer->send(message.get()); |
| producer->close(); |
| session->close(); |
| } |
| |
| { |
| auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED)); |
| auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchAndZeroRedelivery")); |
| auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get())); |
| |
| auto_ptr<cms::Message> message(consumer->receive(5000)); |
| CPPUNIT_ASSERT(message.get() != NULL); |
| |
| session->rollback(); |
| session->close(); |
| connection->close(); |
| } |
| |
| connection.reset(factory.createConnection()); |
| connection->start(); |
| ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get()); |
| |
| // Now we test the zero prefetc + zero max redelivery case. |
| amqConnection->getRedeliveryPolicy()->setMaximumRedeliveries(0); |
| amqConnection->getPrefetchPolicy()->setQueuePrefetch(0); |
| |
| auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED)); |
| auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchAndZeroRedelivery")); |
| auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get())); |
| |
| auto_ptr<cms::Message> message(consumer->receive(5000)); |
| CPPUNIT_ASSERT(message.get() == NULL); |
| |
| session->commit(); |
| session->close(); |
| |
| amqConnection->destroyDestination(queue.get()); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void OpenwireSimpleTest::testWithZeroConsumerPrefetchWithInFlightExpiration() { |
| |
| ActiveMQConnectionFactory factory(getBrokerURL()); |
| auto_ptr<cms::Connection> connection(factory.createConnection()); |
| |
| ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get()); |
| amqConnection->getPrefetchPolicy()->setAll(0); |
| |
| connection->start(); |
| |
| { |
| auto_ptr<cms::Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE)); |
| auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchWithInFlightExpiration")); |
| |
| amqConnection->destroyDestination(queue.get()); |
| |
| auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get())); |
| |
| auto_ptr<cms::Message> expiredMessage(session->createTextMessage("Expired")); |
| auto_ptr<cms::Message> validMessage(session->createTextMessage("Valid")); |
| producer->send(expiredMessage.get(), cms::Message::DEFAULT_DELIVERY_MODE, cms::Message::DEFAULT_MSG_PRIORITY, 2000); |
| producer->send(validMessage.get()); |
| session->close(); |
| } |
| |
| auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED)); |
| auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchWithInFlightExpiration")); |
| auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get())); |
| |
| { |
| auto_ptr<cms::Message> message(consumer->receive(5000)); |
| CPPUNIT_ASSERT(message.get() != NULL); |
| TextMessage* received = dynamic_cast<TextMessage*>(message.get()); |
| CPPUNIT_ASSERT_EQUAL(std::string("Expired"), received->getText()); |
| } |
| |
| session->rollback(); |
| Thread::sleep(2500); |
| |
| { |
| auto_ptr<cms::Message> message(consumer->receive(5000)); |
| CPPUNIT_ASSERT(message.get() != NULL); |
| TextMessage* received = dynamic_cast<TextMessage*>(message.get()); |
| CPPUNIT_ASSERT_EQUAL(std::string("Valid"), received->getText()); |
| } |
| |
| session->commit(); |
| session->close(); |
| |
| amqConnection->destroyDestination(queue.get()); |
| } |