/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org.apache.qpid.systests;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.qpid.test.utils.TestSSLConstants;

public class QpidJmsClientConnectionBuilder implements ConnectionBuilder
{
    private static final AtomicInteger CLIENTID_COUNTER = new AtomicInteger();
    private String _host;
    private int _port;
    private int _sslPort;
    private Map<String, Object> _options;
    private boolean _enableTls;
    private boolean _enableFailover;
    private final List<Integer> _failoverPorts = new ArrayList<>();
    private String _transport = "amqp";

    QpidJmsClientConnectionBuilder()
    {
        _options = new TreeMap<>();
        _options.put("jms.clientID", getNextClientId());
        _options.put("jms.username", USERNAME);
        _options.put("jms.password", PASSWORD);
        _host = "localhost";
    }

    @Override
    public ConnectionBuilder setHost(final String host)
    {
        _host = host;
        return this;
    }

    @Override
    public ConnectionBuilder setPort(final int port)
    {
        _port = port;
        return setSslPort(port);
    }

    @Override
    public ConnectionBuilder addFailoverPort(final int port)
    {
        _failoverPorts.add(port);
        return this;
    }

    @Override
    public ConnectionBuilder setSslPort(final int port)
    {
        _sslPort = port;
        return this;
    }

    @Override
    public ConnectionBuilder setPrefetch(final int prefetch)
    {
        _options.put("jms.prefetchPolicy.all", prefetch);
        return this;
    }

    @Override
    public ConnectionBuilder setClientId(final String clientId)
    {
        if (clientId == null)
        {
            _options.remove("jms.clientID");
        }
        else
        {
            _options.put("jms.clientID", clientId);
        }
        return this;
    }

    @Override
    public ConnectionBuilder setUsername(final String username)
    {
        if (username == null)
        {
            _options.remove("jms.username");
        }
        else
        {
            _options.put("jms.username", username);
        }
        return this;
    }

    @Override
    public ConnectionBuilder setPassword(final String password)
    {
        if (password == null)
        {
            _options.remove("jms.password");
        }
        else
        {
            _options.put("jms.password", password);
        }
        return this;
    }

    @Override
    public ConnectionBuilder setVirtualHost(final String virtualHostName)
    {
        _options.put("amqp.vhost", virtualHostName);
        return this;
    }

    @Override
    public ConnectionBuilder setFailover(final boolean enableFailover)
    {
        _enableFailover = enableFailover;
        return this;
    }

    @Override
    public ConnectionBuilder setFailoverReconnectAttempts(final int reconnectAttempts)
    {
        _options.put("failover.maxReconnectAttempts", reconnectAttempts);
        return this;
    }

    @Override
    public ConnectionBuilder setFailoverReconnectDelay(final int connectDelay)
    {
        _options.put("failover.reconnectDelay", connectDelay);
        return this;
    }

    @Override
    public ConnectionBuilder setTls(final boolean enableTls)
    {
        _enableTls = enableTls;
        if (enableTls)
        {
            _options.put("transport.storeType", TestSSLConstants.JAVA_KEYSTORE_TYPE);
        }
        else
        {
            _options.remove("transport.storeType");
        }
        return this;
    }

    @Override
    public ConnectionBuilder setSyncPublish(final boolean syncPublish)
    {
        _options.put("jms.forceSyncSend", syncPublish);
        return this;
    }

    @Override
    public ConnectionBuilder setOptions(final Map<String, String> options)
    {
        _options.putAll(options);
        return this;
    }

    @Override
    public ConnectionBuilder setPopulateJMSXUserID(final boolean populateJMSXUserID)
    {
        _options.put("jms.populateJMSXUserID", String.valueOf(populateJMSXUserID));
        return this;
    }

    @Override
    public ConnectionBuilder setMessageRedelivery(final boolean redelivery)
    {
        return this;
    }

    @Override
    public ConnectionBuilder setDeserializationPolicyWhiteList(final String whiteList)
    {
        _options.put("jms.deserializationPolicy.whiteList", whiteList);
        return this;
    }

    @Override
    public ConnectionBuilder setDeserializationPolicyBlackList(final String blackList)
    {
        _options.put("jms.deserializationPolicy.blackList", blackList);
        return this;
    }

    @Override
    public ConnectionBuilder setKeyStoreLocation(final String keyStoreLocation)
    {
        _options.put("transport.keyStoreLocation", keyStoreLocation);
        return this;
    }

    @Override
    public ConnectionBuilder setKeyStorePassword(final String keyStorePassword)
    {
        _options.put("transport.keyStorePassword", keyStorePassword);
        return this;
    }

    @Override
    public ConnectionBuilder setTrustStoreLocation(final String trustStoreLocation)
    {
        _options.put("transport.trustStoreLocation", trustStoreLocation);
        return this;
    }

    @Override
    public ConnectionBuilder setTrustStorePassword(final String trustStorePassword)
    {
        _options.put("transport.trustStorePassword", trustStorePassword);
        return this;
    }

    @Override
    public ConnectionBuilder setVerifyHostName(final boolean verifyHostName)
    {
        _options.put("transport.verifyHost", verifyHostName);
        return this;
    }

    @Override
    public ConnectionBuilder setKeyAlias(final String alias)
    {
        _options.put("transport.keyAlias", alias);
        return this;
    }

    @Override
    public ConnectionBuilder setSaslMechanisms(final String... mechanism)
    {
        _options.put("amqp.saslMechanisms", String.join(",", mechanism));
        return this;
    }

    @Override
    public ConnectionBuilder setCompress(final boolean compress)
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public Connection build() throws NamingException, JMSException
    {
        return buildConnectionFactory().createConnection();
    }

    @Override
    public ConnectionFactory buildConnectionFactory() throws NamingException
    {
        final Hashtable<Object, Object> initialContextEnvironment = new Hashtable<>();
        initialContextEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
                                      "org.apache.qpid.jms.jndi.JmsInitialContextFactory");

        final String connectionUrl = buildConnectionURL();

        final String factoryName = "connection";
        initialContextEnvironment.put("connectionfactory." + factoryName, connectionUrl);

        InitialContext initialContext = new InitialContext(initialContextEnvironment);
        try
        {
            return (ConnectionFactory) initialContext.lookup(factoryName);
        }
        finally
        {
            initialContext.close();
        }
    }

    @Override
    public String buildConnectionURL()
    {
        final StringBuilder connectionUrlBuilder = new StringBuilder();

        final Map<String, Object> options = new TreeMap<>();
        options.putAll(_options);
        if (_enableFailover)
        {
            if (!options.containsKey("failover.useReconnectBackOff"))
            {
                options.put("failover.useReconnectBackOff", "false");
            }
            if (!options.containsKey("failover.maxReconnectAttempts"))
            {
                options.put("failover.maxReconnectAttempts", "2");
            }

            final Set<String> transportKeys = options.keySet()
                                                     .stream()
                                                     .filter(key -> key.startsWith("amqp.") || key.startsWith(
                                                             "transport."))
                                                     .collect(Collectors.toSet());


            final Map<String, Object> transportOptions = new HashMap<>(options);
            transportOptions.keySet().retainAll(transportKeys);
            options.keySet().removeAll(transportKeys);

            final StringBuilder transportQueryBuilder = new StringBuilder();
            appendOptions(transportOptions, transportQueryBuilder);
            final String transportQuery = transportQueryBuilder.toString();

            final List<Integer> copy = new ArrayList<>(_failoverPorts.size() + 1);
            copy.add(_enableTls ? _sslPort : _port);
            copy.addAll(_failoverPorts);

            final String failover = copy.stream()
                                        .map(port -> String.format("amqp://%s:%d%s", _host, port, transportQuery))
                                        .collect(Collectors.joining(",", "failover:(", ")"));
            connectionUrlBuilder.append(failover);
            appendOptions(options, connectionUrlBuilder);
        }
        else if (!_enableTls)
        {
            connectionUrlBuilder.append(_transport).append("://").append(_host).append(":").append(_port);

            appendOptions(options, connectionUrlBuilder);
        }
        else
        {
            connectionUrlBuilder.append(_transport).append("s").append("://").append(_host).append(":").append(_sslPort);
            appendOptions(options, connectionUrlBuilder);
        }
        return connectionUrlBuilder.toString();
    }

    @Override
    public ConnectionBuilder setTransport(final String transport)
    {
        _transport = transport;
        return this;
    }

    private void appendOptions(final Map<String, Object> actualOptions, final StringBuilder stem)
    {
        boolean first = true;
        for(Map.Entry<String, Object> option : actualOptions.entrySet())
        {
            if(first)
            {
                stem.append('?');
                first = false;
            }
            else
            {
                stem.append('&');
            }
            try
            {
                stem.append(option.getKey()).append('=').append(URLEncoder.encode(String.valueOf(option.getValue()), "UTF-8"));
            }
            catch (UnsupportedEncodingException e)
            {
                throw new RuntimeException(e);
            }
        }
    }

    private String getNextClientId()
    {
        return "builderClientId-" + CLIENTID_COUNTER.getAndIncrement();
    }
}
