| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.query.continuous; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| import java.util.UUID; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import javax.cache.Cache; |
| import javax.cache.configuration.Factory; |
| import javax.cache.configuration.FactoryBuilder; |
| import javax.cache.event.CacheEntryEvent; |
| import javax.cache.event.CacheEntryEventFilter; |
| import javax.cache.event.CacheEntryListenerException; |
| import javax.cache.event.CacheEntryUpdatedListener; |
| import javax.cache.event.EventType; |
| import javax.cache.integration.CacheLoaderException; |
| import javax.cache.integration.CacheWriterException; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.MutableEntry; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CacheEntryEventSerializableFilter; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cache.affinity.Affinity; |
| import org.apache.ignite.cache.query.AbstractContinuousQuery; |
| import org.apache.ignite.cache.query.CacheQueryEntryEvent; |
| import org.apache.ignite.cache.query.ContinuousQuery; |
| import org.apache.ignite.cache.query.ContinuousQueryWithTransformer; |
| import org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener; |
| import org.apache.ignite.cache.query.QueryCursor; |
| import org.apache.ignite.cache.store.CacheStore; |
| import org.apache.ignite.cache.store.CacheStoreAdapter; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.PA; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteClosure; |
| import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.GridTestUtils.SF; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionConcurrency; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static javax.cache.event.EventType.CREATED; |
| import static javax.cache.event.EventType.REMOVED; |
| import static javax.cache.event.EventType.UPDATED; |
| import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.cache.CacheMode.REPLICATED; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.ALL; |
| import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT; |
| import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; |
| |
| /** |
| * |
| */ |
| public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstractTest { |
| /** */ |
| private static final int NODES = 5; |
| |
| /** */ |
| private static final int KEYS = 50; |
| |
| /** */ |
| private static final int VALS = 10; |
| |
| /** */ |
| public static final int ITERATION_CNT = SF.applyLB(100, 5); |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| super.beforeTestsStarted(); |
| |
| startGridsMultiThreaded(getServerNodeCount()); |
| |
| startClientGrid(getServerNodeCount()); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testFilterAndFactoryProvided() throws Exception { |
| final CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| ATOMIC, |
| false); |
| |
| grid(0).createCache(ccfg); |
| |
| try { |
| final ContinuousQuery qry = new ContinuousQuery(); |
| |
| qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter>() { |
| @Override public CacheEntryEventFilter create() { |
| return null; |
| } |
| }); |
| |
| qry.setRemoteFilter(new CacheEntryEventSerializableFilter() { |
| @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException { |
| return false; |
| } |
| }); |
| |
| qry.setLocalListener(new CacheEntryUpdatedListener() { |
| @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { |
| // No-op. |
| } |
| }); |
| |
| GridTestUtils.assertThrows(log, new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| return grid(0).cache(ccfg.getName()).query(qry); |
| } |
| }, IgniteException.class, null); |
| |
| } |
| finally { |
| grid(0).destroyCache(ccfg.getName()); |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAtomicClient() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| ATOMIC, |
| false); |
| |
| doTestContinuousQuery(ccfg, CLIENT); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAtomic() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| ATOMIC, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAtomicAllNodes() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| ATOMIC, |
| false); |
| |
| doTestContinuousQuery(ccfg, ALL); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAtomicReplicated() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| ATOMIC, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAtomicReplicatedAllNodes() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| ATOMIC, |
| false); |
| |
| doTestContinuousQuery(ccfg, ALL); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAtomicReplicatedClient() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| ATOMIC, |
| false); |
| |
| doTestContinuousQuery(ccfg, CLIENT); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAtomicNoBackups() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| ATOMIC, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAtomicNoBackupsAllNodes() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| ATOMIC, |
| false); |
| |
| doTestContinuousQuery(ccfg, ALL); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testAtomicNoBackupsClient() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| ATOMIC, |
| false); |
| |
| doTestContinuousQuery(ccfg, CLIENT); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTx() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTxAllNodes() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL, |
| false); |
| |
| doTestContinuousQuery(ccfg, ALL); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTxExplicit() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMvccTx() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMvccTxAllNodes() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestContinuousQuery(ccfg, ALL); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMvccTxExplicit() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveAtomicWithoutBackup() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| ATOMIC, |
| false); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveAtomicWithoutBackupWithStore() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| ATOMIC, |
| true); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveAtomic() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| ATOMIC, |
| false); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveAtomicWithStore() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| ATOMIC, |
| true); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveTx() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL, |
| false); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveTxWithStore() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL, |
| true); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveReplicatedTx() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| TRANSACTIONAL, |
| false); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveReplicatedTxWithStore() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| TRANSACTIONAL, |
| true); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveMvccTx() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-8582") |
| @Test |
| public void testDoubleRemoveMvccTxWithStore() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL_SNAPSHOT, |
| true); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveReplicatedMvccTx() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Ignore("https://issues.apache.org/jira/browse/IGNITE-8582") |
| @Test |
| public void testDoubleRemoveReplicatedMvccTxWithStore() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| TRANSACTIONAL_SNAPSHOT, |
| true); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveReplicatedAtomic() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| ATOMIC, |
| false); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDoubleRemoveReplicatedAtomicWithStore() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| ATOMIC, |
| true); |
| |
| doTestNotModifyOperation(ccfg); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| private void doTestNotModifyOperation(CacheConfiguration ccfg) throws Exception { |
| singleOperation(ccfg); |
| batchOperation(ccfg); |
| } |
| |
| /** |
| * @param ccfg Cache configuration. |
| * @throws Exception If failed. |
| */ |
| private void singleOperation(CacheConfiguration ccfg) throws Exception { |
| IgniteCache<QueryTestKey, QueryTestValue> cache = grid(getClientIndex()).createCache(ccfg); |
| |
| try { |
| AbstractContinuousQuery<QueryTestKey, QueryTestValue> qry = createQuery(); |
| |
| final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts = |
| new CopyOnWriteArrayList<>(); |
| |
| if (noOpFilterFactory() != null) |
| qry.setRemoteFilterFactory(noOpFilterFactory()); |
| |
| if (qry instanceof ContinuousQuery) { |
| ((ContinuousQuery<QueryTestKey, QueryTestValue>)qry).setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { |
| @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, |
| ? extends QueryTestValue>> events) throws CacheEntryListenerException { |
| for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) |
| evts.add(e); |
| } |
| }); |
| } |
| else if (qry instanceof ContinuousQueryWithTransformer) |
| initQueryWithTransformer( |
| (ContinuousQueryWithTransformer<QueryTestKey, QueryTestValue, CacheEntryEvent>)qry, evts); |
| else |
| fail("Unknown query type"); |
| |
| QueryTestKey key = new QueryTestKey(1); |
| |
| try (QueryCursor qryCur = cache.query(qry)) { |
| for (int i = 0; i < ITERATION_CNT; i++) { |
| log.info("Start iteration: " + i); |
| // Not events. |
| cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(true)); |
| |
| // Get events. |
| cache.put(key, new QueryTestValue(1)); |
| cache.remove(key); |
| |
| // Not events. |
| cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(null, false)); |
| cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(null, false)); |
| cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(true)); |
| cache.remove(key); |
| |
| // Get events. |
| cache.put(key, new QueryTestValue(2)); |
| |
| // Not events. |
| cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(true)); |
| |
| // Get events. |
| cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(null, false)); |
| |
| // Not events. |
| cache.remove(key); |
| |
| // Get events. |
| cache.put(key, new QueryTestValue(3)); |
| cache.put(key, new QueryTestValue(4)); |
| |
| // Not events. |
| cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(true)); |
| cache.putIfAbsent(key, new QueryTestValue(5)); |
| cache.putIfAbsent(key, new QueryTestValue(5)); |
| cache.putIfAbsent(key, new QueryTestValue(5)); |
| cache.invoke(key, (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(true)); |
| cache.remove(key, new QueryTestValue(5)); |
| |
| // Get events. |
| cache.remove(key, new QueryTestValue(4)); |
| cache.putIfAbsent(key, new QueryTestValue(5)); |
| |
| // Not events. |
| cache.replace(key, new QueryTestValue(3), new QueryTestValue(2)); |
| cache.replace(key, new QueryTestValue(3), new QueryTestValue(2)); |
| cache.replace(key, new QueryTestValue(3), new QueryTestValue(2)); |
| |
| // Get events. |
| cache.replace(key, new QueryTestValue(5), new QueryTestValue(6)); |
| |
| assert GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return evts.size() == 9; |
| } |
| }, 5_000); |
| |
| checkSingleEvent(evts.get(0), CREATED, new QueryTestValue(1), null); |
| checkSingleEvent(evts.get(1), REMOVED, new QueryTestValue(1), new QueryTestValue(1)); |
| checkSingleEvent(evts.get(2), CREATED, new QueryTestValue(2), null); |
| checkSingleEvent(evts.get(3), REMOVED, new QueryTestValue(2), new QueryTestValue(2)); |
| checkSingleEvent(evts.get(4), CREATED, new QueryTestValue(3), null); |
| checkSingleEvent(evts.get(5), EventType.UPDATED, new QueryTestValue(4), new QueryTestValue(3)); |
| checkSingleEvent(evts.get(6), REMOVED, new QueryTestValue(4), new QueryTestValue(4)); |
| checkSingleEvent(evts.get(7), CREATED, new QueryTestValue(5), null); |
| checkSingleEvent(evts.get(8), EventType.UPDATED, new QueryTestValue(6), new QueryTestValue(5)); |
| |
| evts.clear(); |
| |
| cache.remove(key); |
| cache.remove(key); |
| |
| assert GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return evts.size() == 1; |
| } |
| }, 5_000); |
| |
| evts.clear(); |
| |
| log.info("Finish iteration: " + i); |
| } |
| } |
| } |
| finally { |
| grid(getClientIndex()).destroyCache(ccfg.getName()); |
| } |
| } |
| |
| /** |
| * @return No-op filter factory for batch operations. |
| */ |
| protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>> noOpFilterFactory() { |
| return null; |
| } |
| |
| /** |
| * @param ccfg Cache configuration. |
| * @throws Exception If failed. |
| */ |
| private void batchOperation(CacheConfiguration ccfg) throws Exception { |
| IgniteCache<QueryTestKey, QueryTestValue> cache = grid(getClientIndex()).createCache(ccfg); |
| |
| try { |
| AbstractContinuousQuery<QueryTestKey, QueryTestValue> qry = createQuery(); |
| |
| final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts = |
| new CopyOnWriteArrayList<>(); |
| |
| if (noOpFilterFactory() != null) |
| qry.setRemoteFilterFactory(noOpFilterFactory()); |
| |
| if (qry instanceof ContinuousQuery) { |
| ((ContinuousQuery<QueryTestKey, QueryTestValue>)qry).setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { |
| @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, |
| ? extends QueryTestValue>> events) throws CacheEntryListenerException { |
| for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) |
| evts.add(e); |
| } |
| }); |
| } |
| else if (qry instanceof ContinuousQueryWithTransformer) |
| initQueryWithTransformer( |
| (ContinuousQueryWithTransformer<QueryTestKey, QueryTestValue, CacheEntryEvent>)qry, evts); |
| else |
| fail("Unknown query type"); |
| |
| Map<QueryTestKey, QueryTestValue> map = new TreeMap<>(); |
| |
| for (int i = 0; i < KEYS; i++) |
| map.put(new QueryTestKey(i), new QueryTestValue(i)); |
| |
| try (QueryCursor qryCur = cache.query(qry)) { |
| for (int i = 0; i < ITERATION_CNT / 2; i++) { |
| log.info("Start iteration: " + i); |
| // Not events. |
| cache.removeAll(map.keySet()); |
| cache.invokeAll(map.keySet(), (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(null, false)); |
| cache.invokeAll(map.keySet(), (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(true)); |
| |
| // Get events. |
| cache.putAll(map); |
| |
| assert GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return evts.size() == KEYS; |
| } |
| }, 5_000); |
| |
| checkEvents(evts, CREATED); |
| |
| evts.clear(); |
| |
| // Not events. |
| cache.invokeAll(map.keySet(), (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(true)); |
| |
| U.sleep(100); |
| |
| assertEquals(0, evts.size()); |
| |
| // Get events. |
| cache.invokeAll(map.keySet(), (EntryProcessor<QueryTestKey, QueryTestValue, ? extends Object>) |
| (Object)new EntrySetValueProcessor(null, false)); |
| |
| // Not events. |
| cache.removeAll(map.keySet()); |
| cache.removeAll(map.keySet()); |
| |
| assert GridTestUtils.waitForCondition(new PA() { |
| @Override public boolean apply() { |
| return evts.size() == KEYS; |
| } |
| }, 5_000); |
| |
| checkEvents(evts, REMOVED); |
| |
| evts.clear(); |
| |
| log.info("Finish iteration: " + i); |
| } |
| } |
| } |
| finally { |
| grid(getClientIndex()).destroyCache(ccfg.getName()); |
| } |
| } |
| |
| /** |
| * @param evts Events. |
| * @param evtType Event type. |
| */ |
| private void checkEvents(List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts, |
| EventType evtType) { |
| for (int key = 0; key < KEYS; key++) { |
| QueryTestKey keyVal = new QueryTestKey(key); |
| |
| for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : evts) { |
| if (e.getKey().equals(keyVal)) { |
| checkSingleEvent(e, |
| evtType, |
| evtType != UPDATED ? new QueryTestValue(key) : null, |
| evtType == REMOVED ? new QueryTestValue(key) : null); |
| |
| keyVal = null; |
| |
| break; |
| } |
| } |
| |
| assertNull("Event for key not found.", keyVal); |
| } |
| } |
| |
| /** |
| * @param event Event. |
| * @param type Event type. |
| * @param val Value. |
| * @param oldVal Old value. |
| */ |
| private void checkSingleEvent( |
| CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event, |
| EventType type, |
| QueryTestValue val, |
| QueryTestValue oldVal) { |
| assertEquals(event.getEventType(), type); |
| assertEquals(event.getValue(), val); |
| assertEquals(event.getOldValue(), oldVal); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTxClient() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL, |
| false); |
| |
| doTestContinuousQuery(ccfg, CLIENT); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTxClientExplicit() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL, |
| false); |
| |
| doTestContinuousQuery(ccfg, CLIENT); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTxReplicated() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| TRANSACTIONAL, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTxReplicatedClient() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| TRANSACTIONAL, |
| false); |
| |
| doTestContinuousQuery(ccfg, CLIENT); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTxNoBackups() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| TRANSACTIONAL, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTxNoBackupsAllNodes() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| TRANSACTIONAL, |
| false); |
| |
| doTestContinuousQuery(ccfg, ALL); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTxNoBackupsExplicit() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| TRANSACTIONAL, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testTxNoBackupsClient() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| TRANSACTIONAL, |
| false); |
| |
| doTestContinuousQuery(ccfg, CLIENT); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMvccTxClient() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestContinuousQuery(ccfg, CLIENT); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMvccTxClientExplicit() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 1, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestContinuousQuery(ccfg, CLIENT); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMvccTxReplicated() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMvccTxReplicatedClient() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, |
| 0, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestContinuousQuery(ccfg, CLIENT); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMvccTxNoBackups() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMvccTxNoBackupsAllNodes() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestContinuousQuery(ccfg, ALL); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMvccTxNoBackupsExplicit() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestContinuousQuery(ccfg, SERVER); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMvccTxNoBackupsClient() throws Exception { |
| CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, |
| 0, |
| TRANSACTIONAL_SNAPSHOT, |
| false); |
| |
| doTestContinuousQuery(ccfg, CLIENT); |
| } |
| |
| /** |
| * @param ccfg Cache configuration. |
| * @param deploy The place where continuous query will be started. |
| * @throws Exception If failed. |
| */ |
| protected void doTestContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy) |
| throws Exception { |
| ignite(0).createCache(ccfg); |
| |
| try { |
| long seed = System.currentTimeMillis(); |
| |
| Random rnd = new Random(seed); |
| |
| log.info("Random seed: " + seed); |
| |
| List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>(); |
| |
| Collection<QueryCursor<?>> curs = new ArrayList<>(); |
| |
| if (deploy == CLIENT) { |
| AbstractContinuousQuery<Object, Object> qry = createQuery(); |
| |
| final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); |
| |
| if (qry instanceof ContinuousQuery) { |
| ((ContinuousQuery<Object, Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { |
| @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { |
| for (CacheEntryEvent<?, ?> evt : evts) |
| evtsQueue.add(evt); |
| } |
| }); |
| } |
| else if (qry instanceof ContinuousQueryWithTransformer) |
| initQueryWithTransformer( |
| (ContinuousQueryWithTransformer<Object, Object, CacheEntryEvent>)qry, evtsQueue); |
| else |
| fail("Unknown query type"); |
| |
| evtsQueues.add(evtsQueue); |
| |
| QueryCursor<?> cur = grid(getClientIndex()).cache(ccfg.getName()).query(qry); |
| |
| curs.add(cur); |
| } |
| else if (deploy == SERVER) { |
| AbstractContinuousQuery<Object, Object> qry = createQuery(); |
| |
| final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); |
| |
| if (qry instanceof ContinuousQuery) { |
| ((ContinuousQuery<Object, Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { |
| @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { |
| for (CacheEntryEvent<?, ?> evt : evts) |
| evtsQueue.add(evt); |
| } |
| }); |
| } |
| else if (qry instanceof ContinuousQueryWithTransformer) |
| initQueryWithTransformer( |
| (ContinuousQueryWithTransformer<Object, Object, CacheEntryEvent>)qry, evtsQueue); |
| else |
| fail("Unknown query type"); |
| |
| evtsQueues.add(evtsQueue); |
| |
| QueryCursor<?> cur = grid(rnd.nextInt(getServerNodeCount())).cache(ccfg.getName()).query(qry); |
| |
| curs.add(cur); |
| } |
| else { |
| for (int i = 0; i <= getServerNodeCount(); i++) { |
| AbstractContinuousQuery<Object, Object> qry = createQuery(); |
| |
| final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); |
| |
| if (qry instanceof ContinuousQuery) { |
| ((ContinuousQuery<Object, Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { |
| @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { |
| for (CacheEntryEvent<?, ?> evt : evts) |
| evtsQueue.add(evt); |
| } |
| }); |
| } |
| else if (qry instanceof ContinuousQueryWithTransformer) |
| initQueryWithTransformer( |
| (ContinuousQueryWithTransformer<Object, Object, CacheEntryEvent>)qry, evtsQueue); |
| else |
| fail("Unknown query type"); |
| |
| evtsQueues.add(evtsQueue); |
| |
| QueryCursor<?> cur = ignite(i).cache(ccfg.getName()).query(qry); |
| |
| curs.add(cur); |
| } |
| } |
| |
| ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); |
| |
| Map<Integer, Long> partCntr = new ConcurrentHashMap<>(); |
| |
| try { |
| for (int i = 0; i < ITERATION_CNT; i++) { |
| if (i % 20 == 0) |
| log.info("Iteration: " + i); |
| |
| for (int idx = 0; idx < getServerNodeCount(); idx++) |
| randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName())); |
| } |
| } |
| finally { |
| for (QueryCursor<?> cur : curs) |
| cur.close(); |
| } |
| } |
| finally { |
| ignite(0).destroyCache(ccfg.getName()); |
| } |
| } |
| |
| /** |
| * @return Client node index. |
| */ |
| private int getClientIndex() { |
| return getServerNodeCount() - 1; |
| } |
| |
| /** |
| * @return Count nodes. |
| */ |
| protected int getServerNodeCount() { |
| return NODES; |
| } |
| |
| /** |
| * @param rnd Random generator. |
| * @param evtsQueues Events queue. |
| * @param expData Expected cache data. |
| * @param partCntr Partition counter. |
| * @param cache Cache. |
| * @throws Exception If failed. |
| */ |
| private void randomUpdate( |
| Random rnd, |
| List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, |
| ConcurrentMap<Object, Object> expData, |
| Map<Integer, Long> partCntr, |
| IgniteCache<Object, Object> cache) |
| throws Exception { |
| Object key = new QueryTestKey(rnd.nextInt(KEYS)); |
| Object newVal = value(rnd); |
| Object oldVal = expData.get(key); |
| |
| int op = rnd.nextInt(13); |
| |
| Ignite ignite = cache.unwrap(Ignite.class); |
| |
| Map<Object, Long> expEvtCntrs = new ConcurrentHashMap<>(); |
| |
| Transaction tx = null; |
| |
| CacheAtomicityMode atomicityMode = atomicityMode(cache); |
| |
| boolean mvccEnabled = atomicityMode == TRANSACTIONAL_SNAPSHOT; |
| |
| if (atomicityMode != ATOMIC && rnd.nextBoolean()) { |
| TransactionConcurrency concurrency = mvccEnabled ? PESSIMISTIC : txRandomConcurrency(rnd); |
| TransactionIsolation isolation = mvccEnabled ? REPEATABLE_READ : txRandomIsolation(rnd); |
| |
| tx = ignite.transactions().txStart(concurrency, isolation); |
| } |
| |
| try { |
| log.info("Random operation [key=" + key + ", op=" + op + ']'); |
| |
| switch (op) { |
| case 0: { |
| cache.put(key, newVal); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| updatePartitionCounter(cache, key, partCntr, expEvtCntrs, false); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), key, newVal, oldVal); |
| |
| expData.put(key, newVal); |
| |
| break; |
| } |
| |
| case 1: { |
| cache.getAndPut(key, newVal); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| updatePartitionCounter(cache, key, partCntr, expEvtCntrs, false); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), key, newVal, oldVal); |
| |
| expData.put(key, newVal); |
| |
| break; |
| } |
| |
| case 2: { |
| boolean res = cache.remove(key); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| // We don't update part counter if nothing was removed when MVCC enabled. |
| updatePartitionCounter(cache, key, partCntr, expEvtCntrs, mvccEnabled && !res); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), key, oldVal, oldVal); |
| |
| expData.remove(key); |
| |
| break; |
| } |
| |
| case 3: { |
| Object res = cache.getAndRemove(key); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| // We don't update part counter if nothing was removed when MVCC enabled. |
| updatePartitionCounter(cache, key, partCntr, expEvtCntrs, mvccEnabled && res == null); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), key, oldVal, oldVal); |
| |
| expData.remove(key); |
| |
| break; |
| } |
| |
| case 4: { |
| cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean())); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| updatePartitionCounter(cache, key, partCntr, expEvtCntrs, false); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), key, newVal, oldVal); |
| |
| expData.put(key, newVal); |
| |
| break; |
| } |
| |
| case 5: { |
| EntrySetValueProcessor proc = new EntrySetValueProcessor(null, rnd.nextBoolean()); |
| |
| cache.invoke(key, proc); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| // We don't update part counter if nothing was removed when MVCC enabled. |
| updatePartitionCounter(cache, key, partCntr, expEvtCntrs,mvccEnabled && proc.getOldVal() == null); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), key, oldVal, oldVal); |
| |
| expData.remove(key); |
| |
| break; |
| } |
| |
| case 6: { |
| cache.putIfAbsent(key, newVal); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| if (oldVal == null) { |
| updatePartitionCounter(cache, key, partCntr, expEvtCntrs, false); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), key, newVal, null); |
| |
| expData.put(key, newVal); |
| } |
| else |
| checkNoEvent(evtsQueues); |
| |
| break; |
| } |
| |
| case 7: { |
| cache.getAndPutIfAbsent(key, newVal); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| if (oldVal == null) { |
| updatePartitionCounter(cache, key, partCntr, expEvtCntrs, false); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), key, newVal, null); |
| |
| expData.put(key, newVal); |
| } |
| else |
| checkNoEvent(evtsQueues); |
| |
| break; |
| } |
| |
| case 8: { |
| cache.replace(key, newVal); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| if (oldVal != null) { |
| updatePartitionCounter(cache, key, partCntr, expEvtCntrs, false); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), key, newVal, oldVal); |
| |
| expData.put(key, newVal); |
| } |
| else |
| checkNoEvent(evtsQueues); |
| |
| break; |
| } |
| |
| case 9: { |
| cache.getAndReplace(key, newVal); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| if (oldVal != null) { |
| updatePartitionCounter(cache, key, partCntr, expEvtCntrs, false); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), key, newVal, oldVal); |
| |
| expData.put(key, newVal); |
| } |
| else |
| checkNoEvent(evtsQueues); |
| |
| break; |
| } |
| |
| case 10: { |
| if (oldVal != null) { |
| Object replaceVal = value(rnd); |
| |
| boolean success = replaceVal.equals(oldVal); |
| |
| if (success) { |
| cache.replace(key, replaceVal, newVal); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| updatePartitionCounter(cache, key, partCntr, expEvtCntrs, false); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), key, newVal, oldVal); |
| |
| expData.put(key, newVal); |
| } |
| else { |
| cache.replace(key, replaceVal, newVal); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| checkNoEvent(evtsQueues); |
| } |
| } |
| else { |
| cache.replace(key, value(rnd), newVal); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| checkNoEvent(evtsQueues); |
| } |
| |
| break; |
| } |
| |
| case 11: { |
| SortedMap<Object, Object> vals = new TreeMap<>(); |
| |
| while (vals.size() < KEYS / 5) |
| vals.put(new QueryTestKey(rnd.nextInt(KEYS)), value(rnd)); |
| |
| cache.putAll(vals); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| for (Map.Entry<Object, Object> e : vals.entrySet()) |
| updatePartitionCounter(cache, e.getKey(), partCntr, expEvtCntrs, false); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), vals, expData); |
| |
| expData.putAll(vals); |
| |
| break; |
| } |
| |
| case 12: { |
| SortedMap<Object, Object> vals = new TreeMap<>(); |
| |
| while (vals.size() < KEYS / 5) |
| vals.put(new QueryTestKey(rnd.nextInt(KEYS)), newVal); |
| |
| cache.invokeAll(vals.keySet(), new EntrySetValueProcessor(newVal, rnd.nextBoolean())); |
| |
| if (tx != null) |
| tx.commit(); |
| |
| for (Map.Entry<Object, Object> e : vals.entrySet()) |
| updatePartitionCounter(cache, e.getKey(), partCntr, expEvtCntrs, false); |
| |
| waitAndCheckEvent(evtsQueues, partCntr, expEvtCntrs, affinity(cache), vals, expData); |
| |
| for (Object o : vals.keySet()) |
| expData.put(o, newVal); |
| |
| break; |
| } |
| |
| default: |
| fail("Op:" + op); |
| } |
| } finally { |
| if (tx != null) |
| tx.close(); |
| } |
| } |
| |
| /** |
| * @param evtsQueues Queue. |
| * @param partCntrs Counters. |
| * @param aff Affinity. |
| * @param vals Values. |
| * @param expData Expected data. |
| */ |
| private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, |
| Map<Integer, Long> partCntrs, |
| Map<Object, Long> evtCntrs, |
| Affinity<Object> aff, |
| SortedMap<Object, Object> vals, |
| Map<Object, Object> expData) |
| throws Exception { |
| for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { |
| Map<Object, CacheEntryEvent> rcvEvts = new HashMap<>(); |
| |
| for (int i = 0; i < vals.size(); i++) { |
| CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); |
| |
| rcvEvts.put(evt.getKey(), evt); |
| } |
| |
| assertEquals(vals.size(), rcvEvts.size()); |
| |
| for (Map.Entry<Object, Object> e : vals.entrySet()) { |
| Object key = e.getKey(); |
| Object val = e.getValue(); |
| Object oldVal = expData.get(key); |
| |
| if (val == null && oldVal == null) { |
| checkNoEvent(evtsQueues); |
| |
| continue; |
| } |
| |
| CacheEntryEvent evt = rcvEvts.get(key); |
| |
| assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', |
| evt); |
| assertEquals(key, evt.getKey()); |
| assertEquals(val, evt.getValue()); |
| assertEquals(oldVal, evt.getOldValue()); |
| |
| Long curPartCntr = partCntrs.get(aff.partition(key)); |
| Long cntr = evtCntrs.get(key); |
| CacheQueryEntryEvent qryEntryEvt = (CacheQueryEntryEvent)evt.unwrap(CacheQueryEntryEvent.class); |
| |
| assertNotNull(cntr); |
| assertNotNull(curPartCntr); |
| assertNotNull(qryEntryEvt); |
| assertTrue(cntr <= curPartCntr); |
| |
| assertEquals((long)cntr, qryEntryEvt.getPartitionUpdateCounter()); |
| } |
| } |
| } |
| |
| /** |
| * @param rnd {@link Random}. |
| * @return {@link TransactionIsolation}. |
| */ |
| private TransactionIsolation txRandomIsolation(Random rnd) { |
| int val = rnd.nextInt(3); |
| |
| if (val == 0) |
| return READ_COMMITTED; |
| else if (val == 1) |
| return REPEATABLE_READ; |
| else |
| return SERIALIZABLE; |
| } |
| |
| /** |
| * @param rnd {@link Random}. |
| * @return {@link TransactionConcurrency}. |
| */ |
| private TransactionConcurrency txRandomConcurrency(Random rnd) { |
| return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : PESSIMISTIC; |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param key Key |
| * @param cntrs Partition counters. |
| * @param evtCntrs Event counters. |
| * @param skipUpdCntr Skip update counter flag. |
| */ |
| private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs, |
| Map<Object, Long> evtCntrs, boolean skipUpdCntr) { |
| Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName()); |
| |
| int part = aff.partition(key); |
| |
| Long partCntr = cntrs.get(part); |
| |
| if (partCntr == null) |
| partCntr = 0L; |
| |
| if (!skipUpdCntr) |
| partCntr++; |
| |
| cntrs.put(part, partCntr); |
| evtCntrs.put(key, partCntr); |
| } |
| |
| /** |
| * @param rnd Random generator. |
| * @return Cache value. |
| */ |
| private static Object value(Random rnd) { |
| return new QueryTestValue(rnd.nextInt(VALS)); |
| } |
| |
| /** |
| * @param evtsQueues Event queue. |
| * @param partCntrs Partition counters. |
| * @param aff Affinity function. |
| * @param key Key. |
| * @param val Value. |
| * @param oldVal Old value. |
| * @throws Exception If failed. |
| */ |
| private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, |
| Map<Integer, Long> partCntrs, |
| Map<Object, Long> evtCntrs, |
| Affinity<Object> aff, |
| Object key, |
| Object val, |
| Object oldVal) |
| throws Exception { |
| if (val == null && oldVal == null) { |
| checkNoEvent(evtsQueues); |
| |
| return; |
| } |
| |
| for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { |
| CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); |
| |
| assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt); |
| assertEquals(key, evt.getKey()); |
| assertEquals(val, evt.getValue()); |
| assertEquals(oldVal, evt.getOldValue()); |
| |
| Long curPartCntr = partCntrs.get(aff.partition(key)); |
| |
| Long cntr = evtCntrs.get(key); |
| CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class); |
| |
| assertNotNull(cntr); |
| assertNotNull(curPartCntr); |
| assertNotNull(qryEntryEvt); |
| assertTrue(cntr <= curPartCntr); |
| |
| assertEquals((long)cntr, qryEntryEvt.getPartitionUpdateCounter()); |
| } |
| } |
| |
| /** |
| * @param evtsQueues Event queue. |
| * @throws Exception If failed. |
| */ |
| private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception { |
| for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { |
| CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS); |
| |
| assertNull(evt); |
| } |
| } |
| |
| /** |
| * |
| * @param cacheMode Cache mode. |
| * @param backups Number of backups. |
| * @param atomicityMode Cache atomicity mode. |
| * @param store If {@code true} configures dummy cache store. |
| * @return Cache configuration. |
| */ |
| protected CacheConfiguration<Object, Object> cacheConfiguration( |
| CacheMode cacheMode, |
| int backups, |
| CacheAtomicityMode atomicityMode, |
| boolean store) { |
| CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); |
| |
| ccfg.setName("cache-" + UUID.randomUUID()); // TODO GG-11220 (remove setName when fixed). |
| ccfg.setAtomicityMode(atomicityMode); |
| ccfg.setCacheMode(cacheMode); |
| ccfg.setWriteSynchronizationMode(FULL_SYNC); |
| |
| if (cacheMode == PARTITIONED) |
| ccfg.setBackups(backups); |
| |
| if (store) { |
| ccfg.setCacheStoreFactory(new TestStoreFactory()); |
| ccfg.setReadThrough(true); |
| ccfg.setWriteThrough(true); |
| } |
| |
| return ccfg; |
| } |
| |
| /** |
| * @param <K> Key type. |
| * @param <V> Value type. |
| * @return New instance of continuous query. |
| */ |
| protected <K, V> AbstractContinuousQuery<K, V> createQuery() { |
| return new ContinuousQuery<>(); |
| } |
| |
| /** |
| * |
| */ |
| private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> { |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public CacheStore<Object, Object> create() { |
| return new CacheStoreAdapter() { |
| @Override public Object load(Object key) throws CacheLoaderException { |
| return null; |
| } |
| |
| @Override public void write(Cache.Entry entry) throws CacheWriterException { |
| // No-op. |
| } |
| |
| @Override public void delete(Object key) throws CacheWriterException { |
| // No-op. |
| } |
| }; |
| } |
| } |
| |
| /** |
| * |
| */ |
| public static class QueryTestKey implements Serializable, Comparable { |
| /** */ |
| private final Integer key; |
| |
| /** |
| * @param key Key. |
| */ |
| public QueryTestKey(Integer key) { |
| this.key = key; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| |
| if (o == null || getClass() != o.getClass()) |
| return false; |
| |
| QueryTestKey that = (QueryTestKey)o; |
| |
| return key.equals(that.key); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return key.hashCode(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(QueryTestKey.class, this); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int compareTo(Object o) { |
| return key - ((QueryTestKey)o).key; |
| } |
| } |
| |
| /** |
| * |
| */ |
| public static class QueryTestValue implements Serializable { |
| /** */ |
| @GridToStringInclude |
| protected final Integer val1; |
| |
| /** */ |
| @GridToStringInclude |
| protected final String val2; |
| |
| /** |
| * @param val Value. |
| */ |
| public QueryTestValue(Integer val) { |
| this.val1 = val; |
| this.val2 = String.valueOf(val); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| |
| if (o == null || getClass() != o.getClass()) |
| return false; |
| |
| QueryTestValue that = (QueryTestValue) o; |
| |
| return val1.equals(that.val1) && val2.equals(that.val2); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| int res = val1.hashCode(); |
| |
| res = 31 * res + val2.hashCode(); |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(QueryTestValue.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> { |
| /** |
| * Static variable: we need to obtain a previous value from another node. |
| * Assume this is a single threaded execution. |
| */ |
| private static Object oldVal; |
| |
| /** */ |
| private Object val; |
| |
| /** */ |
| private boolean retOld; |
| |
| /** */ |
| private boolean skipModify; |
| |
| /** |
| * @param skipModify If {@code true} then entry will not be modified. |
| */ |
| public EntrySetValueProcessor(boolean skipModify) { |
| this.skipModify = skipModify; |
| } |
| |
| /** |
| * @param val Value to set. |
| * @param retOld Return old value flag. |
| */ |
| public EntrySetValueProcessor(Object val, boolean retOld) { |
| this.val = val; |
| this.retOld = retOld; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object process(MutableEntry<Object, Object> e, Object... args) { |
| if (skipModify) |
| return null; |
| |
| oldVal = e.getValue(); |
| |
| Object old = retOld ? e.getValue() : null; |
| |
| if (val != null) |
| e.setValue(val); |
| else |
| e.remove(); |
| |
| return old; |
| } |
| |
| /** |
| * @return Old value. |
| */ |
| Object getOldVal() { |
| Object oldVal0 = oldVal; |
| |
| oldVal = null; // Clean value. |
| |
| return oldVal0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(EntrySetValueProcessor.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected enum ContinuousDeploy { |
| CLIENT, SERVER, ALL |
| } |
| |
| /** |
| * Initialize continuous query with transformer. |
| * Query will accumulate all events in accumulator. |
| * |
| * @param qry Continuous query. |
| * @param acc Accumulator for events. |
| * @param <K> Key type. |
| * @param <V> Value type. |
| */ |
| private <K, V> void initQueryWithTransformer( |
| ContinuousQueryWithTransformer<K, V, CacheEntryEvent> qry, |
| Collection<CacheEntryEvent<? extends K, ? extends V>> acc) { |
| |
| IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, CacheEntryEvent> transformer = |
| new IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, CacheEntryEvent>() { |
| @Override public CacheEntryEvent apply(CacheEntryEvent<? extends K, ? extends V> event) { |
| return event; |
| } |
| }; |
| |
| ContinuousQueryWithTransformer<K, V, CacheEntryEvent> qry0 = |
| (ContinuousQueryWithTransformer<K, V, CacheEntryEvent>)qry; |
| |
| qry0.setRemoteTransformerFactory(FactoryBuilder.factoryOf(transformer)); |
| |
| qry0.setLocalListener(new EventListener<CacheEntryEvent>() { |
| @Override public void onUpdated(Iterable<? extends CacheEntryEvent> events) { |
| for (CacheEntryEvent e : events) |
| acc.add(e); |
| } |
| }); |
| } |
| } |