/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.tests.load;

import java.lang.reflect.Constructor;
import java.util.LinkedList;
import java.util.List;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.cassandra.common.SystemHelper;
import org.apache.ignite.tests.utils.TestsHelper;
import org.apache.logging.log4j.Logger;

/**
 * Basic load test driver to be inherited by specific implementation for particular use-case.
 */
public abstract class LoadTestDriver {
    /** Number of attempts to setup load test */
    private static final int NUMBER_OF_SETUP_ATTEMPTS = 10;

    /** Timeout between load test setup attempts */
    private static final int SETUP_ATTEMPT_TIMEOUT = 1000;

    /** */
    public void runTest(String testName, Class<? extends Worker> clazz, String logName) {
        logger().info("Running " + testName + " test");

        Object cfg = null;

        int attempt;

        logger().info("Setting up load tests driver");

        for (attempt = 0; attempt < NUMBER_OF_SETUP_ATTEMPTS; attempt++) {
            try {
                cfg = setup(logName);
                break;
            }
            catch (Throwable e) {
                logger().error((attempt + 1) + " attempt to setup load test '" + testName + "' failed", e);
            }

            if (attempt + 1 != NUMBER_OF_SETUP_ATTEMPTS) {
                logger().info("Sleeping for " + SETUP_ATTEMPT_TIMEOUT + " seconds before trying next attempt " +
                        "to setup '" + testName + "' load test");

                try {
                    Thread.sleep(SETUP_ATTEMPT_TIMEOUT);
                }
                catch (InterruptedException ignored) {
                    // No-op.
                }
            }
        }

        if (cfg == null && attempt == NUMBER_OF_SETUP_ATTEMPTS) {
            throw new RuntimeException("All " + NUMBER_OF_SETUP_ATTEMPTS + " attempts to setup load test '" +
                    testName + "' have failed");
        }

        // calculates host unique prefix based on its subnet IP address
        long hostUniquePrefix = getHostUniquePrefix();

        logger().info("Load tests driver setup successfully completed");

        try {

            List<Worker> workers = new LinkedList<>();
            long startPosition = 0;

            logger().info("Starting workers");

            for (int i = 0; i < TestsHelper.getLoadTestsThreadsCount(); i++) {
                Worker worker = createWorker(clazz, cfg,
                    hostUniquePrefix + startPosition,
                    hostUniquePrefix + startPosition + 100000000);
                workers.add(worker);
                worker.setName(testName + "-worker-" + i);
                worker.start();
                startPosition += 100000001;
            }

            logger().info("Workers started");
            logger().info("Waiting for workers to complete");

            List<String> failedWorkers = new LinkedList<>();

            for (Worker worker : workers) {
                boolean failed = false;

                try {
                    worker.join();
                }
                catch (Throwable e) {
                    logger().error("Worker " + worker.getName() + " waiting interrupted", e);
                    failed = true;
                }

                if (failed || worker.isFailed()) {
                    failedWorkers.add(worker.getName());
                    logger().info("Worker " + worker.getName() + " execution failed");
                }
                else
                    logger().info("Worker " + worker.getName() + " successfully completed");
            }

            printTestResultsHeader(testName, failedWorkers);
            printTestResultsStatistics(testName, workers);
        }
        finally {
            tearDown(cfg);
        }
    }

    /** */
    protected abstract Logger logger();

    /** */
    protected abstract Object setup(String logName);

    /** */
    protected void tearDown(Object obj) {
    }

    /** */
    @SuppressWarnings("unchecked")
    private Worker createWorker(Class clazz, Object cfg, long startPosition, long endPosition) {
        try {
            Class cfgCls = cfg instanceof Ignite ? Ignite.class : CacheStore.class;

            Constructor ctor = clazz.getConstructor(cfgCls, long.class, long.class);

            return (Worker)ctor.newInstance(cfg, startPosition, endPosition);
        }
        catch (Throwable e) {
            logger().error("Failed to instantiate worker of class '" + clazz.getName() + "'", e);
            throw new RuntimeException("Failed to instantiate worker of class '" + clazz.getName() + "'", e);
        }
    }

    /** */
    private void printTestResultsHeader(String testName, List<String> failedWorkers) {
        if (failedWorkers.isEmpty()) {
            logger().info(testName + " test execution successfully completed.");
            return;
        }

        if (failedWorkers.size() == TestsHelper.getLoadTestsThreadsCount()) {
            logger().error(testName + " test execution totally failed.");
            return;
        }

        String strFailedWorkers = "";

        for (String workerName : failedWorkers) {
            if (!strFailedWorkers.isEmpty())
                strFailedWorkers += ", ";

            strFailedWorkers += workerName;
        }

        logger().warn(testName + " test execution completed, but " + failedWorkers.size() + " of " +
            TestsHelper.getLoadTestsThreadsCount() + " workers failed. Failed workers: " + strFailedWorkers);
    }

    /** */
    @SuppressWarnings("StringBufferReplaceableByString")
    private void printTestResultsStatistics(String testName, List<Worker> workers) {
        long cnt = 0;
        long errCnt = 0;
        long speed = 0;

        for (Worker worker : workers) {
            cnt += worker.getMsgProcessed();
            errCnt += worker.getErrorsCount();
            speed += worker.getSpeed();
        }

        float errPercent = errCnt == 0 ?
            0 :
            cnt + errCnt == 0 ? 0 : (float)(errCnt * 100 ) / (float)(cnt + errCnt);

        StringBuilder builder = new StringBuilder();
        builder.append(SystemHelper.LINE_SEPARATOR);
        builder.append("-------------------------------------------------");
        builder.append(SystemHelper.LINE_SEPARATOR);
        builder.append(testName).append(" test statistics").append(SystemHelper.LINE_SEPARATOR);
        builder.append(testName).append(" messages: ").append(cnt).append(SystemHelper.LINE_SEPARATOR);
        builder.append(testName).append(" errors: ").append(errCnt).append(", ").
                append(String.format("%.2f", errPercent).replace(",", ".")).
                append("%").append(SystemHelper.LINE_SEPARATOR);
        builder.append(testName).append(" speed: ").append(speed).append(" msg/sec").append(SystemHelper.LINE_SEPARATOR);
        builder.append("-------------------------------------------------");

        logger().info(builder.toString());
    }

    /** */
    private long getHostUniquePrefix() {
        String[] parts = SystemHelper.HOST_IP.split("\\.");

        if (parts[2].equals("0"))
            parts[2] = "777";

        if (parts[3].equals("0"))
            parts[3] = "777";

        long part3 = Long.parseLong(parts[2]);
        long part4 = Long.parseLong(parts[3]);

        if (part3 < 10)
            part3 *= 100;
        else if (part4 < 100)
            part3 *= 10;

        if (part4 < 10)
            part4 *= 100;
        else if (part4 < 100)
            part4 *= 10;

        return (part4 * 100000000000000L) + (part3 * 100000000000L) + Thread.currentThread().getId();
    }
}
