blob: b5ea33be194dd11f663297d463770bdab0eef446 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FailoverTransport.h"
#include <activemq/commands/ConnectionControl.h>
#include <activemq/commands/ShutdownInfo.h>
#include <activemq/commands/RemoveInfo.h>
#include <activemq/transport/TransportRegistry.h>
#include <activemq/threads/DedicatedTaskRunner.h>
#include <activemq/threads/CompositeTaskRunner.h>
#include <activemq/transport/failover/BackupTransportPool.h>
#include <activemq/transport/failover/URIPool.h>
#include <activemq/transport/failover/FailoverTransportListener.h>
#include <activemq/transport/failover/CloseTransportsTask.h>
#include <activemq/transport/failover/URIPool.h>
#include <decaf/util/Random.h>
#include <decaf/util/StringTokenizer.h>
#include <decaf/util/LinkedList.h>
#include <decaf/util/StlMap.h>
#include <decaf/util/concurrent/TimeUnit.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/lang/System.h>
#include <decaf/lang/Integer.h>
using namespace std;
using namespace activemq;
using namespace activemq::state;
using namespace activemq::commands;
using namespace activemq::exceptions;
using namespace activemq::threads;
using namespace activemq::transport;
using namespace activemq::transport::failover;
using namespace decaf;
using namespace decaf::io;
using namespace decaf::net;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
namespace activemq {
namespace transport {
namespace failover {
class FailoverTransportImpl {
private:
FailoverTransportImpl(const FailoverTransportImpl&);
FailoverTransportImpl& operator= (const FailoverTransportImpl&);
static const int DEFAULT_INITIAL_RECONNECT_DELAY;
static const int INFINITE_WAIT;
public:
bool closed;
bool connected;
bool started;
long long timeout;
long long initialReconnectDelay;
long long maxReconnectDelay;
long long backOffMultiplier;
bool useExponentialBackOff;
bool initialized;
int maxReconnectAttempts;
int startupMaxReconnectAttempts;
int connectFailures;
long long reconnectDelay;
bool trackMessages;
bool trackTransactionProducers;
int maxCacheSize;
int maxPullCacheSize;
bool connectionInterruptProcessingComplete;
bool firstConnection;
bool updateURIsSupported;
bool reconnectSupported;
bool rebalanceUpdateURIs;
bool priorityBackup;
bool backupsEnabled;
volatile bool shutdown;
bool doRebalance;
bool connectedToPrioirty;
mutable Mutex reconnectMutex;
mutable Mutex sleepMutex;
mutable Mutex listenerMutex;
StlMap<int, Pointer<Command> > requestMap;
Pointer<URIPool> uris;
Pointer<URIPool> priorityUris;
Pointer<URIPool> updated;
Pointer<URI> connectedTransportURI;
Pointer<Transport> connectedTransport;
Pointer<Exception> connectionFailure;
Pointer<BackupTransportPool> backups;
Pointer<CloseTransportsTask> closeTask;
Pointer<CompositeTaskRunner> taskRunner;
Pointer<TransportListener> disposedListener;
Pointer<TransportListener> myTransportListener;
TransportListener* transportListener;
FailoverTransportImpl(FailoverTransport* parent) :
closed(false),
connected(false),
started(false),
timeout(INFINITE_WAIT),
initialReconnectDelay(DEFAULT_INITIAL_RECONNECT_DELAY),
maxReconnectDelay(1000*30),
backOffMultiplier(2),
useExponentialBackOff(true),
initialized(false),
maxReconnectAttempts(INFINITE_WAIT),
startupMaxReconnectAttempts(INFINITE_WAIT),
connectFailures(0),
reconnectDelay(DEFAULT_INITIAL_RECONNECT_DELAY),
trackMessages(false),
trackTransactionProducers(true),
maxCacheSize(128*1024),
maxPullCacheSize(10),
connectionInterruptProcessingComplete(false),
firstConnection(true),
updateURIsSupported(true),
reconnectSupported(true),
rebalanceUpdateURIs(true),
priorityBackup(false),
backupsEnabled(false),
shutdown(false),
doRebalance(false),
connectedToPrioirty(false),
reconnectMutex(),
sleepMutex(),
listenerMutex(),
requestMap(),
uris(new URIPool()),
priorityUris(new URIPool()),
updated(new URIPool()),
connectedTransportURI(),
connectedTransport(),
connectionFailure(),
backups(),
closeTask(new CloseTransportsTask()),
taskRunner(new CompositeTaskRunner()),
disposedListener(),
myTransportListener(new FailoverTransportListener(parent)),
transportListener(NULL) {
this->backups.reset(
new BackupTransportPool(parent, taskRunner, closeTask, uris, updated, priorityUris));
this->taskRunner->addTask(parent);
this->taskRunner->addTask(this->closeTask.get());
}
bool isPriority(const decaf::net::URI& uri) {
return priorityUris->contains(uri) || uris->isPriority(uri);
}
Pointer<URIPool> getConnectList() {
// Pick an appropriate URI pool, updated is always preferred if updates are
// enabled and we have any, otherwise we fallback to our original list so that
// we ensure we always try something.
Pointer<URIPool> uris = this->uris;
if (this->updateURIsSupported && !this->updated->isEmpty()) {
uris = this->updated;
}
return uris;
}
void doDelay() {
if (reconnectDelay > 0) {
synchronized (&sleepMutex) {
try {
sleepMutex.wait(reconnectDelay);
} catch (InterruptedException& e) {
Thread::currentThread()->interrupt();
}
}
}
if (useExponentialBackOff) {
// Exponential increment of reconnect delay.
reconnectDelay *= backOffMultiplier;
if (reconnectDelay > maxReconnectDelay) {
reconnectDelay = maxReconnectDelay;
}
}
}
int calculateReconnectAttemptLimit() const {
int maxReconnectValue = maxReconnectAttempts;
if (firstConnection && startupMaxReconnectAttempts != INFINITE_WAIT) {
maxReconnectValue = startupMaxReconnectAttempts;
}
return maxReconnectValue;
}
bool canReconnect() const {
return started && 0 != calculateReconnectAttemptLimit();
}
/**
* This must be called with the reconnect mutex locked.
*/
void propagateFailureToExceptionListener() {
if (this->transportListener != NULL) {
Pointer<IOException> ioException;
try {
ioException = this->connectionFailure.dynamicCast<IOException>();
}
AMQ_CATCH_NOTHROW(ClassCastException)
if (ioException != NULL) {
transportListener->onException(*this->connectionFailure);
} else {
transportListener->onException(IOException(*this->connectionFailure));
}
}
reconnectMutex.notifyAll();
}
void resetReconnectDelay() {
if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
reconnectDelay = initialReconnectDelay;
}
}
bool isClosedOrFailed() const {
return closed || connectionFailure != NULL;
}
bool isConnectionStateValid() const {
return connectedTransport != NULL && !doRebalance && !this->backups->isPriorityBackupAvailable();
}
void disconnect() {
Pointer<Transport> transport;
transport.swap(this->connectedTransport);
if (transport != NULL) {
if (this->disposedListener != NULL) {
transport->setTransportListener(this->disposedListener.get());
}
// Hand off to the close task so it gets done in a different thread.
this->closeTask->add(transport);
if (this->connectedTransportURI != NULL) {
this->uris->addURI(*this->connectedTransportURI);
this->connectedTransportURI.reset(NULL);
}
}
}
bool willReconnect() {
return firstConnection || 0 != calculateReconnectAttemptLimit();
}
};
const int FailoverTransportImpl::DEFAULT_INITIAL_RECONNECT_DELAY = 10;
const int FailoverTransportImpl::INFINITE_WAIT = -1;
}}}
////////////////////////////////////////////////////////////////////////////////
FailoverTransport::FailoverTransport() : stateTracker(), impl(NULL) {
this->impl = new FailoverTransportImpl(this);
this->stateTracker.setTrackTransactions(true);
}
////////////////////////////////////////////////////////////////////////////////
FailoverTransport::~FailoverTransport() {
try {
close();
}
AMQ_CATCHALL_NOTHROW()
try {
delete this->impl;
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::add(bool rebalance, const std::string& uri) {
try {
if (this->impl->uris->addURI(URI(uri))) {
reconnect(rebalance);
}
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::addURI(bool rebalance, const List<URI>& uris) {
bool newUri = false;
std::auto_ptr<Iterator<URI> > iter(uris.iterator());
while (iter->hasNext()) {
if (this->impl->uris->addURI(iter->next())) {
newUri = true;
}
}
if (newUri) {
reconnect(rebalance);
}
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::removeURI(bool rebalance, const List<URI>& uris) {
bool changed = false;
std::auto_ptr<Iterator<URI> > iter(uris.iterator());
synchronized( &this->impl->reconnectMutex ) {
while (iter->hasNext()) {
if (this->impl->uris->removeURI(iter->next())) {
changed = true;
}
}
}
if (changed) {
reconnect(rebalance);
}
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::reconnect(const decaf::net::URI& uri) {
try {
if (this->impl->uris->addURI(uri)) {
reconnect(true);
}
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setTransportListener(TransportListener* listener) {
synchronized( &this->impl->listenerMutex ) {
this->impl->transportListener = listener;
this->impl->listenerMutex.notifyAll();
}
}
////////////////////////////////////////////////////////////////////////////////
TransportListener* FailoverTransport::getTransportListener() const {
synchronized( &this->impl->listenerMutex ) {
return this->impl->transportListener;
}
return NULL;
}
////////////////////////////////////////////////////////////////////////////////
std::string FailoverTransport::getRemoteAddress() const {
synchronized( &this->impl->reconnectMutex ) {
if (this->impl->connectedTransport != NULL) {
return this->impl->connectedTransport->getRemoteAddress();
}
}
return "";
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::oneway(const Pointer<Command> command) {
Pointer<Exception> error;
try {
synchronized(&this->impl->reconnectMutex) {
if (command != NULL && this->impl->connectedTransport == NULL) {
if (command->isShutdownInfo()) {
// Skipping send of ShutdownInfo command when not connected.
return;
}
if (command->isRemoveInfo() || command->isMessageAck()) {
// Simulate response to RemoveInfo command or Ack as they will be stale.
stateTracker.track(command);
if (command->isResponseRequired()) {
Pointer<Response> response(new Response());
response->setCorrelationId(command->getCommandId());
this->impl->myTransportListener->onCommand(response);
}
return;
} else if (command->isMessagePull()) {
// Simulate response to MessagePull if timed as we can't honor that now.
Pointer<MessagePull> pullRequest = command.dynamicCast<MessagePull>();
if (pullRequest->getTimeout() != 0) {
Pointer<MessageDispatch> dispatch(new MessageDispatch());
dispatch->setConsumerId(pullRequest->getConsumerId());
dispatch->setDestination(pullRequest->getDestination());
this->impl->myTransportListener->onCommand(dispatch);
}
return;
}
}
// Keep trying until the message is sent.
for (int i = 0; !this->impl->closed; i++) {
try {
// Wait for transport to be connected.
Pointer<Transport> transport = this->impl->connectedTransport;
long long start = System::currentTimeMillis();
bool timedout = false;
while (transport == NULL && !this->impl->closed &&
this->impl->connectionFailure == NULL && this->impl->willReconnect()) {
long long end = System::currentTimeMillis();
if (command->isMessage() && this->impl->timeout > 0 && (end - start > this->impl->timeout)) {
timedout = true;
break;
}
this->impl->reconnectMutex.wait(100);
transport = this->impl->connectedTransport;
}
if (transport == NULL) {
// Previous loop may have exited due to us being disposed.
if (this->impl->closed) {
error.reset(new IOException(__FILE__, __LINE__, "Transport disposed."));
} else if (this->impl->connectionFailure != NULL) {
error = this->impl->connectionFailure;
} else if (timedout == true) {
error.reset(new IOException(__FILE__, __LINE__,
"Failover timeout of %d ms reached.", this->impl->timeout));
} else if (!this->impl->willReconnect()) {
error.reset(new IOException(__FILE__, __LINE__,
"Maximum reconnection attempts exceeded"));
} else {
error.reset(new IOException(__FILE__, __LINE__, "Unexpected failure."));
}
break;
}
// If it was a request and it was not being tracked by the state
// tracker, then hold it in the requestMap so that we can replay
// it later.
Pointer<Tracked> tracked;
try {
tracked = stateTracker.track(command);
synchronized(&this->impl->requestMap) {
if (tracked != NULL && tracked->isWaitingForResponse()) {
this->impl->requestMap.put(command->getCommandId(), tracked);
} else if (tracked == NULL && command->isResponseRequired()) {
this->impl->requestMap.put(command->getCommandId(), command);
}
}
} catch (Exception& ex) {
ex.setMark(__FILE__, __LINE__);
error.reset(ex.clone());
break;
}
// Send the message.
try {
transport->oneway(command);
stateTracker.trackBack(command);
if (command->isShutdownInfo()) {
this->impl->shutdown = true;
}
} catch (IOException& e) {
e.setMark(__FILE__, __LINE__);
// If the command was not tracked.. we will retry in this method
if (tracked == NULL && this->impl->canReconnect()) {
// since we will retry in this method.. take it out of the
// request map so that it is not sent 2 times on recovery
if (command->isResponseRequired()) {
this->impl->requestMap.remove(command->getCommandId());
}
// re-throw the exception so it will handled by the outer catch
throw;
} else {
// Trigger the reconnect since we can't count on inactivity or
// other socket events to trip the failover condition.
handleTransportFailure(e);
}
}
return;
} catch (IOException& e) {
e.setMark(__FILE__, __LINE__);
handleTransportFailure(e);
}
}
}
} catch (InterruptedException& ex) {
Thread::currentThread()->interrupt();
throw InterruptedIOException(__FILE__, __LINE__, "FailoverTransport oneway() interrupted");
}
AMQ_CATCHALL_NOTHROW()
if (!this->impl->closed) {
if (error != NULL) {
throw IOException(*error);
}
}
}
////////////////////////////////////////////////////////////////////////////////
Pointer<FutureResponse> FailoverTransport::asyncRequest(const Pointer<Command> command AMQCPP_UNUSED,
const Pointer<ResponseCallback> responseCallback AMQCPP_UNUSED) {
throw decaf::lang::exceptions::UnsupportedOperationException(__FILE__, __LINE__, "FailoverTransport::asyncRequest - Not Supported");
}
////////////////////////////////////////////////////////////////////////////////
Pointer<Response> FailoverTransport::request(const Pointer<Command> command AMQCPP_UNUSED) {
throw decaf::lang::exceptions::UnsupportedOperationException(__FILE__, __LINE__, "FailoverTransport::request - Not Supported");
}
////////////////////////////////////////////////////////////////////////////////
Pointer<Response> FailoverTransport::request(const Pointer<Command> command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED) {
throw decaf::lang::exceptions::UnsupportedOperationException(__FILE__, __LINE__, "FailoverTransport::request - Not Supported");
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::start() {
try {
synchronized(&this->impl->reconnectMutex) {
if (this->impl->started) {
return;
}
this->impl->started = true;
if (this->impl->backupsEnabled || this->impl->priorityBackup) {
this->impl->backups->setEnabled(true);
}
this->impl->taskRunner->start();
stateTracker.setMaxMessageCacheSize(this->getMaxCacheSize());
stateTracker.setMaxMessagePullCacheSize(this->getMaxPullCacheSize());
stateTracker.setTrackMessages(this->isTrackMessages());
stateTracker.setTrackTransactionProducers(this->isTrackTransactionProducers());
if (this->impl->connectedTransport != NULL) {
stateTracker.restore(this->impl->connectedTransport);
} else {
reconnect(false);
}
}
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::stop() {
try {
synchronized(&this->impl->reconnectMutex) {
this->impl->started = false;
this->impl->backups->setEnabled(false);
}
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::close() {
try {
Pointer<Transport> transportToStop;
synchronized(&this->impl->reconnectMutex) {
if (this->impl->closed) {
return;
}
this->impl->started = false;
this->impl->closed = true;
this->impl->connected = false;
this->impl->backups->setEnabled(false);
this->impl->requestMap.clear();
if (this->impl->connectedTransport != NULL) {
transportToStop.swap(this->impl->connectedTransport);
}
this->impl->reconnectMutex.notifyAll();
}
this->impl->backups->close();
synchronized( &this->impl->sleepMutex ) {
this->impl->sleepMutex.notifyAll();
}
this->impl->taskRunner->shutdown(TimeUnit::MINUTES.toMillis(5));
if (transportToStop != NULL) {
transportToStop->close();
}
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::reconnect(bool rebalance) {
Pointer<Transport> transport;
synchronized( &this->impl->reconnectMutex ) {
if (this->impl->started) {
if (rebalance) {
this->impl->doRebalance = true;
}
try {
this->impl->taskRunner->wakeup();
} catch (InterruptedException& ex) {
Thread::currentThread()->interrupt();
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::restoreTransport(const Pointer<Transport> transport) {
try {
transport->start();
//send information to the broker - informing it we are an ft client
Pointer<ConnectionControl> cc(new ConnectionControl());
cc->setFaultTolerant(true);
transport->oneway(cc);
stateTracker.restore(transport);
decaf::util::StlMap<int, Pointer<Command> > commands;
synchronized(&this->impl->requestMap) {
commands.copy(this->impl->requestMap);
}
Pointer<Iterator<Pointer<Command> > > iter(commands.values().iterator());
while (iter->hasNext()) {
transport->oneway(iter->next());
}
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::handleTransportFailure(const decaf::lang::Exception& error) {
if (this->impl->shutdown) {
// shutdown info sent and remote socket closed and we see that before a local close
// let the close do the work
return;
}
synchronized(&this->impl->reconnectMutex) {
if (this->impl->shutdown) {
return;
}
Pointer<Transport> transport;
this->impl->connectedTransport.swap(transport);
if (transport != NULL) {
if (this->impl->disposedListener != NULL) {
transport->setTransportListener(this->impl->disposedListener.get());
}
// Hand off to the close task so it gets done in a different thread.
this->impl->closeTask->add(transport);
bool reconnectOk = this->impl->canReconnect();
URI failedUri = *this->impl->connectedTransportURI;
this->impl->initialized = false;
this->impl->uris->addURI(failedUri);
this->impl->connectedTransportURI.reset(NULL);
this->impl->connected = false;
this->impl->connectedToPrioirty = false;
// Place the State Tracker into a reconnection state.
this->stateTracker.transportInterrupted();
// Notify before we attempt to reconnect so that the consumers have a chance
// to cleanup their state.
if (reconnectOk) {
if (this->impl->transportListener != NULL) {
this->impl->transportListener->transportInterrupted();
}
this->impl->updated->removeURI(failedUri);
this->impl->taskRunner->wakeup();
} else if (!this->impl->closed) {
this->impl->connectionFailure.reset(error.clone());
this->impl->propagateFailureToExceptionListener();
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::handleConnectionControl(const Pointer<Command> control) {
try {
Pointer<ConnectionControl> ctrlCommand = control.dynamicCast<ConnectionControl>();
std::string reconnectStr = ctrlCommand->getReconnectTo();
if (!reconnectStr.empty()) {
std::remove(reconnectStr.begin(), reconnectStr.end(), ' ');
if (reconnectStr.length() > 0) {
try {
if (isReconnectSupported()) {
reconnect(URI(reconnectStr));
}
} catch (Exception& e) {
}
}
}
processNewTransports(ctrlCommand->isRebalanceConnection(), ctrlCommand->getConnectedBrokers());
}
AMQ_CATCH_RETHROW(Exception)
AMQ_CATCHALL_THROW(Exception)
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::processNewTransports(bool rebalance, std::string newTransports) {
if (!newTransports.empty()) {
std::remove(newTransports.begin(), newTransports.end(), ' ');
if (newTransports.length() > 0 && isUpdateURIsSupported()) {
LinkedList<URI> list;
StringTokenizer tokenizer(newTransports, ",");
while (tokenizer.hasMoreTokens()) {
std::string str = tokenizer.nextToken();
try {
URI uri(str);
list.add(uri);
} catch (Exception& e) {
}
}
if (!list.isEmpty()) {
try {
updateURIs(rebalance, list);
} catch (IOException& e) {
}
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::updateURIs(bool rebalance, const decaf::util::List<decaf::net::URI>& updatedURIs) {
if (isUpdateURIsSupported()) {
Pointer<URIPool> copy(new URIPool(*this->impl->updated));
this->impl->updated->clear();
if (!updatedURIs.isEmpty()) {
StlSet<URI> set;
for (int i = 0; i < updatedURIs.size(); i++) {
set.add(updatedURIs.get(i));
}
Pointer<Iterator<URI> > setIter(set.iterator());
while (setIter->hasNext()) {
URI value = setIter->next();
this->impl->updated->addURI(value);
}
if (!(copy->isEmpty() && this->impl->updated->isEmpty()) &&
!(copy->equals(*this->impl->updated))) {
synchronized(&this->impl->reconnectMutex) {
reconnect(rebalance);
}
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isPending() const {
bool result = false;
synchronized(&this->impl->reconnectMutex) {
if (!this->impl->isConnectionStateValid() && this->impl->started && !this->impl->isClosedOrFailed()) {
int maxReconnectAttempts = this->impl->calculateReconnectAttemptLimit();
if (maxReconnectAttempts != -1 && this->impl->connectFailures >= maxReconnectAttempts) {
result = false;
} else {
result = true;
}
}
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::iterate() {
Pointer<Exception> failure;
synchronized( &this->impl->reconnectMutex ) {
if (this->impl->isClosedOrFailed()) {
this->impl->reconnectMutex.notifyAll();
}
if (this->impl->isConnectionStateValid() || this->impl->isClosedOrFailed()) {
return false;
} else {
Pointer<URIPool> connectList = this->impl->getConnectList();
if (connectList->isEmpty()) {
failure.reset(new IOException(__FILE__, __LINE__, "No URIs available for reconnect."));
} else {
if (this->impl->doRebalance) {
if (this->impl->connectedToPrioirty || connectList->getPriorityURI().equals(*this->impl->connectedTransportURI)) {
// already connected to first in the list, no need to rebalance
this->impl->doRebalance = false;
return false;
} else {
// break any existing connect for rebalance.
this->impl->disconnect();
}
this->impl->doRebalance = false;
}
this->impl->resetReconnectDelay();
LinkedList<URI> failures;
Pointer<Transport> transport;
URI uri;
if (this->impl->backups->isEnabled()) {
Pointer<BackupTransport> backupTransport = this->impl->backups->getBackup();
if (backupTransport != NULL) {
transport = backupTransport->getTransport();
uri = backupTransport->getUri();
if (this->impl->priorityBackup && this->impl->backups->isPriorityBackupAvailable()) {
// A priority connection is available and we aren't connected to
// any other priority transports so disconnect and use the backup.
this->impl->disconnect();
}
}
}
// Sleep for the reconnectDelay if there's no backup and we aren't trying
// for the first time, or we were disposed for some reason.
if (transport == NULL && !this->impl->firstConnection &&
(this->impl->reconnectDelay > 0) && !this->impl->closed) {
synchronized (&this->impl->sleepMutex) {
try {
this->impl->sleepMutex.wait(this->impl->reconnectDelay);
} catch (InterruptedException& e) {
Thread::currentThread()->interrupt();
}
}
}
while (transport == NULL && this->impl->connectedTransport == NULL && !this->impl->closed) {
try {
// We could be starting the loop with a backup already.
if (transport == NULL) {
try {
uri = connectList->getURI();
} catch (NoSuchElementException& ex) {
break;
}
transport = createTransport(uri);
}
transport->setTransportListener(this->impl->myTransportListener.get());
transport->start();
if (this->impl->started && !this->impl->firstConnection) {
restoreTransport(transport);
}
this->impl->reconnectDelay = this->impl->initialReconnectDelay;
this->impl->connectedTransportURI.reset(new URI(uri));
this->impl->connectedTransport = transport;
this->impl->reconnectMutex.notifyAll();
this->impl->connectFailures = 0;
if (isPriorityBackup()) {
this->impl->connectedToPrioirty = connectList->getPriorityURI().equals(uri) ||
this->impl->priorityUris->contains(uri);
} else {
this->impl->connectedToPrioirty = false;
}
// Make sure on initial startup, that the transportListener
// has been initialized for this instance.
synchronized(&this->impl->listenerMutex) {
if (this->impl->transportListener == NULL) {
// if it isn't set after 2secs - it probably never will be
this->impl->listenerMutex.wait(2000);
}
}
if (this->impl->transportListener != NULL) {
this->impl->transportListener->transportResumed();
}
if (this->impl->firstConnection) {
this->impl->firstConnection = false;
}
// Return the failures to the pool, we will try again on the next iteration.
connectList->addURIs(failures);
this->impl->connected = true;
return false;
} catch (Exception& e) {
e.setMark(__FILE__, __LINE__);
if (transport != NULL) {
if (this->impl->disposedListener != NULL) {
transport->setTransportListener(this->impl->disposedListener.get());
}
try {
transport->stop();
} catch (...) {
}
// Hand off to the close task so it gets done in a different thread
// this prevents a deadlock from occurring if the Transport happens
// to call back through our onException method or locks in some other
// way.
this->impl->closeTask->add(transport);
this->impl->taskRunner->wakeup();
transport.reset(NULL);
}
failures.add(uri);
failure.reset(e.clone());
}
}
// Return the failures to the pool, we will try again on the next iteration.
connectList->addURIs(failures);
}
}
int reconnectAttempts = this->impl->calculateReconnectAttemptLimit();
if (reconnectAttempts >= 0 && ++this->impl->connectFailures >= reconnectAttempts) {
this->impl->connectionFailure = failure;
// Make sure on initial startup, that the transportListener has been initialized
// for this instance.
synchronized(&this->impl->listenerMutex) {
if (this->impl->transportListener == NULL) {
this->impl->listenerMutex.wait(2000);
}
}
this->impl->propagateFailureToExceptionListener();
return false;
}
}
if (!this->impl->closed) {
this->impl->doDelay();
}
return !this->impl->closed;
}
////////////////////////////////////////////////////////////////////////////////
Pointer<Transport> FailoverTransport::createTransport(const URI& location) const {
try {
TransportFactory* factory = TransportRegistry::getInstance().findFactory(location.getScheme());
if (factory == NULL) {
throw new IOException(__FILE__, __LINE__, "Invalid URI specified, no valid Factory Found.");
}
Pointer<Transport> transport(factory->createComposite(location));
return transport;
}
AMQ_CATCH_RETHROW(IOException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setConnectionInterruptProcessingComplete(const Pointer<commands::ConnectionId> connectionId) {
synchronized(&this->impl->reconnectMutex) {
stateTracker.connectionInterruptProcessingComplete(this, connectionId);
}
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isConnected() const {
return this->impl->connected;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isClosed() const {
return this->impl->closed;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isInitialized() const {
return this->impl->initialized;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setInitialized(bool value) {
this->impl->initialized = value;
}
////////////////////////////////////////////////////////////////////////////////
Transport* FailoverTransport::narrow(const std::type_info& typeId) {
if (typeid( *this ) == typeId) {
return this;
}
if (this->impl->connectedTransport != NULL) {
return this->impl->connectedTransport->narrow(typeId);
}
return NULL;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::processResponse(const Pointer<Response> response) {
Pointer<Command> object;
synchronized(&(this->impl->requestMap)) {
try {
object = this->impl->requestMap.remove(response->getCorrelationId());
} catch (NoSuchElementException& ex) {
// Not tracking this request in our map, not an error.
}
}
if (object != NULL) {
try {
Pointer<Tracked> tracked = object.dynamicCast<Tracked>();
tracked->onResponse();
}
AMQ_CATCH_NOTHROW(ClassCastException)
}
}
////////////////////////////////////////////////////////////////////////////////
Pointer<wireformat::WireFormat> FailoverTransport::getWireFormat() const {
Pointer<wireformat::WireFormat> result;
Pointer<Transport> transport = this->impl->connectedTransport;
if (transport != NULL) {
result = transport->getWireFormat();
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
long long FailoverTransport::getTimeout() const {
return this->impl->timeout;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setTimeout(long long value) {
this->impl->timeout = value;
}
////////////////////////////////////////////////////////////////////////////////
long long FailoverTransport::getInitialReconnectDelay() const {
return this->impl->initialReconnectDelay;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setInitialReconnectDelay(long long value) {
this->impl->initialReconnectDelay = value;
}
////////////////////////////////////////////////////////////////////////////////
long long FailoverTransport::getMaxReconnectDelay() const {
return this->impl->maxReconnectDelay;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setMaxReconnectDelay(long long value) {
this->impl->maxReconnectDelay = value;
}
////////////////////////////////////////////////////////////////////////////////
long long FailoverTransport::getBackOffMultiplier() const {
return this->impl->backOffMultiplier;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setBackOffMultiplier(long long value) {
this->impl->backOffMultiplier = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isUseExponentialBackOff() const {
return this->impl->useExponentialBackOff;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setUseExponentialBackOff(bool value) {
this->impl->useExponentialBackOff = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isRandomize() const {
return this->impl->uris->isRandomize();
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setRandomize(bool value) {
this->impl->uris->setRandomize(value);
}
////////////////////////////////////////////////////////////////////////////////
int FailoverTransport::getMaxReconnectAttempts() const {
return this->impl->maxReconnectAttempts;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setMaxReconnectAttempts(int value) {
this->impl->maxReconnectAttempts = value;
}
////////////////////////////////////////////////////////////////////////////////
int FailoverTransport::getStartupMaxReconnectAttempts() const {
return this->impl->startupMaxReconnectAttempts;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setStartupMaxReconnectAttempts(int value) {
this->impl->startupMaxReconnectAttempts = value;
}
////////////////////////////////////////////////////////////////////////////////
long long FailoverTransport::getReconnectDelay() const {
return this->impl->reconnectDelay;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setReconnectDelay(long long value) {
this->impl->reconnectDelay = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isBackup() const {
return this->impl->backupsEnabled;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setBackup(bool value) {
this->impl->backupsEnabled = value;
}
////////////////////////////////////////////////////////////////////////////////
int FailoverTransport::getBackupPoolSize() const {
return this->impl->backups->getBackupPoolSize();
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setBackupPoolSize(int value) {
this->impl->backups->setBackupPoolSize(value);
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isTrackMessages() const {
return this->impl->trackMessages;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setTrackMessages(bool value) {
this->impl->trackMessages = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isTrackTransactionProducers() const {
return this->impl->trackTransactionProducers;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setTrackTransactionProducers(bool value) {
this->impl->trackTransactionProducers = value;
}
////////////////////////////////////////////////////////////////////////////////
int FailoverTransport::getMaxCacheSize() const {
return this->impl->maxCacheSize;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setMaxCacheSize(int value) {
this->impl->maxCacheSize = value;
}
////////////////////////////////////////////////////////////////////////////////
int FailoverTransport::getMaxPullCacheSize() const {
return this->impl->maxPullCacheSize;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setMaxPullCacheSize(int value) {
this->impl->maxPullCacheSize = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isReconnectSupported() const {
return this->impl->reconnectSupported;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setReconnectSupported(bool value) {
this->impl->reconnectSupported = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isUpdateURIsSupported() const {
return this->impl->updateURIsSupported;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setUpdateURIsSupported(bool value) {
this->impl->updateURIsSupported = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isRebalanceUpdateURIs() const {
return this->impl->rebalanceUpdateURIs;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setRebalanceUpdateURIs(bool rebalanceUpdateURIs) {
this->impl->rebalanceUpdateURIs = rebalanceUpdateURIs;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isPriorityBackup() const {
return this->impl->priorityBackup;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setPriorityBackup(bool priorityBackup) {
this->impl->priorityBackup = priorityBackup;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isConnectedToPriority() const {
return this->impl->connectedToPrioirty;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setPriorityURIs(const std::string& priorityURIs AMQCPP_UNUSED) {
StringTokenizer tokenizer(priorityURIs, ",");
while (tokenizer.hasMoreTokens()) {
std::string str = tokenizer.nextToken();
try {
this->impl->priorityUris->addURI(URI(str));
} catch (Exception& e) {
}
}
}
////////////////////////////////////////////////////////////////////////////////
const List<URI>& FailoverTransport::getPriorityURIs() const {
return this->impl->priorityUris->getURIList();
}