| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.test.unit.ack; |
| |
| import org.apache.qpid.client.AMQConnection; |
| import org.apache.qpid.client.AMQQueue; |
| import org.apache.qpid.framing.AMQShortString; |
| import org.apache.qpid.jms.Session; |
| import org.apache.qpid.test.utils.QpidTestCase; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageListener; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.TextMessage; |
| |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| public class RecoverTest extends QpidTestCase |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class); |
| |
| private Exception _error; |
| private AtomicInteger count; |
| |
| protected void setUp() throws Exception |
| { |
| super.setUp(); |
| _error = null; |
| count = new AtomicInteger(); |
| } |
| |
| protected void tearDown() throws Exception |
| { |
| super.tearDown(); |
| count = null; |
| } |
| |
| public void testRecoverResendsMsgs() throws Exception |
| { |
| AMQConnection con = (AMQConnection) getConnection("guest", "guest"); |
| |
| Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| Queue queue = |
| new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), |
| new AMQShortString("someQ"), false, true); |
| MessageConsumer consumer = consumerSession.createConsumer(queue); |
| // force synch to ensure the consumer has resulted in a bound queue |
| // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); |
| // This is the default now |
| |
| AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); |
| Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| MessageProducer producer = producerSession.createProducer(queue); |
| |
| _logger.info("Sending four messages"); |
| producer.send(producerSession.createTextMessage("msg1")); |
| producer.send(producerSession.createTextMessage("msg2")); |
| producer.send(producerSession.createTextMessage("msg3")); |
| producer.send(producerSession.createTextMessage("msg4")); |
| |
| con2.close(); |
| |
| _logger.info("Starting connection"); |
| con.start(); |
| TextMessage tm = (TextMessage) consumer.receive(); |
| tm.acknowledge(); |
| _logger.info("Received and acknowledged first message"); |
| consumer.receive(); |
| consumer.receive(); |
| consumer.receive(); |
| _logger.info("Received all four messages. Calling recover with three outstanding messages"); |
| // no ack for last three messages so when I call recover I expect to get three messages back |
| consumerSession.recover(); |
| tm = (TextMessage) consumer.receive(3000); |
| assertEquals("msg2", tm.getText()); |
| |
| tm = (TextMessage) consumer.receive(3000); |
| assertEquals("msg3", tm.getText()); |
| |
| tm = (TextMessage) consumer.receive(3000); |
| assertEquals("msg4", tm.getText()); |
| |
| _logger.info("Received redelivery of three messages. Acknowledging last message"); |
| tm.acknowledge(); |
| |
| _logger.info("Calling acknowledge with no outstanding messages"); |
| // all acked so no messages to be delivered |
| consumerSession.recover(); |
| |
| tm = (TextMessage) consumer.receiveNoWait(); |
| assertNull(tm); |
| _logger.info("No messages redelivered as is expected"); |
| |
| con.close(); |
| } |
| |
| public void testRecoverResendsMsgsAckOnEarlier() throws Exception |
| { |
| AMQConnection con = (AMQConnection) getConnection("guest", "guest"); |
| |
| Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| Queue queue = |
| new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), |
| new AMQShortString("someQ"), false, true); |
| MessageConsumer consumer = consumerSession.createConsumer(queue); |
| // force synch to ensure the consumer has resulted in a bound queue |
| // ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); |
| // This is the default now |
| |
| AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); |
| Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| MessageProducer producer = producerSession.createProducer(queue); |
| |
| _logger.info("Sending four messages"); |
| producer.send(producerSession.createTextMessage("msg1")); |
| producer.send(producerSession.createTextMessage("msg2")); |
| producer.send(producerSession.createTextMessage("msg3")); |
| producer.send(producerSession.createTextMessage("msg4")); |
| |
| con2.close(); |
| |
| _logger.info("Starting connection"); |
| con.start(); |
| TextMessage tm = (TextMessage) consumer.receive(); |
| consumer.receive(); |
| tm.acknowledge(); |
| _logger.info("Received 2 messages, acknowledge() first message, should acknowledge both"); |
| |
| consumer.receive(); |
| consumer.receive(); |
| _logger.info("Received all four messages. Calling recover with two outstanding messages"); |
| // no ack for last three messages so when I call recover I expect to get three messages back |
| consumerSession.recover(); |
| TextMessage tm3 = (TextMessage) consumer.receive(3000); |
| assertEquals("msg3", tm3.getText()); |
| |
| TextMessage tm4 = (TextMessage) consumer.receive(3000); |
| assertEquals("msg4", tm4.getText()); |
| |
| _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message"); |
| ((org.apache.qpid.jms.Message) tm3).acknowledgeThis(); |
| |
| _logger.info("Calling recover"); |
| // all acked so no messages to be delivered |
| consumerSession.recover(); |
| |
| tm4 = (TextMessage) consumer.receive(3000); |
| assertEquals("msg4", tm4.getText()); |
| ((org.apache.qpid.jms.Message) tm4).acknowledgeThis(); |
| |
| _logger.info("Calling recover"); |
| // all acked so no messages to be delivered |
| consumerSession.recover(); |
| |
| tm = (TextMessage) consumer.receiveNoWait(); |
| assertNull(tm); |
| _logger.info("No messages redelivered as is expected"); |
| |
| con.close(); |
| } |
| |
| public void testAcknowledgePerConsumer() throws Exception |
| { |
| AMQConnection con = (AMQConnection) getConnection("guest", "guest"); |
| |
| Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| Queue queue = |
| new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), |
| false, true); |
| Queue queue2 = |
| new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q2"), new AMQShortString("Q2"), |
| false, true); |
| MessageConsumer consumer = consumerSession.createConsumer(queue); |
| MessageConsumer consumer2 = consumerSession.createConsumer(queue2); |
| |
| AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); |
| Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); |
| MessageProducer producer = producerSession.createProducer(queue); |
| MessageProducer producer2 = producerSession.createProducer(queue2); |
| |
| producer.send(producerSession.createTextMessage("msg1")); |
| producer2.send(producerSession.createTextMessage("msg2")); |
| |
| con2.close(); |
| |
| _logger.info("Starting connection"); |
| con.start(); |
| |
| TextMessage tm2 = (TextMessage) consumer2.receive(2000); |
| assertNotNull(tm2); |
| assertEquals("msg2", tm2.getText()); |
| |
| tm2.acknowledge(); |
| |
| consumerSession.recover(); |
| |
| TextMessage tm1 = (TextMessage) consumer.receive(2000); |
| assertNotNull(tm1); |
| assertEquals("msg1", tm1.getText()); |
| |
| con.close(); |
| |
| } |
| |
| public void testRecoverInAutoAckListener() throws Exception |
| { |
| AMQConnection con = (AMQConnection) getConnection("guest", "guest"); |
| |
| final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = |
| new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), new AMQShortString("Q3"), |
| false, true); |
| MessageConsumer consumer = consumerSession.createConsumer(queue); |
| MessageProducer producer = consumerSession.createProducer(queue); |
| producer.send(consumerSession.createTextMessage("hello")); |
| |
| final Object lock = new Object(); |
| |
| consumer.setMessageListener(new MessageListener() |
| { |
| |
| public void onMessage(Message message) |
| { |
| try |
| { |
| count.incrementAndGet(); |
| if (count.get() == 1) |
| { |
| if (message.getJMSRedelivered()) |
| { |
| setError( |
| new Exception("Message marked as redilvered on what should be first delivery attempt")); |
| } |
| |
| consumerSession.recover(); |
| } |
| else if (count.get() == 2) |
| { |
| if (!message.getJMSRedelivered()) |
| { |
| setError( |
| new Exception( |
| "Message not marked as redilvered on what should be second delivery attempt")); |
| } |
| } |
| else |
| { |
| System.err.println(message); |
| fail("Message delivered too many times!: " + count); |
| } |
| } |
| catch (JMSException e) |
| { |
| _logger.error("Error recovering session: " + e, e); |
| setError(e); |
| } |
| |
| synchronized (lock) |
| { |
| lock.notify(); |
| } |
| } |
| }); |
| |
| con.start(); |
| |
| long waitTime = 30000L; |
| long waitUntilTime = System.currentTimeMillis() + waitTime; |
| |
| synchronized (lock) |
| { |
| while ((count.get() <= 1) && (waitTime > 0)) |
| { |
| lock.wait(waitTime); |
| if (count.get() <= 1) |
| { |
| waitTime = waitUntilTime - System.currentTimeMillis(); |
| } |
| } |
| } |
| |
| Thread.sleep(1000); |
| |
| if (count.get() != 2) |
| { |
| System.err.println("Count != 2 : " + count); |
| } |
| |
| assertTrue(count.get() == 2); |
| |
| con.close(); |
| |
| if (_error != null) |
| { |
| throw _error; |
| } |
| } |
| |
| private void setError(Exception e) |
| { |
| _error = e; |
| } |
| |
| public static junit.framework.Test suite() |
| { |
| return new junit.framework.TestSuite(RecoverTest.class); |
| } |
| } |