/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ignite.internal.processors.cache;

import java.io.Serializable;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.LongStream;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.MBeanException;
import javax.management.ReflectionException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TransactionsMXBeanImpl;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.mxbean.TransactionsMXBean;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.MessageOrderLogListener;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteThreadFactory;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_COEFFICIENT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.transactions.TransactionMetricsAdapter.METRIC_SYSTEM_TIME_HISTOGRAM;
import static org.apache.ignite.internal.processors.cache.transactions.TransactionMetricsAdapter.METRIC_TOTAL_SYSTEM_TIME;
import static org.apache.ignite.internal.processors.cache.transactions.TransactionMetricsAdapter.METRIC_TOTAL_USER_TIME;
import static org.apache.ignite.internal.processors.cache.transactions.TransactionMetricsAdapter.METRIC_USER_TIME_HISTOGRAM;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.TX_METRICS;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;

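/**
 * Tests transaction system/user time metrics and transaction time dump logging, including runtime
 * reconfiguration of the dump threshold and sampling parameters through {@link TransactionsMXBean}.
 */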
@WithSystemProperty(key = IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD, value = "999")
@WithSystemProperty(key = IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_COEFFICIENT, value = "1.0")
@WithSystemProperty(key = IGNITE_TRANSACTION_TIME_DUMP_SAMPLES_PER_SECOND_LIMIT, value = "5")
@WithSystemProperty(key = IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, value = "500")
public class GridTransactionsSystemUserTimeMetricsTest extends GridCommonAbstractTest {
    /** Name of the transactional cache used by the tests. */
    private static final String CACHE_NAME = "test";

    /** Instance name of the client node; instance names containing it get no cache configuration. */
    private static final String CLIENT = "client";

    /** Instance name of the second node, started only by the JMX parameters spreading test. */
    private static final String CLIENT_2 = CLIENT + "2";

    /** Sleep performed inside a transaction to simulate user time. */
    private static final long USER_DELAY = 1000;

    /** Sleep performed in the communication SPI to simulate system time. */
    private static final long SYSTEM_DELAY = 1000;

    /** Tolerance used when comparing measured user/system times. */
    private static final long EPSILON = 300;

    /** Regular expression of the transaction time dump log line. */
    private static final String TRANSACTION_TIME_DUMP_REGEX = ".*?ransaction time dump .*?totalTime=[0-9]{1,4}, " +
            "systemTime=[0-9]{1,4}, userTime=[0-9]{1,4}, cacheOperationsTime=[0-9]{1,4}.*";

    /** Regular expression of the long transaction time dump log line that contains rollback time. */
    private static final String ROLLBACK_TIME_DUMP_REGEX =
        ".*?Long transaction time dump .*?cacheOperationsTime=[0-9]{1,4}.*?rollbackTime=[0-9]{1,4}.*";

    /** Prefix of key for distributed meta storage. */
    private static final String DIST_CONF_PREFIX = "distrConf-";

    /** Listener for log lines matching {@link #TRANSACTION_TIME_DUMP_REGEX}. */
    private LogListener logTxDumpLsnr = new MessageOrderLogListener(TRANSACTION_TIME_DUMP_REGEX);

    /** Listener counting log lines that match {@link #TRANSACTION_TIME_DUMP_REGEX}. */
    private TransactionDumpListener transactionDumpLsnr = new TransactionDumpListener(TRANSACTION_TIME_DUMP_REGEX);

    /** Listener for log lines matching {@link #ROLLBACK_TIME_DUMP_REGEX}. */
    private LogListener rollbackDumpLsnr = new MessageOrderLogListener(ROLLBACK_TIME_DUMP_REGEX);

    /** Logger proxy shared by all nodes and tests; created without a delegate, which is set before each test. */
    private static CommonLogProxy testLog = new CommonLogProxy(null);

    /** Listening logger of the current test instance; the dump listeners are registered on it. */
    private final ListeningTestLogger listeningTestLog = new ListeningTestLogger(log());

    /** Original value of the static {@code log} field of {@link IgniteTxAdapter}, restored after all tests. */
    private static IgniteLogger oldLog;

    /** Flag which is set to true if we need to slow system time. */
    private volatile boolean slowSystem;

    /** Flag which is set to true if we need to simulate transaction failure. */
    private volatile boolean simulateFailure;

    /** Whether the server nodes have already been started; they are started once and reused by all tests. */
    private static boolean gridStarted = false;

    /** Client node, started before and stopped after every test. */
    private Ignite client;

    /** Cache {@link #CACHE_NAME} obtained through the client node. */
    private IgniteCache<Integer, Integer> cache;

    /** Transaction body: reads the value of key 1 and writes it back incremented by one. */
    private Callable<Object> txCallable = () -> {
        Integer val = cache.get(1);

        cache.put(1, val + 1);

        return null;
    };

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        // All nodes log through the shared proxy, so the per-test listening logger sees their output.
        cfg.setGridLogger(testLog);

        boolean isClient = igniteInstanceName.contains(CLIENT);

        // Only nodes whose name does not contain CLIENT declare the cache.
        if (!isClient) {
            CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(CACHE_NAME);

            ccfg.setAtomicityMode(TRANSACTIONAL);
            ccfg.setBackups(1);
            ccfg.setWriteSynchronizationMode(FULL_SYNC);

            cfg.setCacheConfiguration(ccfg);
        }

        // JMX exporter makes transaction metrics available as MBean attributes.
        cfg.setMetricExporterSpi(new JmxMetricExporterSpi());

        // Communication SPI that can inject a delay on lock requests and a failure on prepare requests.
        cfg.setCommunicationSpi(new TestCommunicationSpi());

        return cfg;
    }

    /**
     * Replaces the static {@code log} field of {@link IgniteTxAdapter} with the test logger proxy and remembers
     * the previous value.
     *
     * @throws Exception If failed.
     */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        final String logFieldName = "log";

        // Keep the original logger so that it can be put back once all tests are done.
        oldLog = GridTestUtils.getFieldValue(IgniteTxAdapter.class, logFieldName);

        GridTestUtils.setFieldValue(IgniteTxAdapter.class, logFieldName, testLog);
    }

    /**
     * Restores the original {@code log} field of {@link IgniteTxAdapter}, clears the static state of this test
     * class and stops all nodes.
     *
     * @throws Exception If failed.
     */
    @Override protected void afterTestsStopped() throws Exception {
        final String logFieldName = "log";

        GridTestUtils.setFieldValue(IgniteTxAdapter.class, logFieldName, oldLog);

        // Static state must not leak into a subsequent run of this class.
        oldLog = null;
        gridStarted = false;

        stopAllGrids();

        super.afterTestsStopped();
    }

    /**
     * Prepares a single test: points the shared logger proxy to the listening logger of this test instance,
     * registers the dump listeners, starts two server nodes if they are not running yet, starts the client node,
     * puts the initial value for key 1 and applies threshold 1000, coefficient 0.0 and limit 5 through JMX.
     *
     * @throws Exception If failed.
     */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        // From now on everything logged through the proxy goes to this test's listening logger.
        testLog.setImpl(listeningTestLog);

        listeningTestLog.registerListener(logTxDumpLsnr);
        listeningTestLog.registerListener(transactionDumpLsnr);
        listeningTestLog.registerListener(rollbackDumpLsnr);

        // Server nodes are started only once and shared by all tests of the class.
        if (!gridStarted) {
            startGrids(2);

            gridStarted = true;
        }

        client = startClientGrid(CLIENT);

        cache = client.getOrCreateCache(CACHE_NAME);

        // Initial value which is read and incremented by txCallable.
        cache.put(1, 1);

        applyJmxParameters(1000L, 0.0, 5);
    }

    /**
     * Stops the client node started for the test; server nodes are left running.
     *
     * @throws Exception If failed.
     */
    @Override protected void afterTest() throws Exception {
        stopGrid(CLIENT);

        super.afterTest();
    }

    /**
     * Changes transaction time dump settings at runtime using the transactions MX bean of the {@link #CLIENT}
     * node. A {@code null} argument leaves the corresponding setting untouched.
     *
     * @param threshold Long transaction time dump threshold.
     * @param coefficient Transaction time dump samples coefficient.
     * @param limit Transaction time dump samples per second limit.
     * @return Transaction MX bean of the client node.
     * @throws InterruptedException If the current thread is interrupted while waiting.
     */
    private TransactionsMXBean applyJmxParameters(Long threshold, Double coefficient, Integer limit) throws InterruptedException {
        return applyJmxParameters(
            threshold,
            coefficient,
            limit,
            getMxBean(CLIENT, "Transactions", TransactionsMXBeanImpl.class, TransactionsMXBean.class),
            client);
    }

    /**
     * Changes transaction time dump settings at runtime through the given MX bean. For every non-null argument a
     * distributed metastorage listener is registered on the given node, the value is set through the bean, and
     * afterwards the method waits up to 300 ms per setting for the listener to observe the new value. The outcome
     * of the wait is not checked.
     *
     * @param threshold Long transaction time dump threshold, {@code null} to leave unchanged.
     * @param coefficient Transaction time dump samples coefficient, {@code null} to leave unchanged.
     * @param limit Transaction time dump samples per second limit, {@code null} to leave unchanged.
     * @param tmMxBean Instance {@link TransactionsMXBean}.
     * @param ignite Node.
     * @return The MX bean passed in.
     * @throws InterruptedException If the current thread is interrupted while waiting.
     */
    private TransactionsMXBean applyJmxParameters(Long threshold, Double coefficient, Integer limit,
                                                  TransactionsMXBean tmMxBean, Ignite ignite
    ) throws InterruptedException {
        IgniteEx node = (IgniteEx)ignite;

        boolean changeThreshold = threshold != null;
        boolean changeCoefficient = coefficient != null;
        boolean changeLimit = limit != null;

        CountDownLatch thresholdSeen = new CountDownLatch(1);
        CountDownLatch coefficientSeen = new CountDownLatch(1);
        CountDownLatch limitSeen = new CountDownLatch(1);

        if (changeThreshold) {
            node.context().distributedMetastorage().listen(
                key -> key.startsWith(DIST_CONF_PREFIX),
                (String key, Serializable oldVal, Serializable newVal) -> {
                    if (!key.endsWith("longTransactionTimeDumpThreshold"))
                        return;

                    if ((long)newVal == threshold)
                        thresholdSeen.countDown();
                });

            tmMxBean.setLongTransactionTimeDumpThreshold(threshold);
        }

        if (changeCoefficient) {
            node.context().distributedMetastorage().listen(
                key -> key.startsWith(DIST_CONF_PREFIX),
                (String key, Serializable oldVal, Serializable newVal) -> {
                    if (!key.endsWith("transactionTimeDumpSamplesCoefficient"))
                        return;

                    if ((double)newVal == coefficient)
                        coefficientSeen.countDown();
                });

            tmMxBean.setTransactionTimeDumpSamplesCoefficient(coefficient);
        }

        if (changeLimit) {
            node.context().distributedMetastorage().listen(
                key -> key.startsWith(DIST_CONF_PREFIX),
                (String key, Serializable oldVal, Serializable newVal) -> {
                    if (!key.endsWith("longTransactionTimeDumpSamplesPerSecondLimit"))
                        return;

                    if ((int)newVal == limit)
                        limitSeen.countDown();
                });

            tmMxBean.setTransactionTimeDumpSamplesPerSecondLimit(limit);
        }

        // Bounded waits: the method proceeds even if an update was not observed in time.
        if (changeThreshold)
            thresholdSeen.await(300, TimeUnit.MILLISECONDS);

        if (changeCoefficient)
            coefficientSeen.await(300, TimeUnit.MILLISECONDS);

        if (changeLimit)
            limitSeen.await(300, TimeUnit.MILLISECONDS);

        return tmMxBean;
    }

    /**
     * Submits {@code txCnt} transactions to a dedicated fixed thread pool of the same size. Each transaction
     * sleeps for {@code userDelay} and then executes {@link #txCallable}. The method does not wait for the
     * transactions to finish; the pool is only asked to shut down after the submitted tasks complete.
     *
     * @param client Client.
     * @param txCnt Transactions count.
     * @param userDelay User delay for each transaction.
     */
    private void doAsyncTransactions(Ignite client, int txCnt, long userDelay) {
        ExecutorService pool = Executors.newFixedThreadPool(txCnt, new IgniteThreadFactory("testscope", "async-tx-with-delay"));

        // Transaction body: the sleep happens inside the transaction, so it is accounted as user time.
        Callable<Object> delayedTx = () -> {
            doSleep(userDelay);

            txCallable.call();

            return null;
        };

        Runnable task = () -> {
            try {
                doInTransaction(client, delayedTx);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        };

        for (int submitted = 0; submitted < txCnt; submitted++)
            pool.submit(task);

        pool.shutdown();
    }

    /**
     * Allows to run a transaction which executes {@link #txCallable} with given system delay, user delay and
     * mode. The {@link #slowSystem} and {@link #simulateFailure} flags raised for the transaction are always
     * cleared on exit, including the case when the transaction throws, so that they cannot affect later
     * transactions.
     *
     * @param client Client.
     * @param sysDelay System delay.
     * @param userDelay User delay.
     * @param mode Mode, see {@link TxTestMode}.
     * @throws Exception If failed.
     */
    private void doTransaction(Ignite client, boolean sysDelay, boolean userDelay, TxTestMode mode) throws Exception {
        if (sysDelay)
            slowSystem = true;

        if (mode == TxTestMode.FAIL)
            simulateFailure = true;

        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
            // Sleeping inside an open transaction simulates user time.
            if (userDelay)
                doSleep(USER_DELAY);

            txCallable.call();

            if (mode == TxTestMode.ROLLBACK)
                tx.rollback();
            else
                tx.commit();
        }
        finally {
            // Reset on every path: a failed transaction must not leave the flags raised.
            slowSystem = false;
            simulateFailure = false;
        }
    }

    /**
     * Runs a single transaction on the client node via {@link #doTransaction} and records the wall-clock time
     * right before and right after it. The dump listeners are reset beforehand. An exception thrown by the
     * transaction is not propagated; instead the method sleeps for 500 ms before taking the completion time.
     *
     * @param sysDelay  System delay.
     * @param userDelay User delay.
     * @param mode Mode, see {@link TxTestMode}.
     * @return Start time, completion time and the transaction metrics MX bean of the client node.
     * @throws Exception If failed.
     */
    private ClientTxTestResult measureClientTransaction(boolean sysDelay, boolean userDelay, TxTestMode mode) throws Exception {
        logTxDumpLsnr.reset();
        rollbackDumpLsnr.reset();

        final long startedAt = System.currentTimeMillis();

        try {
            doTransaction(client, sysDelay, userDelay, mode);
        }
        catch (Exception ignored) {
            // Giving a time for transaction to rollback.
            doSleep(500);
        }

        final long finishedAt = System.currentTimeMillis();

        return new ClientTxTestResult(startedAt, finishedAt, metricRegistry(CLIENT, null, TX_METRICS));
    }

    /**
     * Checks that histogram long array is not null and that the sum of its buckets equals the expected
     * number of transactions.
     *
     * @param histogram Array.
     * @param txCnt Total count of transactions, that should be presented in histogram.
     */
    private void checkHistogram(long[] histogram, long txCnt) {
        assertNotNull(histogram);

        long cnt = LongStream.of(histogram).sum();

        // Arrays.toString prints the bucket values; plain concatenation would print only the array identity.
        assertEquals(
            "Must be " + txCnt + " transaction(s), actually were: " + cnt + ". Histogram: " + Arrays.toString(histogram),
            txCnt,
            cnt);
    }

    /**
     * Verifies total user and system time metrics after a delayed transaction. The delayed component must be at
     * least as long as the injected delay and must fit into the measured wall-clock duration (with
     * {@link #EPSILON} tolerance); the other component must stay below {@link #EPSILON}. Both time histograms
     * are expected to contain two transactions in total.
     *
     * @param res Should contains the result of transaction completion - start time, completion time and MX bean
     * from which metrics can be received.
     * @param userDelayMode If true, we are checking metrics after transaction with user delay. Otherwise,
     * we are checking metrics after transaction with system delay.
     * @throws MBeanException If getting of metric attribute failed.
     * @throws AttributeNotFoundException If getting of metric attribute failed.
     * @throws ReflectionException If getting of metric attribute failed.
     */
    private void checkTxDelays(ClientTxTestResult res, boolean userDelayMode)
        throws MBeanException, AttributeNotFoundException, ReflectionException {
        final long totalUserTime = (Long)res.mBean.getAttribute(METRIC_TOTAL_USER_TIME);
        final long totalSysTime = (Long)res.mBean.getAttribute(METRIC_TOTAL_SYSTEM_TIME);

        // Wall-clock duration measured around the transaction.
        final long elapsed = res.completionTime - res.startTime;

        if (userDelayMode) {
            assertTrue(totalUserTime >= USER_DELAY);
            assertTrue(totalUserTime < elapsed - totalSysTime + EPSILON);
            assertTrue(totalSysTime >= 0);
            assertTrue(totalSysTime < EPSILON);
        }
        else {
            assertTrue(totalUserTime >= 0);
            assertTrue(totalUserTime < EPSILON);
            assertTrue(totalSysTime >= SYSTEM_DELAY);
            assertTrue(totalSysTime < elapsed - totalUserTime + EPSILON);
        }

        final long expTxCnt = 2;

        long[] sysTimeHistogram = (long[])res.mBean.getAttribute(METRIC_SYSTEM_TIME_HISTOGRAM);

        checkHistogram(sysTimeHistogram, expTxCnt);

        long[] userTimeHistogram = (long[])res.mBean.getAttribute(METRIC_USER_TIME_HISTOGRAM);

        checkHistogram(userTimeHistogram, expTxCnt);
    }

    /**
     * Commits a transaction with a user delay and checks that the transaction time dump appears in the log and
     * that user/system time metrics reflect the delay.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testUserDelayOnCommittedTx() throws Exception {
        final boolean sysDelay = false;
        final boolean userDelay = true;

        ClientTxTestResult txRes = measureClientTransaction(sysDelay, userDelay, TxTestMode.COMMIT);

        assertTrue(logTxDumpLsnr.check());

        checkTxDelays(txRes, userDelay);
    }

    /**
     * Rolls back a transaction with a user delay and checks that the rollback time dump appears in the log and
     * that user/system time metrics reflect the delay.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testUserDelayOnRolledBackTx() throws Exception {
        final boolean sysDelay = false;
        final boolean userDelay = true;

        ClientTxTestResult txRes = measureClientTransaction(sysDelay, userDelay, TxTestMode.ROLLBACK);

        assertTrue(rollbackDumpLsnr.check());

        checkTxDelays(txRes, userDelay);
    }

    /**
     * Runs a transaction with a user delay and a simulated failure, then checks that the rollback time dump
     * appears in the log and that user/system time metrics reflect the delay.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testUserDelayOnFailedTx() throws Exception {
        final boolean sysDelay = false;
        final boolean userDelay = true;

        ClientTxTestResult txRes = measureClientTransaction(sysDelay, userDelay, TxTestMode.FAIL);

        assertTrue(rollbackDumpLsnr.check());

        checkTxDelays(txRes, userDelay);
    }

    /**
     * Commits a transaction with a system delay and checks that the transaction time dump appears in the log and
     * that user/system time metrics reflect the delay.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSystemDelayOnCommittedTx() throws Exception {
        final boolean sysDelay = true;
        final boolean userDelay = false;

        ClientTxTestResult txRes = measureClientTransaction(sysDelay, userDelay, TxTestMode.COMMIT);

        assertTrue(logTxDumpLsnr.check());

        checkTxDelays(txRes, userDelay);
    }

    /**
     * Rolls back a transaction with a system delay and checks that the rollback time dump appears in the log and
     * that user/system time metrics reflect the delay.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSystemDelayOnRolledBackTx() throws Exception {
        final boolean sysDelay = true;
        final boolean userDelay = false;

        ClientTxTestResult txRes = measureClientTransaction(sysDelay, userDelay, TxTestMode.ROLLBACK);

        assertTrue(rollbackDumpLsnr.check());

        checkTxDelays(txRes, userDelay);
    }

    /**
     * Runs a transaction with a system delay and a simulated failure, then checks that the rollback time dump
     * appears in the log and that user/system time metrics reflect the delay.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSystemDelayOnFailedTx() throws Exception {
        final boolean sysDelay = true;
        final boolean userDelay = false;

        ClientTxTestResult txRes = measureClientTransaction(sysDelay, userDelay, TxTestMode.FAIL);

        assertTrue(rollbackDumpLsnr.check());

        checkTxDelays(txRes, userDelay);
    }

    /**
     * Test that changing of JMX parameters spreads on cluster correctly: values set through the MX bean of one
     * node must be readable through the MX bean of the other node. Original values are restored afterwards.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testJmxParametersSpreading() throws Exception {
        // NOTE(review): this node is started with startGrid, while CLIENT is started with startClientGrid -
        // confirm that a non-client second node is intended.
        IgniteEx client2 = startGrid(CLIENT_2);

        try {
            TransactionsMXBean tmMxBean = getMxBean(
                CLIENT,
                "Transactions",
                TransactionsMXBeanImpl.class,
                TransactionsMXBean.class);

            TransactionsMXBean tmMxBean2 = getMxBean(
                CLIENT_2,
                "Transactions",
                TransactionsMXBeanImpl.class,
                TransactionsMXBean.class);

            int oldLimit = tmMxBean.getTransactionTimeDumpSamplesPerSecondLimit();
            long oldThreshold = tmMxBean.getLongTransactionTimeDumpThreshold();
            double oldCoefficient = tmMxBean.getTransactionTimeDumpSamplesCoefficient();

            try {
                int newLimit = 1234;
                long newThreshold = 99999;
                double newCoefficient = 0.01;

                // Coefficient and limit are set on the first node, threshold on the second one.
                applyJmxParameters(null, newCoefficient, newLimit);
                applyJmxParameters(newThreshold, null, null, tmMxBean2, client2);

                // Every value is read from the node it was NOT set on.
                assertEquals(newLimit, tmMxBean2.getTransactionTimeDumpSamplesPerSecondLimit());
                assertEquals(newThreshold, tmMxBean.getLongTransactionTimeDumpThreshold());

                // Absolute difference: without it any value smaller than the expected one would pass.
                assertTrue(Math.abs(tmMxBean2.getTransactionTimeDumpSamplesCoefficient() - newCoefficient) < 0.0001);
            }
            finally {
                applyJmxParameters(oldThreshold, oldCoefficient, oldLimit);
            }
        }
        finally {
            // CLIENT grid is stopped in afterTest.
            stopGrid(CLIENT_2);
        }
    }

    /**
     * Tests that tx time dumps appear in log correctly and after tx completion. Also checks that LRT dump
     * now contains information about current system and user time.
     * <p>
     * Ten transactions, each sleeping 5200 ms, are started asynchronously with a 5000 ms dump threshold. No
     * time dump is expected after 3 seconds; after 3 more seconds a time dump is expected for every transaction.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testLongTransactionDumpLimit() throws Exception {
        logTxDumpLsnr.reset();
        transactionDumpLsnr.reset();

        int txCnt = 10;

        List<String> txLogLines = new LinkedList<>();

        txLogLines.add("First 10 long running transactions \\[total=" + txCnt + "\\]");

        // Time values may contain any decimal digit, same as in TRANSACTION_TIME_DUMP_REGEX.
        for (int i = 0; i < txCnt; i++)
            txLogLines.add(".*?>>> Transaction .*? systemTime=[0-9]{1,4}, userTime=[0-9]{1,4}.*");

        LogListener lrtLogLsnr = new MessageOrderLogListener(txLogLines.toArray(new String[0]));

        listeningTestLog.registerListener(lrtLogLsnr);

        applyJmxParameters(5000L, null, txCnt);

        doAsyncTransactions(client, txCnt, 5200);

        // Transactions are still running: no time dump must be printed yet.
        doSleep(3000);

        assertFalse(logTxDumpLsnr.check());

        // By now all transactions had enough time to complete.
        doSleep(3000);

        assertTrue(logTxDumpLsnr.check());
        assertTrue(transactionDumpLsnr.check());
        assertTrue(lrtLogLsnr.check());

        assertEquals(txCnt, transactionDumpLsnr.value());
    }

    /**
     * Sets the sampling coefficient to 1.0 with a per-second limit equal to the number of transactions and
     * checks that every transaction produces a time dump in the log.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSamplingCoefficient() throws Exception {
        final int txCnt = 10;

        logTxDumpLsnr.reset();
        transactionDumpLsnr.reset();

        applyJmxParameters(null, 1.0, txCnt);

        // Wait for a second to reset hit counter.
        doSleep(1000);

        for (int done = 0; done < txCnt; done++)
            doInTransaction(client, txCallable);

        assertTrue(logTxDumpLsnr.check());
        assertTrue(transactionDumpLsnr.check());

        int dumped = transactionDumpLsnr.value();

        assertEquals(txCnt, dumped);
    }

    /**
     * Sets the sampling coefficient to 0.0 and checks that no transaction time dump is printed for a series
     * of transactions.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNoSamplingCoefficient() throws Exception {
        logTxDumpLsnr.reset();

        applyJmxParameters(null, 0.0, 10);

        final int txCnt = 10;

        for (int done = 0; done < txCnt; done++)
            doInTransaction(client, txCallable);

        boolean dumpLogged = logTxDumpLsnr.check();

        assertFalse(dumpLogged);
    }

    /**
     * Sets the sampling coefficient to 1.0 with a limit of 2 dumps per second, runs 10 transactions and, after
     * a pause, one more. Expects 3 time dumps in total and a log message reporting the 8 skipped dumps.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSamplingLimit() throws Exception {
        final int txCnt = 10;
        final int txDumpCnt = 2;

        logTxDumpLsnr.reset();
        transactionDumpLsnr.reset();

        int skippedCnt = txCnt - txDumpCnt;

        LogListener skippedDumpsLsnr = LogListener
            .matches("Transaction time dumps skipped because of log throttling: " + skippedCnt)
            .build();

        listeningTestLog.registerListener(skippedDumpsLsnr);

        applyJmxParameters(null, 1.0, txDumpCnt);

        // Wait for a second to reset hit counter.
        doSleep(1000);

        for (int done = 0; done < txCnt; done++)
            doInTransaction(client, txCallable);

        // Wait for a second to reset hit counter.
        doSleep(1000);

        // One more sample to print information about skipped previous samples.
        doInTransaction(client, txCallable);

        assertTrue(logTxDumpLsnr.check());
        assertTrue(transactionDumpLsnr.check());
        assertTrue(skippedDumpsLsnr.check());

        int dumped = transactionDumpLsnr.value();

        assertEquals(txDumpCnt + 1, dumped);
    }

    /**
     * Sets the threshold to 0 and the sampling coefficient to 1.0 with a per-second limit equal to the number
     * of transactions, then checks that every transaction produces a time dump in the log.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSamplingNoThreshold() throws Exception {
        final int txCnt = 10;

        logTxDumpLsnr.reset();
        transactionDumpLsnr.reset();

        applyJmxParameters(0L, 1.0, txCnt);

        // Wait for a second to reset hit counter.
        doSleep(1000);

        for (int done = 0; done < txCnt; done++)
            doInTransaction(client, txCallable);

        assertTrue(logTxDumpLsnr.check());
        assertTrue(transactionDumpLsnr.check());

        int dumped = transactionDumpLsnr.value();

        assertEquals(txCnt, dumped);
    }

    /**
     * Sets the threshold to 0, the sampling coefficient to 0.0 and the limit to 5 dumps per second, then checks
     * that no transaction time dump is printed for a series of transactions.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSamplingNoThresholdWithLimit() throws Exception {
        final int txCnt = 10;

        logTxDumpLsnr.reset();

        applyJmxParameters(0L, 0.0, 5);

        for (int done = 0; done < txCnt; done++)
            doInTransaction(client, txCallable);

        boolean dumpLogged = logTxDumpLsnr.check();

        assertFalse(dumpLogged);
    }

    /**
     * Communication SPI for tests. When {@link #slowSystem} is raised, the next lock or MVCC snapshot request is
     * delayed by {@link #SYSTEM_DELAY}; when {@link #simulateFailure} is raised, sending of the next prepare
     * request throws. Either flag is cleared as soon as it has been acted upon.
     */
    private class TestCommunicationSpi extends TcpCommunicationSpi {
        /** {@inheritDoc} */
        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
            throws IgniteSpiException {
            // Payload of the I/O message, or null if the message is of another kind.
            Object payload = msg instanceof GridIoMessage ? ((GridIoMessage)msg).message() : null;

            boolean lockOrSnapshotReq = payload instanceof GridNearLockRequest || payload instanceof MvccTxSnapshotRequest;

            if (lockOrSnapshotReq && slowSystem) {
                slowSystem = false;

                doSleep(SYSTEM_DELAY);
            }

            if (payload instanceof GridNearTxPrepareRequest && simulateFailure) {
                simulateFailure = false;

                throw new RuntimeException("Simulating prepare failure.");
            }

            super.sendMessage(node, msg, ackClosure);
        }
    }

    /**
     * Log listener that counts log messages fully matching a regular expression.
     */
    private static class TransactionDumpListener extends LogListener {
        /** Number of matching messages seen since creation or the last {@link #reset()}. */
        private final AtomicInteger counter = new AtomicInteger(0);

        /** Pattern compiled once, so that it is not recompiled for every log message. */
        private final Pattern ptrn;

        /**
         * @param regex Regular expression a whole log message has to match to be counted.
         */
        private TransactionDumpListener(String regex) {
            ptrn = Pattern.compile(regex);
        }

        /**
         * @return {@code True} if at least one matching message was seen.
         */
        @Override public boolean check() {
            return value() > 0;
        }

        /** Resets the counter of matching messages to zero. */
        @Override public void reset() {
            counter.set(0);
        }

        /**
         * Counts the message if it matches the pattern as a whole.
         *
         * @param s Log message.
         */
        @Override public void accept(String s) {
            if (ptrn.matcher(s).matches())
                counter.incrementAndGet();
        }

        /**
         * @return Number of matching messages seen so far.
         */
        int value() {
            return counter.get();
        }
    }

    /**
     * Enum to define transaction test mode, i.e. how a test transaction is supposed to end.
     */
    enum TxTestMode {
        /** If transaction should be committed. */
        COMMIT,

        /** If transaction should be rolled back. */
        ROLLBACK,

        /** If transaction should fail. */
        FAIL
    }

    /**
     * Result of running of a test transaction: immutable holder of the measured times and the metrics MX bean.
     */
    private static class ClientTxTestResult {
        /** Start time. */
        final long startTime;

        /** Completion time. */
        final long completionTime;

        /** MX bean to receive metrics. */
        final DynamicMBean mBean;

        /**
         * @param startTime Start time.
         * @param completionTime Completion time.
         * @param mBean MX bean to receive metrics.
         */
        public ClientTxTestResult(long startTime, long completionTime, DynamicMBean mBean) {
            this.startTime = startTime;
            this.completionTime = completionTime;
            this.mBean = mBean;
        }
    }

    /**
     * Logger that forwards every call to a replaceable delegate. It allows the delegate to be swapped while
     * the object configured as a logger stays the same.
     */
    private static class CommonLogProxy implements IgniteLogger {
        /**
         * Current delegate. Volatile because it is replaced through {@link #setImpl(IgniteLogger)} while other
         * threads may be logging through this proxy. May be {@code null} until a delegate is set.
         */
        private volatile IgniteLogger impl;

        /**
         * @param impl Initial delegate, may be {@code null} if it is set later.
         */
        public CommonLogProxy(IgniteLogger impl) {
            this.impl = impl;
        }

        /**
         * @param impl New delegate that receives all subsequent calls.
         */
        public void setImpl(IgniteLogger impl) {
            this.impl = impl;
        }

        /** {@inheritDoc} */
        @Override public IgniteLogger getLogger(Object ctgr) {
            return impl.getLogger(ctgr);
        }

        /** {@inheritDoc} */
        @Override public void trace(String msg) {
            impl.trace(msg);
        }

        /** {@inheritDoc} */
        @Override public void debug(String msg) {
            impl.debug(msg);
        }

        /** {@inheritDoc} */
        @Override public void info(String msg) {
            impl.info(msg);
        }

        /** {@inheritDoc} */
        @Override public void warning(String msg, Throwable e) {
            impl.warning(msg, e);
        }

        /** {@inheritDoc} */
        @Override public void error(String msg, Throwable e) {
            impl.error(msg, e);
        }

        /** {@inheritDoc} */
        @Override public boolean isTraceEnabled() {
            return impl.isTraceEnabled();
        }

        /** {@inheritDoc} */
        @Override public boolean isDebugEnabled() {
            return impl.isDebugEnabled();
        }

        /** {@inheritDoc} */
        @Override public boolean isInfoEnabled() {
            return impl.isInfoEnabled();
        }

        /** {@inheritDoc} */
        @Override public boolean isQuiet() {
            return impl.isQuiet();
        }

        /** {@inheritDoc} */
        @Override public String fileName() {
            return impl.fileName();
        }
    }
}
