| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.test.unit.basic; |
| |
| import org.apache.qpid.client.AMQConnection; |
| import org.apache.qpid.client.AMQDestination; |
| import org.apache.qpid.client.AMQSession; |
| import org.apache.qpid.client.AMQTopic; |
| import org.apache.qpid.client.transport.TransportConnection; |
| import org.apache.qpid.exchange.ExchangeDefaults; |
| import org.apache.qpid.test.utils.QpidTestCase; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageListener; |
| import javax.jms.MessageProducer; |
| import javax.jms.Session; |
| |
| public class MultipleConnectionTest extends QpidTestCase |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(MultipleConnectionTest.class); |
| |
| public static final String _defaultBroker = "vm://:1"; |
| public String _connectionString = _defaultBroker; |
| |
| private class Receiver |
| { |
| private AMQConnection _connection; |
| private Session[] _sessions; |
| private MessageCounter[] _counters; |
| |
| Receiver(String broker, AMQDestination dest, int sessions) throws Exception |
| { |
| this((AMQConnection) getConnection("guest", "guest"), dest, sessions); |
| } |
| |
| Receiver(AMQConnection connection, AMQDestination dest, int sessions) throws Exception |
| { |
| _connection = connection; |
| _sessions = new AMQSession[sessions]; |
| _counters = new MessageCounter[sessions]; |
| for (int i = 0; i < sessions; i++) |
| { |
| _sessions[i] = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); |
| _counters[i] = new MessageCounter(_sessions[i].toString()); |
| _sessions[i].createConsumer(dest).setMessageListener(_counters[i]); |
| } |
| |
| _connection.start(); |
| } |
| |
| void close() throws JMSException |
| { |
| _connection.close(); |
| } |
| } |
| |
| private class Publisher |
| { |
| private AMQConnection _connection; |
| private Session _session; |
| private MessageProducer _producer; |
| |
| Publisher(String broker, AMQDestination dest) throws Exception |
| { |
| this((AMQConnection) getConnection("guest", "guest"), dest); |
| } |
| |
| Publisher(AMQConnection connection, AMQDestination dest) throws Exception |
| { |
| _connection = connection; |
| _session = _connection.createSession(false, AMQSession.NO_ACKNOWLEDGE); |
| _producer = _session.createProducer(dest); |
| } |
| |
| void send(String msg) throws JMSException |
| { |
| _producer.send(_session.createTextMessage(msg)); |
| } |
| |
| void close() throws JMSException |
| { |
| _connection.close(); |
| } |
| } |
| |
| private static class MessageCounter implements MessageListener |
| { |
| private final String _name; |
| private int _count; |
| |
| MessageCounter(String name) |
| { |
| _name = name; |
| } |
| |
| public synchronized void onMessage(Message message) |
| { |
| _count++; |
| notify(); |
| } |
| |
| synchronized boolean waitUntil(int expected, long maxWait) throws InterruptedException |
| { |
| long start = System.currentTimeMillis(); |
| while (expected > _count) |
| { |
| long timeLeft = maxWait - timeSince(start); |
| if (timeLeft < 0) |
| { |
| break; |
| } |
| |
| wait(timeLeft); |
| } |
| |
| return expected <= _count; |
| } |
| |
| private long timeSince(long start) |
| { |
| return System.currentTimeMillis() - start; |
| } |
| |
| public synchronized String toString() |
| { |
| return _name + ": " + _count; |
| } |
| } |
| |
| protected void setUp() throws Exception |
| { |
| super.setUp(); |
| TransportConnection.createVMBroker(1); |
| } |
| |
| protected void tearDown() throws Exception |
| { |
| super.tearDown(); |
| TransportConnection.killAllVMBrokers(); |
| } |
| |
| private static void waitForCompletion(int expected, long wait, Receiver[] receivers) throws InterruptedException |
| { |
| for (int i = 0; i < receivers.length; i++) |
| { |
| waitForCompletion(expected, wait, receivers[i]._counters); |
| } |
| } |
| |
| private static void waitForCompletion(int expected, long wait, MessageCounter[] counters) throws InterruptedException |
| { |
| for (int i = 0; i < counters.length; i++) |
| { |
| if (!counters[i].waitUntil(expected, wait)) |
| { |
| throw new RuntimeException("Expected: " + expected + " got " + counters[i]); |
| } |
| } |
| } |
| |
| private static String randomize(String in) |
| { |
| return in + System.currentTimeMillis(); |
| } |
| |
| public static void main(String[] argv) throws Exception |
| { |
| String broker = (argv.length > 0) ? argv[0] : _defaultBroker; |
| |
| MultipleConnectionTest test = new MultipleConnectionTest(); |
| test._connectionString = broker; |
| test.test(); |
| } |
| |
| public void test() throws Exception |
| { |
| String broker = _connectionString; |
| int messages = 10; |
| |
| AMQTopic topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "amq.topic"); |
| |
| Receiver[] receivers = new Receiver[] { new Receiver(broker, topic, 2), new Receiver(broker, topic, 14) }; |
| |
| Publisher publisher = new Publisher(broker, topic); |
| for (int i = 0; i < messages; i++) |
| { |
| publisher.send("Message " + (i + 1)); |
| } |
| |
| try |
| { |
| waitForCompletion(messages, 5000, receivers); |
| _logger.info("All receivers received all expected messages"); |
| } |
| finally |
| { |
| publisher.close(); |
| for (int i = 0; i < receivers.length; i++) |
| { |
| receivers[i].close(); |
| } |
| } |
| } |
| |
| public static junit.framework.Test suite() |
| { |
| return new junit.framework.TestSuite(MultipleConnectionTest.class); |
| } |
| } |