blob: 063dc6f3207d79f6e16b51ebfafe2ff014d1cb33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ActiveMQConnectionTest.h"
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::core::ActiveMQConnectionTest );
#include <activemq/concurrent/Concurrent.h>
#include <activemq/concurrent/Mutex.h>
#include <activemq/concurrent/Thread.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/transport/MockTransport.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQConnectionData.h>
#include <activemq/connector/stomp/StompConnector.h>
#include <activemq/connector/openwire/OpenWireConnector.h>
#include <activemq/util/Properties.h>
#include <activemq/transport/MockTransportFactory.h>
#include <activemq/transport/TransportFactoryMap.h>
#include <activemq/connector/stomp/StompConsumerInfo.h>
#include <activemq/connector/stomp/StompProducerInfo.h>
#include <activemq/connector/stomp/StompTransactionInfo.h>
#include <activemq/connector/stomp/StompSessionInfo.h>
#include <activemq/connector/stomp/StompTopic.h>
#include <activemq/connector/stomp/commands/TextMessageCommand.h>
#include <activemq/util/Config.h>
using namespace activemq;
using namespace activemq::core;
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionTest::test1WithStomp()
{
try
{
MyMessageListener listener;
MyExceptionListener exListener;
MyCommandListener cmdListener;
MyDispatcher msgListener;
std::string connectionId = "testConnectionId";
util::Properties* properties =
new util::Properties();
transport::Transport* transport = NULL;
// Default to Stomp
properties->setProperty( "wireFormat", "stomp" );
transport::TransportFactory* factory =
transport::TransportFactoryMap::getInstance().lookup( "mock" );
if( factory == NULL ){
CPPUNIT_ASSERT( false );
}
// Create the transport.
transport = factory->createTransport( *properties );
if( transport == NULL ){
CPPUNIT_ASSERT( false );
}
transport::MockTransport* dTransport =
dynamic_cast< transport::MockTransport*>( transport );
CPPUNIT_ASSERT( dTransport != NULL );
dTransport->setCommandListener( &cmdListener );
connector::stomp::StompConnector* connector =
new connector::stomp::StompConnector(
transport, *properties );
connector->start();
ActiveMQConnection connection(
new ActiveMQConnectionData(
connector, transport, properties) );
// First - make sure exceptions are working.
connection.setExceptionListener( &exListener );
CPPUNIT_ASSERT( exListener.caughtOne == false );
dTransport->fireException( exceptions::ActiveMQException( __FILE__, __LINE__, "test" ) );
CPPUNIT_ASSERT( exListener.caughtOne == true );
cms::Session* session1 = connection.createSession();
cms::Session* session2 = connection.createSession();
cms::Session* session3 = connection.createSession();
CPPUNIT_ASSERT( session1 != NULL );
CPPUNIT_ASSERT( session2 != NULL );
CPPUNIT_ASSERT( session3 != NULL );
connector::stomp::StompSessionInfo session;
connector::stomp::StompConsumerInfo consumer;
session.setSessionId( 1 );
session.setConnectionId( "TEST:123" );
session.setAckMode( cms::Session::AUTO_ACKNOWLEDGE );
connector::stomp::StompTopic myTopic( "test" );
consumer.setConsumerId( 1 );
consumer.setSessionInfo( &session );
consumer.setDestination( &myTopic );
connection.addDispatcher( &consumer, &msgListener );
connector::stomp::commands::TextMessageCommand* cmd =
new connector::stomp::commands::TextMessageCommand;
connector::stomp::StompTopic topic1( "test" );
cmd->setCMSDestination( &topic1 );
connector::ConsumerMessageListener* consumerListener =
dynamic_cast< connector::ConsumerMessageListener* >(
&connection );
connection.start();
CPPUNIT_ASSERT( consumerListener != NULL );
consumerListener->onConsumerMessage( &consumer, cmd );
CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener.messages.size() );
connection.removeDispatcher( &consumer );
msgListener.messages.clear();
consumerListener->onConsumerMessage( &consumer, cmd );
CPPUNIT_ASSERT_EQUAL( 0, (int)msgListener.messages.size() );
connection.addDispatcher( &consumer, &msgListener );
connection.stop();
consumerListener->onConsumerMessage( &consumer, cmd );
connection.start();
CPPUNIT_ASSERT_EQUAL( 1, (int)msgListener.messages.size() );
delete cmd;
cmd = new connector::stomp::commands::TextMessageCommand;
connector::stomp::StompTopic topic2( "test" );
cmd->setCMSDestination( &topic2 );
consumerListener->onConsumerMessage( &consumer, cmd );
CPPUNIT_ASSERT_EQUAL( 2, (int)msgListener.messages.size() );
connection.removeDispatcher( &consumer );
msgListener.messages.clear();
session1->close();
session2->close();
session3->close();
connection.close();
exListener.caughtOne = false;
consumerListener->onConsumerMessage( &consumer, cmd );
CPPUNIT_ASSERT( exListener.caughtOne == true );
delete cmd;
delete session1;
delete session2;
delete session3;
} catch( exceptions::ActiveMQException& ex ) {
ex.printStackTrace();
throw ex;
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionTest::test2WithStomp()
{
try
{
MyMessageListener listener;
MyExceptionListener exListener;
MyCommandListener cmdListener;
MyDispatcher msgListener;
std::string connectionId = "testConnectionId";
util::Properties* properties =
new util::Properties();
transport::Transport* transport = NULL;
// Default to Stomp
properties->setProperty( "wireFormat", "stomp" );
transport::TransportFactory* factory =
transport::TransportFactoryMap::getInstance().lookup( "mock" );
if( factory == NULL ){
CPPUNIT_ASSERT( false );
}
// Create the transport.
transport = factory->createTransport( *properties );
if( transport == NULL ){
CPPUNIT_ASSERT( false );
}
transport::MockTransport* dTransport =
dynamic_cast< transport::MockTransport*>( transport );
CPPUNIT_ASSERT( dTransport != NULL );
dTransport->setCommandListener( &cmdListener );
connector::stomp::StompConnector* connector =
new connector::stomp::StompConnector(
transport, *properties );
connector->start();
ActiveMQConnection connection(
new ActiveMQConnectionData(
connector, transport, properties) );
connection.getClientID();
connection.close();
CPPUNIT_ASSERT( connection.getClientID() == "" );
} catch( exceptions::ActiveMQException& ex ) {
ex.printStackTrace();
throw ex;
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionTest::test2WithOpenwire()
{
try
{
MyMessageListener listener;
MyExceptionListener exListener;
MyCommandListener cmdListener;
MyDispatcher msgListener;
std::string connectionId = "testConnectionId";
util::Properties* properties =
new util::Properties();
transport::Transport* transport = NULL;
// Default to Stomp
properties->setProperty( "wireFormat", "openwire" );
transport::TransportFactory* factory =
transport::TransportFactoryMap::getInstance().lookup( "mock" );
if( factory == NULL ){
CPPUNIT_ASSERT( false );
}
// Create the transport.
transport = factory->createTransport( *properties );
if( transport == NULL ){
CPPUNIT_ASSERT( false );
}
transport::MockTransport* dTransport =
dynamic_cast< transport::MockTransport*>( transport );
CPPUNIT_ASSERT( dTransport != NULL );
dTransport->setCommandListener( &cmdListener );
connector::Connector* connector =
new connector::openwire::OpenWireConnector(
transport, *properties );
connector->start();
ActiveMQConnection connection(
new ActiveMQConnectionData(
connector, transport, properties) );
connection.getClientID();
connection.close();
CPPUNIT_ASSERT( connection.getClientID() == "" );
} catch( exceptions::ActiveMQException& ex ) {
ex.printStackTrace();
throw ex;
}
}