blob: 0e123360994f4cae24e32182354946c0eca4fb21 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <activemq/util/Config.h>
#include <activemq/connector/stomp/StompConnector.h>
#include <activemq/transport/Transport.h>
#include <activemq/connector/stomp/StompTopic.h>
#include <activemq/connector/stomp/StompQueue.h>
#include <activemq/connector/stomp/commands/ConnectCommand.h>
#include <activemq/connector/stomp/commands/ErrorCommand.h>
#include <activemq/connector/stomp/commands/BeginCommand.h>
#include <activemq/connector/stomp/commands/AbortCommand.h>
#include <activemq/connector/stomp/commands/AckCommand.h>
#include <activemq/connector/stomp/commands/CommitCommand.h>
#include <activemq/connector/stomp/commands/MessageCommand.h>
#include <activemq/connector/stomp/commands/BytesMessageCommand.h>
#include <activemq/connector/stomp/commands/TextMessageCommand.h>
#include <activemq/connector/stomp/commands/ConnectedCommand.h>
#include <activemq/connector/stomp/commands/DisconnectCommand.h>
#include <activemq/connector/stomp/StompProducerInfo.h>
#include <activemq/connector/stomp/StompTransactionInfo.h>
#include <activemq/connector/stomp/StompConnectionNegotiator.h>
#include <decaf/lang/Long.h>
#include <decaf/util/concurrent/Concurrent.h>
using namespace std;
using namespace activemq;
using namespace activemq::connector;
using namespace activemq::util;
using namespace activemq::transport;
using namespace activemq::exceptions;
using namespace activemq::connector::stomp;
using namespace activemq::connector::stomp::commands;
using namespace decaf;
using namespace decaf::util;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
LOGDECAF_INITIALIZE( logger, StompConnector, "activemq.connector.stomp.StompConnector" )
////////////////////////////////////////////////////////////////////////////////
StompConnector::StompConnector( Transport* transport,
const decaf::util::Properties& properties )
throw ( IllegalArgumentException ) {
if( transport == NULL ) {
throw IllegalArgumentException(
__FILE__, __LINE__,
"StompConnector::StompConnector - Transport cannot be NULL");
}
this->state = CONNECTION_STATE_DISCONNECTED;
this->exceptionListener = NULL;
this->messageListener = NULL;
this->sessionManager = NULL;
this->properties.copy( &properties );
// Create the connection negotiator and wrap our transport
this->transport = new StompConnectionNegotiator( transport, false );
// Observe the transport for events.
this->transport->setCommandListener( this );
this->transport->setTransportExceptionListener( this );
// Setup the reader and writer in the transport.
this->transport->setCommandReader( &reader );
this->transport->setCommandWriter( &writer );
// Register ourself for those commands that we process
addCmdListener( CommandConstants::ERROR_CMD, this );
}
////////////////////////////////////////////////////////////////////////////////
StompConnector::~StompConnector() {
try{
close();
delete transport;
delete sessionManager;
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::enforceConnected() throw ( ConnectorException ) {
if( state != CONNECTION_STATE_CONNECTED ) {
throw StompConnectorException(
__FILE__, __LINE__,
"StompConnector::enforceConnected - Not Connected!" );
}
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::addCmdListener(
commands::CommandConstants::CommandId commandId,
StompCommandListener* listener ) {
cmdListenerMap.insert( make_pair( commandId, listener ) );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::removeCmdListener(
commands::CommandConstants::CommandId commandId ) {
cmdListenerMap.erase( commandId );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::start() throw( cms::CMSException ) {
try{
synchronized( &mutex ) {
if( state == CONNECTION_STATE_CONNECTED ) {
throw ActiveMQException(
__FILE__, __LINE__,
"StompConnector::start - already started" );
}
// Start the transport - this establishes the socket.
transport->start();
// Send the connect message to the broker.
connect();
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::close() throw( cms::CMSException ){
try{
if( state == CONNECTION_STATE_DISCONNECTED ){
return;
}
synchronized( &mutex ) {
// Send the disconnect message to the broker.
disconnect();
// Close the transport now that we've sent the last messages..
transport->close();
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::connect() {
try{
// Mark this connector as started.
state = CONNECTION_STATE_CONNECTING;
// Send the connect command to the broker
ConnectCommand cmd;
// Encode User Name and Password and Client ID
string login = getUsername();
if( login.length() > 0 ){
cmd.setLogin( login );
}
string password = getPassword();
if( password.length() > 0 ){
cmd.setPassword( password );
}
string clientId = getClientId();
if( clientId.length() > 0 ){
cmd.setClientId( clientId );
}
Response* response = transport->request( &cmd );
ConnectedCommand* connected =
dynamic_cast< ConnectedCommand* >( response );
if( connected == NULL ) {
delete response;
throw StompConnectorException(
__FILE__, __LINE__,
"StompConnector::connect - "
"Response not a connected response" );
}
// Connected so we now create the SessionManager
sessionManager = new StompSessionManager(
connected->getSessionId(), this, transport );
// Give our message listener to the session manager it will
// notify all the interested clients
sessionManager->setConsumerMessageListener( messageListener );
// Add the Session Manager as the Command Listener for
// Message commands so that it can route them to the
// correct consumers.
addCmdListener( CommandConstants::MESSAGE, sessionManager );
// In Stomp, the client Id is the same as the session id that is
// returned in the Connected response
properties.setProperty(
commands::CommandConstants::toString(
commands::CommandConstants::HEADER_CLIENT_ID ),
connected->getSessionId() );
// Tag us in the Connected State now.
state = CONNECTION_STATE_CONNECTED;
// Clean up
delete response;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::disconnect() {
try{
// Mark state as no longer connected.
state = CONNECTION_STATE_DISCONNECTED;
// Send the disconnect command to the broker.
DisconnectCommand cmd;
transport->oneway( &cmd );
} catch( CommandIOException& ex ){
transport->close();
throw ex;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException );
}
////////////////////////////////////////////////////////////////////////////////
SessionInfo* StompConnector::createSession(
cms::Session::AcknowledgeMode ackMode )
throw( ConnectorException ) {
try {
enforceConnected();
return sessionManager->createSession( ackMode );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
ConsumerInfo* StompConnector::createConsumer(
const cms::Destination* destination,
SessionInfo* session,
const std::string& selector,
bool noLocal )
throw ( ConnectorException ) {
try{
enforceConnected();
return sessionManager->createConsumer(
destination, session, selector, noLocal );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::startConsumer(ConsumerInfo* consumer )
throw ( ConnectorException ) {
try {
enforceConnected();
return sessionManager->startConsumer(consumer);
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
ConsumerInfo* StompConnector::createDurableConsumer(
const cms::Topic* topic,
SessionInfo* session,
const std::string& name,
const std::string& selector,
bool noLocal )
throw ( ConnectorException ) {
try{
enforceConnected();
return sessionManager->createDurableConsumer(
topic, session, name, selector, noLocal );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
ProducerInfo* StompConnector::createProducer(
const cms::Destination* destination,
SessionInfo* session )
throw ( ConnectorException ) {
try{
enforceConnected();
ProducerInfo* producer = new StompProducerInfo( this );
producer->setDestination( destination );
producer->setProducerId( producerIds.getNextSequenceId() );
producer->setSessionInfo( session );
return producer;
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
cms::Topic* StompConnector::createTopic( const std::string& name,
SessionInfo* session AMQCPP_UNUSED)
throw ( ConnectorException ) {
try {
enforceConnected();
return new StompTopic( name );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
cms::Queue* StompConnector::createQueue( const std::string& name,
SessionInfo* session AMQCPP_UNUSED)
throw ( ConnectorException ) {
try {
enforceConnected();
return new StompQueue( name );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
cms::TemporaryTopic* StompConnector::createTemporaryTopic(
SessionInfo* session AMQCPP_UNUSED)
throw ( ConnectorException, UnsupportedOperationException ) {
try {
throw UnsupportedOperationException(
__FILE__, __LINE__,
"StompConnector::createTemporaryTopic - No Stomp Support");
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_RETHROW( UnsupportedOperationException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
return NULL;
}
////////////////////////////////////////////////////////////////////////////////
cms::TemporaryQueue* StompConnector::createTemporaryQueue(
SessionInfo* session AMQCPP_UNUSED)
throw ( ConnectorException, UnsupportedOperationException ) {
try {
throw UnsupportedOperationException(
__FILE__, __LINE__,
"StompConnector::createTemporaryQueue - No Stomp Support");
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_RETHROW( UnsupportedOperationException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
return NULL;
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::send( cms::Message* message,
ProducerInfo* producerInfo )
throw ( ConnectorException ) {
try{
enforceConnected();
const SessionInfo* session = producerInfo->getSessionInfo();
Command* command = dynamic_cast< transport::Command* >( message );
if( command == NULL ) {
throw StompConnectorException(
__FILE__, __LINE__,
"StompConnector::send - "
"Message is not a valid stomp type.");
}
if( session->getAckMode() == cms::Session::SESSION_TRANSACTED ) {
StompCommand* stompCommand =
dynamic_cast< StompCommand* >( message );
if( stompCommand == NULL ) {
throw StompConnectorException(
__FILE__, __LINE__,
"StompConnector::send - "
"Message is not a valid stomp type.");
}
stompCommand->setTransactionId(
Long::toString(
session->getTransactionInfo()->getTransactionId() ) );
}
// Send it
transport->oneway( command );
}
catch( CommandIOException& ex ){
transport->close();
throw ConnectorException( __FILE__, __LINE__,
ex.what() );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::send( std::list<cms::Message*>& messages,
ProducerInfo* producerInfo )
throw ( ConnectorException ) {
try{
enforceConnected();
list< cms::Message* >::const_iterator itr = messages.begin();
for( ; itr != messages.end(); ++itr ) {
this->send( *itr, producerInfo );
}
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::acknowledge( const SessionInfo* session,
const ConsumerInfo* consumer AMQCPP_UNUSED,
const cms::Message* message,
AckType ackType AMQCPP_UNUSED)
throw ( ConnectorException ) {
try {
enforceConnected();
// Auto to Stomp means don't do anything, so we drop it here
// for client acknowledge we have to send and ack.
if( session->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE ) {
AckCommand cmd;
if( message->getCMSMessageID() == "" ) {
throw StompConnectorException(
__FILE__, __LINE__,
"StompConnector::send - "
"Message has no Message Id, cannot ack.");
}
cmd.setMessageId( message->getCMSMessageID() );
transport->oneway( &cmd );
}
}
catch( CommandIOException& ex ){
transport->close();
throw ConnectorException( __FILE__, __LINE__,
ex.what() );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
TransactionInfo* StompConnector::startTransaction(
SessionInfo* session )
throw ( ConnectorException ) {
try{
enforceConnected();
TransactionInfo* transaction = new StompTransactionInfo( this );
transaction->setTransactionId( transactionIds.getNextSequenceId() );
session->setTransactionInfo( transaction );
BeginCommand cmd;
cmd.setTransactionId(
Long::toString( transaction->getTransactionId() ) );
transport->oneway( &cmd );
return transaction;
}
catch( CommandIOException& ex ){
transport->close();
throw ConnectorException( __FILE__, __LINE__,
ex.what() );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
return NULL;
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::commit( TransactionInfo* transaction,
SessionInfo* session AMQCPP_UNUSED)
throw ( ConnectorException ) {
try {
enforceConnected();
CommitCommand cmd;
cmd.setTransactionId(
Long::toString( transaction->getTransactionId() ) );
transport->oneway( &cmd );
}
catch( CommandIOException& ex ){
transport->close();
throw ConnectorException( __FILE__, __LINE__,
ex.what() );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::rollback( TransactionInfo* transaction,
SessionInfo* session AMQCPP_UNUSED)
throw ( ConnectorException ) {
try {
enforceConnected();
AbortCommand cmd;
cmd.setTransactionId(
Long::toString( transaction->getTransactionId() ) );
transport->oneway( &cmd );
}
catch( CommandIOException& ex ){
transport->close();
throw ConnectorException( __FILE__, __LINE__, ex.what() );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* StompConnector::createMessage(
SessionInfo* session AMQCPP_UNUSED,
TransactionInfo* transaction )
throw ( ConnectorException ) {
try{
enforceConnected();
MessageCommand* cmd = new MessageCommand();
if( transaction != NULL ) {
cmd->setTransactionId(
Long::toString( transaction->getTransactionId() ) );
}
return cmd;
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
cms::BytesMessage* StompConnector::createBytesMessage(
SessionInfo* session AMQCPP_UNUSED,
TransactionInfo* transaction )
throw ( ConnectorException ) {
try{
enforceConnected();
BytesMessageCommand* cmd = new BytesMessageCommand();
if( transaction != NULL ) {
cmd->setTransactionId(
Long::toString( transaction->getTransactionId() ) );
}
return cmd;
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
cms::TextMessage* StompConnector::createTextMessage(
SessionInfo* session AMQCPP_UNUSED,
TransactionInfo* transaction )
throw ( ConnectorException ) {
try{
enforceConnected();
TextMessageCommand* cmd = new TextMessageCommand;
if( transaction != NULL ) {
cmd->setTransactionId(
Long::toString( transaction->getTransactionId() ) );
}
return cmd;
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
cms::MapMessage* StompConnector::createMapMessage(
SessionInfo* session AMQCPP_UNUSED,
TransactionInfo* transaction AMQCPP_UNUSED )
throw ( ConnectorException, UnsupportedOperationException ) {
try{
throw UnsupportedOperationException(
__FILE__, __LINE__,
"StompConnector::createMapMessage - No Stomp Support");
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_RETHROW( UnsupportedOperationException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
return NULL;
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::unsubscribe( const std::string& name AMQCPP_UNUSED )
throw ( ConnectorException, UnsupportedOperationException ) {
try {
throw UnsupportedOperationException(
__FILE__, __LINE__,
"StompConnector::unsubscribe - No Stomp Support");
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_RETHROW( UnsupportedOperationException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::pullMessage( const connector::ConsumerInfo* info AMQCPP_UNUSED, long long timeout AMQCPP_UNUSED )
throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) {
try {
throw UnsupportedOperationException(
__FILE__, __LINE__,
"StompConnector::pullMessage - No Stomp Support for Message Pull");
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_RETHROW( UnsupportedOperationException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::closeResource( ConnectorResource* resource )
throw ( ConnectorException ) {
try{
ConsumerInfo* consumer =
dynamic_cast<ConsumerInfo*>(resource);
SessionInfo* session =
dynamic_cast<SessionInfo*>(resource);
if( consumer != NULL) {
try{
sessionManager->removeConsumer( consumer );
} catch( ConnectorException& ex ){
// Make sure we delete the resource
delete resource;
ex.setMark( __FILE__, __LINE__ );
throw ex;
}
} else if( session != NULL ) {
try{
sessionManager->removeSession( session );
} catch( ConnectorException& ex ){
// Make sure we delete the resource
delete resource;
ex.setMark( __FILE__, __LINE__ );
throw ex;
}
}
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::onCommand( transport::Command* command ) {
try{
StompCommand* stompCommand = dynamic_cast< StompCommand* >(command);
if( stompCommand == NULL ) {
fire( ConnectorException(
__FILE__, __LINE__,
"StompConnector::onCommand - Recieved an unknown Command") );
}
CmdListenerMap::iterator itr =
cmdListenerMap.find( stompCommand->getStompCommandId() );
if( itr == cmdListenerMap.end() ) {
fire( ConnectorException(
__FILE__, __LINE__,
"StompConnector::onCommand - "
"Recieved command with no listener") );
// This isn't going an farther, so delete it.
delete command;
return; // we are done
}
// Hand off
itr->second->onStompCommand( stompCommand );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::onTransportException(
transport::Transport* source AMQCPP_UNUSED,
const decaf::lang::Exception& ex ) {
try{
// We're disconnected - the asynchronous error is expected.
if( state == CONNECTION_STATE_DISCONNECTED ){
return;
}
// We were not closing - log the stack trace.
LOGDECAF_WARN( logger, ex.getStackTraceString() );
// Inform the user of the error.
ActiveMQException convert( ex );
fire( convert );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnector::onStompCommand( commands::StompCommand* command )
throw ( StompConnectorException ) {
try{
ErrorCommand* error =
dynamic_cast<ErrorCommand*>( command );
if( error != NULL ) {
fire( StompConnectorException(
__FILE__, __LINE__,
( string( "StompConnector::onStompCommand - " ) +
error->getErrorMessage() ).c_str() ) );
// Shutdown
close();
}
// command is done here, delete it.
delete command;
}
AMQ_CATCH_RETHROW( StompConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, StompConnectorException )
AMQ_CATCHALL_THROW( StompConnectorException );
}