| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import javax.cache.Cache; |
| import javax.cache.configuration.Factory; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.MutableEntry; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteTransactions; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cache.CachePeekMode; |
| import org.apache.ignite.cache.CacheWriteSynchronizationMode; |
| import org.apache.ignite.cache.store.CacheStore; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.NearCacheConfiguration; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.util.lang.GridAbsPredicateX; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.P1; |
| import org.apache.ignite.internal.util.typedef.R1; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; |
| import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; |
| import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; |
| import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.MvccFeatureChecker; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.transactions.Transaction; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; |
| import static org.apache.ignite.cache.CacheMode.PARTITIONED; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| |
| /** |
| * Abstract class for cache tests. |
| */ |
| public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { |
| /** Test timeout */ |
| private static final long TEST_TIMEOUT = 30 * 1000; |
| |
| /** VM ip finder for TCP discovery. */ |
| protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); |
| |
| /** */ |
| protected static TestCacheStoreStrategy storeStgy; |
| |
| /** |
| * @return Grids count to start. |
| */ |
| protected abstract int gridCount(); |
| |
| /** {@inheritDoc} */ |
| @Override protected long getTestTimeout() { |
| return TEST_TIMEOUT; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| super.beforeTestsStarted(); |
| |
| int cnt = gridCount(); |
| |
| assert cnt >= 1 : "At least one grid must be started"; |
| |
| if (!MvccFeatureChecker.forcedMvcc() || MvccFeatureChecker.isSupported(MvccFeatureChecker.Feature.CACHE_STORE)) |
| initStoreStrategy(); |
| |
| startGrids(cnt); |
| |
| awaitPartitionMapExchange(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTestsStopped() throws Exception { |
| if (storeStgy != null) |
| storeStgy.resetStore(); |
| } |
| |
| /** |
| * Initializes {@link #storeStgy} with respect to the nature of the test. |
| * |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected void initStoreStrategy() throws IgniteCheckedException { |
| if (storeStgy == null) |
| storeStgy = isMultiJvm() ? new H2CacheStoreStrategy() : new MapCacheStoreStrategy(); |
| else if (isMultiJvm() && !(storeStgy instanceof H2CacheStoreStrategy)) |
| storeStgy = new H2CacheStoreStrategy(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| assert jcache().unwrap(Ignite.class).transactions().tx() == null; |
| assertEquals(0, jcache().localSize()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| Transaction tx = jcache().unwrap(Ignite.class).transactions().tx(); |
| |
| if (tx != null) { |
| tx.close(); |
| |
| fail("Cache transaction remained after test completion: " + tx); |
| } |
| |
| for (int i = 0; i < gridCount(); i++) { |
| info("Checking grid: " + i); |
| |
| while (true) { |
| try { |
| final int fi = i; |
| |
| assertTrue( |
| "Cache is not empty: " + " localSize = " + jcache(fi).localSize(CachePeekMode.ALL) |
| + ", local entries " + entrySet(jcache(fi).localEntries()), |
| GridTestUtils.waitForCondition( |
| // Preloading may happen as nodes leave, so we need to wait. |
| new GridAbsPredicateX() { |
| @Override public boolean applyx() throws IgniteCheckedException { |
| jcache(fi).removeAll(); |
| |
| if (jcache(fi).size(CachePeekMode.ALL) > 0) { |
| for (Cache.Entry<String, ?> k : jcache(fi).localEntries()) |
| jcache(fi).remove(k.getKey()); |
| } |
| |
| return jcache(fi).localSize(CachePeekMode.ALL) == 0; |
| } |
| }, |
| getTestTimeout())); |
| |
| int primaryKeySize = jcache(i).localSize(CachePeekMode.PRIMARY); |
| int keySize = jcache(i).localSize(); |
| int size = jcache(i).localSize(); |
| int globalSize = jcache(i).size(); |
| int globalPrimarySize = jcache(i).size(CachePeekMode.PRIMARY); |
| |
| info("Size after [idx=" + i + |
| ", size=" + size + |
| ", keySize=" + keySize + |
| ", primarySize=" + primaryKeySize + |
| ", globalSize=" + globalSize + |
| ", globalPrimarySize=" + globalPrimarySize + |
| ", entrySet=" + jcache(i).localEntries() + ']'); |
| |
| assertEquals("Cache is not empty [idx=" + i + ", entrySet=" + jcache(i).localEntries() + ']', |
| 0, jcache(i).localSize(CachePeekMode.ALL)); |
| |
| break; |
| } |
| catch (Exception e) { |
| if (X.hasCause(e, ClusterTopologyCheckedException.class)) { |
| info("Got topology exception while tear down (will retry in 1000ms)."); |
| |
| U.sleep(1000); |
| } |
| else |
| throw e; |
| } |
| } |
| } |
| |
| assert jcache().unwrap(Ignite.class).transactions().tx() == null; |
| assertEquals("Cache is not empty", 0, jcache().localSize(CachePeekMode.ALL)); |
| |
| if (storeStgy != null) |
| storeStgy.resetStore(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| TcpDiscoverySpi disco = new TcpDiscoverySpi(); |
| |
| disco.setIpFinder(ipFinder); |
| |
| if (isDebug()) |
| disco.setAckTimeout(Integer.MAX_VALUE); |
| |
| cfg.setDiscoverySpi(disco); |
| |
| cfg.setCacheConfiguration(cacheConfiguration(igniteInstanceName)); |
| |
| TcpCommunicationSpi comm = new TcpCommunicationSpi(); |
| |
| comm.setSharedMemoryPort(-1); |
| |
| cfg.setCommunicationSpi(comm); |
| |
| return cfg; |
| } |
| |
| /** |
| * @param igniteInstanceName Ignite instance name. |
| * @return Cache configuration. |
| * @throws Exception In case of error. |
| */ |
| @SuppressWarnings("unchecked") |
| protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception { |
| CacheConfiguration cfg = defaultCacheConfiguration(); |
| |
| if (storeStgy != null) { |
| Factory<? extends CacheStore<Object, Object>> storeFactory = storeStgy.getStoreFactory(); |
| |
| CacheStore<?, ?> store = storeFactory.create(); |
| |
| if (store != null) { |
| cfg.setCacheStoreFactory(storeFactory); |
| cfg.setReadThrough(true); |
| cfg.setWriteThrough(true); |
| cfg.setLoadPreviousValue(true); |
| storeStgy.updateCacheConfiguration(cfg); |
| } |
| } |
| |
| cfg.setCacheMode(cacheMode()); |
| cfg.setAtomicityMode(atomicityMode()); |
| cfg.setWriteSynchronizationMode(writeSynchronization()); |
| cfg.setNearConfiguration(nearConfiguration()); |
| cfg.setOnheapCacheEnabled(onheapCacheEnabled()); |
| |
| Class<?>[] idxTypes = indexedTypes(); |
| |
| if (!F.isEmpty(idxTypes)) |
| cfg.setIndexedTypes(idxTypes); |
| |
| if (cacheMode() == PARTITIONED) |
| cfg.setBackups(backups()); |
| |
| return cfg; |
| } |
| |
| /** |
| * Indexed types. |
| */ |
| protected Class<?>[] indexedTypes() { |
| return null; |
| } |
| |
| /** |
| * @return Default cache mode. |
| */ |
| protected CacheMode cacheMode() { |
| return CacheConfiguration.DFLT_CACHE_MODE; |
| } |
| |
| /** |
| * @return Cache atomicity mode. |
| */ |
| protected CacheAtomicityMode atomicityMode() { |
| return TRANSACTIONAL; |
| } |
| |
| /** |
| * @return Partitioned mode. |
| */ |
| protected NearCacheConfiguration nearConfiguration() { |
| return new NearCacheConfiguration(); |
| } |
| |
| /** |
| * @return Write synchronization. |
| */ |
| protected CacheWriteSynchronizationMode writeSynchronization() { |
| return FULL_SYNC; |
| } |
| |
| /** |
| * @return {@code true} if swap should be enabled. |
| */ |
| protected boolean swapEnabled() { |
| return true; |
| } |
| |
| /** |
| * @return {@code true} if near cache should be enabled. |
| */ |
| protected boolean nearEnabled() { |
| return nearConfiguration() != null; |
| } |
| |
| /** |
| * @return {@code True} if transactions are enabled. |
| * @see #txShouldBeUsed() |
| */ |
| protected boolean txEnabled() { |
| return true; |
| } |
| |
| /** |
| * @return {@code True} if transactions should be used. |
| */ |
| protected boolean txShouldBeUsed() { |
| return txEnabled() && !isMultiJvm(); |
| } |
| |
| /** |
| * @return {@code True} if locking is enabled. |
| */ |
| protected boolean lockingEnabled() { |
| return true; |
| } |
| |
| /** |
| * @return {@code True} if on-heap cache is enabled. |
| */ |
| protected boolean onheapCacheEnabled() { |
| return false; |
| } |
| |
| /** |
| * @return {@code True} for partitioned caches. |
| */ |
| protected final boolean partitionedMode() { |
| return cacheMode() == PARTITIONED; |
| } |
| |
| /** |
| * @return Default cache instance. |
| */ |
| @SuppressWarnings({"unchecked"}) |
| @Override protected IgniteCache<String, Integer> jcache() { |
| return jcache(0); |
| } |
| |
| /** |
| * @return Transactions instance. |
| */ |
| protected IgniteTransactions transactions() { |
| return grid(0).transactions(); |
| } |
| |
| /** |
| * @return Backups. |
| */ |
| protected int backups() { |
| return 1; |
| } |
| |
| /** |
| * @param idx Index of grid. |
| * @return Default cache. |
| */ |
| @SuppressWarnings({"unchecked"}) |
| @Override protected IgniteCache<String, Integer> jcache(int idx) { |
| return ignite(idx).cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx(); |
| } |
| |
| /** |
| * @param idx Index of grid. |
| * @return Cache context. |
| */ |
| protected GridCacheContext<String, Integer> context(final int idx) { |
| if (isRemoteJvm(idx) && !isRemoteJvm()) |
| throw new UnsupportedOperationException("Operation can't be done automatically via proxy. " + |
| "Send task with this logic on remote jvm instead."); |
| |
| return ((IgniteKernal)grid(idx)).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context(); |
| } |
| |
| /** |
| * @param key Key. |
| * @param idx Node index. |
| * @return {@code True} if key belongs to node with index idx. |
| */ |
| protected boolean belongs(String key, int idx) { |
| return context(idx).cache().affinity().isPrimaryOrBackup(context(idx).localNode(), key); |
| } |
| |
| /** |
| * Executes regular peek or peek from swap. |
| * |
| * @param cache Cache projection. |
| * @param key Key. |
| * @return Value. |
| */ |
| @Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) { |
| return cache.localPeek(key); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param key Key. |
| * @return {@code True} if cache contains given key. |
| * @throws Exception If failed. |
| */ |
| @SuppressWarnings("unchecked") |
| protected boolean containsKey(IgniteCache cache, Object key) throws Exception { |
| return cache.containsKey(key); |
| } |
| |
| /** |
| * Filters cache entry projections leaving only ones with keys containing 'key'. |
| */ |
| protected static IgnitePredicate<Cache.Entry<String, Integer>> entryKeyFilter = |
| new P1<Cache.Entry<String, Integer>>() { |
| @Override public boolean apply(Cache.Entry<String, Integer> entry) { |
| return entry.getKey().contains("key"); |
| } |
| }; |
| |
| /** |
| * Filters cache entry projections leaving only ones with keys not containing 'key'. |
| */ |
| protected static IgnitePredicate<Cache.Entry<String, Integer>> entryKeyFilterInv = |
| new P1<Cache.Entry<String, Integer>>() { |
| @Override public boolean apply(Cache.Entry<String, Integer> entry) { |
| return !entry.getKey().contains("key"); |
| } |
| }; |
| |
| /** |
| * Filters cache entry projections leaving only ones with values less than 50. |
| */ |
| protected static final IgnitePredicate<Cache.Entry<String, Integer>> lt50 = |
| new P1<Cache.Entry<String, Integer>>() { |
| @Override public boolean apply(Cache.Entry<String, Integer> entry) { |
| Integer i = entry.getValue(); |
| |
| return i != null && i < 50; |
| } |
| }; |
| |
| /** |
| * Filters cache entry projections leaving only ones with values greater or equal than 100. |
| */ |
| protected static final IgnitePredicate<Cache.Entry<String, Integer>> gte100 = |
| new P1<Cache.Entry<String, Integer>>() { |
| @Override public boolean apply(Cache.Entry<String, Integer> entry) { |
| Integer i = entry.getValue(); |
| |
| return i != null && i >= 100; |
| } |
| |
| @Override public String toString() { |
| return "gte100"; |
| } |
| }; |
| |
| /** |
| * Filters cache entry projections leaving only ones with values greater or equal than 200. |
| */ |
| protected static final IgnitePredicate<Cache.Entry<String, Integer>> gte200 = |
| new P1<Cache.Entry<String, Integer>>() { |
| @Override public boolean apply(Cache.Entry<String, Integer> entry) { |
| Integer i = entry.getValue(); |
| |
| return i != null && i >= 200; |
| } |
| |
| @Override public String toString() { |
| return "gte200"; |
| } |
| }; |
| |
| /** |
| * {@link org.apache.ignite.lang.IgniteInClosure} for calculating sum. |
| */ |
| protected static final class SumVisitor implements CI1<Cache.Entry<String, Integer>> { |
| /** */ |
| private final AtomicInteger sum; |
| |
| /** |
| * @param sum {@link AtomicInteger} instance for accumulating sum. |
| */ |
| public SumVisitor(AtomicInteger sum) { |
| this.sum = sum; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void apply(Cache.Entry<String, Integer> entry) { |
| if (entry.getValue() != null) { |
| Integer i = entry.getValue(); |
| |
| assert i != null : "Value cannot be null for entry: " + entry; |
| |
| sum.addAndGet(i); |
| } |
| } |
| } |
| |
| /** |
| * {@link org.apache.ignite.lang.IgniteReducer} for calculating sum. |
| */ |
| protected static final class SumReducer implements R1<Cache.Entry<String, Integer>, Integer> { |
| /** */ |
| private int sum; |
| |
| /** */ |
| public SumReducer() { |
| // no-op |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean collect(Cache.Entry<String, Integer> entry) { |
| if (entry.getValue() != null) { |
| Integer i = entry.getValue(); |
| |
| assert i != null; |
| |
| sum += i; |
| } |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Integer reduce() { |
| return sum; |
| } |
| } |
| |
| /** */ |
| protected enum ResourceType { |
| /** */ |
| IGNITE_INSTANCE, |
| |
| /** */ |
| CACHE_NAME, |
| |
| /** */ |
| SPRING_APPLICATION_CONTEXT, |
| |
| /** */ |
| LOGGER, |
| |
| /** */ |
| SERVICE, |
| |
| /** */ |
| SPRING_BEAN, |
| |
| } |
| |
| /** |
| * |
| */ |
| protected static class ResourceInfoSet { |
| /** */ |
| int val; |
| |
| /** */ |
| public ResourceInfoSet() { |
| this(0); |
| } |
| |
| /** */ |
| public ResourceInfoSet(int val) { |
| this.val = val; |
| } |
| |
| /** |
| * @param val Value. |
| */ |
| public static ResourceInfoSet valueOf(int val) { |
| return new ResourceInfoSet(val); |
| } |
| |
| /** */ |
| public int getValue() { |
| return val; |
| } |
| |
| /** |
| * @param type Type. |
| * @param injected Injected. |
| */ |
| public ResourceInfoSet set(ResourceType type, boolean injected) { |
| int mask = 1 << type.ordinal(); |
| |
| if (injected) |
| val |= mask; |
| else |
| val &= ~mask; |
| |
| return this; |
| } |
| |
| /** |
| * @see {@link #set(ResourceType, boolean)} |
| */ |
| public ResourceInfoSet set(ResourceType type, Object toCheck) { |
| return set(type, toCheck != null); |
| } |
| |
| /** |
| * @return collection of not injected resources |
| */ |
| public Collection<ResourceType> notInjected(Collection<ResourceType> exp) { |
| ArrayList<ResourceType> res = null; |
| |
| for (ResourceType type : exp) { |
| int mask = 1 << type.ordinal(); |
| |
| if ((this.val & mask) == 0) { |
| if (res == null) |
| res = new ArrayList<>(); |
| |
| res.add(type); |
| } |
| } |
| |
| return res == null ? Collections.<ResourceType>emptyList() : res; |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected abstract static class ResourceInjectionEntryProcessorBase<K, V> |
| implements EntryProcessor<K, V, Integer>, Serializable { |
| /** */ |
| protected transient ResourceInfoSet infoSet; |
| |
| /** {@inheritDoc} */ |
| @Override public Integer process(MutableEntry<K, V> e, Object... args) { |
| return infoSet == null ? null : infoSet.getValue(); |
| } |
| |
| /** */ |
| protected void checkSet() { |
| if (infoSet == null) |
| infoSet = new ResourceInfoSet(); |
| } |
| } |
| } |