// Source blob: c20eefd9877fbd3b6e8377514ec92bca67815d29
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*
*/
package org.apache.qpid.test.unit.client.channelclose;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
 * Checks how the client/broker pair behaves when a channel id is reused after the channel
 * has been closed because of an error.
 *
 * The test drives the protocol directly: it writes Channel.Open and Exchange.Declare frames
 * through the connection's {@link AMQProtocolHandler} rather than going through the JMS API,
 * and then uses a normal JMS send/receive round trip to confirm that a connection is usable.
 */
public class ChannelCloseTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener
{
    private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseTest.class);

    /** Timeout handed to syncWrite while waiting for Exchange.Declare-Ok (presumably milliseconds - confirm). */
    private static final long SYNC_TIMEOUT = 500;

    private Connection _connection;

    /** Session belonging to the connection most recently created by {@link #newConnection()}. */
    private Session _session;

    /** Number of validation rounds run so far; appended to the queue name so each round uses its own queue. */
    private int _validationRound = 0;

    /**
     * Close channel, use channel with same id, ensure error.
     *
     * Channel 1 is opened and verified, an Exchange.Declare with an empty type is sent and is
     * expected to fail with NOT_FOUND, and a second (otherwise valid) declare on the same channel
     * id is then expected to fail with CHANNEL_ERROR. A fresh connection is created afterwards and
     * validated with a message round trip.
     *
     * This test is only valid for non 0-10 connections.
     *
     * @throws Exception if a failure other than the handled JMS/AMQ exceptions occurs
     */
    public void testReusingChannelAfterFullClosure() throws Exception
    {
        _connection = newConnection();

        // Becomes true only once session and connection were closed on the success path, so the
        // finally block can tell whether a best-effort cleanup is still required.
        boolean closedCleanly = false;
        try
        {
            _connection.start();
            createChannelAndTest(1);

            // Cause the channel to close: declare an exchange with an empty type.
            try
            {
                _logger.info("Testing invalid exchange");
                declareExchange(1, "", "name_that_will_lookup_to_null", false);
                fail("Exchange name is empty so this should fail ");
            }
            catch (AMQException e)
            {
                assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
            }

            // Check that the same channel id can no longer be used, even for a valid request.
            try
            {
                _logger.info("Testing valid exchange should fail");
                declareExchange(1, "topic", "amq.topic", false);
                fail("This should not succeed as the channel should be closed ");
            }
            catch (AMQException e)
            {
                _logger.info("Exception occured was:{}", e.getErrorCode());
                assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());

                // Continue on a fresh connection; newConnection() also replaces _session.
                _connection = newConnection();
            }

            checkSendingMessage();

            _session.close();
            _connection.close();
            closedCleanly = true;
        }
        catch (JMSException e)
        {
            // Log with the full stack trace before failing: fail() only carries the message.
            _logger.error("JMS failure while testing channel reuse after closure", e);
            fail(e.getMessage());
        }
        finally
        {
            if (!closedCleanly)
            {
                // A failed assertion or exception skipped the normal close above. Clean up
                // without throwing so that the original failure is the one that gets reported.
                closeQuietly(_connection);
            }
        }
    }

    /*
        close channel and send guff then send ok no errors
        REMOVE TEST - The behaviour after server has sent close is undefined.
        the server should be free to fail as it may wish to reclaim its resources
        immediately after close.
     */
    /*public void testSendingMethodsAfterClose() throws Exception
    {
        // this is testing an 0.8 connection
        if(isBroker08())
        {
            try
            {
                _connection=new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'");
                ((AMQConnection) _connection).setConnectionListener(this);
                _connection.setExceptionListener(this);
                // Change the StateManager for one that doesn't respond with Close-OKs
                AMQStateManager oldStateManager=((AMQConnection) _connection).getProtocolHandler().getStateManager();
                _session=_connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                _connection.start();
                // Test connection
                checkSendingMessage();
                // Set StateManager to manager that ignores Close-oks
                AMQProtocolSession protocolSession=
                        ((AMQConnection) _connection).getProtocolHandler().getProtocolSession();
                MethodDispatcher d = protocolSession.getMethodDispatcher();
                MethodDispatcher wrappedDispatcher = (MethodDispatcher)
                        Proxy.newProxyInstance(d.getClass().getClassLoader(),
                                               d.getClass().getInterfaces(),
                                               new MethodDispatcherProxyHandler(
                                                       (ClientMethodDispatcherImpl) d));
                protocolSession.setMethodDispatcher(wrappedDispatcher);
                AMQStateManager newStateManager=new NoCloseOKStateManager(protocolSession);
                newStateManager.changeState(oldStateManager.getCurrentState());
                ((AMQConnection) _connection).getProtocolHandler().setStateManager(newStateManager);
                final int TEST_CHANNEL=1;
                _logger.info("Testing Channel(" + TEST_CHANNEL + ") Creation");
                createChannelAndTest(TEST_CHANNEL);
                // Cause it to close
                try
                {
                    _logger.info("Closing Channel - invalid exchange");
                    declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false);
                    fail("Exchange name is empty so this should fail ");
                }
                catch (AMQException e)
                {
                    assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
                }
                try
                {
                    // Send other methods that should be ignored
                    // send them no wait as server will ignore them
                    _logger.info("Tested known exchange - should ignore");
                    declareExchange(TEST_CHANNEL, "topic", "amq.topic", true);
                    _logger.info("Tested known invalid exchange - should ignore");
                    declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true);
                    _logger.info("Tested known invalid exchange - should ignore");
                    declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", true);
                    // Send sync .. server will ignore and time out
                    _logger.info("Tested known invalid exchange - should ignore");
                    declareExchange(TEST_CHANNEL, "", "name_that_will_lookup_to_null", false);
                }
                catch (AMQTimeoutException te)
                {
                    assertEquals("Request should timeout", AMQConstant.REQUEST_TIMEOUT, te.getErrorCode());
                }
                catch (AMQException e)
                {
                    fail("This should not fail as all requests should be ignored");
                }
                _logger.info("Sending Close");
                // Send Close-ok
                sendClose(TEST_CHANNEL);
                _logger.info("Re-opening channel");
                createChannelAndTest(TEST_CHANNEL);
                // Test connection is still ok
                checkSendingMessage();
            }
            catch (JMSException e)
            {
                e.printStackTrace();
                fail(e.getMessage());
            }
            catch (AMQException e)
            {
                fail(e.getMessage());
            }
            catch (URLSyntaxException e)
            {
                fail(e.getMessage());
            }
            finally
            {
                try
                {
                    _session.close();
                    _connection.close();
                }
                catch (JMSException e)
                {
                    e.printStackTrace();
                    fail(e.getMessage());
                }
            }
        }
    }
    */

    /**
     * Opens the given channel and verifies it by declaring the "amq.topic" topic exchange on it.
     * Any AMQException from either step fails the test.
     *
     * @param channel the channel id to open and verify
     * @throws FailoverException propagated from the underlying protocol handler calls
     */
    private void createChannelAndTest(int channel) throws FailoverException
    {
        // Create a channel
        try
        {
            createChannel(channel);
        }
        catch (AMQException e)
        {
            _logger.error("Unable to open channel " + channel, e);
            fail(e.getMessage());
        }

        // Test it is ok
        try
        {
            declareExchange(channel, "topic", "amq.topic", false);
            _logger.info("Tested known exchange");
        }
        catch (AMQException e)
        {
            _logger.error("Declaring amq.topic on channel " + channel + " failed", e);
            fail("This should not fail as this is the default exchange details");
        }
    }

    /**
     * Writes a Channel.Close-Ok frame for the given channel without waiting for any reply.
     * Currently referenced only by the commented-out testSendingMethodsAfterClose.
     *
     * @param channel the channel id the frame is generated for
     */
    private void sendClose(int channel)
    {
        AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler();
        ChannelCloseOkBody body = protocolHandler.getMethodRegistry().createChannelCloseOkBody();
        AMQFrame frame = body.generateFrame(channel);
        protocolHandler.writeFrame(frame);
    }

    /**
     * Validates the current session with a round trip: creates a uniquely named queue, sends one
     * text message to it and asserts that the same text is received within 2000 (receive timeout).
     *
     * @throws JMSException if any JMS operation on the current session fails
     */
    private void checkSendingMessage() throws JMSException
    {
        _validationRound++;
        _logger.info("Test creating producer which will use channel id 1");

        Queue queue = _session.createQueue("CCT_test_validation_queue" + _validationRound);
        MessageConsumer consumer = _session.createConsumer(queue);
        MessageProducer producer = _session.createProducer(queue);

        final String MESSAGE = "CCT_Test_Message";
        producer.send(_session.createTextMessage(MESSAGE));

        Message msg = consumer.receive(2000);
        assertNotNull("Received messages should not be null.", msg);
        assertEquals("Message received not what we sent", MESSAGE, ((TextMessage) msg).getText());
    }

    /**
     * Obtains a connection via getConnection(), registers this test as its ConnectionListener,
     * creates a non-transacted CLIENT_ACKNOWLEDGE session (stored in {@code _session}) and starts
     * the connection. Any exception fails the test.
     *
     * @return the started connection
     */
    private Connection newConnection()
    {
        Connection connection = null;
        try
        {
            connection = getConnection();
            ((AMQConnection) connection).setConnectionListener(this);
            _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            connection.start();
        }
        catch (Exception e)
        {
            // Keep the stack trace in the log; fail() only records the message.
            _logger.error("Unable to create a new connection", e);
            fail("Creating new connection when:" + e.getMessage());
        }

        return connection;
    }

    /**
     * Best-effort close used for cleanup after a failure: never throws, so it cannot mask the
     * failure that triggered the cleanup.
     *
     * @param connection the connection to close; may be null
     */
    private void closeQuietly(Connection connection)
    {
        if (connection == null)
        {
            return;
        }

        try
        {
            connection.close();
        }
        catch (JMSException e)
        {
            _logger.warn("Ignoring failure while closing connection during cleanup", e);
        }
    }

    /**
     * Sends an Exchange.Declare frame on the given channel of the current connection.
     *
     * @param channelId the channel id the frame is generated for
     * @param _type     the exchange type to declare
     * @param _name     the exchange name to declare
     * @param nowait    if true the frame is written without waiting for a reply; otherwise the
     *                  call waits up to SYNC_TIMEOUT for Exchange.Declare-Ok
     * @throws AMQException      propagated from the synchronous write
     * @throws FailoverException propagated from the synchronous write
     */
    private void declareExchange(int channelId, String _type, String _name, boolean nowait)
            throws AMQException, FailoverException
    {
        AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler();

        ExchangeDeclareBody body =
                protocolHandler.getMethodRegistry()
                               .createExchangeDeclareBody(0,
                                                          new AMQShortString(_name),
                                                          new AMQShortString(_type),
                                                          true,
                                                          false,
                                                          false,
                                                          false,
                                                          nowait,
                                                          null);
        AMQFrame exchangeDeclare = body.generateFrame(channelId);

        if (nowait)
        {
            protocolHandler.writeFrame(exchangeDeclare);
        }
        else
        {
            protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
        }
    }

    /**
     * Sends Channel.Open for the given channel id and waits for Channel.Open-Ok.
     *
     * @param channelId the channel id to open
     * @throws AMQException      propagated from the synchronous write
     * @throws FailoverException propagated from the synchronous write
     */
    private void createChannel(int channelId) throws AMQException, FailoverException
    {
        AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler();

        // null argument: outOfBand (per the original inline note)
        ChannelOpenBody body = protocolHandler.getMethodRegistry().createChannelOpenBody(null);
        protocolHandler.syncWrite(body.generateFrame(channelId), ChannelOpenOkBody.class);
    }

    /**
     * ExceptionListener callback: logs the exception and fails.
     *
     * NOTE(review): fail() throws on whichever thread invokes this callback; if that is not the
     * test thread the failure may not be attributed to the running test - confirm.
     *
     * @param jmsException the exception reported to the listener
     */
    public void onException(JMSException jmsException)
    {
        _logger.error("Exception reported to the ExceptionListener", jmsException);
        fail(jmsException.getMessage());
    }

    /** ConnectionListener callback; intentionally does nothing. */
    public void bytesSent(long count)
    {
    }

    /** ConnectionListener callback; intentionally does nothing. */
    public void bytesReceived(long count)
    {
    }

    /**
     * ConnectionListener callback.
     *
     * @return always false (presumably vetoing failover - confirm against ConnectionListener)
     */
    public boolean preFailover(boolean redirect)
    {
        return false;
    }

    /**
     * ConnectionListener callback.
     *
     * @return always false
     */
    public boolean preResubscribe()
    {
        return false;
    }

    /** ConnectionListener callback; intentionally does nothing. */
    public void failoverComplete()
    {
    }
}