/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import com.google.common.collect.ImmutableSet;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CA;
import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;

/**
 * Multithreaded cache API tests.
 * <p>
 * Every test starts {@link #WRITE_THREAD_CNT} writer threads that put {@link #PUT_CNT} entries of the form
 * {@code "key" + i -> i} and {@link #READ_THREAD_CNT} reader threads that repeatedly run the cache operation
 * under test against indexes that writers have already published.
 */
public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends GridCacheAbstractSelfTest {
    /** Random generator used to pick indexes of already written entries. */
    private static final Random RND = new Random();

    /** Number of threads that put entries. */
    private static final int WRITE_THREAD_CNT = 3;

    /** Number of threads that run the tested closure. */
    private static final int READ_THREAD_CNT = 3;

    /** Name given to writer threads. */
    private static final String WRITE_THREAD_NAME = "write-thread";

    /** Name given to reader threads. */
    private static final String READ_THREAD_NAME = "read-thread";

    /** Total number of entries put by writer threads during one test. */
    private static final int PUT_CNT = 1000;

    /** Counter shared by writer threads; hands out the next index to put. */
    private final AtomicInteger cnt = new AtomicInteger();

    /** Turns {@code true} once an index greater than 10 has been put; readers do nothing before that. */
    private final AtomicBoolean guard = new AtomicBoolean();

    /** Indexes whose entries have already been put by writer threads. */
    private final Collection<Integer> set = new GridConcurrentHashSet<>();

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        // Reset the shared state so that tests do not see leftovers of each other.
        cnt.set(0);
        guard.set(false);
        set.clear();
    }

    /**
     * Runs writer threads and, concurrently, reader threads that apply the given closure until all writers
     * are done. Afterwards checks that all grids see the same values.
     *
     * @param c Test closure.
     * @throws Exception In case of error.
     */
    private void runTest(final IgniteInClosure<IgniteCache<String, Integer>> c) throws Exception {
        final IgniteInternalFuture<?> fut1 = GridTestUtils.runMultiThreadedAsync(new CAX() {
            @Override public void applyx() {
                // The counter is shared, so every index is taken by exactly one writer thread.
                for (int i = cnt.getAndIncrement(); i < PUT_CNT; i = cnt.getAndIncrement()) {
                    jcache().put(cacheKey(i), i);

                    // Publish the index only after the put, so readers never pick a key not written yet.
                    set.add(i);

                    if (i > 10)
                        guard.compareAndSet(false, true);
                }
            }
        }, WRITE_THREAD_CNT, WRITE_THREAD_NAME);

        IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new CA() {
            @Override public void apply() {
                IgniteCache<String, Integer> cache = jcache();

                // Keep applying the closure for as long as writers are still running.
                while (!fut1.isDone()) {
                    if (guard.get())
                        c.apply(cache);
                }
            }
        }, READ_THREAD_CNT, READ_THREAD_NAME);

        fut1.get();
        fut2.get();

        checkConsistency();
    }

    /**
     * Checks that for every entry iterated from the default cache, each of the other grids returns either
     * the same value or {@code null} together with a {@code null} entry value.
     */
    private void checkConsistency() {
        for (Cache.Entry<String, Integer> e : jcache()) {
            for (int i = 1; i < gridCount(); i++) {
                Integer val = jcache(i).get(e.getKey());
                Integer exp = e.getValue();

                boolean same = val == null ? exp == null : val.equals(exp);

                assert same : "Inconsistent entry [key=" + e.getKey() + ", grid=" + i + ", val=" + val +
                    ", expVal=" + exp + ']';
            }
        }
    }

    /**
     * Picks a random index among those already published by writer threads. Spins until such an index is hit.
     *
     * @return Random.
     */
    private int random() {
        int rnd;

        do
            rnd = RND.nextInt(PUT_CNT);
        while (!set.contains(rnd));

        return rnd;
    }

    /**
     * @param idx Entry index.
     * @return Cache key for the given index.
     */
    private String cacheKey(int idx) {
        return "key" + idx;
    }

    /**
     * @param fromIncl Inclusive start of the range.
     * @param toExcl Exclusive stop of the range.
     * @return Range of keys.
     */
    private Set<String> rangeKeys(int fromIncl, int toExcl) {
        return new TreeSet<>(F.transform(F.range(fromIncl, toExcl), new C1<Integer, String>() {
            @Override public String apply(Integer i) {
                return cacheKey(i);
            }
        }));
    }

    /**
     * Asserts that the value is present and equals the expected one. Unlike a plain {@code val == exp}
     * comparison this fails with an assertion message instead of a {@link NullPointerException}
     * when the value is missing.
     *
     * @param val Actual value.
     * @param exp Expected value.
     */
    private void checkValue(Integer val, int exp) {
        assert val != null && val == exp : "Unexpected value [exp=" + exp + ", actual=" + val + ']';
    }

    /**
     * Asserts that the value is either absent or equals the expected one.
     *
     * @param val Actual value.
     * @param exp Expected value if present.
     */
    private void checkNullOrValue(Integer val, int exp) {
        assert val == null || val == exp : "Unexpected value [exp=" + exp + ", actual=" + val + ']';
    }

    /**
     * Asserts that local on-heap peek for the given index returns either nothing or the index itself.
     *
     * @param cache Cache.
     * @param idx Entry index.
     */
    private void checkPeek(IgniteCache<String, Integer> cache, int idx) {
        checkNullOrValue(cache.localPeek(cacheKey(idx), CachePeekMode.ONHEAP), idx);
    }

    /**
     * Asserts that local on-heap peek for the given index returns nothing.
     *
     * @param cache Cache.
     * @param idx Entry index.
     */
    private void checkNoPeek(IgniteCache<String, Integer> cache, int idx) {
        Integer val = cache.localPeek(cacheKey(idx), CachePeekMode.ONHEAP);

        assert val == null : "Entry is still available locally [key=" + cacheKey(idx) + ", val=" + val + ']';
    }

    /**
     * Asserts that the result of a {@code getAll} for two (possibly equal) indexes holds exactly these entries.
     *
     * @param map Result of {@code getAll}.
     * @param rnd1 First index.
     * @param rnd2 Second index.
     */
    private void checkGetAll(Map<String, Integer> map, int rnd1, int rnd2) {
        // Both indexes may coincide, in which case a single entry is expected.
        int expSize = rnd1 != rnd2 ? 2 : 1;

        assert map.size() == expSize : "Unexpected size [exp=" + expSize + ", actual=" + map.size() + ']';

        checkValue(map.get(cacheKey(rnd1)), rnd1);
        checkValue(map.get(cacheKey(rnd2)), rnd2);
    }

    /**
     * Asserts that none of the given indexes below {@code toExcl} can be peeked locally.
     *
     * @param cache Cache.
     * @param ids Indexes that were published before the removal started.
     * @param toExcl Exclusive upper bound of removed indexes.
     */
    private void checkRemovedLocally(IgniteCache<String, Integer> cache, Set<Integer> ids, int toExcl) {
        for (int i = 0; i < toExcl; i++) {
            if (ids.contains(i))
                assertNull(cache.localPeek(cacheKey(i)));
        }
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testContainsKey() throws Exception {
        runTest(new CI1<IgniteCache<String, Integer>>() {
            @Override public void apply(IgniteCache<String, Integer> cache) {
                String key = cacheKey(random());

                assert cache.containsKey(key) : "Key is missing: " + key;
                assert !cache.containsKey("wrongKey");
            }
        });
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGet() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd = random();

                checkValue(cache.get(cacheKey(rnd)), rnd);

                assert cache.get("wrongKey") == null;
            }
        });
    }

    /**
     * Tests {@code get} through the cache obtained from {@code withAsync()}.
     *
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAsyncOld() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd = random();

                IgniteCache<String, Integer> cacheAsync = cache.withAsync();

                cacheAsync.get(cacheKey(rnd));

                checkValue(cacheAsync.<Integer>future().get(), rnd);

                cacheAsync.get("wrongKey");

                assert cacheAsync.future().get() == null;
            }
        });
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAsync() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd = random();

                checkValue(cache.getAsync(cacheKey(rnd)).get(), rnd);

                assert cache.getAsync("wrongKey").get() == null;
            }
        });
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAll() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd1 = random();
                int rnd2 = random();

                checkGetAll(cache.getAll(ImmutableSet.of(cacheKey(rnd1), cacheKey(rnd2))), rnd1, rnd2);
            }
        });
    }

    /**
     * Tests {@code getAll} through the cache obtained from {@code withAsync()}.
     *
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAllAsyncOld() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd1 = random();
                int rnd2 = random();

                IgniteCache<String, Integer> cacheAsync = cache.withAsync();

                cacheAsync.getAll(ImmutableSet.of(cacheKey(rnd1), cacheKey(rnd2)));

                checkGetAll(cacheAsync.<Map<String, Integer>>future().get(), rnd1, rnd2);
            }
        });
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testGetAllAsync() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd1 = random();
                int rnd2 = random();

                checkGetAll(cache.getAllAsync(ImmutableSet.of(cacheKey(rnd1), cacheKey(rnd2))).get(), rnd1, rnd2);
            }
        });
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemove() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd1 = random();
                int rnd2 = random();

                assert cache.getAndRemove("wrongKey") == null;
                assert !cache.remove(cacheKey(rnd1), -1);

                // Other reader threads may have removed these entries already, hence null is acceptable.
                checkPeek(cache, rnd1);
                checkPeek(cache, rnd2);

                Integer v1 = cache.getAndRemove(cacheKey(rnd1));

                checkNoPeek(cache, rnd1);
                checkNullOrValue(v1, rnd1);

                assert cache.getAndRemove(cacheKey(rnd1)) == null;

                cache.remove(cacheKey(rnd2), rnd2);

                checkNoPeek(cache, rnd2);
            }
        });
    }

    /**
     * Tests removal through the cache obtained from {@code withAsync()}.
     *
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAsyncOld() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd1 = random();
                int rnd2 = random();

                IgniteCache<String, Integer> cacheAsync = cache.withAsync();

                cacheAsync.getAndRemove("wrongKey");

                assert cacheAsync.future().get() == null;

                cacheAsync.remove(cacheKey(rnd1), -1);

                assert !cacheAsync.<Boolean>future().get();

                // Other reader threads may have removed these entries already, hence null is acceptable.
                checkPeek(cache, rnd1);
                checkPeek(cache, rnd2);

                // Removals go through the withAsync() cache as well, so this test covers the old async API
                // and does not silently overlap with testRemoveAsync().
                cacheAsync.getAndRemove(cacheKey(rnd1));

                Integer v1 = cacheAsync.<Integer>future().get();

                checkNoPeek(cache, rnd1);
                checkNullOrValue(v1, rnd1);

                assert cache.getAndRemove(cacheKey(rnd1)) == null;

                cacheAsync.remove(cacheKey(rnd2), rnd2);

                cacheAsync.future().get();

                checkNoPeek(cache, rnd2);
            }
        });
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAsync() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd1 = random();
                int rnd2 = random();

                assert cache.getAndRemoveAsync("wrongKey").get() == null;

                assert !cache.removeAsync(cacheKey(rnd1), -1).get();

                // Other reader threads may have removed these entries already, hence null is acceptable.
                checkPeek(cache, rnd1);
                checkPeek(cache, rnd2);

                Integer v1 = removeAsync(cache, cacheKey(rnd1));

                checkNoPeek(cache, rnd1);
                checkNullOrValue(v1, rnd1);

                assert cache.getAndRemove(cacheKey(rnd1)) == null;

                removeAsync(cache, cacheKey(rnd2), rnd2);

                checkNoPeek(cache, rnd2);
            }
        });
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAll() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd = random();

                // Snapshot taken before the removal: only these indexes are guaranteed to have been put.
                Set<Integer> ids = new HashSet<>(set);

                cache.removeAll(rangeKeys(0, rnd));

                checkRemovedLocally(cache, ids, rnd);
            }
        });
    }

    /**
     * Tests {@code removeAll} through the cache obtained from {@code withAsync()}.
     *
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAllAsyncOld() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd = random();

                IgniteCache<String, Integer> cacheAsync = cache.withAsync();

                // Snapshot taken before the removal: only these indexes are guaranteed to have been put.
                Set<Integer> ids = new HashSet<>(set);

                cacheAsync.removeAll(rangeKeys(0, rnd));

                cacheAsync.future().get();

                checkRemovedLocally(cache, ids, rnd);
            }
        });
    }

    /**
     * @throws Exception In case of error.
     */
    @Test
    public void testRemoveAllAsync() throws Exception {
        runTest(new CIX1<IgniteCache<String, Integer>>() {
            @Override public void applyx(IgniteCache<String, Integer> cache) {
                int rnd = random();

                // Snapshot taken before the removal: only these indexes are guaranteed to have been put.
                Set<Integer> ids = new HashSet<>(set);

                cache.removeAllAsync(rangeKeys(0, rnd)).get();

                checkRemovedLocally(cache, ids, rnd);
            }
        });
    }

    /**
     * Removes the key with {@code getAndRemoveAsync} and waits for the result.
     *
     * @param cache Cache.
     * @param key Key.
     * @return Removed value.
     */
    private <K, V> V removeAsync(IgniteCache<K, V> cache, K key) {
        return cache.getAndRemoveAsync(key).get();
    }

    /**
     * Removes the key-value pair with {@code removeAsync} and waits for the result.
     *
     * @param cache Cache.
     * @param key Key.
     * @param val Value.
     * @return Remove result.
     */
    private <K, V> boolean removeAsync(IgniteCache<K, V> cache, K key, V val) {
        return cache.removeAsync(key, val).get();
    }
}
