blob: 28d7bf4aed6df3368c0569aff3a03b4f5976f879 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.systest.management.jmx;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.management.JMException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularData;
import org.apache.commons.lang.StringUtils;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.management.common.mbeans.ManagedConnection;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class ConnectionManagementTest extends QpidBrokerTestCase
{
private static final String VIRTUAL_HOST_NAME = "test";
private JMXTestUtils _jmxUtils;
private Connection _connection;
public void setUp() throws Exception
{
_jmxUtils = new JMXTestUtils(this);
_jmxUtils.setUp(); // modifies broker config therefore must be done before super.setUp()
super.setUp();
_jmxUtils.open();
}
public void tearDown() throws Exception
{
try
{
if (_jmxUtils != null)
{
_jmxUtils.close();
}
}
finally
{
super.tearDown();
}
}
public void testNumberOfManagedConnectionsMatchesNumberOfClientConnections() throws Exception
{
assertEquals("Expected no managed connections", 0, getManagedConnections().size());
_connection = getConnection();
assertEquals("Expected one managed connection", 1, getManagedConnections().size());
_connection.close();
assertEquals("Expected no managed connections after client connection closed", 0, getManagedConnections().size());
}
public void testGetAttributes() throws Exception
{
_connection = getConnection();
final ManagedConnection mBean = getConnectionMBean();
checkAuthorisedId(mBean);
checkClientVersion(mBean);
checkClientId(mBean);
}
public void testNonTransactedSession() throws Exception
{
_connection = getConnection();
boolean transactional = false;
boolean flowBlocked = false;
_connection.createSession(transactional, Session.AUTO_ACKNOWLEDGE);
final ManagedConnection mBean = getConnectionMBean();
final CompositeDataSupport row = getTheOneChannelRow(mBean);
assertChannelRowData(row, 0, transactional, flowBlocked);
}
public void testTransactedSessionWithUnackMessages() throws Exception
{
_connection = getConnection();
_connection.start();
boolean transactional = true;
int numberOfMessages = 2;
final Session session = _connection.createSession(transactional, Session.SESSION_TRANSACTED);
final Destination destination = session.createQueue(getTestQueueName());
final MessageConsumer consumer = session.createConsumer(destination);
sendMessage(session, destination, numberOfMessages);
receiveMessagesWithoutCommit(consumer, numberOfMessages);
final ManagedConnection mBean = getConnectionMBean();
final CompositeDataSupport row = getTheOneChannelRow(mBean);
boolean flowBlocked = false;
assertChannelRowData(row, numberOfMessages, transactional, flowBlocked);
// check that commit advances the lastIoTime
final Date initialLastIOTime = mBean.getLastIoTime();
session.commit();
assertTrue("commit should have caused last IO time to advance", mBean.getLastIoTime().after(initialLastIOTime));
// check that channels() now returns one session with no unacknowledged messages
final CompositeDataSupport rowAfterCommit = getTheOneChannelRow(mBean);
final Number unackCountAfterCommit = (Number) rowAfterCommit.get(ManagedConnection.UNACKED_COUNT);
assertEquals("Unexpected number of unacknowledged messages", 0, unackCountAfterCommit);
}
public void testProducerFlowBlocked() throws Exception
{
_connection = getConnection();
_connection.start();
String queueName = getTestQueueName();
Session session = _connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
createQueueOnBroker(session, queue);
ManagedQueue managedQueue = _jmxUtils.getManagedQueue(queueName);
managedQueue.setFlowResumeCapacity(DEFAULT_MESSAGE_SIZE * 2l);
managedQueue.setCapacity(DEFAULT_MESSAGE_SIZE * 3l);
final ManagedConnection managedConnection = getConnectionMBean();
// Check that producer flow is not block before test
final CompositeDataSupport rowBeforeSend = getTheOneChannelRow(managedConnection);
assertFlowBlocked(rowBeforeSend, false);
// Check that producer flow does not become block too soon
sendMessage(session, queue, 3);
final CompositeDataSupport rowBeforeFull = getTheOneChannelRow(managedConnection);
assertFlowBlocked(rowBeforeFull, false);
// Fourth message will over-fill the queue (but as we are not sending more messages, client thread wont't block)
sendMessage(session, queue, 1);
final CompositeDataSupport rowAfterFull = getTheOneChannelRow(managedConnection);
assertFlowBlocked(rowAfterFull, true);
// Consume two to bring the queue down to the resume capacity
MessageConsumer consumer = session.createConsumer(queue);
assertNotNull("Could not receive first message", consumer.receive(1000));
assertNotNull("Could not receive second message", consumer.receive(1000));
session.commit();
// Check that producer flow is no longer blocked
final CompositeDataSupport rowAfterReceive = getTheOneChannelRow(managedConnection);
assertFlowBlocked(rowAfterReceive, false);
}
private void createQueueOnBroker(Session session, Destination destination) throws JMSException
{
session.createConsumer(destination).close(); // Create a consumer only to cause queue creation
}
private void assertChannelRowData(final CompositeData row, int unacknowledgedMessages, boolean isTransactional, boolean flowBlocked)
{
assertNotNull(row);
assertEquals("Unexpected transactional flag", isTransactional, row.get(ManagedConnection.TRANSACTIONAL));
assertEquals("Unexpected unacknowledged message count", unacknowledgedMessages, row.get(ManagedConnection.UNACKED_COUNT));
assertEquals("Unexpected flow blocked", flowBlocked, row.get(ManagedConnection.FLOW_BLOCKED));
}
private void assertFlowBlocked(final CompositeData row, boolean flowBlocked)
{
assertNotNull(row);
assertEquals("Unexpected flow blocked", flowBlocked, row.get(ManagedConnection.FLOW_BLOCKED));
}
private void checkAuthorisedId(ManagedConnection mBean) throws Exception
{
assertEquals("Unexpected authorized id", GUEST_USERNAME, mBean.getAuthorizedId());
}
private void checkClientVersion(ManagedConnection mBean) throws Exception
{
String expectedVersion = QpidProperties.getReleaseVersion();
assertTrue(StringUtils.isNotBlank(expectedVersion));
assertEquals("Unexpected version", expectedVersion, mBean.getVersion());
}
private void checkClientId(ManagedConnection mBean) throws Exception
{
String expectedClientId = _connection.getClientID();
assertTrue(StringUtils.isNotBlank(expectedClientId));
assertEquals("Unexpected ClientId", expectedClientId, mBean.getClientId());
}
private ManagedConnection getConnectionMBean()
{
List<ManagedConnection> connections = getManagedConnections();
assertNotNull("Connection MBean is not found", connections);
assertEquals("Unexpected number of connection mbeans", 1, connections.size());
final ManagedConnection mBean = connections.get(0);
assertNotNull("Connection MBean is null", mBean);
return mBean;
}
private List<ManagedConnection> getManagedConnections()
{
return _jmxUtils.getManagedConnections(VIRTUAL_HOST_NAME);
}
private CompositeDataSupport getTheOneChannelRow(final ManagedConnection mBean) throws Exception
{
TabularData channelsData = getChannelsDataWithRetry(mBean);
assertEquals("Unexpected number of rows in channel table", 1, channelsData.size());
@SuppressWarnings("unchecked")
final Iterator<CompositeDataSupport> rowItr = (Iterator<CompositeDataSupport>) channelsData.values().iterator();
final CompositeDataSupport row = rowItr.next();
return row;
}
private void receiveMessagesWithoutCommit(final MessageConsumer consumer, int numberOfMessages) throws Exception
{
for (int i = 0; i < numberOfMessages; i++)
{
final Message m = consumer.receive(1000l);
assertNotNull("Message " + i + " is not received", m);
}
}
private TabularData getChannelsDataWithRetry(final ManagedConnection mBean)
throws IOException, JMException
{
TabularData channelsData = mBean.channels();
int retries = 0;
while(channelsData.size() == 0 && retries < 5)
{
sleep();
channelsData = mBean.channels();
retries++;
}
return channelsData;
}
private void sleep()
{
try
{
Thread.sleep(50);
}
catch (InterruptedException ie)
{
Thread.currentThread().interrupt();
}
}}