blob: b3c30ccc4d6ae5b285f1a50e95b2491b8c792497 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.systest.connection;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.client.ConnectAttemptListener;
import org.apache.qpid.client.ConnectionExtension;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.protocol.ErrorCodes;
import org.apache.qpid.systest.core.BrokerAdmin;
import org.apache.qpid.systest.core.JmsTestBase;
public class ConnectionFactoryTest extends JmsTestBase
{
private static final String BROKER_URL = "tcp://%s:%d%s";
private static final String CONNECTION_URL = "amqp://%s:%s@clientID/?brokerlist='" + BROKER_URL + "'";
private String _urlWithCredentials;
private String _urlWithoutCredentials;
private String _userName;
private String _password;
@Before
public void setUp()
{
final BrokerAdmin brokerAdmin = getBrokerAdmin();
final InetSocketAddress brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.AMQP);
_userName = brokerAdmin.getValidUsername();
_password = brokerAdmin.getValidPassword();
final String host = brokerAddress.getHostString();
final int port = brokerAddress.getPort();
_urlWithCredentials = String.format(CONNECTION_URL, _userName, _password, host, port, "");
_urlWithoutCredentials = String.format(CONNECTION_URL, "", "", host, port, "");
}
@Test
public void testCreateConnectionWithUsernamePasswordSetOnConnectionURL() throws Exception
{
AMQConnectionFactory factory = new AMQConnectionFactory(_urlWithCredentials);
AMQConnection con = null;
try
{
con = factory.createConnection();
assertEquals("Usernames used is different from the one in URL", _userName, con.getUsername());
assertEquals("Password used is different from the one in URL", _password, con.getPassword());
}
finally
{
if (con != null)
{
con.close();
}
}
}
@Test
public void testCreateConnectionWithUsernamePasswordPassedIntoCreateConnection() throws Exception
{
AMQConnectionFactory factory = new AMQConnectionFactory(_urlWithCredentials);
AMQConnection con2 = null;
try
{
con2 = factory.createConnection("admin", "admin");
assertEquals("Usernames used is different from the one in URL", "admin", con2.getUsername());
assertEquals("Password used is different from the one in URL", "admin", con2.getPassword());
}
finally
{
if (con2 != null)
{
con2.close();
}
}
}
@Test
public void testCreateConnectionWithUsernamePasswordPassedIntoCreateConnectionWhenConnectionUrlWithoutCredentials()
throws Exception
{
AMQConnectionFactory factory = new AMQConnectionFactory(_urlWithoutCredentials);
AMQConnection con3 = null;
try
{
con3 = factory.createConnection(_userName, _password);
assertEquals("Usernames used is different from the one in URL", _userName, con3.getUsername());
assertEquals("Password used is different from the one in URL", _password, con3.getPassword());
}
finally
{
if (con3 != null)
{
con3.close();
}
}
}
@Test
public void testCreatingConnectionWithInstanceMadeUsingDefaultConstructor() throws Exception
{
AMQConnectionFactory factory = new AMQConnectionFactory();
factory.setConnectionURLString(_urlWithCredentials);
AMQConnection con = null;
try
{
con = factory.createConnection();
assertNotNull(con);
assertEquals(_userName, con.getUsername());
assertEquals(_password, con.getPassword());
}
finally
{
if (con != null)
{
con.close();
}
}
}
@Test
public void testCreatingConnectionUsingUserNameAndPasswordExtensions() throws Exception
{
AMQConnectionFactory factory = new AMQConnectionFactory();
factory.setConnectionURLString(_urlWithoutCredentials);
factory.setExtension(ConnectionExtension.PASSWORD_OVERRIDE.getExtensionName(), (connection, uri) -> _password);
factory.setExtension(ConnectionExtension.USERNAME_OVERRIDE.getExtensionName(), (connection, uri) -> _userName);
AMQConnection con = null;
try
{
con = factory.createConnection();
assertNotNull(con);
assertEquals(_userName, con.getUsername());
assertEquals(_password, con.getPassword());
}
finally
{
if (con != null)
{
con.close();
}
}
}
@Test
public void testCreatingConnectionUsingUserNameAndPasswordExtensionsWhenRuntimeExceptionIsThrown() throws Exception
{
AMQConnectionFactory factory = new AMQConnectionFactory();
factory.setConnectionURLString(_urlWithoutCredentials);
factory.setExtension(ConnectionExtension.PASSWORD_OVERRIDE.getExtensionName(), (connection, uri) -> {
throw new RuntimeException("Test");
});
factory.setExtension(ConnectionExtension.USERNAME_OVERRIDE.getExtensionName(), (connection, uri) -> _userName);
AMQConnection con = null;
try
{
con = factory.createConnection();
fail("Exception is expected");
}
catch (JMSException e)
{
// pass
}
finally
{
if (con != null)
{
con.close();
}
}
}
@Test
public void testAccountRotationViaConnectAttemptListenerOnConnect() throws Exception
{
final BrokerAdmin brokerAdmin = getBrokerAdmin();
assumeThat("Requires authentication failure for invalid credentials",
brokerAdmin.getBrokerType(),
is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
final AMQConnectionFactory factory = new AMQConnectionFactory(_urlWithoutCredentials);
final AccountRotatingHelper accountRotatingHelper = new AccountRotatingHelper();
factory.setConnectAttemptListener(accountRotatingHelper);
factory.setExtension(ConnectionExtension.PASSWORD_OVERRIDE.getExtensionName(),
accountRotatingHelper.getPasswordExtension());
factory.setExtension(ConnectionExtension.USERNAME_OVERRIDE.getExtensionName(),
accountRotatingHelper.getUsernameExtension());
AMQConnection con = null;
try
{
con = factory.createConnection();
assertNotNull(con);
assertEquals(_userName, con.getUsername());
assertEquals(_password, con.getPassword());
assertTrue(accountRotatingHelper.getUseValidCredentials());
final InetSocketAddress brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.AMQP);
final URI expectedBrokerURI =
URI.create(String.format(BROKER_URL, brokerAddress.getHostName(), brokerAddress.getPort(), ""));
assertEquals(expectedBrokerURI, accountRotatingHelper.getLastUriForFailedlAttempt());
assertEquals(expectedBrokerURI, accountRotatingHelper.getLastUriForSuccessfulAttempt());
}
finally
{
if (con != null)
{
con.close();
}
}
}
@Test
public void testAccountRotationViaConnectAttemptListenerOnFailover() throws Exception
{
final BrokerAdmin brokerAdmin = getBrokerAdmin();
assumeThat("Requires authentication failure for invalid credentials",
brokerAdmin.getBrokerType(),
is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
final InetSocketAddress brokerAddress = brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.AMQP);
final String retriesOption = "?retries='1'";
final String url = String.format(CONNECTION_URL,
"",
"",
brokerAddress.getHostName(),
brokerAddress.getPort(),
retriesOption) + "&failover='singlebroker?cyclecount='1''";
final AMQConnectionFactory factory = new AMQConnectionFactory(url);
final AccountRotatingHelper accountRotatingHelper = new AccountRotatingHelper();
accountRotatingHelper.setUseValidCredentials(true);
factory.setConnectAttemptListener(accountRotatingHelper);
factory.setExtension(ConnectionExtension.PASSWORD_OVERRIDE.getExtensionName(),
accountRotatingHelper.getPasswordExtension());
factory.setExtension(ConnectionExtension.USERNAME_OVERRIDE.getExtensionName(),
accountRotatingHelper.getUsernameExtension());
AMQConnection con = null;
try
{
con = factory.createConnection();
assertNotNull(con);
assertEquals(_userName, con.getUsername());
assertEquals(_password, con.getPassword());
assertEquals(1, accountRotatingHelper.getSuccessfulConnectCount());
accountRotatingHelper.setUseValidCredentials(false);
restartBrokerAndWaitForFailover(brokerAdmin, con);
assertEquals(_userName, con.getUsername());
assertEquals(_password, con.getPassword());
assertTrue(accountRotatingHelper.getUseValidCredentials());
assertEquals(2, accountRotatingHelper.getSuccessfulConnectCount());
final URI expectedBrokerURI =
URI.create(String.format(BROKER_URL,
brokerAddress.getHostName(),
brokerAddress.getPort(),
retriesOption));
assertEquals(expectedBrokerURI, accountRotatingHelper.getLastUriForFailedlAttempt());
assertEquals(expectedBrokerURI, accountRotatingHelper.getLastUriForSuccessfulAttempt());
}
finally
{
if (con != null)
{
con.close();
}
}
}
private void restartBrokerAndWaitForFailover(final BrokerAdmin brokerAdmin, final AMQConnection con)
throws InterruptedException
{
final CountDownLatch latch = new CountDownLatch(1);
final ConnectionListener connectionListener = new ConnectionListener()
{
@Override
public void bytesSent(final long count)
{
}
@Override
public void bytesReceived(final long count)
{
}
@Override
public boolean preFailover(final boolean redirect)
{
return true;
}
@Override
public boolean preResubscribe()
{
return false;
}
@Override
public void failoverComplete()
{
latch.countDown();
}
};
con.setConnectionListener(connectionListener);
brokerAdmin.restart();
assertTrue(latch.await(10, TimeUnit.SECONDS));
}
private class AccountRotatingHelper implements ConnectAttemptListener
{
private final AtomicBoolean _useValidCredentials = new AtomicBoolean(false);
private final AtomicInteger _successCounter = new AtomicInteger();
private volatile URI _lastUriForSuccessfulAttempt;
private volatile URI _lastUriForFailedlAttempt;
@Override
public boolean connectAttemptFailed(final URI brokerURI, final JMSException e)
{
boolean reattempt = String.valueOf(ErrorCodes.CONNECTION_FORCED).equals(e.getErrorCode())
|| String.valueOf(ErrorCodes.NOT_ALLOWED).equals(e.getErrorCode());
if (reattempt)
{
_useValidCredentials.set(true);
}
_lastUriForFailedlAttempt = brokerURI;
return reattempt;
}
@Override
public void connectAttemptSucceeded(final URI brokerURI)
{
_successCounter.incrementAndGet();
_lastUriForSuccessfulAttempt = brokerURI;
}
BiFunction<Connection, URI, Object> getPasswordExtension()
{
return (connection, uri) -> _useValidCredentials.get() ? _password : "invalid";
}
BiFunction<Connection, URI, Object> getUsernameExtension()
{
return (connection, uri) -> _useValidCredentials.get() ? _userName : "invalid";
}
void setUseValidCredentials(boolean value)
{
_useValidCredentials.set(value);
}
boolean getUseValidCredentials()
{
return _useValidCredentials.get();
}
int getSuccessfulConnectCount()
{
return _successCounter.get();
}
URI getLastUriForSuccessfulAttempt()
{
return _lastUriForSuccessfulAttempt;
}
URI getLastUriForFailedlAttempt()
{
return _lastUriForFailedlAttempt;
}
}
}