/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ModifiedExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static javax.cache.event.EventType.CREATED;
import static javax.cache.event.EventType.EXPIRED;
import static javax.cache.event.EventType.REMOVED;
import static javax.cache.event.EventType.UPDATED;
import static org.apache.ignite.cache.CacheMode.LOCAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;

/**
 *
 */
public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest {
    /** */
    private static volatile List<CacheEntryEvent<?, ?>> evts;

    /** */
    private static volatile CountDownLatch evtsLatch;

    /** */
    private static volatile CountDownLatch syncEvtLatch;

    /** */
    private Integer lastKey = 0;

    /** */
    private CacheEntryListenerConfiguration<Object, Object> lsnrCfg;

    /** */
    private boolean useObjects;

    /** */
    private static AtomicBoolean serialized = new AtomicBoolean(false);

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_EVENTS);

        super.beforeTestsStarted();
    }

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
        MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_EVENTS);

        CacheConfiguration cfg = super.cacheConfiguration(igniteInstanceName);

        if (lsnrCfg != null)
            cfg.addCacheEntryListenerConfiguration(lsnrCfg);

        cfg.setEagerTtl(eagerTtl());

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        MemoryEventStorageSpi eventSpi = new MemoryEventStorageSpi();
        eventSpi.setExpireCount(50);

        cfg.setEventStorageSpi(eventSpi);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        super.afterTest();

        for (int i = 0; i < gridCount(); i++) {
            GridContinuousProcessor proc = grid(i).context().continuous();

            final ConcurrentMap<?, ?> syncMsgFuts = GridTestUtils.getFieldValue(proc, "syncMsgFuts");

            GridTestUtils.waitForCondition(new GridAbsPredicate() {
                @Override public boolean apply() {
                    return syncMsgFuts.isEmpty();
                }
            }, 5000);

            assertEquals(0, syncMsgFuts.size());
        }

        serialized.set(false);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testExceptionIgnored() throws Exception {
        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
            new Factory<CacheEntryListener<Object, Object>>() {
                @Override public CacheEntryListener<Object, Object> create() {
                    return new ExceptionListener();
                }
            },
            null,
            false,
            false
        );

        IgniteCache<Object, Object> cache = jcache();

        cache.registerCacheEntryListener(lsnrCfg);

        try {
            for (Integer key : keys()) {
                log.info("Check listener exceptions are ignored [key=" + key + ']');

                cache.put(key, key);

                cache.remove(key);
            }
        }
        finally {
            cache.deregisterCacheEntryListener(lsnrCfg);
        }

        lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
            new Factory<CacheEntryListener<Object, Object>>() {
                @Override public CacheEntryListener<Object, Object> create() {
                    return new CreateUpdateRemoveExpireListener();
                }
            },
            new ExceptionFilterFactory(),
            false,
            false
        );

        cache.registerCacheEntryListener(lsnrCfg);

        try {
            for (Integer key : keys()) {
                log.info("Check filter exceptions are ignored [key=" + key + ']');

                cache.put(key, key);

                cache.remove(key);
            }
        }
        finally {
            cache.deregisterCacheEntryListener(lsnrCfg);
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testNoOldValue() throws Exception {
        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
            new Factory<CacheEntryListener<Object, Object>>() {
                @Override public CacheEntryListener<Object, Object> create() {
                    return new CreateUpdateRemoveExpireListener();
                }
            },
            null,
            false,
            true
        );

        IgniteCache<Object, Object> cache = jcache();

        try {
            for (Integer key : keys()) {
                log.info("Check create/update/remove/expire events, no old value [key=" + key + ']');

                cache.registerCacheEntryListener(lsnrCfg);

                checkEvents(cache, lsnrCfg, key, true, true, true, true, false);
            }
        }
        finally {
            cache.deregisterCacheEntryListener(lsnrCfg);
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testSynchronousEventsObjectKeyValue() throws Exception {
        useObjects = true;

        testSynchronousEvents();
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testSynchronousEvents() throws Exception {
        final CacheEntryCreatedListener<Object, Object> lsnr = new CreateUpdateRemoveExpireListener() {
            @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
                super.onRemoved(evts);

                awaitLatch();
            }

            @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
                super.onCreated(evts);

                awaitLatch();
            }

            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
                super.onUpdated(evts);

                awaitLatch();
            }

            private void awaitLatch() {
                try {
                    assertTrue(syncEvtLatch.await(5000, MILLISECONDS));
                }
                catch (InterruptedException e) {
                    fail("Unexpected exception: " + e);
                }
            }
        };

        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
            new Factory<CacheEntryListener<Object, Object>>() {
                @Override public CacheEntryListener<Object, Object> create() {
                    return lsnr;
                }
            },
            null,
            true,
            true
        );

        IgniteCache<Object, Object> cache = jcache();

        cache.registerCacheEntryListener(lsnrCfg);

        try {
            for (Integer key : keys()) {
                log.info("Check synchronous create event [key=" + key + ']');

                syncEvent(key, 1, cache, 1);

                checkEvent(evts.iterator(), key, CREATED, 1, null);

                log.info("Check synchronous update event [key=" + key + ']');

                syncEvent(key, 2, cache, 1);

                checkEvent(evts.iterator(), key, UPDATED, 2, 1);

                log.info("Check synchronous remove event [key=" + key + ']');

                syncEvent(key, null, cache, 1);

                checkEvent(evts.iterator(), key, REMOVED, 2, 2);

                log.info("Check synchronous expire event [key=" + key + ']');

                syncEvent(key,
                    3,
                    cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 1000))),
                    eagerTtl() ? 2 : 1);

                checkEvent(evts.iterator(), key, CREATED, 3, null);

                if (!eagerTtl()) {
                    U.sleep(1100);

                    assertNull(primaryCache(key(key), cache.getName()).get(key(key)));

                    evtsLatch.await(5000, MILLISECONDS);

                    assertEquals(1, evts.size());
                }

                checkEvent(evts.iterator(), key, EXPIRED, 3, 3);

                assertEquals(0, evts.size());
            }
        }
        finally {
            cache.deregisterCacheEntryListener(lsnrCfg);
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testSynchronousEventsListenerNodeFailed() throws Exception {
        if (cacheMode() != PARTITIONED)
            return;

        lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
            new NoOpCreateUpdateListenerFactory(),
            null,
            true,
            true
        );

        final Ignite grid = startGrid(gridCount());

        try {
            awaitPartitionMapExchange();

            IgniteCache<Integer, Integer> cache = jcache(0);

            Map<Integer, Integer> vals = new HashMap<>();

            for (Integer key : nearKeys(grid.cache(DEFAULT_CACHE_NAME), 100, 1_000_000))
                vals.put(key, 1);

            final AtomicBoolean done = new AtomicBoolean();

            IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
                @Override public Void call() throws Exception {
                    U.sleep(500);

                    stopGrid(grid.name());

                    done.set(true);

                    return null;
                }
            });

            while (!done.get())
                cache.putAll(vals);

            fut.get();

            log.info("Update one more time.");

            cache.putAll(vals);
        }
        finally {
            stopGrid(gridCount());
        }
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testConcurrentRegisterDeregister() throws Exception {
        final int THREADS = 10;

        final CyclicBarrier barrier = new CyclicBarrier(THREADS);

        final IgniteCache<Object, Object> cache = jcache(0);

        GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                CacheEntryListenerConfiguration<Object, Object> cfg = new MutableCacheEntryListenerConfiguration<>(
                    new Factory<CacheEntryListener<Object, Object>>() {
                        @Override public CacheEntryListener<Object, Object> create() {
                            return new CreateUpdateRemoveExpireListener();
                        }
                    },
                    null,
                    true,
                    false
                );

                barrier.await();

                for (int i = 0; i < 100; i++) {
                    cache.registerCacheEntryListener(cfg);

                    cache.deregisterCacheEntryListener(cfg);
                }

                return null;
            }
        }, THREADS, "register-thread").get();
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testSerialization() throws Exception {
        if (cacheMode() == LOCAL)
            return;

        AtomicBoolean serialized = new AtomicBoolean();

        NonSerializableListener lsnr = new NonSerializableListener(serialized);

        jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>(
            FactoryBuilder.factoryOf(lsnr),
            new SerializableFactory(),
            true,
            false
        ));

        try {
            startGrid(gridCount());

            jcache(0).put(1, 1);
        }
        finally {
            stopGrid(gridCount());
        }

        jcache(0).put(2, 2);

        assertFalse(IgniteCacheEntryListenerAbstractTest.serialized.get());
        assertFalse(serialized.get());
    }

    /**
     * @param key Key.
     * @param val Value.
     * @param cache Cache.
     * @param expEvts Expected events number.
     * @throws Exception If failed.
     */
    private void syncEvent(
        Integer key,
        Integer val,
        IgniteCache<Object, Object> cache,
        int expEvts)
        throws Exception {
        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());

        evtsLatch = new CountDownLatch(expEvts);

        syncEvtLatch = new CountDownLatch(1);

        final AtomicBoolean done = new AtomicBoolean();

        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
            @Override public Void call() throws Exception {
                assertFalse(done.get());

                U.sleep(500);

                assertFalse(done.get());

                syncEvtLatch.countDown();

                return null;
            }
        });

        if (val != null)
            cache.put(key(key), value(val));
        else
            cache.remove(key(key));

        done.set(true);

        fut.get();

        evtsLatch.await(5000, MILLISECONDS);

        assertEquals(expEvts, evts.size());
    }

    /**
     * @param key Integer key.
     * @return Key instance.
     */
    private Object key(Integer key) {
        assert key != null;

        return useObjects ? new ListenerTestKey(key) : key;
    }

    /**
     * @param val Integer value.
     * @return Value instance.
     */
    private Object value(Integer val) {
        if (val == null)
            return null;

        return useObjects ? new ListenerTestValue(val) : val;
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testEventsObjectKeyValue() throws Exception {
        useObjects = true;

        testEvents();
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testEvents() throws Exception {
        IgniteCache<Object, Object> cache = jcache();

        Map<Object, Object> vals = new HashMap<>();

        for (int i = 0; i < 100; i++)
            vals.put(key(i + 2_000_000), value(i));

        cache.putAll(vals); // Put some data in cache to make sure events are not generated for existing entries.

        for (Integer key : keys()) {
            log.info("Check create event [key=" + key + ']');

            checkEvents(cache, new CreateListenerFactory(), key, true, false, false, false);

            log.info("Check update event [key=" + key + ']');

            checkEvents(cache, new UpdateListenerFactory(), key, false, true, false, false);

            log.info("Check remove event [key=" + key + ']');

            checkEvents(cache, new RemoveListenerFactory(), key, false, false, true, false);

            log.info("Check expire event [key=" + key + ']');

            checkEvents(cache, new ExpireListenerFactory(), key, false, false, false, true);

            log.info("Check create/update events [key=" + key + ']');

            checkEvents(cache, new CreateUpdateListenerFactory(), key, true, true, false, false);

            log.info("Check create/update/remove/expire events [key=" + key + ']');

            checkEvents(cache, new CreateUpdateRemoveExpireListenerFactory(), key, true, true, true, true);
        }

        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
            new CreateUpdateRemoveExpireListenerFactory(),
            new TestFilterFactory(),
            true,
            false
        );

        cache.registerCacheEntryListener(lsnrCfg);

        log.info("Check filter.");

        try {
            checkFilter(cache, vals);
        }
        finally {
            cache.deregisterCacheEntryListener(lsnrCfg);
        }

        cache.putAll(vals);

        try {
            checkListenerOnStart(vals);
        }
        finally {
            cache.removeAll(vals.keySet());
        }
    }

    /**
     * @param vals Values in cache.
     * @throws Exception If failed.
     */
    private void checkListenerOnStart(Map<Object, Object> vals) throws Exception {
        lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
            new CreateUpdateRemoveExpireListenerFactory(),
            null,
            true,
            false
        );

        Ignite grid = startGrid(gridCount());

        try {
            awaitPartitionMapExchange();

            IgniteCache<Object, Object> cache = grid.cache(DEFAULT_CACHE_NAME);

            Integer key = Integer.MAX_VALUE;

            log.info("Check create/update/remove events for listener in configuration [key=" + key + ']');

            checkEvents(cache, lsnrCfg, key, true, true, true, true, true);
        }
        finally {
            stopGrid(gridCount());
        }

        lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
            new CreateUpdateRemoveExpireListenerFactory(),
            new TestFilterFactory(),
            true,
            false
        );

        grid = startGrid(gridCount());

        try {
            awaitPartitionMapExchange();

            IgniteCache<Object, Object> cache = grid.cache(DEFAULT_CACHE_NAME);

            log.info("Check filter for listener in configuration.");

            if (cacheMode() == LOCAL)
                cache.putAll(vals);

            checkFilter(cache, vals);
        }
        finally {
            stopGrid(gridCount());
        }
    }

    /**
     * @param cache Cache.
     * @param lsnrFactory Listener factory.
     * @param key Key.
     * @param create {@code True} if listens for create events.
     * @param update {@code True} if listens for update events.
     * @param rmv {@code True} if listens for remove events.
     * @param expire {@code True} if listens for expire events.
     * @throws Exception If failed.
     */
    private void checkEvents(
        final IgniteCache<Object, Object> cache,
        final Factory<CacheEntryListener<Object, Object>> lsnrFactory,
        Integer key,
        boolean create,
        boolean update,
        boolean rmv,
        boolean expire) throws Exception {
        CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>(
            lsnrFactory,
            null,
            true,
            false
        );

        cache.registerCacheEntryListener(lsnrCfg);

        try {
            checkEvents(cache, lsnrCfg, key, create, update, rmv, expire, true);
        }
        finally {
            cache.deregisterCacheEntryListener(lsnrCfg);
        }
    }

    /**
     * @param cache Cache.
     * @param vals Values in cache.
     * @throws Exception If failed.
     */
    private void checkFilter(final IgniteCache<Object, Object> cache, Map<Object, Object> vals) throws Exception {
        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());

        final int expEvts = (vals.size() / 2) * 4; // Remove, create, update and expire for half of modified entries.

        evtsLatch = new CountDownLatch(expEvts);

        cache.removeAll(vals.keySet());

        cache.putAll(vals);

        final Map<Object, Object> newVals = new HashMap<>();

        for (Object key : vals.keySet())
            newVals.put(key, value(-1));

        cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 500))).putAll(newVals);

        U.sleep(1000);

        GridTestUtils.waitForCondition(new GridAbsPredicate() {
            @Override public boolean apply() {
                for (Object key : newVals.keySet()) {
                    if (primaryCache(key, cache.getName()).get(key) != null)
                        return false;
                }

                return true;
            }
        }, 5000);

        evtsLatch.await(5000, MILLISECONDS);

        assertEquals(expEvts, evts.size());

        Set<Object> rmvd = new HashSet<>();
        Set<Object> created = new HashSet<>();
        Set<Object> updated = new HashSet<>();
        Set<Object> expired = new HashSet<>();

        for (CacheEntryEvent<?, ?> evt : evts) {
            Integer key;

            if (useObjects)
                key = ((ListenerTestKey)evt.getKey()).key;
            else
                key = (Integer)evt.getKey();

            assertTrue(key % 2 == 0);

            assertTrue(vals.keySet().contains(evt.getKey()));

            switch (evt.getEventType()) {
                case REMOVED:
                    assertEquals(evt.getOldValue(), evt.getValue());

                    assertEquals(vals.get(evt.getKey()), evt.getOldValue());

                    assertTrue(rmvd.add(evt.getKey()));

                    break;

                case CREATED:
                    assertEquals(vals.get(evt.getKey()), evt.getValue());

                    assertNull(evt.getOldValue());

                    assertTrue(rmvd.contains(evt.getKey()));

                    assertTrue(created.add(evt.getKey()));

                    break;

                case UPDATED:
                    assertEquals(value(-1), evt.getValue());

                    assertEquals(vals.get(evt.getKey()), evt.getOldValue());

                    assertTrue(rmvd.contains(evt.getKey()));

                    assertTrue(created.contains(evt.getKey()));

                    assertTrue(updated.add(evt.getKey()));

                    break;

                case EXPIRED:
                    assertEquals(evt.getOldValue(), evt.getValue());

                    assertEquals(value(-1), evt.getOldValue());

                    assertTrue(rmvd.contains(evt.getKey()));

                    assertTrue(created.contains(evt.getKey()));

                    assertTrue(updated.contains(evt.getKey()));

                    assertTrue(expired.add(evt.getKey()));

                    break;

                default:
                    fail("Unexpected type: " + evt.getEventType());
            }
        }

        assertEquals(vals.size() / 2, rmvd.size());
        assertEquals(vals.size() / 2, created.size());
        assertEquals(vals.size() / 2, updated.size());
        assertEquals(vals.size() / 2, expired.size());
    }

    /**
     * @param cache Cache.
     * @param lsnrCfg Listener configuration.
     * @param key Key.
     * @param create {@code True} if listens for create events.
     * @param update {@code True} if listens for update events.
     * @param rmv {@code True} if listens for remove events.
     * @param expire {@code True} if listens for expire events.
     * @param oldVal {@code True} if old value should be provided for event.
     * @throws Exception If failed.
     */
    private void checkEvents(
        final IgniteCache<Object, Object> cache,
        final CacheEntryListenerConfiguration<Object, Object> lsnrCfg,
        Integer key,
        boolean create,
        boolean update,
        boolean rmv,
        boolean expire,
        boolean oldVal) throws Exception {
        GridTestUtils.assertThrows(log, new Callable<Void>() {
            @Override public Void call() throws Exception {
                cache.registerCacheEntryListener(lsnrCfg);

                return null;
            }
        }, IllegalArgumentException.class, null);

        final int UPDATES = 10;

        int expEvts = 0;

        if (create)
            expEvts += 4;

        if (update)
            expEvts += (UPDATES + 1);

        if (rmv)
            expEvts += 2;

        if (expire)
            expEvts += 2;

        evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());

        evtsLatch = new CountDownLatch(expEvts);

        cache.put(key(key), value(0));

        for (int i = 0; i < UPDATES; i++) {
            if (i % 2 == 0)
                cache.put(key(key), value(i + 1));
            else
                cache.invoke(key(key), new EntrySetValueProcessor(value(i + 1)));
        }

        // Invoke processor does not update value, should not trigger event.
        assertEquals(String.valueOf(UPDATES), cache.invoke(key(key), new EntryToStringProcessor()));

        assertFalse(cache.putIfAbsent(key(key), value(-1)));

        assertFalse(cache.remove(key(key), value(-1)));

        assertTrue(cache.remove(key(key)));

        IgniteCache<Object, Object> expirePlcCache =
            cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));

        expirePlcCache.put(key(key), value(10));

        U.sleep(700);

        if (!eagerTtl())
            assertNull(primaryCache(key(key), cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled.

        IgniteCache<Object, Object> cache1 = cache;

        if (gridCount() > 1)
            cache1 = jcache(1); // Do updates from another node.

        cache1.put(key(key), value(1));

        cache1.put(key(key), value(2));

        assertTrue(cache1.remove(key(key)));

        IgniteCache<Object, Object> expirePlcCache1 =
            cache1.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100)));

        expirePlcCache1.put(key(key), value(20));

        U.sleep(200);

        if (!eagerTtl())
            assertNull(primaryCache(key(key), cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled.

        evtsLatch.await(5000, MILLISECONDS);

        assertEquals(expEvts, evts.size());

        Iterator<CacheEntryEvent<?, ?>> iter = evts.iterator();

        if (create)
            checkEvent(iter, key, CREATED, 0, null);

        if (update) {
            for (int i = 0; i < UPDATES; i++)
                checkEvent(iter, key, UPDATED, i + 1, oldVal ? i : null);
        }

        if (rmv)
            checkEvent(iter, key, REMOVED, oldVal ? UPDATES : null, oldVal ? UPDATES : null);

        if (create)
            checkEvent(iter, key, CREATED, 10, null);

        if (expire)
            checkEvent(iter, key, EXPIRED, oldVal ? 10 : null, oldVal ? 10 : null);

        if (create)
            checkEvent(iter, key, CREATED, 1, null);

        if (update)
            checkEvent(iter, key, UPDATED, 2, oldVal ? 1 : null);

        if (rmv)
            checkEvent(iter, key, REMOVED, oldVal ? 2 : null, oldVal ? 2 : null);

        if (create)
            checkEvent(iter, key, CREATED, 20, null);

        if (expire)
            checkEvent(iter, key, EXPIRED, oldVal ? 20 : null, oldVal ? 20 : null);

        assertEquals(0, evts.size());

        log.info("Remove listener.");

        cache.deregisterCacheEntryListener(lsnrCfg);

        cache.put(key(key), value(1));

        cache.put(key(key), value(2));

        assertTrue(cache.remove(key(key)));

        U.sleep(500); // Sleep some time to ensure listener was really removed.

        assertEquals(0, evts.size());

        cache.registerCacheEntryListener(lsnrCfg);

        cache.deregisterCacheEntryListener(lsnrCfg);
    }

    /**
     * @param iter Received events iterator.
     * @param expKey Expected key.
     * @param expType Expected type.
     * @param expVal Expected value.
     * @param expOld Expected old value.
     */
    private void checkEvent(Iterator<CacheEntryEvent<?, ?>> iter,
        Integer expKey,
        EventType expType,
        @Nullable Integer expVal,
        @Nullable Integer expOld) {
        assertTrue(iter.hasNext());

        CacheEntryEvent<?, ?> evt = iter.next();

        iter.remove();

        assertTrue(evt.getSource() instanceof IgniteCacheProxy);

        assertEquals(key(expKey), evt.getKey());

        assertEquals(expType, evt.getEventType());

        assertEquals(value(expVal), evt.getValue());

        assertEquals(value(expOld), evt.getOldValue());

        if (expOld == null)
            assertFalse(evt.isOldValueAvailable());
        else
            assertTrue(evt.isOldValueAvailable());
    }

    /**
     * @return Test keys.
     * @throws Exception If failed.
     */
    protected Collection<Integer> keys() throws Exception {
        IgniteCache<Integer, Object> cache = jcache(0);

        ArrayList<Integer> keys = new ArrayList<>();

        keys.add(primaryKeys(cache, 1, lastKey).get(0));

        if (gridCount() > 1) {
            keys.add(backupKeys(cache, 1, lastKey).get(0));

            if (cache.getConfiguration(CacheConfiguration.class).getCacheMode() != REPLICATED)
                keys.add(nearKeys(cache, 1, lastKey).get(0));
        }

        lastKey = Collections.max(keys) + 1;

        return keys;
    }

    /**
     * @return Value for configuration property {@link CacheConfiguration#isEagerTtl()}.
     */
    protected boolean eagerTtl() {
        return true;
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        super.afterTestsStopped();

        evts = null;

        evtsLatch = null;
    }

    /**
     * @param evt Event.
     */
    private static void onEvent(CacheEntryEvent<?, ?> evt) {
        // System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName() + ']');

        assertNotNull(evt);
        assertNotNull(evt.getSource());
        assertNotNull(evt.getEventType());
        assertNotNull(evt.getKey());

        evts.add(evt);

        evtsLatch.countDown();
    }

    /**
     *
     */
    private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
        /** {@inheritDoc} */
        @Override public CacheEntryListener<Object, Object> create() {
            return new CreateUpdateRemoveExpireListener();
        }
    }

    /**
     *
     */
    private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
        /** {@inheritDoc} */
        @Override public CacheEntryListener<Object, Object> create() {
            return new NoOpCreateUpdateListener();
        }
    }

    /**
     *
     */
    private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
        /** {@inheritDoc} */
        @Override public CacheEntryListener<Object, Object> create() {
            return new CreateUpdateListener();
        }
    }

    /**
     *
     */
    private static class CreateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
        /** {@inheritDoc} */
        @Override public CacheEntryListener<Object, Object> create() {
            return new CreateListener();
        }
    }

    /**
     *
     */
    private static class RemoveListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
        /** {@inheritDoc} */
        @Override public CacheEntryListener<Object, Object> create() {
            return new RemoveListener();
        }
    }

    /**
     *
     */
    private static class UpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
        /** {@inheritDoc} */
        @Override public CacheEntryListener<Object, Object> create() {
            return new UpdateListener();
        }
    }

    /**
     *
     */
    private static class ExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> {
        /** {@inheritDoc} */
        @Override public CacheEntryListener<Object, Object> create() {
            return new ExpireListener();
        }
    }

    /**
     *
     */
    private static class TestFilterFactory implements Factory<CacheEntryEventFilter<Object, Object>> {
        /** {@inheritDoc} */
        @Override public CacheEntryEventFilter<Object, Object> create() {
            return new TestFilter();
        }
    }

    /**
     *
     */
    private static class CreateListener implements CacheEntryCreatedListener<Object, Object> {
        /** {@inheritDoc} */
        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts)
                onEvent(evt);
        }
    }

    /**
     *
     */
    private static class UpdateListener implements CacheEntryUpdatedListener<Object, Object> {
        /** {@inheritDoc} */
        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts)
                onEvent(evt);
        }
    }

    /**
     *
     */
    private static class RemoveListener implements CacheEntryRemovedListener<Object, Object> {
        /** {@inheritDoc} */
        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts)
                onEvent(evt);
        }
    }

    /**
     *
     */
    private static class ExpireListener implements CacheEntryExpiredListener<Object, Object> {
        /** {@inheritDoc} */
        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts)
                onEvent(evt);
        }
    }

    /**
     *
     */
    private static class TestFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
        /** {@inheritDoc} */
        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
            assert evt != null;
            assert evt.getSource() != null : evt;
            assert evt.getEventType() != null : evt;
            assert evt.getKey() != null : evt;

            Integer key;

            if (evt.getKey() instanceof ListenerTestKey)
                key = ((ListenerTestKey)evt.getKey()).key;
            else
                key = (Integer)evt.getKey();

            return key % 2 == 0;
        }

        /** {@inheritDoc} */
        @Override public void writeExternal(ObjectOutput out) throws IOException {
            throw new UnsupportedOperationException("Filter must not be marshaled.");
        }

        /** {@inheritDoc} */
        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            throw new UnsupportedOperationException("Filter must not be unmarshaled.");
        }
    }

    /**
     *
     */
    private static class CreateUpdateListener implements CacheEntryCreatedListener<Object, Object>,
        CacheEntryUpdatedListener<Object, Object> {
        /** {@inheritDoc} */
        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts)
                onEvent(evt);
        }

        /** {@inheritDoc} */
        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts)
                onEvent(evt);
        }
    }

    /**
     *
     */
    private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Object, Object>,
        CacheEntryUpdatedListener<Object, Object> {
        /** {@inheritDoc} */
        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts) {
                assertNotNull(evt);
                assertNotNull(evt.getSource());
                assertNotNull(evt.getEventType());
                assertNotNull(evt.getKey());
            }
        }

        /** {@inheritDoc} */
        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts) {
                assertNotNull(evt);
                assertNotNull(evt.getSource());
                assertNotNull(evt.getEventType());
                assertNotNull(evt.getKey());
            }
        }
    }

    /**
     *
     */
    private static class CreateUpdateRemoveExpireListener extends CreateUpdateListener
        implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> {
        /** {@inheritDoc} */
        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts)
                onEvent(evt);
        }

        /** {@inheritDoc} */
        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
            for (CacheEntryEvent<?, ?> evt : evts)
                onEvent(evt);
        }
    }

    /**
     *
     */
    private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Object, Object> {
        /** {@inheritDoc} */
        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) {
            throw new RuntimeException("Test filter error.");
        }
    }

    /**
     *
     */
    private static class ExceptionListener extends CreateUpdateListener
        implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> {
        /** {@inheritDoc} */
        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) {
            error();
        }

        /** {@inheritDoc} */
        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
            error();
        }

        /** {@inheritDoc} */
        @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) {
            error();
        }

        /** {@inheritDoc} */
        @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) {
            error();
        }

        /**
         * Throws exception.
         */
        private void error() {
            throw new RuntimeException("Test listener error.");
        }
    }

    /**
     *
     */
    protected static class EntryToStringProcessor implements EntryProcessor<Object, Object, String> {
        /** {@inheritDoc} */
        @Override public String process(MutableEntry<Object, Object> e, Object... args) {
            if (e.getValue() instanceof ListenerTestValue)
                return String.valueOf(((ListenerTestValue)e.getValue()).val1);

            return String.valueOf(e.getValue());
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(EntryToStringProcessor.class, this);
        }
    }

    /**
     *
     */
    protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, String> {
        /** */
        private Object val;

        /**
         * @param val Value to set.
         */
        public EntrySetValueProcessor(Object val) {
            this.val = val;
        }

        /** {@inheritDoc} */
        @Override public String process(MutableEntry<Object, Object> e, Object... args)
            throws EntryProcessorException {
            e.setValue(val);

            return null;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(EntrySetValueProcessor.class, this);
        }
    }

    /**
     *
     */
    public static class SerializableFactory implements Factory<NonSerializableFilter> {
        /** {@inheritDoc} */
        @Override public NonSerializableFilter create() {
            return new NonSerializableFilter();
        }
    }

    /**
     *
     */
    public static class NonSerializableFilter implements CacheEntryEventFilter<Object, Object>, Externalizable {
        /** {@inheritDoc} */
        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            serialized.set(true);
        }

        /** {@inheritDoc} */
        @Override public void writeExternal(ObjectOutput out) throws IOException {
            serialized.set(true);
        }

        /** {@inheritDoc} */
        @Override public boolean evaluate(CacheEntryEvent<?, ?> event) throws CacheEntryListenerException {
            return true;
        }
    }

    /**
     */
    public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable {
        /** */
        private final AtomicBoolean serialized;

        /**
         * @param serialized Serialized flag.
         */
        public NonSerializableListener(AtomicBoolean serialized) {
            this.serialized = serialized;
        }

        /** {@inheritDoc} */
        @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts)
            throws CacheEntryListenerException {
            // No-op.
        }

        /** {@inheritDoc} */
        @Override public void writeExternal(ObjectOutput out) throws IOException {
            serialized.set(true);
        }

        /** {@inheritDoc} */
        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            // No-op.
        }
    }

    /**
     *
     */
    static class ListenerTestKey implements Serializable {
        /** */
        private final Integer key;

        /**
         * @param key Key.
         */
        public ListenerTestKey(Integer key) {
            this.key = key;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            ListenerTestKey that = (ListenerTestKey)o;

            return key.equals(that.key);
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return key.hashCode();
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(ListenerTestKey.class, this);
        }
    }

    /**
     *
     */
    static class ListenerTestValue implements Serializable {
        /** */
        private final Integer val1;

        /** */
        private final String val2;

        /**
         * @param val Value.
         */
        public ListenerTestValue(Integer val) {
            this.val1 = val;
            this.val2 = String.valueOf(val);
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            ListenerTestValue that = (ListenerTestValue) o;

            return val1.equals(that.val1) && val2.equals(that.val2);
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            int res = val1.hashCode();

            res = 31 * res + val2.hashCode();

            return res;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(ListenerTestValue.class, this);
        }
    }

    /**
     *
     */
    static class ExceptionFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> {
        /** {@inheritDoc} */
        @Override public CacheEntryEventSerializableFilter<Object, Object> create() {
            return new ExceptionFilter();
        }
    }
}
