/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

/**
 * Checks that a cache can still be created and used after the coordinator node is stopped while a
 * dynamic cache start, which fails on one of the nodes, is in progress.
 */
public class IgniteDynamicCacheStartCoordinatorFailoverTest extends GridCommonAbstractTest {
    /**
     * Latch counted down by {@link CustomCommunicationSpi} when it drops an error-carrying partitions single message
     * addressed to the coordinator. The test thread waits on it before stopping the coordinator.
     */
    private static volatile CountDownLatch latch;

    /** Name of the user attribute that marks the coordinator node. */
    private static final String COORDINATOR_ATTRIBUTE = "coordinator";

    /**
     * If {@code true}, the next started node is tagged with {@link #COORDINATOR_ATTRIBUTE}.
     * A primitive is used on purpose: a boxed {@code Boolean} left unassigned would fail with
     * {@link NullPointerException} when unboxed in {@link #getConfiguration(String)}.
     */
    private boolean appendCustomAttribute;

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        // A fresh latch per test, since the field is static and shared with the communication SPI instances.
        latch = new CountDownLatch(1);
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        TcpCommunicationSpi commSpi = new CustomCommunicationSpi();
        commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));

        cfg.setCommunicationSpi(commSpi);

        cfg.setFailureDetectionTimeout(15_000);

        if (appendCustomAttribute) {
            // Senders recognize the coordinator by this attribute, see CustomCommunicationSpi#sendMessage.
            Map<String, Object> attrs = new HashMap<>();

            attrs.put(COORDINATOR_ATTRIBUTE, Boolean.TRUE);

            cfg.setUserAttributes(attrs);
        }

        return cfg;
    }

    /**
     * Tests coordinator failover during cache start failure.
     *
     * @throws Exception If test failed.
     */
    @Test
    public void testCoordinatorFailure() throws Exception {
        // Start coordinator node: it is the only one tagged with the coordinator attribute.
        appendCustomAttribute = true;

        startGrid(0);

        appendCustomAttribute = false;

        Ignite g1 = startGrid(1);

        startGrid(2);

        awaitPartitionMapExchange();

        CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();

        cfg.setName("test-coordinator-failover");

        cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

        // Affinity calculation throws on the node with index 2 only.
        cfg.setAffinity(new BrokenAffinityFunction(false, getTestIgniteInstanceName(2)));

        // NOTE(review): the returned future is ignored, so a failed assertThrows in the starter thread
        // does not fail this test. Confirm whether the future should be awaited after the failover.
        GridTestUtils.runAsync(new Callable<Object>() {
            @Override public Object call() throws Exception {
                GridTestUtils.assertThrows(log, new Callable<Object>() {
                    @Override public Object call() throws Exception {
                        g1.getOrCreateCache(cfg);

                        return null;
                    }
                }, CacheException.class, null);

                return null;
            }
        }, "cache-starter-thread");

        // Wait until the error message addressed to the coordinator has been dropped.
        latch.await();

        stopGrid(0, true);

        awaitPartitionMapExchange();

        // Correct the cache configuration.
        cfg.setAffinity(new RendezvousAffinityFunction());

        IgniteCache<Integer, Integer> cache = g1.getOrCreateCache(cfg);

        checkCacheOperations(g1, cache);
    }

    /**
     * Test the basic cache operations: puts and gets of integer keys, then loading through a data streamer.
     *
     * @param ignite Node used to create the data streamer.
     * @param cache Cache; it is populated with {@code Integer} keys and values.
     * @throws Exception If test failed.
     */
    protected void checkCacheOperations(Ignite ignite, IgniteCache cache) throws Exception {
        // The signature stays raw for callers; this method only ever stores Integer keys and values.
        @SuppressWarnings("unchecked")
        IgniteCache<Integer, Integer> intCache = cache;

        int cnt = 1000;

        // Check base cache operations.
        for (int i = 0; i < cnt; ++i)
            intCache.put(i, i);

        for (int i = 0; i < cnt; ++i) {
            Integer v = intCache.get(i);

            assertNotNull(v);
            assertEquals(i, v.intValue());
        }

        // Check Data Streamer capabilities.
        try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(intCache.getName())) {
            for (int i = 0; i < 10_000; ++i)
                streamer.addData(i, i);
        }
    }

    /**
     * Communication SPI which drops error-carrying {@link GridDhtPartitionsSingleMessage}s addressed to the
     * node marked with {@link #COORDINATOR_ATTRIBUTE}.
     */
    private static class CustomCommunicationSpi extends TcpCommunicationSpi {
        /**
         * Sends the message unless it is a {@link GridDhtPartitionsSingleMessage} that carries an error and
         * is addressed to the node tagged with {@link #COORDINATOR_ATTRIBUTE}. Such a message is silently
         * skipped and {@link #latch} is counted down. Every message that does not match this filter (this
         * method has no special handling for, e.g., {@link GridJobExecuteResponse}) is passed to the parent SPI.
         *
         * @param node Destination node.
         * @param msg Message to be sent.
         * @param ackClosure Ack closure.
         * @throws org.apache.ignite.spi.IgniteSpiException If failed.
         */
        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
            throws IgniteSpiException {

            if (msg instanceof GridIoMessage) {
                GridIoMessage msg0 = (GridIoMessage)msg;

                if (msg0.message() instanceof GridDhtPartitionsSingleMessage) {
                    Boolean attr = (Boolean)node.attributes().get(COORDINATOR_ATTRIBUTE);

                    GridDhtPartitionsSingleMessage singleMsg = (GridDhtPartitionsSingleMessage)msg0.message();

                    Exception err = singleMsg.getError();

                    if (Boolean.TRUE.equals(attr) && err != null) {
                        // Skip the message and let the test thread proceed with stopping the coordinator.
                        latch.countDown();

                        return;
                    }
                }
            }

            super.sendMessage(node, msg, ackClosure);
        }
    }

    /**
     * Affinity function that throws an exception when affinity nodes are calculated on the given node.
     */
    public static class BrokenAffinityFunction extends RendezvousAffinityFunction {
        /** */
        private static final long serialVersionUID = 0L;

        /** Local Ignite instance, injected by the resource annotation. */
        @IgniteInstanceResource
        private Ignite ignite;

        /** Exception should arise on all nodes. */
        private boolean eOnAllNodes;

        /** Exception should arise on node with certain name. */
        private String gridName;

        /**
         * Default constructor.
         */
        public BrokenAffinityFunction() {
            // No-op.
        }

        /**
         * @param eOnAllNodes {@code True} if exception should be thrown on all nodes.
         * @param gridName Exception should arise on node with certain name.
         */
        public BrokenAffinityFunction(boolean eOnAllNodes, String gridName) {
            this.eOnAllNodes = eOnAllNodes;
            this.gridName = gridName;
        }

        /**
         * Fails with {@link IllegalStateException} if the function is configured to fail on all nodes or
         * the local instance name equals the configured one; otherwise delegates to the parent implementation.
         *
         * @param affCtx Affinity function context.
         * @return Partition assignment computed by {@link RendezvousAffinityFunction}.
         */
        @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
            if (eOnAllNodes || ignite.name().equals(gridName))
                throw new IllegalStateException("Simulated exception [locNodeId="
                    + ignite.cluster().localNode().id() + "]");

            return super.assignPartitions(affCtx);
        }
    }
}
