/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import javax.cache.Cache;
import javax.cache.CacheException;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.expiry.TouchedExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.ServiceResource;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CachePeekMode.ALL;
import static org.apache.ignite.cache.CachePeekMode.ONHEAP;
import static org.apache.ignite.cache.CachePeekMode.PRIMARY;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_LOCKED;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;

/**
 * Full API cache test.
 */
@SuppressWarnings("TransientFieldInNonSerializableClass")
public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstractSelfTest {
    /** Test timeout */
    private static final long TEST_TIMEOUT = 60 * 1000;

    /** Service name. */
    private static final String SERVICE_NAME1 = "testService1";

    /** */
    public static final CacheEntryProcessor<String, Integer, String> ERR_PROCESSOR =
        new CacheEntryProcessor<String, Integer, String>() {
            /** */
            private static final long serialVersionUID = 0L;

            @Override public String process(MutableEntry<String, Integer> e, Object... args) {
                throw new RuntimeException("Failed!");
            }
        };

    /** Increment processor for invoke operations. */
    public static final EntryProcessor<String, Integer, String> INCR_PROCESSOR = new IncrementEntryProcessor();

    /** Increment processor for invoke operations with IgniteEntryProcessor. */
    public static final CacheEntryProcessor<String, Integer, String> INCR_IGNITE_PROCESSOR =
        new CacheEntryProcessor<String, Integer, String>() {
            /** */
            private static final long serialVersionUID = 0L;

            @Override public String process(MutableEntry<String, Integer> e, Object... args) {
                return INCR_PROCESSOR.process(e, args);
            }
        };

    /** Increment processor for invoke operations. */
    public static final EntryProcessor<String, Integer, String> RMV_PROCESSOR = new RemoveEntryProcessor();

    /** Increment processor for invoke operations with IgniteEntryProcessor. */
    public static final CacheEntryProcessor<String, Integer, String> RMV_IGNITE_PROCESSOR =
        new CacheEntryProcessor<String, Integer, String>() {
            /** */
            private static final long serialVersionUID = 0L;

            @Override public String process(MutableEntry<String, Integer> e, Object... args) {
                return RMV_PROCESSOR.process(e, args);
            }
        };

    /** Dflt grid. */
    protected static transient Ignite dfltIgnite;

    /** */
    private static Map<String, CacheConfiguration[]> cacheCfgMap;

    /** */
    @Before
    public void beforeGridCacheAbstractFullApiSelfTest() {
        Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-9543", MvccFeatureChecker.forcedMvcc());
    }

    /** {@inheritDoc} */
    @Override protected long getTestTimeout() {
        return TEST_TIMEOUT;
    }

    /** {@inheritDoc} */
    @Override protected int gridCount() {
        return 1;
    }

    /** {@inheritDoc} */
    @Override protected boolean swapEnabled() {
        return true;
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-9543", MvccFeatureChecker.forcedMvcc());

        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);

        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);

        cfg.setIncludeEventTypes(
            EVT_CACHE_OBJECT_READ,
            EVT_CACHE_OBJECT_LOCKED,
            EVT_CACHE_OBJECT_UNLOCKED);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        initStoreStrategy();

        if (cacheStartType() == CacheStartMode.STATIC)
            super.beforeTestsStarted();
        else {
            cacheCfgMap = Collections.synchronizedMap(new HashMap<String, CacheConfiguration[]>());

            if (cacheStartType() == CacheStartMode.NODES_THEN_CACHES) {
                super.beforeTestsStarted();

                for (Map.Entry<String, CacheConfiguration[]> entry : cacheCfgMap.entrySet()) {
                    Ignite ignite = grid(entry.getKey());

                    for (CacheConfiguration cfg : entry.getValue())
                        ignite.getOrCreateCache(cfg);
                }

                awaitPartitionMapExchange();
            }
            else {
                int cnt = gridCount();

                assert cnt >= 1 : "At least one grid must be started";

                for (int i = 0; i < cnt; i++) {
                    Ignite ignite = startGrid(i);

                    CacheConfiguration[] cacheCfgs = cacheCfgMap.get(ignite.name());

                    for (CacheConfiguration cfg : cacheCfgs)
                        ignite.createCache(cfg);
                }

                if (cnt > 1)
                    checkTopology(cnt);

                awaitPartitionMapExchange();
            }

            cacheCfgMap = null;
        }

        for (int i = 0; i < gridCount(); i++)
            info("Grid " + i + ": " + grid(i).localNode().id());
    }

    /**
     * Checks that any invoke returns result.
     *
     * @throws Exception if something goes bad.
     */
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-4380")
    @Test
    public void testInvokeAllMultithreaded() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();
        final int threadCnt = 4;
        final int cnt = 5000;

        final Set<String> keys = Collections.singleton("myKey");

        GridTestUtils.runMultiThreaded(new Runnable() {
            @Override public void run() {
                for (int i = 0; i < cnt; i++) {
                    final Map<String, EntryProcessorResult<String>> res = cache.invokeAll(keys, INCR_PROCESSOR);

                    assertEquals(1, res.size());
                }
            }
        }, threadCnt, "testInvokeAllMultithreaded");

        assertEquals(cnt * threadCnt, (int)cache.get("myKey"));
    }

    /**
     * Checks that skipStore flag gets overridden inside a transaction.
     */
    @Test
    public void testWriteThroughTx() {
        String key = "writeThroughKey";

        storeStgy.removeFromStore(key);

        IgniteCache<String, Integer> cache = jcache(0);

        try (final Transaction transaction = grid(0).transactions().txStart()) {
            // retrieve market type from the grid
            Integer old = cache.withSkipStore().get(key);

            assertNull(old);

            // update the grid
            cache.put(key, 2);

            // finally commit the transaction
            transaction.commit();
        }

        assertEquals(2, storeStgy.getFromStore(key));
    }

    /**
     * Checks that skipStore flag gets overridden inside a transaction.
     */
    @Test
    public void testNoReadThroughTx() {
        String key = "writeThroughKey";

        IgniteCache<String, Integer> cache = jcache(0);

        storeStgy.resetStore();

        cache.put(key, 1);

        storeStgy.putToStore(key, 2);

        try (final Transaction transaction = grid(0).transactions().txStart()) {
            Integer old = cache.get(key);

            assertEquals((Integer)1, old);

            // update the grid
            cache.put(key, 2);

            // finally commit the transaction
            transaction.commit();
        }

        assertEquals(0, storeStgy.getReads());
    }

    /** {@inheritDoc} */
    @Override protected Ignite startGrid(String igniteInstanceName, GridSpringResourceContext ctx) throws Exception {
        if (cacheCfgMap == null)
            return super.startGrid(igniteInstanceName, ctx);

        IgniteConfiguration cfg = getConfiguration(igniteInstanceName);

        cacheCfgMap.put(igniteInstanceName, cfg.getCacheConfiguration());

        cfg.setCacheConfiguration();

        if (!isRemoteJvm(igniteInstanceName))
            return IgnitionEx.start(optimize(cfg), ctx);
        else
            return startRemoteGrid(igniteInstanceName, optimize(cfg), ctx);
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        assertEquals(0, cache.localSize());
        assertEquals(0, cache.size());

        super.beforeTest();

        assertEquals(0, cache.localSize());
        assertEquals(0, cache.size());

        dfltIgnite = grid(0);
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        super.afterTest();

        IgniteCache<String, Integer> cache = jcache();

        assertEquals(0, cache.localSize());
        assertEquals(0, cache.size());
        assertEquals(0, cache.size(ONHEAP));

        dfltIgnite = null;
    }

    /**
     * @return A not near-only cache.
     */
    protected IgniteCache<String, Integer> fullCache() {
        return jcache();
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testSize() throws Exception {
        assert jcache().localSize() == 0;

        int size = 10;

        final Map<String, Integer> map = new HashMap<>();

        for (int i = 0; i < size; i++)
            map.put("key" + i, i);

        // Put in primary nodes to avoid near readers which will prevent entry from being cleared.
        Map<ClusterNode, Collection<String>> mapped = grid(0).<String>affinity(DEFAULT_CACHE_NAME).mapKeysToNodes(map.keySet());

        for (int i = 0; i < gridCount(); i++) {
            Collection<String> keys = mapped.get(grid(i).localNode());

            if (!F.isEmpty(keys)) {
                for (String key : keys)
                    jcache(i).put(key, map.get(key));
            }
        }

        map.remove("key0");

        mapped = grid(0).<String>affinity(DEFAULT_CACHE_NAME).mapKeysToNodes(map.keySet());

        for (int i = 0; i < gridCount(); i++) {
            // Will actually delete entry from map.
            CU.invalidate(jcache(i), "key0");

            assertNull("Failed check for grid: " + i, jcache(i).localPeek("key0", ONHEAP));

            Collection<String> keysCol = mapped.get(grid(i).localNode());

            assert jcache(i).localSize() != 0 || F.isEmpty(keysCol);
        }

        for (int i = 0; i < gridCount(); i++)
            executeOnLocalOrRemoteJvm(i, new CheckCacheSizeTask(map));

        for (int i = 0; i < gridCount(); i++) {
            Collection<String> keysCol = mapped.get(grid(i).localNode());

            assertEquals("Failed check for grid: " + i, !F.isEmpty(keysCol) ? keysCol.size() : 0,
                jcache(i).localSize(PRIMARY));
        }

        int globalPrimarySize = map.size();

        for (int i = 0; i < gridCount(); i++)
            assertEquals(globalPrimarySize, jcache(i).size(PRIMARY));

        // Check how many instances of any given key there is in the cluster.
        int globalSize = 0;

        for (String key : map.keySet())
            globalSize += affinity(jcache()).mapKeyToPrimaryAndBackups(key).size();

        for (int i = 0; i < gridCount(); i++)
            assertEquals(globalSize, jcache(i).size(ALL));
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testContainsKey() throws Exception {
        jcache().put("testContainsKey", 1);

        checkContainsKey(true, "testContainsKey");
        checkContainsKey(false, "testContainsKeyWrongKey");
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testContainsKeyTx() throws Exception {
        if (!txEnabled())
            return;

        IgniteCache<String, Integer> cache = jcache();

        IgniteTransactions txs = ignite(0).transactions();

        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(i);

            try (Transaction tx = txs.txStart()) {
                assertNull(key, cache.get(key));

                assertFalse(cache.containsKey(key));

                tx.commit();
            }

            try (Transaction tx = txs.txStart()) {
                assertNull(key, cache.get(key));

                cache.put(key, i);

                assertTrue(cache.containsKey(key));

                tx.commit();
            }
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testContainsKeysTx() throws Exception {
        if (!txEnabled())
            return;

        IgniteCache<String, Integer> cache = jcache();

        IgniteTransactions txs = ignite(0).transactions();

        Set<String> keys = new HashSet<>();

        for (int i = 0; i < 10; i++) {
            String key = String.valueOf(i);

            keys.add(key);
        }

        try (Transaction tx = txs.txStart()) {
            for (String key : keys)
                assertNull(key, cache.get(key));

            assertFalse(cache.containsKeys(keys));

            tx.commit();
        }

        try (Transaction tx = txs.txStart()) {
            for (String key : keys)
                assertNull(key, cache.get(key));

            for (String key : keys)
                cache.put(key, 0);

            assertTrue(cache.containsKeys(keys));

            tx.commit();
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRemoveInExplicitLocks() throws Exception {
        if (lockingEnabled()) {
            IgniteCache<String, Integer> cache = jcache();

            cache.put("a", 1);

            Lock lock = cache.lockAll(ImmutableSet.of("a", "b", "c", "d"));

            lock.lock();

            try {
                cache.remove("a");

                // Make sure single-key operation did not remove lock.
                cache.putAll(F.asMap("b", 2, "c", 3, "d", 4));
            }
            finally {
                lock.unlock();
            }
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRemoveAllSkipStore() throws Exception {
        IgniteCache<String, Integer> jcache = jcache();

        jcache.putAll(F.asMap("1", 1, "2", 2, "3", 3));

        jcache.withSkipStore().removeAll();

        assertEquals((Integer)1, jcache.get("1"));
        assertEquals((Integer)2, jcache.get("2"));
        assertEquals((Integer)3, jcache.get("3"));
    }

    /**
     * @throws IgniteCheckedException If failed.
     */
    @Test
    public void testAtomicOps() throws IgniteCheckedException {
        IgniteCache<String, Integer> c = jcache();

        final int cnt = 10;

        for (int i = 0; i < cnt; i++)
            assertNull(c.getAndPutIfAbsent("k" + i, i));

        for (int i = 0; i < cnt; i++) {
            boolean wrong = i % 2 == 0;

            String key = "k" + i;

            boolean res = c.replace(key, wrong ? i + 1 : i, -1);

            assertEquals(wrong, !res);
        }

        for (int i = 0; i < cnt; i++) {
            boolean success = i % 2 != 0;

            String key = "k" + i;

            boolean res = c.remove(key, -1);

            assertTrue(success == res);
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGet() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);

        assert cache.get("key1") == 1;
        assert cache.get("key2") == 2;
        assert cache.get("wrongKey") == null;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetEntry() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);

        CacheEntry<String, Integer> key1e = cache.getEntry("key1");
        CacheEntry<String, Integer> key2e = cache.getEntry("key2");
        CacheEntry<String, Integer> wrongKeye = cache.getEntry("wrongKey");

        assert key1e.getValue() == 1;
        assert key1e.getKey().equals("key1");
        assert key1e.version() != null;

        assert key2e.getValue() == 2;
        assert key2e.getKey().equals("key2");
        assert key2e.version() != null;

        assert wrongKeye == null;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAsync() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);

        IgniteFuture<Integer> fut1 = cache.getAsync("key1");

        IgniteFuture<Integer> fut2 = cache.getAsync("key2");

        IgniteFuture<Integer> fut3 = cache.getAsync("wrongKey");

        assert fut1.get() == 1;
        assert fut2.get() == 2;
        assert fut3.get() == null;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAsyncOld() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);

        IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        cacheAsync.get("key1");

        IgniteFuture<Integer> fut1 = cacheAsync.future();

        cacheAsync.get("key2");

        IgniteFuture<Integer> fut2 = cacheAsync.future();

        cacheAsync.get("wrongKey");

        IgniteFuture<Integer> fut3 = cacheAsync.future();

        assert fut1.get() == 1;
        assert fut2.get() == 2;
        assert fut3.get() == null;
    }


    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAll() throws Exception {
        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        final IgniteCache<String, Integer> cache = jcache();

        try {
            cache.put("key1", 1);
            cache.put("key2", 2);

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.getAll(null).isEmpty();

                return null;
            }
        }, NullPointerException.class, null);

        assert cache.getAll(Collections.<String>emptySet()).isEmpty();

        Map<String, Integer> map1 = cache.getAll(ImmutableSet.of("key1", "key2", "key9999"));

        info("Retrieved map1: " + map1);

        assert 2 == map1.size() : "Invalid map: " + map1;

        assertEquals(1, (int)map1.get("key1"));
        assertEquals(2, (int)map1.get("key2"));
        assertNull(map1.get("key9999"));

        Map<String, Integer> map2 = cache.getAll(ImmutableSet.of("key1", "key2", "key9999"));

        info("Retrieved map2: " + map2);

        assert 2 == map2.size() : "Invalid map: " + map2;

        assertEquals(1, (int)map2.get("key1"));
        assertEquals(2, (int)map2.get("key2"));
        assertNull(map2.get("key9999"));

        // Now do the same checks but within transaction.
        if (txShouldBeUsed()) {
            try (Transaction tx0 = transactions().txStart()) {
                assert cache.getAll(Collections.<String>emptySet()).isEmpty();

                map1 = cache.getAll(ImmutableSet.of("key1", "key2", "key9999"));

                info("Retrieved map1: " + map1);

                assert 2 == map1.size() : "Invalid map: " + map1;

                assertEquals(1, (int)map1.get("key1"));
                assertEquals(2, (int)map1.get("key2"));
                assertNull(map1.get("key9999"));

                map2 = cache.getAll(ImmutableSet.of("key1", "key2", "key9999"));

                info("Retrieved map2: " + map2);

                assert 2 == map2.size() : "Invalid map: " + map2;

                assertEquals(1, (int)map2.get("key1"));
                assertEquals(2, (int)map2.get("key2"));
                assertNull(map2.get("key9999"));

                tx0.commit();
            }
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetEntries() throws Exception {
        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        final IgniteCache<String, Integer> cache = jcache();

        try {
            cache.put("key1", 1);
            cache.put("key2", 2);

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.getEntries(null).isEmpty();

                return null;
            }
        }, NullPointerException.class, null);

        assert cache.getEntries(Collections.<String>emptySet()).isEmpty();

        Collection<CacheEntry<String, Integer>> c1 = cache.getEntries(ImmutableSet.of("key1", "key2", "key9999"));

        info("Retrieved c1: " + c1);

        assert 2 == c1.size() : "Invalid collection: " + c1;

        boolean b1 = false;
        boolean b2 = false;

        for (CacheEntry<String, Integer> e : c1) {
            if (e.getKey().equals("key1") && e.getValue().equals(1))
                b1 = true;

            if (e.getKey().equals("key2") && e.getValue().equals(2))
                b2 = true;
        }

        assertTrue(b1 && b2);

        Collection<CacheEntry<String, Integer>> c2 = cache.getEntries(ImmutableSet.of("key1", "key2", "key9999"));

        info("Retrieved c2: " + c2);

        assert 2 == c2.size() : "Invalid collection: " + c2;

        b1 = false;
        b2 = false;

        for (CacheEntry<String, Integer> e : c2) {
            if (e.getKey().equals("key1") && e.getValue().equals(1))
                b1 = true;

            if (e.getKey().equals("key2") && e.getValue().equals(2))
                b2 = true;
        }

        assertTrue(b1 && b2);

        // Now do the same checks but within transaction.
        if (txShouldBeUsed()) {
            try (Transaction tx0 = transactions().txStart()) {
                assert cache.getEntries(Collections.<String>emptySet()).isEmpty();

                c1 = cache.getEntries(ImmutableSet.of("key1", "key2", "key9999"));

                info("Retrieved c1: " + c1);

                assert 2 == c1.size() : "Invalid collection: " + c1;

                b1 = false;
                b2 = false;

                for (CacheEntry<String, Integer> e : c1) {
                    if (e.getKey().equals("key1") && e.getValue().equals(1))
                        b1 = true;

                    if (e.getKey().equals("key2") && e.getValue().equals(2))
                        b2 = true;
                }

                assertTrue(b1 && b2);

                c2 = cache.getEntries(ImmutableSet.of("key1", "key2", "key9999"));

                info("Retrieved c2: " + c2);

                assert 2 == c2.size() : "Invalid collection: " + c2;

                b1 = false;
                b2 = false;

                for (CacheEntry<String, Integer> e : c2) {
                    if (e.getKey().equals("key1") && e.getValue().equals(1))
                        b1 = true;

                    if (e.getKey().equals("key2") && e.getValue().equals(2))
                        b2 = true;
                }

                assertTrue(b1 && b2);

                tx0.commit();
            }
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAllWithLastNull() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        final Set<String> c = new LinkedHashSet<>();

        c.add("key1");
        c.add(null);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.getAll(c);

                return null;
            }
        }, NullPointerException.class, null);
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAllWithFirstNull() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        final Set<String> c = new LinkedHashSet<>();

        c.add(null);
        c.add("key1");

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.getAll(c);

                return null;
            }
        }, NullPointerException.class, null);
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAllWithInTheMiddle() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        final Set<String> c = new LinkedHashSet<>();

        c.add("key1");
        c.add(null);
        c.add("key2");

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.getAll(c);

                return null;
            }
        }, NullPointerException.class, null);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGetTxNonExistingKey() throws Exception {
        if (txShouldBeUsed()) {
            try (Transaction ignored = transactions().txStart()) {
                assert jcache().get("key999123") == null;
            }
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAllAsync() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.getAllAsync(null);

                return null;
            }
        }, NullPointerException.class, null);

        IgniteFuture<Map<String, Integer>> fut2 = cache.getAllAsync(Collections.<String>emptySet());

        IgniteFuture<Map<String, Integer>> fut3 = cache.getAllAsync(ImmutableSet.of("key1", "key2"));

        assert fut2.get().isEmpty();
        assert fut3.get().size() == 2 : "Invalid map: " + fut3.get();
        assert fut3.get().get("key1") == 1;
        assert fut3.get().get("key2") == 2;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAllAsyncOld() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        final IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        cache.put("key1", 1);
        cache.put("key2", 2);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cacheAsync.getAll(null);

                return null;
            }
        }, NullPointerException.class, null);

        cacheAsync.getAll(Collections.<String>emptySet());
        IgniteFuture<Map<String, Integer>> fut2 = cacheAsync.future();

        cacheAsync.getAll(ImmutableSet.of("key1", "key2"));
        IgniteFuture<Map<String, Integer>> fut3 = cacheAsync.future();

        assert fut2.get().isEmpty();
        assert fut3.get().size() == 2 : "Invalid map: " + fut3.get();
        assert fut3.get().get("key1") == 1;
        assert fut3.get().get("key2") == 2;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPut() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        assert cache.getAndPut("key1", 1) == null;
        assert cache.getAndPut("key2", 2) == null;

        // Check inside transaction.
        assert cache.get("key1") == 1;
        assert cache.get("key2") == 2;

        // Put again to check returned values.
        assert cache.getAndPut("key1", 1) == 1;
        assert cache.getAndPut("key2", 2) == 2;

        checkContainsKey(true, "key1");
        checkContainsKey(true, "key2");

        assert cache.get("key1") != null;
        assert cache.get("key2") != null;
        assert cache.get("wrong") == null;

        // Check outside transaction.
        checkContainsKey(true, "key1");
        checkContainsKey(true, "key2");

        assert cache.get("key1") == 1;
        assert cache.get("key2") == 2;
        assert cache.get("wrong") == null;

        assertEquals((Integer)1, cache.getAndPut("key1", 10));
        assertEquals((Integer)2, cache.getAndPut("key2", 11));
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutTx() throws Exception {
        if (txShouldBeUsed()) {
            IgniteCache<String, Integer> cache = jcache();

            try (Transaction tx = transactions().txStart()) {
                assert cache.getAndPut("key1", 1) == null;
                assert cache.getAndPut("key2", 2) == null;

                // Check inside transaction.
                assert cache.get("key1") == 1;
                assert cache.get("key2") == 2;

                // Put again to check returned values.
                assert cache.getAndPut("key1", 1) == 1;
                assert cache.getAndPut("key2", 2) == 2;

                assert cache.get("key1") != null;
                assert cache.get("key2") != null;
                assert cache.get("wrong") == null;

                tx.commit();
            }

            // Check outside transaction.
            checkContainsKey(true, "key1");
            checkContainsKey(true, "key2");

            assert cache.get("key1") == 1;
            assert cache.get("key2") == 2;
            assert cache.get("wrong") == null;

            assertEquals((Integer)1, cache.getAndPut("key1", 10));
            assertEquals((Integer)2, cache.getAndPut("key2", 11));
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformOptimisticReadCommitted() throws Exception {
        checkTransform(OPTIMISTIC, READ_COMMITTED);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformOptimisticRepeatableRead() throws Exception {
        checkTransform(OPTIMISTIC, REPEATABLE_READ);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformPessimisticReadCommitted() throws Exception {
        checkTransform(PESSIMISTIC, READ_COMMITTED);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformPessimisticRepeatableRead() throws Exception {
        checkTransform(PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testIgniteTransformOptimisticReadCommitted() throws Exception {
        checkIgniteTransform(OPTIMISTIC, READ_COMMITTED);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testIgniteTransformOptimisticRepeatableRead() throws Exception {
        checkIgniteTransform(OPTIMISTIC, REPEATABLE_READ);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testIgniteTransformPessimisticReadCommitted() throws Exception {
        checkIgniteTransform(PESSIMISTIC, READ_COMMITTED);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testIgniteTransformPessimisticRepeatableRead() throws Exception {
        checkIgniteTransform(PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * @param concurrency Concurrency.
     * @param isolation Isolation.
     * @throws Exception If failed.
     */
    private void checkIgniteTransform(TransactionConcurrency concurrency, TransactionIsolation isolation)
        throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key2", 1);
        cache.put("key3", 3);

        Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, isolation) : null;

        try {
            assertEquals("null", cache.invoke("key1", INCR_IGNITE_PROCESSOR));
            assertEquals("1", cache.invoke("key2", INCR_IGNITE_PROCESSOR));
            assertEquals("3", cache.invoke("key3", RMV_IGNITE_PROCESSOR));

            if (tx != null)
                tx.commit();
        }
        catch (Exception e) {
            e.printStackTrace();

            throw e;
        }
        finally {
            if (tx != null)
                tx.close();
        }

        assertEquals((Integer)1, cache.get("key1"));
        assertEquals((Integer)2, cache.get("key2"));
        assertNull(cache.get("key3"));

        for (int i = 0; i < gridCount(); i++)
            assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", ONHEAP));

        cache.remove("key1");
        cache.put("key2", 1);
        cache.put("key3", 3);

        assertEquals("null", cache.invoke("key1", INCR_IGNITE_PROCESSOR));
        assertEquals("1", cache.invoke("key2", INCR_IGNITE_PROCESSOR));
        assertEquals("3", cache.invoke("key3", RMV_IGNITE_PROCESSOR));

        assertEquals((Integer)1, cache.get("key1"));
        assertEquals((Integer)2, cache.get("key2"));
        assertNull(cache.get("key3"));

        for (int i = 0; i < gridCount(); i++)
            assertNull(jcache(i).localPeek("key3", ONHEAP));
    }

    /**
     * @param concurrency Concurrency.
     * @param isolation Isolation.
     * @throws Exception If failed.
     */
    private void checkTransform(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key2", 1);
        cache.put("key3", 3);

        Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, isolation) : null;

        try {
            assertEquals("null", cache.invoke("key1", INCR_PROCESSOR));
            assertEquals("1", cache.invoke("key2", INCR_PROCESSOR));
            assertEquals("3", cache.invoke("key3", RMV_PROCESSOR));

            if (tx != null)
                tx.commit();
        }
        catch (Exception e) {
            e.printStackTrace();

            throw e;
        }
        finally {
            if (tx != null)
                tx.close();
        }

        assertEquals((Integer)1, cache.get("key1"));
        assertEquals((Integer)2, cache.get("key2"));
        assertNull(cache.get("key3"));

        for (int i = 0; i < gridCount(); i++)
            assertNull("Failed for cache: " + i, jcache(i).localPeek("key3", ONHEAP));

        cache.remove("key1");
        cache.put("key2", 1);
        cache.put("key3", 3);

        assertEquals("null", cache.invoke("key1", INCR_PROCESSOR));
        assertEquals("1", cache.invoke("key2", INCR_PROCESSOR));
        assertEquals("3", cache.invoke("key3", RMV_PROCESSOR));

        assertEquals((Integer)1, cache.get("key1"));
        assertEquals((Integer)2, cache.get("key2"));
        assertNull(cache.get("key3"));

        for (int i = 0; i < gridCount(); i++)
            assertNull(jcache(i).localPeek("key3", ONHEAP));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformAllOptimisticReadCommitted() throws Exception {
        checkTransformAll(OPTIMISTIC, READ_COMMITTED);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformAllOptimisticRepeatableRead() throws Exception {
        checkTransformAll(OPTIMISTIC, REPEATABLE_READ);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformAllPessimisticReadCommitted() throws Exception {
        checkTransformAll(PESSIMISTIC, READ_COMMITTED);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformAllPessimisticRepeatableRead() throws Exception {
        checkTransformAll(PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * @param concurrency Transaction concurrency.
     * @param isolation Transaction isolation.
     * @throws Exception If failed.
     */
    private void checkTransformAll(TransactionConcurrency concurrency, TransactionIsolation isolation)
        throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        cache.put("key2", 1);
        cache.put("key3", 3);

        if (txShouldBeUsed()) {
            Map<String, EntryProcessorResult<String>> res;

            try (Transaction tx = ignite(0).transactions().txStart(concurrency, isolation)) {
                res = cache.invokeAll(F.asSet("key1", "key2", "key3"), INCR_PROCESSOR);

                tx.commit();
            }

            assertEquals((Integer)1, cache.get("key1"));
            assertEquals((Integer)2, cache.get("key2"));
            assertEquals((Integer)4, cache.get("key3"));

            assertEquals("null", res.get("key1").get());
            assertEquals("1", res.get("key2").get());
            assertEquals("3", res.get("key3").get());

            assertEquals(3, res.size());

            cache.remove("key1");
            cache.put("key2", 1);
            cache.put("key3", 3);
        }

        Map<String, EntryProcessorResult<String>> res = cache.invokeAll(F.asSet("key1", "key2", "key3"), RMV_PROCESSOR);

        for (int i = 0; i < gridCount(); i++) {
            assertNull(jcache(i).localPeek("key1", ONHEAP));
            assertNull(jcache(i).localPeek("key2", ONHEAP));
            assertNull(jcache(i).localPeek("key3", ONHEAP));
        }

        assertEquals("null", res.get("key1").get());
        assertEquals("1", res.get("key2").get());
        assertEquals("3", res.get("key3").get());

        assertEquals(3, res.size());

        cache.remove("key1");
        cache.put("key2", 1);
        cache.put("key3", 3);

        res = cache.invokeAll(F.asSet("key1", "key2", "key3"), INCR_PROCESSOR);

        assertEquals((Integer)1, cache.get("key1"));
        assertEquals((Integer)2, cache.get("key2"));
        assertEquals((Integer)4, cache.get("key3"));

        assertEquals("null", res.get("key1").get());
        assertEquals("1", res.get("key2").get());
        assertEquals("3", res.get("key3").get());

        assertEquals(3, res.size());

        cache.remove("key1");
        cache.put("key2", 1);
        cache.put("key3", 3);

        res = cache.invokeAll(F.asMap("key1", INCR_PROCESSOR, "key2", INCR_PROCESSOR, "key3", INCR_PROCESSOR));

        assertEquals((Integer)1, cache.get("key1"));
        assertEquals((Integer)2, cache.get("key2"));
        assertEquals((Integer)4, cache.get("key3"));

        assertEquals("null", res.get("key1").get());
        assertEquals("1", res.get("key2").get());
        assertEquals("3", res.get("key3").get());

        assertEquals(3, res.size());
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformAllWithNulls() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.invokeAll((Set<String>)null, INCR_PROCESSOR);

                return null;
            }
        }, NullPointerException.class, null);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.invokeAll(F.asSet("key1"), null);

                return null;
            }
        }, NullPointerException.class, null);

        {
            final Set<String> keys = new LinkedHashSet<>(2);

            keys.add("key1");
            keys.add(null);

            GridTestUtils.assertThrows(log, new Callable<Void>() {
                @Override public Void call() throws Exception {
                    cache.invokeAll(keys, INCR_PROCESSOR);

                    return null;
                }
            }, NullPointerException.class, null);

            GridTestUtils.assertThrows(log, new Callable<Void>() {
                @Override public Void call() throws Exception {
                    cache.invokeAll(F.asSet("key1"), null);

                    return null;
                }
            }, NullPointerException.class, null);
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformSequentialOptimisticNoStart() throws Exception {
        checkTransformSequential0(false, OPTIMISTIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformSequentialPessimisticNoStart() throws Exception {
        checkTransformSequential0(false, PESSIMISTIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformSequentialOptimisticWithStart() throws Exception {
        checkTransformSequential0(true, OPTIMISTIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformSequentialPessimisticWithStart() throws Exception {
        checkTransformSequential0(true, PESSIMISTIC);
    }

    /**
     * @param startVal Whether to put value.
     * @param concurrency Concurrency.
     * @throws Exception If failed.
     */
    private void checkTransformSequential0(boolean startVal, TransactionConcurrency concurrency)
        throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        final String key = primaryKeysForCache(cache, 1).get(0);

        Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null;

        try {
            if (startVal)
                cache.put(key, 2);
            else
                assertEquals(null, cache.get(key));

            Integer expRes = startVal ? 2 : null;

            assertEquals(String.valueOf(expRes), cache.invoke(key, INCR_PROCESSOR));

            expRes = startVal ? 3 : 1;

            assertEquals(String.valueOf(expRes), cache.invoke(key, INCR_PROCESSOR));

            expRes++;

            assertEquals(String.valueOf(expRes), cache.invoke(key, INCR_PROCESSOR));

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        Integer exp = (startVal ? 2 : 0) + 3;

        assertEquals(exp, cache.get(key));

        for (int i = 0; i < gridCount(); i++) {
            if (ignite(i).affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(grid(i).localNode(), key))
                assertEquals(exp, peek(jcache(i), key));
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformAfterRemoveOptimistic() throws Exception {
        checkTransformAfterRemove(OPTIMISTIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformAfterRemovePessimistic() throws Exception {
        checkTransformAfterRemove(PESSIMISTIC);
    }

    /**
     * @param concurrency Concurrency.
     * @throws Exception If failed.
     */
    private void checkTransformAfterRemove(TransactionConcurrency concurrency) throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key", 4);

        Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, READ_COMMITTED) : null;

        try {
            cache.remove("key");

            cache.invoke("key", INCR_PROCESSOR);
            cache.invoke("key", INCR_PROCESSOR);
            cache.invoke("key", INCR_PROCESSOR);

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        assertEquals((Integer)3, cache.get("key"));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformReturnValueGetOptimisticReadCommitted() throws Exception {
        checkTransformReturnValue(false, OPTIMISTIC, READ_COMMITTED);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformReturnValueGetOptimisticRepeatableRead() throws Exception {
        checkTransformReturnValue(false, OPTIMISTIC, REPEATABLE_READ);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformReturnValueGetPessimisticReadCommitted() throws Exception {
        checkTransformReturnValue(false, PESSIMISTIC, READ_COMMITTED);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformReturnValueGetPessimisticRepeatableRead() throws Exception {
        checkTransformReturnValue(false, PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformReturnValuePutInTx() throws Exception {
        checkTransformReturnValue(true, OPTIMISTIC, READ_COMMITTED);
    }

    /**
     * @param put Whether to put value.
     * @param concurrency Concurrency.
     * @param isolation Isolation.
     * @throws Exception If failed.
     */
    private void checkTransformReturnValue(boolean put,
        TransactionConcurrency concurrency,
        TransactionIsolation isolation)
        throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        if (!put)
            cache.put("key", 1);

        Transaction tx = txShouldBeUsed() ? ignite(0).transactions().txStart(concurrency, isolation) : null;

        try {
            if (put)
                cache.put("key", 1);

            cache.invoke("key", INCR_PROCESSOR);

            assertEquals((Integer)2, cache.get("key"));

            if (tx != null) {
                // Second get inside tx. Make sure read value is not transformed twice.
                assertEquals((Integer)2, cache.get("key"));

                tx.commit();
            }
        }
        finally {
            if (tx != null)
                tx.close();
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAndPutAsyncOld() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        cache.put("key1", 1);
        cache.put("key2", 2);

        cacheAsync.getAndPut("key1", 10);

        IgniteFuture<Integer> fut1 = cacheAsync.future();

        cacheAsync.getAndPut("key2", 11);

        IgniteFuture<Integer> fut2 = cacheAsync.future();

        assertEquals((Integer)1, fut1.get(5000));
        assertEquals((Integer)2, fut2.get(5000));

        assertEquals((Integer)10, cache.get("key1"));
        assertEquals((Integer)11, cache.get("key2"));
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAndPutAsync() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);

        IgniteFuture<Integer> fut1 = cache.getAndPutAsync("key1", 10);

        IgniteFuture<Integer> fut2 = cache.getAndPutAsync("key2", 11);

        assertEquals((Integer)1, fut1.get(5000));
        assertEquals((Integer)2, fut2.get(5000));

        assertEquals((Integer)10, cache.get("key1"));
        assertEquals((Integer)11, cache.get("key2"));
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutAsyncOld0() throws Exception {
        IgniteCache<String, Integer> cacheAsync = jcache().withAsync();

        cacheAsync.getAndPut("key1", 0);

        IgniteFuture<Integer> fut1 = cacheAsync.future();

        cacheAsync.getAndPut("key2", 1);

        IgniteFuture<Integer> fut2 = cacheAsync.future();

        assert fut1.get(5000) == null;
        assert fut2.get(5000) == null;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutAsync0() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        IgniteFuture<Integer> fut1 = cache.getAndPutAsync("key1", 0);

        IgniteFuture<Integer> fut2 = cache.getAndPutAsync("key2", 1);

        assert fut1.get(5000) == null;
        assert fut2.get(5000) == null;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testInvokeAsyncOld() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key2", 1);
        cache.put("key3", 3);

        IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        assertNull(cacheAsync.invoke("key1", INCR_PROCESSOR));

        IgniteFuture<?> fut0 = cacheAsync.future();

        assertNull(cacheAsync.invoke("key2", INCR_PROCESSOR));

        IgniteFuture<?> fut1 = cacheAsync.future();

        assertNull(cacheAsync.invoke("key3", RMV_PROCESSOR));

        IgniteFuture<?> fut2 = cacheAsync.future();

        fut0.get();
        fut1.get();
        fut2.get();

        assertEquals((Integer)1, cache.get("key1"));
        assertEquals((Integer)2, cache.get("key2"));
        assertNull(cache.get("key3"));

        for (int i = 0; i < gridCount(); i++)
            assertNull(jcache(i).localPeek("key3", ONHEAP));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testInvokeAsync() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key2", 1);
        cache.put("key3", 3);

        IgniteFuture<?> fut0 = cache.invokeAsync("key1", INCR_PROCESSOR);

        IgniteFuture<?> fut1 = cache.invokeAsync("key2", INCR_PROCESSOR);

        IgniteFuture<?> fut2 = cache.invokeAsync("key3", RMV_PROCESSOR);

        fut0.get();
        fut1.get();
        fut2.get();

        assertEquals((Integer)1, cache.get("key1"));
        assertEquals((Integer)2, cache.get("key2"));
        assertNull(cache.get("key3"));

        for (int i = 0; i < gridCount(); i++)
            assertNull(jcache(i).localPeek("key3", ONHEAP));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testInvoke() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        assertEquals("null", cache.invoke("k0", INCR_PROCESSOR));

        assertEquals((Integer)1, cache.get("k0"));

        assertEquals("1", cache.invoke("k0", INCR_PROCESSOR));

        assertEquals((Integer)2, cache.get("k0"));

        cache.put("k1", 1);

        assertEquals("1", cache.invoke("k1", INCR_PROCESSOR));

        assertEquals((Integer)2, cache.get("k1"));

        assertEquals("2", cache.invoke("k1", INCR_PROCESSOR));

        assertEquals((Integer)3, cache.get("k1"));

        EntryProcessor<String, Integer, Integer> c = new RemoveAndReturnNullEntryProcessor();

        assertNull(cache.invoke("k1", c));
        assertNull(cache.get("k1"));

        for (int i = 0; i < gridCount(); i++)
            assertNull(jcache(i).localPeek("k1", ONHEAP));

        final EntryProcessor<String, Integer, Integer> errProcessor = new FailedEntryProcessor();

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.invoke("k1", errProcessor);

                return null;
            }
        }, EntryProcessorException.class, "Test entry processor exception.");
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutx() throws Exception {
        if (txShouldBeUsed())
            checkPut(true);
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutxNoTx() throws Exception {
        checkPut(false);
    }

    /**
     * @param inTx Whether to start transaction.
     * @throws Exception If failed.
     */
    private void checkPut(boolean inTx) throws Exception {
        Transaction tx = inTx ? transactions().txStart() : null;

        IgniteCache<String, Integer> cache = jcache();

        try {
            cache.put("key1", 1);
            cache.put("key2", 2);

            // Check inside transaction.
            assert cache.get("key1") == 1;
            assert cache.get("key2") == 2;

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        checkSize(F.asSet("key1", "key2"));

        // Check outside transaction.
        checkContainsKey(true, "key1");
        checkContainsKey(true, "key2");
        checkContainsKey(false, "wrong");

        assert cache.get("key1") == 1;
        assert cache.get("key2") == 2;
        assert cache.get("wrong") == null;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPutAsyncOld() throws Exception {
        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        IgniteCache<String, Integer> cacheAsync = jcache().withAsync();

        try {
            jcache().put("key2", 1);

            cacheAsync.put("key1", 10);

            IgniteFuture<?> fut1 = cacheAsync.future();

            cacheAsync.put("key2", 11);

            IgniteFuture<?> fut2 = cacheAsync.future();

            IgniteFuture<Transaction> f = null;

            if (tx != null) {
                tx = (Transaction)tx.withAsync();

                tx.commit();

                f = tx.future();
            }

            assertNull(fut1.get());
            assertNull(fut2.get());

            assert f == null || f.get().state() == COMMITTED;
        }
        finally {
            if (tx != null)
                tx.close();
        }

        checkSize(F.asSet("key1", "key2"));

        assert jcache().get("key1") == 10;
        assert jcache().get("key2") == 11;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPutAsync() throws Exception {
        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        try {
            jcache().put("key2", 1);

            IgniteFuture<?> fut1 = jcache().putAsync("key1", 10);

            IgniteFuture<?> fut2 = jcache().putAsync("key2", 11);

            IgniteFuture<Void> f = null;

            if (tx != null)
                f = tx.commitAsync();

            assertNull(fut1.get());
            assertNull(fut2.get());

            try {
                if (f != null)
                    f.get();
            } catch (Throwable t) {
                assert false : "Unexpected exception " + t;
            }
        }
        finally {
            if (tx != null)
                tx.close();
        }

        checkSize(F.asSet("key1", "key2"));

        assert jcache().get("key1") == 10;
        assert jcache().get("key2") == 11;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutAll() throws Exception {
        Map<String, Integer> map = F.asMap("key1", 1, "key2", 2);

        IgniteCache<String, Integer> cache = jcache();

        cache.putAll(map);

        checkSize(F.asSet("key1", "key2"));

        assert cache.get("key1") == 1;
        assert cache.get("key2") == 2;

        map.put("key1", 10);
        map.put("key2", 20);

        cache.putAll(map);

        checkSize(F.asSet("key1", "key2"));

        assert cache.get("key1") == 10;
        assert cache.get("key2") == 20;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testNullInTx() throws Exception {
        if (!txShouldBeUsed())
            return;

        final IgniteCache<String, Integer> cache = jcache();

        for (int i = 0; i < 100; i++) {
            final String key = "key-" + i;

            assertNull(cache.get(key));

            GridTestUtils.assertThrows(log, new Callable<Void>() {
                @Override public Void call() throws Exception {
                    IgniteTransactions txs = transactions();

                    try (Transaction tx = txs.txStart()) {
                        cache.put(key, 1);

                        cache.put(null, 2);

                        tx.commit();
                    }

                    return null;
                }
            }, NullPointerException.class, null);

            assertNull(cache.get(key));

            cache.put(key, 1);

            assertEquals(1, (int)cache.get(key));

            GridTestUtils.assertThrows(log, new Callable<Void>() {
                @Override public Void call() throws Exception {
                    IgniteTransactions txs = transactions();

                    try (Transaction tx = txs.txStart()) {
                        cache.put(key, 2);

                        cache.remove(null);

                        tx.commit();
                    }

                    return null;
                }
            }, NullPointerException.class, null);

            assertEquals(1, (int)cache.get(key));

            cache.put(key, 2);

            assertEquals(2, (int)cache.get(key));

            GridTestUtils.assertThrows(log, new Callable<Void>() {
                @Override public Void call() throws Exception {
                    IgniteTransactions txs = transactions();

                    Map<String, Integer> map = new LinkedHashMap<>();

                    map.put("k1", 1);
                    map.put("k2", 2);
                    map.put(null, 3);

                    try (Transaction tx = txs.txStart()) {
                        cache.put(key, 1);

                        cache.putAll(map);

                        tx.commit();
                    }

                    return null;
                }
            }, NullPointerException.class, null);

            assertNull(cache.get("k1"));
            assertNull(cache.get("k2"));

            assertEquals(2, (int)cache.get(key));

            cache.put(key, 3);

            assertEquals(3, (int)cache.get(key));
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutAllWithNulls() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        {
            final Map<String, Integer> m = new LinkedHashMap<>(2);

            m.put("key1", 1);
            m.put(null, 2);

            GridTestUtils.assertThrows(log, new Callable<Void>() {
                @Override public Void call() throws Exception {
                    cache.putAll(m);

                    return null;
                }
            }, NullPointerException.class, null);

            cache.put("key1", 1);

            assertEquals(1, (int)cache.get("key1"));
        }

        {
            final Map<String, Integer> m = new LinkedHashMap<>(2);

            m.put("key3", 3);
            m.put("key4", null);

            GridTestUtils.assertThrows(log, new Callable<Void>() {
                @Override public Void call() throws Exception {
                    cache.putAll(m);

                    return null;
                }
            }, NullPointerException.class, null);

            m.put("key4", 4);

            cache.putAll(m);

            assertEquals(3, (int)cache.get("key3"));
            assertEquals(4, (int)cache.get("key4"));
        }

        assertThrows(log, new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                cache.put("key1", null);

                return null;
            }
        }, NullPointerException.class, A.NULL_MSG_PREFIX);

        assertThrows(log, new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                cache.getAndPut("key1", null);

                return null;
            }
        }, NullPointerException.class, A.NULL_MSG_PREFIX);

        assertThrows(log, new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                cache.put(null, 1);

                return null;
            }
        }, NullPointerException.class, A.NULL_MSG_PREFIX);

        assertThrows(log, new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                cache.replace(null, 1);

                return null;
            }
        }, NullPointerException.class, A.NULL_MSG_PREFIX);

        assertThrows(log, new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                cache.getAndReplace(null, 1);

                return null;
            }
        }, NullPointerException.class, A.NULL_MSG_PREFIX);

        assertThrows(log, new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                cache.replace("key", null);

                return null;
            }
        }, NullPointerException.class, A.NULL_MSG_PREFIX);

        assertThrows(log, new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                cache.getAndReplace("key", null);

                return null;
            }
        }, NullPointerException.class, A.NULL_MSG_PREFIX);

        assertThrows(log, new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                cache.replace(null, 1, 2);

                return null;
            }
        }, NullPointerException.class, A.NULL_MSG_PREFIX);

        assertThrows(log, new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                cache.replace("key", null, 2);

                return null;
            }
        }, NullPointerException.class, A.NULL_MSG_PREFIX);

        assertThrows(log, new Callable<Object>() {
            @Nullable @Override public Object call() throws Exception {
                cache.replace("key", 1, null);

                return null;
            }
        }, NullPointerException.class, A.NULL_MSG_PREFIX);
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutAllAsyncOld() throws Exception {
        Map<String, Integer> map = F.asMap("key1", 1, "key2", 2);

        IgniteCache<String, Integer> cache = jcache();

        IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        cacheAsync.putAll(map);

        IgniteFuture<?> f1 = cacheAsync.future();

        map.put("key1", 10);
        map.put("key2", 20);

        cacheAsync.putAll(map);

        IgniteFuture<?> f2 = cacheAsync.future();

        assertNull(f2.get());
        assertNull(f1.get());

        checkSize(F.asSet("key1", "key2"));

        assert cache.get("key1") == 10;
        assert cache.get("key2") == 20;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutAllAsync() throws Exception {
        Map<String, Integer> map = F.asMap("key1", 1, "key2", 2);

        IgniteCache<String, Integer> cache = jcache();

        IgniteFuture<?> f1 = cache.putAllAsync(map);

        map.put("key1", 10);
        map.put("key2", 20);

        IgniteFuture<?> f2 = cache.putAllAsync(map);

        assertNull(f2.get());
        assertNull(f1.get());

        checkSize(F.asSet("key1", "key2"));

        assert cache.get("key1") == 10;
        assert cache.get("key2") == 20;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAndPutIfAbsent() throws Exception {
        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        IgniteCache<String, Integer> cache = jcache();

        try {
            assert cache.getAndPutIfAbsent("key", 1) == null;

            assert cache.get("key") != null;
            assert cache.get("key") == 1;

            assert cache.getAndPutIfAbsent("key", 2) != null;
            assert cache.getAndPutIfAbsent("key", 2) == 1;

            assert cache.get("key") != null;
            assert cache.get("key") == 1;

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        assert cache.getAndPutIfAbsent("key", 2) != null;

        for (int i = 0; i < gridCount(); i++) {
            info("Peek on node [i=" + i + ", id=" + grid(i).localNode().id() + ", val=" +
                grid(i).cache(DEFAULT_CACHE_NAME).localPeek("key", ONHEAP) + ']');
        }

        assertEquals((Integer)1, cache.getAndPutIfAbsent("key", 2));

        assert cache.get("key") != null;
        assert cache.get("key") == 1;

        // Check swap.
        cache.put("key2", 1);

        cache.localEvict(Collections.singleton("key2"));

        assertEquals((Integer)1, cache.getAndPutIfAbsent("key2", 3));

        // Check db.
        if (!isMultiJvm()) {
            storeStgy.putToStore("key3", 3);

            assertEquals((Integer)3, cache.getAndPutIfAbsent("key3", 4));

            assertEquals((Integer)3, cache.get("key3"));
        }

        assertEquals((Integer)1, cache.get("key2"));

        cache.localEvict(Collections.singleton("key2"));

        // Same checks inside tx.
        tx = txShouldBeUsed() ? transactions().txStart() : null;

        try {
            assertEquals((Integer)1, cache.getAndPutIfAbsent("key2", 3));

            if (tx != null)
                tx.commit();

            assertEquals((Integer)1, cache.get("key2"));
        }
        finally {
            if (tx != null)
                tx.close();
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGetAndPutIfAbsentAsyncOld() throws Exception {
        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        IgniteCache<String, Integer> cache = jcache();

        IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        try {
            cacheAsync.getAndPutIfAbsent("key", 1);

            IgniteFuture<Integer> fut1 = cacheAsync.future();

            assertNull(fut1.get());
            assertEquals((Integer)1, cache.get("key"));

            cacheAsync.getAndPutIfAbsent("key", 2);

            IgniteFuture<Integer> fut2 = cacheAsync.future();

            assertEquals((Integer)1, fut2.get());
            assertEquals((Integer)1, cache.get("key"));

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        // Check swap.
        cache.put("key2", 1);

        cache.localEvict(Collections.singleton("key2"));

        cacheAsync.getAndPutIfAbsent("key2", 3);

        assertEquals((Integer)1, cacheAsync.<Integer>future().get());

        // Check db.
        if (!isMultiJvm()) {
            storeStgy.putToStore("key3", 3);

            cacheAsync.getAndPutIfAbsent("key3", 4);

            assertEquals((Integer)3, cacheAsync.<Integer>future().get());
        }

        cache.localEvict(Collections.singleton("key2"));

        // Same checks inside tx.
        tx = txShouldBeUsed() ? transactions().txStart() : null;

        try {
            cacheAsync.getAndPutIfAbsent("key2", 3);

            assertEquals(1, cacheAsync.future().get());

            if (tx != null)
                tx.commit();

            assertEquals((Integer)1, cache.get("key2"));
        }
        finally {
            if (tx != null)
                tx.close();
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGetAndPutIfAbsentAsync() throws Exception {
        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        IgniteCache<String, Integer> cache = jcache();

        try {
            IgniteFuture<Integer> fut1 = cache.getAndPutIfAbsentAsync("key", 1);

            assertNull(fut1.get());
            assertEquals((Integer)1, cache.get("key"));

            IgniteFuture<Integer> fut2 = cache.getAndPutIfAbsentAsync("key", 2);

            assertEquals((Integer)1, fut2.get());
            assertEquals((Integer)1, cache.get("key"));

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        // Check swap.
        cache.put("key2", 1);

        cache.localEvict(Collections.singleton("key2"));

        assertEquals((Integer)1, cache.getAndPutIfAbsentAsync("key2", 3).get());

        // Check db.
        if (!isMultiJvm()) {
            storeStgy.putToStore("key3", 3);

            assertEquals((Integer)3, cache.getAndPutIfAbsentAsync("key3", 4).get());
        }

        cache.localEvict(Collections.singleton("key2"));

        // Same checks inside tx.
        tx = txShouldBeUsed() ? transactions().txStart() : null;

        try {
            assertEquals(1, (int)cache.getAndPutIfAbsentAsync("key2", 3).get());

            if (tx != null)
                tx.commit();

            assertEquals((Integer)1, cache.get("key2"));
        }
        finally {
            if (tx != null)
                tx.close();
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPutIfAbsent() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        assertNull(cache.get("key"));
        assert cache.putIfAbsent("key", 1);
        assert cache.get("key") != null && cache.get("key") == 1;
        assert !cache.putIfAbsent("key", 2);
        assert cache.get("key") != null && cache.get("key") == 1;

        // Check swap.
        cache.put("key2", 1);

        cache.localEvict(Collections.singleton("key2"));

        assertFalse(cache.putIfAbsent("key2", 3));

        // Check db.
        if (!isMultiJvm()) {
            storeStgy.putToStore("key3", 3);

            assertFalse(cache.putIfAbsent("key3", 4));
        }

        cache.localEvict(Collections.singleton("key2"));

        // Same checks inside tx.
        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        try {
            assertFalse(cache.putIfAbsent("key2", 3));

            if (tx != null)
                tx.commit();

            assertEquals((Integer)1, cache.get("key2"));
        }
        finally {
            if (tx != null)
                tx.close();
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutxIfAbsentAsync() throws Exception {
        if (txShouldBeUsed())
            checkPutxIfAbsentAsync(true);
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutxIfAbsentAsyncNoTx() throws Exception {
        checkPutxIfAbsentAsync(false);
    }

    /**
     * @param inTx In tx flag.
     * @throws Exception If failed.
     */
    private void checkPutxIfAbsentAsyncOld(boolean inTx) throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        cacheAsync.putIfAbsent("key", 1);

        IgniteFuture<Boolean> fut1 = cacheAsync.future();

        assert fut1.get();
        assert cache.get("key") != null && cache.get("key") == 1;

        cacheAsync.putIfAbsent("key", 2);

        IgniteFuture<Boolean> fut2 = cacheAsync.future();

        assert !fut2.get();
        assert cache.get("key") != null && cache.get("key") == 1;

        // Check swap.
        cache.put("key2", 1);

        cache.localEvict(Collections.singleton("key2"));

        cacheAsync.putIfAbsent("key2", 3);

        assertFalse(cacheAsync.<Boolean>future().get());

        // Check db.
        if (!isMultiJvm()) {
            storeStgy.putToStore("key3", 3);

            cacheAsync.putIfAbsent("key3", 4);

            assertFalse(cacheAsync.<Boolean>future().get());
        }

        cache.localEvict(Collections.singletonList("key2"));

        // Same checks inside tx.
        Transaction tx = inTx ? transactions().txStart() : null;

        try {
            cacheAsync.putIfAbsent("key2", 3);

            assertFalse(cacheAsync.<Boolean>future().get());

            if (!isMultiJvm()) {
                cacheAsync.putIfAbsent("key3", 4);

                assertFalse(cacheAsync.<Boolean>future().get());
            }

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        assertEquals((Integer)1, cache.get("key2"));

        if (!isMultiJvm())
            assertEquals((Integer)3, cache.get("key3"));
    }

    /**
     * @param inTx In tx flag.
     * @throws Exception If failed.
     */
    private void checkPutxIfAbsentAsync(boolean inTx) throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        IgniteFuture<Boolean> fut1 = cache.putIfAbsentAsync("key", 1);

        assert fut1.get();
        assert cache.get("key") != null && cache.get("key") == 1;

        IgniteFuture<Boolean> fut2 = cache.putIfAbsentAsync("key", 2);

        assert !fut2.get();
        assert cache.get("key") != null && cache.get("key") == 1;

        // Check swap.
        cache.put("key2", 1);

        cache.localEvict(Collections.singleton("key2"));

        assertFalse(cache.putIfAbsentAsync("key2", 3).get());

        // Check db.
        if (!isMultiJvm()) {
            storeStgy.putToStore("key3", 3);

            assertFalse(cache.putIfAbsentAsync("key3", 4).get());
        }

        cache.localEvict(Collections.singletonList("key2"));

        // Same checks inside tx.
        Transaction tx = inTx ? transactions().txStart() : null;

        try {
            assertFalse(cache.putIfAbsentAsync("key2", 3).get());

            if (!isMultiJvm())
                assertFalse(cache.putIfAbsentAsync("key3", 4).get());

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        assertEquals((Integer)1, cache.get("key2"));

        if (!isMultiJvm())
            assertEquals((Integer)3, cache.get("key3"));
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutIfAbsentAsyncConcurrentOld() throws Exception {
        IgniteCache<String, Integer> cacheAsync = jcache().withAsync();

        cacheAsync.putIfAbsent("key1", 1);

        IgniteFuture<Boolean> fut1 = cacheAsync.future();

        cacheAsync.putIfAbsent("key2", 2);

        IgniteFuture<Boolean> fut2 = cacheAsync.future();

        assert fut1.get();
        assert fut2.get();
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPutIfAbsentAsyncConcurrent() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        IgniteFuture<Boolean> fut1 = cache.putIfAbsentAsync("key1", 1);

        IgniteFuture<Boolean> fut2 = cache.putIfAbsentAsync("key2", 2);

        assert fut1.get();
        assert fut2.get();
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGetAndReplace() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key", 1);

        assert cache.get("key") == 1;

        info("key 1 -> 2");

        assert cache.getAndReplace("key", 2) == 1;

        assert cache.get("key") == 2;

        assert cache.getAndReplace("wrong", 0) == null;

        assert cache.get("wrong") == null;

        info("key 0 -> 3");

        assert !cache.replace("key", 0, 3);

        assert cache.get("key") == 2;

        info("key 0 -> 3");

        assert !cache.replace("key", 0, 3);

        assert cache.get("key") == 2;

        info("key 2 -> 3");

        assert cache.replace("key", 2, 3);

        assert cache.get("key") == 3;

        info("evict key");

        cache.localEvict(Collections.singleton("key"));

        info("key 3 -> 4");

        assert cache.replace("key", 3, 4);

        assert cache.get("key") == 4;

        if (!isMultiJvm()) {
            storeStgy.putToStore("key2", 5);

            info("key2 5 -> 6");

            assert cache.replace("key2", 5, 6);
        }

        for (int i = 0; i < gridCount(); i++) {
            info("Peek key on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() +
                ", peekVal=" + grid(i).cache(DEFAULT_CACHE_NAME).localPeek("key", ONHEAP) + ']');

            info("Peek key2 on grid [i=" + i + ", nodeId=" + grid(i).localNode().id() +
                ", peekVal=" + grid(i).cache(DEFAULT_CACHE_NAME).localPeek("key2", ONHEAP) + ']');
        }

        if (!isMultiJvm())
            assertEquals((Integer)6, cache.get("key2"));

        cache.localEvict(Collections.singleton("key"));

        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        try {
            assert cache.replace("key", 4, 5);

            if (tx != null)
                tx.commit();

            assert cache.get("key") == 5;
        }
        finally {
            if (tx != null)
                tx.close();
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testReplace() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key", 1);

        assert cache.get("key") == 1;

        assert cache.replace("key", 2);

        assert cache.get("key") == 2;

        assert !cache.replace("wrong", 2);

        cache.localEvict(Collections.singleton("key"));

        assert cache.replace("key", 4);

        assert cache.get("key") == 4;

        if (!isMultiJvm()) {
            storeStgy.putToStore("key2", 5);

            assert cache.replace("key2", 6);

            assertEquals((Integer)6, cache.get("key2"));
        }

        cache.localEvict(Collections.singleton("key"));

        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        try {
            assert cache.replace("key", 5);

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        assert cache.get("key") == 5;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGetAndReplaceAsyncOld() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        cache.put("key", 1);

        assert cache.get("key") == 1;

        cacheAsync.getAndReplace("key", 2);

        assert cacheAsync.<Integer>future().get() == 1;

        assert cache.get("key") == 2;

        cacheAsync.getAndReplace("wrong", 0);

        assert cacheAsync.future().get() == null;

        assert cache.get("wrong") == null;

        cacheAsync.replace("key", 0, 3);

        assert !cacheAsync.<Boolean>future().get();

        assert cache.get("key") == 2;

        cacheAsync.replace("key", 0, 3);

        assert !cacheAsync.<Boolean>future().get();

        assert cache.get("key") == 2;

        cacheAsync.replace("key", 2, 3);

        assert cacheAsync.<Boolean>future().get();

        assert cache.get("key") == 3;

        cache.localEvict(Collections.singleton("key"));

        cacheAsync.replace("key", 3, 4);

        assert cacheAsync.<Boolean>future().get();

        assert cache.get("key") == 4;

        if (!isMultiJvm()) {
            storeStgy.putToStore("key2", 5);

            cacheAsync.replace("key2", 5, 6);

            assert cacheAsync.<Boolean>future().get();

            assertEquals((Integer)6, cache.get("key2"));
        }

        cache.localEvict(Collections.singleton("key"));

        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        try {
            cacheAsync.replace("key", 4, 5);

            assert cacheAsync.<Boolean>future().get();

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        assert cache.get("key") == 5;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGetAndReplaceAsync() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key", 1);

        assert cache.get("key") == 1;

        assert cache.getAndReplaceAsync("key", 2).get() == 1;

        assert cache.get("key") == 2;

        assert cache.getAndReplaceAsync("wrong", 0).get() == null;

        assert cache.get("wrong") == null;

        assert !cache.replaceAsync("key", 0, 3).get();

        assert cache.get("key") == 2;

        assert !cache.replaceAsync("key", 0, 3).get();

        assert cache.get("key") == 2;

        assert cache.replaceAsync("key", 2, 3).get();

        assert cache.get("key") == 3;

        cache.localEvict(Collections.singleton("key"));

        assert cache.replaceAsync("key", 3, 4).get();

        assert cache.get("key") == 4;

        if (!isMultiJvm()) {
            storeStgy.putToStore("key2", 5);

            assert cache.replaceAsync("key2", 5, 6).get();

            assertEquals((Integer)6, cache.get("key2"));
        }

        cache.localEvict(Collections.singleton("key"));

        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        try {
            assert cache.replaceAsync("key", 4, 5).get();

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        assert cache.get("key") == 5;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testReplacexAsyncOld() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        cache.put("key", 1);

        assert cache.get("key") == 1;

        cacheAsync.replace("key", 2);

        assert cacheAsync.<Boolean>future().get();

        info("Finished replace.");

        assertEquals((Integer)2, cache.get("key"));

        cacheAsync.replace("wrond", 2);

        assert !cacheAsync.<Boolean>future().get();

        cache.localEvict(Collections.singleton("key"));

        cacheAsync.replace("key", 4);

        assert cacheAsync.<Boolean>future().get();

        assert cache.get("key") == 4;

        if (!isMultiJvm()) {
            storeStgy.putToStore("key2", 5);

            cacheAsync.replace("key2", 6);

            assert cacheAsync.<Boolean>future().get();

            assert cache.get("key2") == 6;
        }

        cache.localEvict(Collections.singleton("key"));

        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        try {
            cacheAsync.replace("key", 5);

            assert cacheAsync.<Boolean>future().get();

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        assert cache.get("key") == 5;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testReplacexAsync() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key", 1);

        assert cache.get("key") == 1;

        assert cache.replaceAsync("key", 2).get();

        info("Finished replace.");

        assertEquals((Integer)2, cache.get("key"));

        assert !cache.replaceAsync("wrond", 2).get();

        cache.localEvict(Collections.singleton("key"));

        assert cache.replaceAsync("key", 4).get();

        assert cache.get("key") == 4;

        if (!isMultiJvm()) {
            storeStgy.putToStore("key2", 5);

            assert cache.replaceAsync("key2", 6).get();

            assert cache.get("key2") == 6;
        }

        cache.localEvict(Collections.singleton("key"));

        Transaction tx = txShouldBeUsed() ? transactions().txStart() : null;

        try {
            assert cache.replaceAsync("key", 5).get();

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        assert cache.get("key") == 5;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAndRemove() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);

        assert !cache.remove("key1", 0);
        assert cache.get("key1") != null && cache.get("key1") == 1;
        assert cache.remove("key1", 1);
        assert cache.get("key1") == null;
        assert cache.getAndRemove("key2") == 2;
        assert cache.get("key2") == null;
        assert cache.getAndRemove("key2") == null;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGetAndRemoveObject() throws Exception {
        IgniteCache<String, TestValue> cache = ignite(0).cache(DEFAULT_CACHE_NAME);

        TestValue val1 = new TestValue(1);
        TestValue val2 = new TestValue(2);

        cache.put("key1", val1);
        cache.put("key2", val2);

        assert !cache.remove("key1", new TestValue(0));

        TestValue oldVal = cache.get("key1");

        assert oldVal != null && F.eq(val1, oldVal);

        assert cache.remove("key1");

        assert cache.get("key1") == null;

        TestValue oldVal2 = cache.getAndRemove("key2");

        assert F.eq(val2, oldVal2);

        assert cache.get("key2") == null;
        assert cache.getAndRemove("key2") == null;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGetAndPutObject() throws Exception {
        IgniteCache<String, TestValue> cache = ignite(0).cache(DEFAULT_CACHE_NAME);

        TestValue val1 = new TestValue(1);
        TestValue val2 = new TestValue(2);

        cache.put("key1", val1);

        TestValue oldVal = cache.get("key1");

        assertEquals(val1, oldVal);

        oldVal = cache.getAndPut("key1", val2);

        assertEquals(val1, oldVal);

        TestValue updVal = cache.get("key1");

        assertEquals(val2, updVal);
    }

    /**
     * TODO: GG-11241.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testDeletedEntriesFlag() throws Exception {
        if (cacheMode() != LOCAL && cacheMode() != REPLICATED) {
            final int cnt = 3;

            IgniteCache<String, Integer> cache = jcache();

            for (int i = 0; i < cnt; i++)
                cache.put(String.valueOf(i), i);

            for (int i = 0; i < cnt; i++)
                cache.remove(String.valueOf(i));

            for (int g = 0; g < gridCount(); g++)
                executeOnLocalOrRemoteJvm(g, new CheckEntriesDeletedTask(cnt));
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRemoveLoad() throws Exception {
        int cnt = 10;

        Set<String> keys = new HashSet<>();

        for (int i = 0; i < cnt; i++)
            keys.add(String.valueOf(i));

        jcache().removeAll(keys);

        for (String key : keys)
            storeStgy.putToStore(key, Integer.parseInt(key));

        for (int g = 0; g < gridCount(); g++)
            grid(g).cache(DEFAULT_CACHE_NAME).localLoadCache(null);

        for (int g = 0; g < gridCount(); g++) {
            for (int i = 0; i < cnt; i++) {
                String key = String.valueOf(i);

                if (grid(0).affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(grid(g).localNode()))
                    assertEquals((Integer)i, peek(jcache(g), key));
                else
                    assertNull(peek(jcache(g), key));
            }
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRemoveLoadAsync() throws Exception {
        if (isMultiJvm())
            return;

        int cnt = 10;

        Set<String> keys = new HashSet<>();

        for (int i = 0; i < cnt; i++)
            keys.add(String.valueOf(i));

        jcache().removeAllAsync(keys).get();

        for (String key : keys)
            storeStgy.putToStore(key, Integer.parseInt(key));

        for (int g = 0; g < gridCount(); g++)
            grid(g).cache(DEFAULT_CACHE_NAME).localLoadCacheAsync(null).get();

        for (int g = 0; g < gridCount(); g++) {
            for (int i = 0; i < cnt; i++) {
                String key = String.valueOf(i);

                if (grid(0).affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(grid(g).localNode()))
                    assertEquals((Integer)i, peek(jcache(g), key));
                else
                    assertNull(peek(jcache(g), key));
            }
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAsyncOld() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        cache.put("key1", 1);
        cache.put("key2", 2);

        cacheAsync.remove("key1", 0);

        assert !cacheAsync.<Boolean>future().get();

        assert cache.get("key1") != null && cache.get("key1") == 1;

        cacheAsync.remove("key1", 1);

        assert cacheAsync.<Boolean>future().get();

        assert cache.get("key1") == null;

        cacheAsync.getAndRemove("key2");

        assert cacheAsync.<Integer>future().get() == 2;

        assert cache.get("key2") == null;

        cacheAsync.getAndRemove("key2");

        assert cacheAsync.future().get() == null;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAsync() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);

        assert !cache.removeAsync("key1", 0).get();

        assert cache.get("key1") != null && cache.get("key1") == 1;

        assert cache.removeAsync("key1", 1).get();

        assert cache.get("key1") == null;

        assert cache.getAndRemoveAsync("key2").get() == 2;

        assert cache.get("key2") == null;

        assert cache.getAndRemoveAsync("key2").get() == null;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemove() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);

        assert cache.remove("key1");
        assert cache.get("key1") == null;
        assert !cache.remove("key1");
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemovexAsyncOld() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        cache.put("key1", 1);

        cacheAsync.remove("key1");

        assert cacheAsync.<Boolean>future().get();

        assert cache.get("key1") == null;

        cacheAsync.remove("key1");

        assert !cacheAsync.<Boolean>future().get();
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemovexAsync() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);

        assert cache.removeAsync("key1").get();

        assert cache.get("key1") == null;

        assert !cache.removeAsync("key1").get();
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGlobalRemoveAll() throws Exception {
        globalRemoveAll(false);
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGlobalRemoveAllAsync() throws Exception {
        globalRemoveAll(true);
    }

    /**
     * @param async If {@code true} uses asynchronous operation.
     * @throws Exception In case of error.
     */
    private void globalRemoveAllOld(boolean async) throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);
        cache.put("key3", 3);

        checkSize(F.asSet("key1", "key2", "key3"));

        IgniteCache<String, Integer> asyncCache = cache.withAsync();

        if (async) {
            asyncCache.removeAll(F.asSet("key1", "key2"));

            asyncCache.future().get();
        }
        else
            cache.removeAll(F.asSet("key1", "key2"));

        checkSize(F.asSet("key3"));

        checkContainsKey(false, "key1");
        checkContainsKey(false, "key2");
        checkContainsKey(true, "key3");

        // Put values again.
        cache.put("key1", 1);
        cache.put("key2", 2);
        cache.put("key3", 3);

        if (async) {
            IgniteCache<String, Integer> asyncCache0 = jcache(gridCount() > 1 ? 1 : 0).withAsync();

            asyncCache0.removeAll();

            asyncCache0.future().get();
        }
        else
            jcache(gridCount() > 1 ? 1 : 0).removeAll();

        assertEquals(0, cache.localSize());
        long entryCnt = hugeRemoveAllEntryCount();

        for (int i = 0; i < entryCnt; i++)
            cache.put(String.valueOf(i), i);

        for (int i = 0; i < entryCnt; i++)
            assertEquals(Integer.valueOf(i), cache.get(String.valueOf(i)));

        if (async) {
            asyncCache.removeAll();

            asyncCache.future().get();
        }
        else
            cache.removeAll();

        for (int i = 0; i < entryCnt; i++)
            assertNull(cache.get(String.valueOf(i)));
    }

    /**
     * @param async If {@code true} uses asynchronous operation.
     * @throws Exception In case of error.
     */
    private void globalRemoveAll(boolean async) throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);
        cache.put("key3", 3);

        checkSize(F.asSet("key1", "key2", "key3"));

        if (async)
            cache.removeAllAsync(F.asSet("key1", "key2")).get();
        else
            cache.removeAll(F.asSet("key1", "key2"));

        checkSize(F.asSet("key3"));

        checkContainsKey(false, "key1");
        checkContainsKey(false, "key2");
        checkContainsKey(true, "key3");

        // Put values again.
        cache.put("key1", 1);
        cache.put("key2", 2);
        cache.put("key3", 3);

        if (async)
            jcache(gridCount() > 1 ? 1 : 0).removeAllAsync().get();
        else
            jcache(gridCount() > 1 ? 1 : 0).removeAll();

        assertEquals(0, cache.localSize());
        long entryCnt = hugeRemoveAllEntryCount();

        for (int i = 0; i < entryCnt; i++)
            cache.put(String.valueOf(i), i);

        for (int i = 0; i < entryCnt; i++)
            assertEquals(Integer.valueOf(i), cache.get(String.valueOf(i)));

        if (async)
            cache.removeAllAsync().get();
        else
            cache.removeAll();

        for (int i = 0; i < entryCnt; i++)
            assertNull(cache.get(String.valueOf(i)));
    }

    /**
     * @return Count of entries to be removed in removeAll() test.
     */
    protected long hugeRemoveAllEntryCount() {
        return 1000L;
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAllWithNulls() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        final Set<String> c = new LinkedHashSet<>();

        c.add("key1");
        c.add(null);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.removeAll(c);

                return null;
            }
        }, NullPointerException.class, null);

        assertEquals(0, grid(0).cache(DEFAULT_CACHE_NAME).localSize());

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.removeAll(null);

                return null;
            }
        }, NullPointerException.class, null);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.remove(null);

                return null;
            }
        }, NullPointerException.class, null);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.getAndRemove(null);

                return null;
            }
        }, NullPointerException.class, null);

        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.remove("key1", null);

                return null;
            }
        }, NullPointerException.class, null);
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAllDuplicates() throws Exception {
        jcache().removeAll(ImmutableSet.of("key1", "key1", "key1"));
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAllDuplicatesTx() throws Exception {
        if (txShouldBeUsed()) {
            try (Transaction tx = transactions().txStart()) {
                jcache().removeAll(ImmutableSet.of("key1", "key1", "key1"));

                tx.commit();
            }
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAllEmpty() throws Exception {
        jcache().removeAll();
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAllAsyncOld() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        IgniteCache<String, Integer> cacheAsync = cache.withAsync();

        cache.put("key1", 1);
        cache.put("key2", 2);
        cache.put("key3", 3);

        checkSize(F.asSet("key1", "key2", "key3"));

        cacheAsync.removeAll(F.asSet("key1", "key2"));

        assertNull(cacheAsync.future().get());

        checkSize(F.asSet("key3"));

        checkContainsKey(false, "key1");
        checkContainsKey(false, "key2");
        checkContainsKey(true, "key3");
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAllAsync() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);
        cache.put("key3", 3);

        checkSize(F.asSet("key1", "key2", "key3"));

        assertNull(cache.removeAllAsync(F.asSet("key1", "key2")).get());

        checkSize(F.asSet("key3"));

        checkContainsKey(false, "key1");
        checkContainsKey(false, "key2");
        checkContainsKey(true, "key3");
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testLoadAll() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        Set<String> keys = new HashSet<>(primaryKeysForCache(cache, 2));

        for (String key : keys)
            assertNull(cache.localPeek(key, ONHEAP));

        Map<String, Integer> vals = new HashMap<>();

        int i = 0;

        for (String key : keys) {
            cache.put(key, i);

            vals.put(key, i);

            i++;
        }

        for (String key : keys)
            assertEquals(vals.get(key), peek(cache, key));

        cache.clear();

        for (String key : keys)
            assertNull(peek(cache, key));

        loadAll(cache, keys, true);

        for (String key : keys)
            assertEquals(vals.get(key), peek(cache, key));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testRemoveAfterClear() throws Exception {
        IgniteEx ignite = grid(0);

        boolean affNode = ignite.context().cache().internalCache(DEFAULT_CACHE_NAME).context().affinityNode();

        if (!affNode) {
            if (gridCount() < 2)
                return;

            ignite = grid(1);
        }

        IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);

        int key = 0;

        Collection<Integer> keys = new ArrayList<>();

        for (int k = 0; k < 2; k++) {
            while (!ignite.affinity(DEFAULT_CACHE_NAME).isPrimary(ignite.localNode(), key))
                key++;

            keys.add(key);

            key++;
        }

        info("Keys: " + keys);

        for (Integer k : keys)
            cache.put(k, k);

        cache.clear();

        for (int g = 0; g < gridCount(); g++) {
            Ignite grid0 = grid(g);

            grid0.cache(DEFAULT_CACHE_NAME).removeAll();

            assertTrue(grid0.cache(DEFAULT_CACHE_NAME).localSize() == 0);
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testClear() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        Set<String> keys = new HashSet<>(primaryKeysForCache(cache, 3));

        for (String key : keys)
            assertNull(cache.get(key));

        Map<String, Integer> vals = new HashMap<>(keys.size());

        int i = 0;

        for (String key : keys) {
            cache.put(key, i);

            vals.put(key, i);

            i++;
        }

        for (String key : keys)
            assertEquals(vals.get(key), peek(cache, key));

        cache.clear();

        for (String key : keys)
            assertNull(peek(cache, key));

        for (i = 0; i < gridCount(); i++)
            jcache(i).clear();

        for (i = 0; i < gridCount(); i++)
            assert jcache(i).localSize() == 0;

        for (Map.Entry<String, Integer> entry : vals.entrySet())
            cache.put(entry.getKey(), entry.getValue());

        for (String key : keys)
            assertEquals(vals.get(key), peek(cache, key));

        String first = F.first(keys);

        if (lockingEnabled()) {
            Lock lock = cache.lock(first);

            lock.lock();

            try {
                cache.clear();

                GridCacheContext<String, Integer> cctx = context(0);

                GridCacheEntryEx entry = cctx.isNear() ? cctx.near().dht().peekEx(first) :
                    cctx.cache().peekEx(first);

                assertNotNull(entry);
            }
            finally {
                lock.unlock();
            }
        }
        else {
            cache.clear();

            cache.put(first, vals.get(first));
        }

        cache.clear();

        assert cache.localSize() == 0 : "Values after clear.";

        i = 0;

        for (String key : keys) {
            cache.put(key, i);

            vals.put(key, i);

            i++;
        }

        cache.put("key1", 1);
        cache.put("key2", 2);

        cache.localEvict(Sets.union(ImmutableSet.of("key1", "key2"), keys));

        assert cache.localSize(ONHEAP) == 0;

        cache.clear();

        assert cache.localPeek("key1", ONHEAP) == null;
        assert cache.localPeek("key2", ONHEAP) == null;
    }

    /**
     * @param keys0 Keys to check.
     * @throws IgniteCheckedException If failed.
     */
    protected void checkUnlocked(final Collection<String> keys0) throws IgniteCheckedException {
        GridTestUtils.waitForCondition(new GridAbsPredicate() {
            @Override public boolean apply() {
                try {
                    for (int i = 0; i < gridCount(); i++) {
                        GridCacheAdapter<Object, Object> cache = ((IgniteKernal)ignite(i)).internalCache(DEFAULT_CACHE_NAME);

                        for (String key : keys0) {
                            GridCacheEntryEx entry = cache.peekEx(key);

                            if (entry != null) {
                                if (entry.lockedByAny()) {
                                    info("Entry is still locked [i=" + i + ", entry=" + entry + ']');

                                    return false;
                                }
                            }

                            if (cache.isNear()) {
                                entry = cache.context().near().dht().peekEx(key);

                                if (entry != null) {
                                    if (entry.lockedByAny()) {
                                        info("Entry is still locked [i=" + i + ", entry=" + entry + ']');

                                        return false;
                                    }
                                }
                            }
                        }
                    }

                    return true;
                }
                catch (GridCacheEntryRemovedException ignore) {
                    info("Entry was removed, will retry");

                    return false;
                }
            }
        }, 10_000);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGlobalClearAll() throws Exception {
        globalClearAll(false, false);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGlobalClearAllAsyncOld() throws Exception {
        globalClearAll(true, true);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGlobalClearAllAsync() throws Exception {
        globalClearAll(true, false);
    }

    /**
     * @param async If {@code true} uses async method.
     * @param oldAsync Use old async API.
     * @throws Exception If failed.
     */
    protected void globalClearAll(boolean async, boolean oldAsync) throws Exception {
        // Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries
        // because some of them were blocked due to having readers.
        for (int i = 0; i < gridCount(); i++) {
            for (String key : primaryKeysForCache(jcache(i), 3, 100_000))
                jcache(i).put(key, 1);
        }

        if (async) {
            if (oldAsync) {
                IgniteCache<String, Integer> asyncCache = jcache().withAsync();

                asyncCache.clear();

                asyncCache.future().get();
            } else
                jcache().clearAsync().get();
        }
        else
            jcache().clear();

        for (int i = 0; i < gridCount(); i++)
            assert jcache(i).localSize() == 0;
    }

    /**
     * @throws Exception In case of error.
     */
    @SuppressWarnings("BusyWait")
    @Test
    public void testLockUnlock() throws Exception {
        if (lockingEnabled()) {
            final CountDownLatch lockCnt = new CountDownLatch(1);
            final CountDownLatch unlockCnt = new CountDownLatch(1);

            grid(0).events().localListen(new IgnitePredicate<Event>() {
                @Override public boolean apply(Event evt) {
                    switch (evt.type()) {
                        case EVT_CACHE_OBJECT_LOCKED:
                            lockCnt.countDown();

                            break;
                        case EVT_CACHE_OBJECT_UNLOCKED:
                            unlockCnt.countDown();

                            break;
                    }

                    return true;
                }
            }, EVT_CACHE_OBJECT_LOCKED, EVT_CACHE_OBJECT_UNLOCKED);

            IgniteCache<String, Integer> cache = jcache();

            String key = primaryKeysForCache(cache, 1).get(0);

            cache.put(key, 1);

            assert !cache.isLocalLocked(key, false);

            Lock lock = cache.lock(key);

            lock.lock();

            try {
                lockCnt.await();

                assert cache.isLocalLocked(key, false);
            }
            finally {
                lock.unlock();
            }

            unlockCnt.await();

            for (int i = 0; i < 100; i++)
                if (cache.isLocalLocked(key, false))
                    Thread.sleep(10);
                else
                    break;

            assert !cache.isLocalLocked(key, false);
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @SuppressWarnings("BusyWait")
    @Test
    public void testLockUnlockAll() throws Exception {
        if (lockingEnabled()) {
            IgniteCache<String, Integer> cache = jcache();

            cache.put("key1", 1);
            cache.put("key2", 2);

            assert !cache.isLocalLocked("key1", false);
            assert !cache.isLocalLocked("key2", false);

            Lock lock1_2 = cache.lockAll(ImmutableSet.of("key1", "key2"));

            lock1_2.lock();

            try {
                assert cache.isLocalLocked("key1", false);
                assert cache.isLocalLocked("key2", false);
            }
            finally {
                lock1_2.unlock();
            }

            for (int i = 0; i < 100; i++)
                if (cache.isLocalLocked("key1", false) || cache.isLocalLocked("key2", false))
                    Thread.sleep(10);
                else
                    break;

            assert !cache.isLocalLocked("key1", false);
            assert !cache.isLocalLocked("key2", false);

            lock1_2.lock();

            try {
                assert cache.isLocalLocked("key1", false);
                assert cache.isLocalLocked("key2", false);
            }
            finally {
                lock1_2.unlock();
            }

            for (int i = 0; i < 100; i++)
                if (cache.isLocalLocked("key1", false) || cache.isLocalLocked("key2", false))
                    Thread.sleep(10);
                else
                    break;

            assert !cache.isLocalLocked("key1", false);
            assert !cache.isLocalLocked("key2", false);
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testPeek() throws Exception {
        Ignite ignite = primaryIgnite("key");
        IgniteCache<String, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);

        assert peek(cache, "key") == null;

        cache.put("key", 1);

        cache.replace("key", 2);

        assertEquals(2, peek(cache, "key").intValue());
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPeekTxRemoveOptimistic() throws Exception {
        checkPeekTxRemove(OPTIMISTIC);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPeekTxRemovePessimistic() throws Exception {
        checkPeekTxRemove(PESSIMISTIC);
    }

    /**
     * @param concurrency Concurrency.
     * @throws Exception If failed.
     */
    private void checkPeekTxRemove(TransactionConcurrency concurrency) throws Exception {
        if (txShouldBeUsed()) {
            Ignite ignite = primaryIgnite("key");
            IgniteCache<String, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx();

            cache.put("key", 1);

            try (Transaction tx = ignite.transactions().txStart(concurrency, READ_COMMITTED)) {
                cache.remove("key");

                assertNull(cache.get("key")); // localPeek ignores transactions.
                assertNotNull(peek(cache, "key")); // localPeek ignores transactions.

                tx.commit();
            }
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPeekRemove() throws Exception {
        IgniteCache<String, Integer> cache = primaryCache("key");

        cache.put("key", 1);
        cache.remove("key");

        assertNull(peek(cache, "key"));
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testEvictExpired() throws Exception {
        final IgniteCache<String, Integer> cache = jcache(0);

        final String key = primaryKeysForCache(cache, 1).get(0);

        cache.put(key, 1);

        assertEquals((Integer)1, cache.get(key));

        long ttl = 500;

        final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));

        grid(0).cache(DEFAULT_CACHE_NAME).withExpiryPolicy(expiry).put(key, 1);

        final Affinity<String> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);

        boolean wait = waitForCondition(new GridAbsPredicate() {
            @Override public boolean apply() {
                for (int i = 0; i < gridCount(); i++) {
                    if (peek(jcache(i), key) != null)
                        return false;
                }

                return true;
            }
        }, ttl + 1000);

        assertTrue("Failed to wait for entry expiration.", wait);

        // Expired entry should not be swapped.
        cache.localEvict(Collections.singleton(key));

        assertNull(peek(cache, "key"));

        assertNull(cache.localPeek(key, ONHEAP));

        assertTrue(cache.localSize() == 0);

        load(cache, key, true);

        for (int i = 0; i < gridCount(); i++) {
            if (aff.isPrimary(grid(i).cluster().localNode(), key))
                assertEquals((Integer)1, peek(jcache(i), key));

            if (aff.isBackup(grid(i).cluster().localNode(), key))
                assertEquals((Integer)1, peek(jcache(i), key));
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPeekExpired() throws Exception {
        final IgniteCache<String, Integer> c = jcache();

        final String key = primaryKeysForCache(c, 1).get(0);

        info("Using key: " + key);

        c.put(key, 1);

        assertEquals(Integer.valueOf(1), peek(c, key));

        int ttl = 500;

        final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));

        c.withExpiryPolicy(expiry).put(key, 1);

        Thread.sleep(ttl + 100);

        GridTestUtils.waitForCondition(new GridAbsPredicate() {
            @Override public boolean apply() {
                return peek(c, key) == null;
            }
        }, 2000);

        assert peek(c, key) == null;

        assert c.localSize() == 0 : "Cache is not empty.";
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPeekExpiredTx() throws Exception {
        if (txShouldBeUsed()) {
            final IgniteCache<String, Integer> c = jcache();

            final String key = "1";
            int ttl = 500;

            try (Transaction tx = grid(0).transactions().txStart()) {
                final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));

                grid(0).cache(DEFAULT_CACHE_NAME).withExpiryPolicy(expiry).put(key, 1);

                tx.commit();
            }

            GridTestUtils.waitForCondition(new GridAbsPredicate() {
                @Override public boolean apply() {
                    return peek(c, key) == null;
                }
            }, 2000);

            assertNull(peek(c, key));

            assert c.localSize() == 0;
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTtlTx() throws Exception {
        if (txShouldBeUsed())
            checkTtl(true, false);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTtlNoTx() throws Exception {
        checkTtl(false, false);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTtlNoTxOldEntry() throws Exception {
        checkTtl(false, true);
    }

    /**
     * @param inTx In tx flag.
     * @param oldEntry {@code True} to check TTL on old entry, {@code false} on new.
     * @throws Exception If failed.
     */
    private void checkTtl(boolean inTx, boolean oldEntry) throws Exception {
        int ttl = 4000;

        final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));

        final IgniteCache<String, Integer> c = jcache();

        final String key = primaryKeysForCache(jcache(), 1).get(0);

        IgnitePair<Long> entryTtl;

        if (oldEntry) {
            c.put(key, 1);

            entryTtl = entryTtl(fullCache(), key);

            assertNotNull(entryTtl.get1());
            assertNotNull(entryTtl.get2());
            assertEquals((Long)0L, entryTtl.get1());
            assertEquals((Long)0L, entryTtl.get2());
        }

        long startTime = System.currentTimeMillis();

        if (inTx) {
            // Rollback transaction for the first time.
            Transaction tx = transactions().txStart();

            try {
                jcache().withExpiryPolicy(expiry).put(key, 1);
            }
            finally {
                tx.rollback();
            }

            if (oldEntry) {
                entryTtl = entryTtl(fullCache(), key);

                assertEquals((Long)0L, entryTtl.get1());
                assertEquals((Long)0L, entryTtl.get2());
            }
        }

        // Now commit transaction and check that ttl and expire time have been saved.
        Transaction tx = inTx ? transactions().txStart() : null;

        try {
            jcache().withExpiryPolicy(expiry).put(key, 1);

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        long[] expireTimes = new long[gridCount()];

        for (int i = 0; i < gridCount(); i++) {
            if (grid(i).affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(grid(i).localNode(), key)) {
                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);

                assertNotNull(curEntryTtl.get1());
                assertNotNull(curEntryTtl.get2());
                assertTrue(curEntryTtl.get2() > startTime);

                expireTimes[i] = curEntryTtl.get2();
            }
        }

        // One more update from the same cache entry to ensure that expire time is shifted forward.
        U.sleep(100);

        tx = inTx ? transactions().txStart() : null;

        try {
            jcache().withExpiryPolicy(expiry).put(key, 2);

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        for (int i = 0; i < gridCount(); i++) {
            if (grid(i).affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(grid(i).localNode(), key)) {
                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);

                assertNotNull(curEntryTtl.get1());
                assertNotNull(curEntryTtl.get2());
                assertTrue(curEntryTtl.get2() > startTime);

                expireTimes[i] = curEntryTtl.get2();
            }
        }

        // And one more direct update to ensure that expire time is shifted forward.
        U.sleep(100);

        tx = inTx ? transactions().txStart() : null;

        try {
            jcache().withExpiryPolicy(expiry).put(key, 3);

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        for (int i = 0; i < gridCount(); i++) {
            if (grid(i).affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(grid(i).localNode(), key)) {
                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);

                assertNotNull(curEntryTtl.get1());
                assertNotNull(curEntryTtl.get2());
                assertTrue(curEntryTtl.get2() > startTime);

                expireTimes[i] = curEntryTtl.get2();
            }
        }

        // And one more update to ensure that ttl is not changed and expire time is not shifted forward.
        U.sleep(100);

        log.info("Put 4");

        tx = inTx ? transactions().txStart() : null;

        try {
            jcache().put(key, 4);

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        log.info("Put 4 done");

        for (int i = 0; i < gridCount(); i++) {
            if (grid(i).affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(grid(i).localNode(), key)) {
                IgnitePair<Long> curEntryTtl = entryTtl(jcache(i), key);

                assertNotNull(curEntryTtl.get1());
                assertNotNull(curEntryTtl.get2());
                assertEquals(expireTimes[i], (long)curEntryTtl.get2());
            }
        }

        // Avoid reloading from store.
        storeStgy.removeFromStore(key);

        assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
            @Override public boolean applyx() {
                try {
                    Integer val = c.get(key);

                    if (val != null) {
                        info("Value is in cache [key=" + key + ", val=" + val + ']');

                        return false;
                    }

                    // Get "cache" field from GridCacheProxyImpl.
                    GridCacheAdapter c0 = cacheFromCtx(c);

                    if (!c0.context().deferredDelete()) {
                        GridCacheEntryEx e0 = c0.peekEx(key);

                        return e0 == null || (e0.rawGet() == null && e0.valueBytes() == null);
                    }
                    else
                        return true;
                }
                catch (GridCacheEntryRemovedException e) {
                    throw new RuntimeException(e);
                }
            }
        }, Math.min(ttl * 10, getTestTimeout())));

        IgniteCache fullCache = fullCache();

        if (!isMultiJvmObject(fullCache)) {
            GridCacheAdapter internalCache = internalCache(fullCache);

            if (internalCache.isLocal())
                return;
        }

        assert c.get(key) == null;

        // Ensure that old TTL and expire time are not longer "visible".
        entryTtl = entryTtl(fullCache(), key);

        assertNotNull(entryTtl.get1());
        assertNotNull(entryTtl.get2());
        assertEquals(0, (long)entryTtl.get1());
        assertEquals(0, (long)entryTtl.get2());

        // Ensure that next update will not pick old expire time.

        tx = inTx ? transactions().txStart() : null;

        try {
            jcache().put(key, 10);

            if (tx != null)
                tx.commit();
        }
        finally {
            if (tx != null)
                tx.close();
        }

        U.sleep(2000);

        entryTtl = entryTtl(fullCache(), key);

        assertEquals((Integer)10, c.get(key));

        assertNotNull(entryTtl.get1());
        assertNotNull(entryTtl.get2());
        assertEquals(0, (long)entryTtl.get1());
        assertEquals(0, (long)entryTtl.get2());
    }

    /**
     * @throws Exception In case of error.
     */
    @Test(timeout = 10050000)
    public void testLocalEvict() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        List<String> keys = primaryKeysForCache(cache, 3);

        String key1 = keys.get(0);
        String key2 = keys.get(1);
        String key3 = keys.get(2);

        cache.put(key1, 1);
        cache.put(key2, 2);
        cache.put(key3, 3);

        assert peek(cache, key1) == 1;
        assert peek(cache, key2) == 2;
        assert peek(cache, key3) == 3;

        cache.localEvict(F.asList(key1, key2));

        assert cache.localPeek(key1, ONHEAP) == null;
        assert cache.localPeek(key2, ONHEAP) == null;
        assert peek(cache, key3) == 3;

        loadAll(cache, ImmutableSet.of(key1, key2), true);

        Affinity<String> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);

        for (int i = 0; i < gridCount(); i++) {
            if (aff.isPrimaryOrBackup(grid(i).cluster().localNode(), key1))
                assertEquals("node name = " + grid(i).name(), (Integer)1, peek(jcache(i), key1));

            if (aff.isPrimaryOrBackup(grid(i).cluster().localNode(), key2))
                assertEquals((Integer)2, peek(jcache(i), key2));

            if (aff.isPrimaryOrBackup(grid(i).cluster().localNode(), key3))
                assertEquals((Integer)3, peek(jcache(i), key3));
        }
    }

    /**
     * JUnit.
     */
    @Test
    public void testCacheProxy() {
        IgniteCache<String, Integer> cache = jcache();

        assert cache instanceof IgniteCacheProxy;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testCompactExpired() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        final String key = F.first(primaryKeysForCache(cache, 1));

        cache.put(key, 1);

        long ttl = 500;

        final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl));

        grid(0).cache(DEFAULT_CACHE_NAME).withExpiryPolicy(expiry).put(key, 1);

        waitForCondition(new GridAbsPredicate() {
            @Override public boolean apply() {
                return cache.localPeek(key) == null;
            }
        }, ttl + 1000);

        // Peek will actually remove entry from cache.
        assertNull(cache.localPeek(key));

        assertEquals(0, cache.localSize());

        // Clear readers, if any.
        cache.remove(key);
    }

    /**
     * JUnit.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testOptimisticTxMissingKey() throws Exception {
        if (txShouldBeUsed()) {
            try (Transaction tx = transactions().txStart(OPTIMISTIC, READ_COMMITTED)) {
                // Remove missing key.
                assertFalse(jcache().remove(UUID.randomUUID().toString()));

                tx.commit();
            }
        }
    }

    /**
     * JUnit.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testOptimisticTxMissingKeyNoCommit() throws Exception {
        if (txShouldBeUsed()) {
            try (Transaction tx = transactions().txStart(OPTIMISTIC, READ_COMMITTED)) {
                // Remove missing key.
                assertFalse(jcache().remove(UUID.randomUUID().toString()));

                tx.setRollbackOnly();
            }
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testOptimisticTxReadCommittedInTx() throws Exception {
        checkRemovexInTx(OPTIMISTIC, READ_COMMITTED);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testOptimisticTxRepeatableReadInTx() throws Exception {
        checkRemovexInTx(OPTIMISTIC, REPEATABLE_READ);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPessimisticTxReadCommittedInTx() throws Exception {
        checkRemovexInTx(PESSIMISTIC, READ_COMMITTED);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPessimisticTxRepeatableReadInTx() throws Exception {
        checkRemovexInTx(PESSIMISTIC, REPEATABLE_READ);
    }

    /**
     * @param concurrency Concurrency.
     * @param isolation Isolation.
     * @throws Exception If failed.
     */
    private void checkRemovexInTx(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception {
        if (txShouldBeUsed()) {
            final int cnt = 10;

            CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
                @Override public void applyx(IgniteCache<String, Integer> cache) {
                    for (int i = 0; i < cnt; i++)
                        cache.put("key" + i, i);
                }
            });

            CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
                @Override public void applyx(IgniteCache<String, Integer> cache) {
                    for (int i = 0; i < cnt; i++)
                        assertEquals(new Integer(i), cache.get("key" + i));
                }
            });

            CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
                @Override public void applyx(IgniteCache<String, Integer> cache) {
                    for (int i = 0; i < cnt; i++)
                        assertTrue("Failed to remove key: key" + i, cache.remove("key" + i));
                }
            });

            CU.inTx(ignite(0), jcache(), concurrency, isolation, new CIX1<IgniteCache<String, Integer>>() {
                @Override public void applyx(IgniteCache<String, Integer> cache) {
                    for (int i = 0; i < cnt; i++)
                        assertNull(cache.get("key" + i));
                }
            });
        }
    }

    /**
     * JUnit.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPessimisticTxMissingKey() throws Exception {
        if (txShouldBeUsed()) {
            try (Transaction tx = transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
                // Remove missing key.
                assertFalse(jcache().remove(UUID.randomUUID().toString()));

                tx.commit();
            }
        }
    }

    /**
     * JUnit.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPessimisticTxMissingKeyNoCommit() throws Exception {
        if (txShouldBeUsed()) {
            try (Transaction tx = transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
                // Remove missing key.
                assertFalse(jcache().remove(UUID.randomUUID().toString()));

                tx.setRollbackOnly();
            }
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPessimisticTxRepeatableRead() throws Exception {
        if (txShouldBeUsed()) {
            try (Transaction ignored = transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                jcache().put("key", 1);

                assert jcache().get("key") == 1;
            }
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testPessimisticTxRepeatableReadOnUpdate() throws Exception {
        if (txShouldBeUsed()) {
            try (Transaction ignored = transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                jcache().put("key", 1);

                assert jcache().getAndPut("key", 2) == 1;
            }
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testToMap() throws Exception {
        IgniteCache<String, Integer> cache = jcache();

        cache.put("key1", 1);
        cache.put("key2", 2);

        Map<String, Integer> map = new HashMap<>();

        for (int i = 0; i < gridCount(); i++) {
            for (Cache.Entry<String, Integer> entry : jcache(i))
                map.put(entry.getKey(), entry.getValue());
        }

        assert map.size() == 2;
        assert map.get("key1") == 1;
        assert map.get("key2") == 2;
    }

    /**
     * @param keys Expected keys.
     * @throws Exception If failed.
     */
    protected void checkSize(final Collection<String> keys) throws Exception {
        if (nearEnabled())
            assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL));
        else {
            for (int i = 0; i < gridCount(); i++)
                executeOnLocalOrRemoteJvm(i, new CheckEntriesTask(keys));
        }
    }

    /**
     * @param keys Expected keys.
     * @throws Exception If failed.
     */
    protected void checkKeySize(final Collection<String> keys) throws Exception {
        if (nearEnabled())
            assertEquals("Invalid key size: " + jcache().localSize(ALL),
                keys.size(), jcache().localSize(ALL));
        else {
            for (int i = 0; i < gridCount(); i++)
                executeOnLocalOrRemoteJvm(i, new CheckKeySizeTask(keys));
        }
    }

    /**
     * @param exp Expected value.
     * @param key Key.
     * @throws Exception If failed.
     */
    private void checkContainsKey(boolean exp, String key) throws Exception {
        if (nearEnabled())
            assertEquals(exp, jcache().containsKey(key));
        else {
            boolean contains = false;

            for (int i = 0; i < gridCount(); i++)
                if (containsKey(jcache(i), key)) {
                    contains = true;

                    break;
                }

            assertEquals("Key: " + key, exp, contains);
        }
    }

    /**
     * @param key Key.
     * @return Ignite instance for primary node.
     */
    protected Ignite primaryIgnite(String key) {
        ClusterNode node = grid(0).affinity(DEFAULT_CACHE_NAME).mapKeyToNode(key);

        if (node == null)
            throw new IgniteException("Failed to find primary node.");

        UUID nodeId = node.id();

        for (int i = 0; i < gridCount(); i++) {
            if (grid(i).localNode().id().equals(nodeId))
                return ignite(i);
        }

        throw new IgniteException("Failed to find primary node.");
    }

    /**
     * @param key Key.
     * @return Cache.
     */
    protected IgniteCache<String, Integer> primaryCache(String key) {
        return primaryIgnite(key).cache(DEFAULT_CACHE_NAME);
    }

    /**
     * @param cache Cache.
     * @param cnt Keys count.
     * @param startFrom Begin value ofthe key.
     * @return Collection of keys for which given cache is primary.
     */
    protected List<String> primaryKeysForCache(IgniteCache<String, Integer> cache, int cnt, int startFrom) {
        return executeOnLocalOrRemoteJvm(cache, new CheckPrimaryKeysTask(startFrom, cnt));
    }

    /**
     * @param cache Cache.
     * @param cnt Keys count.
     * @return Collection of keys for which given cache is primary.
     * @throws IgniteCheckedException If failed.
     */
    protected List<String> primaryKeysForCache(IgniteCache<String, Integer> cache, int cnt)
        throws IgniteCheckedException {
        return primaryKeysForCache(cache, cnt, 1);
    }

    /**
     * @param cache Cache.
     * @param key Entry key.
     * @return Pair [ttl, expireTime]; both values null if entry not found
     */
    protected IgnitePair<Long> entryTtl(IgniteCache cache, String key) {
        return executeOnLocalOrRemoteJvm(cache, new EntryTtlTask(key, true));
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testIterator() throws Exception {
        IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);

        final int KEYS = 1000;

        for (int i = 0; i < KEYS; i++)
            cache.put(i, i);

        // Try to initialize readers in case when near cache is enabled.
        for (int i = 0; i < gridCount(); i++) {
            cache = grid(i).cache(DEFAULT_CACHE_NAME);

            for (int k = 0; k < KEYS; k++)
                assertEquals((Object)k, cache.get(k));
        }

        int cnt = 0;

        for (Cache.Entry e : cache)
            cnt++;

        assertEquals(KEYS, cnt);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testIgniteCacheIterator() throws Exception {
        IgniteCache<String, Integer> cache = jcache(0);

        Iterator<Cache.Entry<String, Integer>> it = cache.iterator();

        boolean hasNext = it.hasNext();

        if (hasNext)
            assertFalse("Cache has value: " + it.next(), hasNext);

        final int SIZE = 10_000;

        Map<String, Integer> entries = new HashMap<>();

        Map<String, Integer> putMap = new HashMap<>();

        for (int i = 0; i < SIZE; ++i) {
            String key = Integer.toString(i);

            putMap.put(key, i);

            entries.put(key, i);

            if (putMap.size() == 500) {
                cache.putAll(putMap);

                info("Puts finished: " + (i + 1));

                putMap.clear();
            }
        }

        cache.putAll(putMap);

        checkIteratorHasNext();

        checkIteratorCache(entries);

        checkIteratorRemove(cache, entries);

        checkIteratorEmpty(cache);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testIteratorLeakOnCancelCursor() throws Exception {
        IgniteCache<String, Integer> cache = jcache(0);

        final int SIZE = 10_000;

        Map<String, Integer> putMap = new HashMap<>();

        for (int i = 0; i < SIZE; ++i) {
            String key = Integer.toString(i);

            putMap.put(key, i);

            if (putMap.size() == 500) {
                cache.putAll(putMap);

                info("Puts finished: " + (i + 1));

                putMap.clear();
            }
        }

        cache.putAll(putMap);

        QueryCursor<Cache.Entry<String, Integer>> cur = cache.query(new ScanQuery<String, Integer>());

        cur.iterator().next();

        cur.close();

        waitForIteratorsCleared(cache, 10);
    }

    /**
     * If hasNext() is called repeatedly, it should return the same result.
     */
    private void checkIteratorHasNext() {
        Iterator<Cache.Entry<String, Integer>> iter = jcache(0).iterator();

        assertEquals(iter.hasNext(), iter.hasNext());

        while (iter.hasNext())
            iter.next();

        assertFalse(iter.hasNext());
    }

    /**
     * @param cache Cache.
     * @param entries Expected entries in the cache.
     */
    private void checkIteratorRemove(IgniteCache<String, Integer> cache, Map<String, Integer> entries) {
        // Check that we can remove element.
        String rmvKey = Integer.toString(5);

        removeCacheIterator(cache, rmvKey);

        entries.remove(rmvKey);

        assertFalse(cache.containsKey(rmvKey));
        assertNull(cache.get(rmvKey));

        checkIteratorCache(entries);

        // Check that we cannot call Iterator.remove() without next().
        final Iterator<Cache.Entry<String, Integer>> iter = jcache(0).iterator();

        assertTrue(iter.hasNext());

        iter.next();

        iter.remove();

        GridTestUtils.assertThrows(log, new Callable<Object>() {
            @Override public Void call() throws Exception {
                iter.remove();

                return null;
            }
        }, IllegalStateException.class, null);
    }

    /**
     * @param cache Cache.
     * @param key Key to remove.
     */
    private void removeCacheIterator(IgniteCache<String, Integer> cache, String key) {
        Iterator<Cache.Entry<String, Integer>> iter = cache.iterator();

        int delCnt = 0;

        while (iter.hasNext()) {
            Cache.Entry<String, Integer> cur = iter.next();

            if (cur.getKey().equals(key)) {
                iter.remove();

                delCnt++;
            }
        }

        assertEquals(1, delCnt);
    }

    /**
     * @param entries Expected entries in the cache.
     */
    private void checkIteratorCache(Map<String, Integer> entries) {
        for (int i = 0; i < gridCount(); ++i)
            checkIteratorCache(jcache(i), entries);
    }

    /**
     * @param cache Cache.
     * @param entries Expected entries in the cache.
     */
    private void checkIteratorCache(IgniteCache<String, Integer> cache, Map<String, Integer> entries) {
        Iterator<Cache.Entry<String, Integer>> iter = cache.iterator();

        int cnt = 0;

        while (iter.hasNext()) {
            Cache.Entry<String, Integer> cur = iter.next();

            assertTrue(entries.containsKey(cur.getKey()));
            assertEquals(entries.get(cur.getKey()), cur.getValue());

            cnt++;
        }

        assertEquals(entries.size(), cnt);
    }

    /**
     * Checks iterators are cleared.
     */
    private void checkIteratorsCleared() {
        for (int j = 0; j < gridCount(); j++)
            executeOnLocalOrRemoteJvm(j, new CheckIteratorTask());
    }

    /**
     * Checks iterators are cleared.
     */
    private void waitForIteratorsCleared(IgniteCache<String, Integer> cache, int secs) throws InterruptedException {
        for (int i = 0; i < secs; i++) {
            try {
                cache.size(); // Trigger weak queue poll.

                checkIteratorsCleared();
            }
            catch (Throwable t) {
                // If AssertionError is in the chain, assume we need to wait and retry.
                if (!X.hasCause(t, AssertionError.class))
                    throw t;

                if (i == 9) {
                    for (int j = 0; j < gridCount(); j++)
                        executeOnLocalOrRemoteJvm(j, new PrintIteratorStateTask());

                    throw t;
                }

                log.info("Iterators not cleared, will wait");

                Thread.sleep(1000);
            }
        }
    }

    /**
     * Checks iterators are cleared after using.
     *
     * @param cache Cache.
     * @throws Exception If failed.
     */
    private void checkIteratorEmpty(IgniteCache<String, Integer> cache) throws Exception {
        int cnt = 5;

        for (int i = 0; i < cnt; ++i) {
            Iterator<Cache.Entry<String, Integer>> iter = cache.iterator();

            iter.next();

            assert iter.hasNext();
        }

        System.gc();

        waitForIteratorsCleared(cache, 10);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testLocalClearKey() throws Exception {
        addKeys();

        String keyToRmv = "key" + 25;

        Ignite g = primaryIgnite(keyToRmv);

        g.<String, Integer>cache(DEFAULT_CACHE_NAME).localClear(keyToRmv);

        checkLocalRemovedKey(keyToRmv);

        g.<String, Integer>cache(DEFAULT_CACHE_NAME).put(keyToRmv, 1);

        String keyToEvict = "key" + 30;

        g = primaryIgnite(keyToEvict);

        g.<String, Integer>cache(DEFAULT_CACHE_NAME).localEvict(Collections.singleton(keyToEvict));

        g.<String, Integer>cache(DEFAULT_CACHE_NAME).localClear(keyToEvict);

        checkLocalRemovedKey(keyToEvict);
    }

    /**
     * @param keyToRmv Removed key.
     */
    protected void checkLocalRemovedKey(String keyToRmv) {
        for (int i = 0; i < 500; ++i) {
            String key = "key" + i;

            boolean found = primaryIgnite(key).cache(DEFAULT_CACHE_NAME).localPeek(key) != null;

            if (keyToRmv.equals(key)) {
                Collection<ClusterNode> nodes = grid(0).affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key);

                for (int j = 0; j < gridCount(); ++j) {
                    if (nodes.contains(grid(j).localNode()) && grid(j) != primaryIgnite(key))
                        assertTrue("Not found on backup removed key ", grid(j).cache(DEFAULT_CACHE_NAME).localPeek(key) != null);
                }

                assertFalse("Found removed key " + key, found);
            }
            else
                assertTrue("Not found key " + key, found);
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testLocalClearKeys() throws Exception {
        Map<String, List<String>> keys = addKeys();

        Ignite g = grid(0);

        Set<String> keysToRmv = new HashSet<>();

        for (int i = 0; i < gridCount(); ++i) {
            List<String> gridKeys = keys.get(grid(i).name());

            if (gridKeys.size() > 2) {
                keysToRmv.add(gridKeys.get(0));

                keysToRmv.add(gridKeys.get(1));

                g = grid(i);

                break;
            }
        }

        assert keysToRmv.size() > 1;

        info("Will clear keys on node: " + g.cluster().localNode().id());

        g.<String, Integer>cache(DEFAULT_CACHE_NAME).localClearAll(keysToRmv);

        for (int i = 0; i < 500; ++i) {
            String key = "key" + i;

            Ignite ignite = primaryIgnite(key);

            boolean found = ignite.cache(DEFAULT_CACHE_NAME).localPeek(key) != null;

            if (keysToRmv.contains(key))
                assertFalse("Found removed key [key=" + key + ", node=" + ignite.cluster().localNode().id() + ']',
                    found);
            else
                assertTrue("Not found key " + key, found);
        }
    }

    /**
     * Add 500 keys to cache only on primaries nodes.
     *
     * @return Map grid's name to its primary keys.
     */
    protected Map<String, List<String>> addKeys() {
        // Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries
        // because some of them were blocked due to having readers.
        Map<String, List<String>> keys = new HashMap<>();

        for (int i = 0; i < gridCount(); ++i)
            keys.put(grid(i).name(), new ArrayList<String>());

        for (int i = 0; i < 500; ++i) {
            String key = "key" + i;

            Ignite g = primaryIgnite(key);

            g.cache(DEFAULT_CACHE_NAME).put(key, "value" + i);

            keys.get(g.name()).add(key);
        }

        return keys;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGlobalClearKey() throws Exception {
        testGlobalClearKey(false, Arrays.asList("key25"), false);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGlobalClearKeyAsyncOld() throws Exception {
        testGlobalClearKey(true, Arrays.asList("key25"), true);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGlobalClearKeyAsync() throws Exception {
        testGlobalClearKey(true, Arrays.asList("key25"), false);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGlobalClearKeys() throws Exception {
        testGlobalClearKey(false, Arrays.asList("key25", "key100", "key150"), false);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGlobalClearKeysAsyncOld() throws Exception {
        testGlobalClearKey(true, Arrays.asList("key25", "key100", "key150"), true);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGlobalClearKeysAsync() throws Exception {
        testGlobalClearKey(true, Arrays.asList("key25", "key100", "key150"), false);
    }

    /**
     * @param async If {@code true} uses async method.
     * @param keysToRmv Keys to remove.
     * @param oldAsync Use old async API.
     * @throws Exception If failed.
     */
    protected void testGlobalClearKey(boolean async, Collection<String> keysToRmv, boolean oldAsync) throws Exception {
        // Save entries only on their primary nodes. If we didn't do so, clearLocally() will not remove all entries
        // because some of them were blocked due to having readers.
        for (int i = 0; i < 500; ++i) {
            String key = "key" + i;

            Ignite g = primaryIgnite(key);

            g.cache(DEFAULT_CACHE_NAME).put(key, "value" + i);
        }

        if (async) {
            if (oldAsync) {
                IgniteCache<String, Integer> asyncCache = jcache().withAsync();

                if (keysToRmv.size() == 1)
                    asyncCache.clear(F.first(keysToRmv));
                else
                    asyncCache.clearAll(new HashSet<>(keysToRmv));

                asyncCache.future().get();
            } else {

                if (keysToRmv.size() == 1)
                    jcache().clearAsync(F.first(keysToRmv)).get();
                else
                    jcache().clearAllAsync(new HashSet<>(keysToRmv)).get();
            }
        }
        else {
            if (keysToRmv.size() == 1)
                jcache().clear(F.first(keysToRmv));
            else
                jcache().clearAll(new HashSet<>(keysToRmv));
        }

        for (int i = 0; i < 500; ++i) {
            String key = "key" + i;

            boolean found = false;

            for (int j = 0; j < gridCount(); j++) {
                if (jcache(j).localPeek(key) != null)
                    found = true;
            }

            if (!keysToRmv.contains(key))
                assertTrue("Not found key " + key, found);
            else
                assertFalse("Found removed key " + key, found);
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWithSkipStore() throws Exception {
        IgniteCache<String, Integer> cache = jcache(0);

        IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore();

        List<String> keys = primaryKeysForCache(cache, 10);

        for (int i = 0; i < keys.size(); ++i)
            storeStgy.putToStore(keys.get(i), i);

        assertFalse(cacheSkipStore.iterator().hasNext());

        for (String key : keys) {
            assertNull(cacheSkipStore.get(key));

            assertNotNull(cache.get(key));
        }

        for (String key : keys) {
            cacheSkipStore.remove(key);

            assertNotNull(cache.get(key));
        }

        cache.removeAll(new HashSet<>(keys));

        for (String key : keys)
            assertNull(cache.get(key));

        final int KEYS = 250;

        // Put/remove data from multiple nodes.

        keys = new ArrayList<>(KEYS);

        for (int i = 0; i < KEYS; i++)
            keys.add("key_" + i);

        for (int i = 0; i < keys.size(); ++i)
            cache.put(keys.get(i), i);

        for (int i = 0; i < keys.size(); ++i) {
            String key = keys.get(i);

            assertNotNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertEquals(i, storeStgy.getFromStore(key));
        }

        for (int i = 0; i < keys.size(); ++i) {
            String key = keys.get(i);

            Integer val1 = -1;

            cacheSkipStore.put(key, val1);
            assertEquals(i, storeStgy.getFromStore(key));
            assertEquals(val1, cacheSkipStore.get(key));

            Integer val2 = -2;

            assertEquals(val1, cacheSkipStore.invoke(key, new SetValueProcessor(val2)));
            assertEquals(i, storeStgy.getFromStore(key));
            assertEquals(val2, cacheSkipStore.get(key));
        }

        for (String key : keys) {
            cacheSkipStore.remove(key);

            assertNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertTrue(storeStgy.isInStore(key));
        }

        for (String key : keys) {
            cache.remove(key);

            assertNull(cacheSkipStore.get(key));
            assertNull(cache.get(key));
            assertFalse(storeStgy.isInStore(key));

            storeStgy.putToStore(key, 0);

            Integer val = -1;

            assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val)));
            assertEquals(0, storeStgy.getFromStore(key));
            assertEquals(val, cacheSkipStore.get(key));

            cache.remove(key);

            storeStgy.putToStore(key, 0);

            assertTrue(cacheSkipStore.putIfAbsent(key, val));
            assertEquals(val, cacheSkipStore.get(key));
            assertEquals(0, storeStgy.getFromStore(key));

            cache.remove(key);

            storeStgy.putToStore(key, 0);

            assertNull(cacheSkipStore.getAndPut(key, val));
            assertEquals(val, cacheSkipStore.get(key));
            assertEquals(0, storeStgy.getFromStore(key));

            cache.remove(key);
        }

        assertFalse(cacheSkipStore.iterator().hasNext());
        assertTrue(storeStgy.getStoreSize() == 0);
        assertTrue(cache.size(ALL) == 0);

        // putAll/removeAll from multiple nodes.

        Map<String, Integer> data = new LinkedHashMap<>();

        for (int i = 0; i < keys.size(); i++)
            data.put(keys.get(i), i);

        cacheSkipStore.putAll(data);

        for (String key : keys) {
            assertNotNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertFalse(storeStgy.isInStore(key));
        }

        cache.putAll(data);

        for (String key : keys) {
            assertNotNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertTrue(storeStgy.isInStore(key));
        }

        cacheSkipStore.removeAll(data.keySet());

        for (String key : keys) {
            assertNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertTrue(storeStgy.isInStore(key));
        }

        cacheSkipStore.putAll(data);

        for (String key : keys) {
            assertNotNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertTrue(storeStgy.isInStore(key));
        }

        cacheSkipStore.removeAll(data.keySet());

        for (String key : keys) {
            assertNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertTrue(storeStgy.isInStore(key));
        }

        cache.removeAll(data.keySet());

        for (String key : keys) {
            assertNull(cacheSkipStore.get(key));
            assertNull(cache.get(key));
            assertFalse(storeStgy.isInStore(key));
        }

        assertTrue(storeStgy.getStoreSize() == 0);

        // Miscellaneous checks.

        String newKey = "New key";

        assertFalse(storeStgy.isInStore(newKey));

        cacheSkipStore.put(newKey, 1);

        assertFalse(storeStgy.isInStore(newKey));

        cache.put(newKey, 1);

        assertTrue(storeStgy.isInStore(newKey));

        Iterator<Cache.Entry<String, Integer>> it = cacheSkipStore.iterator();

        assertTrue(it.hasNext());

        Cache.Entry<String, Integer> entry = it.next();

        String rmvKey = entry.getKey();

        assertTrue(storeStgy.isInStore(rmvKey));

        it.remove();

        assertNull(cacheSkipStore.get(rmvKey));

        assertTrue(storeStgy.isInStore(rmvKey));

        assertTrue(cache.size(ALL) == 0);
        assertTrue(cacheSkipStore.size(ALL) == 0);

        cache.remove(rmvKey);

        assertTrue(storeStgy.getStoreSize() == 0);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWithSkipStoreRemoveAll() throws Exception {
        if (atomicityMode() == TRANSACTIONAL || (atomicityMode() == ATOMIC && nearEnabled())) // TODO IGNITE-373.
            return;

        IgniteCache<String, Integer> cache = jcache(0);

        IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore();

        Map<String, Integer> data = new HashMap<>();

        for (int i = 0; i < 100; i++)
            data.put("key_" + i, i);

        cache.putAll(data);

        for (String key : data.keySet()) {
            assertNotNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertTrue(storeStgy.isInStore(key));
        }

        cacheSkipStore.removeAll();

        for (String key : data.keySet()) {
            assertNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertTrue(storeStgy.isInStore(key));
        }

        cache.removeAll();

        for (String key : data.keySet()) {
            assertNull(cacheSkipStore.get(key));
            assertNull(cache.get(key));
            assertFalse(storeStgy.isInStore(key));
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testWithSkipStoreTx() throws Exception {
        if (txShouldBeUsed()) {
            IgniteCache<String, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);

            IgniteCache<String, Integer> cacheSkipStore = cache.withSkipStore();

            final int KEYS = 250;

            // Put/remove data from multiple nodes.

            List<String> keys = new ArrayList<>(KEYS);

            for (int i = 0; i < KEYS; i++)
                keys.add("key_" + i);

            Map<String, Integer> data = new LinkedHashMap<>();

            for (int i = 0; i < keys.size(); i++)
                data.put(keys.get(i), i);

            checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, OPTIMISTIC, READ_COMMITTED);

            checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, OPTIMISTIC, REPEATABLE_READ);

            checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, OPTIMISTIC, SERIALIZABLE);

            checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, PESSIMISTIC, READ_COMMITTED);

            checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, PESSIMISTIC, REPEATABLE_READ);

            checkSkipStoreWithTransaction(cache, cacheSkipStore, data, keys, PESSIMISTIC, SERIALIZABLE);
        }
    }

    /**
     * @param cache Cache instance.
     * @param cacheSkipStore Cache skip store projection.
     * @param data Data set.
     * @param keys Keys list.
     * @param txConcurrency Concurrency mode.
     * @param txIsolation Isolation mode.
     * @throws Exception If failed.
     */
    private void checkSkipStoreWithTransaction(IgniteCache<String, Integer> cache,
        IgniteCache<String, Integer> cacheSkipStore,
        Map<String, Integer> data,
        List<String> keys,
        TransactionConcurrency txConcurrency,
        TransactionIsolation txIsolation)
        throws Exception {
        info("Test tx skip store [concurrency=" + txConcurrency + ", isolation=" + txIsolation + ']');

        cache.removeAll(data.keySet());
        checkEmpty(cache, cacheSkipStore);

        IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();

        Integer val = -1;

        // Several put check.
        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
            for (String key : keys)
                cacheSkipStore.put(key, val);

            for (String key : keys) {
                assertEquals(val, cacheSkipStore.get(key));
                assertEquals(val, cache.get(key));
                assertFalse(storeStgy.isInStore(key));
            }

            tx.commit();
        }

        for (String key : keys) {
            assertEquals(val, cacheSkipStore.get(key));
            assertEquals(val, cache.get(key));
            assertFalse(storeStgy.isInStore(key));
        }

        assertEquals(0, storeStgy.getStoreSize());

        // cacheSkipStore putAll(..)/removeAll(..) check.
        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
            cacheSkipStore.putAll(data);

            tx.commit();
        }

        for (String key : keys) {
            val = data.get(key);

            assertEquals(val, cacheSkipStore.get(key));
            assertEquals(val, cache.get(key));
            assertFalse(storeStgy.isInStore(key));
        }

        storeStgy.putAllToStore(data);

        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
            cacheSkipStore.removeAll(data.keySet());

            tx.commit();
        }

        for (String key : keys) {
            assertNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertTrue(storeStgy.isInStore(key));

            cache.remove(key);
        }

        assertTrue(storeStgy.getStoreSize() == 0);

        // cache putAll(..)/removeAll(..) check.
        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
            cache.putAll(data);

            for (String key : keys) {
                assertNotNull(cacheSkipStore.get(key));
                assertNotNull(cache.get(key));
                assertFalse(storeStgy.isInStore(key));
            }

            cache.removeAll(data.keySet());

            for (String key : keys) {
                assertNull(cacheSkipStore.get(key));
                assertNull(cache.get(key));
                assertFalse(storeStgy.isInStore(key));
            }

            tx.commit();
        }

        assertTrue(storeStgy.getStoreSize() == 0);

        // putAll(..) from both cacheSkipStore and cache.
        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
            Map<String, Integer> subMap = new HashMap<>();

            for (int i = 0; i < keys.size() / 2; i++)
                subMap.put(keys.get(i), i);

            cacheSkipStore.putAll(subMap);

            subMap.clear();

            for (int i = keys.size() / 2; i < keys.size(); i++)
                subMap.put(keys.get(i), i);

            cache.putAll(subMap);

            for (String key : keys) {
                assertNotNull(cacheSkipStore.get(key));
                assertNotNull(cache.get(key));
                assertFalse(storeStgy.isInStore(key));
            }

            tx.commit();
        }

        for (int i = 0; i < keys.size() / 2; i++) {
            String key = keys.get(i);

            assertNotNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertFalse(storeStgy.isInStore(key));
        }

        for (int i = keys.size() / 2; i < keys.size(); i++) {
            String key = keys.get(i);

            assertNotNull(cacheSkipStore.get(key));
            assertNotNull(cache.get(key));
            assertTrue(storeStgy.isInStore(key));
        }

        cache.removeAll(data.keySet());

        for (String key : keys) {
            assertNull(cacheSkipStore.get(key));
            assertNull(cache.get(key));
            assertFalse(storeStgy.isInStore(key));
        }

        // Check that read-through is disabled when cacheSkipStore is used.
        for (int i = 0; i < keys.size(); i++)
            storeStgy.putToStore(keys.get(i), i);

        assertTrue(cacheSkipStore.size(ALL) == 0);
        assertTrue(cache.size(ALL) == 0);
        assertTrue(storeStgy.getStoreSize() != 0);

        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
            assertTrue(cacheSkipStore.getAll(data.keySet()).isEmpty());

            for (String key : keys) {
                assertNull(cacheSkipStore.get(key));

                if (txIsolation == READ_COMMITTED) {
                    assertNotNull(cache.get(key));
                    assertNotNull(cacheSkipStore.get(key));
                }
            }

            tx.commit();
        }

        cache.removeAll(data.keySet());

        val = -1;

        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
            for (String key : data.keySet()) {
                storeStgy.putToStore(key, 0);

                assertNull(cacheSkipStore.invoke(key, new SetValueProcessor(val)));
            }

            tx.commit();
        }

        for (String key : data.keySet()) {
            assertEquals(0, storeStgy.getFromStore(key));

            assertEquals(val, cacheSkipStore.get(key));
            assertEquals(val, cache.get(key));
        }

        cache.removeAll(data.keySet());

        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
            for (String key : data.keySet()) {
                storeStgy.putToStore(key, 0);

                assertTrue(cacheSkipStore.putIfAbsent(key, val));
            }

            tx.commit();
        }

        for (String key : data.keySet()) {
            assertEquals(0, storeStgy.getFromStore(key));

            assertEquals(val, cacheSkipStore.get(key));
            assertEquals(val, cache.get(key));
        }

        cache.removeAll(data.keySet());

        try (Transaction tx = txs.txStart(txConcurrency, txIsolation)) {
            for (String key : data.keySet()) {
                storeStgy.putToStore(key, 0);

                assertNull(cacheSkipStore.getAndPut(key, val));
            }

            tx.commit();
        }

        for (String key : data.keySet()) {
            assertEquals(0, storeStgy.getFromStore(key));

            assertEquals(val, cacheSkipStore.get(key));
            assertEquals(val, cache.get(key));
        }

        cache.removeAll(data.keySet());
        checkEmpty(cache, cacheSkipStore);
    }

    /**
     * @param cache Cache instance.
     * @param cacheSkipStore Cache skip store projection.
     * @throws Exception If failed.
     */
    private void checkEmpty(IgniteCache<String, Integer> cache, IgniteCache<String, Integer> cacheSkipStore)
        throws Exception {
        assertTrue(cache.size(ALL) == 0);
        assertTrue(cacheSkipStore.size(ALL) == 0);
        assertTrue(storeStgy.getStoreSize() == 0);
    }

    /**
     * @return Cache start mode.
     */
    protected CacheStartMode cacheStartType() {
        String mode = System.getProperty("cache.start.mode");

        if (CacheStartMode.NODES_THEN_CACHES.name().equalsIgnoreCase(mode))
            return CacheStartMode.NODES_THEN_CACHES;

        if (CacheStartMode.ONE_BY_ONE.name().equalsIgnoreCase(mode))
            return CacheStartMode.ONE_BY_ONE;

        return CacheStartMode.STATIC;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGetOutTx() throws Exception {
        checkGetOutTx(false);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testGetOutTxAsync() throws Exception {
        checkGetOutTx(true);
    }

    /**
     * @throws Exception If failed.
     */
    private void checkGetOutTx(boolean async) throws Exception {
        final AtomicInteger lockEvtCnt = new AtomicInteger();

        IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() {
            @Override public boolean apply(Event evt) {
                lockEvtCnt.incrementAndGet();

                return true;
            }
        };

        try {
            IgniteCache<String, Integer> cache = jcache(0);

            List<String> keys = primaryKeysForCache(cache, 2);

            assertEquals(2, keys.size());

            cache.put(keys.get(0), 0);
            cache.put(keys.get(1), 1);

            grid(0).events().localListen(lsnr, EVT_CACHE_OBJECT_LOCKED, EVT_CACHE_OBJECT_UNLOCKED);

            try (Transaction tx = transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
                Integer val0;

                if (async)
                    val0 = cache.getAsync(keys.get(0)).get();
                else
                     val0 = cache.get(keys.get(0));

                assertEquals(0, val0.intValue());

                Map<String, Integer> allOutTx;

                if (async)
                    allOutTx = cache.getAllOutTxAsync(F.asSet(keys.get(1))).get();
                else
                    allOutTx = cache.getAllOutTx(F.asSet(keys.get(1)));

                assertEquals(1, allOutTx.size());

                assertTrue(allOutTx.containsKey(keys.get(1)));

                assertEquals(1, allOutTx.get(keys.get(1)).intValue());
            }

            assertTrue(GridTestUtils.waitForCondition(new PA() {
                @Override public boolean apply() {
                    info("Lock event count: " + lockEvtCnt.get());
                    if (atomicityMode() == ATOMIC)
                        return lockEvtCnt.get() == 0;

                    if (cacheMode() == PARTITIONED && nearEnabled()) {
                        if (!grid(0).configuration().isClientMode())
                            return lockEvtCnt.get() == 4;
                    }

                    return lockEvtCnt.get() == 2;
                }
            }, 15000));
        }
        finally {
            grid(0).events().stopLocalListen(lsnr, EVT_CACHE_OBJECT_LOCKED, EVT_CACHE_OBJECT_UNLOCKED);
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformException() throws Exception {
        final IgniteCache<String, Integer> cache = jcache();

        assertThrows(log, new Callable<Object>() {
            @Override public Object call() throws Exception {
                IgniteFuture fut = cache.invokeAsync("key2", ERR_PROCESSOR).chain(new IgniteClosure<IgniteFuture, Object>() {
                    @Override public Object apply(IgniteFuture o) {
                        return o.get();
                    }
                });

                fut.get();

                return null;
            }
        }, EntryProcessorException.class, null);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testLockInsideTransaction() throws Exception {
        if (txEnabled()) {
            GridTestUtils.assertThrows(
                log,
                new Callable<Object>() {
                    @Override public Object call() throws Exception {
                        IgniteCache<String, Integer> cache = jcache(0);

                        try (Transaction tx = ignite(0).transactions().txStart()) {
                            cache.lock("key").lock();
                        }

                        return null;
                    }
                },
                CacheException.class,
                "Explicit lock can't be acquired within a transaction."
            );

            GridTestUtils.assertThrows(
                log,
                new Callable<Object>() {
                    @Override public Object call() throws Exception {
                        IgniteCache<String, Integer> cache = jcache(0);

                        try (Transaction tx = ignite(0).transactions().txStart()) {
                            cache.lockAll(Arrays.asList("key1", "key2")).lock();
                        }

                        return null;
                    }
                },
                CacheException.class,
                "Explicit lock can't be acquired within a transaction."
            );
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testTransformResourceInjection() throws Exception {
        ClusterGroup servers = grid(0).cluster().forServers();

        if (F.isEmpty(servers.nodes()))
            return;

        grid(0).services( grid(0).cluster()).deployNodeSingleton(SERVICE_NAME1, new DummyServiceImpl());

        IgniteCache<String, Integer> cache = jcache();
        Ignite ignite = ignite(0);

        doTransformResourceInjection(ignite, cache, false, false);
        doTransformResourceInjection(ignite, cache, true, false);
        doTransformResourceInjection(ignite, cache, true, true);

        if (txEnabled()) {
            doTransformResourceInjectionInTx(ignite, cache, false, false);
            doTransformResourceInjectionInTx(ignite, cache, true, false);
            doTransformResourceInjectionInTx(ignite, cache, true, true);
        }
    }

    /**
     * @param ignite Node.
     * @param cache Cache.
     * @param async Use async API.
     * @param oldAsync Use old async API.
     * @throws Exception If failed.
     */
    private void doTransformResourceInjectionInTx(Ignite ignite, IgniteCache<String, Integer> cache, boolean async,
        boolean oldAsync) throws Exception {
        for (TransactionConcurrency concurrency : TransactionConcurrency.values()) {
            for (TransactionIsolation isolation : TransactionIsolation.values()) {
                IgniteTransactions txs = ignite.transactions();

                try (Transaction tx = txs.txStart(concurrency, isolation)) {
                    doTransformResourceInjection(ignite, cache, async, oldAsync);

                    tx.commit();
                }
            }
        }
    }

    /**
     * @param ignite Node.
     * @param cache Cache.
     * @param async Use async API.
     * @param oldAsync Use old async API.
     * @throws Exception If failed.
     */
    private void doTransformResourceInjection(Ignite ignite, IgniteCache<String, Integer> cache, boolean async,
        boolean oldAsync) throws Exception {
        final Collection<ResourceType> required = Arrays.asList(ResourceType.IGNITE_INSTANCE,
            ResourceType.CACHE_NAME,
            ResourceType.LOGGER);

        final CacheEventListener lsnr = new CacheEventListener();

        IgniteEvents evts = ignite.events(ignite.cluster());

        UUID opId = evts.remoteListen(lsnr, null, EVT_CACHE_OBJECT_READ);

        try {
            checkResourceInjectionOnInvoke(cache, required, async, oldAsync);

            checkResourceInjectionOnInvokeAll(cache, required, async, oldAsync);

            checkResourceInjectionOnInvokeAllMap(cache, required, async, oldAsync);
        }
        finally {
            evts.stopRemoteListen(opId);
        }
    }

    /**
     * Tests invokeAll method for map of pairs (key, entryProcessor).
     *
     * @param cache Cache.
     * @param required Expected injected resources.
     * @param async Use async API.
     * @param oldAsync Use old async API.
     */
    private void checkResourceInjectionOnInvokeAllMap(IgniteCache<String, Integer> cache,
        Collection<ResourceType> required, boolean async, boolean oldAsync) {
        Map<String, EntryProcessorResult<Integer>> results;

        Map<String, EntryProcessor<String, Integer, Integer>> map = new HashMap<>();

        map.put(UUID.randomUUID().toString(), new ResourceInjectionEntryProcessor());
        map.put(UUID.randomUUID().toString(), new ResourceInjectionEntryProcessor());
        map.put(UUID.randomUUID().toString(), new ResourceInjectionEntryProcessor());
        map.put(UUID.randomUUID().toString(), new ResourceInjectionEntryProcessor());

        if (async) {
            if (oldAsync) {
                IgniteCache<String, Integer> acache = cache.withAsync();

                acache.invokeAll(map);

                results = acache.<Map<String, EntryProcessorResult<Integer>>>future().get();
            }
            else
                results = cache.invokeAllAsync(map).get();
        }
        else
            results = cache.invokeAll(map);

        assertEquals(map.size(), results.size());

        for (EntryProcessorResult<Integer> res : results.values()) {
            Collection<ResourceType> notInjected = ResourceInfoSet.valueOf(res.get()).notInjected(required);

            if (!notInjected.isEmpty())
                fail("Can't inject resource(s): " + Arrays.toString(notInjected.toArray()));
        }
    }

    /**
     * Tests invokeAll method for set of keys.
     *
     * @param cache Cache.
     * @param required Expected injected resources.
     * @param async Use async API.
     * @param oldAsync Use old async API.
     */
    private void checkResourceInjectionOnInvokeAll(IgniteCache<String, Integer> cache,
        Collection<ResourceType> required, boolean async, boolean oldAsync) {
        Set<String> keys = new HashSet<>(Arrays.asList(UUID.randomUUID().toString(),
            UUID.randomUUID().toString(),
            UUID.randomUUID().toString(),
            UUID.randomUUID().toString()));

        Map<String, EntryProcessorResult<Integer>> results;

        if (async) {
            if (oldAsync) {
                IgniteCache<String, Integer> acache = cache.withAsync();

                acache.invokeAll(keys, new ResourceInjectionEntryProcessor());

                results = acache.<Map<String, EntryProcessorResult<Integer>>>future().get();
            }
            else
                results = cache.invokeAllAsync(keys, new ResourceInjectionEntryProcessor()).get();
        }
        else
            results = cache.invokeAll(keys, new ResourceInjectionEntryProcessor());

        assertEquals(keys.size(), results.size());

        for (EntryProcessorResult<Integer> res : results.values()) {
            Collection<ResourceType> notInjected1 = ResourceInfoSet.valueOf(res.get()).notInjected(required);

            if (!notInjected1.isEmpty())
                fail("Can't inject resource(s): " + Arrays.toString(notInjected1.toArray()));
        }
    }

    /**
     * Tests invoke for single key.
     *
     * @param cache Cache.
     * @param required Expected injected resources.
     * @param async Use async API.
     * @param oldAsync Use old async API.
     */
    private void checkResourceInjectionOnInvoke(IgniteCache<String, Integer> cache,
        Collection<ResourceType> required, boolean async, boolean oldAsync) {

        String key = UUID.randomUUID().toString();

        Integer flags;

        if (async) {
            if (oldAsync) {
                IgniteCache<String, Integer> acache = cache.withAsync();

                acache.invoke(key, new GridCacheAbstractFullApiSelfTest.ResourceInjectionEntryProcessor());

                flags = acache.<Integer>future().get();
            }
            else
                flags = cache.invokeAsync(key,
                    new GridCacheAbstractFullApiSelfTest.ResourceInjectionEntryProcessor()).get();
        }
        else
            flags = cache.invoke(key, new GridCacheAbstractFullApiSelfTest.ResourceInjectionEntryProcessor());

        if (cache.isAsync())
            flags = cache.<Integer>future().get();

        assertTrue("Processor result is null", flags != null);

        Collection<ResourceType> notInjected = ResourceInfoSet.valueOf(flags).notInjected(required);

        if (!notInjected.isEmpty())
            fail("Can't inject resource(s): " + Arrays.toString(notInjected.toArray()));
    }

    /**
     * Sets given value, returns old value.
     */
    public static final class SetValueProcessor implements EntryProcessor<String, Integer, Integer> {
        /** */
        private Integer newVal;

        /**
         * @param newVal New value to set.
         */
        SetValueProcessor(Integer newVal) {
            this.newVal = newVal;
        }

        /** {@inheritDoc} */
        @Override public Integer process(MutableEntry<String, Integer> entry,
            Object... arguments) throws EntryProcessorException {
            Integer val = entry.getValue();

            entry.setValue(newVal);

            return val;
        }
    }

    /**
     *
     */
    public enum CacheStartMode {
        /** Start caches together nodes (not dynamically) */
        STATIC,

        /** */
        NODES_THEN_CACHES,

        /** */
        ONE_BY_ONE
    }

    /**
     *
     */
    private static class RemoveEntryProcessor implements EntryProcessor<String, Integer, String>, Serializable {
        /** {@inheritDoc} */
        @Override public String process(MutableEntry<String, Integer> e, Object... args) {
            assertNotNull(e.getKey());

            Integer old = e.getValue();

            e.remove();

            return String.valueOf(old);
        }
    }

    /**
     *
     */
    private static class IncrementEntryProcessor implements EntryProcessor<String, Integer, String>, Serializable {
        /** {@inheritDoc} */
        @Override public String process(MutableEntry<String, Integer> e, Object... args) {
            assertNotNull(e.getKey());

            Integer old = e.getValue();

            e.setValue(old == null ? 1 : old + 1);

            return String.valueOf(old);
        }
    }

    /**
     *
     */
    public static class ResourceInjectionEntryProcessor extends ResourceInjectionEntryProcessorBase<String, Integer> {
        /** */
        protected transient Ignite ignite;

        /** */
        protected transient String cacheName;

        /** */
        protected transient IgniteLogger log;

        /** */
        protected transient DummyService svc;

        /**
         * @param ignite Ignite.
         */
        @IgniteInstanceResource
        public void setIgnite(Ignite ignite) {
            assert ignite != null;

            checkSet();

            infoSet.set(ResourceType.IGNITE_INSTANCE, true);

            this.ignite = ignite;
        }

        /**
         * @param cacheName Cache name.
         */
        @CacheNameResource
        public void setCacheName(String cacheName) {
            checkSet();

            infoSet.set(ResourceType.CACHE_NAME, true);

            this.cacheName = cacheName;
        }

        /**
         * @param log Logger.
         */
        @LoggerResource
        public void setLoggerResource(IgniteLogger log) {
            assert log != null;

            checkSet();

            infoSet.set(ResourceType.LOGGER, true);

            this.log = log;
        }

        /**
         * @param svc Service.
         */
        @ServiceResource(serviceName = SERVICE_NAME1)
        public void setDummyService(DummyService svc) {
            assert svc != null;

            checkSet();

            infoSet.set(ResourceType.SERVICE, true);

            this.svc = svc;
        }

        /** {@inheritDoc} */
        @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
            Integer oldVal = e.getValue();

            e.setValue(ThreadLocalRandom.current().nextInt() + (oldVal == null ? 0 : oldVal));

            return super.process(e, args);
        }
    }

    /**
     *
     */
    private static class CheckEntriesTask extends TestIgniteIdxRunnable {
        /** Keys. */
        private final Collection<String> keys;

        /**
         * @param keys Keys.
         */
        public CheckEntriesTask(Collection<String> keys) {
            this.keys = keys;
        }

        /** {@inheritDoc} */
        @Override public void run(int idx) throws Exception {
            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();

            int size = 0;

            if (ctx.isNear())
                ctx = ctx.near().dht().context();

            for (String key : keys) {
                if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx())) {
                    GridCacheEntryEx e = ctx.cache().entryEx(key);

                    assert e != null : "Entry is null [idx=" + idx + ", key=" + key + ", ctx=" + ctx + ']';
                    assert !e.deleted() : "Entry is deleted: " + e;

                    size++;

                    e.touch();
                }
            }

            assertEquals("Incorrect size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL));
        }
    }

    /**
     *
     */
    private static class CheckCacheSizeTask extends TestIgniteIdxRunnable {
        /** */
        private final Map<String, Integer> map;

        /**
         * @param map Map.
         */
        CheckCacheSizeTask(Map<String, Integer> map) {
            this.map = map;
        }

        /** {@inheritDoc} */
        @Override public void run(int idx) throws Exception {
            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();

            int size = 0;

            for (String key : map.keySet())
                if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx()))
                    size++;

            assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(ctx.name()).localSize(ALL));
        }
    }

    /**
     *
     */
    private static class CheckPrimaryKeysTask implements TestCacheCallable<String, Integer, List<String>> {
        /** Start from. */
        private final int startFrom;

        /** Count. */
        private final int cnt;

        /**
         * @param startFrom Start from.
         * @param cnt Count.
         */
        public CheckPrimaryKeysTask(int startFrom, int cnt) {
            this.startFrom = startFrom;
            this.cnt = cnt;
        }

        /** {@inheritDoc} */
        @Override public List<String> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
            List<String> found = new ArrayList<>();

            Affinity<Object> affinity = ignite.affinity(cache.getName());

            for (int i = startFrom; i < startFrom + 100_000; i++) {
                String key = "key" + i;

                if (affinity.isPrimary(ignite.cluster().localNode(), key)) {
                    found.add(key);

                    if (found.size() == cnt)
                        return found;
                }
            }

            throw new IgniteException("Unable to find " + cnt + " keys as primary for cache.");
        }
    }

    /**
     *
     */
    public static class EntryTtlTask implements TestCacheCallable<String, Integer, IgnitePair<Long>> {
        /** Entry key. */
        private final String key;

        /** Check cache for nearness, use DHT cache if it is near. */
        private final boolean useDhtForNearCache;

        /**
         * @param key Entry key.
         * @param useDhtForNearCache Check cache for nearness, use DHT cache if it is near.
         */
        public EntryTtlTask(String key, boolean useDhtForNearCache) {
            this.key = key;
            this.useDhtForNearCache = useDhtForNearCache;
        }

        /** {@inheritDoc} */
        @Override public IgnitePair<Long> call(Ignite ignite, IgniteCache<String, Integer> cache) throws Exception {
            GridCacheAdapter<?, ?> internalCache = internalCache0(cache);

            if (useDhtForNearCache && internalCache.context().isNear())
                internalCache = internalCache.context().near().dht();

            GridCacheEntryEx entry = internalCache.entryEx(key);

            entry.unswap();

            IgnitePair<Long> pair = new IgnitePair<>(entry.ttl(), entry.expireTime());

            if (!entry.isNear())
                entry.context().cache().removeEntry(entry);

            return pair;
        }
    }

    /**
     *
     */
    private static class CheckIteratorTask extends TestIgniteIdxCallable<Void> {
        /**
         * @param idx Index.
         */
        @Override public Void call(int idx) throws Exception {
            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();
            GridCacheQueryManager queries = ctx.queries();

            ConcurrentMap<UUID, Map<Long, GridFutureAdapter<?>>> map = GridTestUtils.getFieldValue(queries,
                GridCacheQueryManager.class, "qryIters");

            for (Map<Long, GridFutureAdapter<?>> map1 : map.values())
                assertTrue("Iterators not removed for grid " + idx, map1.isEmpty());

            return null;
        }
    }

    /**
     *
     */
    private static class PrintIteratorStateTask extends TestIgniteIdxCallable<Void> {
        /** */
        @LoggerResource
        private IgniteLogger log;

        /**
         * @param idx Index.
         */
        @Override public Void call(int idx) throws Exception {
            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();
            GridCacheQueryManager queries = ctx.queries();

            ConcurrentMap<UUID, Map<Long, GridFutureAdapter<?>>> map = GridTestUtils.getFieldValue(queries,
                GridCacheQueryManager.class, "qryIters");

            for (Map<Long, GridFutureAdapter<?>> map1 : map.values()) {
                if (!map1.isEmpty()) {
                    log.warning("Iterators leak detected at grid: " + idx);

                    for (Map.Entry<Long, GridFutureAdapter<?>> entry : map1.entrySet())
                        log.warning(entry.getKey() + "; " + entry.getValue());
                }
            }

            return null;
        }
    }

    /**
     *
     */
    private static class RemoveAndReturnNullEntryProcessor implements
        EntryProcessor<String, Integer, Integer>, Serializable {

        /** {@inheritDoc} */
        @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
            e.remove();

            return null;
        }
    }

    /**
     *
     */
    private static class CheckEntriesDeletedTask extends TestIgniteIdxRunnable {
        /** */
        private final int cnt;

        /**
         * @param cnt Keys count.
         */
        public CheckEntriesDeletedTask(int cnt) {
            this.cnt = cnt;
        }

        /** {@inheritDoc} */
        @Override public void run(int idx) throws Exception {
            for (int i = 0; i < cnt; i++) {
                String key = String.valueOf(i);

                GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();

                GridCacheEntryEx entry = ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key);

                if (ignite.affinity(DEFAULT_CACHE_NAME).mapKeyToPrimaryAndBackups(key).contains(((IgniteKernal)ignite).localNode())) {
                    assertNotNull(entry);
                    assertTrue(entry.deleted());
                }
                else
                    assertNull(entry);
            }
        }
    }

    /**
     *
     */
    private static class CheckKeySizeTask extends TestIgniteIdxRunnable {
        /** Keys. */
        private final Collection<String> keys;

        /**
         * @param keys Keys.
         */
        public CheckKeySizeTask(Collection<String> keys) {
            this.keys = keys;
        }

        /** {@inheritDoc} */
        @Override public void run(int idx) throws Exception {
            GridCacheContext<String, Integer> ctx = ((IgniteKernal)ignite).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();

            int size = 0;

            for (String key : keys)
                if (ctx.affinity().keyLocalNode(key, ctx.discovery().topologyVersionEx()))
                    size++;

            assertEquals("Incorrect key size on cache #" + idx, size, ignite.cache(DEFAULT_CACHE_NAME).localSize(ALL));
        }
    }

    /**
     *
     */
    private static class FailedEntryProcessor implements EntryProcessor<String, Integer, Integer>, Serializable {
        /** {@inheritDoc} */
        @Override public Integer process(MutableEntry<String, Integer> e, Object... args) {
            throw new EntryProcessorException("Test entry processor exception.");
        }
    }

    /**
     *
     */
    private static class TestValue implements Serializable {
        /** */
        private int val;

        /**
         * @param val Value.
         */
        TestValue(int val) {
            this.val = val;
        }

        /**
         * @return Value.
         */
        public int value() {
            return val;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (!(o instanceof TestValue))
                return false;

            TestValue value = (TestValue)o;

            if (val != value.val)
                return false;

            return true;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return val;
        }
    }

    /**
     * Dummy Service.
     */
    public interface DummyService {
        /**
         *
         */
        public void noop();
    }

    /**
     * No-op test service.
     */
    public static class DummyServiceImpl implements DummyService, Service {
        /** */
        private static final long serialVersionUID = 0L;

        /** {@inheritDoc} */
        @Override public void noop() {
            // No-op.
        }

        /** {@inheritDoc} */
        @Override public void cancel(ServiceContext ctx) {
            System.out.println("Cancelling service: " + ctx.name());
        }

        /** {@inheritDoc} */
        @Override public void init(ServiceContext ctx) throws Exception {
            System.out.println("Initializing service: " + ctx.name());
        }

        /** {@inheritDoc} */
        @Override public void execute(ServiceContext ctx) {
            System.out.println("Executing service: " + ctx.name());
        }
    }

    /**
     *
     */
    public static class CacheEventListener implements IgniteBiPredicate<UUID, CacheEvent>, IgnitePredicate<CacheEvent> {
        /** */
        public final LinkedBlockingQueue<CacheEvent> evts = new LinkedBlockingQueue<>();

        /** {@inheritDoc} */
        @Override public boolean apply(UUID uuid, CacheEvent evt) {
            evts.add(evt);

            return true;
        }

        /** {@inheritDoc} */
        @Override public boolean apply(CacheEvent evt) {
            evts.add(evt);

            return true;
        }
    }
}
