blob: 21101620200ecab48d502e10b1b68627d90df228 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "StompConnectionNegotiator.h"
#include <activemq/connector/stomp/commands/ConnectCommand.h>
#include <activemq/connector/stomp/commands/ConnectedCommand.h>
using namespace std;
using namespace activemq;
using namespace activemq::exceptions;
using namespace activemq::transport;
using namespace activemq::concurrent;
using namespace activemq::connector;
using namespace activemq::connector::stomp;
using namespace activemq::connector::stomp::commands;
////////////////////////////////////////////////////////////////////////////////
StompConnectionNegotiator::StompConnectionNegotiator( Transport* next, bool own ) :
TransportFilter( next, own ),
readyCountDownLatch(1)
{
this->connected = false;
this->closed = true;
this->connectedCmd = NULL;
}
////////////////////////////////////////////////////////////////////////////////
StompConnectionNegotiator::~StompConnectionNegotiator()
{
// Close the transport and destroy it.
close();
}
////////////////////////////////////////////////////////////////////////////////
void StompConnectionNegotiator::oneway( Command* command )
throw( CommandIOException, exceptions::UnsupportedOperationException ) {
try{
if( closed || next == NULL ){
throw CommandIOException(
__FILE__, __LINE__,
"StompConnectionNegotiator::oneway - transport already closed" );
}
next->oneway( command );
}
AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
AMQ_CATCH_RETHROW( CommandIOException )
AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
AMQ_CATCHALL_THROW( CommandIOException )
}
////////////////////////////////////////////////////////////////////////////////
Response* StompConnectionNegotiator::request( Command* command )
throw( CommandIOException, exceptions::UnsupportedOperationException ) {
try{
if( closed || next == NULL ){
throw CommandIOException(
__FILE__, __LINE__,
"StompConnectionNegotiator::request - transport already closed" );
}
// Once connected we just pass through all requests.
if( connected ) {
return next->request( command );
} else {
ConnectCommand* connect = dynamic_cast<ConnectCommand*>( command );
if( connect == NULL ) {
throw CommandIOException(
__FILE__,
__LINE__,
"StompConnectionNegotiator::request"
"Invalid Command Received: only a connect command "
"can be sent before connected." );
}
// Send the connect request
next->oneway( command );
if( !readyCountDownLatch.await( negotiationTimeout ) ) {
throw CommandIOException(
__FILE__,
__LINE__,
"StompConnectionNegotiator::request"
"Connection Negotiate timeout: peer did not "
"send a connected command." );
}
// return the connected command
return connectedCmd;
}
}
AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
AMQ_CATCH_RETHROW( CommandIOException )
AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
AMQ_CATCHALL_THROW( CommandIOException )
}
////////////////////////////////////////////////////////////////////////////////
void StompConnectionNegotiator::onCommand( Command* command ) {
ConnectedCommand* response =
dynamic_cast<ConnectedCommand*>( command );
if( response != NULL && !this->connected ) {
// Store for the main thread.
this->connectedCmd = new ConnectedWrapper( *response );
this->connected = true;
// Done with this, we have a copy of its internals.
delete response;
readyCountDownLatch.countDown();
return;
}
// Send along to the next interested party.
fire( command );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnectionNegotiator::onTransportException(
Transport* source AMQCPP_UNUSED,
const exceptions::ActiveMQException& ex ) {
readyCountDownLatch.countDown();
fire( ex );
}
////////////////////////////////////////////////////////////////////////////////
void StompConnectionNegotiator::start() throw( cms::CMSException ){
/**
* We're already started.
*/
if( !closed ){
return;
}
if( commandlistener == NULL ){
throw exceptions::ActiveMQException(
__FILE__, __LINE__,
"StompConnectionNegotiator::start - "
"commandListener is invalid" );
}
if( exceptionListener == NULL ){
throw exceptions::ActiveMQException(
__FILE__, __LINE__,
"StompConnectionNegotiator::start - "
"exceptionListener is invalid" );
}
if( next == NULL ){
throw exceptions::ActiveMQException(
__FILE__, __LINE__,
"StompConnectionNegotiator::start - "
"next transport is NULL" );
}
// Start the delegate transport object.
next->start();
// Mark it as open.
closed = false;
connected = false;
}
////////////////////////////////////////////////////////////////////////////////
void StompConnectionNegotiator::close() throw( cms::CMSException ){
if( !closed && next != NULL ){
next->close();
}
closed = true;
connected = false;
}