| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.query.continuous; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.cache.Cache; |
| import javax.cache.CacheException; |
| import javax.cache.event.CacheEntryEvent; |
| import javax.cache.event.CacheEntryListenerException; |
| import javax.cache.event.CacheEntryUpdatedListener; |
| import javax.cache.expiry.Duration; |
| import javax.cache.expiry.ExpiryPolicy; |
| import javax.cache.expiry.TouchedExpiryPolicy; |
| import javax.cache.processor.EntryProcessorException; |
| import javax.cache.processor.MutableEntry; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.Ignition; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CacheEntryEventSerializableFilter; |
| import org.apache.ignite.cache.CacheEntryProcessor; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cache.CacheRebalanceMode; |
| import org.apache.ignite.cache.CacheWriteSynchronizationMode; |
| import org.apache.ignite.cache.affinity.Affinity; |
| import org.apache.ignite.cache.query.ContinuousQuery; |
| import org.apache.ignite.cache.query.QueryCursor; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.cluster.ClusterTopologyException; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.NearCacheConfiguration; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.managers.communication.GridIoMessage; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; |
| import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; |
| import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; |
| import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; |
| import org.apache.ignite.internal.util.GridConcurrentHashSet; |
| import org.apache.ignite.internal.util.lang.GridAbsPredicate; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.PA; |
| import org.apache.ignite.internal.util.typedef.PAX; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.T3; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteAsyncCallback; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgniteOutClosure; |
| import org.apache.ignite.logger.NullLogger; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.spi.IgniteSpiException; |
| import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; |
| import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; |
| import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionRollbackException; |
| import org.apache.ignite.transactions.TransactionSerializationException; |
| import org.junit.Test; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static java.util.concurrent.TimeUnit.MINUTES; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static org.apache.ignite.cache.CacheMode.REPLICATED; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| |
| /** |
| * Failover tests for cache continuous queries: verify that local listeners receive |
| * correct notifications (no lost or duplicated events) across node restarts, |
| * rebalancing and communication failures. |
| */ |
| public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridCommonAbstractTest { |
| /** Backup queue acknowledgement threshold. */ |
| private static final int BACKUP_ACK_THRESHOLD = 100; |
| |
| /** Error flag set from listeners and concurrent threads on unexpected failures. */ |
| private static volatile boolean err; |
| |
| /** Number of cache backups used in node configuration. */ |
| private int backups = 1; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
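| // Force the discovery SPI to operate in server mode even on client nodes. |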
| ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); |
| |
| TestCommunicationSpi commSpi = new TestCommunicationSpi(); |
| |
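| // Disable shared memory communication and use a short idle timeout so that |
| // idle connections are closed and re-established frequently during the test. |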
| commSpi.setSharedMemoryPort(-1); |
| commSpi.setIdleConnectionTimeout(100); |
| |
| cfg.setCommunicationSpi(commSpi); |
| |
| MemoryEventStorageSpi evtSpi = new MemoryEventStorageSpi(); |
| evtSpi.setExpireCount(50); |
| |
| cfg.setEventStorageSpi(evtSpi); |
| |
| CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME); |
| |
| ccfg.setCacheMode(cacheMode()); |
| ccfg.setAtomicityMode(atomicityMode()); |
| ccfg.setBackups(backups); |
| ccfg.setWriteSynchronizationMode(FULL_SYNC); |
| ccfg.setNearConfiguration(nearCacheConfiguration()); |
| |
| cfg.setCacheConfiguration(ccfg); |
| |
| return cfg; |
| } |
| |
| /** |
| * @return Async callback flag. |
| */ |
| protected boolean asyncCallback() { |
| return false; |
| } |
| |
| /** |
| * @return Near cache configuration. |
| */ |
| protected NearCacheConfiguration nearCacheConfiguration() { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected long getTestTimeout() { |
| return 8 * 60_000; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| super.beforeTest(); |
| |
| err = false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| super.afterTest(); |
| |
| stopAllGrids(); |
| } |
| |
| /** |
| * @return Cache mode. |
| */ |
| protected abstract CacheMode cacheMode(); |
| |
| /** |
| * @return Atomicity mode. |
| */ |
| protected abstract CacheAtomicityMode atomicityMode(); |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testFirstFilteredEvent() throws Exception { |
| this.backups = 2; |
| |
| final int SRV_NODES = 4; |
| |
| startGridsMultiThreaded(SRV_NODES); |
| |
| Ignite qryClient = startClientGrid(SRV_NODES); |
| |
| IgniteCache<Object, Object> qryClnCache = qryClient.cache(DEFAULT_CACHE_NAME); |
| |
| final CacheEventListener3 lsnr = new CacheEventListener3(); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setLocalListener(lsnr); |
| |
| qry.setRemoteFilter(new CacheEventFilter()); |
| |
| try (QueryCursor<?> cur = qryClnCache.query(qry)) { |
| List<Integer> keys = testKeys(grid(0).cache(DEFAULT_CACHE_NAME), 1); |
| |
| for (Integer key : keys) |
| qryClnCache.put(key, -1); |
| |
| qryClnCache.put(keys.get(0), 100); |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return lsnr.evts.size() == 1; |
| } |
| }, 5000); |
| |
| assertEquals(1, lsnr.evts.size()); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRebalanceVersion() throws Exception { |
| IgniteEx ignite0 = startGrid(0); |
| |
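| // With late affinity assignment, rebalance completes on an extra minor topology version. |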
| int minorVer = ignite0.configuration().isLateAffinityAssignment() ? 1 : 0; |
| |
| boolean replicated = ignite0.context().cache().context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME)).isReplicated(); |
| |
| GridDhtPartitionTopology top0 = ignite0.context().cache().context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME)).topology(); |
| |
| assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(1))); |
| assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(2))); |
| |
| Ignite ignite1 = startGrid(1); |
| GridDhtPartitionTopology top1 = ((IgniteKernal)ignite1).context().cache().context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME)).topology(); |
| |
| waitRebalanceFinished(ignite0, 2, minorVer); |
| waitRebalanceFinished(ignite1, 2, minorVer); |
| |
| assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(3))); |
| assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(3))); |
| |
| Ignite ignite2 = startGrid(2); |
| GridDhtPartitionTopology top2 = ((IgniteKernal)ignite2).context().cache().context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME)).topology(); |
| |
| waitRebalanceFinished(ignite0, 3, minorVer); |
| waitRebalanceFinished(ignite1, 3, minorVer); |
| waitRebalanceFinished(ignite2, 3, minorVer); |
| |
| assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(4))); |
| assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(4))); |
| assertFalse(top2.rebalanceFinished(new AffinityTopologyVersion(4))); |
| |
| Ignite ignite3 = startClientGrid(3); |
| GridDhtPartitionTopology top3 = ((IgniteKernal)ignite3).context().cache().context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME)).topology(); |
| |
| assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(4))); |
| assertTrue(top1.rebalanceFinished(new AffinityTopologyVersion(4))); |
| assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(4))); |
| assertTrue(top3.rebalanceFinished(new AffinityTopologyVersion(4))); |
| |
| stopGrid(1); |
| |
| waitRebalanceFinished(ignite0, 5, replicated ? 0 : minorVer); |
| waitRebalanceFinished(ignite2, 5, replicated ? 0 : minorVer); |
| waitRebalanceFinished(ignite3, 5, replicated ? 0 : minorVer); |
| } |
| |
| /** |
| * Tests that the correct old value is passed to the continuous query listener during rebalancing. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRebalance() throws Exception { |
| for (int iter = 0; iter < 5; iter++) { |
| log.info("Iteration: " + iter); |
| |
| final IgniteEx ignite = startGrid(1); |
| |
| final CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>("testCache"); |
| |
| ccfg.setAtomicityMode(atomicityMode()); |
| ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); |
| ccfg.setCacheMode(cacheMode()); |
| ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); |
| ccfg.setBackups(2); |
| |
| final IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(ccfg); |
| |
| final int KEYS = 10_000; |
| |
| for (int i = 0; i < KEYS; i++) |
| cache.put(i, i); |
| |
| final ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); |
| |
| final AtomicBoolean err = new AtomicBoolean(); |
| |
| final AtomicInteger cntr = new AtomicInteger(); |
| |
| qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { |
| @Override public void onUpdated( |
| final Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> cacheEntryEvts) { |
| try { |
| for (final CacheEntryEvent<? extends Integer, ? extends Integer> evt : cacheEntryEvts) { |
| final Integer oldVal = evt.getOldValue(); |
| |
| final Integer val = evt.getValue(); |
| |
| assertNotNull("No old value: " + evt, oldVal); |
| assertEquals("Unexpected old value: " + evt, (Integer)(oldVal + 1), val); |
| |
| cntr.incrementAndGet(); |
| } |
| } |
| catch (Throwable e) { |
| err.set(true); |
| |
| error("Unexpected error: " + e, e); |
| } |
| } |
| }); |
| |
| final QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(qry); |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| |
| final IgniteInternalFuture<Object> updFut = GridTestUtils.runAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| latch.await(); |
| |
| for (int i = 0; i < KEYS && !err.get(); i++) |
| cache.put(i, i + 1); |
| |
| return null; |
| } |
| }); |
| |
| final IgniteInternalFuture<Object> rebFut = GridTestUtils.runAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| latch.await(); |
| |
| for (int i = 2; i <= 5 && !err.get(); i++) |
| startGrid(i); |
| |
| return null; |
| } |
| }); |
| |
| latch.countDown(); |
| |
| updFut.get(); |
| rebFut.get(); |
| |
| assertFalse("Unexpected error during test", err.get()); |
| |
| assertTrue(cntr.get() > 0); |
| |
| cur.close(); |
| |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @param ignite Ignite. |
| * @param topVer Major topology version. |
| * @param minorVer Minor topology version. |
| * @throws Exception If failed. |
| */ |
| private void waitRebalanceFinished(Ignite ignite, long topVer, int minorVer) throws Exception { |
| final AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer, minorVer); |
| |
| final GridDhtPartitionTopology top = |
| ((IgniteKernal)ignite).context().cache().context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME)).topology(); |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return top.rebalanceFinished(topVer0); |
| } |
| }, 5000); |
| |
| assertTrue(top.rebalanceFinished(topVer0)); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testOneBackup() throws Exception { |
| checkBackupQueue(1, false); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testOneBackupClientUpdate() throws Exception { |
| checkBackupQueue(1, true); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testUpdatePartitionCounter() throws Exception { |
| this.backups = 2; |
| |
| final int SRV_NODES = 4; |
| |
| startGridsMultiThreaded(SRV_NODES); |
| |
| final Ignite qryClient = startClientGrid(SRV_NODES); |
| |
| Map<Integer, Long> updateCntrs = new HashMap<>(); |
| |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
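| // Pick a random server node to stop and restart on each iteration. |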
| int killedNode = rnd.nextInt(SRV_NODES); |
| |
| for (int i = 0; i < 10; i++) { |
| List<Integer> keys = testKeys(grid(0).cache(DEFAULT_CACHE_NAME), 10); |
| |
| for (Integer key : keys) { |
| IgniteCache<Object, Object> cache = null; |
| |
| if (rnd.nextBoolean()) |
| cache = qryClient.cache(DEFAULT_CACHE_NAME); |
| else { |
| for (int j = 0; j < 1000; j++) { |
| int nodeIdx = rnd.nextInt(SRV_NODES); |
| |
| if (killedNode != nodeIdx) { |
| cache = grid(nodeIdx).cache(DEFAULT_CACHE_NAME); |
| |
| break; |
| } |
| } |
| |
| if (cache == null) |
| throw new Exception("Failed to find a server node."); |
| } |
| |
| cache.put(key, key); |
| |
| int part = qryClient.affinity(DEFAULT_CACHE_NAME).partition(key); |
| |
| Long cntr = updateCntrs.get(part); |
| |
| if (cntr == null) |
| cntr = 0L; |
| |
| updateCntrs.put(part, ++cntr); |
| } |
| |
| checkPartCounter(SRV_NODES, killedNode, updateCntrs); |
| |
| stopGrid(killedNode); |
| |
| awaitPartitionMapExchange(); |
| |
| checkPartCounter(SRV_NODES, killedNode, updateCntrs); |
| |
| startGrid(killedNode); |
| |
| awaitPartitionMapExchange(); |
| |
| checkPartCounter(SRV_NODES, killedNode, updateCntrs); |
| |
| killedNode = rnd.nextInt(SRV_NODES); |
| } |
| } |
| |
| /** |
| * @param nodes Number of nodes. |
| * @param killedNodeIdx Killed node index. |
| * @param updCntrs Update counters. |
| */ |
| private void checkPartCounter(int nodes, int killedNodeIdx, Map<Integer, Long> updCntrs) { |
| for (int i = 0; i < nodes; i++) { |
| if (i == killedNodeIdx) |
| continue; |
| |
| Affinity<Object> aff = grid(i).affinity(DEFAULT_CACHE_NAME); |
| |
| CachePartitionPartialCountersMap act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology() |
| .localUpdateCounters(false); |
| |
| for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) { |
| if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) { |
| int partIdx = act.partitionIndex(e.getKey()); |
| |
| assertEquals(e.getValue(), (Long)act.updateCounterAt(partIdx)); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testStartStopQuery() throws Exception { |
| this.backups = 1; |
| |
| final int SRV_NODES = 3; |
| |
| startGridsMultiThreaded(SRV_NODES); |
| |
| final Ignite qryClient = startClientGrid(SRV_NODES); |
| |
| IgniteCache<Object, Object> clnCache = qryClient.cache(DEFAULT_CACHE_NAME); |
| |
| IgniteOutClosure<IgniteCache<Integer, Integer>> rndCache = |
| new IgniteOutClosure<IgniteCache<Integer, Integer>>() { |
| int cnt = 0; |
| |
| @Override public IgniteCache<Integer, Integer> apply() { |
| ++cnt; |
| |
| return grid(cnt % SRV_NODES + 1).cache(DEFAULT_CACHE_NAME); |
| } |
| }; |
| |
| Ignite igniteSrv = ignite(0); |
| |
| IgniteCache<Object, Object> srvCache = igniteSrv.cache(DEFAULT_CACHE_NAME); |
| |
| List<Integer> keys = testKeys(srvCache, 3); |
| |
| int keyCnt = keys.size(); |
| |
| for (int j = 0; j < 50; ++j) { |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() |
| : new CacheEventListener3(); |
| |
| qry.setLocalListener(lsnr); |
| |
| qry.setRemoteFilter(lsnr); |
| |
| int keyIter = 0; |
| |
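| // Update half of the keys before the query is started: these puts must produce no events. |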
| for (; keyIter < keyCnt / 2; keyIter++) { |
| int key = keys.get(keyIter); |
| |
| rndCache.apply().put(key, key); |
| } |
| |
| assert lsnr.evts.isEmpty(); |
| |
| QueryCursor<Cache.Entry<Object, Object>> qryCur = clnCache.query(qry); |
| |
| Map<Object, T2<Object, Object>> updates = new HashMap<>(); |
| |
| final List<T3<Object, Object, Object>> expEvts = new ArrayList<>(); |
| |
| Affinity<Object> aff = affinity(srvCache); |
| |
| boolean filtered = false; |
| |
| for (; keyIter < keys.size(); keyIter++) { |
| int key = keys.get(keyIter); |
| |
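| // Odd values are expected to be filtered out by the remote filter, even values to pass. |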
| int val = filtered ? 1 : 2; |
| |
| log.info("Put [key=" + key + ", val=" + val + ", part=" + aff.partition(key) + ']'); |
| |
| T2<Object, Object> t = updates.get(key); |
| |
| if (t == null) { |
| // Check filtered. |
| if (!filtered) { |
| updates.put(key, new T2<>((Object)val, null)); |
| |
| expEvts.add(new T3<>((Object)key, (Object)val, null)); |
| } |
| } |
| else { |
| // Check filtered. |
| if (!filtered) { |
| updates.put(key, new T2<>((Object)val, (Object)t.get1())); |
| |
| expEvts.add(new T3<>((Object)key, (Object)val, (Object)t.get1())); |
| } |
| } |
| |
| rndCache.apply().put(key, val); |
| |
| filtered = !filtered; |
| } |
| |
| checkEvents(expEvts, lsnr, false); |
| |
| qryCur.close(); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testLeftPrimaryAndBackupNodes() throws Exception { |
| if (cacheMode() == REPLICATED) |
| return; |
| |
| this.backups = 1; |
| |
| final int SRV_NODES = 3; |
| |
| startGridsMultiThreaded(SRV_NODES); |
| |
| final Ignite qryClient = startClientGrid(SRV_NODES); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3(); |
| |
| qry.setLocalListener(lsnr); |
| |
| qry.setRemoteFilter(lsnr); |
| |
| IgniteCache<Object, Object> clnCache = qryClient.cache(DEFAULT_CACHE_NAME); |
| |
| QueryCursor<Cache.Entry<Object, Object>> qryCur = clnCache.query(qry); |
| |
| Ignite igniteSrv = ignite(0); |
| |
| IgniteCache<Object, Object> srvCache = igniteSrv.cache(DEFAULT_CACHE_NAME); |
| |
| Affinity<Object> aff = affinity(srvCache); |
| |
| List<Integer> keys = testKeys(srvCache, 1); |
| |
| Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(keys.get(0)); |
| |
| Collection<UUID> ids = F.transform(nodes, new C1<ClusterNode, UUID>() { |
| @Override public UUID apply(ClusterNode node) { |
| return node.id(); |
| } |
| }); |
| |
| int keyIter = 0; |
| |
| boolean filtered = false; |
| |
| Map<Object, T2<Object, Object>> updates = new HashMap<>(); |
| |
| final List<T3<Object, Object, Object>> expEvts = new ArrayList<>(); |
| |
| for (; keyIter < keys.size() / 2; keyIter++) { |
| int key = keys.get(keyIter); |
| |
| log.info("Put [key=" + key + ", part=" + aff.partition(key) |
| + ", filtered=" + filtered + ']'); |
| |
| T2<Object, Object> t = updates.get(key); |
| |
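| // Produce an odd value when the update should be filtered out, an even value otherwise. |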
| Integer val = filtered ? |
| (key % 2 == 0 ? key + 1 : key) : |
| key * 2; |
| |
| if (t == null) { |
| updates.put(key, new T2<>((Object)val, null)); |
| |
| if (!filtered) |
| expEvts.add(new T3<>((Object)key, (Object)val, null)); |
| } |
| else { |
| updates.put(key, new T2<>((Object)val, (Object)key)); |
| |
| if (!filtered) |
| expEvts.add(new T3<>((Object)key, (Object)val, (Object)key)); |
| } |
| |
| srvCache.put(key, val); |
| |
| filtered = !filtered; |
| } |
| |
| checkEvents(expEvts, lsnr, false); |
| |
| List<Thread> stopThreads = new ArrayList<>(3); |
| |
| // Stop the nodes that own this partition. |
| for (int i = 0; i < SRV_NODES; i++) { |
| Ignite ignite = ignite(i); |
| |
| if (ids.contains(ignite.cluster().localNode().id())) { |
| final int i0 = i; |
| |
| TestCommunicationSpi spi = (TestCommunicationSpi)ignite.configuration().getCommunicationSpi(); |
| |
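| // Block all outgoing messages so the stopping node cannot deliver pending notifications. |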
| spi.skipAllMsg = true; |
| |
| stopThreads.add(new Thread() { |
| @Override public void run() { |
| stopGrid(i0, true); |
| } |
| }); |
| } |
| } |
| |
| // Start stop-threads and wait for them to complete. |
| for (Thread t : stopThreads) |
| t.start(); |
| |
| for (Thread t : stopThreads) |
| t.join(); |
| |
| assert GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| // (SRV_NODES + 1 client node) - 1 primary - backup nodes. |
| return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /* client node */) |
| - 1 /* Primary node */ - backups; |
| } |
| }, 5000L); |
| |
| awaitPartitionMapExchange(); |
| |
| for (; keyIter < keys.size(); keyIter++) { |
| int key = keys.get(keyIter); |
| |
| log.info("Put [key=" + key + ", filtered=" + filtered + ']'); |
| |
| T2<Object, Object> t = updates.get(key); |
| |
| Integer val = filtered ? |
| (key % 2 == 0 ? key + 1 : key) : |
| key * 2; |
| |
| if (t == null) { |
| updates.put(key, new T2<>((Object)val, null)); |
| |
| if (!filtered) |
| expEvts.add(new T3<>((Object)key, (Object)val, null)); |
| } |
| else { |
| updates.put(key, new T2<>((Object)val, (Object)key)); |
| |
| if (!filtered) |
| expEvts.add(new T3<>((Object)key, (Object)val, (Object)key)); |
| } |
| |
| boolean updated = false; |
| |
| while (!updated) { |
| try { |
| clnCache.put(key, val); |
| |
| updated = true; |
| } |
| catch (Exception ignore) { |
| assertEquals(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, atomicityMode()); |
| } |
| } |
| |
| filtered = !filtered; |
| } |
| |
| checkEvents(expEvts, lsnr, false); |
| |
| qryCur.close(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testRemoteFilter() throws Exception { |
| this.backups = 2; |
| |
| final int SRV_NODES = 4; |
| |
| startGridsMultiThreaded(SRV_NODES); |
| |
| Ignite qryClient = startClientGrid(SRV_NODES); |
| |
| IgniteCache<Object, Object> qryClientCache = qryClient.cache(DEFAULT_CACHE_NAME); |
| |
| if (cacheMode() != REPLICATED) |
| assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups()); |
| |
| Affinity<Object> aff = qryClient.affinity(DEFAULT_CACHE_NAME); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3(); |
| |
| qry.setLocalListener(lsnr); |
| |
| qry.setRemoteFilter(lsnr); |
| |
| int PARTS = 10; |
| |
| QueryCursor<?> cur = qryClientCache.query(qry); |
| |
| Map<Object, T2<Object, Object>> updates = new HashMap<>(); |
| |
| final List<T3<Object, Object, Object>> expEvts = new ArrayList<>(); |
| |
| for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) { |
| log.info("Stop iteration: " + i); |
| |
| TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); |
| |
| Ignite ignite = ignite(i); |
| |
| IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME); |
| |
| List<Integer> keys = testKeys(cache, PARTS); |
| |
| boolean first = true; |
| |
| boolean filtered = false; |
| |
| for (Integer key : keys) { |
| log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) |
| + ", filtered=" + filtered + ']'); |
| |
| T2<Object, Object> t = updates.get(key); |
| |
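| // Produce an odd value when the update should be filtered out, an even value otherwise. |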
| Integer val = filtered ? |
| (key % 2 == 0 ? key + 1 : key) : |
| key * 2; |
| |
| if (t == null) { |
| updates.put(key, new T2<>((Object)val, null)); |
| |
| if (!filtered) |
| expEvts.add(new T3<>((Object)key, (Object)val, null)); |
| } |
| else { |
| updates.put(key, new T2<>((Object)val, (Object)key)); |
| |
| if (!filtered) |
| expEvts.add(new T3<>((Object)key, (Object)val, (Object)key)); |
| } |
| |
| cache.put(key, val); |
| |
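| // After the first update, skip continuous query messages from this node to emulate |
| // notifications lost before the node is stopped. |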
| if (first) { |
| spi.skipMsg = true; |
| |
| first = false; |
| } |
| |
| filtered = !filtered; |
| } |
| |
| stopGrid(i); |
| |
| boolean check = GridTestUtils.waitForCondition(new PAX() { |
| @Override public boolean applyx() throws IgniteCheckedException { |
| return expEvts.size() == lsnr.keys.size(); |
| } |
| }, 5000L); |
| |
| if (!check) { |
| Set<Integer> keys0 = new HashSet<>(keys); |
| |
| keys0.removeAll(lsnr.keys); |
| |
| log.info("Missed events for keys: " + keys0); |
| |
| fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + keys0.size() + ']'); |
| } |
| |
| checkEvents(expEvts, lsnr, false); |
| } |
| |
| cur.close(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testThreeBackups() throws Exception { |
| if (cacheMode() == REPLICATED) |
| return; |
| |
| checkBackupQueue(3, false); |
| } |
| |
| /** |
| * @param backups Number of backups. |
| * @param updateFromClient If {@code true} executes cache update from client node. |
| * @throws Exception If failed. |
| */ |
| private void checkBackupQueue(int backups, boolean updateFromClient) throws Exception { |
| this.backups = atomicityMode() == CacheAtomicityMode.ATOMIC ? backups : |
| backups < 2 ? 2 : backups; |
| |
| final int SRV_NODES = 4; |
| |
| startGridsMultiThreaded(SRV_NODES); |
| |
| Ignite qryClient = startClientGrid(SRV_NODES); |
| |
| IgniteCache<Object, Object> qryClientCache = qryClient.cache(DEFAULT_CACHE_NAME); |
| |
| Affinity<Object> aff = qryClient.affinity(DEFAULT_CACHE_NAME); |
| |
| CacheEventListener1 lsnr = asyncCallback() ? new CacheEventAsyncListener1(false) |
| : new CacheEventListener1(false); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setLocalListener(lsnr); |
| |
| QueryCursor<?> cur = qryClientCache.query(qry); |
| |
| int PARTS = 10; |
| |
| Map<Object, T2<Object, Object>> updates = new HashMap<>(); |
| |
| List<T3<Object, Object, Object>> expEvts = new ArrayList<>(); |
| |
| for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) { |
| log.info("Stop iteration: " + i); |
| |
| TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi(); |
| |
| Ignite ignite = ignite(i); |
| |
| IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME); |
| |
| List<Integer> keys = testKeys(cache, PARTS); |
| |
| CountDownLatch latch = new CountDownLatch(keys.size()); |
| |
| lsnr.latch = latch; |
| |
| boolean first = true; |
| |
| for (Integer key : keys) { |
| log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']'); |
| |
| T2<Object, Object> t = updates.get(key); |
| |
| if (updateFromClient) { |
| if (atomicityMode() != CacheAtomicityMode.ATOMIC) { |
| try (Transaction tx = qryClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { |
| qryClientCache.put(key, key); |
| |
| tx.commit(); |
| } |
| catch (CacheException | ClusterTopologyException ignored) { |
| log.warning("Failed put. [Key=" + key + ", val=" + key + "]"); |
| |
| continue; |
| } |
| } |
| else |
| qryClientCache.put(key, key); |
| } |
| else { |
| if (atomicityMode() != CacheAtomicityMode.ATOMIC) { |
| try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { |
| cache.put(key, key); |
| |
| tx.commit(); |
| } |
| catch (CacheException | ClusterTopologyException ignored) { |
| log.warning("Failed put. [Key=" + key + ", val=" + key + "]"); |
| |
| continue; |
| } |
| } |
| else |
| cache.put(key, key); |
| } |
| |
| if (t == null) { |
| updates.put(key, new T2<>((Object)key, null)); |
| |
| expEvts.add(new T3<>((Object)key, (Object)key, null)); |
| } |
| else { |
| updates.put(key, new T2<>((Object)key, (Object)key)); |
| |
| expEvts.add(new T3<>((Object)key, (Object)key, (Object)key)); |
| } |
| |
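| // After the first update, skip continuous query messages from this node to emulate |
| // notifications lost before the node is stopped. |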
| if (first) { |
| spi.skipMsg = true; |
| |
| first = false; |
| } |
| } |
| |
| stopGrid(i); |
| |
| if (!latch.await(5, SECONDS)) { |
| Set<Integer> keys0 = new HashSet<>(keys); |
| |
| keys0.removeAll(lsnr.keys); |
| |
| log.info("Missed events for keys: " + keys0); |
| |
| fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); |
| } |
| |
| checkEvents(expEvts, lsnr); |
| } |
| |
| for (int i = 0; i < (atomicityMode() == CacheAtomicityMode.ATOMIC ? SRV_NODES - 1 : SRV_NODES - 2); i++) { |
| log.info("Start iteration: " + i); |
| |
| Ignite ignite = startGrid(i); |
| |
| IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME); |
| |
| List<Integer> keys = testKeys(cache, PARTS); |
| |
| CountDownLatch latch = new CountDownLatch(keys.size()); |
| |
| lsnr.latch = latch; |
| |
| for (Integer key : keys) { |
| log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']'); |
| |
| T2<Object, Object> t = updates.get(key); |
| |
| if (t == null) { |
| updates.put(key, new T2<>((Object)key, null)); |
| |
| expEvts.add(new T3<>((Object)key, (Object)key, null)); |
| } |
| else { |
| updates.put(key, new T2<>((Object)key, (Object)key)); |
| |
| expEvts.add(new T3<>((Object)key, (Object)key, (Object)key)); |
| } |
| |
| if (updateFromClient) |
| qryClientCache.put(key, key); |
| else |
| cache.put(key, key); |
| } |
| |
| if (!latch.await(10, SECONDS)) { |
| Set<Integer> keys0 = new HashSet<>(keys); |
| |
| keys0.removeAll(lsnr.keys); |
| |
| log.info("Missed events for keys: " + keys0); |
| |
| fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); |
| } |
| |
| checkEvents(expEvts, lsnr); |
| } |
| |
| cur.close(); |
| |
| assertFalse("Unexpected error during test, see log for details.", err); |
| } |
| |
| /** |
| * @param expEvts Expected events. |
| * @param lsnr Listener. |
| */ |
| private void checkEvents(List<T3<Object, Object, Object>> expEvts, CacheEventListener1 lsnr) { |
| for (T3<Object, Object, Object> exp : expEvts) { |
| CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1()); |
| |
| assertNotNull("No event for key: " + exp.get1(), e); |
| assertEquals("Unexpected value: " + e, exp.get2(), e.getValue()); |
| } |
| |
| expEvts.clear(); |
| |
| lsnr.evts.clear(); |
| } |
| |
| /** |
| * @param expEvts Expected events. |
| * @param lsnr Listener. |
| * @param lostAllow If {@code true}, won't assert on lost events. |
| * @throws Exception If failed. |
| */ |
| private void checkEvents(final List<T3<Object, Object, Object>> expEvts, |
| final CacheEventListener2 lsnr, |
| boolean lostAllow) throws Exception { |
| checkEvents(expEvts, lsnr, lostAllow, true); |
| } |
| |
| /** |
| * @param expEvts Expected events. |
| * @param lsnr Listener. |
| * @param lostAllow If {@code true}, won't assert on lost events. |
| * @param wait Wait flag. |
| * @throws Exception If failed. |
| */ |
| private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr, |
| boolean lostAllow, boolean wait) throws Exception { |
| if (wait) { |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return expEvts.size() == lsnr.size(); |
| } |
| }, 10_000L); |
| } |
| |
| synchronized (lsnr) { |
| Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size()); |
| |
| for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet()) |
| prevMap.put(e.getKey(), new ArrayList<>(e.getValue())); |
| |
| List<T3<Object, Object, Object>> lostEvts = new ArrayList<>(); |
| |
| for (T3<Object, Object, Object> exp : expEvts) { |
| List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1()); |
| |
| if (F.eq(exp.get2(), exp.get3())) |
| continue; |
| |
| if (rcvdEvts == null || rcvdEvts.isEmpty()) { |
| lostEvts.add(exp); |
| |
| continue; |
| } |
| |
| Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator(); |
| |
| boolean found = false; |
| |
| while (iter.hasNext()) { |
| CacheEntryEvent<?, ?> e = iter.next(); |
| |
| if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue())) |
| && equalOldValue(e, exp)) { |
| found = true; |
| |
| iter.remove(); |
| |
| break; |
| } |
| } |
| |
| // Event was not received: record it as lost. |
| if (!found) |
| lostEvts.add(exp); |
| } |
| |
| boolean dup = false; |
| |
| // Check duplicate. |
| if (!lsnr.evts.isEmpty()) { |
| for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) { |
| if (!evts.isEmpty()) { |
| for (CacheEntryEvent<?, ?> e : evts) { |
| boolean found = false; |
| |
| for (T3<Object, Object, Object> lostEvt : lostEvts) { |
| if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) { |
| found = true; |
| |
| lostEvts.remove(lostEvt); |
| |
| break; |
| } |
| } |
| |
| if (!found) { |
| dup = true; |
| |
| break; |
| } |
| } |
| } |
| } |
| |
| if (dup) { |
| for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) { |
| if (!e.isEmpty()) { |
| for (CacheEntryEvent<?, ?> event : e) |
| log.error("Got duplicate event: " + event); |
| } |
| } |
| } |
| } |
| |
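| // Note: up to 100 lost events are tolerated even when lostAllow is false. |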
| if (!lostAllow && lostEvts.size() > 100) { |
| log.error("Lost event cnt: " + lostEvts.size()); |
| |
| for (T3<Object, Object, Object> e : lostEvts) |
| log.error("Lost event: " + e); |
| |
| fail("Lose events, see log for details."); |
| } |
| |
| log.error("Lost event cnt: " + lostEvts.size()); |
| |
| expEvts.clear(); |
| |
| lsnr.evts.clear(); |
| lsnr.vals.clear(); |
| } |
| } |
| |
| /** |
| * @param e Event. |
| * @param expVals Expected key, value and old value. |
| * @return {@code True} if the event has the same key, value and old value as expected. If the |
| * cache is started without backups, the old value is ignored in the comparison. |
| */ |
| private boolean equalOldValue(CacheEntryEvent<?, ?> e, T3<Object, Object, Object> expVals) { |
| return (e.getOldValue() == null && expVals.get3() == null) // Both null |
| || (e.getOldValue() != null && expVals.get3() != null // Equals |
| && e.getOldValue().equals(expVals.get3())) |
| || (backups == 0); // If started without backups, the old value might be lost. |
| } |
| |
| /** |
| * @param expEvts Expected events. |
| * @param lsnr Listener. |
| * @param allowLoseEvt If {@code true}, lost events are allowed. |
| * @throws Exception If failed. |
| */ |
| private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener3 lsnr, |
| boolean allowLoseEvt) throws Exception { |
| if (!allowLoseEvt) |
| assert GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return lsnr.evts.size() == expEvts.size(); |
| } |
| }, 2000L); |
| |
| for (T3<Object, Object, Object> exp : expEvts) { |
| CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1()); |
| |
| assertNotNull("No event for key: " + exp.get1(), e); |
| assertEquals("Unexpected value: " + e, exp.get2(), e.getValue()); |
| |
| if (allowLoseEvt) |
| lsnr.evts.remove(exp.get1()); |
| } |
| |
| if (allowLoseEvt) |
| assert lsnr.evts.isEmpty(); |
| |
| expEvts.clear(); |
| |
| lsnr.evts.clear(); |
| lsnr.keys.clear(); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param parts Number of partitions. |
| * @return Keys. |
| * @throws Exception If failed. |
| */ |
| private List<Integer> testKeys(IgniteCache<Object, Object> cache, int parts) throws Exception { |
| Ignite ignite = cache.unwrap(Ignite.class); |
| |
| List<Integer> res = new ArrayList<>(); |
| |
| final Affinity<Object> aff = ignite.affinity(cache.getName()); |
| |
| final ClusterNode node = ignite.cluster().localNode(); |
| |
| assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return aff.primaryPartitions(node).length > 0; |
| } |
| }, 5000)); |
| |
| int[] nodeParts = aff.primaryPartitions(node); |
| |
| final int KEYS_PER_PART = 50; |
| |
| for (int i = 0; i < parts; i++) { |
| int part = nodeParts[i]; |
| |
| int cnt = 0; |
| |
| for (int key = 0; key < 100_000; key++) { |
| if (aff.partition(key) == part && aff.isPrimary(node, key)) { |
| res.add(key); |
| |
| if (++cnt == KEYS_PER_PART) |
| break; |
| } |
| } |
| |
| assertEquals(KEYS_PER_PART, cnt); |
| } |
| |
| assertEquals(parts * KEYS_PER_PART, res.size()); |
| |
| return res; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBackupQueueCleanupClientQuery() throws Exception { |
| startGridsMultiThreaded(2); |
| |
| Ignite qryClient = startClientGrid(2); |
| |
| CacheEventListener1 lsnr = new CacheEventListener1(false); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setLocalListener(lsnr); |
| |
| QueryCursor<?> cur = qryClient.cache(DEFAULT_CACHE_NAME).query(qry); |
| |
| assertEquals(0, backupQueue(ignite(1)).size()); |
| |
| IgniteCache<Object, Object> cache0 = ignite(0).cache(DEFAULT_CACHE_NAME); |
| |
| List<Integer> keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD); |
| |
| CountDownLatch latch = new CountDownLatch(keys.size()); |
| |
| lsnr.latch = latch; |
| |
| for (Integer key : keys) { |
| log.info("Put: " + key); |
| |
| cache0.put(key, key); |
| } |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return backupQueue(ignite(1)).isEmpty(); |
| } |
| }, 2000); |
| |
| assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)), |
| backupQueue(ignite(1)).size() < BACKUP_ACK_THRESHOLD); |
| |
| if (!latch.await(5, SECONDS)) |
| fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); |
| |
| keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD / 2); |
| |
| latch = new CountDownLatch(keys.size()); |
| |
| lsnr.latch = latch; |
| |
| for (Integer key : keys) |
| cache0.put(key, key); |
| |
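| // Backup queue acknowledgement frequency assumed by this test. |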
| final long ACK_FREQ = 5000; |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return backupQueue(ignite(1)).isEmpty(); |
| } |
| }, ACK_FREQ + 2000); |
| |
| assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)), backupQueue(ignite(1)).isEmpty()); |
| |
| if (!latch.await(5, SECONDS)) |
| fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); |
| |
| cur.close(); |
| |
| assertFalse("Unexpected error during test, see log for details.", err); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBackupQueueEvict() throws Exception { |
| startGridsMultiThreaded(2); |
| |
| Ignite qryClient = startClientGrid(2); |
| |
| CacheEventListener1 lsnr = new CacheEventListener1(false); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setLocalListener(lsnr); |
| |
| QueryCursor<?> cur = qryClient.cache(DEFAULT_CACHE_NAME).query(qry); |
| |
| assertEquals(0, backupQueue(ignite(0)).size()); |
| |
| long ttl = 100; |
| |
| final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl)); |
| |
| final IgniteCache<Object, Object> cache0 = ignite(2).cache(DEFAULT_CACHE_NAME).withExpiryPolicy(expiry); |
| |
| final List<Integer> keys = primaryKeys(ignite(1).cache(DEFAULT_CACHE_NAME), BACKUP_ACK_THRESHOLD); |
| |
| lsnr.latch = new CountDownLatch(keys.size()); |
| |
| for (Integer key : keys) { |
| log.info("Put: " + key); |
| |
| cache0.put(key, key); |
| } |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return backupQueue(ignite(0)).isEmpty(); |
| } |
| }, 5000); |
| |
| assertTrue("Backup queue is not cleared: " + backupQueue(ignite(0)), |
| backupQueue(ignite(0)).size() < BACKUP_ACK_THRESHOLD); |
| |
| boolean wait = waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return cache0.localPeek(keys.get(0)) == null; |
| } |
| }, ttl + 1000); |
| |
| assertTrue("Entry evicted.", wait); |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return backupQueue(ignite(0)).isEmpty(); |
| } |
| }, 2000); |
| |
| assertTrue("Backup queue is not cleared: " + backupQueue(ignite(0)), backupQueue(ignite(0)).size() < BACKUP_ACK_THRESHOLD); |
| |
| if (!backupQueue(ignite(0)).isEmpty()) { |
| for (Object o : backupQueue(ignite(0))) { |
| CacheContinuousQueryEntry e = (CacheContinuousQueryEntry)o; |
| |
| assertNotSame("Evicted entry added to backup queue.", -1L, e.updateCounter()); |
| } |
| } |
| |
| cur.close(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testBackupQueueCleanupServerQuery() throws Exception { |
| Ignite qryClient = startGridsMultiThreaded(2); |
| |
| CacheEventListener1 lsnr = new CacheEventListener1(false); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setLocalListener(lsnr); |
| |
| IgniteCache<Object, Object> cache = qryClient.cache(DEFAULT_CACHE_NAME); |
| |
| QueryCursor<?> cur = cache.query(qry); |
| |
| assertEquals(0, backupQueue(ignite(1)).size()); |
| |
| List<Integer> keys = primaryKeys(cache, BACKUP_ACK_THRESHOLD); |
| |
| CountDownLatch latch = new CountDownLatch(keys.size()); |
| |
| lsnr.latch = latch; |
| |
| for (Integer key : keys) { |
| log.info("Put: " + key); |
| |
| cache.put(key, key); |
| } |
| |
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return backupQueue(ignite(1)).isEmpty(); |
| } |
| }, 5000); |
| |
| assertTrue("Backup queue is not cleared: " + backupQueue(ignite(1)), |
| backupQueue(ignite(1)).size() < BACKUP_ACK_THRESHOLD); |
| |
| if (!latch.await(5, SECONDS)) |
| fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']'); |
| |
| cur.close(); |
| } |
| |
| /** |
| * @param ignite Ignite. |
| * @return Backup queue for the test query (collected from internal handler state via reflection). |
| */ |
| private Collection<Object> backupQueue(Ignite ignite) { |
| GridContinuousProcessor proc = ((IgniteKernal)ignite).context().continuous(); |
| |
| ConcurrentMap<Object, Object> infos = GridTestUtils.getFieldValue(proc, "rmtInfos"); |
| |
| Collection<Object> backupQueue = new ArrayList<>(); |
| |
| for (Object info : infos.values()) { |
| GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd"); |
| |
| if (hnd.isQuery() && hnd.cacheName().equals(DEFAULT_CACHE_NAME)) { |
| Map<Integer, CacheContinuousQueryEventBuffer> map = GridTestUtils.getFieldValue(hnd, |
| CacheContinuousQueryHandler.class, "entryBufs"); |
| |
| for (CacheContinuousQueryEventBuffer buf : map.values()) { |
| Collection<Object> q = GridTestUtils.getFieldValue(buf, |
| CacheContinuousQueryEventBuffer.class, "backupQ"); |
| |
| if (q != null) |
| backupQueue.addAll(q); |
| } |
| } |
| } |
| |
| return backupQueue; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testFailoverStartStopBackup() throws Exception { |
| failoverStartStopFilter(2); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testStartStop() throws Exception { |
| this.backups = 2; |
| |
| final int SRV_NODES = 4; |
| |
| startGridsMultiThreaded(SRV_NODES); |
| |
| Ignite qryClient = startClientGrid(SRV_NODES); |
| |
| IgniteCache<Object, Object> qryClnCache = qryClient.cache(DEFAULT_CACHE_NAME); |
| |
| Affinity<Object> aff = qryClient.affinity(DEFAULT_CACHE_NAME); |
| |
| final CacheEventListener2 lsnr = new CacheEventListener2(); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setLocalListener(lsnr); |
| |
| qry.setRemoteFilter(new CacheEventFilter()); |
| |
| QueryCursor<?> cur = qryClnCache.query(qry); |
| |
| for (int i = 0; i < 10; i++) { |
| final int idx = i % (SRV_NODES - 1); |
| |
| log.info("Stop node: " + idx); |
| |
| stopGrid(idx); |
| |
| awaitPartitionMapExchange(); |
| |
| List<T3<Object, Object, Object>> afterRestEvts = new ArrayList<>(); |
| |
| for (int j = 0; j < aff.partitions(); j++) { |
| Integer oldVal = (Integer)qryClnCache.get(j); |
| |
| qryClnCache.put(j, i); |
| |
| afterRestEvts.add(new T3<>((Object)j, (Object)i, (Object)oldVal)); |
| } |
| |
| checkEvents(new ArrayList<>(afterRestEvts), lsnr, false); |
| |
| log.info("Start node: " + idx); |
| |
| startGrid(idx); |
| } |
| |
| cur.close(); |
| } |
| |
| /** |
| * @param backups Number of backups. |
| * @throws Exception If failed. |
| */ |
| private void failoverStartStopFilter(int backups) throws Exception { |
| this.backups = backups; |
| |
| final int SRV_NODES = 4; |
| |
| startGridsMultiThreaded(SRV_NODES); |
| |
| Ignite qryClient = startClientGrid(SRV_NODES); |
| |
| IgniteCache<Object, Object> qryClnCache = qryClient.cache(DEFAULT_CACHE_NAME); |
| |
| final CacheEventListener2 lsnr = new CacheEventListener2(); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setLocalListener(lsnr); |
| |
| qry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter()); |
| |
| QueryCursor<?> cur = qryClnCache.query(qry); |
| |
| CacheEventListener2 dinLsnr = null; |
| |
| QueryCursor<?> dinQry = null; |
| |
| final AtomicBoolean stop = new AtomicBoolean(); |
| |
| final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>(); |
| |
| IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| while (!stop.get() && !err) { |
| final int idx = ThreadLocalRandom.current().nextInt(SRV_NODES - 1); |
| |
| log.info("Stop node: " + idx); |
| |
| awaitPartitionMapExchange(); |
| |
| Thread.sleep(400); |
| |
| stopGrid(idx); |
| |
| awaitPartitionMapExchange(); |
| |
| Thread.sleep(400); |
| |
| log.info("Start node: " + idx); |
| |
| startGrid(idx); |
| |
| Thread.sleep(200); |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| |
| assertTrue(checkLatch.compareAndSet(null, latch)); |
| |
| if (!stop.get()) { |
| log.info("Wait for event check."); |
| |
| assertTrue(latch.await(1, MINUTES)); |
| } |
| } |
| |
| return null; |
| } |
| }); |
| |
| final Map<Integer, Integer> vals = new HashMap<>(); |
| |
| final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>(); |
| |
| final List<T3<Object, Object, Object>> expEvtsNewLsnr = new ArrayList<>(); |
| |
| final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>(); |
| |
| try { |
| long stopTime = System.currentTimeMillis() + 60_000; |
| |
| // Start a new filter every 5 seconds. |
| long startFilterTime = System.currentTimeMillis() + 5_000; |
| |
| final int PARTS = qryClient.affinity(DEFAULT_CACHE_NAME).partitions(); |
| |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| boolean filtered = false; |
| |
| boolean processorPut = false; |
| |
| while (System.currentTimeMillis() < stopTime) { |
| Integer key = rnd.nextInt(PARTS); |
| |
| Integer prevVal = vals.get(key); |
| Integer val = vals.get(key); |
| |
| if (System.currentTimeMillis() > startFilterTime) { |
| // Stop filter and check events. |
| if (dinQry != null) { |
| // If the sync callback is used, we can close the query before checking notifications |
| // because CQ listeners on the server side have a pending notification upon each |
| // successful cache update operation's completion. |
| if (!asyncCallback()) |
| dinQry.close(); |
| |
| log.info("Await events: " + expEvtsNewLsnr.size()); |
| |
| checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0); |
| |
| // If async callback is used and we close a query before checking notifications then |
| // some updates can be missed because a callback submitted in parallel can be executed |
| // after CQ is closed and no notification will be sent as a result. |
| // So, we close CQ after the check. |
| if (asyncCallback()) |
| dinQry.close(); |
| } |
| |
| dinLsnr = new CacheEventListener2(); |
| |
| ContinuousQuery<Object, Object> newQry = new ContinuousQuery<>(); |
| |
| newQry.setLocalListener(dinLsnr); |
| |
| newQry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter()); |
| |
| dinQry = qryClnCache.query(newQry); |
| |
| log.info("Continuous query listener started."); |
| |
| startFilterTime = System.currentTimeMillis() + 5_000; |
| } |
| |
| if (val == null) |
| val = 0; |
| else |
| val = Math.abs(val) + 1; |
| |
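| // Negative values are rejected by the remote filter and must not produce events. |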
| if (filtered) |
| val = -val; |
| |
| boolean updated = false; |
| |
| while (!updated) { |
| try { |
| if (processorPut && prevVal != null) { |
| qryClnCache.invoke(key, new CacheEntryProcessor<Object, Object, Void>() { |
| @Override public Void process(MutableEntry<Object, Object> entry, |
| Object... arguments) throws EntryProcessorException { |
| entry.setValue(arguments[0]); |
| |
| return null; |
| } |
| }, val); |
| } |
| else |
| qryClnCache.put(key, val); |
| |
| updated = true; |
| } |
| catch (CacheException e) { |
| assertTrue(X.hasCause(e, TransactionRollbackException.class)); |
| assertSame(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, atomicityMode()); |
| } |
| } |
| |
| processorPut = !processorPut; |
| |
| vals.put(key, val); |
| |
| if (val >= 0) { |
| List<T2<Integer, Integer>> keyEvts = expEvts.get(key); |
| |
| if (keyEvts == null) { |
| keyEvts = new ArrayList<>(); |
| |
| expEvts.put(key, keyEvts); |
| } |
| |
| keyEvts.add(new T2<>(val, prevVal)); |
| |
| T3<Object, Object, Object> tupVal = new T3<>((Object)key, (Object)val, (Object)prevVal); |
| |
| expEvtsLsnr.add(tupVal); |
| |
| if (dinQry != null) |
| expEvtsNewLsnr.add(tupVal); |
| } |
| |
| filtered = !filtered; |
| |
| CountDownLatch latch = checkLatch.get(); |
| |
| if (latch != null) { |
| log.info("Check events."); |
| |
| checkLatch.set(null); |
| |
| boolean success = false; |
| |
| try { |
| if (err) |
| break; |
| |
| checkEvents(expEvtsLsnr, lsnr, backups == 0); |
| |
| success = true; |
| |
| log.info("Events checked."); |
| } |
| finally { |
| if (!success) |
| err = true; |
| |
| latch.countDown(); |
| } |
| } |
| } |
| } |
| finally { |
| stop.set(true); |
| } |
| |
| CountDownLatch latch = checkLatch.get(); |
| |
| if (latch != null) |
| latch.countDown(); |
| |
| restartFut.get(); |
| |
| checkEvents(expEvtsLsnr, lsnr, backups == 0); |
| |
| lsnr.evts.clear(); |
| lsnr.vals.clear(); |
| |
| if (dinQry != null) { |
| checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0); |
| |
| dinLsnr.evts.clear(); |
| dinLsnr.vals.clear(); |
| } |
| |
| List<T3<Object, Object, Object>> afterRestEvts = new ArrayList<>(); |
| |
| for (int i = 0; i < qryClient.affinity(DEFAULT_CACHE_NAME).partitions(); i++) { |
| Integer oldVal = (Integer)qryClnCache.get(i); |
| |
| qryClnCache.put(i, i); |
| |
| afterRestEvts.add(new T3<>((Object)i, (Object)i, (Object)oldVal)); |
| } |
| |
| checkEvents(new ArrayList<>(afterRestEvts), lsnr, false); |
| |
| cur.close(); |
| |
| if (dinQry != null) { |
| checkEvents(new ArrayList<>(afterRestEvts), dinLsnr, false); |
| |
| dinQry.close(); |
| } |
| |
| assertFalse("Unexpected error during test, see log for details.", err); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMultiThreadedFailover() throws Exception { |
| this.backups = 2; |
| |
| final int SRV_NODES = 4; |
| |
| startGridsMultiThreaded(SRV_NODES); |
| |
| final Ignite qryCln = startClientGrid(SRV_NODES); |
| |
| final IgniteCache<Object, Object> qryClnCache = qryCln.cache(DEFAULT_CACHE_NAME); |
| |
| final CacheEventListener2 lsnr = asyncCallback() ? new CacheEventAsyncListener2() : new CacheEventListener2(); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setLocalListener(lsnr); |
| |
| QueryCursor<?> cur = qryClnCache.query(qry); |
| |
| final AtomicBoolean stop = new AtomicBoolean(); |
| |
| final int THREAD = 4; |
| |
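| // Keys are drawn from a space of PARTS values so that all threads contend on the same keys. |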
| final int PARTS = THREAD; |
| |
| final List<List<T3<Object, Object, Object>>> expEvts = new ArrayList<>(THREAD + 5); |
| |
| for (int i = 0; i < THREAD; i++) |
| expEvts.add(i, new ArrayList<T3<Object, Object, Object>>()); |
| |
| final AtomicReference<CyclicBarrier> checkBarrier = new AtomicReference<>(); |
| |
| final ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| try { |
| while (!stop.get() && !err) { |
| final int idx = rnd.nextInt(SRV_NODES); |
| |
| log.info("Stop node: " + idx); |
| |
| stopGrid(idx); |
| |
| Thread.sleep(300); |
| |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return qryCln.cluster().nodes().size() == SRV_NODES; |
| } |
| }, 5000L); |
| |
| try { |
| log.info("Start node: " + idx); |
| |
| startGrid(idx); |
| |
| Thread.sleep(300); |
| |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return qryCln.cluster().nodes().size() == SRV_NODES + 1; |
| } |
| }, 5000L); |
| } |
| catch (Exception e) { |
| log.warning("Failed to stop nodes.", e); |
| } |
| |
| CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() { |
| @Override public void run() { |
| try { |
| int size0 = 0; |
| |
| for (List<T3<Object, Object, Object>> evt : expEvts) |
| size0 += evt.size(); |
| |
| final int size = size0; |
| |
| GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return lsnr.size() >= size; |
| } |
| }, 10_000L); |
| |
| List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>(); |
| |
| for (List<T3<Object, Object, Object>> evt : expEvts) |
| expEvts0.addAll(evt); |
| |
| checkEvents(expEvts0, lsnr, false, false); |
| |
| for (List<T3<Object, Object, Object>> evt : expEvts) |
| evt.clear(); |
| } |
| catch (Exception e) { |
| log.error("Failed.", e); |
| |
| err = true; |
| |
| stop.set(true); |
| } |
| finally { |
| checkBarrier.set(null); |
| } |
| } |
| }); |
| |
| assertTrue(checkBarrier.compareAndSet(null, bar)); |
| |
| if (!stop.get() && !err) |
| bar.await(1, MINUTES); |
| } |
| } |
| catch (Throwable e) { |
| log.error("Unexpected error: " + e, e); |
| |
| err = true; |
| |
| throw e; |
| } |
| |
| return null; |
| } |
| }); |
| |
| final long stopTime = System.currentTimeMillis() + 60_000; |
| |
| final AtomicInteger valCntr = new AtomicInteger(0); |
| |
| final AtomicInteger threadSeq = new AtomicInteger(0); |
| |
| GridTestUtils.runMultiThreaded(new Runnable() { |
| @Override public void run() { |
| try { |
| final ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| final int threadId = threadSeq.getAndIncrement(); |
| |
| log.error("Thread id: " + threadId); |
| |
| while (System.currentTimeMillis() < stopTime && !stop.get() && !err) { |
| Integer key = rnd.nextInt(PARTS); |
| |
| Integer val = valCntr.incrementAndGet(); |
| |
| Integer prevVal = null; |
| |
| boolean updated = false; |
| |
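// With TRANSACTIONAL_SNAPSHOT atomicity a concurrent update can fail with a serialization
// conflict, so retry the put until it succeeds.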
| while (!updated) { |
| try { |
| prevVal = (Integer)qryClnCache.getAndPut(key, val); |
| |
| updated = true; |
| } |
| catch (CacheException e) { |
| assertTrue(e.getCause() instanceof TransactionSerializationException); |
| assertSame(atomicityMode(), CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT); |
| } |
| } |
| |
| expEvts.get(threadId).add(new T3<>((Object)key, (Object)val, (Object)prevVal)); |
| |
| CyclicBarrier bar = checkBarrier.get(); |
| |
| if (bar != null) |
| bar.await(1, MINUTES); |
| } |
| } |
| catch (Exception e) { |
| log.error("Failed.", e); |
| |
| err = true; |
| |
| stop.set(true); |
| } |
| finally { |
| stop.set(true); |
| } |
| } |
| }, THREAD, "update-thread"); |
| |
| restartFut.get(); |
| |
| List<T3<Object, Object, Object>> expEvts0 = new ArrayList<>(); |
| |
| for (List<T3<Object, Object, Object>> evt : expEvts) { |
| expEvts0.addAll(evt); |
| |
| evt.clear(); |
| } |
| |
| if (!expEvts0.isEmpty()) |
| checkEvents(expEvts0, lsnr, true); |
| |
| cur.close(); |
| |
| assertFalse("Unexpected error during test, see log for details.", err); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMultiThreaded() throws Exception { |
| this.backups = 2; |
| |
| final int SRV_NODES = 3; |
| |
| startGridsMultiThreaded(SRV_NODES); |
| |
| Ignite qryClient = startClientGrid(SRV_NODES); |
| |
| final IgniteCache<Object, Object> cache = qryClient.cache(DEFAULT_CACHE_NAME); |
| |
| CacheEventListener1 lsnr = new CacheEventListener1(true); |
| |
| ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); |
| |
| qry.setLocalListener(lsnr); |
| |
| QueryCursor<?> cur = cache.query(qry); |
| |
| final int SRV_IDX = SRV_NODES - 1; |
| |
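// Use keys for which the node to be stopped is primary, so its failure affects the tested updates.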
| List<Integer> keys = primaryKeys(ignite(SRV_IDX).cache(DEFAULT_CACHE_NAME), 10); |
| |
| final int THREADS = 10; |
| |
| for (int i = 0; i < keys.size(); i++) { |
| log.info("Iteration: " + i); |
| |
| Ignite srv = ignite(SRV_IDX); |
| |
| TestCommunicationSpi spi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi(); |
| |
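// Let the test SPI send only the first continuous query message from this server;
// the rest are dropped to emulate notification loss before the node is stopped.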
| spi.sndFirstOnly = new AtomicBoolean(false); |
| |
| final Integer key = keys.get(i); |
| |
| final AtomicInteger val = new AtomicInteger(); |
| |
| CountDownLatch latch = new CountDownLatch(THREADS); |
| |
| lsnr.latch = latch; |
| |
| IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| Integer val0 = val.getAndIncrement(); |
| |
| boolean updated = false; |
| |
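// Retry on MVCC serialization conflicts (possible only in TRANSACTIONAL_SNAPSHOT mode).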
| while (!updated) { |
| try { |
| cache.put(key, val0); |
| |
| updated = true; |
| } |
| catch (CacheException e) { |
| assertTrue(e.getCause() instanceof TransactionSerializationException); |
| assertSame(atomicityMode(), CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT); |
| } |
| } |
| |
| return null; |
| } |
| }, THREADS, "update-thread"); |
| |
| fut.get(); |
| |
| stopGrid(SRV_IDX); |
| |
| if (!latch.await(5, SECONDS)) |
| fail("Failed to wait for notifications [exp=" + THREADS + ", left=" + lsnr.latch.getCount() + ']'); |
| |
| assertEquals(THREADS, lsnr.allEvts.size()); |
| |
| Set<Integer> vals = new HashSet<>(); |
| |
boolean failed = false;
| |
| for (CacheEntryEvent<?, ?> evt : lsnr.allEvts) { |
| assertEquals(key, evt.getKey()); |
| assertNotNull(evt.getValue()); |
| |
| if (!vals.add((Integer)evt.getValue())) { |
failed = true;
| |
| log.info("Extra event: " + evt); |
| } |
| } |
| |
| for (int v = 0; v < THREADS; v++) { |
| if (!vals.contains(v)) { |
failed = true;
| |
| log.info("Event for value not received: " + v); |
| } |
| } |
| |
| assertFalse("Invalid events, see log for details.", err); |
| |
| lsnr.allEvts.clear(); |
| |
| startGrid(SRV_IDX); |
| } |
| |
| cur.close(); |
| } |
| |
| /** |
* @param logAll If {@code true}, logs all unexpected values instead of returning on the first mismatch.
* @param expEvts Expected events per key.
* @param lsnr Listener.
* @return {@code true} if all expected events were received.
| */ |
| @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") |
| private boolean checkEvents(boolean logAll, |
| Map<Integer, List<T2<Integer, Integer>>> expEvts, |
| CacheEventListener2 lsnr) { |
assertFalse(expEvts.isEmpty());
| |
| boolean pass = true; |
| |
| for (Map.Entry<Integer, List<T2<Integer, Integer>>> e : expEvts.entrySet()) { |
| Integer key = e.getKey(); |
| List<T2<Integer, Integer>> exp = e.getValue(); |
| |
| List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(key); |
| |
| if (rcvdEvts == null) { |
| pass = false; |
| |
| log.info("No events for key [key=" + key + ", exp=" + e.getValue() + ']'); |
| |
| if (!logAll) |
| return false; |
| } |
| else { |
| synchronized (rcvdEvts) { |
| if (rcvdEvts.size() != exp.size()) { |
| pass = false; |
| |
| log.info("Missed or extra events for key [key=" + key + |
| ", exp=" + e.getValue() + |
| ", rcvd=" + rcvdEvts + ']'); |
| |
| if (!logAll) |
| return false; |
| } |
| |
| int cnt = Math.min(rcvdEvts.size(), exp.size()); |
| |
| for (int i = 0; i < cnt; i++) { |
| T2<Integer, Integer> expEvt = exp.get(i); |
| CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i); |
| |
| if (pass) { |
| assertEquals(key, rcvdEvt.getKey()); |
| assertEquals(expEvt.get1(), rcvdEvt.getValue()); |
| } |
| else { |
| if (!key.equals(rcvdEvt.getKey()) || !expEvt.get1().equals(rcvdEvt.getValue())) |
| log.warning("Missed events. [key=" + key + ", actKey=" + rcvdEvt.getKey() |
| + ", expVal=" + expEvt.get1() + ", actVal=" + rcvdEvt.getValue() + "]"); |
| } |
| } |
| |
| if (!pass) { |
| for (int i = cnt; i < exp.size(); i++) { |
| T2<Integer, Integer> val = exp.get(i); |
| |
| log.warning("Missed events. [key=" + key + ", expVal=" + val.get1() |
| + ", prevVal=" + val.get2() + "]"); |
| } |
| } |
| } |
| } |
| } |
| |
| if (pass) { |
| expEvts.clear(); |
| lsnr.evts.clear(); |
| } |
| |
| return pass; |
| } |
| |
| /** |
* Failover test that detects continuous query event loss during topology changes.
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testNoEventLossOnTopologyChange() throws Exception { |
| final int batchLoadSize = 2000; |
| |
| final int restartCycles = 5; |
| |
| Ignite qryClient = startGrid(0); |
| |
| final CacheEventListener4 lsnr = new CacheEventListener4(atomicityMode() == CacheAtomicityMode.ATOMIC); |
| |
| ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); |
| |
| qry.setLocalListener(lsnr); |
| |
| IgniteCache<Integer, Integer> cache = qryClient.cache(DEFAULT_CACHE_NAME); |
| |
| QueryCursor<?> cur = cache.query(qry); |
| |
| int iteration = 0; |
| |
| int putCnt = 0; |
| |
| int ignoredDupEvts = 0; |
| |
| Thread nodeRestartThread = nodeRestartThread(restartCycles, 2_000, 1_000); |
| |
| try { |
| nodeRestartThread.start(); |
| |
| while (!Thread.interrupted() && nodeRestartThread.isAlive()) { |
| iteration++; |
| |
| for (int i = 0; i < batchLoadSize; i++) |
| cache.put(i, iteration); |
| |
| putCnt += batchLoadSize; |
| |
| log.info("Batch loaded. Iteration: " + iteration); |
| |
| final long expCnt = putCnt + ignoredDupEvts; |
| |
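// Wait until the listener has seen an event for every put (plus any duplicates already ignored).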
| GridTestUtils.waitForCondition(new GridAbsPredicate() { |
| @Override public boolean apply() { |
| return lsnr.count() == expCnt; |
| } |
| }, 6_000); |
| |
| final long cnt = lsnr.count(); |
| |
| if (cnt != expCnt) { |
| StringBuilder sb = new StringBuilder(); |
| |
| for (int i = 0; i < batchLoadSize; i++) { |
| Integer key = i; |
| Integer val = cache.get(key); |
| |
| if (!F.eq(val, iteration)) |
| sb.append("\n\t").append(">>> WRONG CACHE VALUE (lost data?) [key=").append(key) |
| .append(", val=").append(val).append(']'); |
| } |
| |
| for (Map.Entry<Integer, Integer> entry : lsnr.eventMap().entrySet()) { |
| Integer key = entry.getKey(); |
| Integer val = entry.getValue(); |
| |
| if (!F.eq(val, iteration)) |
| sb.append("\n\t").append(">>> WRONG LISTENER VALUE (lost event?) [key=").append(key) |
| .append(", val=").append(val).append(']'); |
| } |
| |
| String msg = sb.toString(); |
| |
// In ATOMIC mode a continuous query can receive duplicate update events if an update is retried after a failure.
| if (atomicityMode() == CacheAtomicityMode.ATOMIC && msg.isEmpty() && cnt > expCnt) |
| ignoredDupEvts += cnt - expCnt; |
| else |
| fail("Unexpected event updates count: EXPECTED=" + expCnt + ", ACTUAL=" + cnt + ", " + |
| "ITERATION=" + iteration + msg); |
| } |
| |
| sleep(500); |
| } |
| } |
| finally { |
| nodeRestartThread.interrupt(); |
| |
| cur.close(); |
| |
| nodeRestartThread.join(3_000); |
| } |
| } |
| |
| /** |
| * Starts thread which restarts a node over and over again. |
| */ |
| private Thread nodeRestartThread(final int restartCycles, final long initDelay, final long restartDelay) { |
| Thread t = new Thread(new Runnable() { |
| @Override public void run() { |
| sleep(initDelay); |
| |
| try { |
| for (int i = 1; i <= restartCycles && !Thread.interrupted(); i++) { |
| IgniteConfiguration cfg = optimize(getConfiguration("restartNode")). |
| setGridLogger(new NullLogger()); |
| |
| log.info("Node restart cycle started: " + i); |
| |
| try (Ignite ignored = Ignition.start(cfg)) { |
| awaitPartitionMapExchange(); |
| |
| sleep(restartDelay); |
| } |
| |
| log.info("Node restart cycle finished: " + i); |
| |
| awaitPartitionMapExchange(); |
| |
| sleep(restartDelay); |
| } |
| } |
| catch (Exception e) { |
| log.error("Unexpected error.", e); |
| } |
| } |
| }); |
| |
| t.setName("flapping-node-thread"); |
| |
| t.setDaemon(true); |
| |
| return t; |
| } |
| |
| /** |
* Sleeps quietly: returns immediately if the current thread is already interrupted and swallows interrupts raised during the sleep.
| * |
| * @param sleepTime Sleep time. |
| */ |
| private void sleep(long sleepTime) { |
| try { |
| if (Thread.currentThread().isInterrupted()) |
| return; |
| |
| U.sleep(sleepTime); |
| } |
| catch (IgniteInterruptedCheckedException ignored) { |
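// Read and clear the interrupt status raised during the sleep.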
| Thread.interrupted(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| @IgniteAsyncCallback |
| private static class CacheEventAsyncListener1 extends CacheEventListener1 { |
| /** |
| * @param saveAll Save all events flag. |
| */ |
| CacheEventAsyncListener1(boolean saveAll) { |
| super(saveAll); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class CacheEventListener1 implements CacheEntryUpdatedListener<Object, Object> { |
| /** */ |
| private volatile CountDownLatch latch; |
| |
| /** */ |
| private GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>(); |
| |
| /** */ |
| private ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>(); |
| |
| /** */ |
| private final CopyOnWriteArrayList<CacheEntryEvent<?, ?>> allEvts; |
| |
| /** */ |
| @LoggerResource |
| private IgniteLogger log; |
| |
| /** |
| * @param saveAll Save all events flag. |
| */ |
| CacheEventListener1(boolean saveAll) { |
| allEvts = saveAll ? new CopyOnWriteArrayList<CacheEntryEvent<?, ?>>() : null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { |
| try { |
| for (CacheEntryEvent<?, ?> evt : evts) { |
| CountDownLatch latch = this.latch; |
| |
| log.info("Received cache event [evt=" + evt + |
| ", left=" + (latch != null ? latch.getCount() : null) + ']'); |
| |
| this.evts.put(evt.getKey(), evt); |
| |
| keys.add((Integer)evt.getKey()); |
| |
| if (allEvts != null) |
| allEvts.add(evt); |
| |
assertNotNull(latch);
assertTrue(latch.getCount() > 0);
| |
| latch.countDown(); |
| |
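// Last expected event for this iteration: reset the latch reference and the collected keys.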
| if (latch.getCount() == 0) { |
| this.latch = null; |
| |
| keys.clear(); |
| } |
| } |
| } |
| catch (Throwable e) { |
| err = true; |
| |
| log.error("Unexpected error", e); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| @IgniteAsyncCallback |
| private static class CacheEventAsyncListener2 extends CacheEventListener2 { |
| // No-op. |
| } |
| |
| /** |
| * |
| */ |
| private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> { |
| /** */ |
| @LoggerResource |
| private IgniteLogger log; |
| |
| /** */ |
| private final ConcurrentHashMap<Integer, Integer> vals = new ConcurrentHashMap<>(); |
| |
| /** */ |
| private final ConcurrentHashMap<Integer, List<CacheEntryEvent<?, ?>>> evts = new ConcurrentHashMap<>(); |
| |
| /** |
* @return Total number of received events.
| */ |
| public synchronized int size() { |
| int size = 0; |
| |
| for (List<CacheEntryEvent<?, ?>> e : evts.values()) |
| size += e.size(); |
| |
| return size; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public synchronized void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) |
| throws CacheEntryListenerException { |
| try { |
| for (CacheEntryEvent<?, ?> evt : evts) { |
| Integer key = (Integer)evt.getKey(); |
| Integer val = (Integer)evt.getValue(); |
| |
| assertNotNull(key); |
| assertNotNull(val); |
| |
| Integer prevVal = vals.get(key); |
| |
boolean dup = prevVal != null && prevVal.equals(val);
| |
| if (!dup) { |
| vals.put(key, val); |
| |
| List<CacheEntryEvent<?, ?>> keyEvts = this.evts.get(key); |
| |
| if (keyEvts == null) { |
| keyEvts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>()); |
| |
| this.evts.put(key, keyEvts); |
| } |
| |
| keyEvts.add(evt); |
| } |
| } |
| } |
| catch (Throwable e) { |
| err = true; |
| |
| log.error("Unexpected error", e); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| @IgniteAsyncCallback |
| public static class CacheEventAsyncListener3 extends CacheEventListener3 { |
| // No-op. |
| } |
| |
| /** |
| * |
| */ |
| public static class CacheEventListener3 implements CacheEntryUpdatedListener<Object, Object>, |
| CacheEntryEventSerializableFilter<Object, Object> { |
| /** Keys. */ |
| GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>(); |
| |
| /** Events. */ |
| private final ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>(); |
| |
| /** {@inheritDoc} */ |
| @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) throws CacheEntryListenerException { |
| for (CacheEntryEvent<?, ?> e : evts) { |
| Integer key = (Integer)e.getKey(); |
| |
| keys.add(key); |
| |
// Keep the map update outside the assert so it still happens with assertions disabled.
CacheEntryEvent<?, ?> prev = this.evts.put(key, e);

assert prev == null : prev;
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean evaluate(CacheEntryEvent<?, ?> e) throws CacheEntryListenerException { |
| return (Integer)e.getValue() % 2 == 0; |
| } |
| } |
| |
| /** |
| * Listener. |
| */ |
| private static class CacheEventListener4 implements CacheEntryUpdatedListener<Integer, Integer> { |
/** Received event counter. */
| private final AtomicLong cntr = new AtomicLong(); |
| |
/** Last received value for each key. */
| private final Map<Integer, Integer> evtMap = new ConcurrentHashMap<>(); |
| |
| /** Atomicity mode flag. */ |
| private final boolean atomicModeFlag; |
| |
/**
* @param atomicModeFlag {@code true} if the tested cache is ATOMIC (duplicate events are then tolerated).
*/
| public CacheEventListener4(boolean atomicModeFlag) { |
| this.atomicModeFlag = atomicModeFlag; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("EqualsBetweenInconvertibleTypes") |
| @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) |
| throws CacheEntryListenerException { |
| for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { |
| Integer prev = evtMap.put(evt.getKey(), evt.getValue()); |
| |
// An atomic cache can deliver duplicate events if a cache update is retried, e.g. after a topology change.
if (!atomicModeFlag || prev == null || !prev.equals(evt.getValue()))
| cntr.incrementAndGet(); |
| } |
| } |
| |
| /** |
| * @return Events count. |
| */ |
| public long count() { |
| return cntr.get(); |
| } |
| |
| /** |
| * @return Event map. |
| */ |
| Map<Integer, Integer> eventMap() { |
| return evtMap; |
| } |
| } |
| |
| /** |
| * |
| */ |
| @IgniteAsyncCallback |
| private static class CacheEventAsyncFilter extends CacheEventFilter { |
| // No-op. |
| } |
| |
| /** |
| * |
| */ |
| public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> { |
| /** {@inheritDoc} */ |
| @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException { |
| return ((Integer)evt.getValue()) >= 0; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class TestCommunicationSpi extends TcpCommunicationSpi { |
| /** */ |
| @LoggerResource |
| private IgniteLogger log; |
| |
| /** */ |
| private volatile boolean skipMsg; |
| |
| /** */ |
| private volatile boolean skipAllMsg; |
| |
| /** */ |
| private volatile AtomicBoolean sndFirstOnly; |
| |
| /** {@inheritDoc} */ |
| @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) |
| throws IgniteSpiException { |
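// Unwrap the wire-level message to inspect the actual payload.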
| Object msg0 = ((GridIoMessage)msg).message(); |
| |
| if (skipAllMsg) |
| return; |
| |
| if (msg0 instanceof GridContinuousMessage) { |
| if (skipMsg) { |
| if (log.isDebugEnabled()) |
| log.debug("Skip continuous message: " + msg0); |
| |
| return; |
| } |
| else { |
| AtomicBoolean sndFirstOnly = this.sndFirstOnly; |
| |
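// The CAS lets exactly the first continuous query message through; all later ones are dropped.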
| if (sndFirstOnly != null && !sndFirstOnly.compareAndSet(false, true)) { |
| if (log.isDebugEnabled()) |
| log.debug("Skip continuous message: " + msg0); |
| |
| return; |
| } |
| } |
| } |
| |
| super.sendMessage(node, msg, ackC); |
| } |
| } |
| } |