QPID-6192: [Java Broker] Add supporting test case guarding case when failover occurs when busy
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1635076 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
index 3331a8a..a224247 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/FailoverBehaviourTest.java
@@ -57,6 +57,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Test suite to test all possible failover corner cases
@@ -713,6 +714,106 @@
browserCloseWhileFailoverImpl(Session.AUTO_ACKNOWLEDGE);
}
+ public void testKillBrokerFailoverWhilstPublishingInFlight() throws Exception
+ {
+ doFailoverWhilstPublishingInFlight(true);
+ }
+
+ public void testStopBrokerFailoverWhilstPublishingInFlight() throws Exception
+ {
+ doFailoverWhilstPublishingInFlight(false);
+ }
+
+ private void doFailoverWhilstPublishingInFlight(boolean hardKill) throws JMSException, InterruptedException
+ {
+ init(Session.SESSION_TRANSACTED, false);
+
+ final int numberOfMessages = 200;
+
+ final CountDownLatch halfWay = new CountDownLatch(1);
+ final CountDownLatch allDone = new CountDownLatch(1);
+ final AtomicReference<Exception> exception = new AtomicReference<>();
+
+ Runnable producerRunnable = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ Thread.currentThread().setName("ProducingThread");
+
+ try
+ {
+ for(int i=0; i< numberOfMessages; i++)
+ {
+ boolean success = false;
+ while(!success)
+ {
+ try
+ {
+ Message message = _producerSession.createMessage();
+ message.setIntProperty("msgNum", i);
+ _producer.send(message);
+ _producerSession.commit();
+ success = true;
+ }
+ catch (javax.jms.IllegalStateException e)
+ {
+ // fail - failover should not leave a JMS object in an illegal state
+ throw e;
+ }
+ catch (JMSException e)
+ {
+ // OK we will be failing over
+ _logger.debug("Got JMS exception, probably just failing over", e);
+ }
+ }
+
+ if (i > numberOfMessages / 2 && halfWay.getCount() == 1)
+ {
+ halfWay.countDown();
+ }
+ }
+
+ allDone.countDown();
+ }
+ catch (Exception e)
+ {
+ exception.set(e);
+ }
+ }
+ };
+
+ Thread producerThread = new Thread(producerRunnable);
+ producerThread.start();
+
+ assertTrue("Didn't get to half way within timeout", halfWay.await(30000, TimeUnit.MILLISECONDS));
+
+ if (hardKill)
+ {
+ _logger.debug("Killing the Broker");
+ killBroker(getFailingPort());
+ }
+ else
+ {
+ _logger.debug("Stopping the Broker");
+ stopBroker(getFailingPort());
+ }
+
+ if (exception.get() != null)
+ {
+ _logger.error("Unexpected exception from producer thread", exception.get());
+ }
+ assertNull("Producer thread should not have got an exception", exception.get());
+
+ assertTrue("All producing work was not completed", allDone.await(30000, TimeUnit.MILLISECONDS));
+
+ producerThread.join(30000);
+
+ // Extra work to prove the session still okay
+ assertNotNull(_producerSession.createTemporaryQueue());
+ }
+
+
private Message publishWhileFailingOver(int autoAcknowledge) throws JMSException, InterruptedException
{
setDelayedFailoverPolicy(5);