/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Ignore;
import org.junit.Test;

import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;

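/**
 * Tests that {@code EVT_CACHE_OBJECT_READ} events carrying the entry processor (closure) class name are recorded
 * on the expected nodes when {@code invoke}/{@code invokeAll} is called.
 */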
@SuppressWarnings("ConstantConditions")
public class GridCacheTransformEventSelfTest extends GridCommonAbstractTest {
    /** Number of nodes started for every test. */
    private static final int GRID_CNT = 3;

    /** Backups count for partitioned cache. */
    private static final int BACKUP_CNT = 1;

    /** Name of the cache under test. */
    private static final String CACHE_NAME = "cache";

    /** First test key; for a PARTITIONED cache it is picked so that node 0 is primary and node 1 is backup. */
    private Integer key1;

    /** Second test key; searched starting right after {@link #key1} by the same rule. */
    private Integer key2;

    /** Both keys in form of a sorted set, used for {@code invokeAll}. */
    private Set<Integer> keys;

    /** Started nodes, indexed by grid index. */
    private Ignite[] ignites;

    /** Local node IDs; {@code ids[i]} belongs to {@code ignites[i]}. */
    private UUID[] ids;

    /** Test cache obtained from each node; operations in this test are issued through {@code caches[0]}. */
    private IgniteCache<Integer, Integer>[] caches;

    /** Recorded events: READ events with a non-null closure class name collected by the local listeners. */
    private GridConcurrentHashSet<CacheEvent> evts;

    /** Cache mode; set by the initialization routine before nodes start and read when building configuration. */
    private CacheMode cacheMode;

    /** Atomicity mode; set by the initialization routine before nodes start. */
    private CacheAtomicityMode atomicityMode;

    /** Default TX concurrency for started nodes; {@code null} is passed for ATOMIC checks. */
    private TransactionConcurrency txConcurrency;

    /** Default TX isolation for started nodes; {@code null} is passed for ATOMIC checks. */
    private TransactionIsolation txIsolation;

    /**
     * {@inheritDoc}
     * <p>
     * On top of the base configuration this sets the default transaction concurrency/isolation, configures the
     * single test cache (mode and atomicity mode come from the current test parameters, {@code FULL_SYNC} write
     * synchronization, backups only for {@code PARTITIONED}) and enables {@code EVT_CACHE_OBJECT_READ} recording.
     */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        TransactionConfiguration tCfg = cfg.getTransactionConfiguration();

        // Both values are null for ATOMIC checks, which pass no TX parameters to the initialization routine.
        tCfg.setDefaultTxConcurrency(txConcurrency);
        tCfg.setDefaultTxIsolation(txIsolation);

        // Typed configuration created directly with the test cache name.
        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(CACHE_NAME);

        ccfg.setCacheMode(cacheMode);
        ccfg.setAtomicityMode(atomicityMode);
        ccfg.setWriteSynchronizationMode(FULL_SYNC);

        if (cacheMode == PARTITIONED)
            ccfg.setBackups(BACKUP_CNT);

        cfg.setCacheConfiguration(ccfg);
        cfg.setLocalHost("127.0.0.1");
        cfg.setIncludeEventTypes(EVT_CACHE_OBJECT_READ);

        return cfg;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Stops all nodes and then drops every per-test reference (keys, recorded events, nodes, node IDs, caches).
     */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();

        // Keys.
        keys = null;
        key2 = null;
        key1 = null;

        // Recorded events.
        evts = null;

        // Nodes and everything derived from them.
        caches = null;
        ids = null;
        ignites = null;
    }

    /**
     * Initialization routine: stores the test parameters, starts the nodes, registers one event listener per node,
     * selects the two test keys and puts their initial values ({@code 1} and {@code 2}) through node 0.
     * <p>
     * For a {@code PARTITIONED} cache both keys are chosen so that node 0 is primary and node 1 is backup; for other
     * cache modes the keys are simply {@code 0} and {@code 1}.
     *
     * @param cacheMode Cache mode.
     * @param atomicityMode Atomicity mode.
     * @param txConcurrency TX concurrency.
     * @param txIsolation TX isolation.
     * @throws Exception If failed.
     */
    @SuppressWarnings("unchecked")
    private void initialize(CacheMode cacheMode, CacheAtomicityMode atomicityMode,
        TransactionConcurrency txConcurrency, TransactionIsolation txIsolation) throws Exception {
        // Must be assigned before the nodes start: node configuration is built from these fields.
        this.cacheMode = cacheMode;
        this.atomicityMode = atomicityMode;
        this.txConcurrency = txConcurrency;
        this.txIsolation = txIsolation;

        evts = new GridConcurrentHashSet<>();

        startGridsMultiThreaded(GRID_CNT, true);

        if (cacheMode == REPLICATED)
            awaitPartitionMapExchange();

        ignites = new Ignite[GRID_CNT];
        ids = new UUID[GRID_CNT];
        caches = new IgniteCache[GRID_CNT];

        for (int i = 0; i < GRID_CNT; i++) {
            ignites[i] = grid(i);

            ids[i] = ignites[i].cluster().localNode().id();

            caches[i] = ignites[i].cache(CACHE_NAME);

            // Exactly one listener per node. Only events produced by an entry processor (non-null closure
            // class name) are collected, so the plain puts below do not pollute the recorded set.
            ignites[i].events().localListen(new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    CacheEvent evt0 = (CacheEvent)evt;

                    if (evt0.closureClassName() != null) {
                        System.out.println("ADDED: [nodeId=" + evt0.node() + ", evt=" + evt0 + ']');

                        evts.add(evt0);
                    }

                    return true;
                }
            }, EVT_CACHE_OBJECT_READ);
        }

        key1 = nextTestKey(0);
        key2 = nextTestKey(key1 + 1);

        keys = new TreeSet<>();

        keys.add(key1);
        keys.add(key2);

        caches[0].put(key1, 1);
        caches[0].put(key2, 2);
    }

    /**
     * Finds the first key, starting from the given one, that is suitable for the test.
     * <p>
     * For a {@code PARTITIONED} cache a key is suitable when node 0 is its primary and node 1 is its backup;
     * for any other cache mode the start key itself is returned.
     *
     * @param startKey Key to start the search from (inclusive).
     * @return Suitable key.
     */
    private int nextTestKey(int startKey) {
        int key = startKey;

        while (cacheMode == PARTITIONED && !(primary(0, key) && backup(1, key)))
            key++;

        return key;
    }

    /**
     * Checks primary ownership of a key using the affinity of the test cache obtained from node 0.
     *
     * @param gridIdx Grid index.
     * @param key Key.
     * @return {@code True} if the local node of the given grid is primary for the given key.
     */
    private boolean primary(int gridIdx, Object key) {
        return grid(0).affinity(CACHE_NAME).isPrimary(grid(gridIdx).cluster().localNode(), key);
    }

    /**
     * Checks backup ownership of a key using the affinity of the test cache obtained from node 0.
     *
     * @param gridIdx Grid index.
     * @param key Key.
     * @return {@code True} if the local node of the given grid is backup for the given key.
     */
    private boolean backup(int gridIdx, Object key) {
        return grid(0).affinity(CACHE_NAME).isBackup(grid(gridIdx).cluster().localNode(), key);
    }

    /**
     * Test TRANSACTIONAL LOCAL cache with OPTIMISTIC/REPEATABLE_READ transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxLocalOptimisticRepeatableRead() throws Exception {
        checkTx(LOCAL, OPTIMISTIC, REPEATABLE_READ);
    }

    /**
     * Test TRANSACTIONAL LOCAL cache with OPTIMISTIC/READ_COMMITTED transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxLocalOptimisticReadCommitted() throws Exception {
        checkTx(LOCAL, OPTIMISTIC, READ_COMMITTED);
    }

    /**
     * Test TRANSACTIONAL LOCAL cache with OPTIMISTIC/SERIALIZABLE transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxLocalOptimisticSerializable() throws Exception {
        checkTx(LOCAL, OPTIMISTIC, SERIALIZABLE);
    }

    /**
     * Test TRANSACTIONAL LOCAL cache with PESSIMISTIC/REPEATABLE_READ transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxLocalPessimisticRepeatableRead() throws Exception {
        checkTx(LOCAL, PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * Test TRANSACTIONAL LOCAL cache with PESSIMISTIC/READ_COMMITTED transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxLocalPessimisticReadCommitted() throws Exception {
        checkTx(LOCAL, PESSIMISTIC, READ_COMMITTED);
    }

    /**
     * Test TRANSACTIONAL LOCAL cache with PESSIMISTIC/SERIALIZABLE transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxLocalPessimisticSerializable() throws Exception {
        checkTx(LOCAL, PESSIMISTIC, SERIALIZABLE);
    }

    /**
     * Test TRANSACTIONAL_SNAPSHOT LOCAL cache with PESSIMISTIC/REPEATABLE_READ transaction.
     * <p>
     * Currently disabled via {@code @Ignore}; see the JIRA issue referenced in the annotation.
     *
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9530")
    @Test
    public void testMvccTxLocalPessimisticRepeatableRead() throws Exception {
        checkMvccTx(LOCAL, PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * Test TRANSACTIONAL PARTITIONED cache with OPTIMISTIC/REPEATABLE_READ transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxPartitionedOptimisticRepeatableRead() throws Exception {
        checkTx(PARTITIONED, OPTIMISTIC, REPEATABLE_READ);
    }

    /**
     * Test TRANSACTIONAL PARTITIONED cache with OPTIMISTIC/READ_COMMITTED transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxPartitionedOptimisticReadCommitted() throws Exception {
        checkTx(PARTITIONED, OPTIMISTIC, READ_COMMITTED);
    }

    /**
     * Test TRANSACTIONAL PARTITIONED cache with OPTIMISTIC/SERIALIZABLE transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxPartitionedOptimisticSerializable() throws Exception {
        checkTx(PARTITIONED, OPTIMISTIC, SERIALIZABLE);
    }

    /**
     * Test TRANSACTIONAL PARTITIONED cache with PESSIMISTIC/REPEATABLE_READ transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxPartitionedPessimisticRepeatableRead() throws Exception {
        checkTx(PARTITIONED, PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * Test TRANSACTIONAL PARTITIONED cache with PESSIMISTIC/READ_COMMITTED transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxPartitionedPessimisticReadCommitted() throws Exception {
        checkTx(PARTITIONED, PESSIMISTIC, READ_COMMITTED);
    }

    /**
     * Test TRANSACTIONAL PARTITIONED cache with PESSIMISTIC/SERIALIZABLE transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxPartitionedPessimisticSerializable() throws Exception {
        checkTx(PARTITIONED, PESSIMISTIC, SERIALIZABLE);
    }

    /**
     * Test TRANSACTIONAL_SNAPSHOT PARTITIONED cache with PESSIMISTIC/REPEATABLE_READ transaction.
     * <p>
     * Currently disabled via {@code @Ignore}; see the JIRA issue referenced in the annotation.
     *
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9321")
    @Test
    public void testMvccTxPartitionedPessimisticRepeatableRead() throws Exception {
        checkMvccTx(PARTITIONED, PESSIMISTIC, REPEATABLE_READ);
    }


    /**
     * Test TRANSACTIONAL REPLICATED cache with OPTIMISTIC/REPEATABLE_READ transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxReplicatedOptimisticRepeatableRead() throws Exception {
        checkTx(REPLICATED, OPTIMISTIC, REPEATABLE_READ);
    }

    /**
     * Test TRANSACTIONAL REPLICATED cache with OPTIMISTIC/READ_COMMITTED transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxReplicatedOptimisticReadCommitted() throws Exception {
        checkTx(REPLICATED, OPTIMISTIC, READ_COMMITTED);
    }

    /**
     * Test TRANSACTIONAL REPLICATED cache with OPTIMISTIC/SERIALIZABLE transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxReplicatedOptimisticSerializable() throws Exception {
        checkTx(REPLICATED, OPTIMISTIC, SERIALIZABLE);
    }

    /**
     * Test TRANSACTIONAL REPLICATED cache with PESSIMISTIC/REPEATABLE_READ transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxReplicatedPessimisticRepeatableRead() throws Exception {
        checkTx(REPLICATED, PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * Test TRANSACTIONAL REPLICATED cache with PESSIMISTIC/READ_COMMITTED transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxReplicatedPessimisticReadCommitted() throws Exception {
        checkTx(REPLICATED, PESSIMISTIC, READ_COMMITTED);
    }

    /**
     * Test TRANSACTIONAL REPLICATED cache with PESSIMISTIC/SERIALIZABLE transaction.
     * <p>
     * The concurrency/isolation pair is applied as the default transaction configuration of the started nodes.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxReplicatedPessimisticSerializable() throws Exception {
        checkTx(REPLICATED, PESSIMISTIC, SERIALIZABLE);
    }

    /**
     * Test TRANSACTIONAL_SNAPSHOT REPLICATED cache with PESSIMISTIC/REPEATABLE_READ transaction.
     * <p>
     * Currently disabled via {@code @Ignore}; see the JIRA issue referenced in the annotation.
     *
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9321")
    @Test
    public void testMvccTxReplicatedPessimisticRepeatableRead() throws Exception {
        checkMvccTx(REPLICATED, PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * Test ATOMIC LOCAL cache.
     * <p>
     * No transaction parameters are used; events are expected on primary nodes only.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testAtomicLocal() throws Exception {
        checkAtomic(LOCAL);
    }

    /**
     * Test ATOMIC PARTITIONED cache.
     * <p>
     * No transaction parameters are used; events are expected on primary nodes only.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testAtomicPartitioned() throws Exception {
        checkAtomic(PARTITIONED);
    }

    /**
     * Test ATOMIC REPLICATED cache.
     * <p>
     * No transaction parameters are used; events are expected on primary nodes only.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testAtomicReplicated() throws Exception {
        checkAtomic(REPLICATED);
    }

    /**
     * Check ATOMIC cache.
     * <p>
     * Starts the nodes with an ATOMIC cache (no TX parameters) and runs {@code invoke}/{@code invokeAll} from
     * node 0 with both entry processors. After every operation the recorded events must match, one-to-one, the
     * primary nodes of the touched keys and carry the entry processor class name.
     *
     * @param cacheMode Cache mode.
     * @throws Exception If failed.
     */
    private void checkAtomic(CacheMode cacheMode) throws Exception {
        initialize(cacheMode, ATOMIC, null, null);

        caches[0].invoke(key1, new Transformer());

        checkEventNodeIdsStrict(Transformer.class.getName(), primaryIdsForKeys(key1));

        // Framework assertion instead of the 'assert' keyword: the latter is skipped when the JVM runs without -ea.
        assertTrue("Unexpected events: " + evts, evts.isEmpty());

        caches[0].invokeAll(keys, new Transformer());

        checkEventNodeIdsStrict(Transformer.class.getName(), primaryIdsForKeys(key1, key2));

        assertTrue("Unexpected events: " + evts, evts.isEmpty());

        caches[0].invoke(key1, new TransformerWithInjection());

        checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), primaryIdsForKeys(key1));

        assertTrue("Unexpected events: " + evts, evts.isEmpty());

        caches[0].invokeAll(keys, new TransformerWithInjection());

        checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), primaryIdsForKeys(key1, key2));
    }

    /**
     * Check TRANSACTIONAL_SNAPSHOT cache.
     * <p>
     * Runs the initialization routine with the TRANSACTIONAL_SNAPSHOT atomicity mode and then the common
     * transactional checks.
     *
     * @param cacheMode Cache mode.
     * @param txConcurrency TX concurrency.
     * @param txIsolation TX isolation.
     * @throws Exception If failed.
     */
    private void checkMvccTx(CacheMode cacheMode, TransactionConcurrency txConcurrency,
        TransactionIsolation txIsolation) throws Exception {
        initialize(cacheMode, TRANSACTIONAL_SNAPSHOT, txConcurrency, txIsolation);

        checkTx0();
    }

    /**
     * Check TRANSACTIONAL cache.
     * <p>
     * Runs the initialization routine with the TRANSACTIONAL atomicity mode and then the common
     * transactional checks.
     *
     * @param cacheMode Cache mode.
     * @param txConcurrency TX concurrency.
     * @param txIsolation TX isolation.
     * @throws Exception If failed.
     */
    private void checkTx(CacheMode cacheMode, TransactionConcurrency txConcurrency,
        TransactionIsolation txIsolation) throws Exception {
        initialize(cacheMode, TRANSACTIONAL, txConcurrency, txIsolation);

        checkTx0();
    }

    /**
     * Check TX cache.
     * <p>
     * Runs {@code invoke}/{@code invokeAll} from node 0 with both entry processors. After every operation the
     * recorded events must match, one-to-one, the primary and backup nodes of the touched keys and carry the
     * entry processor class name. Expects the initialization routine to have been run already.
     */
    private void checkTx0() {
        System.out.println("BEFORE: " + evts.size());

        caches[0].invoke(key1, new Transformer());

        System.out.println("AFTER: " + evts.size());

        checkEventNodeIdsStrict(Transformer.class.getName(), idsForKeys(key1));

        // Framework assertion instead of the 'assert' keyword: the latter is skipped when the JVM runs without -ea.
        assertTrue("Unexpected events: " + evts, evts.isEmpty());

        caches[0].invokeAll(keys, new Transformer());

        checkEventNodeIdsStrict(Transformer.class.getName(), idsForKeys(key1, key2));

        assertTrue("Unexpected events: " + evts, evts.isEmpty());

        System.out.println("BEFORE: " + evts.size());

        caches[0].invoke(key1, new TransformerWithInjection());

        System.out.println("AFTER: " + evts.size());

        checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), idsForKeys(key1));

        assertTrue("Unexpected events: " + evts, evts.isEmpty());

        caches[0].invokeAll(keys, new TransformerWithInjection());

        checkEventNodeIdsStrict(TransformerWithInjection.class.getName(), idsForKeys(key1, key2));
    }

    /**
     * Get node IDs where the given keys must reside, without restricting the result to primary nodes.
     *
     * @param keys Keys.
     * @return Node IDs.
     */
    private UUID[] idsForKeys(int... keys) {
        return idsForKeys(false, keys);
    }

    /**
     * Get primary node IDs where the given keys must reside (backup nodes are not included).
     *
     * @param keys Keys.
     * @return Node IDs.
     */
    private UUID[] primaryIdsForKeys(int... keys) {
        return idsForKeys(true, keys);
    }

    /**
     * Get node IDs where the given keys must reside.
     * <p>
     * The result holds one entry per (key, node) pair, so the same node ID may occur several times:
     * <ul>
     * <li>{@code LOCAL}: node 0 for every key;</li>
     * <li>{@code PARTITIONED}: every node that is primary for the key, plus backups unless primary only;</li>
     * <li>{@code REPLICATED}: the node the key is mapped to if primary only, otherwise all nodes.</li>
     * </ul>
     *
     * @param primaryOnly Primary only flag.
     * @param keys Keys.
     * @return Node IDs.
     */
    private UUID[] idsForKeys(boolean primaryOnly, int... keys) {
        List<UUID> nodeIds = new ArrayList<>();

        for (int key : keys) {
            if (cacheMode == LOCAL)
                nodeIds.add(ids[0]); // Perform PUTs from the node with index 0.
            else if (cacheMode == PARTITIONED) {
                for (int idx = 0; idx < GRID_CNT; idx++) {
                    boolean owner = primary(idx, key) || (!primaryOnly && backup(idx, key));

                    if (owner)
                        nodeIds.add(ids[idx]);
                }
            }
            else if (cacheMode == REPLICATED) {
                if (!primaryOnly)
                    nodeIds.addAll(Arrays.asList(ids));
                else
                    nodeIds.add(grid(0).affinity(CACHE_NAME).mapKeyToNode(key).id());
            }
        }

        return nodeIds.toArray(new UUID[0]);
    }

    /**
     * Ensure that events were recorded on exactly the given nodes.
     * <p>
     * The number of recorded events must equal the number of expected node IDs. For every expected ID one recorded
     * event from that node is looked up, its closure class name is verified and the event is removed from the
     * recorded set. A {@code null} array means that no events are expected at all.
     *
     * @param cClsName Entry processor class name.
     * @param ids Expected node IDs (may contain duplicates, one per expected event).
     */
    private void checkEventNodeIdsStrict(String cClsName, UUID... ids) {
        if (ids == null) {
            assertTrue(evts.isEmpty());

            return;
        }

        assertEquals(ids.length, evts.size());

        for (UUID nodeId : ids) {
            CacheEvent match = null;

            for (CacheEvent candidate : evts) {
                if (!F.eq(nodeId, candidate.node().id()))
                    continue;

                assertEquals(cClsName, candidate.closureClassName());

                match = candidate;

                break;
            }

            if (match != null) {
                // Consume the event so that it cannot satisfy another expected ID.
                evts.remove(match);

                continue;
            }

            // No event from this node: resolve the grid index to report key ownership in the failure message.
            int gridIdx = -1;

            for (int i = 0; i < GRID_CNT && gridIdx < 0; i++) {
                if (F.eq(this.ids[i], nodeId))
                    gridIdx = i;
            }

            fail("Expected transform event was not triggered on the node [nodeId=" + nodeId +
                ", key1Primary=" + primary(gridIdx, key1) + ", key1Backup=" + backup(gridIdx, key1) +
                ", key2Primary=" + primary(gridIdx, key2) + ", key2Backup=" + backup(gridIdx, key2) + ']');
        }
    }

    /**
     * Entry processor that increments the current entry value by one.
     */
    private static class Transformer implements EntryProcessor<Integer, Integer, Void>, Serializable {
        /** {@inheritDoc} */
        @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
            Integer cur = e.getValue();

            e.setValue(cur + 1);

            return null;
        }
    }

    /**
     * Entry processor that increments the current entry value by one and additionally asserts that the
     * Ignite instance resource has been set.
     */
    private static class TransformerWithInjection implements EntryProcessor<Integer, Integer, Void>, Serializable {
        /** Ignite instance resource; must be non-null by the time the processor runs. */
        @IgniteInstanceResource
        private transient Ignite ignite;

        /** {@inheritDoc} */
        @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
            assert ignite != null;

            Integer cur = e.getValue();

            e.setValue(cur + 1);

            return null;
        }
    }
}
