/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client.handler;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import javax.jms.*;
import javax.jms.IllegalStateException;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.CommonConnection;
import org.apache.qpid.client.PooledConnectionFactory;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.test.utils.QpidTestCase;
public class PooledConnectionFactoryTest extends QpidTestCase
{
/**
 * Seam through which the tests supply connections: {@link #setUp()} subclasses the
 * factory under test so that its connection creation delegates to an instance of
 * this interface, which each test then stubs with Mockito.
 */
private interface CommonConnectionCreator
{
/** Produces the connection to be handed back for the given connection URL. */
CommonConnection newConnection(ConnectionURL connectionUrl);
}
/** Mock creator; re-created by {@link #setUp()} before every test. */
private CommonConnectionCreator _connectionCreator;
/** Factory under test; its {@code newConnectionInstance} is overridden to call {@link #_connectionCreator}. */
private PooledConnectionFactory _connectionFactory;
/**
 * Prepares a fresh mock connection creator and a pooled factory whose
 * {@code newConnectionInstance} hook forwards to that mock, so no test ever
 * opens a real broker connection.
 */
@Override
public void setUp() throws Exception
{
    super.setUp();

    final CommonConnectionCreator creator = mock(CommonConnectionCreator.class);
    _connectionCreator = creator;

    // The override reads the field at call time, exactly as the stubbing in each test expects.
    _connectionFactory = new PooledConnectionFactory()
    {
        @Override
        protected CommonConnection newConnectionInstance(final ConnectionURL url) throws QpidException
        {
            return _connectionCreator.newConnection(url);
        }
    };
}
/**
 * Checks that {@code createConnection()} without arguments hands the user name and
 * password embedded in the connection URL to the connection creator, and walks through
 * the reuse behaviour expected of the factory as connections are closed and re-requested.
 */
public void testConnectionCreatedWithUrlUserAndPassword() throws Exception
{
    _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
    final List<CommonConnection> createdConnections = new ArrayList<>();
    when(_connectionCreator.newConnection(any(ConnectionURL.class))).thenAnswer(new Answer<CommonConnection>()
    {
        @Override
        public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
        {
            // Take the URL straight from the invocation; an ArgumentCaptor is meant for
            // verification, not for feeding values into a stubbed answer.
            final ConnectionURL url = invocation.getArgument(0);
            assertEquals("user", url.getUsername());
            assertEquals("pass", url.getPassword());
            final CommonConnection connection = mock(CommonConnection.class);
            when(connection.isClosed()).thenReturn(false);
            createdConnections.add(connection);
            return connection;
        }
    });

    // First request on a fresh factory: one underlying connection is created.
    Connection conn = _connectionFactory.createConnection();
    assertNotNull(conn);
    assertEquals(1, createdConnections.size());
    // The returned handle's toString() is expected to end with the underlying connection's.
    final String connToString = conn.toString();
    assertTrue(connToString.endsWith(createdConnections.get(0).toString()));
    conn.close();

    // After the close, the next request must be served by the same underlying connection.
    Connection conn2 = _connectionFactory.createConnection();
    assertNotNull(conn2);
    assertEquals(1, createdConnections.size());
    assertTrue(conn2.toString().endsWith(createdConnections.get(0).toString()));

    // The closed handle itself must stay unusable even though its underlying
    // connection has been handed out again as conn2.
    try
    {
        conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        fail("Closed connection should not allow sessions to be created");
    }
    catch (IllegalStateException e)
    {
        // pass
    }

    // conn2 is still open, so a further request needs a second underlying connection.
    Connection conn3 = _connectionFactory.createConnection();
    assertNotNull(conn3);
    assertEquals(2, createdConnections.size());
    assertTrue(conn3.toString().endsWith(createdConnections.get(1).toString()));
    assertFalse(conn3.toString().equals(conn2.toString()));
    conn2.close();

    // Closing conn2 frees the first underlying connection for reuse.
    Connection conn4 = _connectionFactory.createConnection();
    assertNotNull(conn4);
    assertEquals(2, createdConnections.size());
    assertTrue(conn4.toString().endsWith(createdConnections.get(0).toString()));
}
/**
 * Checks that credentials passed to {@code createConnection(user, password)} override
 * those in the connection URL, and that a closed connection is only reused for a
 * request carrying the same credentials.
 */
public void testConnectionCreatedPassedUserAndPassword() throws Exception
{
    _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
    final List<CommonConnection> createdConnections = new ArrayList<>();
    final List<String> usernames = new ArrayList<>();
    final List<String> passwords = new ArrayList<>();
    when(_connectionCreator.newConnection(any(ConnectionURL.class))).thenAnswer(new Answer<CommonConnection>()
    {
        @Override
        public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
        {
            // Read the URL from the invocation rather than from a captor used while stubbing.
            final ConnectionURL url = invocation.getArgument(0);
            // Record the credentials so they can be asserted on the test thread afterwards.
            usernames.add(url.getUsername());
            passwords.add(url.getPassword());
            final CommonConnection connection = mock(CommonConnection.class);
            when(connection.isClosed()).thenReturn(false);
            createdConnections.add(connection);
            return connection;
        }
    });

    Connection conn = _connectionFactory.createConnection("user1", "pass1");
    assertNotNull(conn);
    assertEquals(1, createdConnections.size());
    assertEquals("user1", usernames.get(0));
    assertEquals("pass1", passwords.get(0));
    conn.close();

    // Different credentials: the closed user1 connection must not be reused.
    Connection conn2 = _connectionFactory.createConnection("user2", "pass2");
    assertNotNull(conn2);
    assertEquals(2, createdConnections.size());
    assertEquals("user2", usernames.get(1));
    assertEquals("pass2", passwords.get(1));
    conn2.close();

    // Same credentials as the first request: the first underlying connection is reused.
    Connection conn3 = _connectionFactory.createConnection("user1", "pass1");
    assertNotNull(conn3);
    assertEquals(2, createdConnections.size());
    assertTrue(conn3.toString().endsWith(createdConnections.get(0).toString()));
}
/**
 * Checks the effect of {@code setMaxPoolSize(3)}: four connections are opened and closed,
 * after which the test expects exactly three further requests to be served without
 * creating a new underlying connection and the fourth to create one.
 */
public void testMaxPoolSize() throws Exception
{
    _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
    _connectionFactory.setMaxPoolSize(3);
    final List<CommonConnection> createdConnections = new ArrayList<>();
    when(_connectionCreator.newConnection(any(ConnectionURL.class))).thenAnswer(new Answer<CommonConnection>()
    {
        @Override
        public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
        {
            final CommonConnection connection = mock(CommonConnection.class);
            when(connection.isClosed()).thenReturn(false);
            createdConnections.add(connection);
            return connection;
        }
    });

    // Four simultaneously open connections require four underlying connections.
    Connection conn1 = _connectionFactory.createQueueConnection();
    Connection conn2 = _connectionFactory.createQueueConnection();
    Connection conn3 = _connectionFactory.createQueueConnection();
    Connection conn4 = _connectionFactory.createQueueConnection();
    assertEquals(4, createdConnections.size());
    conn1.close();
    conn2.close();
    conn3.close();
    conn4.close();

    // Only the creation count matters from here on, so the returned handles are not kept.
    _connectionFactory.createTopicConnection();
    assertEquals(4, createdConnections.size());
    _connectionFactory.createTopicConnection();
    assertEquals(4, createdConnections.size());
    _connectionFactory.createTopicConnection();
    assertEquals(4, createdConnections.size());
    // The fourth request exceeds the configured maximum of three and must create a new connection.
    _connectionFactory.createTopicConnection();
    assertEquals(5, createdConnections.size());
}
/**
 * Checks that connections closed longer ago than the configured connection timeout are
 * not reused: after waiting past the timeout, every new request is expected to create a
 * new underlying connection.
 */
public void testConnectionTimeout() throws Exception
{
    _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
    _connectionFactory.setMaxPoolSize(4);
    // NOTE(review): the timeout is presumably in milliseconds - confirm against PooledConnectionFactory.
    _connectionFactory.setConnectionTimeout(100L);
    final List<CommonConnection> createdConnections = new ArrayList<>();
    when(_connectionCreator.newConnection(any(ConnectionURL.class))).thenAnswer(new Answer<CommonConnection>()
    {
        @Override
        public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
        {
            final CommonConnection connection = mock(CommonConnection.class);
            when(connection.isClosed()).thenReturn(false);
            createdConnections.add(connection);
            return connection;
        }
    });

    Connection conn1 = _connectionFactory.createQueueConnection();
    Connection conn2 = _connectionFactory.createQueueConnection();
    Connection conn3 = _connectionFactory.createQueueConnection();
    Connection conn4 = _connectionFactory.createQueueConnection();
    assertEquals(4, createdConnections.size());
    conn1.close();
    conn2.close();
    conn3.close();
    conn4.close();

    // Wait well beyond the configured timeout value before asking for connections again.
    Thread.sleep(500L);

    // Only the creation count matters from here on, so the returned handles are not kept.
    _connectionFactory.createTopicConnection();
    assertEquals(5, createdConnections.size());
    _connectionFactory.createTopicConnection();
    assertEquals(6, createdConnections.size());
    _connectionFactory.createTopicConnection();
    assertEquals(7, createdConnections.size());
    _connectionFactory.createTopicConnection();
    assertEquals(8, createdConnections.size());
}
/**
 * Checks that closing a connection obtained from the factory closes, exactly once,
 * a session that was created through it.
 */
public void testSessionsAreClosed() throws Exception
{
    _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
    final Session session = mock(Session.class);
    when(_connectionCreator.newConnection(any(ConnectionURL.class))).thenAnswer(new Answer<CommonConnection>()
    {
        @Override
        public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
        {
            final CommonConnection connection = mock(CommonConnection.class);
            when(connection.isClosed()).thenReturn(false);
            // Every session request on the underlying connection yields the shared mock session.
            when(connection.createSession(anyBoolean(), anyInt())).thenReturn(session);
            return connection;
        }
    });

    Connection conn = _connectionFactory.createConnection();
    // The returned session handle is not needed; the shared mock is verified directly.
    conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    conn.close();
    verify(session, times(1)).close();
}
/**
 * Checks that a connection on which an operation threw a {@link JMSException} is not
 * reused after it is closed: the next request must create a second underlying connection.
 */
public void testConnectionWithExceptionNotPooled() throws Exception
{
    _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
    final List<CommonConnection> createdConnections = new ArrayList<>();
    when(_connectionCreator.newConnection(any(ConnectionURL.class))).thenAnswer(new Answer<CommonConnection>()
    {
        @Override
        public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
        {
            final CommonConnection connection = mock(CommonConnection.class);
            when(connection.isClosed()).thenReturn(false);
            // Session creation on the underlying connection always fails.
            when(connection.createSession(anyBoolean(), anyInt())).thenThrow(new JMSException("foo"));
            createdConnections.add(connection);
            return connection;
        }
    });

    Connection conn = _connectionFactory.createConnection();
    try
    {
        conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        fail("Expected an exception");
    }
    catch (JMSException e)
    {
        // The underlying failure must reach the caller unchanged.
        assertEquals("foo", e.getMessage());
    }
    conn.close();

    // The failed connection was closed above; a new request must not get it back.
    _connectionFactory.createConnection();
    assertEquals(2, createdConnections.size());
}
/**
 * Checks that an exception reported through the underlying connection's exception
 * listener is forwarded to the listener registered by the user, and that such a
 * connection is not reused after it is closed.
 */
public void testConnectionWithExceptionListenerExceptionNotPooled() throws Exception
{
    _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
    final List<CommonConnection> createdConnections = new ArrayList<>();
    when(_connectionCreator.newConnection(any(ConnectionURL.class))).thenAnswer(new Answer<CommonConnection>()
    {
        @Override
        public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
        {
            final CommonConnection connection = mock(CommonConnection.class);
            when(connection.isClosed()).thenReturn(false);
            createdConnections.add(connection);
            final ArgumentCaptor<ExceptionListener> listenerCaptor =
                    ArgumentCaptor.forClass(ExceptionListener.class);
            // Give the mock a working setter/getter pair: whatever listener is set on it
            // is what getExceptionListener() returns afterwards.
            doAnswer(new Answer<Void>()
            {
                @Override
                public Void answer(final InvocationOnMock setterCall) throws Throwable
                {
                    when(connection.getExceptionListener()).thenReturn(listenerCaptor.getValue());
                    return null;
                }
            }).when(connection).setExceptionListener(listenerCaptor.capture());
            return connection;
        }
    });

    Connection conn = _connectionFactory.createConnection();
    assertEquals(1, createdConnections.size());
    ExceptionListener listener = mock(ExceptionListener.class);
    conn.setExceptionListener(listener);
    assertEquals(listener, conn.getExceptionListener());
    verify(listener, never()).onException(any(JMSException.class));

    // Raise an exception via whatever listener is installed on the underlying connection;
    // it must be delivered to the user's listener exactly once.
    createdConnections.get(0).getExceptionListener().onException(new JMSException("bar"));
    verify(listener, times(1)).onException(any(JMSException.class));
    conn.close();

    // The connection that reported an exception must not be handed out again.
    _connectionFactory.createConnection();
    assertEquals(2, createdConnections.size());
}
/**
 * Checks that a {@link JMSException} thrown while closing a session during
 * {@code Connection.close()} is propagated to the caller, and that the affected
 * connection is not reused for the next request.
 */
public void testSessionCloseException() throws Exception
{
    _connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
    final List<CommonConnection> createdConnections = new ArrayList<>();
    final Session session = mock(Session.class);
    // Closing the session always fails.
    doThrow(new JMSException("foo")).when(session).close();
    when(_connectionCreator.newConnection(any(ConnectionURL.class))).thenAnswer(new Answer<CommonConnection>()
    {
        @Override
        public CommonConnection answer(final InvocationOnMock invocation) throws Throwable
        {
            final CommonConnection connection = mock(CommonConnection.class);
            when(connection.isClosed()).thenReturn(false);
            when(connection.createSession(anyBoolean(), anyInt())).thenReturn(session);
            createdConnections.add(connection);
            return connection;
        }
    });

    Connection conn = _connectionFactory.createConnection();
    // The returned session handle is not needed; the failure is triggered by conn.close().
    conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    try
    {
        conn.close();
        fail("Expected an exception");
    }
    catch (JMSException e)
    {
        assertEquals("foo", e.getMessage());
    }

    // The connection whose close failed must not be handed out again.
    _connectionFactory.createConnection();
    assertEquals(2, createdConnections.size());
}
}