/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.TransactionSerializationException;
import org.junit.Ignore;
import org.junit.Test;

import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;

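/**
 * Tests client-side cache start and subsequent cache operations while server nodes fail, restart or join
 * concurrently.
 */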
public class IgniteClientCacheStartFailoverTest extends GridCommonAbstractTest {
    /**
     * {@inheritDoc}
     * <p>
     * Installs a {@link TestRecordingCommunicationSpi} on every node so that tests can block selected messages.
     */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        return super.getConfiguration(igniteInstanceName)
            .setCommunicationSpi(new TestRecordingCommunicationSpi());
    }

    /**
     * {@inheritDoc}
     * <p>
     * Calls {@code stopAllGrids()} first so that no node started by one test leaks into the next, then
     * delegates to the parent cleanup.
     */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();

        super.afterTest();
    }

    /**
     * Runs {@link #clientStartCoordinatorFails(CacheAtomicityMode)} against an {@code ATOMIC} cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testClientStartCoordinatorFailsAtomic() throws Exception {
        clientStartCoordinatorFails(CacheAtomicityMode.ATOMIC);
    }

    /**
     * Runs {@link #clientStartCoordinatorFails(CacheAtomicityMode)} against a {@code TRANSACTIONAL} cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testClientStartCoordinatorFailsTx() throws Exception {
        clientStartCoordinatorFails(CacheAtomicityMode.TRANSACTIONAL);
    }

    /**
     * Runs {@link #clientStartCoordinatorFails(CacheAtomicityMode)} against a {@code TRANSACTIONAL_SNAPSHOT}
     * cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testClientStartCoordinatorFailsMvccTx() throws Exception {
        clientStartCoordinatorFails(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * Starts three servers and one client, blocks the {@link GridDhtAffinityAssignmentResponse} messages that
     * node 0 would send to the client, begins opening the cache on the client in a separate thread and then
     * stops node 0. The cache start is expected to complete anyway, after which every key written before the
     * failure must be readable and updatable through the client.
     *
     * @param atomicityMode Cache atomicity mode.
     * @throws Exception If failed.
     */
    private void clientStartCoordinatorFails(CacheAtomicityMode atomicityMode) throws Exception {
        final int keyCnt = 500;

        Ignite crd = startGrids(3);

        IgniteCache<Object, Object> srvCache =
            crd.createCache(cacheConfiguration(DEFAULT_CACHE_NAME, atomicityMode, 1));

        for (int key = 0; key < keyCnt; key++)
            srvCache.put(key, key);

        final Ignite client = startClientGrid(3);

        // Hold back the affinity response addressed to the client.
        TestRecordingCommunicationSpi.spi(crd).blockMessages(GridDhtAffinityAssignmentResponse.class, client.name());

        IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(() -> {
            client.cache(DEFAULT_CACHE_NAME);

            return null;
        }, "start-cache");

        assertFalse(startFut.isDone());

        stopGrid(0);

        // Must not hang or fail once node 0 is gone.
        startFut.get();

        IgniteCache<Object, Object> clientCache = client.cache(DEFAULT_CACHE_NAME);

        for (int key = 0; key < keyCnt; key++) {
            assertEquals(key, clientCache.get(key));

            clientCache.put(key, key + 1);

            assertEquals(key + 1, clientCache.get(key));
        }
    }

    /**
     * Runs {@link #clientStartLastServerFails(CacheAtomicityMode)} against an {@code ATOMIC} cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testClientStartLastServerFailsAtomic() throws Exception {
        clientStartLastServerFails(CacheAtomicityMode.ATOMIC);
    }

    /**
     * Runs {@link #clientStartLastServerFails(CacheAtomicityMode)} against a {@code TRANSACTIONAL} cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testClientStartLastServerFailsTx() throws Exception {
        clientStartLastServerFails(CacheAtomicityMode.TRANSACTIONAL);
    }

    /**
     * Runs {@link #clientStartLastServerFails(CacheAtomicityMode)} against a {@code TRANSACTIONAL_SNAPSHOT}
     * cache.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testClientStartLastServerFailsMvccTx() throws Exception {
        clientStartLastServerFails(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * Creates a cache whose node filter admits only node 1, starts opening it on a client while the
     * {@link GridDhtAffinityAssignmentResponse} from node 1 is blocked, and then stops node 1. While no server
     * matches the filter, {@code get}, {@code put} and {@code remove} on the client must fail with
     * {@link CacheServerNotFoundException}. After node 1 is started again the cache must be empty and usable.
     *
     * @param atomicityMode Cache atomicity mode.
     * @throws Exception If failed.
     */
    private void clientStartLastServerFails(CacheAtomicityMode atomicityMode) throws Exception {
        startGrids(3);

        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME, atomicityMode, 1);

        // Only node 1 passes the filter.
        ccfg.setNodeFilter(new TestNodeFilter(getTestIgniteInstanceName(1)));

        Ignite cacheSrv = ignite(1);

        cacheSrv.createCache(ccfg);

        final Ignite client = startClientGrid(3);

        TestRecordingCommunicationSpi.spi(cacheSrv)
            .blockMessages(GridDhtAffinityAssignmentResponse.class, client.name());

        IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(() -> {
            client.cache(DEFAULT_CACHE_NAME);

            return null;
        }, "start-cache");

        assertFalse(startFut.isDone());

        stopGrid(1);

        startFut.get();

        final IgniteCache<Object, Object> clientCache = client.cache(DEFAULT_CACHE_NAME);

        // Every operation must be rejected while node 1 is down.
        for (int i = 0; i < 10; i++) {
            final int key = i;

            GridTestUtils.assertThrows(log, () -> {
                clientCache.get(key);

                return null;
            }, CacheServerNotFoundException.class, null);

            GridTestUtils.assertThrows(log, () -> {
                clientCache.put(key, key);

                return null;
            }, CacheServerNotFoundException.class, null);

            GridTestUtils.assertThrows(log, () -> {
                clientCache.remove(key);

                return null;
            }, CacheServerNotFoundException.class, null);
        }

        startGrid(1);

        awaitPartitionMapExchange();

        // The restarted node holds no data; the cache must accept new entries.
        for (int key = 0; key < 100; key++) {
            assertNull(clientCache.get(key));

            clientCache.put(key, key);

            assertEquals(key, clientCache.get(key));
        }
    }

    /**
     * Starts three servers with pre-populated caches and one client, blocks
     * {@link GridDhtPartitionsFullMessage}s with a {@code null} exchange ID sent by node 0, joins one more
     * server and opens all caches on the client. Nodes 0-3 must reach topology version {@code (5, 1)} while
     * the messages are still blocked; once they are released, the same nodes must report rebalance as
     * finished for that version.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRebalanceState() throws Exception {
        final int srvCnt = 3;

        startGrids(srvCnt);

        List<String> names = startCaches(ignite(0), 100);

        Ignite client = startClientGrid(srvCnt);

        assertTrue(client.configuration().isClientMode());

        awaitPartitionMapExchange();

        TestRecordingCommunicationSpi spi0 = TestRecordingCommunicationSpi.spi(ignite(0));

        spi0.blockMessages((node, msg) ->
            msg instanceof GridDhtPartitionsFullMessage && ((GridDhtPartitionsFullMessage)msg).exchangeId() == null);

        startGrid(srvCnt + 1);

        for (String name : names)
            client.cache(name);

        final AffinityTopologyVersion expTopVer = new AffinityTopologyVersion(srvCnt + 2, 1);

        // Will switch to ideal topology but some partitions are not evicted yet.
        for (int idx = 0; idx < srvCnt + 1; idx++) {
            IgniteKernal kernal = (IgniteKernal)ignite(idx);

            for (String name : names) {
                GridDhtPartitionTopology top = kernal.cachex(name).context().topology();

                waitForReadyTopology(top, expTopVer);

                assertEquals(expTopVer, top.readyTopologyVersion());
            }
        }

        spi0.stopBlock();

        // Trigger eviction.
        awaitPartitionMapExchange();

        for (int idx = 0; idx < srvCnt + 1; idx++) {
            IgniteKernal kernal = (IgniteKernal)ignite(idx);

            for (String name : names) {
                final GridDhtPartitionTopology top = kernal.cachex(name).context().topology();

                // Give each topology up to 5 seconds, then assert on the final state.
                GridTestUtils.waitForCondition(() -> top.rebalanceFinished(expTopVer), 5000);

                assertTrue(top.rebalanceFinished(expTopVer));
            }
        }
    }

    /**
     * Checks that clients can open and use caches while new server nodes join concurrently, and that every
     * client eventually reports rebalance as finished for the final topology version.
     * <p>
     * Three servers (indexes 0-2) and five clients (indexes 3-7) are started up front. Five client threads and
     * five server-start threads are then released together through a {@link CyclicBarrier}. Each client thread
     * opens all caches and runs ten rounds of {@code getAll} plus a random {@code put} per cache; each server
     * thread starts one more node (indexes 9-13, because the index counter starts at 8 and is incremented
     * before use).
     * <p>
     * A failed {@code put} is tolerated only for a {@code TRANSACTIONAL_SNAPSHOT} cache, and only when the
     * failure is caused by a {@link TransactionSerializationException} whose message reports a write conflict.
     * Any other failure fails the test.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRebalanceStateConcurrentStart() throws Exception {
        final int SRVS1 = 3;
        final int CLIENTS = 5;
        final int SRVS2 = 5;

        startGrids(SRVS1);

        Ignite srv0 = ignite(0);

        final int KEYS = 1000;

        final List<String> cacheNames = startCaches(srv0, KEYS);

        final List<Ignite> clients = new ArrayList<>();

        for (int i = 0; i < CLIENTS; i++)
            clients.add(startClientGrid(SRVS1 + i));

        // One party per client thread and per server-start thread, so that all of them begin at the same time.
        final CyclicBarrier barrier = new CyclicBarrier(clients.size() + SRVS2);

        final AtomicInteger clientIdx = new AtomicInteger();

        final Set<Integer> keys = new HashSet<>();

        for (int i = 0; i < KEYS; i++)
            keys.add(i);

        IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                barrier.await();

                // Each thread takes its own client.
                Ignite client = clients.get(clientIdx.getAndIncrement());

                for (String cacheName : cacheNames)
                    client.cache(cacheName);

                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                for (int i = 0; i < 10; i++) {
                    for (String cacheName : cacheNames) {
                        IgniteCache<Object, Object> cache = client.cache(cacheName);

                        Map<Object, Object> map0 = cache.getAll(keys);

                        assertEquals("[cache=" + cacheName +
                            ", expected=" + KEYS +
                            ", actual=" + map0.size() + ']', KEYS, map0.size());

                        int key = rnd.nextInt(KEYS);

                        try {
                            cache.put(key, i);
                        }
                        catch (CacheException e) {
                            log.error("It couldn't put a value [cache=" + cacheName +
                                ", key=" + key +
                                ", val=" + i + ']', e);

                            CacheConfiguration ccfg = cache.getConfiguration(CacheConfiguration.class);

                            TransactionSerializationException txEx = X.cause(e, TransactionSerializationException.class);

                            // txEx is null when the failure was not caused by a TransactionSerializationException,
                            // so its message may only be inspected after the null check. Dereferencing it first
                            // would replace the intended failure report below with a NullPointerException.
                            boolean writeConflict = txEx != null &&
                                txEx.getMessage() != null &&
                                txEx.getMessage().contains(
                                    "Cannot serialize transaction due to write conflict (transaction is marked for rollback)"
                                );

                            if (!writeConflict || ccfg.getAtomicityMode() != TRANSACTIONAL_SNAPSHOT)
                                fail("Assert violated because exception was thrown [e=" + e.getMessage() + ']');
                        }
                    }
                }

                return null;
            }
        }, clients.size(), "client-cache-start");

        final AtomicInteger srvIdx = new AtomicInteger(SRVS1 + CLIENTS);

        IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                barrier.await();

                startGrid(srvIdx.incrementAndGet());

                return null;
            }
        }, SRVS2, "node-start");

        fut1.get();
        fut2.get();

        // Major version equals the total number of started nodes (13).
        // NOTE(review): minor version 1 is presumably the affinity switch after rebalance - confirm.
        final AffinityTopologyVersion topVer = new AffinityTopologyVersion(SRVS1 + SRVS2 + CLIENTS, 1);

        for (Ignite client : clients) {
            for (String cacheName : cacheNames) {
                final GridDhtPartitionTopology top =
                    ((IgniteKernal)client).context().cache().internalCache(cacheName).context().topology();

                // Give each topology up to 5 seconds, then assert on the final state.
                GridTestUtils.waitForCondition(new GridAbsPredicate() {
                    @Override public boolean apply() {
                        return top.rebalanceFinished(topVer);
                    }
                }, 5000);

                assertTrue(top.rebalanceFinished(topVer));
            }
        }
    }

    /**
     * Stress scenario, currently ignored (see the linked ticket). For ten seconds one thread keeps stopping
     * and starting a randomly chosen server out of four, while four client threads repeatedly open all
     * caches, perform a random {@code put} and {@code get} on each and close them again. Once both workloads
     * have finished, every client must still be able to write and read back values in every cache.
     *
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11810")
    @Test
    public void testClientStartCloseServersRestart() throws Exception {
        final int srvCnt = 4;
        final int clientCnt = 4;

        startGrids(srvCnt);

        final List<String> names = startCaches(ignite(0), 1000);

        final List<Ignite> clientNodes = new ArrayList<>();

        for (int i = 0; i < clientCnt; i++)
            clientNodes.add(startClientGrid(srvCnt + i));

        final AtomicBoolean done = new AtomicBoolean();

        // Restarts a random server, with random pauses of 1-200 ms, until told to stop.
        IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(() -> {
            ThreadLocalRandom restartRnd = ThreadLocalRandom.current();

            while (!done.get()) {
                int victim = restartRnd.nextInt(srvCnt);

                stopGrid(victim);

                U.sleep(restartRnd.nextLong(200) + 1);

                startGrid(victim);

                U.sleep(restartRnd.nextLong(200) + 1);
            }

            return null;
        }, "restart");

        final AtomicInteger nextClient = new AtomicInteger();

        IgniteInternalFuture<?> clientsFut = GridTestUtils.runMultiThreadedAsync(() -> {
            // Each thread takes its own client.
            Ignite node = clientNodes.get(nextClient.getAndIncrement());

            assertTrue(node.configuration().isClientMode());

            ThreadLocalRandom opRnd = ThreadLocalRandom.current();

            while (!done.get()) {
                for (String name : names)
                    node.cache(name);

                for (String name : names) {
                    IgniteCache<Object, Object> c = node.cache(name);

                    c.put(opRnd.nextInt(1000), opRnd.nextInt());

                    c.get(opRnd.nextInt(1000));
                }

                for (String name : names)
                    node.cache(name).close();
            }

            return null;
        }, clientCnt, "client-thread");

        try {
            U.sleep(10_000);

            done.set(true);

            restartFut.get();
            clientsFut.get();
        }
        finally {
            // Ensure the workers terminate even if the wait above failed.
            done.set(true);
        }

        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        for (Ignite node : clientNodes) {
            for (String name : names) {
                IgniteCache<Object, Object> c = node.cache(name);

                for (int val = 0; val < 10; val++) {
                    Integer key = rnd.nextInt(1000);

                    c.put(key, val);

                    assertEquals(val, c.get(key));
                }
            }
        }
    }

    /**
     * Creates nine caches on the given node, three per atomicity mode ({@code atomic-N}, {@code tx-N} and
     * {@code mvcc-N} with {@code N} backups, {@code N} = 0..2), and loads each of them with the entries
     * {@code i -> i} for {@code i} in {@code [0, keys)}.
     *
     * @param node Node.
     * @param keys Number of keys to put in caches.
     * @return Cache names, in creation order.
     */
    private List<String> startCaches(Ignite node, int keys) {
        final Map<Integer, Integer> data = new TreeMap<>();

        for (int k = 0; k < keys; k++)
            data.put(k, k);

        // Parallel arrays: name prefix and the atomicity mode it stands for.
        String[] prefixes = {"atomic-", "tx-", "mvcc-"};
        CacheAtomicityMode[] modes = {ATOMIC, TRANSACTIONAL, TRANSACTIONAL_SNAPSHOT};

        List<String> names = new ArrayList<>();

        for (int m = 0; m < modes.length; m++) {
            for (int backups = 0; backups < 3; backups++) {
                CacheConfiguration<Object, Object> ccfg =
                    cacheConfiguration(prefixes[m] + backups, modes[m], backups);

                IgniteCache<Object, Object> cache = node.createCache(ccfg);

                names.add(ccfg.getName());

                cache.putAll(data);
            }
        }

        return names;
    }

    /**
     * Builds a {@code FULL_SYNC} cache configuration with the given name, atomicity mode and backup count.
     *
     * @param name Cache name.
     * @param atomicityMode Cache atomicity mode.
     * @param backups Number of backups.
     * @return Cache configuration.
     */
    private CacheConfiguration<Object, Object> cacheConfiguration(String name, CacheAtomicityMode atomicityMode, int backups) {
        return new CacheConfiguration<Object, Object>(name)
            .setWriteSynchronizationMode(FULL_SYNC)
            .setAtomicityMode(atomicityMode)
            .setBackups(backups);
    }

    /**
     * Node filter that accepts only the node whose Ignite instance name equals the configured one.
     */
    private static class TestNodeFilter implements IgnitePredicate<ClusterNode> {
        /** Instance name of the single node that passes the filter. */
        private final String includeName;

        /**
         * @param includeName Node to include.
         */
        public TestNodeFilter(String includeName) {
            this.includeName = includeName;
        }

        /** {@inheritDoc} */
        @Override public boolean apply(ClusterNode node) {
            Object instanceName = node.attribute(IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME);

            return includeName.equals(instanceName);
        }
    }
}
