https://issues.apache.org/jira/browse/AMQCPP-609
Watch for shutdown and don't reconnect if the remote drops the
connection before the close on the transport is registered.
(cherry picked from commit f93fa9ea187693c26614cee17e7a7855cea3e7c0)
diff --git a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
index 3e1059a..b5ea33b 100644
--- a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
+++ b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
@@ -94,6 +94,7 @@
bool rebalanceUpdateURIs;
bool priorityBackup;
bool backupsEnabled;
+ volatile bool shutdown;
bool doRebalance;
bool connectedToPrioirty;
@@ -143,6 +144,7 @@
rebalanceUpdateURIs(true),
priorityBackup(false),
backupsEnabled(false),
+ shutdown(false),
doRebalance(false),
connectedToPrioirty(false),
reconnectMutex(),
@@ -226,7 +228,7 @@
try {
ioException = this->connectionFailure.dynamicCast<IOException>();
}
- AMQ_CATCH_NOTHROW( ClassCastException)
+ AMQ_CATCH_NOTHROW(ClassCastException)
if (ioException != NULL) {
transportListener->onException(*this->connectionFailure);
@@ -272,6 +274,9 @@
}
}
+ bool willReconnect() {
+ return firstConnection || 0 != calculateReconnectAttemptLimit();
+ }
};
const int FailoverTransportImpl::DEFAULT_INITIAL_RECONNECT_DELAY = 10;
@@ -435,7 +440,9 @@
long long start = System::currentTimeMillis();
bool timedout = false;
- while (transport == NULL && !this->impl->closed && this->impl->connectionFailure == NULL) {
+ while (transport == NULL && !this->impl->closed &&
+ this->impl->connectionFailure == NULL && this->impl->willReconnect()) {
+
long long end = System::currentTimeMillis();
if (command->isMessage() && this->impl->timeout > 0 && (end - start > this->impl->timeout)) {
timedout = true;
@@ -455,6 +462,9 @@
} else if (timedout == true) {
error.reset(new IOException(__FILE__, __LINE__,
"Failover timeout of %d ms reached.", this->impl->timeout));
+ } else if (!this->impl->willReconnect()) {
+ error.reset(new IOException(__FILE__, __LINE__,
+ "Maximum reconnection attempts exceeded"));
} else {
error.reset(new IOException(__FILE__, __LINE__, "Unexpected failure."));
}
@@ -468,7 +478,7 @@
Pointer<Tracked> tracked;
try {
tracked = stateTracker.track(command);
- synchronized( &this->impl->requestMap ) {
+ synchronized(&this->impl->requestMap) {
if (tracked != NULL && tracked->isWaitingForResponse()) {
this->impl->requestMap.put(command->getCommandId(), tracked);
} else if (tracked == NULL && command->isResponseRequired()) {
@@ -485,13 +495,15 @@
try {
transport->oneway(command);
stateTracker.trackBack(command);
+ if (command->isShutdownInfo()) {
+ this->impl->shutdown = true;
+ }
} catch (IOException& e) {
e.setMark(__FILE__, __LINE__);
- // If the command was not tracked.. we will retry in
- // this method
- if (tracked == NULL) {
+ // If the command was not tracked.. we will retry in this method
+ if (tracked == NULL && this->impl->canReconnect()) {
// since we will retry in this method.. take it out of the
// request map so that it is not sent 2 times on recovery
@@ -690,8 +702,18 @@
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::handleTransportFailure(const decaf::lang::Exception& error) {
+ if (this->impl->shutdown) {
+ // shutdown info sent and remote socket closed and we see that before a local close
+ // let the close do the work
+ return;
+ }
+
synchronized(&this->impl->reconnectMutex) {
+ if (this->impl->shutdown) {
+ return;
+ }
+
Pointer<Transport> transport;
this->impl->connectedTransport.swap(transport);