/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Lock;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.configuration.Factory;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import com.google.common.collect.Sets;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheExistsException;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheInterceptorAdapter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory;
import org.apache.ignite.internal.util.collection.IntMap;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.gridfunc.ContainsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionRollbackException;
import org.jetbrains.annotations.Nullable;
import org.junit.Ignore;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;

/**
 * TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11820 https://issues.apache.org/jira/browse/IGNITE-11797
 */
@SuppressWarnings({"unchecked", "ThrowableNotThrown"})
public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
    /** */
    private static final String GROUP1 = "grp1";

    /** */
    private static final String GROUP2 = "grp2";

    /** */
    private static final String GROUP3 = "grp3";

    /** */
    private static final String CACHE1 = "cache1";

    /** */
    private static final String CACHE2 = "cache2";

    /** */
    private static final int ASYNC_TIMEOUT = 5000;

    /** */
    private CacheConfiguration[] ccfgs;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        if (ccfgs != null) {
            cfg.setCacheConfiguration(ccfgs);

            ccfgs = null;
        }

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected long getTestTimeout() {
        return 10 * 60_000;
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();

        super.afterTest();
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCloseCache1() throws Exception {
        startGrid(0);

        Ignite client = startClientGrid(1);

        IgniteCache c1 = client.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 0, false));

        checkCacheGroup(0, GROUP1, true);
        checkCacheGroup(0, GROUP1, true);

        checkCache(0, "c1", 10);
        checkCache(1, "c1", 10);

        c1.close();

        checkCacheGroup(0, GROUP1, true);
        checkCacheGroup(1, GROUP1, false);

        checkCache(0, "c1", 10);

        assertNotNull(client.cache("c1"));

        checkCacheGroup(0, GROUP1, true);
        checkCacheGroup(1, GROUP1, true);

        checkCache(0, "c1", 10);
        checkCache(1, "c1", 10);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCreateDestroyCaches1() throws Exception {
        createDestroyCaches(1);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCreateDestroyCaches2() throws Exception {
        createDestroyCaches(5);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCreateCacheWithSameNameInAnotherGroup() throws Exception {
        startGridsMultiThreaded(2);

        final Ignite ignite = ignite(0);

        ignite.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false));

        GridTestUtils.assertThrows(null, new GridPlainCallable<Void>() {
            @Override public Void call() throws Exception {
                ignite(1).createCache(cacheConfiguration(GROUP2, CACHE1, PARTITIONED, ATOMIC, 2, false));
                return null;
            }
        }, CacheExistsException.class, "a cache with the same name is already started");
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCreateDestroyCachesAtomicPartitioned() throws Exception {
        createDestroyCaches(PARTITIONED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCreateDestroyCachesTxPartitioned() throws Exception {
        createDestroyCaches(PARTITIONED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCreateDestroyCachesMvccTxPartitioned() throws Exception {
        createDestroyCaches(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCreateDestroyCachesAtomicReplicated() throws Exception {
        createDestroyCaches(REPLICATED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCreateDestroyCachesTxReplicated() throws Exception {
        createDestroyCaches(REPLICATED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCreateDestroyCachesMvccTxReplicated() throws Exception {
        createDestroyCaches(REPLICATED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryAtomicPartitioned() throws Exception {
        scanQuery(PARTITIONED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryTxPartitioned() throws Exception {
        scanQuery(PARTITIONED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryMvccTxPartitioned() throws Exception {
        scanQuery(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryAtomicReplicated() throws Exception {
        scanQuery(REPLICATED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryTxReplicated() throws Exception {
        scanQuery(REPLICATED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryMvccTxReplicated() throws Exception {
        scanQuery(REPLICATED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testEntriesTtlAtomicPartitioned() throws Exception {
        entriesTtl(PARTITIONED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testEntriesTtlTxPartitioned() throws Exception {
        entriesTtl(PARTITIONED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-7311")
    @Test
    public void testEntriesTtlMvccTxPartitioned() throws Exception {
        entriesTtl(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testEntriesTtlAtomicReplicated() throws Exception {
        entriesTtl(REPLICATED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testEntriesTtlTxReplicated() throws Exception {
        entriesTtl(REPLICATED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-7311")
    @Test
    public void testEntriesTtlMvccTxReplicated() throws Exception {
        entriesTtl(REPLICATED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheIteratorAtomicPartitioned() throws Exception {
        cacheIterator(PARTITIONED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheIteratorTxPartitioned() throws Exception {
        cacheIterator(PARTITIONED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheIteratorMvccTxPartitioned() throws Exception {
        cacheIterator(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheIteratorAtomicReplicated() throws Exception {
        cacheIterator(REPLICATED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheIteratorTxReplicated() throws Exception {
        cacheIterator(REPLICATED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheIteratorMvccTxReplicated() throws Exception {
        cacheIterator(REPLICATED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryMultiplePartitionsAtomicPartitioned() throws Exception {
        scanQueryMultiplePartitions(PARTITIONED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryMultiplePartitionsTxPartitioned() throws Exception {
        scanQueryMultiplePartitions(PARTITIONED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryMultiplePartitionsMvccTxPartitioned() throws Exception {
        scanQueryMultiplePartitions(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryMultiplePartitionsAtomicReplicated() throws Exception {
        scanQueryMultiplePartitions(REPLICATED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryMultiplePartitionsTxReplicated() throws Exception {
        scanQueryMultiplePartitions(REPLICATED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testScanQueryMultiplePartitionsMvccTxReplicated() throws Exception {
        scanQueryMultiplePartitions(REPLICATED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testContinuousQueryTxReplicated() throws Exception {
        continuousQuery(REPLICATED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testContinuousQueryMvccTxReplicated() throws Exception {
        continuousQuery(REPLICATED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testContinuousQueryTxPartitioned() throws Exception {
        continuousQuery(PARTITIONED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testContinuousQueryMvccTxPartitioned() throws Exception {
        continuousQuery(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testContinuousQueryAtomicReplicated() throws Exception {
        continuousQuery(REPLICATED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testContinuousQueryAtomicPartitioned() throws Exception {
        continuousQuery(PARTITIONED, ATOMIC);
    }

    /**
     * @param cacheMode Cache mode.
     * @param atomicityMode Cache atomicity mode.
     * @throws Exception If failed.
     */
    private void scanQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
        int keys = 10_000;

        Integer[] data1 = generateData(keys);
        Integer[] data2 = generateData(keys);

        startGridsMultiThreaded(4);

        Ignite srv0 = ignite(0);

        srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
        srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));

        IgniteCache<Integer, Integer> cache1;
        IgniteCache<Integer, Integer> cache2;

        if (atomicityMode == TRANSACTIONAL) {
            Ignite ignite = ignite(1);

            cache1 = ignite.cache(CACHE1);
            cache2 = ignite.cache(CACHE2);

            try (Transaction tx = ignite.transactions().txStart()) {
                for (int i = 0; i < keys; i++) {
                    cache1.put(i, data1[i]);
                    cache2.put(i, data2[i]);
                }

                tx.commit();
            }
        }
        else {
            // Async put ops.
            int ldrs = 4;

            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);

            for (int i = 0; i < ldrs; i++) {
                cls.add(putOperation(1, ldrs, i, CACHE1, data1));
                cls.add(putOperation(2, ldrs, i, CACHE2, data2));
            }

            GridTestUtils.runMultiThreaded(cls, "loaders");
        }

        ScanQuery<Integer, Integer> qry = new ScanQuery<>();

        Set<Integer> keysSet = sequence(keys);

        for (Cache.Entry<Integer, Integer> entry : ignite(3).cache(CACHE1).query(qry)) {
            assertTrue(keysSet.remove(entry.getKey()));
            assertEquals(data1[entry.getKey()], entry.getValue());
        }

        assertTrue(keysSet.isEmpty());

        srv0.destroyCache(CACHE1);

        keysSet = sequence(keys);

        for (Cache.Entry<Integer, Integer> entry : ignite(3).cache(CACHE2).query(qry)) {
            assertTrue(keysSet.remove(entry.getKey()));
            assertEquals(data2[entry.getKey()], entry.getValue());
        }

        assertTrue(keysSet.isEmpty());
    }

    /**
     * @param cacheMode Cache mode.
     * @param atomicityMode Cache atomicity mode.
     * @throws Exception If failed.
     */
    private void continuousQuery(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
        final int keys = 10_000;

        Integer[] data1 = generateData(keys);
        Integer[] data2 = generateData(keys);

        startGridsMultiThreaded(4);

        Ignite srv0 = ignite(0);

        srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
        srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));

        final AtomicInteger cntr1 = new AtomicInteger();
        final AtomicInteger cntr2 = new AtomicInteger();

        CacheEntryUpdatedListener lsnr1 = new CacheEntryUpdatedListener<Integer, Integer>() {
            @Override public void onUpdated(
                Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
                for (CacheEntryEvent<? extends Integer, ? extends Integer> ignored : evts)
                    cntr1.incrementAndGet();
            }
        };

        CacheEntryUpdatedListener lsnr2 = new CacheEntryUpdatedListener<Integer, Integer>() {
            @Override public void onUpdated(
                Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
                for (CacheEntryEvent<? extends Integer, ? extends Integer> ignored : evts)
                    cntr2.incrementAndGet();
            }
        };

        QueryCursor qry1 = ignite(2).cache(CACHE1).query(new ContinuousQuery<>().setLocalListener(lsnr1));
        QueryCursor qry2 = ignite(3).cache(CACHE2).query(new ContinuousQuery<>().setLocalListener(lsnr2));

        if (atomicityMode == TRANSACTIONAL) {
            Ignite ignite = ignite(1);

            IgniteCache<Integer, Integer> cache1 = ignite.cache(CACHE1);
            IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE2);

            try (Transaction tx = ignite.transactions().txStart()) {
                for (int i = 0; i < keys; i++) {
                    cache1.put(i, data1[i]);
                    cache2.put(i, data2[i]);
                }

                tx.commit();
            }
        }
        else {
            int ldrs = 4;

            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);

            for (int i = 0; i < ldrs; i++) {
                cls.add(putOperation(1, ldrs, i, CACHE1, data1));
                cls.add(putOperation(2, ldrs, i, CACHE2, data2));
            }

            GridTestUtils.runMultiThreaded(cls, "loaders");
        }

        GridTestUtils.waitForCondition(new PA() {
            @Override public boolean apply() {
                return cntr1.get() == keys && cntr2.get() == keys;
            }
        }, 2000);

        assertEquals(cntr1.get(), keys);
        assertEquals(cntr2.get(), keys);

        qry1.close();

        Map<Integer, Integer> map = generateDataMap(10);

        srv0.cache(CACHE1).putAll(map);
        srv0.cache(CACHE2).putAll(map);

        GridTestUtils.waitForCondition(new PA() {
            @Override public boolean apply() {
                return cntr2.get() == keys + 10;
            }
        }, 2000);

        assertEquals(keys + 10, cntr2.get());

        assertEquals(keys, cntr1.get());

        qry2.close();
    }

    /**
     * @param cacheMode Cache mode.
     * @param atomicityMode Cache atomicity mode.
     * @throws Exception If failed.
     */
    private void scanQueryMultiplePartitions(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
        int keys = 10000;

        Integer[] data1 = generateData(keys);
        Integer[] data2 = generateData(keys);

        startGridsMultiThreaded(4);

        Ignite srv0 = ignite(0);

        srv0.createCache(
            cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false)
                .setAffinity(new RendezvousAffinityFunction().setPartitions(32)));
        srv0.createCache(
            cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false)
                .setAffinity(new RendezvousAffinityFunction().setPartitions(32)));

        awaitPartitionMapExchange();

        IgniteCache<Integer, Integer> cache1;
        IgniteCache<Integer, Integer> cache2;

        if (atomicityMode == TRANSACTIONAL) {
            Ignite ignite = ignite(1);

            cache1 = ignite.cache(CACHE1);
            cache2 = ignite.cache(CACHE2);

            try (Transaction tx = ignite.transactions().txStart()) {
                for (int i = 0; i < keys; i++) {
                    cache1.put(i, data1[i]);
                    cache2.put(i, data2[i]);
                }

                tx.commit();
            }
        }
        else {
            // Async put ops.
            int ldrs = 4;

            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);

            for (int i = 0; i < ldrs; i++) {
                cls.add(putOperation(1, ldrs, i, CACHE1, data1));
                cls.add(putOperation(2, ldrs, i, CACHE2, data2));
            }

            GridTestUtils.runMultiThreaded(cls, "loaders");
        }

        int p = ThreadLocalRandom.current().nextInt(32);

        ScanQuery<Integer, Integer> qry = new ScanQuery().setPartition(p);

        Set<Integer> keysSet = new TreeSet<>();

        cache1 = ignite(3).cache(CACHE1);

        Affinity<Integer> aff = affinity(cache1);

        for (int i = 0; i < keys; i++) {
            if (aff.partition(i) == p)
                keysSet.add(i);
        }

        for (Cache.Entry<Integer, Integer> entry : cache1.query(qry)) {
            assertTrue(keysSet.remove(entry.getKey()));
            assertEquals(data1[entry.getKey()], entry.getValue());
        }

        assertTrue(keysSet.isEmpty());

        srv0.destroyCache(CACHE1);

        keysSet = new TreeSet<>();

        cache2 = ignite(3).cache(CACHE2);

        aff = affinity(cache2);

        for (int i = 0; i < keys; i++) {
            if (aff.partition(i) == p)
                keysSet.add(i);
        }

        for (Cache.Entry<Integer, Integer> entry : cache2.query(qry)) {
            assertTrue(keysSet.remove(entry.getKey()));
            assertEquals(data2[entry.getKey()], entry.getValue());
        }

        assertTrue(keysSet.isEmpty());
    }

    /**
     * @param cacheMode Cache mode.
     * @param atomicityMode Cache atomicity mode.
     * @throws Exception If failed.
     */
    private void cacheIterator(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
        int keys = 10000;

        Integer[] data1 = generateData(keys);
        Integer[] data2 = generateData(keys);

        startGridsMultiThreaded(4);

        Ignite srv0 = ignite(0);

        srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
        srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));

        awaitPartitionMapExchange();

        if (atomicityMode == TRANSACTIONAL) {
            Ignite ignite = ignite(1);

            IgniteCache cache1 = ignite.cache(CACHE1);
            IgniteCache cache2 = ignite.cache(CACHE2);

            try (Transaction tx = ignite.transactions().txStart()) {
                for (int i = 0; i < keys; i++) {
                    cache1.put(i, data1[i]);
                    cache2.put(i, data2[i]);
                }

                tx.commit();
            }
        }
        else {
            // Async put ops.
            int ldrs = 4;

            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);

            for (int i = 0; i < ldrs; i++) {
                cls.add(putOperation(1, ldrs, i, CACHE1, data1));
                cls.add(putOperation(2, ldrs, i, CACHE2, data2));
            }

            GridTestUtils.runMultiThreaded(cls, "loaders");
        }

        Set<Integer> keysSet = sequence(keys);

        for (Cache.Entry<Integer, Integer> entry : ignite(3).<Integer, Integer>cache(CACHE1)) {
            assertTrue(keysSet.remove(entry.getKey()));
            assertEquals(data1[entry.getKey()], entry.getValue());
        }

        assertTrue(keysSet.isEmpty());

        srv0.destroyCache(CACHE1);

        keysSet = sequence(keys);

        for (Cache.Entry<Integer, Integer> entry : ignite(3).<Integer, Integer>cache(CACHE2)) {
            assertTrue(keysSet.remove(entry.getKey()));
            assertEquals(data2[entry.getKey()], entry.getValue());
        }

        assertTrue(keysSet.isEmpty());
    }

    /**
     * @param cacheMode Cache mode.
     * @param atomicityMode Cache atomicity mode.
     * @throws Exception If failed.
     */
    private void entriesTtl(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
        int keys = 10000;

        final int ttl = 10000;

        Integer[] data1 = generateData(keys);
        Integer[] data2 = generateData(keys);

        startGridsMultiThreaded(4);

        Ignite srv0 = ignite(0);

        srv0.createCache(
            cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false)
                // -1 = ETERNAL       just created entries are not expiring
                // -2 = NOT_CHANGED   not to change ttl on entry update
                .setExpiryPolicyFactory(new PlatformExpiryPolicyFactory(-1, -2, ttl)).setEagerTtl(true)
        );

        srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));

        awaitPartitionMapExchange();

        if (atomicityMode == TRANSACTIONAL) {
            Ignite ignite = ignite(1);

            IgniteCache cache1 = ignite.cache(CACHE1);
            IgniteCache cache2 = ignite.cache(CACHE2);

            try (Transaction tx = ignite.transactions().txStart()) {
                for (int i = 0; i < keys; i++) {
                    cache1.put(i, data1[i]);
                    cache2.put(i, data2[i]);
                }

                tx.commit();
            }
        }
        else {
            // async put ops
            int ldrs = 4;

            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);

            for (int i = 0; i < ldrs; i++) {
                cls.add(putOperation(1, ldrs, i, CACHE1, data1));
                cls.add(putOperation(2, ldrs, i, CACHE2, data2));
            }

            GridTestUtils.runMultiThreaded(cls, "loaders");
        }

        checkData(3, CACHE1, data1);
        checkData(3, CACHE2, data2);

        srv0.destroyCache(CACHE2);

        checkData(3, CACHE1, data1);

        // Wait for expiration

        Thread.sleep((long)(ttl * 1.2));

        assertEquals(0, ignite(3).cache(CACHE1).size());
    }

    /**
     * @param cacheMode Cache mode.
     * @param atomicityMode Cache atomicity mode.
     * @throws Exception If failed.
     */
    private void createDestroyCaches(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
        int keys = 10000;

        Integer[] data1 = generateData(keys);
        Integer[] data2 = generateData(keys);

        startGridsMultiThreaded(4);

        Ignite srv0 = ignite(0);

        srv0.createCache(cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 2, false));
        srv0.createCache(cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 2, false));

        awaitPartitionMapExchange();

        if (atomicityMode == TRANSACTIONAL) {
            Ignite ignite = ignite(1);

            IgniteCache cache1 = ignite.cache(CACHE1);
            IgniteCache cache2 = ignite.cache(CACHE2);

            try (Transaction tx = ignite.transactions().txStart()) {
                for (int i = 0; i < keys; i++) {
                    cache1.put(i, data1[i]);
                    cache2.put(i, data2[i]);
                }

                tx.commit();
            }
        }
        else {
            int ldrs = 4;

            List<Callable<?>> cls = new ArrayList<>(ldrs * 2);

            for (int i = 0; i < ldrs; i++) {
                cls.add(putOperation(1, ldrs, i, CACHE1, data1));
                cls.add(putOperation(2, ldrs, i, CACHE2, data2));
            }

            GridTestUtils.runMultiThreaded(cls, "loaders");
        }

        checkLocalData(3, CACHE1, data1);
        checkLocalData(0, CACHE2, data2);

        checkData(0, CACHE1, data1);
        checkData(3, CACHE2, data2);

        ignite(1).destroyCache(CACHE2);

        startGrid(5);

        awaitPartitionMapExchange();

        checkData(5, CACHE1, data1);
        checkLocalData(5, CACHE1, data1);

        ignite(1).destroyCache(CACHE1);

        checkCacheGroup(5, GROUP1, false);
    }

    /**
     * @param idx Node index.
     * @param ldrs Loaders count.
     * @param ldrIdx Loader index.
     * @param cacheName Cache name.
     * @param data Data.
     * @return Callable for put operation.
     */
    private Callable<Void> putOperation(
            final int idx,
            final int ldrs,
            final int ldrIdx,
            final String cacheName,
            final Integer[] data) {
        return new Callable<Void>() {
            @Override public Void call() throws Exception {
                IgniteCache cache = ignite(idx).cache(cacheName);

                for (int j = 0, size = data.length; j < size; j++) {
                    if (j % ldrs == ldrIdx)
                        cache.put(j, data[j]);
                }

                return null;
            }
        };
    }

    /**
     * Creates an array of random integers.
     *
     * @param cnt Array length.
     * @return Array of random integers.
     */
    private Integer[] generateData(int cnt) {
        Random rnd = ThreadLocalRandom.current();

        Integer[] data = new Integer[cnt];

        for (int i = 0; i < data.length; i++)
            data[i] = rnd.nextInt();

        return data;
    }

    /**
     * Creates a map with random integers.
     *
     * @param cnt Map size length.
     * @return Map with random integers.
     */
    private Map<Integer, Integer> generateDataMap(int cnt) {
        return generateDataMap(0, cnt);
    }

    /**
     * Creates a map with random integers.
     *
     * @param startKey Start key.
     * @param cnt Map size length.
     * @return Map with random integers.
     */
    private Map<Integer, Integer> generateDataMap(int startKey, int cnt) {
        Random rnd = ThreadLocalRandom.current();

        Map<Integer, Integer> data = new TreeMap<>();

        for (int i = 0; i < cnt; i++)
            data.put(startKey++, rnd.nextInt());

        return data;
    }

    /**
     * @param cnt Sequence length.
     * @return Sequence of integers.
     */
    private Set<Integer> sequence(int cnt) {
        Set<Integer> res = new TreeSet<>();

        for (int i = 0; i < cnt; i++)
            res.add(i);

        return res;
    }

    /**
     * @param idx Node index.
     * @param cacheName Cache name.
     * @param data Expected data.
     * @throws Exception If failed.
     */
    private void checkData(int idx, String cacheName, Integer[] data) throws Exception {
        Set<Integer> keys = sequence(data.length);

        Set<Map.Entry<Integer, Integer>> entries =
            ignite(idx).<Integer, Integer>cache(cacheName).getAll(keys).entrySet();

        for (Map.Entry<Integer, Integer> entry : entries) {
            assertTrue(keys.remove(entry.getKey()));
            assertEquals(data[entry.getKey()], entry.getValue());
        }

        assertTrue(keys.isEmpty());
    }

    /**
     * @param idx Node index.
     * @param cacheName Cache name.
     * @param data Expected data.
     * @throws Exception If failed.
     */
    private void checkLocalData(int idx, String cacheName, Integer[] data) throws Exception {
        Ignite ignite = ignite(idx);
        ClusterNode node = ignite.cluster().localNode();
        IgniteCache cache = ignite.<Integer, Integer>cache(cacheName);

        Affinity aff = affinity(cache);

        Set<Integer> locKeys = new TreeSet<>();

        for (int key = 0; key < data.length; key++) {
            if (aff.isPrimaryOrBackup(node, key))
                locKeys.add(key);
        }

        Iterable<Cache.Entry<Integer, Integer>> locEntries = cache.localEntries(CachePeekMode.OFFHEAP);

        for (Cache.Entry<Integer, Integer> entry : locEntries) {
            assertTrue(locKeys.remove(entry.getKey()));
            assertEquals(data[entry.getKey()], entry.getValue());
        }

        assertTrue(locKeys.isEmpty());
    }

    /**
     * @param srvs Number of server nodes.
     * @throws Exception If failed.
     */
    private void createDestroyCaches(int srvs) throws Exception {
        startGridsMultiThreaded(srvs);

        checkCacheDiscoveryDataConsistent();

        Ignite srv0 = ignite(0);

        for (int i = 0; i < srvs; i++)
            checkCacheGroup(i, GROUP1, false);

        for (int iter = 0; iter < 3; iter++) {
            log.info("Iteration: " + iter);

            srv0.createCache(cacheConfiguration(GROUP1, CACHE1, PARTITIONED, ATOMIC, 2, false));

            checkCacheDiscoveryDataConsistent();

            for (int i = 0; i < srvs; i++) {
                checkCacheGroup(i, GROUP1, true);

                checkCache(i, CACHE1, 10);
            }

            srv0.createCache(cacheConfiguration(GROUP1, CACHE2, PARTITIONED, ATOMIC, 2, false));

            checkCacheDiscoveryDataConsistent();

            for (int i = 0; i < srvs; i++) {
                checkCacheGroup(i, GROUP1, true);

                checkCache(i, CACHE2, 10);
            }

            srv0.destroyCache(CACHE1);

            checkCacheDiscoveryDataConsistent();

            for (int i = 0; i < srvs; i++) {
                checkCacheGroup(i, GROUP1, true);

                checkCache(i, CACHE2, 10);
            }

            srv0.destroyCache(CACHE2);

            checkCacheDiscoveryDataConsistent();

            for (int i = 0; i < srvs; i++)
                checkCacheGroup(i, GROUP1, false);
        }
    }

    /**
     * @param idx Node index.
     * @param cacheName Cache name.
     * @param ops Number of operations to execute.
     */
    private void checkCache(int idx, String cacheName, int ops) {
        IgniteCache cache = ignite(idx).cache(cacheName);

        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        for (int i = 0; i < ops; i++) {
            Integer key = rnd.nextInt();

            cache.put(key, i);

            assertEquals(i, cache.get(key));
        }
    }

    /**
     * @param cache3 {@code True} if add last cache.
     * @return Cache configurations.
     */
    private CacheConfiguration[] staticConfigurations1(boolean cache3) {
        CacheConfiguration[] ccfgs = new CacheConfiguration[cache3 ? 3 : 2];

        ccfgs[0] = cacheConfiguration(null, "cache1", PARTITIONED, ATOMIC, 2, false);
        ccfgs[1] = cacheConfiguration(GROUP1, "cache2", PARTITIONED, ATOMIC, 2, false);

        if (cache3)
            ccfgs[2] = cacheConfiguration(GROUP1, "cache3", PARTITIONED, ATOMIC, 2, false);

        return ccfgs;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testDiscoveryDataConsistency1() throws Exception {
        ccfgs = staticConfigurations1(true);
        Ignite srv0 = startGrid(0);

        ccfgs = staticConfigurations1(true);
        startGrid(1);

        checkCacheDiscoveryDataConsistent();

        ccfgs = null;
        startGrid(2);

        checkCacheDiscoveryDataConsistent();

        srv0.createCache(cacheConfiguration(null, "cache4", PARTITIONED, ATOMIC, 2, false));

        checkCacheDiscoveryDataConsistent();

        ccfgs = staticConfigurations1(true);
        startGrid(3);

        checkCacheDiscoveryDataConsistent();

        srv0.createCache(cacheConfiguration(GROUP1, "cache5", PARTITIONED, ATOMIC, 2, false));

        ccfgs = staticConfigurations1(true);
        startGrid(4);

        checkCacheDiscoveryDataConsistent();

        for (int i = 0; i < 5; i++)
            checkCacheGroup(i, GROUP1, true);

        srv0.destroyCache("cache1");
        srv0.destroyCache("cache2");
        srv0.destroyCache("cache3");

        checkCacheDiscoveryDataConsistent();

        ccfgs = staticConfigurations1(true);
        startGrid(5);

        checkCacheDiscoveryDataConsistent();

        for (int i = 0; i < 6; i++)
            checkCacheGroup(i, GROUP1, true);

        srv0.destroyCache("cache1");
        srv0.destroyCache("cache2");
        srv0.destroyCache("cache3");
        srv0.destroyCache("cache4");
        srv0.destroyCache("cache5");

        ccfgs = staticConfigurations1(true);
        startGrid(6);

        checkCacheDiscoveryDataConsistent();

        srv0.createCache(cacheConfiguration(null, "cache4", PARTITIONED, ATOMIC, 2, false));
        srv0.createCache(cacheConfiguration(GROUP1, "cache5", PARTITIONED, ATOMIC, 2, false));

        checkCacheDiscoveryDataConsistent();

        ccfgs = staticConfigurations1(false);
        startGrid(7);

        checkCacheDiscoveryDataConsistent();

        awaitPartitionMapExchange();
    }

    /**
     * @param cnt Caches number.
     * @param grp Cache groups.
     * @param baseName Caches name prefix.
     * @return Cache configurations.
     */
    private CacheConfiguration[] cacheConfigurations(int cnt, String grp, String baseName) {
        CacheConfiguration[] ccfgs = new CacheConfiguration[cnt];

        for (int i = 0; i < cnt; i++) {
            ccfgs[i] = cacheConfiguration(grp,
                baseName + i, PARTITIONED,
                i % 2 == 0 ? TRANSACTIONAL : ATOMIC,
                2,
                false).setAffinity(new RendezvousAffinityFunction(false, 256));
        }

        return ccfgs;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testStartManyCaches() throws Exception {
        final int CACHES = SF.apply(5_000);

        final int NODES = 4;

        for (int i = 0; i < NODES; i++) {
            ccfgs = cacheConfigurations(CACHES, GROUP1, "testCache1-");

            boolean client = i == NODES - 1;

            if (client)
                startClientGrid(i);
            else
                startGrid(i);

        }

        Ignite client = ignite(NODES - 1);

        client.createCaches(Arrays.asList(cacheConfigurations(CACHES, GROUP2, "testCache2-")));

        checkCacheDiscoveryDataConsistent();

        for (int i = 0; i < NODES; i++) {
            log.info("Check node: " + i);

            for (int c = 0; c < 10; c++) {
                int cache = ThreadLocalRandom.current().nextInt(CACHES);

                checkCache(i, "testCache1-" + cache, 1);
                checkCache(i, "testCache2-" + cache, 1);
            }
        }

        log.info("Stop nodes.");

        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
            @Override public void apply(Integer idx) {
                stopGrid(idx);
            }
        }, NODES, "stopThread");
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRebalance1() throws Exception {
        Ignite srv0 = startGrid(0);

        IgniteCache<Object, Object> srv0Cache1 =
            srv0.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false));
        IgniteCache<Object, Object> srv0Cache2 =
            srv0.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false));
        IgniteCache<Object, Object> srv0Cache3 =
            srv0.createCache(cacheConfiguration(GROUP2, "c3", PARTITIONED, TRANSACTIONAL, 2, false));
        IgniteCache<Object, Object> srv0Cache4 =
            srv0.createCache(cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 2, false));
        IgniteCache<Object, Object> srv0Cache5 =
            srv0.createCache(cacheConfiguration(GROUP3, "c5", PARTITIONED, TRANSACTIONAL_SNAPSHOT, 2, false));
        IgniteCache<Object, Object> srv0Cache6 =
            srv0.createCache(cacheConfiguration(GROUP3, "c6", PARTITIONED, TRANSACTIONAL_SNAPSHOT, 2, false));

        final int ITEMS = 1_000;

        for (int i = 0; i < ITEMS; i++) {
            srv0Cache1.put(new Key1(i), i);

            srv0Cache3.put(new Key1(i), i);
            srv0Cache4.put(new Key1(i), -i);

            srv0Cache5.put(new Key1(i), i);
            srv0Cache6.put(new Key1(i), -i);
        }

        assertEquals(ITEMS, srv0Cache1.size());
        assertEquals(ITEMS, srv0Cache1.localSize());
        assertEquals(0, srv0Cache2.size());
        assertEquals(ITEMS, srv0Cache3.size());
        assertEquals(ITEMS, srv0Cache4.localSize());
        assertEquals(ITEMS, srv0Cache5.size());
        assertEquals(ITEMS, srv0Cache6.localSize());

        startGrid(1);

        awaitPartitionMapExchange();

        for (int i = 0; i < 2; i++) {
            Ignite node = ignite(i);

            IgniteCache<Object, Object> cache1 = node.cache("c1");
            IgniteCache<Object, Object> cache2 = node.cache("c2");
            IgniteCache<Object, Object> cache3 = node.cache("c3");
            IgniteCache<Object, Object> cache4 = node.cache("c4");
            IgniteCache<Object, Object> cache5 = node.cache("c5");
            IgniteCache<Object, Object> cache6 = node.cache("c6");

            assertEquals(ITEMS * 2, cache1.size(CachePeekMode.ALL));
            assertEquals(ITEMS, cache1.localSize(CachePeekMode.ALL));
            assertEquals(0, cache2.size(CachePeekMode.ALL));
            assertEquals(0, cache2.localSize(CachePeekMode.ALL));

            assertEquals(ITEMS * 2, cache3.size(CachePeekMode.ALL));
            assertEquals(ITEMS, cache3.localSize(CachePeekMode.ALL));

            assertEquals(ITEMS * 2, cache4.size(CachePeekMode.ALL));
            assertEquals(ITEMS, cache4.localSize(CachePeekMode.ALL));

            assertEquals(ITEMS * 2, cache5.size(CachePeekMode.ALL));
            assertEquals(ITEMS, cache5.localSize(CachePeekMode.ALL));

            assertEquals(ITEMS * 2, cache6.size(CachePeekMode.ALL));
            assertEquals(ITEMS, cache6.localSize(CachePeekMode.ALL));

            for (int k = 0; k < ITEMS; k++) {
                assertEquals(i, cache1.localPeek(new Key1(i)));
                assertNull(cache2.localPeek(new Key1(i)));
                assertEquals(i, cache3.localPeek(new Key1(i)));
                assertEquals(-i, cache4.localPeek(new Key1(i)));
                assertEquals(i, cache5.localPeek(new Key1(i)));
                assertEquals(-i, cache6.localPeek(new Key1(i)));
            }
        }

        for (int i = 0; i < ITEMS * 2; i++)
            srv0Cache2.put(new Key1(i), i + 1);

        Ignite srv2 = startGrid(2);

        awaitPartitionMapExchange();

        for (int i = 0; i < 3; i++) {
            Ignite node = ignite(i);

            IgniteCache<Object, Object> cache1 = node.cache("c1");
            IgniteCache<Object, Object> cache2 = node.cache("c2");
            IgniteCache<Object, Object> cache3 = node.cache("c3");
            IgniteCache<Object, Object> cache4 = node.cache("c4");
            IgniteCache<Object, Object> cache5 = node.cache("c5");
            IgniteCache<Object, Object> cache6 = node.cache("c6");

            assertEquals(ITEMS * 3, cache1.size(CachePeekMode.ALL));
            assertEquals(ITEMS, cache1.localSize(CachePeekMode.ALL));
            assertEquals(ITEMS * 6, cache2.size(CachePeekMode.ALL));
            assertEquals(ITEMS * 2, cache2.localSize(CachePeekMode.ALL));
            assertEquals(ITEMS, cache3.localSize(CachePeekMode.ALL));
            assertEquals(ITEMS, cache4.localSize(CachePeekMode.ALL));
            assertEquals(ITEMS, cache5.localSize(CachePeekMode.ALL));
            assertEquals(ITEMS, cache6.localSize(CachePeekMode.ALL));
        }

        IgniteCache<Object, Object> srv2Cache1 = srv2.cache("c1");
        IgniteCache<Object, Object> srv2Cache2 = srv2.cache("c2");

        for (int i = 0; i < ITEMS; i++)
            assertEquals(i, srv2Cache1.localPeek(new Key1(i)));

        for (int i = 0; i < ITEMS * 2; i++)
            assertEquals(i + 1, srv2Cache2.localPeek(new Key1(i)));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRebalance2() throws Exception {
        Ignite srv0 = startGrid(0);

        IgniteCache<Object, Object> srv0Cache1 =
            srv0.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 0, false));
        IgniteCache<Object, Object> srv0Cache2 =
            srv0.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 0, false));

        Affinity aff = srv0.affinity("c1");

        final int ITEMS = 2_000;

        Map<Integer, Integer> c1Data = new HashMap<>();
        Map<Integer, Integer> c2Data = new HashMap<>();

        for (int i = 0; i < ITEMS; i++) {
            srv0Cache1.put(i, i);
            c1Data.put(i, i);

            if (i % 2 == 0) {
                srv0Cache2.put(i, i);
                c2Data.put(i, i);
            }
        }

        assertEquals(ITEMS, srv0Cache1.size());
        assertEquals(ITEMS / 2, srv0Cache2.size());

        Ignite srv1 = startGrid(1);

        awaitPartitionMapExchange();

        assertEquals(ITEMS, srv0Cache1.size());
        assertEquals(ITEMS / 2, srv0Cache2.size());

        checkCacheData(c1Data, "c1");
        checkCacheData(c2Data, "c2");

        Set<Integer> srv1Parts = new HashSet<>();

        for (Integer p : aff.primaryPartitions(srv1.cluster().localNode()))
            srv1Parts.add(p);

        CacheGroupContext grpSrv0 = cacheGroup(srv0, GROUP1);
        CacheGroupContext grpSrv1 = cacheGroup(srv1, GROUP1);

        for (int p = 0; p < aff.partitions(); p++) {
            if (srv1Parts.contains(p)) {
                GridIterator<CacheDataRow> it = grpSrv0.offheap().partitionIterator(p);
                assertFalse(it.hasNext());

                it = grpSrv1.offheap().partitionIterator(p);
                assertTrue(it.hasNext());
            }
            else {
                GridIterator<CacheDataRow> it = grpSrv0.offheap().partitionIterator(p);
                assertTrue(it.hasNext());

                it = grpSrv1.offheap().partitionIterator(p);
                assertFalse(it.hasNext());
            }
        }

        c1Data = new HashMap<>();
        c2Data = new HashMap<>();

        for (int i = 0; i < ITEMS; i++) {
            srv0Cache1.put(i, i + 1);
            c1Data.put(i, i + 1);

            if (i % 2 == 0) {
                srv0Cache2.put(i, i + 1);
                c2Data.put(i, i + 1);
            }
        }

        checkCacheData(c1Data, "c1");
        checkCacheData(c2Data, "c2");
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testNoKeyIntersectTx() throws Exception {
        testNoKeyIntersect(TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testNoKeyIntersectMvccTx() throws Exception {
        testNoKeyIntersect(TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testNoKeyIntersectAtomic() throws Exception {
        testNoKeyIntersect(ATOMIC);
    }

    /**
     * @param atomicityMode Atomicity mode.
     * @throws Exception If failed.
     */
    private void testNoKeyIntersect(CacheAtomicityMode atomicityMode) throws Exception {
        startGrid(0);

        testNoKeyIntersect(atomicityMode, false);

        testNoKeyIntersect(atomicityMode, true);

        startGridsMultiThreaded(1, 4);

        testNoKeyIntersect(atomicityMode, false);

        testNoKeyIntersect(atomicityMode, true);
    }

    /**
     * @param keys Keys.
     * @param rnd Random.
     * @return Added key.
     */
    private Integer addKey(Set<Integer> keys, ThreadLocalRandom rnd) {
        for (;;) {
            Integer key = rnd.nextInt(100_000);

            if (keys.add(key))
                return key;
        }
    }

    /**
     * @param atomicityMode Cache atomicity mode.
     * @param heapCache On heap cache flag.
     * @throws Exception If failed.
     */
    private void testNoKeyIntersect(CacheAtomicityMode atomicityMode, boolean heapCache) throws Exception {
        Ignite srv0 = ignite(0);

        try {
            IgniteCache cache1 = srv0.
                createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, atomicityMode, 1, heapCache));

            Set<Integer> keys = new LinkedHashSet<>(30);

            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            for (int i = 0; i < 10; i++) {
                Integer key = addKey(keys, rnd);

                cache1.put(key, key);
                cache1.put(new Key1(key), new Value1(key));
                cache1.put(new Key2(key), new Value2(key));
            }

            assertEquals(30, cache1.size());

            IgniteCache cache2 = srv0.
                createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, atomicityMode, 1, heapCache));

            assertEquals(30, cache1.size());
            assertEquals(0, cache2.size());

            for (Integer key : keys) {
                assertNull(cache2.get(key));
                assertNull(cache2.get(new Key1(key)));
                assertNull(cache2.get(new Key2(key)));

                cache2.put(key, key + 1);
                cache2.put(new Key1(key), new Value1(key + 1));
                cache2.put(new Key2(key), new Value2(key + 1));
            }

            assertEquals(30, cache1.size());
            assertEquals(30, cache2.size());

            for (int i = 0; i < 10; i++) {
                Integer key = addKey(keys, rnd);

                cache2.put(key, key + 1);
                cache2.put(new Key1(key), new Value1(key + 1));
                cache2.put(new Key2(key), new Value2(key + 1));
            }

            assertEquals(30, cache1.size());
            assertEquals(60, cache2.size());

            int i = 0;

            for (Integer key : keys) {
                if (i++ < 10) {
                    assertEquals(key, cache1.get(key));
                    assertEquals(new Value1(key), cache1.get(new Key1(key)));
                    assertEquals(new Value2(key), cache1.get(new Key2(key)));
                }
                else {
                    assertNull(cache1.get(key));
                    assertNull(cache1.get(new Key1(key)));
                    assertNull(cache1.get(new Key2(key)));
                }

                assertEquals(key + 1, cache2.get(key));
                assertEquals(new Value1(key + 1), cache2.get(new Key1(key)));
                assertEquals(new Value2(key + 1), cache2.get(new Key2(key)));
            }

            IgniteCache cache3 = srv0.
                createCache(cacheConfiguration(GROUP1, "c3", PARTITIONED, atomicityMode, 1, heapCache));

            assertEquals(30, cache1.size());
            assertEquals(60, cache2.size());
            assertEquals(0, cache3.size());

            for (Integer key : keys) {
                assertNull(cache3.get(key));
                assertNull(cache3.get(new Key1(key)));
                assertNull(cache3.get(new Key2(key)));
            }

            for (Integer key : keys) {
                cache3.put(key, key);
                cache3.put(new Key1(key), new Value1(key));
                cache3.put(new Key2(key), new Value2(key));
            }

            i = 0;

            for (Integer key : keys) {
                if (i++ < 10) {
                    assertEquals(key, cache1.get(key));
                    assertEquals(new Value1(key), cache1.get(new Key1(key)));
                    assertEquals(new Value2(key), cache1.get(new Key2(key)));
                }
                else {
                    assertNull(cache1.get(key));
                    assertNull(cache1.get(new Key1(key)));
                    assertNull(cache1.get(new Key2(key)));
                }

                assertEquals(key + 1, cache2.get(key));
                assertEquals(new Value1(key + 1), cache2.get(new Key1(key)));
                assertEquals(new Value2(key + 1), cache2.get(new Key2(key)));

                assertEquals(key, cache3.get(key));
                assertEquals(new Value1(key), cache3.get(new Key1(key)));
                assertEquals(new Value2(key), cache3.get(new Key2(key)));
            }

            i = 0;

            for (Integer key : keys) {
                if (i++ == 3)
                    break;

                cache1.remove(key);
                cache1.remove(new Key1(key));
                cache1.remove(new Key2(key));

                assertNull(cache1.get(key));
                assertNull(cache1.get(new Key1(key)));
                assertNull(cache1.get(new Key2(key)));

                assertEquals(key + 1, cache2.get(key));
                assertEquals(new Value1(key + 1), cache2.get(new Key1(key)));
                assertEquals(new Value2(key + 1), cache2.get(new Key2(key)));

                assertEquals(key, cache3.get(key));
                assertEquals(new Value1(key), cache3.get(new Key1(key)));
                assertEquals(new Value2(key), cache3.get(new Key2(key)));
            }

            cache1.removeAll();

            for (Integer key : keys) {
                assertNull(cache1.get(key));
                assertNull(cache1.get(new Key1(key)));
                assertNull(cache1.get(new Key2(key)));

                assertEquals(key + 1, cache2.get(key));
                assertEquals(new Value1(key + 1), cache2.get(new Key1(key)));
                assertEquals(new Value2(key + 1), cache2.get(new Key2(key)));

                assertEquals(key, cache3.get(key));
                assertEquals(new Value1(key), cache3.get(new Key1(key)));
                assertEquals(new Value2(key), cache3.get(new Key2(key)));
            }

            cache2.removeAll();

            for (Integer key : keys) {
                assertNull(cache1.get(key));
                assertNull(cache1.get(new Key1(key)));
                assertNull(cache1.get(new Key2(key)));

                assertNull(cache2.get(key));
                assertNull(cache2.get(new Key1(key)));
                assertNull(cache2.get(new Key2(key)));

                assertEquals(key, cache3.get(key));
                assertEquals(new Value1(key), cache3.get(new Key1(key)));
                assertEquals(new Value2(key), cache3.get(new Key2(key)));
            }

            if (atomicityMode == TRANSACTIONAL)
                testNoKeyIntersectTxLocks(cache1, cache2);
        }
        finally {
            srv0.destroyCaches(Arrays.asList("c1", "c2", "c3"));
        }
    }

    /**
     * @param cache1 Cache1.
     * @param cache2 Cache2.
     * @throws Exception If failed.
     */
    private void testNoKeyIntersectTxLocks(final IgniteCache cache1, final IgniteCache cache2) throws Exception {
        final Ignite node = (Ignite)cache1.unwrap(Ignite.class);

        for (int i = 0; i < 5; i++) {
            final Integer key = ThreadLocalRandom.current().nextInt(1000);

            Lock lock = cache1.lock(key);

            lock.lock();

            try {
                IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
                    @Override public Object call() throws Exception {
                        Lock lock1 = cache1.lock(key);

                        assertFalse(lock1.tryLock());

                        Lock lock2 = cache2.lock(key);

                        assertTrue(lock2.tryLock());

                        lock2.unlock();

                        return null;
                    }
                }, "lockThread");

                fut.get(10_000);
            }
            finally {
                lock.unlock();
            }

            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                cache1.put(key, 1);

                IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
                    @Override public Object call() throws Exception {
                        try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                            cache2.put(key, 2);

                            tx.commit();
                        }

                        assertEquals(2, cache2.get(key));

                        return null;
                    }
                }, "txThread");

                fut.get(10_000);

                tx.commit();
            }

            assertEquals(1, cache1.get(key));
            assertEquals(2, cache2.get(key));

            try (Transaction tx = node.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
                Integer val = (Integer)cache1.get(key);

                cache1.put(key, val + 10);

                IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() {
                    @Override public Object call() throws Exception {
                        try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                            cache2.put(key, 3);

                            tx.commit();
                        }

                        assertEquals(3, cache2.get(key));

                        return null;
                    }
                }, "txThread");

                fut.get(10_000);

                tx.commit();
            }

            assertEquals(11, cache1.get(key));
            assertEquals(3, cache2.get(key));
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheApiTxPartitioned() throws Exception {
        cacheApiTest(PARTITIONED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-7952")
    @Test
    public void testCacheApiMvccTxPartitioned() throws Exception {
        cacheApiTest(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheApiTxReplicated() throws Exception {
        cacheApiTest(REPLICATED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-7952")
    @Test
    public void testCacheApiMvccTxReplicated() throws Exception {
        cacheApiTest(REPLICATED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheApiAtomicPartitioned() throws Exception {
        cacheApiTest(PARTITIONED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheApiAtomicReplicated() throws Exception {
        cacheApiTest(REPLICATED, ATOMIC);
    }

    /**
     * @param cacheMode Cache mode.
     * @param atomicityMode Atomicity mode.
     * @throws Exception If failed.
     */
    private void cacheApiTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
        startGridsMultiThreaded(4);

        startClientGrid(4);

        int[] backups = cacheMode == REPLICATED ? new int[]{Integer.MAX_VALUE} : new int[]{0, 1, 2, 3};

        for (int backups0 : backups)
            cacheApiTest(cacheMode, atomicityMode, backups0, false, false, false);

        int backups0 = cacheMode == REPLICATED ? Integer.MAX_VALUE :
            backups[ThreadLocalRandom.current().nextInt(backups.length)];

        cacheApiTest(cacheMode, atomicityMode, backups0, true, false, false);

        if (cacheMode == PARTITIONED) {
            // Here the f variable is used as a bit set where 2 last bits
            // determine whether a near cache is used on server/client side.
            // The case without near cache is already tested at this point.
            for (int f : new int[]{1, 2, 3}) {
                cacheApiTest(cacheMode, atomicityMode, backups0, false, nearSrv(f), nearClient(f));
                cacheApiTest(cacheMode, atomicityMode, backups0, true, nearSrv(f), nearClient(f));
            }
        }
    }

    /**
     * @param flag Flag.
     * @return {@code True} if near cache should be used on a client side.
     */
    private boolean nearClient(int flag) {
        return (flag & 0b01) == 0b01;
    }

    /**
     * @param flag Flag.
     * @return {@code True} if near cache should be used on a server side.
     */
    private boolean nearSrv(int flag) {
        return (flag & 0b10) == 0b10;
    }

    /**
     * @param cacheMode Cache mode.
     * @param atomicityMode Atomicity mode.
     * @param backups Number of backups.
     * @param heapCache On heap cache flag.
     * @param nearSrv {@code True} if near cache should be used on a server side.
     * @param nearClient {@code True} if near cache should be used on a client side.
     * @throws Exception If failed.
     */
    private void cacheApiTest(CacheMode cacheMode,
        CacheAtomicityMode atomicityMode,
        int backups,
        boolean heapCache,
        boolean nearSrv,
        boolean nearClient) throws Exception {
        Ignite srv0 = ignite(0);

        NearCacheConfiguration nearCfg = nearSrv ? new NearCacheConfiguration() : null;

        srv0.createCache(cacheConfiguration(GROUP1, "cache-0", cacheMode, atomicityMode, backups, heapCache)
            .setNearConfiguration(nearCfg));

        srv0.createCache(cacheConfiguration(GROUP1, "cache-1", cacheMode, atomicityMode, backups, heapCache));

        srv0.createCache(cacheConfiguration(GROUP2, "cache-2", cacheMode, atomicityMode, backups, heapCache)
            .setNearConfiguration(nearCfg));

        srv0.createCache(cacheConfiguration(null, "cache-3", cacheMode, atomicityMode, backups, heapCache));

        awaitCacheOnClient(ignite(4), "cache-3");

        if (nearClient) {
            Ignite clientNode = ignite(4);

            clientNode.createNearCache("cache-0", new NearCacheConfiguration());
            clientNode.createNearCache("cache-2", new NearCacheConfiguration());
        }

        try {
            for (final Ignite node : Ignition.allGrids()) {
                List<Callable<?>> ops = new ArrayList<>();

                for (int i = 0; i < 4; i++)
                    ops.add(testSet(node.cache("cache-" + i), cacheMode, atomicityMode, backups, heapCache, node));

                // Async operations.
                GridTestUtils.runMultiThreaded(ops, "cacheApiTest");
            }
        }
        finally {
            for (int i = 0; i < 4; i++)
                srv0.destroyCache("cache-" + i);
        }
    }

    /**
     * @param cache Cache.
     * @param cacheMode Cache mode.
     * @param atomicityMode Atomicity mode.
     * @param backups Number of backups.
     * @param heapCache On heap cache flag.
     * @param node Ignite node.
     * @return Callable for the test operations.
     */
    private Callable<?> testSet(
        final IgniteCache<Object, Object> cache,
        final CacheMode cacheMode,
        final CacheAtomicityMode atomicityMode,
        final int backups,
        final boolean heapCache,
        final Ignite node) {
        return new Callable<Void>() {
            @Override public Void call() throws Exception {
                log.info("Test cache [node=" + node.name() +
                    ", cache=" + cache.getName() +
                    ", mode=" + cacheMode +
                    ", atomicity=" + atomicityMode +
                    ", backups=" + backups +
                    ", heapCache=" + heapCache +
                    ']');

                cacheApiTest(cache);

                return null;
            }
        };
    }

    /**
     * @param cache Cache.
     * @throws Exception If failed.
     */
    private void cacheApiTest(IgniteCache cache) throws Exception {
        cachePutAllGetAllContainsAll(cache);

        cachePutAllGetAllContainsAllAsync(cache);

        cachePutRemove(cache);

        cachePutRemoveAsync(cache);

        cachePutGetContains(cache);

        cachePutGetContainsAsync(cache);

        cachePutGetAndPut(cache);

        cachePutGetAndPutAsync(cache);

        cachePutGetAndRemove(cache);

        cachePutGetAndRemoveAsync(cache);

        cachePutGetAndReplace(cache);

        cachePutGetAndReplaceAsync(cache);

        cachePutIfAbsent(cache);

        cachePutIfAbsentAsync(cache);

        cachePutGetAndPutIfAbsent(cache);

        cachePutGetAndPutIfAbsentAsync(cache);

        cacheQuery(cache);

        cacheInvokeAll(cache);

        cacheInvoke(cache);

        cacheInvokeAllAsync(cache);

        cacheInvokeAsync(cache);

        cacheDataStreamer(cache);
    }

    /**
     * @param cache Cache.
     */
    private void tearDown(IgniteCache cache) {
        cache.clear();

        cache.removeAll();
    }

    /**
     * @param cache Cache.
     * @throws Exception If failed.
     */
    private void cacheDataStreamer(final IgniteCache cache) throws Exception {
        final int keys = 400;
        final int loaders = 4;

        final Integer[] data = generateData(keys * loaders);

        // Stream through a client node.
        Ignite clientNode = ignite(4);

        List<Callable<?>> cls = new ArrayList<>(loaders);

        for (final int i : sequence(loaders)) {
            final IgniteDataStreamer ldr = clientNode.dataStreamer(cache.getName());
            ldr.allowOverwrite(true); // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11793
            ldr.autoFlushFrequency(0);

            cls.add(new Callable<Void>() {
                @Override public Void call() throws Exception {
                    List<IgniteFuture> futs = new ArrayList<>(keys);

                    for (int j = 0, size = keys * loaders; j < size; j++) {
                        if (j % loaders == i)
                            futs.add(ldr.addData(j, data[j]));

                        if (j % (100 * loaders) == 0)
                            ldr.flush();
                    }

                    ldr.flush();

                    for (IgniteFuture fut : futs)
                        fut.get();

                    return null;
                }
            });
        }

        GridTestUtils.runMultiThreaded(cls, "loaders");

        Set<Integer> keysSet = sequence(data.length);

        for (Cache.Entry<Integer, Integer> entry : (IgniteCache<Integer, Integer>)cache) {
            assertTrue(keysSet.remove(entry.getKey()));
            assertEquals(data[entry.getKey()], entry.getValue());
        }

        assertTrue(keysSet.isEmpty());

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutAllGetAllContainsAll(IgniteCache cache) {
        int keys = 100;

        Map<Integer, Integer> data = generateDataMap(keys);

        cache.putAll(data);

        Map data0 = cache.getAll(data.keySet());

        assertEquals(data.size(), data0.size());

        for (Map.Entry<Integer, Integer> entry : data.entrySet())
            assertEquals(entry.getValue(), data0.get(entry.getKey()));

        assertTrue(cache.containsKeys(data.keySet()));

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutAllGetAllContainsAllAsync(IgniteCache cache) {
        int keys = 100;

        Map<Integer, Integer> data = generateDataMap(keys);

        cache.putAllAsync(data).get(ASYNC_TIMEOUT);

        Map data0 = (Map)cache.getAllAsync(data.keySet()).get(ASYNC_TIMEOUT);

        assertEquals(data.size(), data0.size());

        for (Map.Entry<Integer, Integer> entry : data.entrySet())
            assertEquals(entry.getValue(), data0.get(entry.getKey()));

        assertTrue((Boolean)cache.containsKeysAsync(data.keySet()).get(ASYNC_TIMEOUT));

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutRemove(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val = rnd.nextInt();

        cache.put(key, val);

        assertTrue(cache.remove(key));

        assertNull(cache.get(key));

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutRemoveAsync(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val = rnd.nextInt();

        cache.putAsync(key, val).get(ASYNC_TIMEOUT);

        assertTrue((Boolean)cache.removeAsync(key).get(ASYNC_TIMEOUT));

        assertNull(cache.get(key));

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutGetContains(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val = rnd.nextInt();

        cache.put(key, val);

        Object val0 = cache.get(key);

        assertEquals(val, val0);

        assertTrue(cache.containsKey(key));

        Integer key2 = rnd.nextInt();

        while (key2.equals(key))
            key2 = rnd.nextInt();

        assertFalse(cache.containsKey(key2));

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutGetContainsAsync(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val = rnd.nextInt();

        cache.putAsync(key, val).get(ASYNC_TIMEOUT);

        Object val0 = cache.getAsync(key).get(ASYNC_TIMEOUT);

        assertEquals(val, val0);

        assertTrue((Boolean)cache.containsKeyAsync(key).get(ASYNC_TIMEOUT));

        Integer key2 = rnd.nextInt();

        while (key2.equals(key))
            key2 = rnd.nextInt();

        assertFalse((Boolean)cache.containsKeyAsync(key2).get(ASYNC_TIMEOUT));

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutGetAndPut(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val1 = rnd.nextInt();
        Integer val2 = rnd.nextInt();

        cache.put(key, val1);

        Object val0 = cache.getAndPut(key, val2);

        assertEquals(val1, val0);

        val0 = cache.get(key);

        assertEquals(val2, val0);

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutGetAndPutAsync(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val1 = rnd.nextInt();
        Integer val2 = rnd.nextInt();

        cache.put(key, val1);

        Object val0 = cache.getAndPutAsync(key, val2).get(ASYNC_TIMEOUT);

        assertEquals(val1, val0);

        val0 = cache.get(key);

        assertEquals(val2, val0);

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutGetAndReplace(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val1 = rnd.nextInt();
        Integer val2 = rnd.nextInt();

        Object val0 = cache.getAndReplace(key, val1);

        assertEquals(null, val0);

        val0 = cache.get(key);

        assertEquals(null, val0);

        cache.put(key, val1);

        val0 = cache.getAndReplace(key, val2);

        assertEquals(val1, val0);

        val0 = cache.get(key);

        assertEquals(val2, val0);

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutGetAndReplaceAsync(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val1 = rnd.nextInt();
        Integer val2 = rnd.nextInt();

        Object val0 = cache.getAndReplaceAsync(key, val1).get(ASYNC_TIMEOUT);

        assertEquals(null, val0);

        val0 = cache.get(key);

        assertEquals(null, val0);

        cache.put(key, val1);

        val0 = cache.getAndReplaceAsync(key, val2).get(ASYNC_TIMEOUT);

        assertEquals(val1, val0);

        val0 = cache.get(key);

        assertEquals(val2, val0);

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutGetAndRemove(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val = rnd.nextInt();

        cache.put(key, val);

        Object val0 = cache.getAndRemove(key);

        assertEquals(val, val0);

        val0 = cache.get(key);

        assertNull(val0);

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutGetAndRemoveAsync(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val = rnd.nextInt();

        cache.put(key, val);

        Object val0 = cache.getAndRemoveAsync(key).get(ASYNC_TIMEOUT);

        assertEquals(val, val0);

        val0 = cache.get(key);

        assertNull(val0);

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutIfAbsent(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val1 = rnd.nextInt();
        Integer val2 = rnd.nextInt();

        assertTrue(cache.putIfAbsent(key, val1));

        Object val0 = cache.get(key);

        assertEquals(val1, val0);

        assertFalse(cache.putIfAbsent(key, val2));

        val0 = cache.get(key);

        assertEquals(val1, val0);

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutIfAbsentAsync(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val1 = rnd.nextInt();
        Integer val2 = rnd.nextInt();

        assertTrue((Boolean)cache.putIfAbsentAsync(key, val1).get(ASYNC_TIMEOUT));

        Object val0 = cache.get(key);

        assertEquals(val1, val0);

        assertFalse((Boolean)cache.putIfAbsentAsync(key, val2).get(ASYNC_TIMEOUT));

        val0 = cache.get(key);

        assertEquals(val1, val0);

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutGetAndPutIfAbsent(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val1 = rnd.nextInt();
        Integer val2 = rnd.nextInt();

        cache.put(key, val1);

        Object val0 = cache.getAndPutIfAbsent(key, val2);

        assertEquals(val1, val0);

        val0 = cache.get(key);

        assertEquals(val1, val0);

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cachePutGetAndPutIfAbsentAsync(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val1 = rnd.nextInt();
        Integer val2 = rnd.nextInt();

        cache.put(key, val1);

        Object val0 = cache.getAndPutIfAbsentAsync(key, val2).get(ASYNC_TIMEOUT);

        assertEquals(val1, val0);

        val0 = cache.get(key);

        assertEquals(val1, val0);

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cacheQuery(IgniteCache cache) {
        int keys = 100;

        Map<Integer, Integer> data = generateDataMap(keys);

        cache.putAll(data);

        ScanQuery<Integer, Integer> qry = new ScanQuery<>(new IgniteBiPredicate<Integer, Integer>() {
            @Override public boolean apply(Integer key, Integer val) {
                return key % 2 == 0;
            }
        });

        List<Cache.Entry<Integer, Integer>> all = cache.query(qry).getAll();

        assertEquals(all.size(), data.size() / 2);

        for (Cache.Entry<Integer, Integer> entry : all) {
            assertEquals(0, entry.getKey() % 2);
            assertEquals(entry.getValue(), data.get(entry.getKey()));
        }

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cacheInvokeAll(IgniteCache cache) {
        int keys = 100;
        Map<Integer, Integer> data = generateDataMap(keys);

        cache.putAll(data);

        Random rnd = ThreadLocalRandom.current();

        int one = rnd.nextInt();
        int two = rnd.nextInt();

        Map<Integer, CacheInvokeResult<Integer>> res = cache.invokeAll(data.keySet(), new CacheEntryProcessor<Integer, Integer, Integer>() {
            @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException {
                Object expected = ((Map)arguments[0]).get(entry.getKey());

                assertEquals(expected, entry.getValue());

                // Some calculation.
                return (Integer)arguments[1] + (Integer)arguments[2];
            }
        }, data, one, two);

        assertEquals(keys, res.size());
        assertEquals(one + two, (Object)res.get(0).get());

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cacheInvoke(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val = rnd.nextInt();

        cache.put(key, val);

        int one = rnd.nextInt();
        int two = rnd.nextInt();

        Object res = cache.invoke(key, new CacheEntryProcessor<Integer, Integer, Integer>() {
            @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException {
                assertEquals(arguments[0], entry.getValue());

                // Some calculation.
                return (Integer)arguments[1] + (Integer)arguments[2];
            }
        }, val, one, two);

        assertEquals(one + two, res);

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cacheInvokeAllAsync(IgniteCache cache) {
        int keys = 100;
        Map<Integer, Integer> data = generateDataMap(keys);

        cache.putAll(data);

        Random rnd = ThreadLocalRandom.current();

        int one = rnd.nextInt();
        int two = rnd.nextInt();

        Object res0 = cache.invokeAllAsync(data.keySet(), new CacheEntryProcessor<Integer, Integer, Integer>() {
            @Override public Integer process(MutableEntry<Integer, Integer> entry,
                Object... arguments) throws EntryProcessorException {
                Object expected = ((Map)arguments[0]).get(entry.getKey());

                assertEquals(expected, entry.getValue());

                // Some calculation.
                return (Integer)arguments[1] + (Integer)arguments[2];
            }
        }, data, one, two).get(ASYNC_TIMEOUT);

        Map<Integer, CacheInvokeResult<Integer>> res = (Map<Integer, CacheInvokeResult<Integer>>)res0;

        assertEquals(keys, res.size());
        assertEquals(one + two, (Object)res.get(0).get());

        tearDown(cache);
    }

    /**
     * @param cache Cache.
     */
    private void cacheInvokeAsync(IgniteCache cache) {
        Random rnd = ThreadLocalRandom.current();

        Integer key = rnd.nextInt();
        Integer val = rnd.nextInt();

        cache.put(key, val);

        int one = rnd.nextInt();
        int two = rnd.nextInt();

        Object res = cache.invokeAsync(key, new CacheEntryProcessor<Integer, Integer, Integer>() {
            @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException {
                assertEquals(arguments[0], entry.getValue());

                // Some calculation.
                return (Integer)arguments[1] + (Integer)arguments[2];
            }
        }, val, one, two).get(ASYNC_TIMEOUT);

        assertEquals(one + two, res);

        tearDown(cache);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCacheAtomicPartitioned() throws Exception {
        loadCache(PARTITIONED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCacheAtomicReplicated() throws Exception {
        loadCache(REPLICATED, ATOMIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCacheTxPartitioned() throws Exception {
        loadCache(PARTITIONED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-7954")
    @Test
    public void testLoadCacheMvccTxPartitioned() throws Exception {
        loadCache(PARTITIONED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testLoadCacheTxReplicated() throws Exception {
        loadCache(REPLICATED, TRANSACTIONAL);
    }

    /**
     * @throws Exception If failed.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-7954")
    @Test
    public void testLoadCacheMvccTxReplicated() throws Exception {
        loadCache(REPLICATED, TRANSACTIONAL_SNAPSHOT);
    }

    /**
     * @param cacheMode Cache mode.
     * @param atomicityMode Atomicity mode.
     * @throws Exception If failed.
     */
    private void loadCache(CacheMode cacheMode, CacheAtomicityMode atomicityMode) throws Exception {
        int keys = 100;

        Map<Integer, Integer> data1 = generateDataMap(keys);
        Map<Integer, Integer> data2 = generateDataMap(keys);

        Factory<? extends CacheStore<Integer, Integer>> fctr1 =
            FactoryBuilder.factoryOf(new MapBasedStore<>(data1));

        Factory<? extends CacheStore<Integer, Integer>> fctr2 =
            FactoryBuilder.factoryOf(new MapBasedStore<>(data2));

        CacheConfiguration ccfg1 = cacheConfiguration(GROUP1, CACHE1, cacheMode, atomicityMode, 1, false)
            .setCacheStoreFactory(fctr1);

        CacheConfiguration ccfg2 = cacheConfiguration(GROUP1, CACHE2, cacheMode, atomicityMode, 1, false)
            .setCacheStoreFactory(fctr2);

        Ignite node = startGrids(4);

        node.createCaches(F.asList(ccfg1, ccfg2));

        IgniteCache<Integer, Integer> cache1 = node.cache(CACHE1);
        IgniteCache<Integer, Integer> cache2 = node.cache(CACHE2);

        cache1.loadCache(null);

        checkCacheData(data1, CACHE1);

        assertEquals(0, cache2.size());

        cache2.loadCache(null);

        checkCacheData(data2, CACHE2);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testConcurrentOperationsSameKeys() throws Exception {
        final int SRVS = 4;
        final int CLIENTS = 4;
        final int NODES = SRVS + CLIENTS;

        startGrid(0);

        Ignite srv0 = startGridsMultiThreaded(1, SRVS - 1);

        startClientGridsMultiThreaded(SRVS, CLIENTS);

        srv0.createCache(cacheConfiguration(GROUP1, "a0", PARTITIONED, ATOMIC, 1, false));
        srv0.createCache(cacheConfiguration(GROUP1, "a1", PARTITIONED, ATOMIC, 1, false));
        srv0.createCache(cacheConfiguration(GROUP1, "t0", PARTITIONED, TRANSACTIONAL, 1, false));
        srv0.createCache(cacheConfiguration(GROUP1, "t1", PARTITIONED, TRANSACTIONAL, 1, false));

        F.view(G.allGrids(), ignite -> ignite.cluster().localNode().isClient())
            .forEach(ignite -> awaitCacheOnClient(ignite, "t1"));

        final List<Integer> keys = new ArrayList<>();

        for (int i = 0; i < 50; i++)
            keys.add(i);

        final AtomicBoolean err = new AtomicBoolean();

        final AtomicBoolean stop = new AtomicBoolean();

        IgniteInternalFuture fut1 = updateFuture(NODES, "a0", keys, false, stop, err);
        IgniteInternalFuture fut2 = updateFuture(NODES, "a1", keys, true, stop, err);
        IgniteInternalFuture fut3 = updateFuture(NODES, "t0", keys, false, stop, err);
        IgniteInternalFuture fut4 = updateFuture(NODES, "t1", keys, true, stop, err);

        try {
            for (int i = 0; i < 15 && !stop.get(); i++)
                U.sleep(1_000);
        }
        finally {
            stop.set(true);
        }

        fut1.get();
        fut2.get();
        fut3.get();
        fut4.get();

        assertFalse("Unexpected error, see log for details", err.get());
    }

    /**
     * @param nodes Total number of nodes.
     * @param cacheName Cache name.
     * @param keys Keys to update.
     * @param reverse {@code True} if update in reverse order.
     * @param stop Stop flag.
     * @param err Error flag.
     * @return Update future.
     */
    private IgniteInternalFuture updateFuture(final int nodes,
        final String cacheName,
        final List<Integer> keys,
        final boolean reverse,
        final AtomicBoolean stop,
        final AtomicBoolean err) {
        final AtomicInteger idx = new AtomicInteger();

        return GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                try {
                    Ignite node = ignite(idx.getAndIncrement() % nodes);

                    log.info("Start thread [node=" + node.name() + ']');

                    IgniteCache cache = node.cache(cacheName);

                    Map<Integer, Integer> map = new LinkedHashMap<>();

                    if (reverse) {
                        for (int i = keys.size() - 1; i >= 0; i--)
                            map.put(keys.get(i), 2);
                    }
                    else {
                        for (Integer key : keys)
                            map.put(key, 1);
                    }

                    while (!stop.get())
                        cache.putAll(map);
                }
                catch (Exception e) {
                    err.set(true);

                    log.error("Unexpected error: " + e, e);

                    stop.set(true);
                }

                return null;
            }
        }, nodes * 2, "update-" + cacheName + "-" + reverse);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testConcurrentOperationsAndCacheDestroy() throws Exception {
        final int SRVS = 4;
        final int CLIENTS = 4;
        final int NODES = SRVS + CLIENTS;

        startGrid(0);

        Ignite srv0 = startGridsMultiThreaded(1, SRVS - 1);

        startClientGridsMultiThreaded(SRVS, CLIENTS);

        final int CACHES = 8;

        final int grp1Backups = ThreadLocalRandom.current().nextInt(3);
        final int grp2Backups = ThreadLocalRandom.current().nextInt(3);

        log.info("Start test [grp1Backups=" + grp1Backups + ", grp2Backups=" + grp2Backups + ']');

        for (int i = 0; i < CACHES; i++) {
            srv0.createCache(
                cacheConfiguration(GROUP1, GROUP1 + "-" + i, PARTITIONED, ATOMIC, grp1Backups, i % 2 == 0));

            srv0.createCache(
                cacheConfiguration(GROUP2, GROUP2 + "-" + i, PARTITIONED, TRANSACTIONAL, grp2Backups, i % 2 == 0));

            // TODO IGNITE-7164: add Mvcc cache to test.
        }

        F.view(G.allGrids(), ignite -> ignite.cluster().localNode().isClient())
            .forEach(ignite -> awaitCacheOnClient(ignite, GROUP2 + "-" + (CACHES - 1)));

        final AtomicInteger idx = new AtomicInteger();

        final AtomicBoolean err = new AtomicBoolean();

        final AtomicBoolean stop = new AtomicBoolean();

        IgniteInternalFuture opFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
            @Override public void run() {
                try {
                    Ignite node = ignite(idx.getAndIncrement() % NODES);

                    log.info("Start thread [node=" + node.name() + ']');

                    ThreadLocalRandom rnd = ThreadLocalRandom.current();

                    while (!stop.get()) {
                        try {
                            String grp = rnd.nextBoolean() ? GROUP1 : GROUP2;
                            int cacheIdx = rnd.nextInt(CACHES);

                            IgniteCache cache = node.cache(grp + "-" + cacheIdx);

                            for (int i = 0; i < 10; i++)
                                cacheOperation(rnd, cache);
                        }
                        catch (Exception e) {
                            if (X.hasCause(e, CacheStoppedException.class)) {
                                // Cache operation can be blocked on
                                // awaiting new topology version and cancelled with CacheStoppedException cause.

                                continue;
                            }

                            throw e;
                        }
                    }
                }
                catch (Exception e) {
                    err.set(true);

                    log.error("Unexpected error(1): " + e, e);

                    stop.set(true);
                }
            }
        }, (SRVS + CLIENTS) * 2, "op-thread");

        IgniteInternalFuture cacheFut = GridTestUtils.runAsync(new Runnable() {
            @Override public void run() {
                int cntr = 0;

                while (!stop.get()) {
                    try {
                        ThreadLocalRandom rnd = ThreadLocalRandom.current();

                        String grp;
                        int backups;

                        if (rnd.nextBoolean()) {
                            grp = GROUP1;
                            backups = grp1Backups;
                        }
                        else {
                            grp = GROUP2;
                            backups = grp2Backups;
                        }

                        Ignite node = ignite(rnd.nextInt(NODES));

                        log.info("Create cache [node=" + node.name() + ", grp=" + grp + ']');

                        IgniteCache cache = node.createCache(cacheConfiguration(grp, "tmpCache-" + cntr++,
                            PARTITIONED,
                            rnd.nextBoolean() ? ATOMIC : TRANSACTIONAL,
                            backups,
                            rnd.nextBoolean()));

                        for (int i = 0; i < 10; i++)
                            cacheOperation(rnd, cache);

                        log.info("Destroy cache [node=" + node.name() + ", grp=" + grp + ']');

                        node.destroyCache(cache.getName());
                    }
                    catch (Exception e) {
                        if (X.hasCause(e, CacheStoppedException.class)) {
                            // Cache operation can be blocked on
                            // awaiting new topology version and cancelled with CacheStoppedException cause.

                            continue;
                        }

                        err.set(true);

                        log.error("Unexpected error(2): " + e, e);

                        stop.set(true);
                    }
                }
            }
        }, "cache-destroy-thread");

        try {
            for (int i = 0; i < 30 && !stop.get(); i++)
                U.sleep(1_000);
        }
        finally {
            stop.set(true);
        }

        opFut.get();
        cacheFut.get();

        assertFalse("Unexpected error, see log for details", err.get());
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testStaticConfigurationsValidation() throws Exception {
        ccfgs = new CacheConfiguration[2];

        ccfgs[0] = new CacheConfiguration(CACHE1);
        ccfgs[0].setGroupName(GROUP1);
        ccfgs[0].setAffinity(new RendezvousAffinityFunction(false, 1024));

        ccfgs[1] = new CacheConfiguration(CACHE2);
        ccfgs[1].setGroupName(GROUP1);
        ccfgs[1].setAffinity(new RendezvousAffinityFunction(false, 512));

        try {
            startGrid(0);

            fail();
        }
        catch (IgniteCheckedException ignore) {
            // Expected exception.
        }

        ccfgs = new CacheConfiguration[3];

        ccfgs[0] = new CacheConfiguration(CACHE1);
        ccfgs[0].setGroupName(GROUP1);
        ccfgs[0].setAffinity(new RendezvousAffinityFunction(false, 16));

        ccfgs[1] = new CacheConfiguration(CACHE2);
        ccfgs[1].setGroupName(GROUP2);
        ccfgs[1].setAffinity(new RendezvousAffinityFunction(false, 512));

        ccfgs[2] = new CacheConfiguration("cache3");
        ccfgs[2].setGroupName(GROUP1);
        ccfgs[2].setAffinity(new RendezvousAffinityFunction(false, 16));

        startGrid(0);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheIdConflict() throws Exception {
        ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaAa"), new CacheConfiguration("AaAaBB")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(0);

                return null;
            }
        }, IgniteCheckedException.class, null);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaAa")};

        startGrid(0);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaBB")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(1);

                return null;
            }
        }, IgniteCheckedException.class, null);

        assertFalse(ignite(0).cacheNames().contains("AaAaBB"));

        final Ignite ignite1 = startGrid(1);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                ignite1.createCache(new CacheConfiguration("AaAaBB"));

                return null;
            }
        }, CacheException.class, null);

        assertFalse(ignite(0).cacheNames().contains("AaAaBB"));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheGroupIdConflict1() throws Exception {
        ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE1).setGroupName("AaAaAa"),
            new CacheConfiguration(CACHE2).setGroupName("AaAaBB")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(0);

                return null;
            }
        }, IgniteCheckedException.class, null);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE1).setGroupName("AaAaAa")};

        startGrid(0);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE2).setGroupName("AaAaBB")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(1);

                return null;
            }
        }, IgniteCheckedException.class, null);

        assertFalse(ignite(0).cacheNames().contains(CACHE2));

        final Ignite ignite1 = startGrid(1);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                ignite1.createCache(new CacheConfiguration(CACHE2).setGroupName("AaAaBB"));

                return null;
            }
        }, CacheException.class, null);

        assertFalse(ignite(0).cacheNames().contains(CACHE2));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheGroupIdConflict2() throws Exception {
        ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaAa"),
            new CacheConfiguration(CACHE2).setGroupName("AaAaBB")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(0);

                return null;
            }
        }, IgniteCheckedException.class, null);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaAa")};

        startGrid(0);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE2).setGroupName("AaAaBB")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(1);

                return null;
            }
        }, IgniteCheckedException.class, null);

        assertFalse(ignite(0).cacheNames().contains(CACHE2));

        final Ignite ignite1 = startGrid(1);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                ignite1.createCache(new CacheConfiguration(CACHE2).setGroupName("AaAaBB"));

                return null;
            }
        }, CacheException.class, null);

        assertFalse(ignite(0).cacheNames().contains(CACHE2));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheGroupIdConflict3() throws Exception {
        ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE2).setGroupName("AaAaBB"),
            new CacheConfiguration("AaAaAa")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(0);

                return null;
            }
        }, IgniteCheckedException.class, null);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration(CACHE2).setGroupName("AaAaBB")};

        startGrid(0);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration("AaAaAa")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(1);

                return null;
            }
        }, IgniteCheckedException.class, null);

        assertFalse(ignite(0).cacheNames().contains("AaAaAa"));

        final Ignite ignite1 = startGrid(1);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                ignite1.createCache(new CacheConfiguration("AaAaAa"));

                return null;
            }
        }, CacheException.class, null);

        assertFalse(ignite(0).cacheNames().contains("AaAaAa"));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheGroupNameConflict1() throws Exception {
        ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache1"), new CacheConfiguration("cache2").setGroupName("cache1")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(0);

                return null;
            }
        }, IgniteCheckedException.class, null);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache1")};

        startGrid(0);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache2").setGroupName("cache1")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(1);

                return null;
            }
        }, IgniteCheckedException.class, null);

        assertFalse(ignite(0).cacheNames().contains("cache2"));

        final Ignite ignite1 = startGrid(1);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                ignite1.createCache(new CacheConfiguration("cache2").setGroupName("cache1"));

                return null;
            }
        }, CacheException.class, null);

        assertFalse(ignite(0).cacheNames().contains("cache2"));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheGroupNameConflict2() throws Exception {
        ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache2").setGroupName("cache1"), new CacheConfiguration("cache1")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(0);

                return null;
            }
        }, IgniteCheckedException.class, null);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache2").setGroupName("cache1")};

        startGrid(0);

        ccfgs = new CacheConfiguration[]{new CacheConfiguration("cache1")};

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                startGrid(1);

                return null;
            }
        }, IgniteCheckedException.class, null);

        assertFalse(ignite(0).cacheNames().contains("cache1"));

        final Ignite ignite1 = startGrid(1);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                ignite1.createCache(new CacheConfiguration("cache1"));

                return null;
            }
        }, CacheException.class, null);

        assertFalse(ignite(0).cacheNames().contains("cache1"));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testConfigurationConsistencyValidation() throws Exception {
        startGrids(2);

        startClientGrid(2);

        ignite(0).createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false));

        for (int i = 0; i < 3; i++) {
            try {
                ignite(i).createCache(cacheConfiguration(GROUP1, "c2", REPLICATED, ATOMIC, Integer.MAX_VALUE, false));

                fail();
            }
            catch (CacheException e) {
                assertTrue("Unexpected message: " + e.getMessage(),
                    e.getMessage().contains("Cache mode mismatch for caches related to the same group [groupName=grp1"));
            }

            try {
                ignite(i).createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false));

                fail();
            }
            catch (CacheException e) {
                assertTrue("Unexpected message: " + e.getMessage(),
                    e.getMessage().contains("Backups mismatch for caches related to the same group [groupName=grp1"));
            }

            try {
                ignite(i).createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, TRANSACTIONAL_SNAPSHOT, 1, false));

                fail();
            }
            catch (CacheException e) {
                assertTrue("Unexpected message: " + e.getMessage(),
                    e.getMessage().contains("Atomicity mode mismatch for caches related to the same group [groupName=grp1"));
            }
        }
    }

    /**
     * @return Cache configurations.
     */
    private CacheConfiguration[] interceptorConfigurations() {
        CacheConfiguration[] ccfgs = new CacheConfiguration[6];

        ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).setInterceptor(new Interceptor1());
        ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).setInterceptor(new Interceptor2());
        ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).setInterceptor(new Interceptor1());
        ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).setInterceptor(new Interceptor2());
        ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false);
        ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false);

        //TODO IGNITE-9323: Check Mvcc mode.

        return ccfgs;
    }

    /**
     * Tests caches in the same group with different {@link CacheInterceptor}s.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testInterceptors() throws Exception {
        for (int i = 0; i < 4; i++) {
            ccfgs = interceptorConfigurations();

            startGrid(i);
        }

        Ignite node = ignite(0);

        checkInterceptorPut(node.cache("c1"), "v1");
        checkInterceptorPut(node.cache("c2"), "v2");
        checkInterceptorPut(node.cache("c3"), "v1");
        checkInterceptorPut(node.cache("c4"), "v2");

        checkCache(0, "c5", 10);
        checkCache(0, "c6", 10);
    }

    /**
     * @param cache Cache.
     * @param expVal Expected value.
     */
    private void checkInterceptorPut(IgniteCache cache, String expVal) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        for (int i = 0; i < 10; i++) {
            Integer key = rnd.nextInt();

            cache.put(key, i);

            assertEquals(expVal, cache.get(key));
        }
    }

    /**
     * @return Cache configurations.
     */
    private CacheConfiguration[] cacheStoreConfigurations() {
        CacheConfiguration[] ccfgs = new CacheConfiguration[6];

        ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).
            setCacheStoreFactory(new StoreFactory1()).setReadThrough(true).setWriteThrough(true);

        ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).
            setCacheStoreFactory(new StoreFactory2()).setReadThrough(true).setWriteThrough(true);

        ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).
            setCacheStoreFactory(new StoreFactory1()).setReadThrough(true).setWriteThrough(true);

        ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).
            setCacheStoreFactory(new StoreFactory2()).setReadThrough(true).setWriteThrough(true);

        ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false);
        ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false);

        //TODO IGNITE-8582: Check Mvcc mode.

        return ccfgs;
    }

    /**
     * Tests caches in the same group with different {@link CacheStore}s.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCacheStores() throws Exception {
        for (int i = 0; i < 4; i++) {
            ccfgs = cacheStoreConfigurations();

            startGrid(i);
        }

        Ignite node = ignite(0);

        checkStorePut(node.cache("c1"), Store1.map);
        assertTrue(Store2.map.isEmpty());

        checkStorePut(node.cache("c3"), Store1.map);
        assertTrue(Store2.map.isEmpty());

        Store1.map.clear();

        checkStorePut(node.cache("c2"), Store2.map);
        assertTrue(Store1.map.isEmpty());

        checkStorePut(node.cache("c4"), Store2.map);
        assertTrue(Store1.map.isEmpty());

        Store2.map.clear();

        checkCache(0, "c5", 10);
        checkCache(0, "c6", 10);

        assertTrue(Store1.map.isEmpty());
        assertTrue(Store2.map.isEmpty());
    }

    /**
     * @param cache Cache.
     * @param storeMap Cache store data.
     */
    private void checkStorePut(IgniteCache cache, ConcurrentHashMap<Object, Object> storeMap) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        for (int i = 0; i < 10; i++) {
            Integer key = rnd.nextInt();

            storeMap.put(key, i);

            assertEquals(i, cache.get(key));

            cache.put(key, 10_000);

            assertEquals(10_000, cache.get(key));
            assertEquals(10_000, storeMap.get(key));
        }
    }

    /**
     * @return Cache configurations.
     */
    private CacheConfiguration[] mapperConfigurations() {
        CacheConfiguration[] ccfgs = new CacheConfiguration[6];

        ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).setAffinityMapper(new Mapper1());
        ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).setAffinityMapper(new Mapper2());
        ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).setAffinityMapper(new Mapper1());
        ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).setAffinityMapper(new Mapper2());
        ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false);
        ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false);

        return ccfgs;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testAffinityMappers() throws Exception {
        for (int i = 0; i < 4; i++) {
            ccfgs = mapperConfigurations();

            startGrid(i);
        }

        for (int i = 0; i < 4; i++)
            checkAffinityMappers(ignite(i));

        startClientGrid(4);

        checkAffinityMappers(ignite(4));

        for (int i = 0; i < 5; i++) {
            checkCache(i, "c1", 10);
            checkCache(i, "c2", 10);
            checkCache(i, "c3", 10);
            checkCache(i, "c4", 10);
            checkCache(i, "c5", 10);
            checkCache(i, "c6", 10);
        }
    }

    /**
     * @param node Node.
     */
    private void checkAffinityMappers(Ignite node) {
        Affinity aff1 = node.affinity("c1");
        Affinity aff2 = node.affinity("c2");
        Affinity aff3 = node.affinity("c3");
        Affinity aff4 = node.affinity("c4");
        Affinity aff5 = node.affinity("c5");
        Affinity aff6 = node.affinity("c6");

        RendezvousAffinityFunction func = new RendezvousAffinityFunction();

        for (int i = 0; i < 100; i++) {
            MapperTestKey1 k = new MapperTestKey1(i, i + 10);

            assertEquals(i, aff1.partition(k));
            assertEquals(i, aff3.partition(k));
            assertEquals(i + 10, aff2.partition(k));
            assertEquals(i + 10, aff4.partition(k));

            int part;

            if (node.configuration().getMarshaller() instanceof BinaryMarshaller)
                part = func.partition(node.binary().toBinary(k));
            else
                part = func.partition(k);

            assertEquals(part, aff5.partition(k));
            assertEquals(part, aff6.partition(k));
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testContinuousQueriesMultipleGroups1() throws Exception {
        continuousQueriesMultipleGroups(1);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testContinuousQueriesMultipleGroups2() throws Exception {
        continuousQueriesMultipleGroups(4);
    }

    /**
     * @param srvs Number of server nodes.
     * @throws Exception If failed.
     */
    private void continuousQueriesMultipleGroups(int srvs) throws Exception {
        Ignite srv0 = startGrids(srvs);

        Ignite client = startClientGrid(srvs);

        client.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false));
        client.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, TRANSACTIONAL, 1, false));
        client.createCache(cacheConfiguration(GROUP1, "c3", PARTITIONED, ATOMIC, 1, false));

        client.createCache(cacheConfiguration(GROUP2, "c4", PARTITIONED, TRANSACTIONAL, 1, false));
        client.createCache(cacheConfiguration(GROUP2, "c5", PARTITIONED, ATOMIC, 1, false));
        client.createCache(cacheConfiguration(GROUP2, "c6", PARTITIONED, TRANSACTIONAL, 1, false));

        client.createCache(cacheConfiguration(null, "c7", PARTITIONED, ATOMIC, 1, false));
        client.createCache(cacheConfiguration(null, "c8", PARTITIONED, TRANSACTIONAL, 1, false));

        client.createCache(cacheConfiguration(GROUP3, "c9", PARTITIONED, TRANSACTIONAL_SNAPSHOT, 1, false));

        String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9"};

        AtomicInteger c1 = registerListener(client, "c1");

        for (String cache : cacheNames)
            srv0.cache(cache).put(1, 1);

        waitForEvents(c1, 1);

        for (String cache : cacheNames)
            srv0.cache(cache).put(1, 1);

        waitForEvents(c1, 1);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCacheIdSort() throws Exception {
        Ignite node = startGrid(0);

        final List<IgniteCache> caches = new ArrayList<>(3);

        caches.add(node.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false)
            .setAffinity(new RendezvousAffinityFunction(false, 8))));
        caches.add(node.createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 1, false)
            .setAffinity(new RendezvousAffinityFunction(false, 8))));
        caches.add(node.createCache(cacheConfiguration(GROUP1, "c3", PARTITIONED, ATOMIC, 1, false)
            .setAffinity(new RendezvousAffinityFunction(false, 8))));

        Affinity aff = node.affinity("c1");

        final List<Integer> keys = new ArrayList<>();

        for (int i = 0; i < 1_000_000; i++) {
            if (aff.partition(i) == 0) {
                keys.add(i);

                if (keys.size() >= 10_000)
                    break;
            }
        }

        assertEquals(10_000, keys.size());

        final long stopTime = System.currentTimeMillis() + 10_000;

        GridTestUtils.runMultiThreaded(new Callable<Void>() {
            @Override public Void call() throws Exception {
                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                while (System.currentTimeMillis() < stopTime) {
                    for (int i = 0; i < 100; i++) {
                        IgniteCache cache = caches.get(rnd.nextInt(3));

                        Integer key = keys.get(rnd.nextInt(10_000));

                        if (rnd.nextFloat() > 0.8f)
                            cache.remove(key);
                        else
                            cache.put(key, key);
                    }
                }

                return null;
            }
        }, 5, "update-thread");

        CacheGroupContext grp = cacheGroup(node, GROUP1);

        Integer cacheId = null;

        GridIterator<CacheDataRow> it = grp.offheap().partitionIterator(0);

        int c = 0;

        while (it.hasNext()) {
            CacheDataRow row = it.next();

            if (cacheId == null || cacheId != row.cacheId()) {
                cacheId = row.cacheId();

                c++;
            }
        }

        assertEquals(3, c);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testDataCleanup() throws Exception {
        Ignite node = startGrid(0);

        IgniteCache cache0 = node.createCache(cacheConfiguration(GROUP1, "c0", PARTITIONED, ATOMIC, 1, false));

        for (int i = 0; i < 100; i++)
            assertNull(cache0.get(i));

        for (int i = 0; i < 100; i++)
            cache0.put(i, i);

        List<CacheConfiguration> ccfgs = new ArrayList<>();

        ccfgs.add(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, false));
        ccfgs.add(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 1, true));
        ccfgs.add(cacheConfiguration(GROUP1, "c1", PARTITIONED, TRANSACTIONAL, 1, false));
        ccfgs.add(cacheConfiguration(GROUP1, "c1", PARTITIONED, TRANSACTIONAL, 1, true));

        for (CacheConfiguration ccfg : ccfgs) {
            IgniteCache cache = node.createCache(ccfg);

            for (int i = 0; i < 100; i++)
                assertNull(cache.get(i));

            for (int i = 0; i < 100; i++)
                cache.put(i, i);

            for (int i = 0; i < 100; i++)
                assertEquals(i, cache.get(i));

            node.destroyCache(ccfg.getName());

            cache = node.createCache(ccfg);

            for (int i = 0; i < 100; i++)
                assertNull(cache.get(i));

            node.destroyCache(ccfg.getName());
        }

        for (int i = 0; i < 100; i++)
            assertEquals(i, cache0.get(i));

        node.destroyCache(cache0.getName());

        cache0 = node.createCache(cacheConfiguration(GROUP1, "c0", PARTITIONED, ATOMIC, 1, false));

        for (int i = 0; i < 100; i++)
            assertNull(cache0.get(i));

        for (int i = 0; i < 100; i++)
            cache0.put(i, i);

        for (int i = 0; i < 100; i++)
            assertEquals(i, cache0.get(i));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRestartsAndCacheCreateDestroy() throws Exception {
        final int SRVS = 5;

        startGrids(SRVS);

        final Ignite clientNode = startClientGrid(SRVS);

        final int CACHES = SF.applyLB(10, 2);

        final AtomicReferenceArray<IgniteCache> caches = new AtomicReferenceArray<>(CACHES);

        for (int i = 0; i < CACHES; i++) {
            CacheAtomicityMode atomicityMode = i % 2 == 0 ? ATOMIC : TRANSACTIONAL;

            caches.set(i,
                clientNode.createCache(cacheConfiguration(GROUP1, "c" + i, PARTITIONED, atomicityMode, 0, false)));
        }

        final AtomicBoolean stop = new AtomicBoolean();
        final AtomicInteger cacheCntr = new AtomicInteger();

        try {
            final int ITERATIONS_COUNT = SF.applyLB(10, 1);
            for (int i = 0; i < ITERATIONS_COUNT; i++) {
                stop.set(false);

                final AtomicReference<Exception> err = new AtomicReference<>();

                log.info("Iteration: " + i);

                IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Runnable() {
                    @Override public void run() {
                        try {
                            ThreadLocalRandom rnd = ThreadLocalRandom.current();

                            while (!stop.get()) {
                                int node = rnd.nextInt(SRVS);

                                log.info("Stop node: " + node);

                                stopGrid(node);

                                U.sleep(500);

                                log.info("Start node: " + node);

                                startGrid(node);

                                try {
                                    if (rnd.nextBoolean())
                                        awaitPartitionMapExchange();
                                }
                                catch (Exception ignore) {
                                    // No-op.
                                }
                            }
                        }
                        catch (Exception e) {
                            log.error("Unexpected error: " + e, e);

                            err.set(e);

                            stop.set(true);
                        }
                    }
                });

                IgniteInternalFuture<?> cacheFut = GridTestUtils.runAsync(new Runnable() {
                    @Override public void run() {
                        try {
                            ThreadLocalRandom rnd = ThreadLocalRandom.current();

                            while (!stop.get()) {
                                int idx = rnd.nextInt(CACHES);

                                IgniteCache cache = caches.get(idx);

                                if (cache != null && caches.compareAndSet(idx, cache, null)) {
                                    log.info("Destroy cache: " + cache.getName());

                                    clientNode.destroyCache(cache.getName());

                                    CacheAtomicityMode atomicityMode = rnd.nextBoolean() ? ATOMIC : TRANSACTIONAL;

                                    String name = "newName-" + cacheCntr.incrementAndGet();

                                    cache = clientNode.createCache(
                                        cacheConfiguration(GROUP1, name, PARTITIONED, atomicityMode, 0, false));

                                    caches.set(idx, cache);
                                }
                            }
                        }
                        catch (Exception e) {
                            log.error("Unexpected error: " + e, e);

                            err.set(e);

                            stop.set(true);
                        }
                    }
                });

                IgniteInternalFuture opFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
                    @Override public void run() {
                        ThreadLocalRandom rnd = ThreadLocalRandom.current();

                        while (!stop.get()) {
                            try {
                                int idx = rnd.nextInt(CACHES);

                                IgniteCache cache = caches.get(idx);

                                if (cache != null && caches.compareAndSet(idx, cache, null)) {
                                    try {
                                        for (int i = 0; i < 10; i++)
                                            cacheOperation(rnd, cache);
                                    }
                                    catch (Exception e) {
                                        if (X.hasCause(e, CacheStoppedException.class) ||
                                            (X.hasCause(e, CacheInvalidStateException.class) &&
                                                X.hasCause(e, TransactionRollbackException.class))
                                        ) {
                                            // Cache operation can be blocked on
                                            // awaiting new topology version and cancelled with CacheStoppedException cause.
                                            // Cache operation can failed
                                            // if a node was stopped during transaction.
                                            continue;
                                        }

                                        throw e;
                                    }
                                    finally {
                                        caches.set(idx, cache);
                                    }
                                }
                            }
                            catch (Exception e) {
                                err.set(e);

                                log.error("Unexpected error: " + e, e);

                                stop.set(true);
                            }
                        }
                    }
                }, 8, "op-thread");

                Thread.sleep(SF.applyLB(10_000, 1_000));

                stop.set(true);

                restartFut.get();
                cacheFut.get();
                opFut.get();

                assertNull("Unexpected error during test, see log for details", err.get());

                awaitPartitionMapExchange();

                Set<Integer> cacheIds = new HashSet<>();

                for (int c = 0; c < CACHES; c++) {
                    IgniteCache cache = caches.get(c);

                    assertNotNull(cache);

                    assertTrue(cacheIds.add(CU.cacheId(cache.getName())));
                }

                for (int n = 0; n < SRVS; n++) {
                    CacheGroupContext grp = cacheGroup(ignite(n), GROUP1);

                    assertNotNull(grp);

                    for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
                        IntMap<Object> cachesMap = GridTestUtils.getFieldValue(part, "cacheMaps");

                        assertTrue(cachesMap.size() <= cacheIds.size());

                        cachesMap.forEach((cacheId, v) -> assertTrue(cachesMap.containsKey(cacheId)));
                    }
                }
            }
        }
        finally {
            stop.set(true);
        }
    }

    /**
     * @param cntr Counter.
     * @param expEvts Expected events number.
     * @throws Exception If failed.
     */
    private void waitForEvents(final AtomicInteger cntr, final int expEvts) throws Exception {
        GridTestUtils.waitForCondition(new GridAbsPredicate() {
            @Override public boolean apply() {
                if (cntr.get() < expEvts)
                    log.info("Wait for events [rcvd=" + cntr.get() + ", exp=" + expEvts + ']');

                return false;
            }
        }, 5000);

        assertEquals(expEvts, cntr.get());
        assertTrue(cntr.compareAndSet(expEvts, 0));
    }

    /**
     * @param node Node.
     * @param cacheName Cache name.
     * @return Received events counter.
     */
    private AtomicInteger registerListener(Ignite node, String cacheName) {
        ContinuousQuery qry = new ContinuousQuery();

        final AtomicInteger cntr = new AtomicInteger();

        qry.setLocalListener(new CacheEntryUpdatedListener() {
            @Override public void onUpdated(Iterable iterable) {
                for (Object evt : iterable)
                    cntr.incrementAndGet();
            }
        });

        node.cache(cacheName).query(qry);

        return cntr;
    }

    /**
     *
     */
    static class Mapper1 implements AffinityKeyMapper {
        /** {@inheritDoc} */
        @Override public Object affinityKey(Object key) {
            if (key instanceof MapperTestKey1)
                return ((MapperTestKey1)key).p1;
            else if (key instanceof BinaryObject)
                ((BinaryObject)key).field("p1");

            return key;
        }

        /** {@inheritDoc} */
        @Override public void reset() {
            // No-op.
        }
    }

    /**
     *
     */
    static class Mapper2 implements AffinityKeyMapper {
        /** {@inheritDoc} */
        @Override public Object affinityKey(Object key) {
            if (key instanceof MapperTestKey1)
                return ((MapperTestKey1)key).p2;
            else if (key instanceof BinaryObject)
                ((BinaryObject)key).field("p2");

            return key;
        }

        /** {@inheritDoc} */
        @Override public void reset() {
            // No-op.
        }
    }

    /**
     *
     */
    static class MapperTestKey1 {
        /** */
        final int p1;

        /** */
        final int p2;

        /**
         * @param p1 Field1.
         * @param p2 Field2.
         */
        public MapperTestKey1(int p1, int p2) {
            this.p1 = p1;
            this.p2 = p2;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            MapperTestKey1 testKey1 = (MapperTestKey1)o;

            return p1 == testKey1.p1 && p2 == testKey1.p2;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            int res = p1;
            res = 31 * res + p2;
            return res;
        }
    }

    /**
     *
     */
    static class Interceptor1 extends CacheInterceptorAdapter<Object, Object> {
        /** {@inheritDoc} */
        @Override public Object onBeforePut(Cache.Entry<Object, Object> entry, Object newVal) {
            return "v1";
        }
    }

    /**
     *
     */
    static class Interceptor2 extends CacheInterceptorAdapter<Object, Object> {
        /** {@inheritDoc} */
        @Override public Object onBeforePut(Cache.Entry<Object, Object> entry, Object newVal) {
            return "v2";
        }
    }

    /**
     *
     */
    static class StoreFactory1 implements Factory<CacheStore> {
        /** {@inheritDoc} */
        @Override public CacheStore create() {
            return new Store1();
        }
    }

    /**
     *
     */
    static class Store1 extends CacheStoreAdapter {
        /** */
        static ConcurrentHashMap<Object, Object> map = new ConcurrentHashMap<>();

        /** {@inheritDoc} */
        @Override public Object load(Object key) throws CacheLoaderException {
            return map.get(key);
        }

        /** {@inheritDoc} */
        @Override public void write(Cache.Entry entry) throws CacheWriterException {
            map.put(entry.getKey(), entry.getValue());
        }

        /** {@inheritDoc} */
        @Override public void delete(Object key) throws CacheWriterException {
            map.remove(key);
        }
    }

    /**
     *
     */
    static class StoreFactory2 implements Factory<CacheStore> {
        /** {@inheritDoc} */
        @Override public CacheStore create() {
            return new Store2();
        }
    }

    /**
     *
     */
    static class Store2 extends CacheStoreAdapter {
        /** */
        static ConcurrentHashMap<Object, Object> map = new ConcurrentHashMap<>();

        /** {@inheritDoc} */
        @Override public Object load(Object key) throws CacheLoaderException {
            return map.get(key);
        }

        /** {@inheritDoc} */
        @Override public void write(Cache.Entry entry) throws CacheWriterException {
            map.put(entry.getKey(), entry.getValue());
        }

        /** {@inheritDoc} */
        @Override public void delete(Object key) throws CacheWriterException {
            map.remove(key);
        }
    }

    /**
     * @param rnd Random.
     * @param cache Cache.
     */
    private void cacheOperation(ThreadLocalRandom rnd, IgniteCache cache) {
        final int KEYS = 10_000;

        Integer key = rnd.nextInt(KEYS);

        switch (rnd.nextInt(6)) {
            case 0:
                cache.put(key, 1);

                break;

            case 1:
                cache.get(key);

                break;

            case 2:
                cache.remove(key);

                break;

            case 3:
                cache.localPeek(key);

                break;

            case 4:
                Set<Integer> keys = new HashSet<>();

                for (int i = 0; i < 5; i++)
                    keys.add(rnd.nextInt(KEYS));

                cache.getAll(keys);

                break;

            case 5:
                Map<Integer, Integer> map = new TreeMap<>();

                for (int i = 0; i < 5; i++)
                    map.put(rnd.nextInt(KEYS), i);

                cache.putAll(map);

                break;
        }
    }

    /**
     *
     */
    static class Key1 implements Serializable {
        /** */
        private int id;

        /**
         * @param id ID.
         */
        Key1(int id) {
            this.id = id;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            Key1 key = (Key1)o;

            return id == key.id;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return id;
        }
    }

    /**
     *
     */
    static class Key2 implements Serializable {
        /** */
        private int id;

        /**
         * @param id ID.
         */
        Key2(int id) {
            this.id = id;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            Key2 key = (Key2)o;

            return id == key.id;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return id;
        }
    }

    /**
     *
     */
    static class Value1 implements Serializable {
        /** */
        private int val;

        /**
         * @param val Value.
         */
        public Value1(int val) {
            this.val = val;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            Value1 val1 = (Value1)o;

            return val == val1.val;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return val;
        }
    }

    /**
     *
     */
    static class Value2 implements Serializable {
        /** */
        private int val;

        /**
         * @param val Value.
         */
        public Value2(int val) {
            this.val = val;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            Value2 val2 = (Value2)o;

            return val == val2.val;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return val;
        }
    }

    /**
     * @param grpName Cache group name.
     * @param name Cache name.
     * @param cacheMode Cache mode.
     * @param atomicityMode Atomicity mode.
     * @param backups Backups number.
     * @param heapCache On heap cache flag.
     * @return Cache configuration.
     */
    private CacheConfiguration cacheConfiguration(
        String grpName,
        String name,
        CacheMode cacheMode,
        CacheAtomicityMode atomicityMode,
        int backups,
        boolean heapCache
    ) {
        CacheConfiguration ccfg = new CacheConfiguration();

        ccfg.setName(name);
        ccfg.setGroupName(grpName);
        ccfg.setAtomicityMode(atomicityMode);
        ccfg.setBackups(backups);
        ccfg.setCacheMode(cacheMode);
        ccfg.setWriteSynchronizationMode(FULL_SYNC);
        ccfg.setOnheapCacheEnabled(heapCache);

        return ccfg;
    }

    /**
     * @param idx Node index.
     * @param grpName Cache group name.
     * @param expGrp {@code True} if cache group should be created.
     * @throws IgniteCheckedException If failed.
     */
    private void checkCacheGroup(int idx, final String grpName, final boolean expGrp) throws IgniteCheckedException {
        final IgniteKernal node = (IgniteKernal)ignite(idx);

        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
            @Override public boolean apply() {
                return expGrp == (cacheGroup(node, grpName) != null);
            }
        }, 1000));

        assertNotNull(node.context().cache().cache(CU.UTILITY_CACHE_NAME));
    }

    /**
     * @param node Node.
     * @param grpName Cache group name.
     * @return Cache group.
     */
    private CacheGroupContext cacheGroup(Ignite node, String grpName) {
        for (CacheGroupContext grp : ((IgniteKernal)node).context().cache().cacheGroups()) {
            if (grpName.equals(grp.name()))
                return grp;
        }

        return null;
    }

    /**
     *
     */
    static class MapBasedStore<K, V> implements CacheStore<K, V>, Serializable {
        /** */
        private final Map<K, V> src;

        /**
         * @param src Source map.
         */
        MapBasedStore(Map<K, V> src) {
            this.src = src;
        }

        /** {@inheritDoc} */
        @Override public void loadCache(IgniteBiInClosure<K, V> clo, @Nullable Object... args) throws CacheLoaderException {
            for (Map.Entry<K, V> e : src.entrySet())
                clo.apply(e.getKey(), e.getValue());
        }

        /** {@inheritDoc} */
        @Override public void sessionEnd(boolean commit) throws CacheWriterException {
            // No-op
        }

        /** {@inheritDoc} */
        @Override public V load(K key) throws CacheLoaderException {
            return src.get(key);
        }

        /** {@inheritDoc} */
        @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
            return F.view(src, new ContainsPredicate<>(Sets.newHashSet(keys)));
        }

        /** {@inheritDoc} */
        @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
            throw new UnsupportedOperationException();
        }

        /** {@inheritDoc} */
        @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) {
            throw new UnsupportedOperationException();
        }

        /** {@inheritDoc} */
        @Override public void delete(Object key) throws CacheWriterException {
            throw new UnsupportedOperationException();
        }

        /** {@inheritDoc} */
        @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
            throw new UnsupportedOperationException();
        }
    }
}
