blob: 7327bdfb1d49acb9b56a1998b01c2de205dff1e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FailoverTransport.h"
#include <activemq/commands/ConnectionControl.h>
#include <activemq/commands/ShutdownInfo.h>
#include <activemq/commands/RemoveInfo.h>
#include <activemq/transport/TransportRegistry.h>
#include <activemq/threads/DedicatedTaskRunner.h>
#include <activemq/threads/CompositeTaskRunner.h>
#include <decaf/util/Random.h>
#include <decaf/lang/System.h>
#include <decaf/lang/Integer.h>
using namespace std;
using namespace activemq;
using namespace activemq::state;
using namespace activemq::commands;
using namespace activemq::exceptions;
using namespace activemq::transport;
using namespace activemq::transport::failover;
using namespace decaf;
using namespace decaf::io;
using namespace decaf::net;
using namespace decaf::util;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
// Builds an unstarted failover transport with default reconnect policy and
// wires up the URI pool, close task, task runner and backup pool.
FailoverTransport::FailoverTransport() {

    // Reconnect timing, in milliseconds. A non-positive timeout means oneway()
    // waits for a transport without any deadline.
    timeout = -1;
    initialReconnectDelay = 10;
    reconnectDelay = initialReconnectDelay;
    maxReconnectDelay = 1000 * 30;
    backOffMultiplier = 2;
    useExponentialBackOff = true;

    // Attempt limits: maxReconnectAttempts <= 0 means unlimited, and a zero
    // startupMaxReconnectAttempts defers to maxReconnectAttempts.
    maxReconnectAttempts = 0;
    startupMaxReconnectAttempts = 0;
    connectFailures = 0;

    // Tracking options; presumably surfaced by the getters that start()
    // pushes into stateTracker.
    trackMessages = false;
    trackTransactionProducers = true;
    maxCacheSize = 128 * 1024;

    // Lifecycle / connection state.
    initialized = false;
    started = false;
    closed = false;
    connected = false;
    connectionInterruptProcessingComplete = false;
    firstConnection = true;
    transportListener = NULL;

    stateTracker.setTrackTransactions( true );

    // The backup pool shares the task runner, close task and URI pool, so
    // those three must exist before it is constructed.
    uris.reset( new URIPool() );
    myTransportListener.reset( new FailoverTransportListener( this ) );
    closeTask.reset( new CloseTransportsTask() );
    taskRunner.reset( new CompositeTaskRunner() );
    backups.reset( new BackupTransportPool( taskRunner, closeTask, uris ) );

    // One runner drives both the reconnect logic (iterate()) and the close task.
    taskRunner->addTask( this );
    taskRunner->addTask( closeTask.get() );
}
////////////////////////////////////////////////////////////////////////////////
// Closes the transport on destruction. Destructors must not throw, so any
// exception raised by close() is swallowed here.
FailoverTransport::~FailoverTransport() {
    try {
        this->close();
    }
    AMQ_CATCH_NOTHROW( Exception )
    AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
// Returns true when the command is a ShutdownInfo or RemoveInfo; a NULL
// command is never treated as a shutdown command.
bool FailoverTransport::isShutdownCommand( const Pointer<Command>& command ) const {

    if( command == NULL ) {
        return false;
    }

    return command->isShutdownInfo() || command->isRemoveInfo();
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::add( const std::string& uri ) {
try {
uris->addURI( URI( uri ) );
reconnect();
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::addURI( const List<URI>& uris ) {
std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
while( iter->hasNext() ) {
this->uris->addURI( iter->next() );
}
reconnect();
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::removeURI( const List<URI>& uris ) {
std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
while( iter->hasNext() ) {
this->uris->removeURI( iter->next() );
}
reconnect();
}
////////////////////////////////////////////////////////////////////////////////
// Adds the given URI as a reconnect candidate and wakes the reconnect logic.
// Any failure is reported as an IOException.
void FailoverTransport::reconnect( const decaf::net::URI& uri )
    throw( decaf::io::IOException ) {

    try {
        this->uris->addURI( uri );
        this->reconnect();
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Installs the upper-layer listener (non-owning pointer) under listenerMutex.
void FailoverTransport::setTransportListener( TransportListener* listener ) {

    synchronized( &listenerMutex ) {
        transportListener = listener;
        // iterate() may be waiting on listenerMutex for a listener to appear.
        listenerMutex.notifyAll();
    }
}
////////////////////////////////////////////////////////////////////////////////
// Returns the currently installed listener (may be NULL), read under listenerMutex.
TransportListener* FailoverTransport::getTransportListener() const {

    TransportListener* current = NULL;

    synchronized( &listenerMutex ) {
        current = this->transportListener;
    }

    return current;
}
////////////////////////////////////////////////////////////////////////////////
// Returns the remote address of the connected transport, or an empty string
// when no transport is currently connected.
std::string FailoverTransport::getRemoteAddress() const {

    std::string address;

    synchronized( &reconnectMutex ) {
        if( connectedTransport != NULL ) {
            address = connectedTransport->getRemoteAddress();
        }
    }

    return address;
}
////////////////////////////////////////////////////////////////////////////////
// Sends a command over the currently connected transport, waiting (up to
// `timeout` ms when positive) for a connection and retrying after transport
// failures until the send succeeds or the transport is closed.
//
// Commands needing a response are recorded (in the state tracker or in
// requestMap) so restoreTransport() can replay them after a reconnect.
//
// Throws IOException if sending ultimately fails while the transport is not
// closed (connection failure, timeout, tracking error).
void FailoverTransport::oneway( const Pointer<Command>& command )
    throw( IOException, decaf::lang::exceptions::UnsupportedOperationException ) {

    Pointer<Exception> error;

    try {

        synchronized( &reconnectMutex ) {

            if( isShutdownCommand( command ) && connectedTransport == NULL ) {

                if( command->isShutdownInfo() ) {
                    // Skipping send of ShutdownInfo command when not connected.
                    return;
                }

                if( command->isRemoveInfo() || command->isMessageAck() ) {
                    // Simulate response to RemoveInfo command or Ack as they will be stale.
                    stateTracker.track( command );
                    Pointer<Response> response( new Response() );
                    response->setCorrelationId( command->getCommandId() );
                    myTransportListener->onCommand( response );
                    return;
                }
            }

            // Keep trying until the message is sent.
            for( int i = 0; !closed; i++ ) {
                try {

                    // Wait for transport to be connected.
                    Pointer<Transport> transport = connectedTransport;
                    long long start = System::currentTimeMillis();
                    bool timedout = false;
                    while( transport == NULL && !closed && connectionFailure == NULL ) {
                        long long end = System::currentTimeMillis();
                        if( timeout > 0 && ( end - start > timeout ) ) {
                            timedout = true;
                            break;
                        }

                        reconnectMutex.wait( 100 );
                        transport = connectedTransport;
                    }

                    if( transport == NULL ) {
                        // Previous loop may have exited due to us being disposed.
                        if( closed ) {
                            error.reset( new IOException(
                                __FILE__, __LINE__, "Transport disposed.") );
                        } else if( connectionFailure != NULL ) {
                            error = connectionFailure;
                        } else if( timedout == true ) {
                            // Cast explicitly so the vararg matches %lld regardless
                            // of the declared integer type of `timeout`.
                            error.reset( new IOException(
                                __FILE__, __LINE__,
                                "Failover timeout of %lld ms reached.",
                                static_cast<long long>( timeout ) ) );
                        } else {
                            error.reset( new IOException(
                                __FILE__, __LINE__, "Unexpected failure.") );
                        }

                        break;
                    }

                    // If it was a request and it was not being tracked by the state
                    // tracker, then hold it in the requestMap so that we can replay
                    // it later.
                    Pointer<Tracked> tracked;
                    try{
                        tracked = stateTracker.track( command );
                        synchronized( &requestMap ) {
                            if( tracked != NULL && tracked->isWaitingForResponse() ) {
                                requestMap.put( command->getCommandId(), tracked );
                            } else if( tracked == NULL && command->isResponseRequired() ) {
                                requestMap.put( command->getCommandId(), command );
                            }
                        }
                    } catch( Exception& ex ) {
                        ex.setMark( __FILE__, __LINE__ );
                        error.reset( ex.clone() );
                        break;
                    }

                    // Send the message.
                    try {
                        transport->oneway( command );
                        stateTracker.trackBack( command );
                    } catch( IOException& e ) {

                        e.setMark( __FILE__, __LINE__ );

                        // If the command was not tracked.. we will retry in
                        // this method
                        if( tracked == NULL ) {

                            // since we will retry in this method.. take it out of the
                            // request map so that it is not sent 2 times on recovery.
                            // requestMap is guarded by its own lock everywhere else.
                            if( command->isResponseRequired() ) {
                                synchronized( &requestMap ) {
                                    requestMap.remove( command->getCommandId() );
                                }
                            }

                            // Rethrow the original exception object (no slicing)
                            // so it will be handled by the outer catch.
                            throw;
                        }

                        // Tracked commands are restored by the state tracker on
                        // reconnect, so don't retry here; just make sure failover
                        // is triggered for the broken transport.
                        handleTransportFailure( e );
                    }

                    return;

                } catch( IOException& e ) {
                    e.setMark( __FILE__, __LINE__ );
                    handleTransportFailure( e );
                }
            }
        }
    }
    AMQ_CATCH_NOTHROW( Exception )
    AMQ_CATCHALL_NOTHROW()

    // Errors are only surfaced while the transport is still open.
    if( !closed ) {
        if( error != NULL ) {
            throw IOException( *error );
        }
    }
}
////////////////////////////////////////////////////////////////////////////////
// Synchronous request/response is not offered at this layer; only oneway()
// sends. Always throws UnsupportedOperationException.
Pointer<Response> FailoverTransport::request( const Pointer<Command>& command AMQCPP_UNUSED )
    throw( IOException,
           decaf::lang::exceptions::UnsupportedOperationException ) {

    throw decaf::lang::exceptions::UnsupportedOperationException(
        __FILE__, __LINE__, "FailoverTransport::request - Not Supported" );
}
////////////////////////////////////////////////////////////////////////////////
// Timed variant of request(); likewise unsupported at this layer and always
// throws UnsupportedOperationException.
Pointer<Response> FailoverTransport::request( const Pointer<Command>& command AMQCPP_UNUSED,
                                              unsigned int timeout AMQCPP_UNUSED )
    throw( IOException,
           decaf::lang::exceptions::UnsupportedOperationException ) {

    throw decaf::lang::exceptions::UnsupportedOperationException(
        __FILE__, __LINE__, "FailoverTransport::request - Not Supported" );
}
////////////////////////////////////////////////////////////////////////////////
// Starts the transport (idempotent). Configures the state tracker, then either
// restores tracked state onto an existing connection or triggers a reconnect.
// Any failure is reported as an IOException.
void FailoverTransport::start() throw( IOException ) {

    try {

        synchronized( &reconnectMutex ) {

            // Already started: nothing to do.
            if( started ) {
                return;
            }

            started = true;

            stateTracker.setMaxCacheSize( getMaxCacheSize() );
            stateTracker.setTrackMessages( isTrackMessages() );
            stateTracker.setTrackTransactionProducers( isTrackTransactionProducers() );

            if( connectedTransport == NULL ) {
                // No transport yet: wake the task runner to establish one.
                reconnect();
            } else {
                stateTracker.restore( connectedTransport );
            }
        }
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Intentionally a no-op; teardown of this transport is performed by close().
void FailoverTransport::stop() throw( IOException ) {
}
////////////////////////////////////////////////////////////////////////////////
// Closes a started transport: marks it closed, disables backups, drops pending
// requests, wakes any waiters, shuts down the task runner and finally closes
// the connected transport (outside reconnectMutex). Does nothing if the
// transport was never started. Failures are reported as IOException.
void FailoverTransport::close() throw( IOException ) {

    try{

        Pointer<Transport> transportToStop;

        synchronized( &reconnectMutex ) {

            if( !started ) {
                return;
            }

            started = false;
            closed = true;
            connected = false;

            backups->setEnabled( false );

            // requestMap is accessed under its own lock in oneway() and
            // restoreTransport(); clear it under that lock as well.
            synchronized( &requestMap ) {
                requestMap.clear();
            }

            if( connectedTransport != NULL ) {
                transportToStop.swap( connectedTransport );
            }

            // Release threads waiting in oneway() for a connection.
            reconnectMutex.notifyAll();
        }

        // Wake iterate() if it is sleeping between reconnect attempts.
        synchronized( &sleepMutex ) {
            sleepMutex.notifyAll();
        }

        // Argument presumably a shutdown timeout in ms — confirm against TaskRunner.
        taskRunner->shutdown( 2000 );

        if( transportToStop != NULL ) {
            transportToStop->close();
        }
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Wakes the task runner so iterate() attempts a (re)connection; ignored until
// start() has been called.
void FailoverTransport::reconnect() {

    synchronized( &reconnectMutex ) {
        if( !started ) {
            return;
        }
        taskRunner->wakeup();
    }
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::restoreTransport( const Pointer<Transport>& transport )
throw( IOException ) {
try{
transport->start();
//send information to the broker - informing it we are an ft client
Pointer<ConnectionControl> cc( new ConnectionControl() );
cc->setFaultTolerant( true );
transport->oneway( cc );
stateTracker.restore( transport );
std::vector< Pointer<Command> > commands;
synchronized( &requestMap ) {
commands = requestMap.values();
}
std::vector< Pointer<Command> >::const_iterator iter = commands.begin();
for( ; iter != commands.end(); ++iter ) {
transport->oneway( *iter );
}
}
AMQ_CATCH_RETHROW( IOException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Reacts to a failure of the connected transport: detaches it, hands it to the
// close task, returns its URI to the pool, notifies the state tracker and the
// listener, and wakes the reconnect task. Only the caller that actually
// detaches a transport does this work; later calls are no-ops.
void FailoverTransport::handleTransportFailure( const decaf::lang::Exception& error AMQCPP_UNUSED )
    throw( decaf::lang::Exception ) {

    Pointer<Transport> transport;
    URI failedURI;
    bool haveFailedURI = false;

    // Detach the transport and reset the connection state in the SAME critical
    // section. Previously the state was reset in a second lock acquisition, so a
    // connection established by iterate() in between would have its URI and
    // `connected` flag clobbered by this cleanup.
    synchronized( &reconnectMutex ) {
        connectedTransport.swap( transport );
        if( transport != NULL ) {
            // Guard against dereferencing a missing URI.
            if( connectedTransportURI.get() != NULL ) {
                failedURI = *connectedTransportURI;
                haveFailedURI = true;
            }
            connectedTransportURI.reset( NULL );
            initialized = false;
            connected = false;
        }
    }

    if( transport == NULL ) {
        return;
    }

    if( this->disposedListener != NULL ) {
        transport->setTransportListener( disposedListener.get() );
    }

    // Hand off to the close task so it gets done in a different thread.
    closeTask->add( transport );

    synchronized( &reconnectMutex ) {

        // Return the failed URI to the pool so it can be retried later.
        if( haveFailedURI ) {
            uris->addURI( failedURI );
        }

        // Place the State Tracker into a reconnection state.
        this->stateTracker.transportInterrupted();

        // Notify before we attempt to reconnect so that the consumers have a chance
        // to cleanup their state.
        if( transportListener != NULL ) {
            transportListener->transportInterrupted();
        }

        if( started ) {
            taskRunner->wakeup();
        }
    }
}
////////////////////////////////////////////////////////////////////////////////
// Returns true while the transport is started, open, not connected, and still
// allowed to make further reconnect attempts.
bool FailoverTransport::isPending() const {

    synchronized( &reconnectMutex ) {

        if( connectedTransport != NULL || closed || !started ) {
            return false;
        }

        // Before the first connection a non-zero startup limit wins; otherwise
        // the general limit applies. A limit <= 0 means unlimited attempts.
        int limit = ( firstConnection && startupMaxReconnectAttempts != 0 ) ?
                    startupMaxReconnectAttempts : 0;
        if( limit == 0 ) {
            limit = maxReconnectAttempts;
        }

        return !( limit > 0 && connectFailures >= limit );
    }

    return false;
}
////////////////////////////////////////////////////////////////////////////////
// One reconnect step, run by the task runner. Tries a backup transport first,
// then each URI from the pool. On success it installs the new transport and
// returns false. When the attempt limit is exhausted it records
// connectionFailure, reports it to the listener and returns false. Otherwise it
// sleeps for the (backed-off) reconnect delay and returns !closed so the
// runner calls it again.
bool FailoverTransport::iterate() {

    Pointer<Exception> failure;

    synchronized( &reconnectMutex ) {

        // Release threads waiting in oneway() if we are closed or have given up.
        if( closed || connectionFailure != NULL ) {
            reconnectMutex.notifyAll();
        }

        if( connectedTransport != NULL || closed || connectionFailure != NULL ) {
            return false;
        } else {

            StlList<URI> failures;
            Pointer<Transport> transport;
            URI uri;

            if( !useExponentialBackOff ) {
                reconnectDelay = initialReconnectDelay;
            }

            // Prefer a ready-made backup transport when one is available.
            if( backups->isEnabled() ) {
                Pointer<BackupTransport> backupTransport = backups->getBackup();
                if( backupTransport != NULL ) {
                    transport = backupTransport->getTransport();
                    uri = backupTransport->getUri();
                    transport->setTransportListener( myTransportListener.get() );

                    try {
                        if( started ) {
                            restoreTransport( transport );
                        }
                    } catch( Exception& ) {

                        if( transport != NULL ) {
                            if( this->disposedListener != NULL ) {
                                transport->setTransportListener( disposedListener.get() );
                            }

                            // Hand off to the close task so it gets done in a different thread
                            // this prevents a deadlock from occurring if the Transport happens
                            // to call back through our onException method or locks in some other
                            // way.
                            closeTask->add( transport );
                            taskRunner->wakeup();
                            transport.reset( NULL );
                        }

                        this->uris->addURI( uri );
                    }
                }
            }

            // Try URIs from the pool until one connects or the pool is exhausted.
            while( transport == NULL && !closed ) {

                try{
                    uri = uris->getURI();
                } catch( NoSuchElementException& ex ) {
                    break;
                }

                try {

                    transport = createTransport( uri );
                    transport->setTransportListener( myTransportListener.get() );
                    transport->start();

                    if( started ) {
                        restoreTransport( transport );
                    }

                } catch( Exception& e ) {
                    e.setMark( __FILE__, __LINE__ );

                    if( transport != NULL ) {
                        if( this->disposedListener != NULL ) {
                            transport->setTransportListener( disposedListener.get() );
                        }

                        try{
                            transport->stop();
                        } catch(...) {}

                        // Hand off to the close task so it gets done in a different thread
                        // this prevents a deadlock from occurring if the Transport happens
                        // to call back through our onException method or locks in some other
                        // way.
                        closeTask->add( transport );
                        taskRunner->wakeup();
                        transport.reset( NULL );
                    }

                    failures.add( uri );
                    failure.reset( e.clone() );
                }
            }

            // Return the failures to the pool, we will try again on the next iteration.
            this->uris->addURIs( failures );

            if( transport != NULL ) {
                reconnectDelay = initialReconnectDelay;
                connectedTransportURI.reset( new URI( uri ) );
                connectedTransport = transport;
                reconnectMutex.notifyAll();
                connectFailures = 0;
                connected = true;

                // Make sure on initial startup, that the transportListener
                // has been initialized for this instance.
                synchronized( &listenerMutex ) {
                    if( transportListener == NULL ) {
                        // if it isn't set after 2secs - it probably never will be
                        listenerMutex.wait( 2000 );
                    }
                }

                if( transportListener != NULL ) {
                    transportListener->transportResumed();
                }

                if( firstConnection ) {
                    firstConnection = false;
                }

                return false;
            }
        }

        // Before the first connection a non-zero startup limit wins; otherwise
        // the general limit applies (<= 0 means unlimited).
        int reconnectAttempts = 0;
        if( firstConnection ) {
            if( startupMaxReconnectAttempts != 0 ) {
                reconnectAttempts = startupMaxReconnectAttempts;
            }
        }
        if( reconnectAttempts == 0 ) {
            reconnectAttempts = maxReconnectAttempts;
        }

        if( reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts ) {

            // When no URI could be attempted at all (empty pool, or only a
            // failed backup) no failure was recorded. Without this, connectionFailure
            // would stay NULL: waiters in oneway() would never be released and
            // *connectionFailure below would dereference NULL.
            if( failure == NULL ) {
                failure.reset( new IOException(
                    __FILE__, __LINE__, "No uris available to connect to." ) );
            }

            connectionFailure = failure;

            // Make sure on initial startup, that the transportListener has been initialized
            // for this instance.
            synchronized( &listenerMutex ) {
                if( transportListener == NULL ) {
                    listenerMutex.wait( 2000 );
                }
            }

            if( transportListener != NULL ) {

                Pointer<IOException> ioException;
                try{
                    ioException = connectionFailure.dynamicCast<IOException>();
                }
                AMQ_CATCH_NOTHROW( ClassCastException )

                if( ioException != NULL ) {
                    transportListener->onException( *connectionFailure );
                } else {
                    transportListener->onException( IOException( *connectionFailure ) );
                }
            }

            reconnectMutex.notifyAll();
            return false;
        }
    }

    // Sleep before the next attempt; close() notifies sleepMutex to cut this short.
    if( !closed ) {

        synchronized( &sleepMutex ) {
            sleepMutex.wait( (unsigned int)reconnectDelay );
        }

        if( useExponentialBackOff ) {
            // Exponential increment of reconnect delay.
            reconnectDelay *= backOffMultiplier;
            if( reconnectDelay > maxReconnectDelay ) {
                reconnectDelay = maxReconnectDelay;
            }
        }
    }

    return !closed;
}
////////////////////////////////////////////////////////////////////////////////
// Creates (but does not start) a composite transport for the given URI using
// the factory registered for the URI's scheme.
// Throws IOException when no factory is registered for the scheme or
// creation fails.
Pointer<Transport> FailoverTransport::createTransport( const URI& location ) const
    throw ( decaf::io::IOException ) {

    try{

        TransportFactory* factory =
            TransportRegistry::getInstance().findFactory( location.getScheme() );

        if( factory == NULL ) {
            // Throw by value: throwing a pointer (`throw new ...`) bypasses the
            // IOException/Exception handlers below, leaks the object and
            // discards this message.
            throw IOException(
                __FILE__, __LINE__, "Invalid URI specified, no valid Factory Found.");
        }

        Pointer<Transport> transport( factory->createComposite( location ) );

        return transport;
    }
    AMQ_CATCH_RETHROW( IOException )
    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
    AMQ_CATCHALL_THROW( IOException )
}
////////////////////////////////////////////////////////////////////////////////
// Forwards the "interrupt processing complete" notification for a connection
// to the state tracker, serialised with other reconnectMutex holders such as iterate().
void FailoverTransport::setConnectionInterruptProcessingComplete( const Pointer<commands::ConnectionId>& connectionId ) {

    synchronized( &reconnectMutex ) {
        this->stateTracker.connectionInterruptProcessingComplete( this, connectionId );
    }
}