blob: 3a1fb50725f6e41abf7a979bee33fbdf8c928f01 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.test.client.failover;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.AMQConnectionURL;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.test.utils.FailoverBaseCase;
import org.apache.log4j.Logger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Queue;
import javax.naming.NamingException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class FailoverTest extends FailoverBaseCase implements ConnectionListener
{
private static final Logger _logger = Logger.getLogger(FailoverTest.class);
private static final String QUEUE = "queue";
private static final int NUM_MESSAGES = 10;
private Connection connnection;
private Session producerSession;
private Queue queue;
private MessageProducer producer;
private Session consumerSession;
private MessageConsumer consumer;
private static int usedBrokers = 0;
private CountDownLatch failoverComplete;
private static final long DEFAULT_FAILOVER_TIME = 10000L;
private boolean CLUSTERED = Boolean.getBoolean("profile.clustered");
@Override
protected void setUp() throws Exception
{
super.setUp();
connnection = getConnection();
((AMQConnection) connnection).setConnectionListener(this);
connnection.start();
failoverComplete = new CountDownLatch(1);
}
private void init(boolean transacted, int mode) throws JMSException, NamingException
{
queue = (Queue) getInitialContext().lookup(QUEUE);
consumerSession = connnection.createSession(transacted, mode);
consumer = consumerSession.createConsumer(queue);
producerSession = connnection.createSession(transacted, mode);
producer = producerSession.createProducer(queue);
}
@Override
public void tearDown() throws Exception
{
try
{
connnection.close();
}
catch (Exception e)
{
}
super.tearDown();
}
private void consumeMessages(int toConsume, boolean transacted) throws JMSException
{
Message msg;
for (int i = 0; i < toConsume; i++)
{
msg = consumer.receive(1000);
assertNotNull("Message " + i + " was null!", msg);
assertEquals("message " + i, ((TextMessage) msg).getText());
}
if (transacted) {
consumerSession.commit();
}
}
private void sendMessages(int totalMessages, boolean transacted) throws JMSException
{
for (int i = 0; i < totalMessages; i++)
{
producer.send(producerSession.createTextMessage("message " + i));
}
if (transacted)
{
producerSession.commit();
}
}
public void testP2PFailover() throws Exception
{
testP2PFailover(NUM_MESSAGES, true, false);
}
public void testP2PFailoverWithMessagesLeft() throws Exception
{
testP2PFailover(NUM_MESSAGES, false, false);
}
public void testP2PFailoverTransacted() throws Exception
{
testP2PFailover(NUM_MESSAGES, true, false);
}
private void testP2PFailover(int totalMessages, boolean consumeAll, boolean transacted) throws JMSException, NamingException
{
Message msg = null;
init(transacted, Session.AUTO_ACKNOWLEDGE);
sendMessages(totalMessages, transacted);
// Consume some messages
int toConsume = totalMessages;
if (!consumeAll)
{
toConsume = totalMessages / 2;
}
consumeMessages(toConsume, transacted);
_logger.info("Failing over");
causeFailure(DEFAULT_FAILOVER_TIME);
if (!CLUSTERED)
{
msg = consumer.receive(500);
assertNull("Should not have received message from new broker!", msg);
}
// Check that messages still sent / received
sendMessages(totalMessages, transacted);
consumeMessages(totalMessages, transacted);
}
private void causeFailure(long delay)
{
failBroker();
_logger.info("Awaiting Failover completion");
try
{
if (!failoverComplete.await(delay, TimeUnit.MILLISECONDS))
{
fail("failover did not complete");
}
}
catch (InterruptedException e)
{
//evil ignore IE.
}
}
public void testClientAckFailover() throws Exception
{
init(false, Session.CLIENT_ACKNOWLEDGE);
sendMessages(1, false);
Message msg = consumer.receive();
assertNotNull("Expected msgs not received", msg);
causeFailure(DEFAULT_FAILOVER_TIME);
Exception failure = null;
try
{
msg.acknowledge();
}
catch (Exception e)
{
failure = e;
}
assertNotNull("Exception should be thrown", failure);
}
/**
* The client used to have a fixed timeout of 4 minutes after which failover would no longer work.
* Check that this code has not regressed
*
* @throws Exception if something unexpected occurs in the test.
*/
public void test4MinuteFailover() throws Exception
{
ConnectionURL connectionURL = getConnectionFactory().getConnectionURL();
int RETRIES = 4;
int DELAY = 60000;
//Set up a long delay on and large number of retries
BrokerDetails details = connectionURL.getBrokerDetails(1);
details.setProperty(BrokerDetails.OPTIONS_RETRY, String.valueOf(RETRIES));
details.setProperty(BrokerDetails.OPTIONS_CONNECT_DELAY, String.valueOf(DELAY));
connnection = new AMQConnection(connectionURL, null);
((AMQConnection) connnection).setConnectionListener(this);
//Start the connection
connnection.start();
long FAILOVER_DELAY = (RETRIES * DELAY);
// Use Nano seconds as it is more accurate for comparision.
long failTime = System.nanoTime() + FAILOVER_DELAY * 1000000;
//Fail the first broker
causeFailure(FAILOVER_DELAY + DEFAULT_FAILOVER_TIME);
//Reconnection should occur
assertTrue("Failover did not take long enough", System.nanoTime() > failTime);
}
public void bytesSent(long count)
{
}
public void bytesReceived(long count)
{
}
public boolean preFailover(boolean redirect)
{
return true;
}
public boolean preResubscribe()
{
return true;
}
public void failoverComplete()
{
failoverComplete.countDown();
}
}