blob: 3219582fb35e349b5b61f650ec68eab0d6a776ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.systests;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class QpidJmsClient0xConnectionBuilder implements ConnectionBuilder
{
private final List<Integer> _failoverPorts = new ArrayList<>();
private String _clientId = "clientid";
private String _username = USERNAME;
private String _password = PASSWORD;
private String _virtualHost;
private boolean _enableTls;
private boolean _enableFailover;
private final Map<String, Object> _options = new TreeMap<>();
private int _reconnectAttempts = 20;
private int _connectdelay;
private String _host = "localhost";
private int _port;
private String _keyStoreLocation;
private String _keyStorePassword;
private String _trustStoreLocation;
private String _trustStorePassword;
private Boolean _verifyHostName;
private String _keyAlias;
private String _saslMechanisms;
@Override
public ConnectionBuilder setHost(final String host)
{
_host = host;
return this;
}
@Override
public ConnectionBuilder setPort(final int port)
{
_port = port;
return this;
}
@Override
public ConnectionBuilder addFailoverPort(final int port)
{
_failoverPorts.add(port);
return this;
}
@Override
public ConnectionBuilder setPrefetch(final int prefetch)
{
_options.put("maxprefetch", prefetch);
return this;
}
@Override
public ConnectionBuilder setClientId(final String clientId)
{
_clientId = clientId;
return this;
}
@Override
public ConnectionBuilder setUsername(final String username)
{
_username = username;
return this;
}
@Override
public ConnectionBuilder setPassword(final String password)
{
_password = password;
return this;
}
@Override
public ConnectionBuilder setVirtualHost(final String virtualHostName)
{
_virtualHost = virtualHostName;
return this;
}
@Override
public ConnectionBuilder setFailover(final boolean enableFailover)
{
_enableFailover = enableFailover;
return this;
}
@Override
public ConnectionBuilder setFailoverReconnectAttempts(final int reconnectAttempts)
{
_reconnectAttempts = reconnectAttempts;
return this;
}
@Override
public ConnectionBuilder setFailoverReconnectDelay(final int connectDelay)
{
_connectdelay = connectDelay;
return this;
}
@Override
public ConnectionBuilder setTls(final boolean enableTls)
{
_enableTls = enableTls;
return this;
}
@Override
public ConnectionBuilder setSyncPublish(final boolean syncPublish)
{
if (syncPublish)
{
_options.put("sync_publish", "all");
}
else
{
_options.remove("sync_publish");
}
return this;
}
@Override
public ConnectionBuilder setOptions(final Map<String, String> options)
{
_options.putAll(options);
return this;
}
@Override
public ConnectionBuilder setPopulateJMSXUserID(final boolean populateJMSXUserID)
{
_options.put("populateJMSXUserID", String.valueOf(populateJMSXUserID));
return this;
}
@Override
public ConnectionBuilder setMessageRedelivery(final boolean redelivery)
{
if (redelivery)
{
_options.put("rejectbehaviour", "server");
}
else
{
_options.remove("rejectbehaviour");
}
return this;
}
@Override
public ConnectionBuilder setDeserializationPolicyWhiteList(final String whiteList)
{
_options.put("objectMessageClassHierarchyWhiteList", whiteList);
return this;
}
@Override
public ConnectionBuilder setDeserializationPolicyBlackList(final String blackList)
{
_options.put("objectMessageClassHierarchyBlackList", blackList);
return this;
}
@Override
public ConnectionBuilder setKeyStoreLocation(final String keyStoreLocation)
{
_keyStoreLocation = keyStoreLocation;
return this;
}
@Override
public ConnectionBuilder setKeyStorePassword(final String keyStorePassword)
{
_keyStorePassword = keyStorePassword;
return this;
}
@Override
public ConnectionBuilder setTrustStoreLocation(final String trustStoreLocation)
{
_trustStoreLocation = trustStoreLocation;
return this;
}
@Override
public ConnectionBuilder setTrustStorePassword(final String trustStorePassword)
{
_trustStorePassword = trustStorePassword;
return this;
}
@Override
public ConnectionBuilder setVerifyHostName(final boolean verifyHostName)
{
_verifyHostName = verifyHostName;
return this;
}
@Override
public ConnectionBuilder setKeyAlias(final String alias)
{
_keyAlias = alias;
return this;
}
@Override
public ConnectionBuilder setSaslMechanisms(final String... mechanism)
{
_saslMechanisms = String.join(" ", mechanism);
return this;
}
@Override
public ConnectionBuilder setCompress(final boolean compress)
{
_options.put("compressMessages", compress);
return this;
}
@Override
public Connection build() throws JMSException, NamingException
{
return buildConnectionFactory().createConnection(_username, _password);
}
@Override
public ConnectionFactory buildConnectionFactory() throws NamingException
{
String connectionUrl = buildConnectionURL();
final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
final String factoryName = "connectionFactory";
initialContextEnvironment.put("connectionfactory." + factoryName, connectionUrl);
InitialContext initialContext = new InitialContext(initialContextEnvironment);
try
{
return (ConnectionFactory) initialContext.lookup(factoryName);
}
finally
{
initialContext.close();
}
}
@Override
public String buildConnectionURL()
{
StringBuilder cUrlBuilder = new StringBuilder("amqp://");
if (_username != null)
{
cUrlBuilder.append(_username);
}
if (_username != null || _password != null)
{
cUrlBuilder.append(":");
}
if (_password != null)
{
cUrlBuilder.append(_password);
}
if (_username != null || _password != null)
{
cUrlBuilder.append("@");
}
if (_clientId != null)
{
cUrlBuilder.append(_clientId);
}
cUrlBuilder.append("/");
if (_virtualHost != null)
{
cUrlBuilder.append(_virtualHost);
}
cUrlBuilder.append("?");
final List<Integer> copy = new ArrayList<>(_failoverPorts.size() + 1);
copy.add(_port);
if (_enableFailover)
{
copy.addAll(_failoverPorts);
}
final String transportQuery = buildTransportQuery();
final String brokerlist = copy.stream()
.map(port -> String.format("tcp://%s:%d%s", _host, port, transportQuery))
.collect(Collectors.joining(";", "brokerlist='", "'"));
cUrlBuilder.append(brokerlist);
if (_enableFailover)
{
cUrlBuilder.append("&sync_ack='true'&sync_publish='all'");
cUrlBuilder.append(String.format("&failover='roundrobin?cyclecount='%d''", _reconnectAttempts));
}
if (_saslMechanisms != null)
{
cUrlBuilder.append("&");
cUrlBuilder.append("sasl_mechs='").append(_saslMechanisms).append('\'');
}
for (Map.Entry<String, Object> entry : _options.entrySet())
{
cUrlBuilder.append("&").append(entry.getKey()).append("='").append(entry.getValue()).append("'");
}
return cUrlBuilder.toString();
}
@Override
public ConnectionBuilder setTransport(final String transport)
{
throw new UnsupportedOperationException("Cannot modify transport");
}
private String buildTransportQuery()
{
final StringBuilder builder = new StringBuilder();
if (_enableTls)
{
builder.append("?ssl='true'");
if (_keyStoreLocation != null)
{
builder.append("&key_store='").append(encodeBrokerOption(_keyStoreLocation)).append('\'');
}
if (_keyStorePassword != null)
{
builder.append("&key_store_password='").append(_keyStorePassword).append('\'');
}
if (_trustStoreLocation != null)
{
builder.append("&trust_store='").append(encodeBrokerOption(_trustStoreLocation)).append('\'');
}
if (_trustStorePassword != null)
{
builder.append("&trust_store_password='").append(_trustStorePassword).append('\'');
}
if (_verifyHostName != null)
{
builder.append("&ssl_verify_hostname='").append(_verifyHostName).append('\'');
}
if (_keyAlias != null)
{
builder.append("&ssl_cert_alias='").append(_keyAlias).append('\'');
}
}
if (_connectdelay > 0)
{
final char initial = builder.length() == 0 ? '?' : '&';
builder.append(String.format("%cconnectdelay='%d'", initial, _connectdelay));
}
return builder.toString();
}
private String encodeBrokerOption(final String canonicalPath)
{
try
{
return URLEncoder.encode(encodeConnectionOption(canonicalPath), StandardCharsets.UTF_8.name());
}
catch (UnsupportedEncodingException e)
{
throw new RuntimeException(e);
}
}
private String encodeConnectionOption(final String canonicalPath)
{
try
{
return URLEncoder.encode(canonicalPath, StandardCharsets.UTF_8.name()).replace("+", "%20");
}
catch (UnsupportedEncodingException e)
{
throw new RuntimeException(e);
}
}
}