/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org.apache.qpid.systest.core;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import ch.qos.logback.classic.LoggerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.systest.core.logback.LogbackPropertyValueDiscriminator;
import org.apache.qpid.systest.core.util.FileUtils;
import org.apache.qpid.systest.core.util.SystemUtils;

public abstract class AbstractSpawnQpidBrokerAdmin implements BrokerAdmin
{
    /** Logger for this admin's own diagnostic output (also used to reach the Logback logger context). */
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSpawnQpidBrokerAdmin.class);

    // Property names exposed to subclasses. They are not read anywhere in this class; presumably
    // subclasses use them to look up the log patterns they pass to runBroker -- TODO confirm.
    protected static final String SYSTEST_PROPERTY_BROKER_READY_LOG = "qpid.systest.broker.ready";
    protected static final String SYSTEST_PROPERTY_BROKER_STOPPED_LOG = "qpid.systest.broker.stopped";
    protected static final String SYSTEST_PROPERTY_BROKER_LISTENING_LOG = "qpid.systest.broker.listening";
    protected static final String SYSTEST_PROPERTY_BROKER_PROCESS_LOG = "qpid.systest.broker.process";

    // System property (integer, milliseconds) bounding how long runBroker waits for the broker to become ready.
    private static final String SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME = "qpid.systest.broker_startup_time";
    // System property (boolean) that enables deletion of the work directory in cleanWorkDirectory.
    private static final String SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS = "qpid.systest.broker.clean.between.tests";

    // Listening ports reported by the output handler; assigned in runBroker, cleared and nulled in shutdownBroker.
    private volatile List<ListeningPort> _ports;
    // The spawned broker process, or null when no broker is running.
    private volatile Process _process;
    // Broker PID reported by the output handler, or null when no broker is running.
    private volatile Integer _pid;
    // Executor running the BrokerSystemOutputHandler that reads the broker's output.
    private ExecutorService _executorService;
    // Test class recorded by beforeTestClass and reset to null by afterTestClass.
    protected Class _currentTestClass;
    // Test method recorded by beforeTestMethod and reset to null by afterTestMethod.
    protected Method _currentTestMethod;

    /**
     * Prepares this admin for a new test class.
     * <p>
     * Records the class, tags the logging context with the fully qualified class name and then
     * delegates to {@link #setUp(Class)}.
     *
     * @param testClass the test class that is about to be executed
     */
    @Override
    public void beforeTestClass(final Class testClass)
    {
        _currentTestClass = testClass;

        final String qualifiedName = testClass.getName();
        setClassQualifiedTestName(qualifiedName);

        final String simpleName = testClass.getSimpleName();
        LOGGER.info("========================= creating broker for test class : {}", simpleName);

        setUp(testClass);
    }

    /**
     * Prepares this admin for a single test method.
     * <p>
     * Records the method, delegates to {@link #begin(Class, Method)} and only afterwards switches
     * the logging context over to the {@code class.method} name.
     *
     * @param testClass the class declaring the test
     * @param method    the test method that is about to be executed
     */
    @Override
    public void beforeTestMethod(final Class testClass, final Method method)
    {
        _currentTestMethod = method;
        begin(testClass, method);

        final String classLabel = testClass.getSimpleName();
        final String methodLabel = method.getName();

        LOGGER.info("========================= executing test : {}#{}", classLabel, methodLabel);
        setClassQualifiedTestName(testClass, method);
        LOGGER.info("========================= start executing test : {}#{}", classLabel, methodLabel);
    }

    /**
     * Tears down the per-method state after a test method has finished.
     * <p>
     * Clears the recorded method, switches the logging context back to the class name and then
     * delegates to {@link #end(Class, Method)}.
     *
     * @param testClass the class declaring the test
     * @param method    the test method that has just been executed
     */
    @Override
    public void afterTestMethod(final Class testClass, final Method method)
    {
        _currentTestMethod = null;

        final String classLabel = testClass.getSimpleName();
        final String methodLabel = method.getName();

        LOGGER.info("========================= stop executing test : {}#{}", classLabel, methodLabel);
        // a null method makes the logging context fall back to the class name only
        setClassQualifiedTestName(testClass, null);

        LOGGER.info("========================= cleaning up test environment for test : {}#{}", classLabel, methodLabel);
        end(testClass, method);
        LOGGER.info("========================= cleaning done for test : {}#{}", classLabel, methodLabel);
    }

    /**
     * Releases the per-class state once every test of the class has run.
     * <p>
     * Clears the recorded class, delegates to {@link #cleanUp(Class)} and finally removes the test
     * name from the logging context.
     *
     * @param testClass the test class that has finished
     */
    @Override
    public void afterTestClass(final Class testClass)
    {
        _currentTestClass = null;

        final String simpleName = testClass.getSimpleName();
        LOGGER.info("========================= stopping broker for test class: {}", simpleName);
        cleanUp(testClass);
        LOGGER.info("========================= stopping broker done for test class : {}", simpleName);

        setClassQualifiedTestName(null);
    }

    /**
     * Implements restart by invoking {@link #end(Class, Method)} followed by
     * {@link #begin(Class, Method)} with the currently recorded test class and method.
     * What those hooks actually do is defined by the subclass.
     */
    @Override
    public void restart()
    {
        end(_currentTestClass, _currentTestMethod);
        begin(_currentTestClass, _currentTestMethod);
    }

    /**
     * Implements stop by invoking {@link #end(Class, Method)} with the currently recorded test
     * class and method. What that hook actually does is defined by the subclass.
     */
    @Override
    public void stop()
    {
        end(_currentTestClass, _currentTestMethod);
    }

    /**
     * Returns the address the spawned broker listens on for the given port type.
     * <p>
     * Only {@code AMQP} is supported: the first detected port whose transport description contains
     * {@code "TCP"} is used. The returned address is built from the port alone (wildcard host).
     *
     * @param portType the kind of port requested
     * @return the socket address for the requested port
     * @throws IllegalArgumentException if the port type is not supported, or if no matching port is
     *                                  known (including when no broker is currently running)
     */
    @Override
    public InetSocketAddress getBrokerAddress(final PortType portType)
    {
        // Read the volatile field exactly once: it is null while no broker is running and may be
        // reset by shutdownBroker at any time.
        final List<ListeningPort> knownPorts = _ports;

        Integer port = null;
        switch (portType)
        {
            case AMQP:
                if (knownPorts != null)
                {
                    for (ListeningPort candidate : knownPorts)
                    {
                        if (candidate.getTransport().contains("TCP"))
                        {
                            port = candidate.getPort();
                            break;
                        }
                    }
                }
                break;
            default:
                throw new IllegalArgumentException(String.format("Unknown port type '%s'", portType));
        }
        if (port == null)
        {
            throw new IllegalArgumentException(String.format("Cannot find port of type '%s'", portType));
        }
        return new InetSocketAddress(port);
    }

    /**
     * Opens a connection to the virtual host returned by {@code getVirtualHostName()} without any
     * additional connection options.
     *
     * @return the connection created by {@link #getConnection(String, Map)}
     * @throws JMSException if the connection cannot be created
     */
    @Override
    public Connection getConnection() throws JMSException
    {
        return getConnection(getVirtualHostName(), null);
    }

    /**
     * Opens a connection to the virtual host returned by {@code getVirtualHostName()} using the
     * given connection options.
     *
     * @param options option names and values appended to the connection URL; may be {@code null}
     * @return the connection created by {@link #getConnection(String, Map)}
     * @throws JMSException if the connection cannot be created
     */
    @Override
    public Connection getConnection(final Map<String, String> options) throws JMSException
    {
        return getConnection(getVirtualHostName(), options);
    }

    /** Hook invoked by {@link #beforeTestClass(Class)} after the test class has been recorded. */
    protected abstract void setUp(final Class testClass);

    /** Hook invoked by {@link #afterTestClass(Class)} after the recorded test class has been cleared. */
    protected abstract void cleanUp(final Class testClass);

    /** Hook invoked by {@link #beforeTestMethod(Class, Method)} and by {@link #restart()}. */
    protected abstract void begin(final Class testClass, final Method method);

    /** Hook invoked by {@link #afterTestMethod(Class, Method)}, {@link #restart()} and {@link #stop()}. */
    protected abstract void end(final Class testClass, final Method method);

    /**
     * Creates the {@link ProcessBuilder} that {@code runBroker} starts as the broker process.
     * {@code runBroker} passes its {@code currentWorkDirectory} argument as {@code workDirectory}.
     */
    protected abstract ProcessBuilder createBrokerProcessBuilder(final String workDirectory, final Class testClass) throws IOException;

    /**
     * Supplies the consumer that receives every line read from the spawned broker's output.
     * <p>
     * This default returns a fresh consumer that discards each line.
     *
     * @return a new consumer that ignores its input
     */
    public LogConsumer getLogConsumer()
    {
        final LogConsumer discardingConsumer = new LogConsumer()
        {
            @Override
            public void accept(final String line)
            {
                // deliberately empty: the default consumer ignores broker output
            }
        };
        return discardingConsumer;
    }

    /**
     * Spawns the broker process and blocks until its output reports that it is ready.
     * <p>
     * The process is created via {@link #createBrokerProcessBuilder(String, Class)} with stderr
     * merged into stdout. A single background thread feeds every output line to a
     * {@code BrokerSystemOutputHandler}, which detects the ready line, the PID and the listening
     * ports. The permitted start-up time is read from the system property
     * {@code qpid.systest.broker_startup_time} (milliseconds, default 30000).
     * <p>
     * If start-up fails for any reason the process is destroyed and all broker state held by this
     * admin is reset before the exception propagates.
     *
     * @param testClass               test class the broker is started for; its name is passed to
     *                                the process in the {@code QPID_PNAME} environment variable
     * @param method                  test method the broker is started for (not used here)
     * @param readyLogPattern         regular expression identifying the "broker ready" output line
     * @param stopLogPattern          regular expression identifying the "broker stopped" output line
     * @param portListeningLogPattern regular expression identifying a "listening on port" line
     *                                (group 1 is the transport, group 2 the port number)
     * @param processPIDLogPattern    regular expression identifying the line carrying the broker PID
     * @param currentWorkDirectory    work directory handed to the process builder factory
     * @throws IOException          if the process builder cannot be created or the process cannot be started
     * @throws BrokerAdminException if the broker does not become ready in time, its PID or ports are
     *                              not detected, it exits prematurely, or the wait is interrupted
     */
    protected void runBroker(final Class testClass,
                             final Method method,
                             final String readyLogPattern,
                             final String stopLogPattern,
                             final String portListeningLogPattern,
                             final String processPIDLogPattern, String currentWorkDirectory) throws IOException
    {
        LOGGER.debug("Spawning broker working folder: {}", currentWorkDirectory);

        final int startUpTime = Integer.getInteger(SYSTEST_PROPERTY_SPAWN_BROKER_STARTUP_TIME, 30000);

        LOGGER.debug("Spawning broker permitted start-up time: {}", startUpTime);

        final ProcessBuilder processBuilder = createBrokerProcessBuilder(currentWorkDirectory, testClass);
        // merge stderr into stdout so that a single reader sees all of the broker's output
        processBuilder.redirectErrorStream(true);

        final Map<String, String> processEnvironment = processBuilder.environment();
        processEnvironment.put("QPID_PNAME", String.format("-DPNAME=QPBRKR -DTNAME=\"%s\"",  testClass.getName()));

        final CountDownLatch readyLatch = new CountDownLatch(1);
        final long startTime = System.currentTimeMillis();

        LOGGER.debug("Starting broker process");
        final Process process = processBuilder.start();
        _process = process;

        boolean brokerStarted = false;
        try
        {
            // Everything after the process start lives inside the try block so that a failure here
            // (for example an invalid regular expression) still destroys the spawned process.
            final BrokerSystemOutputHandler brokerSystemOutputHandler =
                    new BrokerSystemOutputHandler(process.getInputStream(),
                                                  getLogConsumer(),
                                                  readyLogPattern,
                                                  stopLogPattern,
                                                  processPIDLogPattern,
                                                  portListeningLogPattern,
                                                  readyLatch,
                                                  getClass().getName());

            _executorService = Executors.newFixedThreadPool(1, new ThreadFactory()
            {
                @Override
                public Thread newThread(final Runnable r)
                {
                    Thread t = new Thread(r, BrokerSystemOutputHandler.class.getSimpleName());
                    t.setDaemon(false);
                    return t;
                }
            });
            _executorService.submit(brokerSystemOutputHandler);

            if (!readyLatch.await(startUpTime, TimeUnit.MILLISECONDS))
            {
                LOGGER.warn("Spawned broker failed to become ready within {} ms. Ready line '{}'",
                            startUpTime, readyLogPattern);
                throw new BrokerAdminException(String.format(
                        "Broker failed to become ready within %d ms. Ready line : %s",
                        startUpTime,
                        readyLogPattern));
            }

            final int pid = brokerSystemOutputHandler.getPID();
            final List<ListeningPort> ports = brokerSystemOutputHandler.getAmqpPorts();
            _pid = pid;
            _ports = ports;

            if (pid == -1)
            {
                throw new BrokerAdminException("Broker PID is not detected");
            }

            if (ports.isEmpty())
            {
                throw new BrokerAdminException("Broker port is not detected");
            }

            // Test that the broker is still running and hasn't exited unexpectedly:
            // exitValue() throws IllegalThreadStateException while the process is alive.
            boolean exited;
            int exit = 0;
            try
            {
                exit = process.exitValue();
                exited = true;
            }
            catch (IllegalThreadStateException e)
            {
                // this is expected if the broker started successfully
                exited = false;
            }
            if (exited)
            {
                LOGGER.info("broker aborted: {}", exit);
                throw new BrokerAdminException("broker aborted: " + exit);
            }

            LOGGER.info("Broker was started successfully within {} milliseconds, broker PID {}",
                        System.currentTimeMillis() - startTime,
                        pid);
            LOGGER.info("Broker ports: {}", ports);
            brokerStarted = true;
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt status for callers and report the failure instead of returning
            // normally with no broker running.
            Thread.currentThread().interrupt();
            throw new BrokerAdminException("Interrupted whilst waiting for broker to become ready", e);
        }
        finally
        {
            if (!brokerStarted)
            {
                LOGGER.warn("Broker failed to start");
                process.destroy();
                _process = null;
                if (_executorService != null)
                {
                    _executorService.shutdown();
                    _executorService = null;
                }
                _ports = null;
                _pid = null;
            }
        }
    }

    /**
     * Creates a fresh temporary work directory for the given test.
     * <p>
     * The directory name starts with {@code qpid-work-<timestamp>-<test>-}, where the timestamp is
     * formatted as {@code yyyyMMddHHmmss} and the test part is the simple class name, followed by
     * {@code -<method name>} when a method is given.
     *
     * @param testClass the test class the directory is created for
     * @param method    the test method, or {@code null} for a class-level directory
     * @return the path of the newly created directory
     * @throws IllegalStateException if the directory cannot be created
     */
    protected String getWorkingDirectory(final Class testClass, final Method method)
    {
        try
        {
            final SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyyMMddHHmmss");
            final String timestamp = timestampFormat.format(new Date(System.currentTimeMillis()));

            final String simpleName = testClass.getSimpleName();
            final String testLabel = method == null ? simpleName : simpleName + "-" + method.getName();

            final String prefix = String.format("qpid-work-%s-%s-", timestamp, testLabel);
            return Files.createTempDirectory(prefix).toString();
        }
        catch (IOException e)
        {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Stops the spawned broker, if any, and resets all broker state held by this admin.
     * <p>
     * On Windows the process is first killed through {@code doWindowsKill()}. The process handle is
     * then destroyed and reaped. Regardless of how that goes, the output-reader executor is shut
     * down and the recorded ports, PID and process are cleared.
     */
    protected void shutdownBroker()
    {
        try
        {
            if (SystemUtils.isWindows())
            {
                doWindowsKill();
            }

            final Process brokerProcess = _process;
            if (brokerProcess != null)
            {
                LOGGER.info("Destroying broker process");
                brokerProcess.destroy();

                reapChildProcess();
            }
        }
        finally
        {
            final ExecutorService outputReader = _executorService;
            if (outputReader != null)
            {
                outputReader.shutdown();
            }

            final List<ListeningPort> knownPorts = _ports;
            if (knownPorts != null)
            {
                knownPorts.clear();
                _ports = null;
            }

            _pid = null;
            _process = null;
        }
    }

    /**
     * Deletes the given work directory when cleaning between tests is enabled.
     * <p>
     * Nothing happens unless the directory is non-null and the boolean system property
     * {@code qpid.systest.broker.clean.between.tests} is set to {@code true}.
     *
     * @param currentWorkDirectory path of the work directory to delete; may be {@code null}
     */
    protected void cleanWorkDirectory(final String currentWorkDirectory)
    {
        if (currentWorkDirectory == null)
        {
            return;
        }
        if (!Boolean.getBoolean(SYSTEST_PROPERTY_BROKER_CLEAN_BETWEEN_TESTS))
        {
            return;
        }

        final File workDirectory = new File(currentWorkDirectory);
        FileUtils.delete(workDirectory, true);
    }

    /**
     * Quotes a value for use on Windows.
     * <p>
     * On Windows, a value that contains a double quote but does not start with one is wrapped in
     * double quotes with every embedded double quote doubled. In every other case the value is
     * returned unchanged.
     *
     * @param value the value to escape
     * @return the escaped value, or {@code value} itself when no escaping applies
     */
    protected String escapePath(String value)
    {
        final boolean needsQuoting = SystemUtils.isWindows()
                                     && value.contains("\"")
                                     && !value.startsWith("\"");
        if (!needsQuoting)
        {
            return value;
        }

        final String doubledQuotes = value.replaceAll("\"", "\"\"");
        return "\"" + doubledQuotes + "\"";
    }

    /**
     * Opens a JMS connection to the given virtual host of the spawned broker.
     * <p>
     * A connection URL pointing at {@code localhost} and the broker's AMQP port is registered in a
     * throw-away JNDI context under the name {@code connectionFactory}; the factory looked up from
     * that context creates the connection with the credentials returned by
     * {@code getValidUsername()} and {@code getValidPassword()}.
     *
     * @param virtualHostName name of the virtual host to connect to
     * @param options         option names and values appended to the URL as {@code &name='value'};
     *                        may be {@code null}
     * @return the newly created connection
     * @throws JMSException         if the connection cannot be created
     * @throws BrokerAdminException if the JNDI context cannot be created, queried or closed
     */
    @Override
    public Connection getConnection(final String virtualHostName,
                                       final Map<String, String> options) throws JMSException
    {
        final String factoryName = "connectionFactory";

        final Hashtable<Object, Object> jndiEnvironment = new Hashtable<>();
        jndiEnvironment.put(Context.INITIAL_CONTEXT_FACTORY,
                            "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");

        final int amqpPort = getBrokerAddress(PortType.AMQP).getPort();
        final StringBuilder connectionUrl = new StringBuilder(
                String.format("amqp://:@%s/%s?brokerlist='tcp://localhost:%d?failover='nofailover''",
                              "spawn_broker_admin",
                              virtualHostName,
                              amqpPort));
        if (options != null)
        {
            for (final Map.Entry<String, String> entry : options.entrySet())
            {
                connectionUrl.append("&")
                             .append(entry.getKey())
                             .append("='")
                             .append(entry.getValue())
                             .append("'");
            }
        }
        jndiEnvironment.put("connectionfactory." + factoryName, connectionUrl.toString());

        try
        {
            final InitialContext jndiContext = new InitialContext(jndiEnvironment);
            try
            {
                final ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext.lookup(factoryName);
                return connectionFactory.createConnection(getValidUsername(), getValidPassword());
            }
            finally
            {
                // the context is only needed for the lookup
                jndiContext.close();
            }
        }
        catch (NamingException e)
        {
            throw new BrokerAdminException("Unexpected exception on connection lookup", e);
        }
    }

    /**
     * Publishes the name of the running test to the logging context.
     * <p>
     * The published name is {@code null} when no class is given, the fully qualified class name
     * when only the class is given, and {@code <class name>.<method name>} otherwise.
     *
     * @param testClass the current test class, or {@code null}
     * @param method    the current test method, or {@code null}
     */
    protected void setClassQualifiedTestName(final Class testClass, final Method method)
    {
        final String qualifiedTestName;
        if (testClass == null)
        {
            qualifiedTestName = null;
        }
        else if (method == null)
        {
            qualifiedTestName = testClass.getName();
        }
        else
        {
            qualifiedTestName = String.format("%s.%s", testClass.getName(), method.getName());
        }
        setClassQualifiedTestName(qualifiedTestName);
    }

    /**
     * Stores the given test name as a property of the Logback logger context, under the key
     * {@code LogbackPropertyValueDiscriminator.CLASS_QUALIFIED_TEST_NAME}.
     *
     * @param qualifiedTestName the name to publish; may be {@code null}
     */
    private void setClassQualifiedTestName(final String qualifiedTestName)
    {
        // the SLF4J logger must be backed by Logback for the context to be reachable
        final ch.qos.logback.classic.Logger logbackLogger = (ch.qos.logback.classic.Logger) LOGGER;
        final LoggerContext context = logbackLogger.getLoggerContext();
        context.putProperty(LogbackPropertyValueDiscriminator.CLASS_QUALIFIED_TEST_NAME, qualifiedTestName);
    }

    /**
     * Kills the broker process on Windows by running {@code taskkill /PID <pid> /T /F} and draining
     * the command's output.
     * <p>
     * Does nothing when no broker PID is recorded, which is the case whenever no broker is running.
     * Failures to launch or read from {@code taskkill} are logged rather than propagated.
     */
    private void doWindowsKill()
    {
        // Read the volatile field once; unboxing a null PID would otherwise throw a
        // NullPointerException when shutdown is requested without a running broker.
        final Integer pid = _pid;
        if (pid == null)
        {
            LOGGER.debug("Broker PID is unknown, skipping taskkill");
            return;
        }

        try
        {
            final Process taskKill =
                    Runtime.getRuntime().exec(new String[]{"taskkill", "/PID", Integer.toString(pid), "/T", "/F"});
            consumeAllOutput(taskKill);
        }
        catch (IOException e)
        {
            LOGGER.error("Error whilst killing process " + pid, e);
        }
    }

    /**
     * Reads the standard output of the given process line by line until end of stream, logging
     * each line at debug level, and closes the stream afterwards.
     *
     * @param p the process whose output is drained
     * @throws IOException if reading from or closing the stream fails
     */
    private static void consumeAllOutput(Process p) throws IOException
    {
        try (InputStreamReader streamReader = new InputStreamReader(p.getInputStream());
             BufferedReader lineReader = new BufferedReader(streamReader))
        {
            for (String line = lineReader.readLine(); line != null; line = lineReader.readLine())
            {
                LOGGER.debug("Consuming output: {}", line);
            }
        }
    }

    /**
     * Waits for the broker process to exit, logs its exit value and closes its three streams.
     * <p>
     * If the wait is interrupted the interrupt status is restored. The streams are closed in every
     * case, each one independently, so a failure to close one does not leave the others open.
     */
    private void reapChildProcess()
    {
        // Read the volatile field once so that a concurrent reset cannot cause a NullPointerException.
        final Process process = _process;
        if (process == null)
        {
            return;
        }

        try
        {
            final int exitValue = process.waitFor();
            LOGGER.info("broker exited: " + exitValue);
        }
        catch (InterruptedException e)
        {
            LOGGER.error("Interrupted whilst waiting for process shutdown");
            Thread.currentThread().interrupt();
        }
        finally
        {
            closeQuietly(process.getInputStream());
            closeQuietly(process.getErrorStream());
            closeQuietly(process.getOutputStream());
        }
    }

    /** Closes the given stream, logging (at debug level) instead of propagating any failure. */
    private static void closeQuietly(final java.io.Closeable stream)
    {
        try
        {
            stream.close();
        }
        catch (IOException e)
        {
            LOGGER.debug("Ignoring failure to close broker process stream", e);
        }
    }

    private final class BrokerSystemOutputHandler implements Runnable
    {
        private final BufferedReader _in;
        private final List<ListeningPort> _amqpPorts;
        private final Logger _out;
        private final Pattern _readyPattern;
        private final Pattern _stoppedPattern;
        private final Pattern _pidPattern;
        private final Pattern _amqpPortPattern;
        private final CountDownLatch _readyLatch;
        private final LogConsumer _logConsumer;

        private volatile boolean _seenReady;
        private volatile int _pid;

        private BrokerSystemOutputHandler(InputStream in,
                                          LogConsumer logConsumer,
                                          String readyRegExp,
                                          String stoppedRedExp,
                                          String pidRegExp,
                                          String amqpPortRegExp,
                                          CountDownLatch readyLatch,
                                          String loggerName)
        {
            _logConsumer = logConsumer;
            _amqpPorts = new ArrayList<>();
            _seenReady = false;
            _in = new BufferedReader(new InputStreamReader(in));
            _out = LoggerFactory.getLogger(loggerName);
            _readyPattern = Pattern.compile(readyRegExp);
            _stoppedPattern = Pattern.compile(stoppedRedExp);
            _amqpPortPattern = Pattern.compile(amqpPortRegExp);
            _pidPattern = Pattern.compile(pidRegExp);
            _readyLatch = readyLatch;
        }

        @Override
        public void run()
        {
            try
            {
                String line;
                while ((line = _in.readLine()) != null)
                {
                    _logConsumer.accept(line);
                    _out.info(line);

                    checkPortListeningLog(line, _amqpPortPattern, _amqpPorts);

                    Matcher pidMatcher = _pidPattern.matcher(line);
                    if (pidMatcher.find())
                    {
                        if (pidMatcher.groupCount() > 1)
                        {
                            _pid = Integer.parseInt(pidMatcher.group(1));
                        }
                    }

                    Matcher readyMatcher = _readyPattern.matcher(line);
                    if (readyMatcher.find())
                    {
                        _seenReady = true;
                        _readyLatch.countDown();
                    }

                    if (!_seenReady)
                    {
                        Matcher stopMatcher = _stoppedPattern.matcher(line);
                        if (stopMatcher.find())
                        {
                            break;
                        }
                    }
                }
            }
            catch (IOException e)
            {
                LOGGER.warn(e.getMessage()
                            + " : Broker stream from unexpectedly closed; last log lines written by Broker may be lost.");
            }
        }

        private void checkPortListeningLog(final String line,
                                           final Pattern portPattern,
                                           final List<ListeningPort> ports)
        {
            Matcher portMatcher = portPattern.matcher(line);
            if (portMatcher.find())
            {
                ports.add(new ListeningPort(portMatcher.group(1),
                                            Integer.parseInt(portMatcher.group(2))));
            }
        }

        int getPID()
        {
            return _pid;
        }

        List<ListeningPort> getAmqpPorts()
        {
            return _amqpPorts;
        }
    }

    /** Immutable pairing of a transport description and the port number the broker listens on. */
    private static class ListeningPort
    {
        private final String _transport;
        private final int _port;

        /**
         * @param transport transport description taken from the broker's "listening" log line
         * @param port      port number taken from the same line
         */
        ListeningPort(final String transport, final int port)
        {
            _transport = transport;
            _port = port;
        }

        /** @return the transport description */
        String getTransport()
        {
            return _transport;
        }

        /** @return the port number */
        int getPort()
        {
            return _port;
        }

        @Override
        public String toString()
        {
            // no separator before the first field
            return "ListeningPort{" +
                   "_transport='" + _transport + '\'' +
                   ", _port=" + _port +
                   '}';
        }
    }
}
