https://issues.apache.org/jira/browse/AMQCPP-601
Fixes for priority backup handling to prevent stall when reconnecting
tries to use a priority instance.
(cherry picked from commit a8ff8b54e0d75fb751f5b76682c715a78ee8150d)
# Conflicts:
# activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
# activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
# activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
# activemq-cpp/src/test/testRegistry.cpp
diff --git a/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp b/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
index e80ff51..00993d7 100644
--- a/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
+++ b/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
@@ -53,14 +53,37 @@
public:
+ BackupTransportPool* pool;
+ FailoverTransport* parent;
LinkedList< Pointer<BackupTransport> > backups;
volatile bool pending;
volatile bool closed;
volatile int priorityBackups;
- BackupTransportPoolImpl() : backups(), pending(false), closed(false), priorityBackups(0) {
+ BackupTransportPoolImpl(BackupTransportPool* pool, FailoverTransport* parent) : pool(pool),
+ parent(parent),
+ backups(),
+ pending(false),
+ closed(false),
+ priorityBackups(0) {
}
+ bool shouldBuildBackup() {
+ bool result = false;
+
+ if (pool->isEnabled()) {
+
+ // If there's no priority backup and the failover transport isn't connected to
+ // a priority backup then we should keep trying to connect to one.
+ if (parent->isPriorityBackup() && !parent->isConnectedToPriority() && priorityBackups == 0) {
+ result = true;
+ } else if (backups.size() < pool->getBackupPoolSize()) {
+ result = true;
+ }
+ }
+
+ return result;
+ }
};
}}}
@@ -101,7 +124,7 @@
throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
}
- this->impl = new BackupTransportPoolImpl();
+ this->impl = new BackupTransportPoolImpl(this, parent);
// Add this instance as a Task so that we can create backups when nothing else is
// going on.
@@ -145,7 +168,7 @@
throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
}
- this->impl = new BackupTransportPoolImpl();
+ this->impl = new BackupTransportPoolImpl(this, parent);
// Add this instance as a Task so that we can create backups when nothing else is
// going on.
@@ -242,7 +265,7 @@
bool wakeupParent = false;
- while (isEnabled() && (int) this->impl->backups.size() < backupPoolSize) {
+ while (impl->shouldBuildBackup()) {
URI connectTo;
@@ -259,14 +282,6 @@
Pointer<BackupTransport> backup(new BackupTransport(this));
backup->setUri(connectTo);
- if (priorityUriPool->contains(connectTo)) {
- backup->setPriority(true);
-
- if (!parent->isConnectedToPriority()) {
- wakeupParent = true;
- }
- }
-
try {
Pointer<Transport> transport = createTransport(connectTo);
@@ -274,6 +289,14 @@
transport->start();
backup->setTransport(transport);
+ if (priorityUriPool->contains(connectTo) || (priorityUriPool->isEmpty() && uriPool->isPriority(connectTo))) {
+ backup->setPriority(true);
+
+ if (!parent->isConnectedToPriority()) {
+ wakeupParent = true;
+ }
+ }
+
// Put any priority connections first so a reconnect picks them
// up automatically.
if (backup->isPriority()) {
@@ -282,6 +305,7 @@
} else {
this->impl->backups.addLast(backup);
}
+
} catch (...) {
// Store it in the list of URIs that didn't work, once done we
// return those to the pool.
@@ -289,15 +313,16 @@
}
// We connected to a priority backup and the parent isn't already using one
- // so wake it up and quick the backups process for now.
+ // so wake it up and quit the backups process for now.
if (wakeupParent) {
- this->parent->reconnect(true);
+ this->parent->reconnect(false);
break;
}
}
// return all failures to the URI Pool, we can try again later.
uriPool->addURIs(failures);
+
this->impl->pending = false;
}
diff --git a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
index b5ea33b..09777ec 100644
--- a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
+++ b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
@@ -69,9 +69,9 @@
public:
- bool closed;
- bool connected;
- bool started;
+ volatile bool closed;
+ volatile bool connected;
+ volatile bool started;
long long timeout;
long long initialReconnectDelay;
@@ -251,7 +251,7 @@
}
bool isConnectionStateValid() const {
- return connectedTransport != NULL && !doRebalance && !this->backups->isPriorityBackupAvailable();
+ return connectedTransport != NULL && !doRebalance && !backups->isPriorityBackupAvailable();
}
void disconnect() {
@@ -271,6 +271,10 @@
this->uris->addURI(*this->connectedTransportURI);
this->connectedTransportURI.reset(NULL);
}
+
+ if (transportListener != NULL) {
+ transportListener->transportInterrupted();
+ }
}
}
@@ -851,12 +855,14 @@
bool FailoverTransport::isPending() const {
bool result = false;
- synchronized(&this->impl->reconnectMutex) {
- if (!this->impl->isConnectionStateValid() && this->impl->started && !this->impl->isClosedOrFailed()) {
+ synchronized(&impl->reconnectMutex) {
+ if (!impl->isConnectionStateValid() && impl->started && !impl->isClosedOrFailed()) {
- int maxReconnectAttempts = this->impl->calculateReconnectAttemptLimit();
+ int maxReconnectAttempts = impl->calculateReconnectAttemptLimit();
- if (maxReconnectAttempts != -1 && this->impl->connectFailures >= maxReconnectAttempts) {
+ if (impl->firstConnection && impl->connectFailures == 0) {
+ result = true;
+ } else if (maxReconnectAttempts != -1 && impl->connectFailures > maxReconnectAttempts) {
result = false;
} else {
result = true;
@@ -872,7 +878,7 @@
Pointer<Exception> failure;
- synchronized( &this->impl->reconnectMutex ) {
+ synchronized(&this->impl->reconnectMutex) {
if (this->impl->isClosedOrFailed()) {
this->impl->reconnectMutex.notifyAll();
@@ -884,7 +890,7 @@
Pointer<URIPool> connectList = this->impl->getConnectList();
- if (connectList->isEmpty()) {
+ if (connectList->isEmpty() && !impl->backups->isEnabled()) {
failure.reset(new IOException(__FILE__, __LINE__, "No URIs available for reconnect."));
} else {
@@ -933,7 +939,7 @@
}
}
- while (transport == NULL && this->impl->connectedTransport == NULL && !this->impl->closed) {
+ while ((transport != NULL || !connectList->isEmpty()) && this->impl->connectedTransport == NULL && !this->impl->closed) {
try {
// We could be starting the loop with a backup already.
if (transport == NULL) {
@@ -958,6 +964,7 @@
this->impl->connectedTransport = transport;
this->impl->reconnectMutex.notifyAll();
this->impl->connectFailures = 0;
+ this->impl->connected = true;
if (isPriorityBackup()) {
this->impl->connectedToPrioirty = connectList->getPriorityURI().equals(uri) ||
@@ -1006,6 +1013,7 @@
// this prevents a deadlock from occurring if the Transport happens
// to call back through our onException method or locks in some other
// way.
+ this->impl->connected = false;
this->impl->closeTask->add(transport);
this->impl->taskRunner->wakeup();
transport.reset(NULL);
diff --git a/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp b/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
index 1c7171a..7c8f947 100644
--- a/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
+++ b/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
@@ -161,9 +161,7 @@
////////////////////////////////////////////////////////////////////////////////
bool URIPool::isPriority(const decaf::net::URI& uri) const {
synchronized(&uriPool) {
- if (!uriPool.isEmpty()) {
- return uriPool.getFirst().equals(uri);
- }
+ return priorityURI.equals(uri);
}
return false;
}
diff --git a/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp b/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
index dea1bbc..ceb6634 100644
--- a/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
+++ b/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
@@ -55,8 +55,9 @@
class TcpServer : public lang::Thread {
private:
- bool done;
- bool error;
+ volatile bool done;
+ volatile bool error;
+ const int configuredPort;
Pointer<ServerSocket> server;
Pointer<OpenWireFormat> wireFormat;
Pointer<OpenWireResponseBuilder> responeBuilder;
@@ -65,9 +66,8 @@
public:
- TcpServer() : Thread(), done(false), error(false), server(), wireFormat(),
+ TcpServer() : Thread(), done(false), error(false), configuredPort(0), server(), wireFormat(),
responeBuilder(), started(1), rand() {
- server.reset(new ServerSocket(0));
Properties properties;
@@ -77,9 +77,8 @@
this->rand.setSeed(System::currentTimeMillis());
}
- TcpServer(int port) : Thread(), done(false), error(false), server(), wireFormat(),
+ TcpServer(int port) : Thread(), done(false), error(false), configuredPort(port), server(), wireFormat(),
responeBuilder(), started(1), rand() {
- server.reset(new ServerSocket(port));
Properties properties;
this->wireFormat = OpenWireFormatFactory().createWireFormat(properties).dynamicCast<OpenWireFormat>();
@@ -90,6 +89,7 @@
virtual ~TcpServer() {
stop();
+ waitUntilStopped();
}
int getLocalPort() {
@@ -124,7 +124,15 @@
MockTransport mock(this->wireFormat, this->responeBuilder);
- std::auto_ptr<Socket> socket(server->accept());
+ server.reset(new ServerSocket(configuredPort));
+
+ std::auto_ptr<Socket> socket;
+ try {
+ socket.reset(server->accept());
+ } catch (IOException& ioe) {
+ continue;
+ }
+
socket->setSoLinger(false, 0);
Pointer<WireFormatInfo> preferred = wireFormat->getPreferedWireFormatInfo();
@@ -147,7 +155,6 @@
}
}
}
-
} catch (IOException& ex) {
error = true;
} catch (Exception& ex) {
@@ -228,6 +235,11 @@
}
////////////////////////////////////////////////////////////////////////////////
+int MockBrokerService::getPort() const {
+ return this->impl->server->getLocalPort();
+}
+
+////////////////////////////////////////////////////////////////////////////////
std::string MockBrokerService::getConnectString() const {
int port = this->impl->server->getLocalPort();
return std::string("tcp://localhost:") + Integer::toString(port);
diff --git a/activemq-cpp/src/test/activemq/mock/MockBrokerService.h b/activemq-cpp/src/test/activemq/mock/MockBrokerService.h
index 10dfc8a..92f4b76 100644
--- a/activemq-cpp/src/test/activemq/mock/MockBrokerService.h
+++ b/activemq-cpp/src/test/activemq/mock/MockBrokerService.h
@@ -57,6 +57,8 @@
std::string getConnectString() const;
+ int getPort() const;
+
};
}}
diff --git a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
index bc3f04f..9d51b7e 100644
--- a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
+++ b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
@@ -24,6 +24,8 @@
#include <activemq/commands/ActiveMQMessage.h>
#include <activemq/commands/ConnectionControl.h>
#include <activemq/mock/MockBrokerService.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/Mutex.h>
#include <decaf/lang/Pointer.h>
#include <decaf/lang/Thread.h>
#include <decaf/util/UUID.h>
@@ -38,6 +40,7 @@
using namespace decaf::io;
using namespace decaf::lang;
using namespace decaf::util;
+using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
FailoverTransportTest::FailoverTransportTest() {
@@ -700,3 +703,336 @@
broker1.stop();
broker1.waitUntilStopped();
}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testMaxReconnectsZeroAttemptsOneConnect() {
+
+ std::string uri = "failover://(mock://localhost:61616)?maxReconnectAttempts=0";
+
+ DefaultTransportListener listener;
+ FailoverTransportFactory factory;
+
+ Pointer<Transport> transport(factory.create(uri));
+ CPPUNIT_ASSERT(transport != NULL);
+ transport->setTransportListener(&listener);
+
+ FailoverTransport* failover =
+ dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+ CPPUNIT_ASSERT(failover != NULL);
+
+ transport->start();
+
+ Thread::sleep(1000);
+ CPPUNIT_ASSERT(failover->isConnected() == true);
+
+ transport->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testMaxReconnectsHonorsConfiguration() {
+
+ // max reconnect attempts of two means one connection attempt followed by
+ // two retries.
+
+ std::string uri = "failover://(mock://localhost:61616?failOnCreate=true,"
+ "mock://localhost:61617?failOnCreate=true)"
+ "?randomize=false&maxReconnectAttempts=2";
+
+ Pointer<WireFormatInfo> info(new WireFormatInfo());
+
+ DefaultTransportListener listener;
+ FailoverTransportFactory factory;
+
+ Pointer<Transport> transport(factory.create(uri));
+ CPPUNIT_ASSERT(transport != NULL);
+ transport->setTransportListener(&listener);
+
+ FailoverTransport* failover =
+ dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+ CPPUNIT_ASSERT(failover != NULL);
+ CPPUNIT_ASSERT(failover->isRandomize() == false);
+
+ transport->start();
+
+ CPPUNIT_ASSERT_THROW_MESSAGE("Send should have failed after max connect attempts of two",
+ transport->oneway(info), Exception);
+
+ CPPUNIT_ASSERT(failover->isConnected() == false);
+
+ transport->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testStartupMaxReconnectsHonorsConfiguration() {
+
+ // max reconnect attempts of two means one connection attempt followed by
+ // two retries.
+
+ std::string uri = "failover://(mock://localhost:61616?failOnCreate=true,"
+ "mock://localhost:61617?failOnCreate=true)"
+ "?randomize=false&startupMaxReconnectAttempts=2";
+
+ Pointer<WireFormatInfo> info(new WireFormatInfo());
+
+ DefaultTransportListener listener;
+ FailoverTransportFactory factory;
+
+ Pointer<Transport> transport(factory.create(uri));
+ CPPUNIT_ASSERT(transport != NULL);
+ transport->setTransportListener(&listener);
+
+ FailoverTransport* failover =
+ dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+ CPPUNIT_ASSERT(failover != NULL);
+ CPPUNIT_ASSERT(failover->isRandomize() == false);
+
+ transport->start();
+
+ CPPUNIT_ASSERT_THROW_MESSAGE("Send should have failed after max connect attempts of two",
+ transport->oneway(info), Exception);
+
+ CPPUNIT_ASSERT(failover->isConnected() == false);
+
+ transport->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+ class PriorityBackupListener : public DefaultTransportListener {
+ private:
+
+ Pointer<CountDownLatch> interruptedLatch;
+ Pointer<CountDownLatch> resumedLatch;
+
+ Mutex resetMutex;
+
+ public:
+
+ PriorityBackupListener() : interruptedLatch(new CountDownLatch(1)),
+ resumedLatch(new CountDownLatch(1)),
+ resetMutex() {
+ }
+
+ virtual ~PriorityBackupListener() {}
+
+ virtual void transportInterrupted() {
+ interruptedLatch->countDown();
+ }
+
+ virtual void transportResumed() {
+ resumedLatch->countDown();
+ }
+
+ void reset() {
+ synchronized(&resetMutex) {
+ interruptedLatch.reset(new CountDownLatch(1));
+ resumedLatch.reset(new CountDownLatch(1));
+ }
+ }
+
+ bool awaitInterruption() {
+ synchronized(&resetMutex) {
+ return interruptedLatch->await(60000);
+ }
+
+ return false;
+ }
+
+ bool awaitResumed() {
+ synchronized(&resetMutex) {
+ return resumedLatch->await(60000);
+ }
+
+ return false;
+ }
+ };
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testConnectedToPriorityOnFirstTryThenFailover() {
+
+ Pointer<MockBrokerService> broker1(new MockBrokerService(61626));
+ Pointer<MockBrokerService> broker2(new MockBrokerService(61628));
+
+ broker1->start();
+ broker1->waitUntilStarted();
+
+ broker2->start();
+ broker2->waitUntilStarted();
+
+ std::string uri = "failover://(tcp://localhost:61626,"
+ "tcp://localhost:61628)?randomize=false&priorityBackup=true";
+
+ PriorityBackupListener listener;
+ FailoverTransportFactory factory;
+
+ Pointer<Transport> transport(factory.create(uri));
+ CPPUNIT_ASSERT(transport != NULL);
+ transport->setTransportListener(&listener);
+
+ FailoverTransport* failover =
+ dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+ CPPUNIT_ASSERT(failover != NULL);
+ CPPUNIT_ASSERT(failover->isRandomize() == false);
+ CPPUNIT_ASSERT(failover->isPriorityBackup() == true);
+
+ transport->start();
+
+ CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+ listener.reset();
+
+ CPPUNIT_ASSERT(failover->isConnected() == true);
+ CPPUNIT_ASSERT(failover->isConnectedToPriority() == true);
+
+ broker1->stop();
+ broker1->waitUntilStopped();
+
+ CPPUNIT_ASSERT_MESSAGE("Failed to get interrupted in time", listener.awaitInterruption());
+ CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+ listener.reset();
+
+ CPPUNIT_ASSERT(failover->isConnected() == true);
+ CPPUNIT_ASSERT(failover->isConnectedToPriority() == false);
+
+ transport->close();
+
+ broker1->stop();
+ broker1->waitUntilStopped();
+
+ broker2->stop();
+ broker2->waitUntilStopped();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testConnectsToPriorityOnceStarted() {
+
+ Pointer<MockBrokerService> broker1(new MockBrokerService(61626));
+ Pointer<MockBrokerService> broker2(new MockBrokerService(61628));
+
+ broker2->start();
+ broker2->waitUntilStarted();
+
+ std::string uri = "failover://(tcp://localhost:61626?transport.useInactivityMonitor=false,"
+ "tcp://localhost:61628?transport.useInactivityMonitor=false)?randomize=false&priorityBackup=true";
+
+ PriorityBackupListener listener;
+ FailoverTransportFactory factory;
+
+ Pointer<Transport> transport(factory.create(uri));
+ CPPUNIT_ASSERT(transport != NULL);
+ transport->setTransportListener(&listener);
+
+ FailoverTransport* failover =
+ dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+ CPPUNIT_ASSERT(failover != NULL);
+ CPPUNIT_ASSERT(failover->isRandomize() == false);
+ CPPUNIT_ASSERT(failover->isPriorityBackup() == true);
+
+ transport->start();
+
+ CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+ listener.reset();
+
+ CPPUNIT_ASSERT(failover->isConnected() == true);
+ CPPUNIT_ASSERT(failover->isConnectedToPriority() == false);
+
+ broker1->start();
+ broker1->waitUntilStarted();
+
+ CPPUNIT_ASSERT_MESSAGE("Failed to get interrupted in time", listener.awaitInterruption());
+ CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+ listener.reset();
+
+ CPPUNIT_ASSERT(failover->isConnected() == true);
+ CPPUNIT_ASSERT(failover->isConnectedToPriority() == true);
+
+ transport->close();
+
+ broker1->stop();
+ broker1->waitUntilStopped();
+
+ broker2->stop();
+ broker2->waitUntilStopped();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testConnectsToPriorityAfterInitialBackupFails() {
+
+ Pointer<MockBrokerService> broker1(new MockBrokerService(61626));
+ Pointer<MockBrokerService> broker2(new MockBrokerService(61627));
+ Pointer<MockBrokerService> broker3(new MockBrokerService(61628));
+
+ broker2->start();
+ broker2->waitUntilStarted();
+
+ broker3->start();
+ broker3->waitUntilStarted();
+
+ std::string uri = "failover://(tcp://localhost:61626?transport.useInactivityMonitor=false,"
+ "tcp://localhost:61627?transport.useInactivityMonitor=false,"
+ "tcp://localhost:61628?transport.useInactivityMonitor=false)?randomize=false&priorityBackup=true";
+
+ PriorityBackupListener listener;
+ FailoverTransportFactory factory;
+
+ Pointer<Transport> transport(factory.create(uri));
+ CPPUNIT_ASSERT(transport != NULL);
+ transport->setTransportListener(&listener);
+
+ FailoverTransport* failover =
+ dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+ CPPUNIT_ASSERT(failover != NULL);
+ CPPUNIT_ASSERT(failover->isRandomize() == false);
+ CPPUNIT_ASSERT(failover->isPriorityBackup() == true);
+
+ transport->start();
+
+ CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+ listener.reset();
+
+ CPPUNIT_ASSERT(failover->isConnected() == true);
+ CPPUNIT_ASSERT(failover->isConnectedToPriority() == false);
+
+ Thread::sleep(100);
+
+ broker1->start();
+ broker1->waitUntilStarted();
+
+ broker2->stop();
+ broker2->waitUntilStopped();
+
+ for (int i = 0; i < 2; ++i) {
+
+ CPPUNIT_ASSERT_MESSAGE("Failed to get interrupted in time", listener.awaitInterruption());
+ CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+ listener.reset();
+
+ URI connectedURI = URI(transport->getRemoteAddress());
+
+ if (connectedURI.getPort() == broker1->getPort()) {
+ break;
+ }
+ }
+
+ CPPUNIT_ASSERT(failover->isConnected() == true);
+ CPPUNIT_ASSERT(failover->isConnectedToPriority() == true);
+
+ transport->close();
+
+ broker1->stop();
+ broker1->waitUntilStopped();
+
+ broker2->stop();
+ broker2->waitUntilStopped();
+
+ broker3->stop();
+ broker3->waitUntilStopped();
+}
diff --git a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
index f0ea87b..74d7813 100644
--- a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
+++ b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
@@ -54,6 +54,12 @@
CPPUNIT_TEST( testPriorityBackupConfig );
CPPUNIT_TEST( testUriOptionsApplied );
CPPUNIT_TEST( testConnectedToMockBroker );
+ CPPUNIT_TEST( testMaxReconnectsZeroAttemptsOneConnect );
+ CPPUNIT_TEST( testMaxReconnectsHonorsConfiguration );
+ CPPUNIT_TEST( testStartupMaxReconnectsHonorsConfiguration );
+ CPPUNIT_TEST( testConnectedToPriorityOnFirstTryThenFailover );
+ CPPUNIT_TEST( testConnectsToPriorityOnceStarted );
+ //CPPUNIT_TEST( testConnectsToPriorityAfterInitialBackupFails );
CPPUNIT_TEST_SUITE_END();
public:
@@ -75,6 +81,12 @@
void testPriorityBackupConfig();
void testUriOptionsApplied();
void testConnectedToMockBroker();
+ void testMaxReconnectsZeroAttemptsOneConnect();
+ void testMaxReconnectsHonorsConfiguration();
+ void testStartupMaxReconnectsHonorsConfiguration();
+ void testConnectedToPriorityOnFirstTryThenFailover();
+ void testConnectsToPriorityOnceStarted();
+ void testConnectsToPriorityAfterInitialBackupFails();
private:
diff --git a/activemq-cpp/src/test/testRegistry.cpp b/activemq-cpp/src/test/testRegistry.cpp
index 4936891..dcea2a7 100644
--- a/activemq-cpp/src/test/testRegistry.cpp
+++ b/activemq-cpp/src/test/testRegistry.cpp
@@ -121,15 +121,6 @@
#include <activemq/transport/inactivity/InactivityMonitorTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::inactivity::InactivityMonitorTest );
-//#include <activemq/transport/discovery/DiscoveryAgentRegistryTest.h>
-//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::discovery::DiscoveryAgentRegistryTest );
-//#include <activemq/transport/discovery/DiscoveryTransportFactoryTest.h>
-//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::discovery::DiscoveryTransportFactoryTest );
-//#include <activemq/transport/discovery/AbstractDiscoveryAgentTest.h>
-//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::discovery::AbstractDiscoveryAgentTest );
-//#include <activemq/transport/discovery/AbstractDiscoveryAgentFactoryTest.h>
-//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::discovery::AbstractDiscoveryAgentFactoryTest );
-
#include <activemq/transport/TransportRegistryTest.h>
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::TransportRegistryTest );
#include <activemq/transport/IOTransportTest.h>