/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Test;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;

/**
 * Tests that removes are not lost when topology changes.
 */
public abstract class GridCacheAbstractRemoveFailureTest extends GridCommonAbstractTest {
    /** */
    private static final int GRID_CNT = 3;

    /** Keys count. */
    private static final int KEYS_CNT = 10_000;

    /** Test duration. */
    private static final long DUR = 90 * 1000L;

    /** Cache data assert frequency. */
    private static final long ASSERT_FREQ = 10_000;

    /** Kill delay. */
    private static final T2<Integer, Integer> KILL_DELAY = new T2<>(2000, 5000);

    /** Start delay. */
    private static final T2<Integer, Integer> START_DELAY = new T2<>(2000, 5000);

    /** */
    private static String sizePropVal;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);

        if (testClientNode() && getTestIgniteInstanceName(0).equals(igniteInstanceName))
            cfg.setClientMode(true);

        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        // Need to increase value set in GridAbstractTest
        sizePropVal = System.getProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE);

        System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "100000");

        startGrids(GRID_CNT);
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, sizePropVal != null ? sizePropVal : "");
    }

    /** {@inheritDoc} */
    @Override protected long getTestTimeout() {
        return DUR + 60_000;
    }

    /**
     * @return Cache mode.
     */
    protected abstract CacheMode cacheMode();

    /**
     * @return Cache atomicity mode.
     */
    protected abstract CacheAtomicityMode atomicityMode();

    /**
     * @return Near cache configuration.
     */
    protected abstract NearCacheConfiguration nearCache();

    /**
     * @return {@code True} if test updates from client node.
     */
    protected boolean testClientNode() {
        return false;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPutAndRemove() throws Exception {
        putAndRemove(duration(), null, null);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPutAndRemovePessimisticTx() throws Exception {
        if (atomicityMode() != CacheAtomicityMode.TRANSACTIONAL)
            return;

        putAndRemove(duration(), PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPutAndRemoveOptimisticSerializableTx() throws Exception {
        if (atomicityMode() != CacheAtomicityMode.TRANSACTIONAL)
            return;

        putAndRemove(duration(), OPTIMISTIC, SERIALIZABLE);
    }

    /** */
    protected long duration() {
        return DUR;
    }

    /**
     * @param duration Test duration.
     * @param txConcurrency Transaction concurrency if test explicit transaction.
     * @param txIsolation Transaction isolation if test explicit transaction.
     * @throws Exception If failed.
     */
    private void putAndRemove(long duration,
        final TransactionConcurrency txConcurrency,
        final TransactionIsolation txIsolation) throws Exception {
        assertEquals(testClientNode(), (boolean) grid(0).configuration().isClientMode());

        grid(0).destroyCache(DEFAULT_CACHE_NAME);

        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);

        ccfg.setWriteSynchronizationMode(FULL_SYNC);

        ccfg.setCacheMode(cacheMode());

        if (cacheMode() == PARTITIONED)
            ccfg.setBackups(1);

        ccfg.setAtomicityMode(atomicityMode());
        ccfg.setNearConfiguration(nearCache());

        final IgniteCache<Integer, Integer> sndCache0 = grid(0).createCache(ccfg);

        final AtomicBoolean stop = new AtomicBoolean();

        final AtomicLong cntr = new AtomicLong();

        final AtomicLong errCntr = new AtomicLong();

        // Expected values in cache.
        final Map<Integer, GridTuple<Integer>> expVals = new ConcurrentHashMap<>();

        final AtomicReference<CyclicBarrier> cmp = new AtomicReference<>();

        IgniteInternalFuture<?> updateFut = GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                Thread.currentThread().setName("update-thread");

                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                IgniteTransactions txs = sndCache0.unwrap(Ignite.class).transactions();

                while (!stop.get()) {
                    for (int i = 0; i < 100; i++) {
                        int key = rnd.nextInt(KEYS_CNT);

                        boolean put = rnd.nextInt(0, 100) > 10;

                        while (true) {
                            try {
                                if (put) {
                                    boolean failed = false;

                                    if (txConcurrency != null) {
                                        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
                                            sndCache0.put(key, i);

                                            tx.commit();
                                        }
                                        catch (CacheException | IgniteException e) {
                                            if (!X.hasCause(e, ClusterTopologyCheckedException.class)) {
                                                log.error("Unexpected error: " + e);

                                                throw e;
                                            }

                                            failed = true;
                                        }
                                    }
                                    else
                                        sndCache0.put(key, i);

                                    if (!failed)
                                        expVals.put(key, F.t(i));
                                }
                                else {
                                    boolean failed = false;

                                    if (txConcurrency != null) {
                                        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
                                            sndCache0.remove(key);

                                            tx.commit();
                                        }
                                        catch (CacheException | IgniteException e) {
                                            if (!X.hasCause(e, ClusterTopologyCheckedException.class)) {
                                                log.error("Unexpected error: " + e);

                                                throw e;
                                            }

                                            failed = true;
                                        }
                                    }
                                    else
                                        sndCache0.remove(key);

                                    if (!failed)
                                        expVals.put(key, F.<Integer>t(null));
                                }

                                break;
                            }
                            catch (CacheException e) {
                                if (put)
                                    log.error("Put failed [key=" + key + ", val=" + i + ']', e);
                                else
                                    log.error("Remove failed [key=" + key + ']', e);

                                errCntr.incrementAndGet();
                            }
                        }
                    }

                    cntr.addAndGet(100);

                    CyclicBarrier barrier = cmp.get();

                    if (barrier != null) {
                        log.info("Wait data check.");

                        barrier.await(60_000, TimeUnit.MILLISECONDS);

                        log.info("Finished wait data check.");
                    }
                }

                return null;
            }
        });

        IgniteInternalFuture killFut = createAndRunConcurrentAction(stop, cmp);

        try {
            long stopTime = duration + U.currentTimeMillis();

            long nextAssert = U.currentTimeMillis() + ASSERT_FREQ;

            while (U.currentTimeMillis() < stopTime) {
                long start = System.nanoTime();

                long ops = cntr.longValue();

                U.sleep(1000);

                long diff = cntr.longValue() - ops;

                double time = (System.nanoTime() - start) / 1_000_000_000d;

                long opsPerSecond = (long)(diff / time);

                log.info("Operations/second: " + opsPerSecond);

                if (U.currentTimeMillis() >= nextAssert) {
                    CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
                        @Override public void run() {
                            try {
                                cmp.set(null);

                                log.info("Checking cache content.");

                                assertCacheContent(expVals);

                                log.info("Finished check cache content.");
                            }
                            catch (Throwable e) {
                                log.error("Unexpected error: " + e, e);

                                throw e;
                            }
                        }
                    });

                    log.info("Start cache content check.");

                    cmp.set(barrier);

                    try {
                        barrier.await(60_000, TimeUnit.MILLISECONDS);
                    }
                    catch (TimeoutException e) {
                        U.dumpThreads(log);

                        fail("Failed to check cache content: " + e);
                    }

                    log.info("Cache content check done.");

                    nextAssert = System.currentTimeMillis() + ASSERT_FREQ;
                }
            }
        }
        finally {
            stop.set(true);
        }

        killFut.get();

        updateFut.get();

        log.info("Test finished. Update errors: " + errCntr.get());
    }

    /** */
    protected IgniteInternalFuture createAndRunConcurrentAction(final AtomicBoolean stop, final AtomicReference<CyclicBarrier> cmp) {
        return GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                Thread.currentThread().setName("restart-thread");

                while (!stop.get()) {
                    U.sleep(random(KILL_DELAY.get1(), KILL_DELAY.get2()));

                    killAndRestart(stop, random(1, GRID_CNT + 1));

                    CyclicBarrier barrier = cmp.get();

                    if (barrier != null) {
                        log.info("Wait data check.");

                        barrier.await(60_000, TimeUnit.MILLISECONDS);

                        log.info("Finished wait data check.");
                    }
                }

                return null;
            }
        });
    }

    /**
     * @param stop Stop flag.
     * @throws Exception If failed.
     */
    protected void killAndRestart(AtomicBoolean stop, int nodeIdx) throws Exception {
        if (stop.get())
            return;

        log.info("Killing node " + nodeIdx);

        stopGrid(nodeIdx);

        U.sleep(random(START_DELAY.get1(), START_DELAY.get2()));

        log.info("Restarting node " + nodeIdx);

        startGrid(nodeIdx);

        if (stop.get())
            return;

        U.sleep(1000);
    }

    /**
     * @param expVals Expected values in cache.
     */
    private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) {
        assert !expVals.isEmpty();

        Collection<Integer> failedKeys = new HashSet<>();

        for (int i = 0; i < GRID_CNT; i++) {
            Ignite ignite = grid(i);

            IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);

            for (Map.Entry<Integer, GridTuple<Integer>> expVal : expVals.entrySet()) {
                Integer val = cache.get(expVal.getKey());

                if (!F.eq(expVal.getValue().get(), val)) {
                    failedKeys.add(expVal.getKey());

                    boolean primary = affinity(cache).isPrimary(ignite.cluster().localNode(), expVal.getKey());
                    boolean backup = affinity(cache).isBackup(ignite.cluster().localNode(), expVal.getKey());

                    log.error("Unexpected cache data [exp=" + expVal +
                        ", actual=" + val +
                        ", nodePrimary=" + primary +
                        ", nodeBackup=" + backup +
                        ", nodeIdx" + i +
                        ", nodeId=" + ignite.cluster().localNode().id() + ']');
                }
            }
        }

        assertTrue("Unexpected data for keys: " + failedKeys, failedKeys.isEmpty());
    }

    /**
     * @param min Min possible value.
     * @param max Max possible value (exclusive).
     * @return Random value.
     */
    protected static int random(int min, int max) {
        if (max == min)
            return max;

        return ThreadLocalRandom.current().nextInt(min, max);
    }
}
