| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.transport.udp; |
| |
| import java.io.IOException; |
| |
| import javax.jms.MessageNotWriteableException; |
| |
| import junit.framework.TestCase; |
| import org.apache.activemq.command.ActiveMQDestination; |
| import org.apache.activemq.command.ActiveMQQueue; |
| import org.apache.activemq.command.ActiveMQTextMessage; |
| import org.apache.activemq.command.Command; |
| import org.apache.activemq.command.ConsumerInfo; |
| import org.apache.activemq.command.ProducerInfo; |
| import org.apache.activemq.command.Response; |
| import org.apache.activemq.command.WireFormatInfo; |
| import org.apache.activemq.transport.Transport; |
| import org.apache.activemq.transport.TransportAcceptListener; |
| import org.apache.activemq.transport.TransportListener; |
| import org.apache.activemq.transport.TransportServer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * |
| */ |
| public abstract class UdpTestSupport extends TestCase implements TransportListener { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(UdpTestSupport.class); |
| |
| protected Transport producer; |
| protected Transport consumer; |
| |
| protected Object lock = new Object(); |
| protected Command receivedCommand; |
| protected TransportServer server; |
| protected boolean large; |
| |
| // You might want to set this to massive number if debugging |
| protected int waitForCommandTimeout = 40000; |
| |
| public void testSendingSmallMessage() throws Exception { |
| ConsumerInfo expected = new ConsumerInfo(); |
| expected.setSelector("Cheese"); |
| expected.setExclusive(true); |
| expected.setExclusive(true); |
| expected.setPrefetchSize(3456); |
| |
| try { |
| LOG.info("About to send: " + expected); |
| producer.oneway(expected); |
| |
| Command received = assertCommandReceived(); |
| assertTrue("Should have received a ConsumerInfo but was: " + received, received instanceof ConsumerInfo); |
| ConsumerInfo actual = (ConsumerInfo)received; |
| assertEquals("Selector", expected.getSelector(), actual.getSelector()); |
| assertEquals("isExclusive", expected.isExclusive(), actual.isExclusive()); |
| assertEquals("getPrefetchSize", expected.getPrefetchSize(), actual.getPrefetchSize()); |
| } catch (Exception e) { |
| LOG.info("Caught: " + e); |
| e.printStackTrace(); |
| fail("Failed to send to transport: " + e); |
| } |
| } |
| |
| public void testSendingMediumMessage() throws Exception { |
| String text = createMessageBodyText(4 * 105); |
| ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Medium"); |
| assertSendTextMessage(destination, text); |
| } |
| |
| public void testSendingLargeMessage() throws Exception { |
| String text = createMessageBodyText(4 * 1024); |
| ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Large"); |
| assertSendTextMessage(destination, text); |
| } |
| |
| protected void assertSendTextMessage(ActiveMQDestination destination, String text) throws MessageNotWriteableException { |
| large = true; |
| |
| ActiveMQTextMessage expected = new ActiveMQTextMessage(); |
| |
| expected.setText(text); |
| expected.setDestination(destination); |
| |
| try { |
| LOG.info("About to send message of type: " + expected.getClass()); |
| producer.oneway(expected); |
| |
| // lets send a dummy command to ensure things don't block if we |
| // discard the last one |
| // keepalive does not have a commandId... |
| // producer.oneway(new KeepAliveInfo()); |
| producer.oneway(new ProducerInfo()); |
| producer.oneway(new ProducerInfo()); |
| |
| Command received = assertCommandReceived(); |
| assertTrue("Should have received a ActiveMQTextMessage but was: " + received, received instanceof ActiveMQTextMessage); |
| ActiveMQTextMessage actual = (ActiveMQTextMessage)received; |
| |
| assertEquals("getDestination", expected.getDestination(), actual.getDestination()); |
| assertEquals("getText", expected.getText(), actual.getText()); |
| |
| LOG.info("Received text message with: " + actual.getText().length() + " character(s)"); |
| } catch (Exception e) { |
| LOG.info("Caught: " + e); |
| e.printStackTrace(); |
| fail("Failed to send to transport: " + e); |
| } |
| } |
| |
| protected String createMessageBodyText(int loopSize) { |
| StringBuffer buffer = new StringBuffer(); |
| for (int i = 0; i < loopSize; i++) { |
| buffer.append("0123456789"); |
| } |
| return buffer.toString(); |
| } |
| |
| protected void setUp() throws Exception { |
| server = createServer(); |
| if (server != null) { |
| server.setAcceptListener(new TransportAcceptListener() { |
| |
| public void onAccept(Transport transport) { |
| consumer = transport; |
| consumer.setTransportListener(UdpTestSupport.this); |
| try { |
| consumer.start(); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public void onAcceptError(Exception error) { |
| } |
| }); |
| server.start(); |
| } |
| |
| consumer = createConsumer(); |
| if (consumer != null) { |
| consumer.setTransportListener(this); |
| consumer.start(); |
| } |
| |
| producer = createProducer(); |
| producer.setTransportListener(new TransportListener() { |
| public void onCommand(Object command) { |
| LOG.info("Producer received: " + command); |
| } |
| |
| public void onException(IOException error) { |
| LOG.info("Producer exception: " + error); |
| error.printStackTrace(); |
| } |
| |
| public void transportInterupted() { |
| } |
| |
| public void transportResumed() { |
| } |
| }); |
| |
| producer.start(); |
| } |
| |
| protected void tearDown() throws Exception { |
| if (producer != null) { |
| producer.stop(); |
| } |
| if (consumer != null) { |
| consumer.stop(); |
| } |
| if (server != null) { |
| server.stop(); |
| } |
| } |
| |
| public void onCommand(Object o) { |
| final Command command = (Command)o; |
| if (command instanceof WireFormatInfo) { |
| LOG.info("Got WireFormatInfo: " + command); |
| } else { |
| if (command.isResponseRequired()) { |
| // lets send a response back... |
| sendResponse(command); |
| |
| } |
| if (large) { |
| LOG.info("### Received command: " + command.getClass() + " with id: " + command.getCommandId()); |
| } else { |
| LOG.info("### Received command: " + command); |
| } |
| |
| synchronized (lock) { |
| if (receivedCommand == null) { |
| receivedCommand = command; |
| } else { |
| LOG.info("Ignoring superfluous command: " + command); |
| } |
| lock.notifyAll(); |
| } |
| } |
| } |
| |
| protected void sendResponse(Command command) { |
| Response response = new Response(); |
| response.setCorrelationId(command.getCommandId()); |
| try { |
| consumer.oneway(response); |
| } catch (IOException e) { |
| LOG.info("Caught: " + e); |
| e.printStackTrace(); |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public void onException(IOException error) { |
| LOG.info("### Received error: " + error); |
| error.printStackTrace(); |
| } |
| |
| public void transportInterupted() { |
| LOG.info("### Transport interrupted"); |
| } |
| |
| public void transportResumed() { |
| LOG.info("### Transport resumed"); |
| } |
| |
| protected Command assertCommandReceived() throws InterruptedException { |
| Command answer = null; |
| synchronized (lock) { |
| answer = receivedCommand; |
| if (answer == null) { |
| lock.wait(waitForCommandTimeout); |
| } |
| answer = receivedCommand; |
| } |
| |
| assertNotNull("Should have received a Command by now!", answer); |
| return answer; |
| } |
| |
| protected abstract Transport createConsumer() throws Exception; |
| |
| protected abstract Transport createProducer() throws Exception; |
| |
| protected TransportServer createServer() throws Exception { |
| return null; |
| } |
| |
| } |