| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.test.unit.publish; |
| |
| import org.apache.qpid.client.AMQConnection; |
| import org.apache.qpid.client.AMQDestination; |
| import org.apache.qpid.client.AMQSession; |
| import org.apache.qpid.jms.ConnectionListener; |
| import org.apache.qpid.test.utils.FailoverBaseCase; |
| |
| import javax.jms.Connection; |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageListener; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.Session; |
| import javax.jms.TransactionRolledBackException; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| * QPID-1816 : Whilst testing Acknoledgement after failover this completes testing |
| * of the client after failover. When we have a dirty session we should receive |
| * an error if we attempt to publish. This test ensures that both in the synchronous |
| * and asynchronous message delivery paths we receive the expected exceptions at |
| * the expected time. |
| */ |
| public class DirtyTransactedPublishTest extends FailoverBaseCase implements ConnectionListener |
| { |
| protected CountDownLatch _failoverCompleted = new CountDownLatch(1); |
| |
| protected int NUM_MESSAGES; |
| protected Connection _connection; |
| protected Queue _queue; |
| protected Session _consumerSession; |
| protected MessageConsumer _consumer; |
| protected MessageProducer _producer; |
| |
| private static final String MSG = "MSG"; |
| private static final String SEND_FROM_ON_MESSAGE_TEXT = "sendFromOnMessage"; |
| protected CountDownLatch _receviedAll; |
| protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null); |
| |
| @Override |
| protected void setUp() throws Exception |
| { |
| super.setUp(); |
| NUM_MESSAGES = 10; |
| |
| _queue = getTestQueue(); |
| |
| //Create Producer put some messages on the queue |
| _connection = getConnection(); |
| } |
| |
| /** |
| * Initialise the test variables |
| * @param transacted is this a transacted test |
| * @param mode if not trasacted then what ack mode to use |
| * @throws Exception if there is a setup issue. |
| */ |
| protected void init(boolean transacted, int mode) throws Exception |
| { |
| _consumerSession = _connection.createSession(transacted, mode); |
| _consumer = _consumerSession.createConsumer(_queue); |
| _producer = _consumerSession.createProducer(_queue); |
| |
| // These should all end up being prefetched by session |
| sendMessage(_consumerSession, _queue, 1); |
| |
| assertEquals("Wrong number of messages on queue", 1, |
| ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); |
| } |
| |
| /** |
| * If a transacted session has failed over whilst it has uncommitted sent |
| * data then we need to throw a TransactedRolledbackException on commit() |
| * |
| * The alternative would be to maintain a replay buffer so that the message |
| * could be resent. This is not currently implemented |
| * |
| * @throws Exception if something goes wrong. |
| */ |
| public void testDirtySendingSynchronousTransacted() throws Exception |
| { |
| Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED); |
| |
| // Ensure we get failover notifications |
| ((AMQConnection) _connection).setConnectionListener(this); |
| |
| MessageProducer producer = producerSession.createProducer(_queue); |
| |
| // Create and send message 0 |
| Message msg = producerSession.createMessage(); |
| msg.setIntProperty(INDEX, 0); |
| producer.send(msg); |
| |
| // DON'T commit message .. fail connection |
| |
| failBroker(getFailingPort()); |
| |
| // Ensure destination exists for sending |
| producerSession.createConsumer(_queue).close(); |
| |
| // Send the next message |
| msg.setIntProperty(INDEX, 1); |
| try |
| { |
| producer.send(msg); |
| fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException."); |
| } |
| catch (JMSException jmse) |
| { |
| assertEquals("Early warning of dirty session not correct", |
| "Failover has occurred and session is dirty so unable to send.", jmse.getMessage()); |
| } |
| |
| // Ignore that the session is dirty and attempt to commit to validate the |
| // exception is thrown. AND that the above failure notification did NOT |
| // clean up the session. |
| |
| try |
| { |
| producerSession.commit(); |
| fail("Session is dirty we should get an TransactionRolledBackException"); |
| } |
| catch (TransactionRolledBackException trbe) |
| { |
| // Normal path. |
| } |
| |
| // Resending of messages should now work ok as the commit was forcilbly rolledback |
| msg.setIntProperty(INDEX, 0); |
| producer.send(msg); |
| msg.setIntProperty(INDEX, 1); |
| producer.send(msg); |
| |
| producerSession.commit(); |
| |
| assertEquals("Wrong number of messages on queue", 2, |
| ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue)); |
| } |
| |
| /** |
| * If a transacted session has failed over whilst it has uncommitted sent |
| * data then we need to throw a TransactedRolledbackException on commit() |
| * |
| * The alternative would be to maintain a replay buffer so that the message |
| * could be resent. This is not currently implemented |
| * |
| * @throws Exception if something goes wrong. |
| */ |
| public void testDirtySendingOnMessageTransacted() throws Exception |
| { |
| NUM_MESSAGES = 1; |
| _receviedAll = new CountDownLatch(NUM_MESSAGES); |
| ((AMQConnection) _connection).setConnectionListener(this); |
| |
| init(true, Session.SESSION_TRANSACTED); |
| |
| _consumer.setMessageListener(new MessageListener() |
| { |
| |
| public void onMessage(Message message) |
| { |
| try |
| { |
| // Create and send message 0 |
| Message msg = _consumerSession.createMessage(); |
| msg.setIntProperty(INDEX, 0); |
| _producer.send(msg); |
| |
| // DON'T commit message .. fail connection |
| |
| failBroker(getFailingPort()); |
| |
| // rep |
| repopulateBroker(); |
| |
| // Destination will exist as this failBroker will populate |
| // the queue with 1 message |
| |
| // Send the next message |
| msg.setIntProperty(INDEX, 1); |
| try |
| { |
| _producer.send(msg); |
| fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException."); |
| } |
| catch (JMSException jmse) |
| { |
| assertEquals("Early warning of dirty session not correct", |
| "Failover has occurred and session is dirty so unable to send.", jmse.getMessage()); |
| } |
| |
| // Ignore that the session is dirty and attempt to commit to validate the |
| // exception is thrown. AND that the above failure notification did NOT |
| // clean up the session. |
| |
| try |
| { |
| _consumerSession.commit(); |
| fail("Session is dirty we should get an TransactionRolledBackException"); |
| } |
| catch (TransactionRolledBackException trbe) |
| { |
| // Normal path. |
| } |
| |
| // Resend messages |
| msg.setIntProperty(INDEX, 0); |
| msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT); |
| _producer.send(msg); |
| msg.setIntProperty(INDEX, 1); |
| msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT); |
| _producer.send(msg); |
| |
| _consumerSession.commit(); |
| |
| // Stop this consumer .. can't do _consumer.stop == DEADLOCK |
| // this doesn't seem to stop dispatcher running |
| _connection.stop(); |
| |
| // Signal that the onMessage send part of test is complete |
| // main thread can validate that messages are correct |
| _receviedAll.countDown(); |
| |
| } |
| catch (Exception e) |
| { |
| fail(e); |
| } |
| |
| } |
| |
| }); |
| |
| _connection.start(); |
| |
| if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS)) |
| { |
| // Check to see if we ended due to an exception in the onMessage handler |
| Exception cause = _causeOfFailure.get(); |
| if (cause != null) |
| { |
| cause.printStackTrace(); |
| fail(cause.getMessage()); |
| } |
| else |
| { |
| fail("All messages not received:" + _receviedAll.getCount() + "/" + NUM_MESSAGES); |
| } |
| } |
| |
| // Check to see if we ended due to an exception in the onMessage handler |
| Exception cause = _causeOfFailure.get(); |
| if (cause != null) |
| { |
| cause.printStackTrace(); |
| fail(cause.getMessage()); |
| } |
| |
| _consumer.close(); |
| _consumerSession.close(); |
| |
| _consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| |
| _connection.start(); |
| |
| // Validate that we could send the messages as expected. |
| assertEquals("Wrong number of messages on queue", 3, |
| ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue)); |
| |
| MessageConsumer consumer = _consumerSession.createConsumer(_queue); |
| |
| //Validate the message sent to setup the failed over broker. |
| Message message = consumer.receive(1000); |
| assertNotNull("Message " + 0 + " not received.", message); |
| assertEquals("Incorrect message received", 0, message.getIntProperty(INDEX)); |
| |
| // Validate the two messages sent from within the onMessage |
| for (int index = 0; index <= 1; index++) |
| { |
| message = consumer.receive(1000); |
| assertNotNull("Message " + index + " not received.", message); |
| assertEquals("Incorrect message received", index, message.getIntProperty(INDEX)); |
| assertEquals("Incorrect message text for message:" + index, SEND_FROM_ON_MESSAGE_TEXT, message.getStringProperty(MSG)); |
| } |
| |
| assertNull("Extra message received.", consumer.receiveNoWait()); |
| |
| _consumerSession.close(); |
| |
| assertEquals("Wrong number of messages on queue", 0, |
| ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue)); |
| } |
| |
| private void repopulateBroker() throws Exception |
| { |
| // Repopulate this new broker so we can test what happends after failover |
| |
| //Get the connection to the first (main port) broker. |
| Connection connection = getConnection(); |
| // Use a transaction to send messages so we can be sure they arrive. |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| // ensure destination is created. |
| session.createConsumer(_queue).close(); |
| |
| sendMessage(session, _queue, NUM_MESSAGES); |
| |
| assertEquals("Wrong number of messages on queue", NUM_MESSAGES, |
| ((AMQSession) session).getQueueDepth((AMQDestination) _queue)); |
| |
| connection.close(); |
| } |
| |
| // AMQConnectionListener Interface.. used so we can validate that we |
| // actually failed over. |
| |
| public void bytesSent(long count) |
| { |
| } |
| |
| public void bytesReceived(long count) |
| { |
| } |
| |
| public boolean preFailover(boolean redirect) |
| { |
| //Allow failover |
| return true; |
| } |
| |
| public boolean preResubscribe() |
| { |
| //Allow failover |
| return true; |
| } |
| |
| public void failoverComplete() |
| { |
| _failoverCompleted.countDown(); |
| } |
| |
| /** |
| * Override so we can block until failover has completd |
| * |
| * @param port int the port of the broker to fail. |
| */ |
| @Override |
| public void failBroker(int port) |
| { |
| super.failBroker(port); |
| |
| try |
| { |
| if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS)) |
| { |
| fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME); |
| } |
| } |
| catch (InterruptedException e) |
| { |
| fail("Failover was interuppted"); |
| } |
| } |
| |
| /** |
| * Pass the given exception back to the waiting thread to fail the test run. |
| * |
| * @param e The exception that is causing the test to fail. |
| */ |
| protected void fail(Exception e) |
| { |
| _causeOfFailure.set(e); |
| // End the test. |
| while (_receviedAll.getCount() != 0) |
| { |
| _receviedAll.countDown(); |
| } |
| } |
| } |