blob: 13c78c1e145799627413ef69c18c49a3b772b58b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.test.unit.ack;
import java.util.concurrent.CountDownLatch;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
/**
 * A quick manual test to validate acking (committing) after failover with a
 * transacted session.
 *
 * Start an external broker, then run this test. Std Err will print:
 *
 * Sent Message:1
 * Received Message:1
 *
 * You can then restart the external broker, which will cause failover.
 * Failover is complete when the following appears:
 *
 * Failover Complete
 *
 * A second message send/receive cycle is then done to validate that the
 * connection/session are still working.
 *
 * The test blocks indefinitely until failover completes, so it must not be
 * run unattended.
 */
public class QuickAcking extends QpidBrokerTestCase implements ConnectionListener
{
    /** Message property carrying the sequence number of each test message. */
    private static final String INDEX = "INDEX";

    protected AMQConnection _connection;
    protected Queue _queue;
    protected Session _session;
    protected MessageConsumer _consumer;

    /** Released by {@link #failoverComplete()} once the client reports failover has finished. */
    private CountDownLatch _failedOver;

    /** Sequence number of the most recently sent message. */
    private int _count = 0;

    public void setUp()
    {
        // Prevent broker startup. Broker must be run manually.
    }

    /**
     * Runs one send/receive cycle, waits for the operator to restart the
     * broker (failover), then runs a second cycle on the same session.
     *
     * @throws Exception if connecting, sending, receiving or committing fails,
     *                   or if the thread is interrupted while waiting for failover
     */
    public void test() throws Exception
    {
        _failedOver = new CountDownLatch(1);

        _connection = new AMQConnection("amqp://guest:guest@client/test?brokerlist='localhost?retries='20'&connectdelay='2000''");
        try
        {
            _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
            _queue = _session.createQueue("QAtest");
            _consumer = _session.createConsumer(_queue);

            // Register before starting so the failover notification cannot be missed.
            _connection.setConnectionListener(this);
            _connection.start();

            sendAndReceive();

            // Deliberately unbounded: the operator restarts the broker by hand.
            _failedOver.await();

            sendAndReceive();
        }
        finally
        {
            // Always release the connection (and with it the session and consumer).
            // A close failure is reported but must not mask the primary test failure.
            try
            {
                _connection.close();
            }
            catch (JMSException e)
            {
                System.err.println("Failed to close connection: " + e);
            }
        }
    }

    /**
     * Sends one message, receives it back and commits the receipt.
     *
     * @throws Exception if no message is received, if the received message
     *                   does not carry the expected index, or on any JMS failure
     */
    private void sendAndReceive()
            throws Exception
    {
        sendMessage();

        Message message = _consumer.receive();

        // receive() yields null when the consumer is closed while waiting;
        // fail with a clear message rather than a NullPointerException.
        if (message == null)
        {
            throw new Exception("No message received, expected:" + _count);
        }

        int index = message.getIntProperty(INDEX);
        if (index != _count)
        {
            throw new Exception("Incorrect message received: expected " + _count + " but got " + index);
        }

        if (_session.getTransacted())
        {
            _session.commit();
        }
        System.err.println("Received Message:" + _count);
    }

    /**
     * Sends a single message stamped with the next sequence number and commits
     * the send when the session is transacted.
     *
     * @throws JMSException if creating, sending or committing the message fails
     */
    private void sendMessage() throws JMSException
    {
        MessageProducer producer = _session.createProducer(_queue);
        try
        {
            Message message = _session.createMessage();

            _count++;
            message.setIntProperty(INDEX, _count);

            producer.send(message);
            if (_session.getTransacted())
            {
                _session.commit();
            }
        }
        finally
        {
            // Close the producer even if the send or commit failed.
            producer.close();
        }

        System.err.println("Sent Message:" + _count);
    }

    public void bytesSent(long count)
    {
        // Not relevant to this test.
    }

    public void bytesReceived(long count)
    {
        // Not relevant to this test.
    }

    /** Always allows failover to proceed. */
    public boolean preFailover(boolean redirect)
    {
        return true;
    }

    /** Always allows resubscription after failover. */
    public boolean preResubscribe()
    {
        return true;
    }

    /** Reports failover completion on Std Err and releases the waiting test thread. */
    public void failoverComplete()
    {
        System.err.println("Failover Complete");
        _failedOver.countDown();
    }
}