| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "FailoverTransport.h" |
| |
| #include <activemq/commands/ConnectionControl.h> |
| #include <activemq/commands/ShutdownInfo.h> |
| #include <activemq/commands/RemoveInfo.h> |
| #include <activemq/transport/TransportRegistry.h> |
| #include <activemq/threads/DedicatedTaskRunner.h> |
| #include <activemq/threads/CompositeTaskRunner.h> |
| #include <activemq/transport/failover/BackupTransportPool.h> |
| #include <activemq/transport/failover/URIPool.h> |
| #include <activemq/transport/failover/FailoverTransportListener.h> |
| #include <activemq/transport/failover/CloseTransportsTask.h> |
| #include <activemq/transport/failover/URIPool.h> |
| #include <decaf/util/Random.h> |
| #include <decaf/util/StringTokenizer.h> |
| #include <decaf/util/LinkedList.h> |
| #include <decaf/util/StlMap.h> |
| #include <decaf/util/concurrent/TimeUnit.h> |
| #include <decaf/util/concurrent/Mutex.h> |
| #include <decaf/lang/System.h> |
| #include <decaf/lang/Integer.h> |
| |
| using namespace std; |
| using namespace activemq; |
| using namespace activemq::state; |
| using namespace activemq::commands; |
| using namespace activemq::exceptions; |
| using namespace activemq::threads; |
| using namespace activemq::transport; |
| using namespace activemq::transport::failover; |
| using namespace decaf; |
| using namespace decaf::io; |
| using namespace decaf::net; |
| using namespace decaf::util; |
| using namespace decaf::util::concurrent; |
| using namespace decaf::lang; |
| using namespace decaf::lang::exceptions; |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| namespace activemq { |
| namespace transport { |
| namespace failover { |
| |
| class FailoverTransportImpl { |
| private: |
| |
| FailoverTransportImpl(const FailoverTransportImpl&); |
| FailoverTransportImpl& operator= (const FailoverTransportImpl&); |
| |
| static const int DEFAULT_INITIAL_RECONNECT_DELAY; |
| static const int INFINITE_WAIT; |
| |
| public: |
| |
| bool closed; |
| bool connected; |
| bool started; |
| |
| long long timeout; |
| long long initialReconnectDelay; |
| long long maxReconnectDelay; |
| long long backOffMultiplier; |
| bool useExponentialBackOff; |
| bool initialized; |
| int maxReconnectAttempts; |
| int startupMaxReconnectAttempts; |
| int connectFailures; |
| long long reconnectDelay; |
| bool trackMessages; |
| bool trackTransactionProducers; |
| int maxCacheSize; |
| int maxPullCacheSize; |
| bool connectionInterruptProcessingComplete; |
| bool firstConnection; |
| bool updateURIsSupported; |
| bool reconnectSupported; |
| bool rebalanceUpdateURIs; |
| bool priorityBackup; |
| bool backupsEnabled; |
| volatile bool shutdown; |
| |
| bool doRebalance; |
| bool connectedToPrioirty; |
| |
| mutable Mutex reconnectMutex; |
| mutable Mutex sleepMutex; |
| mutable Mutex listenerMutex; |
| |
| StlMap<int, Pointer<Command> > requestMap; |
| |
| Pointer<URIPool> uris; |
| Pointer<URIPool> priorityUris; |
| Pointer<URIPool> updated; |
| Pointer<URI> connectedTransportURI; |
| Pointer<Transport> connectedTransport; |
| Pointer<Exception> connectionFailure; |
| Pointer<BackupTransportPool> backups; |
| Pointer<CloseTransportsTask> closeTask; |
| Pointer<CompositeTaskRunner> taskRunner; |
| Pointer<TransportListener> disposedListener; |
| Pointer<TransportListener> myTransportListener; |
| |
| TransportListener* transportListener; |
| |
| FailoverTransportImpl(FailoverTransport* parent) : |
| closed(false), |
| connected(false), |
| started(false), |
| timeout(INFINITE_WAIT), |
| initialReconnectDelay(DEFAULT_INITIAL_RECONNECT_DELAY), |
| maxReconnectDelay(1000*30), |
| backOffMultiplier(2), |
| useExponentialBackOff(true), |
| initialized(false), |
| maxReconnectAttempts(INFINITE_WAIT), |
| startupMaxReconnectAttempts(INFINITE_WAIT), |
| connectFailures(0), |
| reconnectDelay(DEFAULT_INITIAL_RECONNECT_DELAY), |
| trackMessages(false), |
| trackTransactionProducers(true), |
| maxCacheSize(128*1024), |
| maxPullCacheSize(10), |
| connectionInterruptProcessingComplete(false), |
| firstConnection(true), |
| updateURIsSupported(true), |
| reconnectSupported(true), |
| rebalanceUpdateURIs(true), |
| priorityBackup(false), |
| backupsEnabled(false), |
| shutdown(false), |
| doRebalance(false), |
| connectedToPrioirty(false), |
| reconnectMutex(), |
| sleepMutex(), |
| listenerMutex(), |
| requestMap(), |
| uris(new URIPool()), |
| priorityUris(new URIPool()), |
| updated(new URIPool()), |
| connectedTransportURI(), |
| connectedTransport(), |
| connectionFailure(), |
| backups(), |
| closeTask(new CloseTransportsTask()), |
| taskRunner(new CompositeTaskRunner()), |
| disposedListener(), |
| myTransportListener(new FailoverTransportListener(parent)), |
| transportListener(NULL) { |
| |
| this->backups.reset( |
| new BackupTransportPool(parent, taskRunner, closeTask, uris, updated, priorityUris)); |
| |
| this->taskRunner->addTask(parent); |
| this->taskRunner->addTask(this->closeTask.get()); |
| } |
| |
| bool isPriority(const decaf::net::URI& uri) { |
| return priorityUris->contains(uri) || uris->isPriority(uri); |
| } |
| |
| Pointer<URIPool> getConnectList() { |
| // Pick an appropriate URI pool, updated is always preferred if updates are |
| // enabled and we have any, otherwise we fallback to our original list so that |
| // we ensure we always try something. |
| Pointer<URIPool> uris = this->uris; |
| if (this->updateURIsSupported && !this->updated->isEmpty()) { |
| uris = this->updated; |
| } |
| return uris; |
| } |
| |
| void doDelay() { |
| if (reconnectDelay > 0) { |
| synchronized (&sleepMutex) { |
| try { |
| sleepMutex.wait(reconnectDelay); |
| } catch (InterruptedException& e) { |
| Thread::currentThread()->interrupt(); |
| } |
| } |
| } |
| |
| if (useExponentialBackOff) { |
| // Exponential increment of reconnect delay. |
| reconnectDelay *= backOffMultiplier; |
| if (reconnectDelay > maxReconnectDelay) { |
| reconnectDelay = maxReconnectDelay; |
| } |
| } |
| } |
| |
| int calculateReconnectAttemptLimit() const { |
| int maxReconnectValue = maxReconnectAttempts; |
| if (firstConnection && startupMaxReconnectAttempts != INFINITE_WAIT) { |
| maxReconnectValue = startupMaxReconnectAttempts; |
| } |
| return maxReconnectValue; |
| } |
| |
| bool canReconnect() const { |
| return started && 0 != calculateReconnectAttemptLimit(); |
| } |
| |
| /** |
| * This must be called with the reconnect mutex locked. |
| */ |
| void propagateFailureToExceptionListener() { |
| if (this->transportListener != NULL) { |
| |
| Pointer<IOException> ioException; |
| try { |
| ioException = this->connectionFailure.dynamicCast<IOException>(); |
| } |
| AMQ_CATCH_NOTHROW(ClassCastException) |
| |
| if (ioException != NULL) { |
| transportListener->onException(*this->connectionFailure); |
| } else { |
| transportListener->onException(IOException(*this->connectionFailure)); |
| } |
| } |
| |
| reconnectMutex.notifyAll(); |
| } |
| |
| void resetReconnectDelay() { |
| if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { |
| reconnectDelay = initialReconnectDelay; |
| } |
| } |
| |
| bool isClosedOrFailed() const { |
| return closed || connectionFailure != NULL; |
| } |
| |
| bool isConnectionStateValid() const { |
| return connectedTransport != NULL && !doRebalance && !this->backups->isPriorityBackupAvailable(); |
| } |
| |
| void disconnect() { |
| Pointer<Transport> transport; |
| transport.swap(this->connectedTransport); |
| |
| if (transport != NULL) { |
| |
| if (this->disposedListener != NULL) { |
| transport->setTransportListener(this->disposedListener.get()); |
| } |
| |
| // Hand off to the close task so it gets done in a different thread. |
| this->closeTask->add(transport); |
| |
| if (this->connectedTransportURI != NULL) { |
| this->uris->addURI(*this->connectedTransportURI); |
| this->connectedTransportURI.reset(NULL); |
| } |
| } |
| } |
| |
| bool willReconnect() { |
| return firstConnection || 0 != calculateReconnectAttemptLimit(); |
| } |
| }; |
| |
| const int FailoverTransportImpl::DEFAULT_INITIAL_RECONNECT_DELAY = 10; |
| const int FailoverTransportImpl::INFINITE_WAIT = -1; |
| |
| }}} |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| FailoverTransport::FailoverTransport() : stateTracker(), impl(NULL) { |
| this->impl = new FailoverTransportImpl(this); |
| this->stateTracker.setTrackTransactions(true); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| FailoverTransport::~FailoverTransport() { |
| try { |
| close(); |
| } |
| AMQ_CATCHALL_NOTHROW() |
| |
| try { |
| delete this->impl; |
| } |
| AMQ_CATCHALL_NOTHROW() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::add(bool rebalance, const std::string& uri) { |
| |
| try { |
| if (this->impl->uris->addURI(URI(uri))) { |
| reconnect(rebalance); |
| } |
| } |
| AMQ_CATCHALL_NOTHROW() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::addURI(bool rebalance, const List<URI>& uris) { |
| |
| bool newUri = false; |
| |
| std::auto_ptr<Iterator<URI> > iter(uris.iterator()); |
| while (iter->hasNext()) { |
| if (this->impl->uris->addURI(iter->next())) { |
| newUri = true; |
| } |
| } |
| |
| if (newUri) { |
| reconnect(rebalance); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::removeURI(bool rebalance, const List<URI>& uris) { |
| |
| bool changed = false; |
| |
| std::auto_ptr<Iterator<URI> > iter(uris.iterator()); |
| synchronized( &this->impl->reconnectMutex ) { |
| while (iter->hasNext()) { |
| if (this->impl->uris->removeURI(iter->next())) { |
| changed = true; |
| } |
| } |
| } |
| |
| if (changed) { |
| reconnect(rebalance); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::reconnect(const decaf::net::URI& uri) { |
| |
| try { |
| if (this->impl->uris->addURI(uri)) { |
| reconnect(true); |
| } |
| } |
| AMQ_CATCH_RETHROW(IOException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) |
| AMQ_CATCHALL_THROW(IOException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setTransportListener(TransportListener* listener) { |
| synchronized( &this->impl->listenerMutex ) { |
| this->impl->transportListener = listener; |
| this->impl->listenerMutex.notifyAll(); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| TransportListener* FailoverTransport::getTransportListener() const { |
| synchronized( &this->impl->listenerMutex ) { |
| return this->impl->transportListener; |
| } |
| |
| return NULL; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| std::string FailoverTransport::getRemoteAddress() const { |
| synchronized( &this->impl->reconnectMutex ) { |
| if (this->impl->connectedTransport != NULL) { |
| return this->impl->connectedTransport->getRemoteAddress(); |
| } |
| } |
| return ""; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::oneway(const Pointer<Command> command) { |
| |
| Pointer<Exception> error; |
| |
| try { |
| |
| synchronized(&this->impl->reconnectMutex) { |
| |
| if (command != NULL && this->impl->connectedTransport == NULL) { |
| |
| if (command->isShutdownInfo()) { |
| // Skipping send of ShutdownInfo command when not connected. |
| return; |
| } |
| |
| if (command->isRemoveInfo() || command->isMessageAck()) { |
| // Simulate response to RemoveInfo command or Ack as they will be stale. |
| stateTracker.track(command); |
| |
| if (command->isResponseRequired()) { |
| Pointer<Response> response(new Response()); |
| response->setCorrelationId(command->getCommandId()); |
| this->impl->myTransportListener->onCommand(response); |
| } |
| |
| return; |
| } else if (command->isMessagePull()) { |
| // Simulate response to MessagePull if timed as we can't honor that now. |
| Pointer<MessagePull> pullRequest = command.dynamicCast<MessagePull>(); |
| if (pullRequest->getTimeout() != 0) { |
| Pointer<MessageDispatch> dispatch(new MessageDispatch()); |
| dispatch->setConsumerId(pullRequest->getConsumerId()); |
| dispatch->setDestination(pullRequest->getDestination()); |
| this->impl->myTransportListener->onCommand(dispatch); |
| } |
| |
| return; |
| } |
| } |
| |
| // Keep trying until the message is sent. |
| for (int i = 0; !this->impl->closed; i++) { |
| try { |
| |
| // Wait for transport to be connected. |
| Pointer<Transport> transport = this->impl->connectedTransport; |
| long long start = System::currentTimeMillis(); |
| bool timedout = false; |
| |
| while (transport == NULL && !this->impl->closed && |
| this->impl->connectionFailure == NULL && this->impl->willReconnect()) { |
| |
| long long end = System::currentTimeMillis(); |
| if (command->isMessage() && this->impl->timeout > 0 && (end - start > this->impl->timeout)) { |
| timedout = true; |
| break; |
| } |
| |
| this->impl->reconnectMutex.wait(100); |
| transport = this->impl->connectedTransport; |
| } |
| |
| if (transport == NULL) { |
| // Previous loop may have exited due to us being disposed. |
| if (this->impl->closed) { |
| error.reset(new IOException(__FILE__, __LINE__, "Transport disposed.")); |
| } else if (this->impl->connectionFailure != NULL) { |
| error = this->impl->connectionFailure; |
| } else if (timedout == true) { |
| error.reset(new IOException(__FILE__, __LINE__, |
| "Failover timeout of %d ms reached.", this->impl->timeout)); |
| } else if (!this->impl->willReconnect()) { |
| error.reset(new IOException(__FILE__, __LINE__, |
| "Maximum reconnection attempts exceeded")); |
| } else { |
| error.reset(new IOException(__FILE__, __LINE__, "Unexpected failure.")); |
| } |
| |
| break; |
| } |
| |
| // If it was a request and it was not being tracked by the state |
| // tracker, then hold it in the requestMap so that we can replay |
| // it later. |
| Pointer<Tracked> tracked; |
| try { |
| tracked = stateTracker.track(command); |
| synchronized(&this->impl->requestMap) { |
| if (tracked != NULL && tracked->isWaitingForResponse()) { |
| this->impl->requestMap.put(command->getCommandId(), tracked); |
| } else if (tracked == NULL && command->isResponseRequired()) { |
| this->impl->requestMap.put(command->getCommandId(), command); |
| } |
| } |
| } catch (Exception& ex) { |
| ex.setMark(__FILE__, __LINE__); |
| error.reset(ex.clone()); |
| break; |
| } |
| |
| // Send the message. |
| try { |
| transport->oneway(command); |
| stateTracker.trackBack(command); |
| if (command->isShutdownInfo()) { |
| this->impl->shutdown = true; |
| } |
| } catch (IOException& e) { |
| |
| e.setMark(__FILE__, __LINE__); |
| |
| // If the command was not tracked.. we will retry in this method |
| if (tracked == NULL && this->impl->canReconnect()) { |
| |
| // since we will retry in this method.. take it out of the |
| // request map so that it is not sent 2 times on recovery |
| if (command->isResponseRequired()) { |
| this->impl->requestMap.remove(command->getCommandId()); |
| } |
| |
| // re-throw the exception so it will handled by the outer catch |
| throw; |
| } else { |
| // Trigger the reconnect since we can't count on inactivity or |
| // other socket events to trip the failover condition. |
| handleTransportFailure(e); |
| } |
| } |
| |
| return; |
| } catch (IOException& e) { |
| e.setMark(__FILE__, __LINE__); |
| handleTransportFailure(e); |
| } |
| } |
| } |
| } catch (InterruptedException& ex) { |
| Thread::currentThread()->interrupt(); |
| throw InterruptedIOException(__FILE__, __LINE__, "FailoverTransport oneway() interrupted"); |
| } |
| AMQ_CATCHALL_NOTHROW() |
| |
| if (!this->impl->closed) { |
| if (error != NULL) { |
| throw IOException(*error); |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<FutureResponse> FailoverTransport::asyncRequest(const Pointer<Command> command AMQCPP_UNUSED, |
| const Pointer<ResponseCallback> responseCallback AMQCPP_UNUSED) { |
| throw decaf::lang::exceptions::UnsupportedOperationException(__FILE__, __LINE__, "FailoverTransport::asyncRequest - Not Supported"); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<Response> FailoverTransport::request(const Pointer<Command> command AMQCPP_UNUSED) { |
| throw decaf::lang::exceptions::UnsupportedOperationException(__FILE__, __LINE__, "FailoverTransport::request - Not Supported"); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<Response> FailoverTransport::request(const Pointer<Command> command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED) { |
| throw decaf::lang::exceptions::UnsupportedOperationException(__FILE__, __LINE__, "FailoverTransport::request - Not Supported"); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::start() { |
| |
| try { |
| |
| synchronized(&this->impl->reconnectMutex) { |
| |
| if (this->impl->started) { |
| return; |
| } |
| |
| this->impl->started = true; |
| |
| if (this->impl->backupsEnabled || this->impl->priorityBackup) { |
| this->impl->backups->setEnabled(true); |
| } |
| this->impl->taskRunner->start(); |
| |
| stateTracker.setMaxMessageCacheSize(this->getMaxCacheSize()); |
| stateTracker.setMaxMessagePullCacheSize(this->getMaxPullCacheSize()); |
| stateTracker.setTrackMessages(this->isTrackMessages()); |
| stateTracker.setTrackTransactionProducers(this->isTrackTransactionProducers()); |
| |
| if (this->impl->connectedTransport != NULL) { |
| stateTracker.restore(this->impl->connectedTransport); |
| } else { |
| reconnect(false); |
| } |
| } |
| } |
| AMQ_CATCH_RETHROW(IOException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) |
| AMQ_CATCHALL_THROW(IOException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::stop() { |
| |
| try { |
| synchronized(&this->impl->reconnectMutex) { |
| this->impl->started = false; |
| this->impl->backups->setEnabled(false); |
| } |
| } |
| AMQ_CATCH_RETHROW(IOException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) |
| AMQ_CATCHALL_THROW(IOException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::close() { |
| |
| try { |
| |
| Pointer<Transport> transportToStop; |
| |
| synchronized(&this->impl->reconnectMutex) { |
| |
| if (this->impl->closed) { |
| return; |
| } |
| |
| this->impl->started = false; |
| this->impl->closed = true; |
| this->impl->connected = false; |
| |
| this->impl->backups->setEnabled(false); |
| this->impl->requestMap.clear(); |
| |
| if (this->impl->connectedTransport != NULL) { |
| transportToStop.swap(this->impl->connectedTransport); |
| } |
| |
| this->impl->reconnectMutex.notifyAll(); |
| } |
| |
| this->impl->backups->close(); |
| |
| synchronized( &this->impl->sleepMutex ) { |
| this->impl->sleepMutex.notifyAll(); |
| } |
| |
| this->impl->taskRunner->shutdown(TimeUnit::MINUTES.toMillis(5)); |
| |
| if (transportToStop != NULL) { |
| transportToStop->close(); |
| } |
| } |
| AMQ_CATCH_RETHROW(IOException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) |
| AMQ_CATCHALL_THROW(IOException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::reconnect(bool rebalance) { |
| |
| Pointer<Transport> transport; |
| |
| synchronized( &this->impl->reconnectMutex ) { |
| if (this->impl->started) { |
| |
| if (rebalance) { |
| this->impl->doRebalance = true; |
| } |
| |
| try { |
| this->impl->taskRunner->wakeup(); |
| } catch (InterruptedException& ex) { |
| Thread::currentThread()->interrupt(); |
| } |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::restoreTransport(const Pointer<Transport> transport) { |
| |
| try { |
| |
| transport->start(); |
| |
| //send information to the broker - informing it we are an ft client |
| Pointer<ConnectionControl> cc(new ConnectionControl()); |
| cc->setFaultTolerant(true); |
| transport->oneway(cc); |
| |
| stateTracker.restore(transport); |
| |
| decaf::util::StlMap<int, Pointer<Command> > commands; |
| synchronized(&this->impl->requestMap) { |
| commands.copy(this->impl->requestMap); |
| } |
| |
| Pointer<Iterator<Pointer<Command> > > iter(commands.values().iterator()); |
| while (iter->hasNext()) { |
| transport->oneway(iter->next()); |
| } |
| } |
| AMQ_CATCH_RETHROW(IOException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) |
| AMQ_CATCHALL_THROW(IOException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::handleTransportFailure(const decaf::lang::Exception& error) { |
| |
| if (this->impl->shutdown) { |
| // shutdown info sent and remote socket closed and we see that before a local close |
| // let the close do the work |
| return; |
| } |
| |
| synchronized(&this->impl->reconnectMutex) { |
| |
| if (this->impl->shutdown) { |
| return; |
| } |
| |
| Pointer<Transport> transport; |
| this->impl->connectedTransport.swap(transport); |
| |
| if (transport != NULL) { |
| |
| if (this->impl->disposedListener != NULL) { |
| transport->setTransportListener(this->impl->disposedListener.get()); |
| } |
| |
| // Hand off to the close task so it gets done in a different thread. |
| this->impl->closeTask->add(transport); |
| |
| bool reconnectOk = this->impl->canReconnect(); |
| URI failedUri = *this->impl->connectedTransportURI; |
| |
| this->impl->initialized = false; |
| this->impl->uris->addURI(failedUri); |
| this->impl->connectedTransportURI.reset(NULL); |
| this->impl->connected = false; |
| this->impl->connectedToPrioirty = false; |
| |
| // Place the State Tracker into a reconnection state. |
| this->stateTracker.transportInterrupted(); |
| |
| // Notify before we attempt to reconnect so that the consumers have a chance |
| // to cleanup their state. |
| if (reconnectOk) { |
| if (this->impl->transportListener != NULL) { |
| this->impl->transportListener->transportInterrupted(); |
| } |
| |
| this->impl->updated->removeURI(failedUri); |
| this->impl->taskRunner->wakeup(); |
| } else if (!this->impl->closed) { |
| this->impl->connectionFailure.reset(error.clone()); |
| this->impl->propagateFailureToExceptionListener(); |
| } |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::handleConnectionControl(const Pointer<Command> control) { |
| |
| try { |
| |
| Pointer<ConnectionControl> ctrlCommand = control.dynamicCast<ConnectionControl>(); |
| |
| std::string reconnectStr = ctrlCommand->getReconnectTo(); |
| if (!reconnectStr.empty()) { |
| |
| std::remove(reconnectStr.begin(), reconnectStr.end(), ' '); |
| |
| if (reconnectStr.length() > 0) { |
| try { |
| if (isReconnectSupported()) { |
| reconnect(URI(reconnectStr)); |
| } |
| } catch (Exception& e) { |
| } |
| } |
| } |
| |
| processNewTransports(ctrlCommand->isRebalanceConnection(), ctrlCommand->getConnectedBrokers()); |
| } |
| AMQ_CATCH_RETHROW(Exception) |
| AMQ_CATCHALL_THROW(Exception) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::processNewTransports(bool rebalance, std::string newTransports) { |
| |
| if (!newTransports.empty()) { |
| |
| std::remove(newTransports.begin(), newTransports.end(), ' '); |
| |
| if (newTransports.length() > 0 && isUpdateURIsSupported()) { |
| |
| LinkedList<URI> list; |
| StringTokenizer tokenizer(newTransports, ","); |
| |
| while (tokenizer.hasMoreTokens()) { |
| std::string str = tokenizer.nextToken(); |
| try { |
| URI uri(str); |
| list.add(uri); |
| } catch (Exception& e) { |
| } |
| } |
| |
| if (!list.isEmpty()) { |
| try { |
| updateURIs(rebalance, list); |
| } catch (IOException& e) { |
| } |
| } |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::updateURIs(bool rebalance, const decaf::util::List<decaf::net::URI>& updatedURIs) { |
| |
| if (isUpdateURIsSupported()) { |
| |
| Pointer<URIPool> copy(new URIPool(*this->impl->updated)); |
| this->impl->updated->clear(); |
| |
| if (!updatedURIs.isEmpty()) { |
| |
| StlSet<URI> set; |
| |
| for (int i = 0; i < updatedURIs.size(); i++) { |
| set.add(updatedURIs.get(i)); |
| } |
| |
| Pointer<Iterator<URI> > setIter(set.iterator()); |
| while (setIter->hasNext()) { |
| URI value = setIter->next(); |
| this->impl->updated->addURI(value); |
| } |
| |
| if (!(copy->isEmpty() && this->impl->updated->isEmpty()) && |
| !(copy->equals(*this->impl->updated))) { |
| |
| synchronized(&this->impl->reconnectMutex) { |
| reconnect(rebalance); |
| } |
| } |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isPending() const { |
| bool result = false; |
| |
| synchronized(&this->impl->reconnectMutex) { |
| if (!this->impl->isConnectionStateValid() && this->impl->started && !this->impl->isClosedOrFailed()) { |
| |
| int maxReconnectAttempts = this->impl->calculateReconnectAttemptLimit(); |
| |
| if (maxReconnectAttempts != -1 && this->impl->connectFailures >= maxReconnectAttempts) { |
| result = false; |
| } else { |
| result = true; |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::iterate() { |
| |
| Pointer<Exception> failure; |
| |
| synchronized( &this->impl->reconnectMutex ) { |
| |
| if (this->impl->isClosedOrFailed()) { |
| this->impl->reconnectMutex.notifyAll(); |
| } |
| |
| if (this->impl->isConnectionStateValid() || this->impl->isClosedOrFailed()) { |
| return false; |
| } else { |
| |
| Pointer<URIPool> connectList = this->impl->getConnectList(); |
| |
| if (connectList->isEmpty()) { |
| failure.reset(new IOException(__FILE__, __LINE__, "No URIs available for reconnect.")); |
| } else { |
| |
| if (this->impl->doRebalance) { |
| if (this->impl->connectedToPrioirty || connectList->getPriorityURI().equals(*this->impl->connectedTransportURI)) { |
| // already connected to first in the list, no need to rebalance |
| this->impl->doRebalance = false; |
| return false; |
| } else { |
| // break any existing connect for rebalance. |
| this->impl->disconnect(); |
| } |
| |
| this->impl->doRebalance = false; |
| } |
| |
| this->impl->resetReconnectDelay(); |
| |
| LinkedList<URI> failures; |
| Pointer<Transport> transport; |
| URI uri; |
| |
| if (this->impl->backups->isEnabled()) { |
| Pointer<BackupTransport> backupTransport = this->impl->backups->getBackup(); |
| if (backupTransport != NULL) { |
| transport = backupTransport->getTransport(); |
| uri = backupTransport->getUri(); |
| if (this->impl->priorityBackup && this->impl->backups->isPriorityBackupAvailable()) { |
| // A priority connection is available and we aren't connected to |
| // any other priority transports so disconnect and use the backup. |
| this->impl->disconnect(); |
| } |
| } |
| } |
| |
| // Sleep for the reconnectDelay if there's no backup and we aren't trying |
| // for the first time, or we were disposed for some reason. |
| if (transport == NULL && !this->impl->firstConnection && |
| (this->impl->reconnectDelay > 0) && !this->impl->closed) { |
| synchronized (&this->impl->sleepMutex) { |
| try { |
| this->impl->sleepMutex.wait(this->impl->reconnectDelay); |
| } catch (InterruptedException& e) { |
| Thread::currentThread()->interrupt(); |
| } |
| } |
| } |
| |
| while (transport == NULL && this->impl->connectedTransport == NULL && !this->impl->closed) { |
| try { |
| // We could be starting the loop with a backup already. |
| if (transport == NULL) { |
| try { |
| uri = connectList->getURI(); |
| } catch (NoSuchElementException& ex) { |
| break; |
| } |
| |
| transport = createTransport(uri); |
| } |
| |
| transport->setTransportListener(this->impl->myTransportListener.get()); |
| transport->start(); |
| |
| if (this->impl->started && !this->impl->firstConnection) { |
| restoreTransport(transport); |
| } |
| |
| this->impl->reconnectDelay = this->impl->initialReconnectDelay; |
| this->impl->connectedTransportURI.reset(new URI(uri)); |
| this->impl->connectedTransport = transport; |
| this->impl->reconnectMutex.notifyAll(); |
| this->impl->connectFailures = 0; |
| |
| if (isPriorityBackup()) { |
| this->impl->connectedToPrioirty = connectList->getPriorityURI().equals(uri) || |
| this->impl->priorityUris->contains(uri); |
| } else { |
| this->impl->connectedToPrioirty = false; |
| } |
| |
| // Make sure on initial startup, that the transportListener |
| // has been initialized for this instance. |
| synchronized(&this->impl->listenerMutex) { |
| if (this->impl->transportListener == NULL) { |
| // if it isn't set after 2secs - it probably never will be |
| this->impl->listenerMutex.wait(2000); |
| } |
| } |
| |
| if (this->impl->transportListener != NULL) { |
| this->impl->transportListener->transportResumed(); |
| } |
| |
| if (this->impl->firstConnection) { |
| this->impl->firstConnection = false; |
| } |
| |
| // Return the failures to the pool, we will try again on the next iteration. |
| connectList->addURIs(failures); |
| |
| this->impl->connected = true; |
| return false; |
| |
| } catch (Exception& e) { |
| e.setMark(__FILE__, __LINE__); |
| |
| if (transport != NULL) { |
| if (this->impl->disposedListener != NULL) { |
| transport->setTransportListener(this->impl->disposedListener.get()); |
| } |
| |
| try { |
| transport->stop(); |
| } catch (...) { |
| } |
| |
| // Hand off to the close task so it gets done in a different thread |
| // this prevents a deadlock from occurring if the Transport happens |
| // to call back through our onException method or locks in some other |
| // way. |
| this->impl->closeTask->add(transport); |
| this->impl->taskRunner->wakeup(); |
| transport.reset(NULL); |
| } |
| |
| failures.add(uri); |
| failure.reset(e.clone()); |
| } |
| } |
| |
| // Return the failures to the pool, we will try again on the next iteration. |
| connectList->addURIs(failures); |
| } |
| } |
| |
| int reconnectAttempts = this->impl->calculateReconnectAttemptLimit(); |
| |
| if (reconnectAttempts >= 0 && ++this->impl->connectFailures >= reconnectAttempts) { |
| this->impl->connectionFailure = failure; |
| |
| // Make sure on initial startup, that the transportListener has been initialized |
| // for this instance. |
| synchronized(&this->impl->listenerMutex) { |
| if (this->impl->transportListener == NULL) { |
| this->impl->listenerMutex.wait(2000); |
| } |
| } |
| |
| this->impl->propagateFailureToExceptionListener(); |
| return false; |
| } |
| } |
| |
| if (!this->impl->closed) { |
| this->impl->doDelay(); |
| } |
| |
| return !this->impl->closed; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<Transport> FailoverTransport::createTransport(const URI& location) const { |
| |
| try { |
| |
| TransportFactory* factory = TransportRegistry::getInstance().findFactory(location.getScheme()); |
| |
| if (factory == NULL) { |
| throw new IOException(__FILE__, __LINE__, "Invalid URI specified, no valid Factory Found."); |
| } |
| |
| Pointer<Transport> transport(factory->createComposite(location)); |
| |
| return transport; |
| } |
| AMQ_CATCH_RETHROW(IOException) |
| AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException) |
| AMQ_CATCHALL_THROW(IOException) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setConnectionInterruptProcessingComplete(const Pointer<commands::ConnectionId> connectionId) { |
| |
| synchronized(&this->impl->reconnectMutex) { |
| stateTracker.connectionInterruptProcessingComplete(this, connectionId); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isConnected() const { |
| return this->impl->connected; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isClosed() const { |
| return this->impl->closed; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isInitialized() const { |
| return this->impl->initialized; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setInitialized(bool value) { |
| this->impl->initialized = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Transport* FailoverTransport::narrow(const std::type_info& typeId) { |
| |
| if (typeid( *this ) == typeId) { |
| return this; |
| } |
| |
| if (this->impl->connectedTransport != NULL) { |
| return this->impl->connectedTransport->narrow(typeId); |
| } |
| |
| return NULL; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::processResponse(const Pointer<Response> response) { |
| |
| Pointer<Command> object; |
| |
| synchronized(&(this->impl->requestMap)) { |
| try { |
| object = this->impl->requestMap.remove(response->getCorrelationId()); |
| } catch (NoSuchElementException& ex) { |
| // Not tracking this request in our map, not an error. |
| } |
| } |
| |
| if (object != NULL) { |
| try { |
| Pointer<Tracked> tracked = object.dynamicCast<Tracked>(); |
| tracked->onResponse(); |
| } |
| AMQ_CATCH_NOTHROW(ClassCastException) |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| Pointer<wireformat::WireFormat> FailoverTransport::getWireFormat() const { |
| |
| Pointer<wireformat::WireFormat> result; |
| Pointer<Transport> transport = this->impl->connectedTransport; |
| |
| if (transport != NULL) { |
| result = transport->getWireFormat(); |
| } |
| |
| return result; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long FailoverTransport::getTimeout() const { |
| return this->impl->timeout; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setTimeout(long long value) { |
| this->impl->timeout = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long FailoverTransport::getInitialReconnectDelay() const { |
| return this->impl->initialReconnectDelay; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setInitialReconnectDelay(long long value) { |
| this->impl->initialReconnectDelay = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long FailoverTransport::getMaxReconnectDelay() const { |
| return this->impl->maxReconnectDelay; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setMaxReconnectDelay(long long value) { |
| this->impl->maxReconnectDelay = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long FailoverTransport::getBackOffMultiplier() const { |
| return this->impl->backOffMultiplier; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setBackOffMultiplier(long long value) { |
| this->impl->backOffMultiplier = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isUseExponentialBackOff() const { |
| return this->impl->useExponentialBackOff; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setUseExponentialBackOff(bool value) { |
| this->impl->useExponentialBackOff = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isRandomize() const { |
| return this->impl->uris->isRandomize(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setRandomize(bool value) { |
| this->impl->uris->setRandomize(value); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int FailoverTransport::getMaxReconnectAttempts() const { |
| return this->impl->maxReconnectAttempts; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setMaxReconnectAttempts(int value) { |
| this->impl->maxReconnectAttempts = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int FailoverTransport::getStartupMaxReconnectAttempts() const { |
| return this->impl->startupMaxReconnectAttempts; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setStartupMaxReconnectAttempts(int value) { |
| this->impl->startupMaxReconnectAttempts = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long FailoverTransport::getReconnectDelay() const { |
| return this->impl->reconnectDelay; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setReconnectDelay(long long value) { |
| this->impl->reconnectDelay = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isBackup() const { |
| return this->impl->backupsEnabled; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setBackup(bool value) { |
| this->impl->backupsEnabled = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int FailoverTransport::getBackupPoolSize() const { |
| return this->impl->backups->getBackupPoolSize(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setBackupPoolSize(int value) { |
| this->impl->backups->setBackupPoolSize(value); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isTrackMessages() const { |
| return this->impl->trackMessages; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setTrackMessages(bool value) { |
| this->impl->trackMessages = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isTrackTransactionProducers() const { |
| return this->impl->trackTransactionProducers; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setTrackTransactionProducers(bool value) { |
| this->impl->trackTransactionProducers = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int FailoverTransport::getMaxCacheSize() const { |
| return this->impl->maxCacheSize; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setMaxCacheSize(int value) { |
| this->impl->maxCacheSize = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int FailoverTransport::getMaxPullCacheSize() const { |
| return this->impl->maxPullCacheSize; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setMaxPullCacheSize(int value) { |
| this->impl->maxPullCacheSize = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isReconnectSupported() const { |
| return this->impl->reconnectSupported; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setReconnectSupported(bool value) { |
| this->impl->reconnectSupported = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isUpdateURIsSupported() const { |
| return this->impl->updateURIsSupported; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setUpdateURIsSupported(bool value) { |
| this->impl->updateURIsSupported = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isRebalanceUpdateURIs() const { |
| return this->impl->rebalanceUpdateURIs; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setRebalanceUpdateURIs(bool rebalanceUpdateURIs) { |
| this->impl->rebalanceUpdateURIs = rebalanceUpdateURIs; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isPriorityBackup() const { |
| return this->impl->priorityBackup; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setPriorityBackup(bool priorityBackup) { |
| this->impl->priorityBackup = priorityBackup; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool FailoverTransport::isConnectedToPriority() const { |
| return this->impl->connectedToPrioirty; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void FailoverTransport::setPriorityURIs(const std::string& priorityURIs AMQCPP_UNUSED) { |
| StringTokenizer tokenizer(priorityURIs, ","); |
| while (tokenizer.hasMoreTokens()) { |
| std::string str = tokenizer.nextToken(); |
| try { |
| this->impl->priorityUris->addURI(URI(str)); |
| } catch (Exception& e) { |
| } |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| const List<URI>& FailoverTransport::getPriorityURIs() const { |
| return this->impl->priorityUris->getURIList(); |
| } |