| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.failover; |
| |
| import org.apache.qpid.client.AMQConnection; |
| import org.apache.qpid.client.protocol.AMQProtocolSession; |
| import org.apache.qpid.jms.ConnectionListener; |
| import org.apache.qpid.test.utils.FailoverBaseCase; |
| |
| import javax.jms.Destination; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.Session; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| |
| /** |
| * Test case based on user reported error. |
| * |
| * Summary: |
| * A user has reported message loss from their application. On bouncing of |
| * the broker the 'lost' messages are delivered to the broker. |
| * |
| * Note: |
| * The client was using Spring so that may influence the situation. |
| * |
| * Issue: |
| * The log files show 7 instances of the following which result in 7 |
| * missing messages. |
| * |
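| * The client log files show the 'without a handler' message discussed |
| * below (log excerpt not reproduced here). |
| * |
| * The broker log files show the 'null message' entry discussed below |
| * (log excerpt not reproduced here). |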
| * |
| * |
| * 7 missing messages have delivery tags 5-11. Which says that they are |
| * sequentially the next message from the broker. |
| * |
| * The only way for the 'without a handler' log to occur is if the consumer |
| * has been removed from the look up table of the dispatcher. |
| * And the only way for the 'null message' log to occur on the broker is |
| * if the message does not exist in the unacked-map. |
| * |
| * The consumer is only removed from the list during session |
| * closure and failover. |
| * |
| * If the session was closed then the broker would requeue the unacked |
| * messages so the potential exists to have an empty map but the broker |
| * will not send a message out after the unacked map has been cleared. |
| * |
| * When failover occurs the _consumer map is cleared and the consumers are |
| * resubscribed. This is done without first stopping any existing |
| * dispatcher, so there exists the potential to receive a message after |
| * the _consumer map has been cleared, which is how the 'without a handler' |
| * log statement occurs. |
| * |
| * Scenario: |
| * |
| * Looking over logs the sequence that best fits the events is as follows: |
| * - Something causes Mina to be delayed causing the WriteTimeoutException. |
| * - This exception is received by AMQProtocolHandler#exceptionCaught |
| * - As the WriteTimeoutException is an IOException this will cause |
| * sessionClosed to be called to start failover. |
| * + This is potentially the issue here. All IOExceptions are treated |
| * as connection failure events. |
| * - Failover Runs |
| * + Failover assumes that the previous connection has been closed. |
| * + Failover binds the existing objects (AMQConnection/Session) to the |
| * new connection objects. |
| * - Everything is reported as being successfully failed over. |
| * However, what is neglected is that the original connection has not |
| * been closed. |
| * + So what occurs is that the broker sends a message to the consumer on |
| * the original connection, as it was not notified of the client |
| * failing over. |
| * As the client failover reuses the original AMQSession and Dispatcher, |
| * the new messages the broker sends to the old consumer arrive at the |
| * client and are processed by the same AMQSession and Dispatcher. |
| * However, as the failover process cleared the _consumer map and |
| * resubscribed the consumers, the Dispatcher does not recognise the |
| * delivery tag and so logs the 'without a handler' message. |
| * - The Dispatcher then attempts to reject the message, however, |
| * + The AMQSession/Dispatcher pair have been swapped to using a new Mina |
| * ProtocolSession as part of the failover process so the reject is |
| * sent down the second connection. The broker receives the Reject |
| * request but as the Message was sent on a different connection the |
| * unacknowledged map is empty and a 'message is null' log message is |
| * produced. |
| * |
| * Test Strategy: |
| * |
| * It should be easy to demonstrate if we can send an IOException to |
| * AMQProtocolHandler#exceptionCaught and then try sending a message. |
| * |
| * The current unknowns here are the type of consumers that are in use. |
| * If it was an exclusive queue (Durable Subscription) then why did the |
| * resubscribe not fail? |
| * |
| * If it was not exclusive then why did the messages not round robin? |
| */ |
| public class MessageDisappearWithIOExceptionTest extends FailoverBaseCase implements ConnectionListener |
| { |
| private CountDownLatch _failoverOccured = new CountDownLatch(1); |
| AMQConnection _connection; |
| Session _session; |
| Queue _queue; |
| MessageConsumer _consumer; |
| |
| public void setUp() throws Exception |
| { |
| super.setUp(); |
| stopBroker(getFailingPort()); |
| |
| } |
| |
| /** |
| * Test Summary: |
| * |
| * Create a queue consumer and send 10 messages to the broker. |
| * |
| * Consume the first message. |
| * This will pull the rest into the prefetch |
| * |
| * Send an IOException to the MinaProtocolHandler. |
| * |
| * This will force failover to occur. |
| * |
| * 9 messages would normally be expected but it is expected that none will |
| * arrive. As they are still in the prefetch of the first session. |
| * |
| * To free the messages we need to close all connections. |
| * - Simply doing connection.close() and retesting will not be enough as |
| * the original connection's IO layer will still exist and is nolonger |
| * connected to the connection object as a result of failover. |
| * |
| * - Test will need to retain a reference to the original connection IO so |
| * that it can be closed releasing the messages to validate that the |
| * messages have indeed been 'lost' on that sesssion. |
| */ |
| public void test() throws Exception |
| { |
| initialiseConnection(); |
| |
| // Create Producer |
| // Send 10 messages |
| List<Message> messages = sendNumberedBytesMessage(_session, _queue, 10); |
| |
| // Consume first messasge |
| Message received = _consumer.receive(2000); |
| |
| // Verify received messages |
| assertNotNull("First message not received.", received); |
| assertEquals("Incorrect message Received", |
| messages.remove(0).getIntProperty("count"), |
| received.getIntProperty("count")); |
| |
| // When the Exception is received by the underlying IO layer it will |
| // initiate failover. The first step of which is to ensure that the |
| // existing conection is closed. So in this situation the connection |
| // will be flushed casuing the above ACK to be sent to the broker. |
| // |
| // That said: |
| // when the socket close is detected on the server it will rise up the |
| // Mina filter chain and interrupt processing. |
| // this has been raised as QPID-2138 |
| _session.createConsumer(_session.createTemporaryQueue()).close(); |
| |
| //Retain IO Layer |
| AMQProtocolSession protocolSession = _connection.getProtocolHandler().getProtocolSession(); |
| |
| // Send IO Exception - causing failover |
| _connection.getProtocolHandler(). |
| exception(new IOException("IOException to cause failover.")); |
| |
| // Verify Failover occured through ConnectionListener |
| assertTrue("Failover did not occur", |
| _failoverOccured.await(4000, TimeUnit.MILLISECONDS)); |
| |
| /***********************************/ |
| // This verifies that the bug has been resolved |
| |
| // Attempt to consume again. Expect 9 messages |
| for (int count = 1; count < 10; count++) |
| { |
| received = _consumer.receive(2000); |
| assertNotNull("Expected message not received:" + count, received); |
| assertEquals(messages.remove(0).getIntProperty("count"), |
| received.getIntProperty("count")); |
| } |
| |
| //Verify there are no more messages |
| received = _consumer.receive(1000); |
| assertNull("Message receieved when there should be none:" + received, |
| received); |
| |
| // /***********************************/ |
| // // This verifies that the bug exists |
| // |
| // // Attempt to consume remaining 9 messages.. Expecting NONE. |
| // // receiving just one message should fail so no need to fail 9 times |
| // received = _consumer.receive(1000); |
| // assertNull("Message receieved when it should be null:" + received, received); |
| // |
| //// //Close the Connection which you would assume would free the messages |
| //// _connection.close(); |
| //// |
| //// // Reconnect |
| //// initialiseConnection(); |
| //// |
| //// // We should still be unable to receive messages |
| //// received = _consumer.receive(1000); |
| //// assertNull("Message receieved when it should be null:" + received, received); |
| //// |
| //// _connection.close(); |
| // |
| // // Close original IO layer. Expecting messages to be released |
| // protocolSession.closeProtocolSession(); |
| // |
| // // Reconnect and all should be good. |
| //// initialiseConnection(); |
| // |
| // // Attempt to consume again. Expect 9 messages |
| // for (int count = 1; count < 10; count++) |
| // { |
| // received = _consumer.receive(2000); |
| // assertNotNull("Expected message not received:" + count, received); |
| // assertEquals(messages.remove(0).getIntProperty("count"), |
| // received.getIntProperty("count")); |
| // } |
| // |
| // //Verify there are no more messages |
| // received = _consumer.receive(1000); |
| // assertNull("Message receieved when there should be none:" + received, |
| // received); |
| } |
| |
| private void initialiseConnection() |
| throws Exception |
| { |
| //Create Connection using the default connection URL. i.e. not the Failover URL that would be used by default |
| _connection = (AMQConnection) getConnectionFactory("default").createConnection("guest", "guest"); |
| // The default connection does not have any retries configured so |
| // Allow this connection to retry so that we can block on the failover. |
| // The alternative would be to use the getConnection() default. However, |
| // this would add additional complexity in the logging as a second |
| // broker is defined in that url. We do not need it for this test. |
| _connection.getFailoverPolicy().getCurrentMethod().setRetries(1); |
| _connection.setConnectionListener(this); |
| |
| _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| _queue = _session.createQueue(getTestQueueName()); |
| |
| // Create Consumer |
| _consumer = _session.createConsumer(_queue); |
| |
| //Start connection |
| _connection.start(); |
| } |
| |
| /** QpidTestCase back port to this release */ |
| |
| // modified from QTC as sendMessage is not testable. |
| // - should be renamed sendBlankBytesMessage |
| // - should be renamed sendNumberedBytesMessage |
| public List<Message> sendNumberedBytesMessage(Session session, Destination destination, |
| int count) throws Exception |
| { |
| List<Message> messages = new ArrayList<Message>(count); |
| |
| MessageProducer producer = session.createProducer(destination); |
| |
| for (int i = 0; i < count; i++) |
| { |
| Message next = session.createMessage(); |
| |
| next.setIntProperty("count", i); |
| |
| producer.send(next); |
| |
| messages.add(next); |
| } |
| |
| producer.close(); |
| return messages; |
| } |
| |
| public void bytesSent(long count) |
| { |
| //To change body of implemented methods use File | Settings | File Templates. |
| } |
| |
| public void bytesReceived(long count) |
| { |
| } |
| |
| public boolean preFailover(boolean redirect) |
| { |
| //Allow failover to occur |
| return true; |
| } |
| |
| public boolean preResubscribe() |
| { |
| //Allow failover to occur |
| return true; |
| } |
| |
| public void failoverComplete() |
| { |
| _failoverOccured.countDown(); |
| } |
| } |