/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.testframework;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.ServerSocket;
import java.nio.file.attribute.PosixFilePermission;
import java.security.AccessController;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.PrivilegedAction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.CacheException;
import javax.cache.configuration.Factory;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.client.ssl.GridSslBasicContextFactory;
import org.apache.ignite.internal.client.ssl.GridSslContextFactory;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridAbsClosure;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.ssl.SslContextFactory;
import org.apache.ignite.testframework.config.GridTestProperties;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
 * Utility class for tests.
 */
public final class GridTestUtils {
    /** Default busy wait sleep interval in milliseconds.  */
    public static final long DFLT_BUSYWAIT_SLEEP_INTERVAL = 200;

    /** */
    public static final long DFLT_TEST_TIMEOUT = 5 * 60 * 1000;

    /** */
    static final String ALPHABETH = "qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890_";

    /**
     * Hook object intervenes to discovery message handling
     * and thus allows to make assertions or other actions like skipping certain discovery messages.
     */
    public static class DiscoveryHook {
        /**
         * @param msg Message.
         */
        public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
        }

        /**
         * @param ignite Ignite.
         */
        public void ignite(IgniteEx ignite) {
            // No-op.
        }
    }

    /**
     * Injects {@link DiscoveryHook} into handling logic.
     */
    public static final class DiscoverySpiListenerWrapper implements DiscoverySpiListener {
        /** */
        private final DiscoverySpiListener delegate;

        /** */
        private final DiscoveryHook hook;

        /**
         * @param delegate Delegate.
         * @param hook Hook.
         */
        private DiscoverySpiListenerWrapper(DiscoverySpiListener delegate, DiscoveryHook hook) {
            this.hook = hook;
            this.delegate = delegate;
        }

        /** {@inheritDoc} */
        @Override public IgniteFuture<?> onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, @Nullable Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage spiCustomMsg) {
            hook.handleDiscoveryMessage(spiCustomMsg);

            return delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg);
        }

        /** {@inheritDoc} */
        @Override public void onLocalNodeInitialized(ClusterNode locNode) {
            delegate.onLocalNodeInitialized(locNode);
        }

        /**
         * @param delegate Delegate.
         * @param discoveryHook Discovery hook.
         */
        public static DiscoverySpiListener wrap(DiscoverySpiListener delegate, DiscoveryHook discoveryHook) {
            return new DiscoverySpiListenerWrapper(delegate, discoveryHook);
        }
    }

    /** Test parameters scale factor util. */
    public static final class SF extends ScaleFactorUtil {

    }

    /** */
    private static final Map<Class<?>, String> addrs = new HashMap<>();

    /** */
    private static final Map<Class<? extends GridAbstractTest>, Integer> mcastPorts = new HashMap<>();

    /** */
    private static final Map<Class<? extends GridAbstractTest>, Integer> discoPorts = new HashMap<>();

    /** */
    private static final Map<Class<? extends GridAbstractTest>, Integer> commPorts = new HashMap<>();

    /** */
    private static int[] addr;

    /** */
    private static final int default_mcast_port = 50000;

    /** */
    private static final int max_mcast_port = 54999;

    /** */
    private static final int default_comm_port = 45000;

    /** */
    private static final int max_comm_port = 49999;

    /** */
    private static final int default_disco_port = 55000;

    /** */
    private static final int max_disco_port = 59999;

    /** */
    private static int mcastPort = default_mcast_port;

    /** */
    private static int discoPort = default_disco_port;

    /** */
    private static int commPort = default_comm_port;

    /** */
    private static final GridBusyLock busyLock = new GridBusyLock();

    /** */
    public static final ConcurrentMap<IgnitePair<UUID>, IgnitePair<Queue<Message>>> msgMap = new ConcurrentHashMap<>();

    /**
     * Ensure singleton.
     */
    private GridTestUtils() {
        // No-op.
    }

    /**
     * @param from From node ID.
     * @param to To node ID.
     * @param msg Message.
     * @param sent Sent or received.
     */
    public static void addMessage(UUID from, UUID to, Message msg, boolean sent) {
        IgnitePair<UUID> key = new IgnitePair<>(from, to);

        IgnitePair<Queue<Message>> val = msgMap.get(key);

        if (val == null) {
            IgnitePair<Queue<Message>> old = msgMap.putIfAbsent(key,
                val = new IgnitePair<Queue<Message>>(
                    new ConcurrentLinkedQueue<Message>(), new ConcurrentLinkedQueue<Message>()));

            if (old != null)
                val = old;
        }

        (sent ? val.get1() : val.get2()).add(msg);
    }

    /**
     * Dumps all messages tracked with {@link #addMessage(UUID, UUID, Message, boolean)} to std out.
     */
    public static void dumpMessages() {
        for (Map.Entry<IgnitePair<UUID>, IgnitePair<Queue<Message>>> entry : msgMap.entrySet()) {
            U.debug("\n" + entry.getKey().get1() + " [sent to] " + entry.getKey().get2());

            for (Message message : entry.getValue().get1())
                U.debug("\t" + message);

            U.debug(entry.getKey().get2() + " [received from] " + entry.getKey().get1());

            for (Message message : entry.getValue().get2())
                U.debug("\t" + message);
        }
    }

//    static {
//        new Thread(new Runnable() {
//            @Override public void run() {
//                JOptionPane.showMessageDialog(null, "Close this to dump messages.");
//
//                dumpMessages();
//            }
//        }).start();
//    }

    /**
     * Checks that string {@param str} doesn't contains substring {@param substr}. Logs both strings
     * and throws {@link java.lang.AssertionError}, if contains.
     *
     * @param log Logger (optional).
     * @param str String.
     * @param substr Substring.
     */
    public static void assertNotContains(@Nullable IgniteLogger log, String str, String substr) {
        try {
            assertFalse(str.contains(substr));
        } catch (AssertionError e) {
            U.warn(log, String.format("String contain substring: '%s', but shouldn't:", substr));
            U.warn(log, "String:");
            U.warn(log, str);

            throw e;
        }
    }

    /**
     * Checks that string {@param str} contains substring {@param substr}. Logs both strings
     * and throws {@link java.lang.AssertionError}, if not.
     *
     * @param log Logger (optional).
     * @param str String.
     * @param substr Substring.
     */
    public static void assertContains(@Nullable IgniteLogger log, String str, String substr) {
        try {
            assertTrue(str != null && str.contains(substr));
        } catch (AssertionError e) {
            U.warn(log, String.format("String does not contain substring: '%s':", substr));
            U.warn(log, "String:");
            U.warn(log, str);

            throw e;
        }
    }

    /**
     * Checks that collection {@param col} contains element {@param elem}. Logs collection, element
     * and throws {@link java.lang.AssertionError}, if not.
     *
     * @param log Logger (optional).
     * @param col Collection.
     * @param elem Element.
     */
    public static <C extends Collection<T>, T> void assertContains(@Nullable IgniteLogger log, C col, T elem) {
        try {
            assertTrue(col.contains(elem));
        } catch (AssertionError e) {
            U.warn(log, String.format("Collection does not contain: '%s':", elem));
            U.warn(log, "Collection:");
            U.warn(log, col);

            throw e;
        }
    }

    /**
     * Checks that collection {@param col} doesn't contains element {@param str}. Logs collection, element
     * and throws {@link java.lang.AssertionError}, if contains.
     *
     * @param log Logger (optional).
     * @param col Collection.
     * @param elem Element.
     */
    public static <C extends Collection<T>, T> void assertNotContains(@Nullable IgniteLogger log, C col, T elem) {
        try {
            assertFalse(col.contains(elem));
        } catch (AssertionError e) {
            U.warn(log, String.format("Collection contain element: '%s' but shouldn't:", elem));
            U.warn(log, "Collection:");
            U.warn(log, col);

            throw e;
        }
    }

    /**
     * Checks whether callable throws expected exception or not.
     *
     * @param log Logger (optional).
     * @param call Callable.
     * @param cls Exception class.
     * @param msg Exception message (optional). If provided exception message
     *      and this message should be equal.
     * @return Thrown throwable.
     */
    public static Throwable assertThrows(@Nullable IgniteLogger log, Callable<?> call,
        Class<? extends Throwable> cls, @Nullable String msg) {
        assert call != null;
        assert cls != null;

        try {
            call.call();
        }
        catch (Throwable e) {
            if (cls != e.getClass() && !cls.isAssignableFrom(e.getClass())) {
                if (e.getClass() == CacheException.class && e.getCause() != null && e.getCause().getClass() == cls)
                    e = e.getCause();
                else {
                    U.error(log, "Unexpected exception.", e);

                    fail("Exception class is not as expected [expected=" + cls + ", actual=" + e.getClass() + ']', e);
                }
            }

            if (msg != null && (e.getMessage() == null || !e.getMessage().contains(msg))) {
                U.error(log, "Unexpected exception message.", e);

                fail("Exception message is not as expected [expected=" + msg + ", actual=" + e.getMessage() + ']', e);
            }

            if (log != null) {
                if (log.isInfoEnabled())
                    log.info("Caught expected exception: " + e.getMessage());
            }
            else
                X.println("Caught expected exception: " + e.getMessage());

            return e;
        }

        throw new AssertionError("Exception has not been thrown.");
    }

    /**
     * Checks whether callable throws an exception with specified cause.
     *
     * @param log Logger (optional).
     * @param call Callable.
     * @param cls Exception class.
     * @param msg Exception message (optional). If provided exception message
     *      and this message should be equal.
     * @return Thrown throwable.
     */
    public static Throwable assertThrowsAnyCause(@Nullable IgniteLogger log, Callable<?> call,
        Class<? extends Throwable> cls, @Nullable String msg) {
        assert call != null;
        assert cls != null;

        try {
            call.call();
        }
        catch (Throwable e) {
            Throwable t = e;

            while (t != null) {
                if (cls == t.getClass() && (msg == null || (t.getMessage() != null && t.getMessage().contains(msg)))) {
                    if (log != null && log.isInfoEnabled())
                        log.info("Caught expected exception: " + t.getMessage());

                    return t;
                }

                t = t.getCause();
            }

            fail("Unexpected exception", e);
        }

        throw new AssertionError("Exception has not been thrown.");
    }

    /**
     * Checks whether callable throws expected exception or its child or not.
     *
     * @param log Logger (optional).
     * @param call Callable.
     * @param cls Exception class.
     * @param msg Exception message (optional). If provided exception message
     *      and this message should be equal.
     * @return Thrown throwable.
     */
    @Nullable public static Throwable assertThrowsInherited(@Nullable IgniteLogger log, Callable<?> call,
        Class<? extends Throwable> cls, @Nullable String msg) {
        assert call != null;
        assert cls != null;

        try {
            call.call();
        }
        catch (Throwable e) {
            if (!cls.isAssignableFrom(e.getClass()))
                fail("Exception class is not as expected [expected=" + cls + ", actual=" + e.getClass() + ']', e);

            if (msg != null && (e.getMessage() == null || !e.getMessage().startsWith(msg)))
                fail("Exception message is not as expected [expected=" + msg + ", actual=" + e.getMessage() + ']', e);

            if (log != null) {
                if (log.isDebugEnabled())
                    log.debug("Caught expected exception: " + e.getMessage());
            }
            else
                X.println("Caught expected exception: " + e.getMessage());

            return e;
        }

        throw new AssertionError("Exception has not been thrown.");
    }

    /**
     * Checks whether callable throws exception, which is itself of a specified
     * class, or has a cause of the specified class.
     *
     * @param runnable Runnable.
     * @param cls Expected class.
     * @return Thrown throwable.
     */
    @Nullable public static Throwable assertThrowsWithCause(Runnable runnable, Class<? extends Throwable> cls) {
        return assertThrowsWithCause(new Callable<Integer>() {
            @Override public Integer call() throws Exception {
                runnable.run();

                return 0;
            }
        }, cls);
    }

    /**
     * Checks whether callable throws exception, which is itself of a specified
     * class, or has a cause of the specified class.
     *
     * @param call Callable.
     * @param cls Expected class.
     * @return Thrown throwable.
     */
    @Nullable public static Throwable assertThrowsWithCause(Callable<?> call, Class<? extends Throwable> cls) {
        assert call != null;
        assert cls != null;

        try {
            call.call();
        }
        catch (Throwable e) {
            if (!X.hasCause(e, cls))
                fail("Exception is neither of a specified class, nor has a cause of the specified class: " + cls, e);

            return e;
        }

        throw new AssertionError("Exception has not been thrown.");
    }

    /**
     * Checks whether closure throws exception, which is itself of a specified
     * class, or has a cause of the specified class.
     *
     * @param call Closure.
     * @param p Parameter passed to closure.
     * @param cls Expected class.
     * @return Thrown throwable.
     */
    public static <P> Throwable assertThrowsWithCause(IgniteInClosure<P> call, P p, Class<? extends Throwable> cls) {
        assert call != null;
        assert cls != null;

        try {
            call.apply(p);
        }
        catch (Throwable e) {
            if (!X.hasCause(e, cls))
                fail("Exception is neither of a specified class, nor has a cause of the specified class: " + cls, e);

            return e;
        }

        throw new AssertionError("Exception has not been thrown.");
    }

    /**
     * Asserts that the specified runnable completes within the specified timeout.
     *
     * @param msg Assertion message in case of timeout.
     * @param timeout Timeout.
     * @param timeUnit Timeout {@link TimeUnit}.
     * @param runnable {@link Runnable} to check.
     * @throws Exception In case of any exception distinct from {@link TimeoutException}.
     */
    public static void assertTimeout(String msg, long timeout, TimeUnit timeUnit, Runnable runnable) throws Exception {
        ExecutorService executorSvc = Executors.newSingleThreadExecutor();
        Future<?> fut = executorSvc.submit(runnable);

        try {
            fut.get(timeout, timeUnit);
        }
        catch (TimeoutException ignored) {
            fail(msg, null);
        }
        finally {
            executorSvc.shutdownNow();
        }
    }

    /**
     * Asserts that the specified runnable completes within the specified timeout.
     *
     * @param timeout Timeout.
     * @param timeUnit Timeout {@link TimeUnit}.
     * @param runnable {@link Runnable} to check.
     * @throws Exception In case of any exception distinct from {@link TimeoutException}.
     */
    public static void assertTimeout(long timeout, TimeUnit timeUnit, Runnable runnable) throws Exception {
        assertTimeout("Timeout occurred.", timeout, timeUnit, runnable);
    }

    /**
     * Throw assertion error with specified error message and initialized cause.
     *
     * @param msg Error message.
     * @param cause Error cause.
     * @return Assertion error.
     */
    private static AssertionError fail(String msg, @Nullable Throwable cause) {
        AssertionError e = new AssertionError(msg);

        if (cause != null)
            e.initCause(cause);

        throw e;
    }

    /**
     * Checks whether object's method call throws expected exception or not.
     *
     * @param log Logger (optional).
     * @param cls Exception class.
     * @param msg Exception message (optional). If provided exception message
     *      and this message should be equal.
     * @param obj Object to invoke method for.
     * @param mtd Object's method to invoke.
     * @param params Method parameters.
     * @return Thrown throwable.
     */
    @Nullable public static Throwable assertThrows(@Nullable IgniteLogger log, Class<? extends Throwable> cls,
        @Nullable String msg, final Object obj, final String mtd, final Object... params) {
        return assertThrows(log, new Callable() {
            @Override public Object call() throws Exception {
                return invoke(obj, mtd, params);
            }
        }, cls, msg);
    }

    /**
     * Asserts that each element in iterable has one-to-one correspondence with a
     * predicate from list.
     *
     * @param it Input iterable of elements.
     * @param ps Array of predicates (by number of elements in iterable).
     */
    public static <T> void assertOneToOne(Iterable<T> it, IgnitePredicate<T>... ps) {
        Collection<IgnitePredicate<T>> ps0 = new ArrayList<>(Arrays.asList(ps));
        Collection<T2<IgnitePredicate<T>, T>> passed = new ArrayList<>();

        for (T elem : it) {
            for (T2<IgnitePredicate<T>, T> p : passed) {
                if (p.get1().apply(elem))
                    throw new AssertionError("Two elements match one predicate [elem1=" + p.get2() +
                        ", elem2=" + elem + ", pred=" + p.get1() + ']');
            }

            IgnitePredicate<T> matched = null;

            for (IgnitePredicate<T> p : ps0) {
                if (p.apply(elem)) {
                    if (matched != null)
                        throw new AssertionError("Element matches more than one predicate [elem=" + elem +
                            ", pred1=" + p + ", pred2=" + matched + ']');

                    matched = p;
                }
            }

            if (matched == null) // None matched.
                throw new AssertionError("The element does not match [elem=" + elem +
                    ", numRemainingPreds=" + ps0.size() + ']');

            ps0.remove(matched);
            passed.add(new T2<>(matched, elem));
        }
    }

    /**
     * Every invocation of this method will never return a
     * repeating multicast port for a different test case.
     *
     * @param cls Class.
     * @return Next multicast port.
     */
    public static synchronized int getNextMulticastPort(Class<? extends GridAbstractTest> cls) {
        Integer portRet = mcastPorts.get(cls);

        if (portRet != null)
            return portRet;

        int startPort = mcastPort;

        while (true) {
            if (mcastPort >= max_mcast_port)
                mcastPort = default_mcast_port;
            else
                mcastPort++;

            if (startPort == mcastPort)
                break;

            portRet = mcastPort;

            MulticastSocket sock = null;

            try {
                sock = new MulticastSocket(portRet);

                break;
            }
            catch (IOException ignored) {
                // No-op.
            }
            finally {
                U.closeQuiet(sock);
            }
        }

        // Cache port to be reused by the same test.
        mcastPorts.put(cls, portRet);

        return portRet;
    }

    /**
     * Every invocation of this method will never return a
     * repeating communication port for a different test case.
     *
     * @param cls Class.
     * @return Next communication port.
     */
    public static synchronized int getNextCommPort(Class<? extends GridAbstractTest> cls) {
        Integer portRet = commPorts.get(cls);

        if (portRet != null)
            return portRet;

        if (commPort >= max_comm_port)
            commPort = default_comm_port;
        else
            // Reserve 10 ports per test.
            commPort += 10;

        portRet = commPort;

        // Cache port to be reused by the same test.
        commPorts.put(cls, portRet);

        return portRet;
    }

    /**
     * Every invocation of this method will never return a
     * repeating discovery port for a different test case.
     *
     * @param cls Class.
     * @return Next discovery port.
     */
    public static synchronized int getNextDiscoPort(Class<? extends GridAbstractTest> cls) {
        Integer portRet = discoPorts.get(cls);

        if (portRet != null)
            return portRet;

        if (discoPort >= max_disco_port)
            discoPort = default_disco_port;
        else
            discoPort += 10;

        portRet = discoPort;

        // Cache port to be reused by the same test.
        discoPorts.put(cls, portRet);

        return portRet;
    }

    /**
     * @return Free communication port number on localhost.
     * @throws IOException If unable to find a free port.
     */
    public static int getFreeCommPort() throws IOException {
        for (int port = default_comm_port; port < max_comm_port; port++) {
            try (ServerSocket sock = new ServerSocket(port)) {
                return sock.getLocalPort();
            }
            catch (IOException ignored) {
                // No-op.
            }
        }

        throw new IOException("Unable to find a free communication port.");
    }

    /**
     * Every invocation of this method will never return a
     * repeating multicast group for a different test case.
     *
     * @param cls Class.
     * @return Next multicast group.
     */
    public static synchronized String getNextMulticastGroup(Class<?> cls) {
        String addrStr = addrs.get(cls);

        if (addrStr != null)
            return addrStr;

        // Increment address.
        if (addr[3] == 255) {
            if (addr[2] == 255)
                assert false;
            else {
                addr[2] += 1;

                addr[3] = 1;
            }
        }
        else
            addr[3] += 1;

        // Convert address to string.
        StringBuilder b = new StringBuilder(15);

        for (int i = 0; i < addr.length; i++) {
            b.append(addr[i]);

            if (i < addr.length - 1)
                b.append('.');
        }

        addrStr = b.toString();

        // Cache address to be reused by the same test.
        addrs.put(cls, addrStr);

        return addrStr;
    }

    /**
     * Runs runnable object in specified number of threads.
     *
     * @param run Target runnable.
     * @param threadNum Number of threads.
     * @param threadName Thread name.
     * @return Execution time in milliseconds.
     * @throws Exception Thrown if at least one runnable execution failed.
     */
    public static long runMultiThreaded(Runnable run, int threadNum, String threadName) throws Exception {
        return runMultiThreaded(makeCallable(run, null), threadNum, threadName);
    }

    /**
     * Runs runnable object in specified number of threads.
     *
     * @param run Target runnable.
     * @param threadNum Number of threads.
     * @param threadName Thread name.
     * @return Future for the run. Future returns execution time in milliseconds.
     */
    public static IgniteInternalFuture<Long> runMultiThreadedAsync(Runnable run, int threadNum, String threadName) {
        return runMultiThreadedAsync(makeCallable(run, null), threadNum, threadName);
    }

    /**
     * Runs callable object in specified number of threads.
     *
     * @param call Callable.
     * @param threadNum Number of threads.
     * @param threadName Thread names.
     * @return Execution time in milliseconds.
     * @throws Exception If failed.
     */
    public static long runMultiThreaded(Callable<?> call, int threadNum, String threadName) throws Exception {
        List<Callable<?>> calls = Collections.<Callable<?>>nCopies(threadNum, call);

        return runMultiThreaded(calls, threadName);
    }

    /**
     * @param call Closure that receives thread index.
     * @param threadNum Number of threads.
     * @param threadName Thread names.
     * @return Execution time in milliseconds.
     * @throws Exception If failed.
     */
    public static long runMultiThreaded(final IgniteInClosure<Integer> call, int threadNum, String threadName)
        throws Exception {
        List<Callable<?>> calls = new ArrayList<>(threadNum);

        for (int i = 0; i < threadNum; i++) {
            final int idx = i;

            calls.add(new Callable<Void>() {
                @Override public Void call() throws Exception {
                    call.apply(idx);

                    return null;
                }
            });
        }

        return runMultiThreaded(calls, threadName);
    }

    /**
     * Runs callable object in specified number of threads.
     *
     * @param call Callable.
     * @param threadNum Number of threads.
     * @param threadName Thread names.
     * @return Future for the run. Future returns execution time in milliseconds.
     */
    public static IgniteInternalFuture<Long> runMultiThreadedAsync(Callable<?> call, int threadNum, final String threadName) {
        final List<Callable<?>> calls = Collections.<Callable<?>>nCopies(threadNum, call);
        final GridTestSafeThreadFactory threadFactory = new GridTestSafeThreadFactory(threadName);

        IgniteInternalFuture<Long> runFut = runAsync(() -> runMultiThreaded(calls, threadFactory));

        GridFutureAdapter<Long> resFut = new GridFutureAdapter<Long>() {
            @Override public boolean cancel() throws IgniteCheckedException {
                super.cancel();

                if (isDone())
                    return false;

                runFut.cancel();

                threadFactory.interruptAllThreads();

                return onCancelled();
            }
        };

        runFut.listen(fut -> {
            try {
                resFut.onDone(fut.get());
            }
            catch (IgniteFutureCancelledCheckedException e) {
                resFut.onCancelled();
            }
            catch (Throwable e) {
                resFut.onDone(e);
            }
        });

        return resFut;
    }

    /**
     * Runs callable tasks each in separate threads.
     *
     * @param calls Callable tasks.
     * @param threadName Thread name.
     * @return Execution time in milliseconds.
     * @throws Exception If failed.
     */
    public static long runMultiThreaded(Iterable<Callable<?>> calls, String threadName) throws Exception {
        return runMultiThreaded(calls, new GridTestSafeThreadFactory(threadName));
    }

    /**
     * Runs callable tasks each in separate threads.
     *
     * @param calls Callable tasks.
     * @param threadFactory Thread factory.
     * @return Execution time in milliseconds.
     * @throws Exception If failed.
     */
    public static long runMultiThreaded(Iterable<Callable<?>> calls, GridTestSafeThreadFactory threadFactory)
        throws Exception {
        if (!busyLock.enterBusy())
            throw new IllegalStateException("Failed to start new threads (test is being stopped).");

        Collection<Thread> threads = new ArrayList<>();
        long time;

        try {
            for (Callable<?> call : calls)
                threads.add(threadFactory.newThread(call));

            time = System.currentTimeMillis();

            for (Thread t : threads)
                t.start();
        }
        finally {
            busyLock.leaveBusy();
        }

        // Wait threads finish their job.
        try {
            for (Thread t : threads)
                t.join();
        } catch (InterruptedException e) {
            for (Thread t : threads)
                t.interrupt();

            throw e;
        }

        time = System.currentTimeMillis() - time;

        // Validate errors happens
        threadFactory.checkError();

        return time;
    }

    /**
     * Runs runnable task asyncronously.
     *
     * @param task Runnable.
     * @return Future with task result.
     */
    public static IgniteInternalFuture runAsync(final Runnable task) {
        return runAsync(task,"async-runnable-runner");
    }

    /**
     * Runs runnable task asyncronously.
     *
     * @param task Runnable.
     * @return Future with task result.
     */
    public static IgniteInternalFuture runAsync(final Runnable task, String threadName) {
        return runAsync(() -> {
            task.run();

            return null;
        }, threadName);
    }

    /**
     * Runs callable task asyncronously.
     *
     * @param task Callable.
     * @return Future with task result.
     */
    public static <T> IgniteInternalFuture<T> runAsync(final Callable<T> task) {
        return runAsync(task, "async-callable-runner");
    }

    /**
     * Runs callable task asyncronously.
     *
     * @param task Callable.
     * @param threadName Thread name.
     * @return Future with task result.
     */
    public static <T> IgniteInternalFuture<T> runAsync(final Callable<T> task, String threadName) {
        if (!busyLock.enterBusy())
            throw new IllegalStateException("Failed to start new threads (test is being stopped).");

        try {
            final GridTestSafeThreadFactory thrFactory = new GridTestSafeThreadFactory(threadName);

            final GridFutureAdapter<T> fut = new GridFutureAdapter<T>() {
                @Override public boolean cancel() throws IgniteCheckedException {
                    super.cancel();

                    if (isDone())
                        return false;

                    thrFactory.interruptAllThreads();

                    try {
                        get();

                        return false;
                    }
                    catch (IgniteFutureCancelledCheckedException e) {
                        return true;
                    }
                    catch (IgniteCheckedException e) {
                        return false;
                    }
                }
            };

            thrFactory.newThread(() -> {
                try {
                    // Execute task.
                    T res = task.call();

                    fut.onDone(res);
                }
                catch (InterruptedException e) {
                    fut.onCancelled();
                }
                catch (Throwable e) {
                    fut.onDone(e);
                }
            }).start();

            return fut;
        }
        finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * Wait for all passed futures to complete even if they fail.
     *
     * @param futs Futures.
     * @throws AssertionError Suppresses underlying exceptions if some futures failed.
     */
    public static void waitForAllFutures(IgniteInternalFuture<?>... futs) {
        AssertionError err = null;

        for (IgniteInternalFuture<?> fut : futs) {
            try {
                fut.get();
            }
            catch (Throwable t) {
                if (err == null)
                    err = new AssertionError("One or several futures threw the exception.");

                err.addSuppressed(t);
            }
        }

        if (err != null)
            throw err;
    }

    /**
     * Interrupts and waits for termination of all the threads started
     * so far by current test.
     *
     * @param log Logger.
     */
    public static void stopThreads(IgniteLogger log) {
        busyLock.block();

        try {
            GridTestSafeThreadFactory.stopAllThreads(log);
        }
        finally {
            busyLock.unblock();
        }
    }

    /**
     * @return Ignite home.
     * @throws Exception If failed.
     */
    @SuppressWarnings({"ProhibitedExceptionThrown"})
    public static String getIgniteHome() throws Exception {
        String ggHome = System.getProperty("IGNITE_HOME");

        if (ggHome == null)
            ggHome = System.getenv("IGNITE_HOME");

        if (ggHome == null)
            throw new Exception("IGNITE_HOME parameter must be set either as system or environment variable.");

        File dir = new File(ggHome);

        if (!dir.exists())
            throw new Exception("Ignite home does not exist [ignite-home=" + dir.getAbsolutePath() + ']');

        if (!dir.isDirectory())
            throw new Exception("Ignite home is not a directory [ignite-home=" + dir.getAbsolutePath() + ']');

        return ggHome;
    }

    /**
     * @param <T> Type.
     * @param cls Class.
     * @param annCls Annotation class.
     * @return Annotation.
     */
    @Nullable public static <T extends Annotation> T getAnnotation(Class<?> cls, Class<T> annCls) {
        for (Class<?> cls0 = cls; cls0 != null; cls0 = cls0.getSuperclass()) {
            T ann = cls0.getAnnotation(annCls);

            if (ann != null)
                return ann;
        }

        return null;
    }

    /**
     * Initializes address.
     */
    static {
        InetAddress locHost = null;

        try {
            locHost = U.getLocalHost();
        }
        catch (IOException e) {
            assert false : "Unable to get local address. This leads to the same multicast addresses " +
                "in the local network.";
        }

        if (locHost != null) {
            int thirdByte = locHost.getAddress()[3];

            if (thirdByte < 0)
                thirdByte += 256;

            // To get different addresses for different machines.
            addr = new int[] {229, thirdByte, 1, 1};
        }
        else
            addr = new int[] {229, 1, 1, 1};
    }

    /**
     * @param path Path.
     * @param startFilter Start filter.
     * @param endFilter End filter.
     * @return List of JARs that corresponds to the filters.
     * @throws IOException If failed.
     */
    private static Collection<String> getFiles(String path, @Nullable final String startFilter,
        @Nullable final String endFilter) throws IOException {
        Collection<String> res = new ArrayList<>();

        File file = new File(path);

        assert file.isDirectory();

        File[] jars = file.listFiles(new FilenameFilter() {
            /**
             * @see FilenameFilter#accept(File, String)
             */
            @SuppressWarnings({"UnnecessaryJavaDocLink"})
            @Override public boolean accept(File dir, String name) {
                // Exclude spring.jar because it tries to load META-INF/spring-handlers.xml from
                // all available JARs and create instances of classes from there for example.
                // Exclude logging as it is used by spring and casted to Log interface.
                // Exclude log4j because of the design - 1 per VM.
                if (name.startsWith("spring") || name.startsWith("log4j") ||
                    name.startsWith("commons-logging") || name.startsWith("junit") ||
                    name.startsWith("ignite-tests"))
                    return false;

                boolean ret = true;

                if (startFilter != null)
                    ret = name.startsWith(startFilter);

                if (ret && endFilter != null)
                    ret = name.endsWith(endFilter);

                return ret;
            }
        });

        for (File jar : jars)
            res.add(jar.getCanonicalPath());

        return res;
    }

    /**
     * Silent stop grid.
     * Method doesn't throw any exception.
     *
     * @param ignite Grid to stop.
     * @param log Logger.
     */
    public static void close(Ignite ignite, IgniteLogger log) {
        if (ignite != null)
            try {
                G.stop(ignite.name(), false);
            }
            catch (Throwable e) {
                U.error(log, "Failed to stop grid: " + ignite.name(), e);
            }
    }

    /**
     * Silent stop grid.
     * Method doesn't throw any exception.
     *
     * @param igniteInstanceName Ignite instance name.
     * @param log Logger.
     */
    public static void stopGrid(String igniteInstanceName, IgniteLogger log) {
        try {
            G.stop(igniteInstanceName, false);
        }
        catch (Throwable e) {
            U.error(log, "Failed to stop grid: " + igniteInstanceName, e);
        }
    }

    /**
     * Gets file representing the path passed in. First the check is made if path is absolute.
     * If not, then the check is made if path is relative to ${IGNITE_HOME}. If both checks fail,
     * then {@code null} is returned, otherwise file representing path is returned.
     * <p>
     * See {@link #getIgniteHome()} for information on how {@code IGNITE_HOME} is retrieved.
     *
     * @param path Path to resolve.
     * @return Resolved path, or {@code null} if file cannot be resolved.
     * @see #getIgniteHome()
     */
    @Nullable public static File resolveIgnitePath(String path) {
        return resolvePath(null, path);
    }

    /**
     * @param igniteHome Optional ignite home path.
     * @param path Path to resolve.
     * @return Resolved path, or {@code null} if file cannot be resolved.
     */
    @Nullable private static File resolvePath(@Nullable String igniteHome, String path) {
        File file = new File(path).getAbsoluteFile();

        if (!file.exists()) {
            String home = igniteHome != null ? igniteHome : U.getIgniteHome();

            if (home == null)
                return null;

            file = new File(home, path);

            return file.exists() ? file : null;
        }

        return file;
    }

    /**
     * @param cache Cache.
     * @return Cache context.
     */
    public static <K, V> GridCacheContext<K, V> cacheContext(IgniteCache<K, V> cache) {
        return ((IgniteKernal)cache.unwrap(Ignite.class)).<K, V>internalCache(cache.getName()).context();
    }

    /**
     * @param cache Cache.
     * @return Near cache.
     */
    public static <K, V> GridNearCacheAdapter<K, V> near(IgniteCache<K, V> cache) {
        return cacheContext(cache).near();
    }

    /**
     * @param cache Cache.
     * @return DHT cache.
     */
    public static <K, V> GridDhtCacheAdapter<K, V> dht(IgniteCache<K, V> cache) {
        return near(cache).dht();
    }

    /**
     * @param cacheName Cache name.
     * @param backups Number of backups.
     * @param log Logger.
     * @throws Exception If failed.
     */
    @SuppressWarnings("BusyWait")
    public static <K, V> void waitTopologyUpdate(@Nullable String cacheName, int backups, IgniteLogger log)
        throws Exception {
        for (Ignite g : Ignition.allGrids()) {
            IgniteCache<K, V> cache = ((IgniteEx)g).cache(cacheName);

            GridDhtPartitionTopology top = dht(cache).topology();

            while (true) {
                boolean wait = false;

                for (int p = 0; p < g.affinity(cacheName).partitions(); p++) {
                    Collection<ClusterNode> nodes = top.nodes(p, AffinityTopologyVersion.NONE);

                    if (nodes.size() > backups + 1) {
                        LT.warn(log, "Partition map was not updated yet (will wait) [igniteInstanceName=" + g.name() +
                            ", p=" + p + ", nodes=" + F.nodeIds(nodes) + ']');

                        wait = true;

                        break;
                    }
                }

                if (wait)
                    Thread.sleep(20);
                else
                    break; // While.
            }
        }
    }

    /**
     * Convert runnable tasks with callable.
     *
     * @param run Runnable task to convert into callable one.
     * @param res Callable result.
     * @param <T> The result type of method <tt>call</tt>, always {@code null}.
     * @return Callable task around the specified runnable one.
     */
    public static <T> Callable<T> makeCallable(final Runnable run, @Nullable final T res) {
        return new Callable<T>() {
            @Override public T call() throws Exception {
                run.run();
                return res;
            }
        };
    }

    /**
     * Get object field value via reflection.
     *
     * @param obj Object or class to get field value from.
     * @param cls Class.
     * @param fieldName Field names to get value for.
     * @param <T> Expected field class.
     * @return Field value.
     * @throws IgniteException In case of error.
     */
    public static <T> T getFieldValue(Object obj, Class cls, String fieldName) throws IgniteException {
        assert obj != null;
        assert fieldName != null;

        try {
            return (T)findField(cls, obj, fieldName);
        }
        catch (NoSuchFieldException | IllegalAccessException e) {
            throw new IgniteException("Failed to get object field [obj=" + obj +
                ", fieldName=" + fieldName + ']', e);
        }
    }

    /**
     * Get object field value via reflection.
     *
     * @param obj Object or class to get field value from.
     * @param fieldNames Field names to get value for: obj->field1->field2->...->fieldN.
     * @param <T> Expected field class.
     * @return Field value.
     * @throws IgniteException In case of error.
     */
    public static <T> T getFieldValue(Object obj, String... fieldNames) throws IgniteException {
        assert obj != null;
        assert fieldNames != null;
        assert fieldNames.length >= 1;

        try {
            for (String fieldName : fieldNames) {
                Class<?> cls = obj instanceof Class ? (Class)obj : obj.getClass();

                try {
                    obj = findField(cls, obj, fieldName);
                }
                catch (NoSuchFieldException e) {
                    // Resolve inner class, if not an inner field.
                    Class<?> innerCls = getInnerClass(cls, fieldName);

                    if (innerCls == null)
                        throw new IgniteException("Failed to get object field [obj=" + obj +
                            ", fieldNames=" + Arrays.toString(fieldNames) + ']', e);

                    obj = innerCls;
                }
            }

            return (T)obj;
        }
        catch (IllegalAccessException e) {
            throw new IgniteException("Failed to get object field [obj=" + obj +
                ", fieldNames=" + Arrays.toString(fieldNames) + ']', e);
        }
    }

    /**
     * Get object field value via reflection(including superclass).
     *
     * @param obj Object or class to get field value from.
     * @param fieldNames Field names to get value for: obj->field1->field2->...->fieldN.
     * @param <T> Expected field class.
     * @return Field value.
     * @throws IgniteException In case of error.
     */
    public static <T> T getFieldValueHierarchy(Object obj, String... fieldNames) throws IgniteException {
        assert obj != null;
        assert fieldNames != null;
        assert fieldNames.length >= 1;

        try {
            for (String fieldName : fieldNames) {
                Class<?> cls = obj instanceof Class ? (Class)obj : obj.getClass();

                while (cls != null) {
                    try {
                        obj = findField(cls, obj, fieldName);

                        break;
                    }
                    catch (NoSuchFieldException e) {
                        cls = cls.getSuperclass();
                    }
                }
            }

            return (T)obj;
        }
        catch (IllegalAccessException e) {
            throw new IgniteException("Failed to get object field [obj=" + obj +
                ", fieldNames=" + Arrays.toString(fieldNames) + ']', e);
        }
    }

    /**
     * @param cls Class for searching.
     * @param obj Target object.
     * @param fieldName Field name for search.
     * @return Field from object if it was found.
     */
    private static Object findField(Class<?> cls, Object obj,
        String fieldName) throws NoSuchFieldException, IllegalAccessException {
        // Resolve inner field.
        Field field = cls.getDeclaredField(fieldName);

        boolean accessible = field.isAccessible();

        if (!accessible)
            field.setAccessible(true);

        return field.get(obj);
    }

    /**
     * Change static final fields.
     * @param field Need to be changed.
     * @param newVal New value.
     * @throws Exception If failed.
     */
    public static void setFieldValue(Field field, Object newVal) throws Exception {
        field.setAccessible(true);
        Field modifiersField = Field.class.getDeclaredField("modifiers");

        AccessController.doPrivileged(new PrivilegedAction() {
            @Override
            public Object run() {
                modifiersField.setAccessible(true);
                return null;
            }
        });

        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
        field.set(null, newVal);
    }

    /**
     * Get inner class by its name from the enclosing class.
     *
     * @param parentCls Parent class to resolve inner class for.
     * @param innerClsName Name of the inner class.
     * @return Inner class.
     */
    @Nullable public static <T> Class<T> getInnerClass(Class<?> parentCls, String innerClsName) {
        for (Class<?> cls : parentCls.getDeclaredClasses())
            if (innerClsName.equals(cls.getSimpleName()))
                return (Class<T>)cls;

        return null;
    }

    /**
     * Set object field value via reflection.
     *
     * @param obj Object to set field value to.
     * @param fieldName Field name to set value for.
     * @param val New field value.
     * @throws IgniteException In case of error.
     */
    public static void setFieldValue(Object obj, String fieldName, Object val) throws IgniteException {
        assert obj != null;
        assert fieldName != null;

        try {
            Class<?> cls = obj instanceof Class ? (Class)obj : obj.getClass();

            Field field = cls.getDeclaredField(fieldName);

            boolean accessible = field.isAccessible();

            if (!accessible)
                field.setAccessible(true);

            field.set(obj, val);
        }
        catch (NoSuchFieldException | IllegalAccessException e) {
            throw new IgniteException("Failed to set object field [obj=" + obj + ", field=" + fieldName + ']', e);
        }
    }

    /**
     * Set object field value via reflection.
     *
     * @param obj Object to set field value to.
     * @param cls Class to get field from.
     * @param fieldName Field name to set value for.
     * @param val New field value.
     * @throws IgniteException In case of error.
     */
    public static void setFieldValue(Object obj, Class cls, String fieldName, Object val) throws IgniteException {
        assert fieldName != null;

        try {
            Field field = cls.getDeclaredField(fieldName);

            boolean accessible = field.isAccessible();

            if (!accessible)
                field.setAccessible(true);

            boolean isFinal = (field.getModifiers() & Modifier.FINAL) != 0;

            if (isFinal) {
                Field modifiersField = Field.class.getDeclaredField("modifiers");

                modifiersField.setAccessible(true);

                modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
            }

            field.set(obj, val);
        }
        catch (NoSuchFieldException | IllegalAccessException e) {
            throw new IgniteException("Failed to set object field [obj=" + obj + ", field=" + fieldName + ']', e);
        }
    }

    /**
     * Invoke method on an object.
     *
     * @param obj Object to call method on.
     * @param mtd Method to invoke.
     * @param params Parameters of the method.
     * @return Method invocation result.
     * @throws Exception If failed.
     */
    @Nullable public static <T> T invoke(Object obj, String mtd, Object... params) throws Exception {
        Class<?> cls = obj.getClass();

        do {
            // We cannot resolve method by parameter classes due to some of parameters can be null.
            // Search correct method among all methods collection.
            for (Method m : cls.getDeclaredMethods()) {
                // Filter methods by name.
                if (!m.getName().equals(mtd))
                    continue;

                if (!areCompatible(params, m.getParameterTypes()))
                    continue;

                try {
                    boolean accessible = m.isAccessible();

                    if (!accessible)
                        m.setAccessible(true);

                    return (T)m.invoke(obj, params);
                }
                catch (IllegalAccessException e) {
                    throw new RuntimeException("Failed to access method" +
                        " [obj=" + obj + ", mtd=" + mtd + ", params=" + Arrays.toString(params) + ']', e);
                }
                catch (InvocationTargetException e) {
                    Throwable cause = e.getCause();

                    if (cause instanceof Error)
                        throw (Error) cause;

                    if (cause instanceof Exception)
                        throw (Exception) cause;

                    throw new RuntimeException("Failed to invoke method)" +
                        " [obj=" + obj + ", mtd=" + mtd + ", params=" + Arrays.toString(params) + ']', e);
                }
            }

            cls = cls.getSuperclass();
        } while (cls != Object.class);

        throw new RuntimeException("Failed to find method" +
            " [obj=" + obj + ", mtd=" + mtd + ", params=" + Arrays.toString(params) + ']');
    }

    /**
     * Check objects and corresponding types are compatible.
     *
     * @param objs Objects array.
     * @param types Classes array.
     * @return Objects in array can be casted to corresponding types.
     */
    private static boolean areCompatible(Object[] objs, Class[] types) {
        if (objs.length != types.length)
            return false;

        for (int i = 0, size = objs.length; i < size; i++) {
            Object o = objs[i];

            if (o != null && !types[i].isInstance(o))
                return false;
        }

        return true;
    }

    /**
     * Tries few times to perform some assertion. In the worst case
     * {@code assertion} closure will be executed {@code retries} + 1 times and
     * thread will spend approximately {@code retries} * {@code retryInterval} sleeping.
     *
     * @param log Log.
     * @param retries Number of retries.
     * @param retryInterval Interval between retries in milliseconds.
     * @param c Closure with assertion. All {@link AssertionError}s thrown
     *      from this closure will be ignored {@code retries} times.
     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
     */
    @SuppressWarnings("ErrorNotRethrown")
    public static void retryAssert(@Nullable IgniteLogger log, int retries, long retryInterval, GridAbsClosure c)
        throws IgniteInterruptedCheckedException {
        for (int i = 0; i < retries; i++) {
            try {
                c.apply();

                return;
            }
            catch (AssertionError e) {
                U.warn(log, "Check failed (will retry in " + retryInterval + "ms).", e);

                U.sleep(retryInterval);
            }
        }

        // Apply the last time without guarding try.
        c.apply();
    }

    /**
     * Reads entire file into byte array.
     *
     * @param file File to read.
     * @return Content of file in byte array.
     * @throws IOException If failed.
     */
    public static byte[] readFile(File file) throws IOException {
        assert file.exists();
        assert file.length() < Integer.MAX_VALUE;

        byte[] bytes = new byte[(int) file.length()];

        try (FileInputStream fis = new FileInputStream(file)) {
            int readBytesCnt = fis.read(bytes);
            assert readBytesCnt == bytes.length;
        }

        return bytes;
    }

    /**
     * Reads resource into byte array.
     *
     * @param classLoader Classloader.
     * @param resourceName Resource name.
     * @return Content of resorce in byte array.
     * @throws IOException If failed.
     */
    public static byte[] readResource(ClassLoader classLoader, String resourceName) throws IOException {
        try (InputStream is = classLoader.getResourceAsStream(resourceName)) {
            assertNotNull("Resource is missing: " + resourceName , is);

            try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
                U.copy(is, baos);

                return baos.toByteArray();
            }
        }
    }

    /**
     * Sleeps and increments an integer.
     * <p>
     * Allows for loops like the following:
     * <pre>{@code
     *     for (int i = 0; i < 20 && !condition; i = sleepAndIncrement(200, i)) {
     *         ...
     *     }
     * }</pre>
     * for busy-waiting limited number of iterations.
     *
     * @param sleepDur Sleep duration in milliseconds.
     * @param i Integer to increment.
     * @return Incremented value.
     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If sleep was interrupted.
     */
    public static int sleepAndIncrement(int sleepDur, int i) throws IgniteInterruptedCheckedException {
        U.sleep(sleepDur);

        return i + 1;
    }

    /**
     * Waits for condition, polling in busy wait loop.
     *
     * @param cond Condition to wait for.
     * @param timeout Max time to wait in milliseconds.
     * @return {@code true} if condition was achieved, {@code false} otherwise.
     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If interrupted.
     */
    public static boolean waitForCondition(GridAbsPredicate cond, long timeout) throws IgniteInterruptedCheckedException {
        long curTime = U.currentTimeMillis();
        long endTime = curTime + timeout;

        if (endTime < 0)
            endTime = Long.MAX_VALUE;

        while (curTime < endTime) {
            if (cond.apply())
                return true;

            U.sleep(DFLT_BUSYWAIT_SLEEP_INTERVAL);

            curTime = U.currentTimeMillis();
        }

        return false;
    }

    /**
     * Creates an SSL context from test key store with disabled trust manager.
     *
     * @return Initialized context.
     * @throws GeneralSecurityException In case if context could not be initialized.
     * @throws IOException If keystore cannot be accessed.
     */
    public static SSLContext sslContext() throws GeneralSecurityException, IOException {
        SSLContext ctx = SSLContext.getInstance("TLS");

        char[] storePass = keyStorePassword().toCharArray();

        KeyManagerFactory keyMgrFactory = KeyManagerFactory.getInstance("SunX509");

        KeyStore keyStore = KeyStore.getInstance("JKS");

        keyStore.load(new FileInputStream(U.resolveIgnitePath(GridTestProperties.getProperty("ssl.keystore.path"))),
            storePass);

        keyMgrFactory.init(keyStore, storePass);

        ctx.init(keyMgrFactory.getKeyManagers(),
            new TrustManager[]{GridSslBasicContextFactory.getDisabledTrustManager()}, null);

        return ctx;
    }

    /**
     * Creates test-purposed SSL context factory from test key store with disabled trust manager.
     *
     * @return SSL context factory used in test.
     */
    public static GridSslContextFactory sslContextFactory() {
        GridSslBasicContextFactory factory = new GridSslBasicContextFactory();

        factory.setKeyStoreFilePath(
            U.resolveIgnitePath(GridTestProperties.getProperty("ssl.keystore.path")).getAbsolutePath());
        factory.setKeyStorePassword(keyStorePassword().toCharArray());

        factory.setTrustManagers(GridSslBasicContextFactory.getDisabledTrustManager());

        return factory;
    }

    /**
     * Creates test-purposed SSL context factory from test key store with disabled trust manager.
     *
     * @return SSL context factory used in test.
     */
    public static Factory<SSLContext> sslFactory() {
        SslContextFactory factory = new SslContextFactory();

        factory.setKeyStoreFilePath(
            U.resolveIgnitePath(GridTestProperties.getProperty("ssl.keystore.path")).getAbsolutePath());
        factory.setKeyStorePassword(keyStorePassword().toCharArray());

        factory.setTrustManagers(SslContextFactory.getDisabledTrustManager());

        return factory;
    }

    /**
     * Creates test-purposed SSL context factory from specified key store and trust store.
     *
     * @param keyStore Key store name.
     * @param trustStore Trust store name.
     * @return SSL context factory used in test.
     */
    public static Factory<SSLContext> sslTrustedFactory(String keyStore, String trustStore) {
        SslContextFactory factory = new SslContextFactory();

        factory.setKeyStoreFilePath(keyStorePath(keyStore));
        factory.setKeyStorePassword(keyStorePassword().toCharArray());
        factory.setTrustStoreFilePath(keyStorePath(trustStore));
        factory.setTrustStorePassword(keyStorePassword().toCharArray());

        return factory;
    }

    public static String keyStorePassword() {
        return GridTestProperties.getProperty("ssl.keystore.password");
    }

    @NotNull public static String keyStorePath(String keyStore) {
        return U.resolveIgnitePath(GridTestProperties.getProperty(
            "ssl.keystore." + keyStore + ".path")).getAbsolutePath();
    }

    /**
     * @param o1 Object 1.
     * @param o2 Object 2.
     * @return Equals or not.
     */
    public static boolean deepEquals(@Nullable Object o1, @Nullable Object o2) {
        if (o1 == o2)
            return true;
        else if (o1 == null || o2 == null)
            return false;
        else if (o1.getClass() != o2.getClass())
            return false;
        else {
            Class<?> cls = o1.getClass();

            assert o2.getClass() == cls;

            for (Field f : cls.getDeclaredFields()) {
                f.setAccessible(true);

                Object v1;
                Object v2;

                try {
                    v1 = f.get(o1);
                    v2 = f.get(o2);
                }
                catch (IllegalAccessException e) {
                    throw new AssertionError(e);
                }

                if (!Objects.deepEquals(v1, v2))
                    return false;
            }

            return true;
        }
    }

    /**
     * Converts integer permission mode into set of {@link PosixFilePermission}.
     *
     * @param mode File mode.
     * @return Set of {@link PosixFilePermission}.
     */
    public static Set<PosixFilePermission> modeToPermissionSet(int mode) {
        Set<PosixFilePermission> res = EnumSet.noneOf(PosixFilePermission.class);

        if ((mode & 0400) > 0)
            res.add(PosixFilePermission.OWNER_READ);

        if ((mode & 0200) > 0)
            res.add(PosixFilePermission.OWNER_WRITE);

        if ((mode & 0100) > 0)
            res.add(PosixFilePermission.OWNER_EXECUTE);

        if ((mode & 040) > 0)
            res.add(PosixFilePermission.GROUP_READ);

        if ((mode & 020) > 0)
            res.add(PosixFilePermission.GROUP_WRITE);

        if ((mode & 010) > 0)
            res.add(PosixFilePermission.GROUP_EXECUTE);

        if ((mode & 04) > 0)
            res.add(PosixFilePermission.OTHERS_READ);

        if ((mode & 02) > 0)
            res.add(PosixFilePermission.OTHERS_WRITE);

        if ((mode & 01) > 0)
            res.add(PosixFilePermission.OTHERS_EXECUTE);

        return res;
    }

    /**
     * @param name Name.
     * @param run Run.
     */
    public static void benchmark(@Nullable String name, @NotNull Runnable run) {
        benchmark(name, 8000, 10000, run);
    }

    /**
     * @param name Name.
     * @param warmup Warmup.
     * @param executionTime Time.
     * @param run Run.
     */
    public static void benchmark(@Nullable String name, long warmup, long executionTime, @NotNull Runnable run) {
        final AtomicBoolean stop = new AtomicBoolean();

        class Stopper extends TimerTask {
            @Override public void run() {
                stop.set(true);
            }
        }

        new Timer(true).schedule(new Stopper(), warmup);

        while (!stop.get())
            run.run();

        stop.set(false);

        new Timer(true).schedule(new Stopper(), executionTime);

        long startTime = System.currentTimeMillis();

        int cnt = 0;

        do {
            run.run();

            cnt++;
        }
        while (!stop.get());

        double dur = (System.currentTimeMillis() - startTime) / 1000d;

        System.out.printf("%s:\n operations:%d, duration=%fs, op/s=%d, latency=%fms\n", name, cnt, dur,
            (long)(cnt / dur), dur / cnt);
    }

    /**
     * Prompt to execute garbage collector.
     * {@code System.gc();} is not guaranteed to garbage collection, this method try to fill memory to crowd out dead
     * objects.
     */
    public static void runGC() {
        System.gc();

        ReferenceQueue<byte[]> queue = new ReferenceQueue<>();

        Collection<SoftReference<byte[]>> refs = new ArrayList<>();

        while (true) {
            byte[] bytes = new byte[128 * 1024];

            refs.add(new SoftReference<>(bytes, queue));

            if (queue.poll() != null)
                break;
        }

        System.gc();
    }

    /**
     * @return Path to apache ignite.
     */
    public static String apacheIgniteTestPath() {
        return System.getProperty("IGNITE_TEST_PATH", U.getIgniteHome() + "/target/ignite");
    }

    /**
     * {@link Class#getSimpleName()} does not return outer class name prefix for inner classes, for example,
     * getSimpleName() returns "RegularDiscovery" instead of "GridDiscoveryManagerSelfTest$RegularDiscovery"
     * This method return correct simple name for inner classes.
     *
     * @param cls Class
     * @return Simple name with outer class prefix.
     */
    public static String fullSimpleName(@NotNull Class cls) {
        if (cls.getEnclosingClass() != null)
            return cls.getEnclosingClass().getSimpleName() + "." + cls.getSimpleName();
        else
            return cls.getSimpleName();
    }

    /**
     * Adds test class to the list only if it's not in {@code ignoredTests} set.
     *
     * @param suite List where to place the test class.
     * @param test Test.
     * @param ignoredTests Tests to ignore. If test contained in the collection it is not included in suite
     */
    public static void addTestIfNeeded(@NotNull final List<Class<?>> suite, @NotNull final Class<?> test,
        @Nullable final Collection<Class> ignoredTests) {
        if (ignoredTests != null && ignoredTests.contains(test))
            return;

        suite.add(test);
    }

    /**
     * Generate random alphabetical string.
     *
     * @param rnd Random object.
     * @param maxLen Maximal length of string.
     * @return Random string object.
     */
    public static String randomString(Random rnd, int maxLen) {
        return randomString(rnd, 0, maxLen);
    }

    /**
     * Generate random alphabetical string.
     *
     * @param rnd Random object.
     * @param minLen Minimum length of string.
     * @param maxLen Maximal length of string.
     * @return Random string object.
     */
    public static String randomString(Random rnd, int minLen, int maxLen) {
        assert minLen >= 0 : "minLen >= 0";
        assert maxLen >= minLen : "maxLen >= minLen";

        int len = maxLen == minLen ? minLen : minLen + rnd.nextInt(maxLen - minLen);

        StringBuilder b = new StringBuilder(len);

        for (int i = 0; i < len; i++)
            b.append(ALPHABETH.charAt(rnd.nextInt(ALPHABETH.length())));

        return b.toString();
    }

    /**
     * @param node Node.
     * @param topVer Ready exchange version to wait for before trying to merge exchanges.
     */
    public static void mergeExchangeWaitVersion(Ignite node, long topVer) {
        ((IgniteEx)node).context().cache().context().exchange().mergeExchangesTestWaitVersion(
            new AffinityTopologyVersion(topVer, 0), null);
    }

    /**
     * @param node Node.
     * @param topVer Ready exchange version to wait for before trying to merge exchanges.
     */
    public static void mergeExchangeWaitVersion(Ignite node, long topVer, List mergedEvts) {
        ((IgniteEx)node).context().cache().context().exchange().mergeExchangesTestWaitVersion(
            new AffinityTopologyVersion(topVer, 0), mergedEvts);
    }

    /**
     * Checks that {@code state} is active.
     *
     * @param state Passed cluster state.
     * @see ClusterState#active(ClusterState)
     */
    public static void assertActive(ClusterState state) {
        assertTrue(state + " isn't active state", ClusterState.active(state));
    }

    /**
     * Checks that {@code state} isn't active.
     *
     * @param state Passed cluster state.
     * @see ClusterState#active(ClusterState)
     */
    public static void assertInactive(ClusterState state) {
        assertFalse(state + " isn't inactive state", ClusterState.active(state));
    }

    /** Test parameters scale factor util. */
    private static class ScaleFactorUtil {
        /** Test speed scale factor property name. */
        private static final String TEST_SCALE_FACTOR_PROPERTY = "TEST_SCALE_FACTOR";

        /** Min test scale factor value. */
        private static final double MIN_TEST_SCALE_FACTOR_VALUE = 0.1;

        /** Max test scale factor value. */
        private static final double MAX_TEST_SCALE_FACTOR_VALUE = 1.0;

        /** Test speed scale factor. */
        private static final double TEST_SCALE_FACTOR_VALUE = readScaleFactor();

        /** */
        private static double readScaleFactor() {
            double scaleFactor = Double.parseDouble(System.getProperty(TEST_SCALE_FACTOR_PROPERTY, "1.0"));

            scaleFactor = Math.max(scaleFactor, MIN_TEST_SCALE_FACTOR_VALUE);
            scaleFactor = Math.min(scaleFactor, MAX_TEST_SCALE_FACTOR_VALUE);

            return scaleFactor;
        }

        /** */
        public static int apply(int val) {
            return (int)Math.round(TEST_SCALE_FACTOR_VALUE * val);
        }

        /** */
        public static int apply(int val, int lowerBound, int upperBound) {
            return applyUB(applyLB(val, lowerBound), upperBound);
        }

        /** Apply scale factor with lower bound */
        public static int applyLB(int val, int lowerBound) {
            return Math.max(apply(val), lowerBound);
        }

        /** Apply scale factor with upper bound */
        public static int applyUB(int val, int upperBound) {
            return Math.min(apply(val), upperBound);
        }
    }

    /**
     * @param node Node to connect to.
     * @param params Connection parameters.
     * @return Thin JDBC connection to specified node.
     */
    public static Connection connect(IgniteEx node, String params) throws SQLException {
        Collection<GridPortRecord> recs = node.context().ports().records();

        GridPortRecord cliLsnrRec = null;

        for (GridPortRecord rec : recs) {
            if (rec.clazz() == ClientListenerProcessor.class) {
                cliLsnrRec = rec;

                break;
            }
        }

        assertNotNull(cliLsnrRec);

        String connStr = "jdbc:ignite:thin://127.0.0.1:" + cliLsnrRec.port();

        if (!F.isEmpty(params))
            connStr += "/?" + params;

        return DriverManager.getConnection(connStr);
    }

    /**
     * Removes idle_verify log files created in tests.
     */
    public static void cleanIdleVerifyLogFiles() {
        File dir = new File(".");

        for (File f : dir.listFiles(n -> n.getName().startsWith(IdleVerifyResultV2.IDLE_VERIFY_FILE_PREFIX)))
            f.delete();
    }

    public static class SqlTestFunctions {
        /** Sleep milliseconds. */
        public static volatile long sleepMs;

        /** Fail flag. */
        public static volatile boolean fail;

        /**
         * Do sleep {@code sleepMs} milliseconds
         *
         * @return amount of milliseconds to sleep
         */
        @QuerySqlFunction
        @SuppressWarnings("BusyWait")
        public static long sleep() {
            long end = System.currentTimeMillis() + sleepMs;

            long remainTime =sleepMs;

            do {
                try {
                    Thread.sleep(remainTime);
                }
                catch (InterruptedException ignored) {
                    // No-op
                }
            }
            while ((remainTime = end - System.currentTimeMillis()) > 0);

            return sleepMs;
        }

        /**
         * Function do fail in case of {@code fail} is true, return 0 otherwise.
         *
         * @return in case of {@code fail} is false return 0, fail otherwise.
         */
        @QuerySqlFunction
        public static int can_fail() {
            if (fail)
                throw new IllegalArgumentException();
            else
                return 0;
        }

        /**
         * Function do sleep {@code sleepMs} milliseconds and do fail in case of {@code fail} is true, return 0 otherwise.
         *
         * @return amount of milliseconds to sleep in case of {@code fail} is false, fail otherwise.
         */
        @QuerySqlFunction
        public static long sleep_and_can_fail() {
            long sleep = sleep();

            can_fail();

            return sleep;
        }
    }

    /**
     * Runnable that can throw exceptions.
     */
    @FunctionalInterface
    public interface RunnableX extends Runnable {
        /**
         * Runnable body.
         *
         * @throws Exception If failed.
         */
        void runx() throws Exception;

        /** {@inheritdoc} */
        @Override default void run() {
            try {
                runx();
            }
            catch (Exception e) {
                throw new IgniteException(e);
            }
        }
    }

    /**
     * IgniteRunnable that can throw exceptions.
     */
    @FunctionalInterface
    public interface IgniteRunnableX extends IgniteRunnable {
        /**
         * Runnable body.
         *
         * @throws Exception If failed.
         */
        void runx() throws Exception;

        /** {@inheritdoc} */
        @Override default void run() {
            try {
                runx();
            }
            catch (Exception e) {
                throw new IgniteException(e);
            }
        }
    }
}
