| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.client.protocol; |
| |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.SocketAddress; |
| import java.net.UnknownHostException; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.QpidException; |
| import org.apache.qpid.client.AMQAuthenticationException; |
| import org.apache.qpid.client.AMQProtocolHandler; |
| import org.apache.qpid.client.MockAMQConnection; |
| import org.apache.qpid.client.state.AMQState; |
| import org.apache.qpid.client.transport.TestNetworkConnection; |
| import org.apache.qpid.framing.AMQBody; |
| import org.apache.qpid.framing.AMQFrame; |
| import org.apache.qpid.framing.AMQMethodBody; |
| import org.apache.qpid.framing.BasicRecoverSyncOkBody; |
| import org.apache.qpid.framing.ProtocolVersion; |
| import org.apache.qpid.protocol.ErrorCodes; |
| import org.apache.qpid.test.utils.QpidTestCase; |
| |
/**
 * This test addresses QPID-1431, where frame listeners would fail to be notified of an incoming exception.
 *
 * Currently we do checks at the Session level to ensure that the connection/session are open. However, it is possible
 * for the connection to close AFTER this check has been performed.
 *
 * Performing a similar check at the frameListener level in AMQProtocolHandler makes most sense as this will prevent
 * listening when there can be no returning frames.
 *
 * With the correction in place it also means that the new listener will either make it on to the list for notification
 * or it will be notified of any existing exception due to the connection being closed.
 *
 * There may still be an issue in this space if the client utilises a second thread to close the session, as there will
 * be no exception set to throw and so the wait will occur. That said, when the session is closed the frame listeners
 * should be notified. Not sure this is tested.
 */
| public class AMQProtocolHandlerTest extends QpidTestCase |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandlerTest.class); |
| |
| // The handler to test |
| private AMQProtocolHandler _handler; |
| |
| // A frame to block upon whilst waiting the exception |
| private AMQFrame _blockFrame; |
| |
| // Latch to know when the listener receives an exception |
| private CountDownLatch _handleCountDown; |
| // The listener that will receive an exception |
| private BlockToAccessFrameListener _listener; |
| |
| @Override |
| public void setUp() throws Exception |
| { |
| super.setUp(); |
| //Create a new ProtocolHandler with a fake connection. |
| _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'")); |
| _handler.setNetworkConnection(new TestNetworkConnection()); |
| AMQBody body = new BasicRecoverSyncOkBody(ProtocolVersion.v0_8); |
| _blockFrame = new AMQFrame(0, body); |
| |
| _handleCountDown = new CountDownLatch(1); |
| |
| _logger.info("Creating _Listener that should also receive the thrown exception."); |
| _listener = new BlockToAccessFrameListener(1); |
| } |
| |
| /** |
| * There are two paths based on the type of exception thrown. |
| * |
| * This tests that when an AMQException is thrown we get the same type of AMQException back with the real exception |
| * wrapped as the cause. |
| * |
| * @throws InterruptedException - if we are unable to wait for the test signals |
| */ |
| public void testFrameListenerUpdateWithAMQException() throws InterruptedException |
| { |
| AMQAuthenticationException trigger = new AMQAuthenticationException( |
| "AMQPHTest", new RuntimeException()); |
| |
| performWithException(trigger); |
| |
| |
| Exception receivedException = _listener.getReceivedException(); |
| |
| assertEquals("Return exception was not the expected type", |
| AMQAuthenticationException.class, receivedException.getClass()); |
| |
| assertEquals("The _Listener did not receive the correct error code", |
| trigger.getErrorCode(), ((AMQAuthenticationException)receivedException).getErrorCode()); |
| } |
| |
| /** |
| * There are two paths based on the type of exception thrown. |
| * |
| * This tests that when a generic Exception is thrown we get the exception back wrapped in a AMQException |
| * as the cause. |
| * @throws InterruptedException - if we are unable to wait for the test signals |
| */ |
| public void testFrameListenerUpdateWithException() throws InterruptedException |
| { |
| |
| Exception trigger = new Exception(new RuntimeException()); |
| |
| performWithException(trigger); |
| |
| assertEquals("The _Listener did not receive the correct error code", |
| ErrorCodes.INTERNAL_ERROR, ((AMQException)_listener.getReceivedException()).getErrorCode()); |
| } |
| |
| |
| public void testTemporaryQueueWildcard() throws UnknownHostException |
| { |
| checkTempQueueName(new InetSocketAddress(1234), "tmp_0_0_0_0_0_0_0_0_1234_1"); |
| } |
| |
| public void testTemporaryQueueLocalhostAddr() throws UnknownHostException |
| { |
| checkTempQueueName(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234), "tmp_127_0_0_1_1234_1"); |
| } |
| |
| public void testTemporaryQueueLocalhostName() throws UnknownHostException |
| { |
| checkTempQueueName(new InetSocketAddress(InetAddress.getByName("localhost"), 1234), "tmp_localhost_127_0_0_1_1234_1"); |
| } |
| |
| public void testTemporaryQueueInet4() throws UnknownHostException |
| { |
| checkTempQueueName(new InetSocketAddress(InetAddress.getByName("192.168.1.2"), 1234), "tmp_192_168_1_2_1234_1"); |
| } |
| |
| public void testTemporaryQueueInet6() throws UnknownHostException |
| { |
| checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1"); |
| } |
| |
| private void checkTempQueueName(SocketAddress address, String queueName) |
| { |
| TestNetworkConnection networkConnection = new TestNetworkConnection(); |
| networkConnection.setLocalAddress(address); |
| _handler.setNetworkConnection(networkConnection); |
| assertEquals("Wrong queue name", queueName, _handler.generateQueueName()); |
| } |
| |
| /** |
| * This is the main test method for both test cases. |
| * |
| * What occurs is that we create a new thread that will block (<30s[DEFAULT]) for a frame or exception to occur . |
| * |
| * We use a CountDownLatch to ensure that the new thread is running before we then yield and sleep to help ensure |
| * the new thread has entered the synchronized block in the writeCommandFrameAndWaitForReply. |
| * |
| * We can then ack like an the incomming exception handler in (ConnectionCloseMethodHandler). |
| * |
| * We fire the error to the stateManager, which in this case will recored the error as there are no state listeners. |
| * |
| * We then set the connection to be closed, as we would normally close the socket at this point. |
| * |
| * Then fire the exception in to any frameListeners. |
| * |
| * The blocked listener (created above) when receiving the error simulates the user by creating a new request to |
| * block for a frame. |
| * |
| * This request should fail. Prior to the fix this will fail with a NPE as we are attempting to use a null listener |
| * in the writeCommand.... call L:268. |
| * |
| * This highlights that the listener would be added dispite there being a pending error state that the listener will |
| * miss as it is not currently part of the _frameListeners set that is being notified by the iterator. |
| * |
| * The method waits to ensure that an exception is received before returning. |
| * |
| * The calling methods validate that exception that was received based on the one they sent in. |
| * |
| * @param trigger The exception to throw through the handler |
| */ |
| private void performWithException(Exception trigger) throws InterruptedException |
| { |
| |
| final CountDownLatch callingWriteCommand = new CountDownLatch(1); |
| |
| //Set an initial listener that will allow us to create a new blocking method |
| new Thread(new Runnable() |
| { |
| public void run() |
| { |
| |
| try |
| { |
| |
| _logger.info("At initial block, signalling to fire new exception"); |
| callingWriteCommand.countDown(); |
| |
| _handler.writeCommandFrameAndWaitForReply(_blockFrame, _listener); |
| } |
| catch (Exception e) |
| { |
| _logger.error(e.getMessage(), e); |
| fail(e.getMessage()); |
| } |
| } |
| }).start(); |
| |
| _logger.info("Waiting for 'initial block' to start "); |
| if (!callingWriteCommand.await(1000, TimeUnit.MILLISECONDS)) |
| { |
| fail("Failed to start new thread to block for frame"); |
| } |
| |
| // Do what we can to ensure that this thread does not continue before the above thread has hit the synchronized |
| // block in the writeCommandFrameAndWaitForReply |
| Thread.yield(); |
| Thread.sleep(1000); |
| |
| _logger.info("Firing Erorr through state manager. There should be not state waiters here."); |
| _handler.getStateManager().error(trigger); |
| |
| _logger.info("Setting state to be CONNECTION_CLOSED."); |
| |
| _handler.getStateManager().changeState(AMQState.CONNECTION_CLOSED); |
| |
| _logger.info("Firing exception"); |
| _handler.propagateExceptionToFrameListeners(trigger); |
| |
| _logger.info("Awaiting notifcation from handler that exception arrived."); |
| |
| if (!_handleCountDown.await(2000, TimeUnit.MILLISECONDS)) |
| { |
| fail("Failed to handle exception and timeout has not occured"); |
| } |
| |
| |
| assertNotNull("The _Listener did not receive the exception", _listener.getReceivedException()); |
| |
| assertTrue("Received exception not an AMQException", |
| _listener.getReceivedException() instanceof QpidException); |
| |
| QpidException receivedException = (QpidException) _listener.getReceivedException(); |
| |
| assertTrue("The _Listener did not receive the correct message", |
| receivedException.getMessage().startsWith(trigger.getMessage())); |
| |
| |
| assertEquals("The _Listener did not receive the correct cause", |
| trigger, receivedException.getCause()); |
| |
| assertEquals("The _Listener did not receive the correct sub cause", |
| trigger.getCause(), receivedException.getCause().getCause()); |
| |
| } |
| |
| class BlockToAccessFrameListener extends BlockingMethodFrameListener |
| { |
| private Exception _receivedException = null; |
| |
| /** |
| * Creates a new method listener, that filters incoming method to just those that match the specified channel id. |
| * |
| * @param channelId The channel id to filter incoming methods with. |
| */ |
| public BlockToAccessFrameListener(int channelId) |
| { |
| super(channelId); |
| _logger.info("Creating a listener:" + this); |
| } |
| |
| public boolean processMethod(int channelId, AMQMethodBody frame) |
| { |
| return true; |
| } |
| |
| @Override |
| public void error(Exception e) |
| { |
| _logger.info("Exception(" + e + ") Received by:" + this); |
| // Create a new Thread to start the blocking registration. |
| new Thread(new Runnable() |
| { |
| |
| public void run() |
| { |
| //Set an initial listener that will allow us to create a new blocking method |
| try |
| { |
| _handler.writeCommandFrameAndWaitForReply(_blockFrame, null, 2000L); |
| _logger.info("listener(" + this + ") Wait completed"); |
| } |
| catch (Exception e) |
| { |
| _logger.info("listener(" + this + ") threw exception:" + e.getMessage()); |
| _receivedException = e; |
| } |
| |
| _logger.info("listener(" + this + ") completed"); |
| _handleCountDown.countDown(); |
| } |
| }).start(); |
| } |
| |
| public Exception getReceivedException() |
| { |
| return _receivedException; |
| } |
| } |
| } |