blob: fe0761c2efa1a83285f2190b5b893f17e92d6b66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.test.utils;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Hashtable;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
{
private static final AtomicInteger CLIENTID_COUNTER = new AtomicInteger();
private String _username;
private String _password;
private Map<String, Object> _options;
private boolean _enableTls;
private boolean _enableFailover;
public QpidJmsClientConnectionBuilder()
{
_options = new TreeMap<>();
_options.put("jms.clientID", getNextClientId());
_options.put("amqp.vhost", "test");
_username = "guest";
_password = "guest";
}
@Override
public ConnectionBuilder setPrefetch(final int prefetch)
{
_options.put("jms.prefetchPolicy.all", prefetch);
return this;
}
@Override
public ConnectionBuilder setClientId(final String clientId)
{
if (clientId == null)
{
_options.remove("jms.clientID");
}
else
{
_options.put("jms.clientID", clientId);
}
return this;
}
@Override
public ConnectionBuilder setUsername(final String username)
{
_username = username;
return this;
}
@Override
public ConnectionBuilder setPassword(final String password)
{
_password = password;
return this;
}
@Override
public ConnectionBuilder setVirtualHost(final String virtualHostName)
{
_options.put("amqp.vhost", virtualHostName);
return this;
}
@Override
public ConnectionBuilder setFailover(final boolean enableFailover)
{
_enableFailover = enableFailover;
return this;
}
@Override
public ConnectionBuilder setFailoverReconnectAttempts(final int reconnectAttempts)
{
_options.put("failover.maxReconnectAttempts", reconnectAttempts);
return this;
}
@Override
public ConnectionBuilder setTls(final boolean enableTls)
{
_enableTls = enableTls;
return this;
}
@Override
public ConnectionBuilder setSyncPublish(final boolean syncPublish)
{
_options.put("jms.forceSyncSend", syncPublish);
return this;
}
@Override
public Connection build() throws NamingException, JMSException
{
final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
final String factoryName;
final Map<String, Object> options = new TreeMap<>();
options.putAll(_options);
if (_enableFailover)
{
if (!options.containsKey("failover.maxReconnectAttempts"))
{
options.put("failover.maxReconnectAttempts", "2");
}
final StringBuilder stem = new StringBuilder("failover:(amqp://localhost:")
.append(System.getProperty("test.port"))
.append(",amqp://localhost:")
.append(System.getProperty("test.port.alt"))
.append(")");
appendOptions(options, stem);
initialContextEnvironment.put("property.connectionfactory.failover.remoteURI",
stem.toString());
factoryName = "failover";
}
else if (!_enableTls)
{
final StringBuilder stem =
new StringBuilder("amqp://localhost:").append(System.getProperty("test.port"));
appendOptions(options, stem);
initialContextEnvironment.put("property.connectionfactory.default.remoteURI", stem.toString());
factoryName = "default";
}
else
{
final StringBuilder stem = new StringBuilder("amqps://localhost:").append(String.valueOf(System.getProperty("test.port.ssl")));
appendOptions(options, stem);
initialContextEnvironment.put("connectionfactory.default.ssl", stem.toString());
factoryName = "default.ssl";
}
final ConnectionFactory connectionFactory =
(ConnectionFactory) new InitialContext(initialContextEnvironment).lookup(factoryName);
return connectionFactory.createConnection(_username, _password);
}
private void appendOptions(final Map<String, Object> actualOptions, final StringBuilder stem)
{
boolean first = true;
for(Map.Entry<String, Object> option : actualOptions.entrySet())
{
if(first)
{
stem.append('?');
first = false;
}
else
{
stem.append('&');
}
try
{
stem.append(option.getKey()).append('=').append(URLEncoder.encode(String.valueOf(option.getValue()), "UTF-8"));
}
catch (UnsupportedEncodingException e)
{
throw new RuntimeException(e);
}
}
}
private String getNextClientId()
{
return "builderClientId-" + CLIENTID_COUNTER.getAndIncrement();
}
}