| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.test.unit.client.forwardall; |
| |
| import org.apache.qpid.client.AMQConnection; |
| import org.apache.qpid.client.AMQQueue; |
| import org.apache.qpid.client.AMQSession; |
| import org.apache.qpid.framing.AMQShortString; |
| import org.apache.qpid.test.utils.QpidBrokerTestCase; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageListener; |
| import javax.jms.MessageProducer; |
| |
| /** |
| * Declare a private temporary response queue, |
| * send a message to amq.direct with a well known routing key with the |
| * private response queue as the reply-to destination |
| * consume responses. |
| */ |
| public class Client implements MessageListener |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(Client.class); |
| |
| private final AMQConnection _connection; |
| private final AMQSession _session; |
| private final int _expected; |
| private int _count; |
| private static QpidBrokerTestCase _qct; |
| |
| Client(String broker, int expected) throws Exception |
| { |
| this(connect(broker), expected); |
| } |
| |
| public static void setQTC(QpidBrokerTestCase qtc) |
| { |
| _qct = qtc; |
| } |
| Client(AMQConnection connection, int expected) throws Exception |
| { |
| _connection = connection; |
| _expected = expected; |
| _session = (AMQSession) _connection.createSession(true, AMQSession.NO_ACKNOWLEDGE); |
| AMQQueue response = |
| new AMQQueue(_connection.getDefaultQueueExchangeName(), new AMQShortString("ResponseQueue"), true); |
| _session.createConsumer(response).setMessageListener(this); |
| _connection.start(); |
| // AMQQueue service = new SpecialQueue(_connection, "ServiceQueue"); |
| AMQQueue service = (AMQQueue) _session.createQueue("ServiceQueue") ; |
| Message request = _session.createTextMessage("Request!"); |
| request.setJMSReplyTo(response); |
| MessageProducer prod = _session.createProducer(service); |
| prod.send(request); |
| _session.commit(); |
| } |
| |
| void shutdownWhenComplete() throws Exception |
| { |
| waitUntilComplete(); |
| _connection.close(); |
| } |
| |
| public synchronized void onMessage(Message response) |
| { |
| |
| _logger.info("Received " + (++_count) + " of " + _expected + " responses."); |
| if (_count == _expected) |
| { |
| |
| notifyAll(); |
| } |
| try |
| { |
| _session.commit(); |
| } |
| catch (JMSException e) |
| { |
| |
| } |
| |
| } |
| |
| synchronized void waitUntilComplete() throws Exception |
| { |
| |
| if (_count < _expected) |
| { |
| wait(60000); |
| } |
| |
| if (_count < _expected) |
| { |
| throw new Exception("Didn't receive all messages... got " + _count + " expected " + _expected); |
| } |
| } |
| |
| static AMQConnection connect(String broker) throws Exception |
| { |
| //return new AMQConnection(broker, "guest", "guest", "Client" + System.currentTimeMillis(), "test"); |
| return (AMQConnection) _qct.getConnection("guest", "guest") ; |
| } |
| |
| public static void main(String[] argv) throws Exception |
| { |
| final String connectionString; |
| final int expected; |
| if (argv.length == 0) |
| { |
| connectionString = "localhost:5672"; |
| expected = 100; |
| } |
| else |
| { |
| connectionString = argv[0]; |
| expected = Integer.parseInt(argv[1]); |
| } |
| |
| new Client(connect(connectionString), expected).shutdownWhenComplete(); |
| } |
| } |