blob: 962e4d82de851278fe208f656ea576c9039692dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "StompSessionManagerTest.h"
#include <activemq/transport/TransportFactoryMap.h>
using namespace activemq;
using namespace activemq::transport;
using namespace activemq::connector;
using namespace activemq::connector::stomp;
using namespace std;
////////////////////////////////////////////////////////////////////////////////
void StompSessionManagerTest::testSessions() {
SessionInfo* info1 = manager->createSession( cms::Session::AUTO_ACKNOWLEDGE );
CPPUNIT_ASSERT( info1->getAckMode() == cms::Session::AUTO_ACKNOWLEDGE );
CPPUNIT_ASSERT( info1->getConnectionId() == connectionId );
SessionInfo* info2 = manager->createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
CPPUNIT_ASSERT( info2->getAckMode() == cms::Session::DUPS_OK_ACKNOWLEDGE );
CPPUNIT_ASSERT( info2->getConnectionId() == connectionId );
SessionInfo* info3 = manager->createSession( cms::Session::CLIENT_ACKNOWLEDGE );
CPPUNIT_ASSERT( info3->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE );
CPPUNIT_ASSERT( info3->getConnectionId() == connectionId );
SessionInfo* info4 = manager->createSession( cms::Session::SESSION_TRANSACTED );
CPPUNIT_ASSERT( info4->getAckMode() == cms::Session::SESSION_TRANSACTED );
CPPUNIT_ASSERT( info4->getConnectionId() == connectionId );
delete info1;
delete info2;
delete info3;
delete info4;
}
////////////////////////////////////////////////////////////////////////////////
void StompSessionManagerTest::testConsumers()
{
SessionInfo* info1 = manager->createSession( cms::Session::AUTO_ACKNOWLEDGE );
std::string sel1 = "";
StompTopic dest1( "dummy.topic.1" );
ConsumerInfo* cinfo1 = manager->createConsumer( &dest1, info1, sel1 );
manager->startConsumer( cinfo1 );
CPPUNIT_ASSERT( cinfo1->getSessionInfo() == info1 );
CPPUNIT_ASSERT( cinfo1->getDestination()->toProviderString() == dest1.toProviderString() );
CPPUNIT_ASSERT( cinfo1->getMessageSelector() == sel1 );
SessionInfo* info2 = manager->createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
std::string sel2 = "mysel2";
StompTopic dest2( "dummy.topic.2" );
ConsumerInfo* cinfo2 = manager->createConsumer( &dest2, info2, sel2 );
manager->startConsumer( cinfo2 );
CPPUNIT_ASSERT( cinfo2->getSessionInfo() == info2 );
CPPUNIT_ASSERT( cinfo2->getDestination()->toProviderString() == dest2.toProviderString() );
CPPUNIT_ASSERT( cinfo2->getMessageSelector() == sel2 );
SessionInfo* info3 = manager->createSession( cms::Session::CLIENT_ACKNOWLEDGE );
std::string sel3 = "mysel3";
StompQueue dest3( "dummy.queue.1" );
ConsumerInfo* cinfo3 = manager->createConsumer( &dest3, info3, sel3 );
manager->startConsumer( cinfo3 );
CPPUNIT_ASSERT( cinfo3->getSessionInfo() == info3 );
CPPUNIT_ASSERT( cinfo3->getDestination()->toProviderString() == dest3.toProviderString() );
CPPUNIT_ASSERT( cinfo3->getMessageSelector() == sel3 );
SessionInfo* info4 = manager->createSession( cms::Session::SESSION_TRANSACTED );
std::string sel4 = "";
StompTopic dest4( "dummy.queue.2" );
ConsumerInfo* cinfo4 = manager->createConsumer( &dest4, info4, sel4 );
manager->startConsumer( cinfo4 );
CPPUNIT_ASSERT( cinfo4->getSessionInfo() == info4 );
CPPUNIT_ASSERT( cinfo4->getDestination()->toProviderString() == dest4.toProviderString() );
CPPUNIT_ASSERT( cinfo4->getMessageSelector() == sel4 );
delete info1;
delete info2;
delete info3;
delete info4;
delete cinfo1;
delete cinfo2;
delete cinfo3;
delete cinfo4;
}
////////////////////////////////////////////////////////////////////////////////
void StompSessionManagerTest::testCommand()
{
StompTopic dest1( "dummy.topic" );
StompTopic dest2( "dummy.topic2" );
SessionInfo* info1 = manager->createSession( cms::Session::AUTO_ACKNOWLEDGE );
ConsumerInfo* cinfo1 = manager->createConsumer( &dest1, info1, "" );
manager->startConsumer( cinfo1 );
SessionInfo* info2 = manager->createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
ConsumerInfo* cinfo2 = manager->createConsumer( &dest1, info2, "" );
manager->startConsumer( cinfo2 );
SessionInfo* info3 = manager->createSession( cms::Session::CLIENT_ACKNOWLEDGE );
ConsumerInfo* cinfo3 = manager->createConsumer( &dest2, info3, "" );
manager->startConsumer( cinfo3 );
SessionInfo* info4 = manager->createSession( cms::Session::SESSION_TRANSACTED );
ConsumerInfo* cinfo4 = manager->createConsumer( &dest2, info4, "" );
manager->startConsumer( cinfo4 );
MyMessageListener listener;
manager->setConsumerMessageListener( &listener );
commands::TextMessageCommand* msg = new commands::TextMessageCommand();
msg->setCMSDestination( &dest1 );
msg->setText( "hello world" );
manager->onStompCommand( msg );
CPPUNIT_ASSERT( listener.consumers.size() == 2 );
for( unsigned int ix=0; ix<listener.consumers.size(); ++ix ){
CPPUNIT_ASSERT( listener.consumers[ix] == cinfo1 ||
listener.consumers[ix] == cinfo2 );
}
// Clean up the consumers list
listener.consumers.clear();
msg = new commands::TextMessageCommand();
msg->setCMSDestination( &dest2 );
msg->setText( "hello world" );
manager->onStompCommand( msg );
CPPUNIT_ASSERT( listener.consumers.size() == 2 );
for( unsigned int ix=0; ix<listener.consumers.size(); ++ix ){
CPPUNIT_ASSERT( listener.consumers[ix] == cinfo3 ||
listener.consumers[ix] == cinfo4 );
}
delete info1;
delete info2;
delete info3;
delete info4;
delete cinfo1;
delete cinfo2;
delete cinfo3;
delete cinfo4;
}
////////////////////////////////////////////////////////////////////////////////
void StompSessionManagerTest::testSendingCommands(){
StompTopic dest1( "dummy.topic.1" );
MyCommandListener cmdListener;
transport->setOutgoingCommandListener( &cmdListener );
SessionInfo* info1 = manager->createSession( cms::Session::AUTO_ACKNOWLEDGE );
ConsumerInfo* cinfo1 = manager->createConsumer( &dest1, info1, "" );
manager->startConsumer( cinfo1 );
CPPUNIT_ASSERT( cmdListener.cmd != NULL );
cmdListener.cmd = NULL;
SessionInfo* info2 = manager->createSession( cms::Session::DUPS_OK_ACKNOWLEDGE );
ConsumerInfo* cinfo2 = manager->createConsumer( &dest1, info2, "" );
manager->startConsumer( cinfo2 );
CPPUNIT_ASSERT( cmdListener.cmd == NULL );
cmdListener.cmd = NULL;
manager->removeConsumer( cinfo1 );
CPPUNIT_ASSERT( cmdListener.cmd == NULL );
cmdListener.cmd = NULL;
manager->removeConsumer( cinfo2 );
CPPUNIT_ASSERT( cmdListener.cmd != NULL );
delete info1;
delete info2;
delete cinfo1;
delete cinfo2;
}
////////////////////////////////////////////////////////////////////////////////
void StompSessionManagerTest::testSubscribeOptions(){
MyProperty retroactive =
std::make_pair( "activemq.retroactive", "true" );
MyProperty prefetchSize =
std::make_pair( "activemq.prefetchSize", "1000" );
MyProperty maxPendingMsgLimit =
std::make_pair( "activemq.maximumPendingMessageLimit", "0" );
MyProperty noLocal =
std::make_pair( "activemq.noLocal", "true" );
MyProperty dispatchAsync =
std::make_pair( "activemq.dispatchAsync", "true" );
MyProperty selector =
std::make_pair( "selector", "test" );
MyProperty exclusive =
std::make_pair( "activemq.exclusive", "true" );
MyProperty priority =
std::make_pair( "activemq.priority", "1" );
SessionInfo* session = NULL;
ConsumerInfo* consumer = NULL;
MyCommandListener cmdListener;
transport->setOutgoingCommandListener( &cmdListener );
session = manager->createSession( cms::Session::AUTO_ACKNOWLEDGE );
cmdListener.expected.clear();
StompTopic dest1( "dummy.topic.1" );
consumer = manager->createConsumer( &dest1, session, "" );
manager->startConsumer( consumer );
CPPUNIT_ASSERT( consumer != NULL );
CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
manager->removeConsumer( consumer );
CPPUNIT_ASSERT( cmdListener.cmd != NULL );
delete consumer;
cmdListener.cmd = NULL;
cmdListener.subscribe = NULL;
cmdListener.expected.clear();
cmdListener.expected.push_back( retroactive );
StompTopic dest2( "dummy.topic.1?consumer.retroactive=true" );
consumer = manager->createConsumer( &dest2, session, "" );
manager->startConsumer( consumer );
CPPUNIT_ASSERT( consumer != NULL );
CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
manager->removeConsumer( consumer );
CPPUNIT_ASSERT( cmdListener.cmd != NULL );
delete consumer;
cmdListener.cmd = NULL;
cmdListener.subscribe = NULL;
cmdListener.expected.clear();
cmdListener.expected.push_back( retroactive );
cmdListener.expected.push_back( prefetchSize );
cmdListener.expected.push_back( maxPendingMsgLimit );
cmdListener.expected.push_back( noLocal );
cmdListener.expected.push_back( dispatchAsync );
cmdListener.expected.push_back( selector );
cmdListener.expected.push_back( exclusive );
cmdListener.expected.push_back( priority );
StompTopic dest3(
std::string( "dummy.topic.1?" ) +
"consumer.retroactive=" + retroactive.second + "&" +
"consumer.prefetchSize=" + prefetchSize.second + "&" +
"consumer.maximumPendingMessageLimit=" + maxPendingMsgLimit.second + "&" +
"consumer.noLocal=" + noLocal.second + "&" +
"consumer.dispatchAsync=" + dispatchAsync.second + "&" +
"consumer.selector=" + selector.second + "&" +
"consumer.exclusive=" + exclusive.second + "&" +
"consumer.priority=" + priority.second );
consumer = manager->createConsumer( &dest3, session, "" );
manager->startConsumer( consumer );
CPPUNIT_ASSERT( consumer != NULL );
CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
manager->removeConsumer( consumer );
CPPUNIT_ASSERT( cmdListener.cmd != NULL );
delete consumer;
cmdListener.cmd = NULL;
cmdListener.subscribe = NULL;
cmdListener.expected.clear();
cmdListener.expected.push_back( retroactive );
cmdListener.expected.push_back( prefetchSize );
cmdListener.expected.push_back( maxPendingMsgLimit );
cmdListener.expected.push_back( noLocal );
cmdListener.expected.push_back( dispatchAsync );
cmdListener.expected.push_back( selector );
cmdListener.expected.push_back( exclusive );
cmdListener.expected.push_back( priority );
StompTopic dest4(
std::string( "dummy.topic.1?" ) +
"consumer.retroactive=" + retroactive.second + "&" +
"consumer.prefetchSize=" + prefetchSize.second + "&" +
"consumer.maximumPendingMessageLimit=" + maxPendingMsgLimit.second + "&" +
"consumer.dispatchAsync=" + dispatchAsync.second + "&" +
"consumer.selector=" + selector.second + "&" +
"consumer.exclusive=" + exclusive.second + "&" +
"consumer.priority=" + priority.second );
consumer = manager->createConsumer( &dest4, session, "", true );
manager->startConsumer( consumer );
CPPUNIT_ASSERT( consumer != NULL );
CPPUNIT_ASSERT( cmdListener.subscribe != NULL );
manager->removeConsumer( consumer );
CPPUNIT_ASSERT( cmdListener.cmd != NULL );
delete consumer;
cmdListener.cmd = NULL;
cmdListener.subscribe = NULL;
// Done
delete session;
}
////////////////////////////////////////////////////////////////////////////////
void StompSessionManagerTest::setUp() {
this->manager = NULL;
this->connector = NULL;
this->transport = NULL;
this->connectionId = "testConnectionId";
decaf::util::Properties properties;
// Default to Stomp
properties.setProperty( "wireFormat", "stomp" );
properties.setProperty( "client-id", connectionId );
transport::TransportFactory* factory =
transport::TransportFactoryMap::getInstance().lookup( "mock" );
if( factory == NULL ){
CPPUNIT_ASSERT( false );
}
// Create the transport.
this->transport =
dynamic_cast<MockTransport*>( factory->createTransport( properties ) );
if( transport == NULL ){
CPPUNIT_ASSERT( false );
}
// Using a pointer for the connector so we ensure the proper destruction
// order of objects - connector before the transport.
this->connector = new StompConnector( transport, properties );
this->manager = new StompSessionManager( connectionId, NULL, transport );
}
////////////////////////////////////////////////////////////////////////////////
void StompSessionManagerTest::tearDown() {
// Clear this before we go down so it doesn't notif an non-existant client.
transport->setOutgoingCommandListener( NULL );
delete manager;
delete connector;
delete transport;
}