| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.systest; |
| |
| import org.apache.commons.configuration.ConfigurationException; |
| |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.jms.ConnectionListener; |
| import org.apache.qpid.protocol.AMQConstant; |
| import org.apache.qpid.test.utils.QpidBrokerTestCase; |
| |
| import javax.jms.Connection; |
| import javax.jms.Destination; |
| import javax.jms.ExceptionListener; |
| import javax.jms.JMSException; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Session; |
| import javax.jms.Topic; |
| import javax.naming.NamingException; |
| import java.io.IOException; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| |
| public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener |
| { |
| |
| private Topic _destination; |
| protected CountDownLatch _disconnectionLatch = new CountDownLatch(1); |
| protected int MAX_QUEUE_MESSAGE_COUNT; |
| protected int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE; |
| |
| private Thread _publisher; |
| protected static final long DISCONNECTION_WAIT = 5; |
| protected Exception _publisherError = null; |
| protected JMSException _connectionException = null; |
| private static final long JOIN_WAIT = 5000; |
| |
| @Override |
| public void setUp() throws Exception |
| { |
| |
| setConfigurationProperty("virtualhosts.virtualhost." |
| + getConnectionURL().getVirtualHost().substring(1) + |
| ".slow-consumer-detection.delay", "1"); |
| |
| setConfigurationProperty("virtualhosts.virtualhost." |
| + getConnectionURL().getVirtualHost().substring(1) + |
| ".slow-consumer-detection.timeunit", "SECONDS"); |
| |
| } |
| |
| |
| protected void setProperty(String property, String value) throws NamingException, IOException, ConfigurationException |
| { |
| setConfigurationProperty("virtualhosts.virtualhost." + |
| getConnectionURL().getVirtualHost().substring(1) + |
| property, value); |
| } |
| |
| |
| /** |
| * Create and start an asynchrounous publisher that will send MAX_QUEUE_MESSAGE_COUNT |
| * messages to the provided destination. Messages are sent in a new connection |
| * on a transaction. Any error is captured and the test is signalled to exit. |
| * |
| * @param destination |
| */ |
| private void startPublisher(final Destination destination) |
| { |
| _publisher = new Thread(new Runnable() |
| { |
| |
| public void run() |
| { |
| try |
| { |
| Connection connection = getConnection(); |
| Session session = connection.createSession(true, Session.SESSION_TRANSACTED); |
| |
| MessageProducer publisher = session.createProducer(destination); |
| |
| for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) |
| { |
| publisher.send(createNextMessage(session, count)); |
| session.commit(); |
| } |
| } |
| catch (Exception e) |
| { |
| _publisherError = e; |
| _disconnectionLatch.countDown(); |
| } |
| } |
| }); |
| |
| _publisher.start(); |
| } |
| |
| |
| |
| /** |
| * Perform the Main test of a topic Consumer with the given AckMode. |
| * |
| * Test creates a new connection and sets up the connection to prevent |
| * failover |
| * |
| * A new consumer is connected and started so that it will prefetch msgs. |
| * |
| * An asynchrounous publisher is started to fill the broker with messages. |
| * |
| * We then wait to be notified of the disconnection via the ExceptionListener |
| * |
| * 0-10 does not have the same notification paths but sync() apparently should |
| * give us the exception, currently it doesn't, so the test is excluded from 0-10 |
| * |
| * We should ensure that this test has the same path for all protocol versions. |
| * |
| * Clients should not have to modify their code based on the protocol in use. |
| * |
| * @param ackMode @see javax.jms.Session |
| * |
| * @throws Exception |
| */ |
| protected void topicConsumer(int ackMode, boolean durable) throws Exception |
| { |
| Connection connection = getConnection(); |
| |
| connection.setExceptionListener(this); |
| |
| Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode); |
| |
| _destination = session.createTopic(getName()); |
| |
| MessageConsumer consumer; |
| |
| if (durable) |
| { |
| consumer = session.createDurableSubscriber(_destination, getTestQueueName()); |
| } |
| else |
| { |
| consumer = session.createConsumer(_destination); |
| } |
| |
| connection.start(); |
| |
| // Start the consumer pre-fetching |
| // Don't care about response as we will fill the broker up with messages |
| // after this point and ensure that the client is disconnected at the |
| // right point. |
| consumer.receiveNoWait(); |
| startPublisher(_destination); |
| |
| boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); |
| |
| assertTrue("Client was not disconnected", disconnected); |
| assertTrue("Client was not disconnected.", _connectionException != null); |
| |
| Exception linked = _connectionException.getLinkedException(); |
| |
| _publisher.join(JOIN_WAIT); |
| |
| assertFalse("Publisher still running", _publisher.isAlive()); |
| |
| //Validate publishing occurred ok |
| if (_publisherError != null) |
| { |
| throw _publisherError; |
| } |
| |
| // NOTE these exceptions will need to be modeled so that they are not |
| // 0-8 specific. e.g. JMSSessionClosedException |
| |
| assertNotNull("No error received onException listener.", _connectionException); |
| |
| assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); |
| |
| assertTrue("Incorrect linked exception received.", linked instanceof AMQException); |
| |
| AMQException amqException = (AMQException) linked; |
| |
| assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, amqException.getErrorCode()); |
| } |
| |
| |
| // Exception Listener |
| |
| public void onException(JMSException e) |
| { |
| _connectionException = e; |
| |
| e.printStackTrace(); |
| |
| _disconnectionLatch.countDown(); |
| } |
| |
| /// Connection Listener |
| |
| public void bytesSent(long count) |
| { |
| } |
| |
| public void bytesReceived(long count) |
| { |
| } |
| |
| public boolean preFailover(boolean redirect) |
| { |
| // Prevent Failover |
| return false; |
| } |
| |
| public boolean preResubscribe() |
| { |
| return false; |
| } |
| |
| public void failoverComplete() |
| { |
| } |
| } |