/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.expiry.TouchedExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestThread;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVTS_CACHE;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;

/**
 * Test cases for multi-threaded tests.
 */
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
public abstract class GridCacheBasicApiAbstractTest extends GridCommonAbstractTest {
    /** Grid. */
    private Ignite ignite;

    /**
     *
     */
    protected GridCacheBasicApiAbstractTest() {
        super(true /*start grid.*/);
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        TcpDiscoverySpi disco = new TcpDiscoverySpi();

        disco.setIpFinder(new TcpDiscoveryVmIpFinder(true));

        cfg.setDiscoverySpi(disco);

        cfg.setIncludeEventTypes(EventType.EVTS_ALL);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        ignite = grid();
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        ignite = null;
    }

    /**
     *
     * @throws Exception If test failed.
     */
    @Test
    public void testBasicLock() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);

        IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        Lock lock = cache.lock(1);

        assert lock.tryLock();

        assert cache.isLocalLocked(1, false);

        lock.unlock();

        assert !cache.isLocalLocked(1, false);
    }

    /**
     * @throws IgniteCheckedException If test failed.
     */
    @Test
    public void testSingleLockReentry() throws IgniteCheckedException {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);

        IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        Lock lock = cache.lock(1);

        lock.lock();

        try {
            assert cache.isLocalLocked(1, true);

            lock.lock();

            lock.unlock();

            assert cache.isLocalLocked(1, true);
        }
        finally {
            lock.unlock();
        }

        assert !cache.isLocalLocked(1, true);
        assert !cache.isLocalLocked(1, false);
    }

    /**
     *
     * @throws Exception If test failed.
     */
    @Test
    public void testReentry() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);

        IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        Lock lock = cache.lock(1);

        lock.lock();

        assert cache.isLocalLocked(1, false);
        assert cache.isLocalLocked(1, true);

        lock.lock();

        assert cache.isLocalLocked(1, false);
        assert cache.isLocalLocked(1, true);

        lock.lock();

        assert cache.isLocalLocked(1, false);
        assert cache.isLocalLocked(1, true);

        lock.unlock();

        assert cache.isLocalLocked(1, false);
        assert cache.isLocalLocked(1, true);

        lock.unlock();

        assert cache.isLocalLocked(1, false);
        assert cache.isLocalLocked(1, true);

        lock.unlock();

        assert !cache.isLocalLocked(1, false);
        assert !cache.isLocalLocked(1, true);
    }

    /**
     *
     */
    @Test
    public void testInterruptLock() throws InterruptedException {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);

        final IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        final Lock lock = cache.lock(1);

        lock.lock();

        final AtomicBoolean isOk = new AtomicBoolean(false);

        Thread t = new Thread(new Runnable() {
            @Override public void run() {
                assertFalse(cache.isLocalLocked(1, true));

                lock.lock();

                try {
                    assertTrue(cache.isLocalLocked(1, true));
                }
                finally {
                    lock.unlock();
                }

                assertTrue(Thread.currentThread().isInterrupted());

                isOk.set(true);
            }
        });

        t.start();

        Thread.sleep(100);

        t.interrupt();

        lock.unlock();

        t.join();

        assertTrue(isOk.get());
    }

    /**
     *
     */
    @Test
    public void testInterruptLockWithTimeout() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);

        final IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        startGrid(1);

        try {
            final List<Integer> keys = primaryKeys(grid(1).cache(DEFAULT_CACHE_NAME), 2, 1);

            Lock lock1 = cache.lock(keys.get(1));

            lock1.lock();

            final AtomicBoolean isOk = new AtomicBoolean(false);

            final CountDownLatch latch = new CountDownLatch(1);

            Thread t = new Thread(new Runnable() {
                @Override public void run() {
                    try {
                        latch.countDown();

                        isOk.set(!cache.lockAll(Arrays.asList(keys.get(0), keys.get(1))).tryLock(5000, MILLISECONDS));
                    }
                    catch (InterruptedException ignored) {
                        isOk.set(false);
                    }
                }
            });

            t.start();

            latch.await();

            Thread.sleep(300);

            t.interrupt();

            t.join();

            lock1.unlock();

            Thread.sleep(1000);

            assertFalse(cache.isLocalLocked(keys.get(0), false));
            assertFalse(cache.isLocalLocked(keys.get(1), false));

            assertFalse(grid(1).cache(DEFAULT_CACHE_NAME).isLocalLocked(keys.get(0), false));
            assertFalse(grid(1).cache(DEFAULT_CACHE_NAME).isLocalLocked(keys.get(1), false));

            assertTrue(isOk.get());
        }
        finally {
            stopGrid(1);
        }
    }

    /**
     * @throws IgniteCheckedException If test failed.
     */
    @Test
    public void testManyLockReentries() throws IgniteCheckedException {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);

        IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        Integer key = 1;

        Lock lock = cache.lock(key);

        lock.lock();

        try {
            assert cache.get(key) == null;
            assert cache.getAndPut(key, "1") == null;
            assert "1".equals(cache.get(key));

            assert cache.isLocalLocked(key, false);
            assert cache.isLocalLocked(key, true);

            lock.lock();

            assert cache.isLocalLocked(key, false);
            assert cache.isLocalLocked(key, true);

            try {
                assert "1".equals(cache.getAndRemove(key));
            }
            finally {
                lock.unlock();
            }

            assert cache.isLocalLocked(key, false);
            assert cache.isLocalLocked(key, true);
        }
        finally {
            lock.unlock();

            assert !cache.isLocalLocked(key, false);
            assert !cache.isLocalLocked(key, true);
        }
    }

    /**
     * @throws Exception If test failed.
     */
    @Test
    public void testLockMultithreaded() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);

        final IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        final CountDownLatch l1 = new CountDownLatch(1);
        final CountDownLatch l2 = new CountDownLatch(1);
        final CountDownLatch l3 = new CountDownLatch(1);

        final Lock lock = cache.lock(1);

        GridTestThread t1 = new GridTestThread(new Callable<Object>() {
            /** {@inheritDoc} */
            @Nullable @Override public Object call() throws Exception {
                info("Before lock for.key 1");

                lock.lock();

                info("After lock for key 1");

                try {
                    assert cache.isLocalLocked(1, false);
                    assert cache.isLocalLocked(1, true);

                    l1.countDown();

                    info("Let thread2 proceed.");

                    // Reentry.
                    assert lock.tryLock();

                    // Nested lock.
                    Lock lock2 = cache.lock(2);

                    assert lock2.tryLock();

                    l2.await();

                    lock.unlock();

                    // Unlock in reverse order.
                    lock2.unlock();

                    info("Waited for latch 2");
                }
                finally {
                    lock.unlock();

                    info("Unlocked entry for key 1.");
                }

                l3.countDown();

                return null;
            }
        });

        GridTestThread t2 = new GridTestThread(new Callable<Object>() {
            /** {@inheritDoc} */
            @Nullable @Override public Object call() throws Exception {
                info("Waiting for latch1...");

                l1.await();

                info("Latch1 released.");

                assert !lock.tryLock();

                if (!cache.isLocalLocked(1, false))
                    throw new IllegalArgumentException();

                assert !cache.isLocalLocked(1, true);

                info("Tried to lock cache for key1");

                l2.countDown();

                info("Released latch2");

                l3.await();

                assert lock.tryLock();

                try {
                    info("Locked cache for key 1");

                    assert cache.isLocalLocked(1, false);
                    assert cache.isLocalLocked(1, true);

                    info("Checked that cache is locked for key 1");
                }
                finally {
                    lock.unlock();

                    info("Unlocked cache for key 1");
                }

                assert !cache.isLocalLocked(1, false);
                assert !cache.isLocalLocked(1, true);

                return null;
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        t1.checkError();
        t2.checkError();

        assert !cache.isLocalLocked(1, false);
    }

    /**
     *
     * @throws Exception If error occur.
     */
    @Test
    public void testBasicOps() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_EVENTS);

        IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        CountDownLatch latch = new CountDownLatch(1);

        CacheEventListener lsnr = new CacheEventListener(latch);

        try {
            ignite.events().localListen(lsnr, EVTS_CACHE);

            int key = (int)System.currentTimeMillis();

            assert !cache.containsKey(key);

            cache.put(key, "a");

            info("Start latch wait 1");

            latch.await();

            info("Stop latch wait 1");

            assert cache.containsKey(key);

            latch = new CountDownLatch(2);

            lsnr.latch(latch);

            cache.put(key, "b");
            cache.put(key, "c");

            info("Start latch wait 2");

            latch.await();

            info("Stop latch wait 2");

            assert cache.containsKey(key);

            latch = new CountDownLatch(1);

            lsnr.latch(latch);

            cache.remove(key);

            info("Start latch wait 3");

            latch.await();

            info("Stop latch wait 3");

            assert !cache.containsKey(key);
        }
        finally {
            ignite.events().stopLocalListen(lsnr, EVTS_CACHE);
        }
    }

    /**
     * @throws Exception If error occur.
     */
    @Test
    public void testBasicOpsWithReentry() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_EVENTS);

        IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        int key = (int)System.currentTimeMillis();

        assert !cache.containsKey(key);

        Lock lock = cache.lock(key);

        lock.lock();

        CountDownLatch latch = new CountDownLatch(1);

        CacheEventListener lsnr = new CacheEventListener(latch);

        try {
            ignite.events().localListen(lsnr, EVTS_CACHE);

            cache.put(key, "a");

            info("Start latch wait 1");

            latch.await();

            info("Stop latch wait 1");

            assert cache.containsKey(key);
            assert cache.isLocalLocked(key, true);

            latch = new CountDownLatch(2);

            lsnr.latch(latch);

            cache.put(key, "b");
            cache.put(key, "c");

            info("Start latch wait 2");

            latch.await();

            info("Stop latch wait 2");

            assert cache.containsKey(key);
            assert cache.isLocalLocked(key, true);

            latch = new CountDownLatch(1);

            lsnr.latch(latch);

            cache.remove(key);

            info("Start latch wait 3");

            latch.await();

            info("Stop latch wait 3");

            assert cache.isLocalLocked(key, false);
        }
        finally {
            lock.unlock();

            ignite.events().stopLocalListen(lsnr, EVTS_CACHE);
        }

        // Entry should be evicted since allowEmptyEntries is false.
        assert !cache.isLocalLocked(key, false);
    }

    /**
     * @throws Exception If test failed.
     */
    @Test
    public void testMultiLocks() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);

        IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        Collection<Integer> keys = Arrays.asList(1, 2, 3);

        Lock lock = cache.lockAll(keys);

        lock.lock();

        assert cache.isLocalLocked(1, false);
        assert cache.isLocalLocked(2, false);
        assert cache.isLocalLocked(3, false);

        assert cache.isLocalLocked(1, true);
        assert cache.isLocalLocked(2, true);
        assert cache.isLocalLocked(3, true);

        lock.unlock();

        assert !cache.isLocalLocked(1, false);
        assert !cache.isLocalLocked(2, false);
        assert !cache.isLocalLocked(3, false);

        assert !cache.isLocalLocked(1, true);
        assert !cache.isLocalLocked(2, true);
        assert !cache.isLocalLocked(3, true);
    }

    /**
     * @throws IgniteCheckedException If test failed.
     */
    @Test
    public void testGetPutRemove() throws IgniteCheckedException {
        IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        int key = (int)System.currentTimeMillis();

        assert cache.get(key) == null;
        assert cache.getAndPut(key, "1") == null;

        String val = cache.get(key);

        assert val != null;
        assert "1".equals(val);

        val = cache.getAndRemove(key);

        assert val != null;
        assert "1".equals(val);
        assert cache.get(key) == null;
    }

    /**
     *
     * @throws Exception In case of error.
     */
    @Test
    public void testPutWithExpiration() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.ENTRY_LOCK);
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.EXPIRATION);
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_EVENTS);

        IgniteCache<Integer, String> cache = ignite.cache(DEFAULT_CACHE_NAME);

        CacheEventListener lsnr = new CacheEventListener(new CountDownLatch(1));

        ignite.events().localListen(lsnr, EVTS_CACHE);

        ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, 200L));

        try {
            int key = (int)System.currentTimeMillis();

            cache.withExpiryPolicy(expiry).put(key, "val");

            assert cache.get(key) != null;

            cache.withExpiryPolicy(expiry).put(key, "val");

            Thread.sleep(500);

            assert cache.get(key) == null;
        }
        finally {
            ignite.events().stopLocalListen(lsnr, EVTS_CACHE);
        }
    }

    /**
     * Event listener.
     */
    private class CacheEventListener implements IgnitePredicate<Event> {
        /** Wait latch. */
        private CountDownLatch latch;

        /** Event types. */
        private int[] types;

        /**
         * @param latch Wait latch.
         * @param types Event types.
         */
        CacheEventListener(CountDownLatch latch, int... types) {
            this.latch = latch;
            this.types = types;

            if (F.isEmpty(types))
                this.types = new int[] { EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED };
        }

        /**
         * @param latch New latch.
         */
        void latch(CountDownLatch latch) {
            this.latch = latch;
        }

        /**
         * Waits for latch.
         *
         * @throws InterruptedException If got interrupted.
         */
        void await() throws InterruptedException {
            latch.await();
        }

        /** {@inheritDoc} */
        @Override public boolean apply(Event evt) {
            info("Grid cache event: " + evt);

            if (U.containsIntArray(types, evt.type()))
                latch.countDown();

            return true;
        }
    }
}
