blob: 2e64c525851c4815d24d0bdee29cffe52b06beec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.R1;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* Abstract class for cache tests.
*/
public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
/** Test timeout */
private static final long TEST_TIMEOUT = 30 * 1000;
/** VM ip finder for TCP discovery. */
protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** */
protected static TestCacheStoreStrategy storeStgy;
/**
* @return Grids count to start.
*/
protected abstract int gridCount();
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return TEST_TIMEOUT;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
int cnt = gridCount();
assert cnt >= 1 : "At least one grid must be started";
if (!MvccFeatureChecker.forcedMvcc() || MvccFeatureChecker.isSupported(MvccFeatureChecker.Feature.CACHE_STORE))
initStoreStrategy();
startGrids(cnt);
awaitPartitionMapExchange();
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
if (storeStgy != null)
storeStgy.resetStore();
}
/**
* Initializes {@link #storeStgy} with respect to the nature of the test.
*
* @throws IgniteCheckedException If failed.
*/
protected void initStoreStrategy() throws IgniteCheckedException {
if (storeStgy == null)
storeStgy = isMultiJvm() ? new H2CacheStoreStrategy() : new MapCacheStoreStrategy();
else if (isMultiJvm() && !(storeStgy instanceof H2CacheStoreStrategy))
storeStgy = new H2CacheStoreStrategy();
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
assert jcache().unwrap(Ignite.class).transactions().tx() == null;
assertEquals(0, jcache().localSize());
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
Transaction tx = jcache().unwrap(Ignite.class).transactions().tx();
if (tx != null) {
tx.close();
fail("Cache transaction remained after test completion: " + tx);
}
for (int i = 0; i < gridCount(); i++) {
info("Checking grid: " + i);
while (true) {
try {
final int fi = i;
assertTrue(
"Cache is not empty: " + " localSize = " + jcache(fi).localSize(CachePeekMode.ALL)
+ ", local entries " + entrySet(jcache(fi).localEntries()),
GridTestUtils.waitForCondition(
// Preloading may happen as nodes leave, so we need to wait.
new GridAbsPredicateX() {
@Override public boolean applyx() throws IgniteCheckedException {
jcache(fi).removeAll();
if (jcache(fi).size(CachePeekMode.ALL) > 0) {
for (Cache.Entry<String, ?> k : jcache(fi).localEntries())
jcache(fi).remove(k.getKey());
}
return jcache(fi).localSize(CachePeekMode.ALL) == 0;
}
},
getTestTimeout()));
int primaryKeySize = jcache(i).localSize(CachePeekMode.PRIMARY);
int keySize = jcache(i).localSize();
int size = jcache(i).localSize();
int globalSize = jcache(i).size();
int globalPrimarySize = jcache(i).size(CachePeekMode.PRIMARY);
info("Size after [idx=" + i +
", size=" + size +
", keySize=" + keySize +
", primarySize=" + primaryKeySize +
", globalSize=" + globalSize +
", globalPrimarySize=" + globalPrimarySize +
", entrySet=" + jcache(i).localEntries() + ']');
assertEquals("Cache is not empty [idx=" + i + ", entrySet=" + jcache(i).localEntries() + ']',
0, jcache(i).localSize(CachePeekMode.ALL));
break;
}
catch (Exception e) {
if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
info("Got topology exception while tear down (will retry in 1000ms).");
U.sleep(1000);
}
else
throw e;
}
}
}
assert jcache().unwrap(Ignite.class).transactions().tx() == null;
assertEquals("Cache is not empty", 0, jcache().localSize(CachePeekMode.ALL));
if (storeStgy != null)
storeStgy.resetStore();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
TcpDiscoverySpi disco = new TcpDiscoverySpi();
disco.setIpFinder(ipFinder);
if (isDebug())
disco.setAckTimeout(Integer.MAX_VALUE);
cfg.setDiscoverySpi(disco);
cfg.setCacheConfiguration(cacheConfiguration(igniteInstanceName));
TcpCommunicationSpi comm = new TcpCommunicationSpi();
comm.setSharedMemoryPort(-1);
cfg.setCommunicationSpi(comm);
return cfg;
}
/**
* @param igniteInstanceName Ignite instance name.
* @return Cache configuration.
* @throws Exception In case of error.
*/
@SuppressWarnings("unchecked")
protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
CacheConfiguration cfg = defaultCacheConfiguration();
if (storeStgy != null) {
Factory<? extends CacheStore<Object, Object>> storeFactory = storeStgy.getStoreFactory();
CacheStore<?, ?> store = storeFactory.create();
if (store != null) {
cfg.setCacheStoreFactory(storeFactory);
cfg.setReadThrough(true);
cfg.setWriteThrough(true);
cfg.setLoadPreviousValue(true);
storeStgy.updateCacheConfiguration(cfg);
}
}
cfg.setCacheMode(cacheMode());
cfg.setAtomicityMode(atomicityMode());
cfg.setWriteSynchronizationMode(writeSynchronization());
cfg.setNearConfiguration(nearConfiguration());
cfg.setOnheapCacheEnabled(onheapCacheEnabled());
Class<?>[] idxTypes = indexedTypes();
if (!F.isEmpty(idxTypes))
cfg.setIndexedTypes(idxTypes);
if (cacheMode() == PARTITIONED)
cfg.setBackups(backups());
return cfg;
}
/**
* Indexed types.
*/
protected Class<?>[] indexedTypes() {
return null;
}
/**
* @return Default cache mode.
*/
protected CacheMode cacheMode() {
return CacheConfiguration.DFLT_CACHE_MODE;
}
/**
* @return Cache atomicity mode.
*/
protected CacheAtomicityMode atomicityMode() {
return TRANSACTIONAL;
}
/**
* @return Partitioned mode.
*/
protected NearCacheConfiguration nearConfiguration() {
return new NearCacheConfiguration();
}
/**
* @return Write synchronization.
*/
protected CacheWriteSynchronizationMode writeSynchronization() {
return FULL_SYNC;
}
/**
* @return {@code true} if swap should be enabled.
*/
protected boolean swapEnabled() {
return true;
}
/**
* @return {@code true} if near cache should be enabled.
*/
protected boolean nearEnabled() {
return nearConfiguration() != null;
}
/**
* @return {@code True} if transactions are enabled.
* @see #txShouldBeUsed()
*/
protected boolean txEnabled() {
return true;
}
/**
* @return {@code True} if transactions should be used.
*/
protected boolean txShouldBeUsed() {
return txEnabled() && !isMultiJvm();
}
/**
* @return {@code True} if locking is enabled.
*/
protected boolean lockingEnabled() {
return true;
}
/**
* @return {@code True} if on-heap cache is enabled.
*/
protected boolean onheapCacheEnabled() {
return false;
}
/**
* @return {@code True} for partitioned caches.
*/
protected final boolean partitionedMode() {
return cacheMode() == PARTITIONED;
}
/**
* @return Default cache instance.
*/
@SuppressWarnings({"unchecked"})
@Override protected IgniteCache<String, Integer> jcache() {
return jcache(0);
}
/**
* @return Transactions instance.
*/
protected IgniteTransactions transactions() {
return grid(0).transactions();
}
/**
* @return Backups.
*/
protected int backups() {
return 1;
}
/**
* @param idx Index of grid.
* @return Default cache.
*/
@SuppressWarnings({"unchecked"})
@Override protected IgniteCache<String, Integer> jcache(int idx) {
return ignite(idx).cache(DEFAULT_CACHE_NAME).withAllowAtomicOpsInTx();
}
/**
* @param idx Index of grid.
* @return Cache context.
*/
protected GridCacheContext<String, Integer> context(final int idx) {
if (isRemoteJvm(idx) && !isRemoteJvm())
throw new UnsupportedOperationException("Operation can't be done automatically via proxy. " +
"Send task with this logic on remote jvm instead.");
return ((IgniteKernal)grid(idx)).<String, Integer>internalCache(DEFAULT_CACHE_NAME).context();
}
/**
* @param key Key.
* @param idx Node index.
* @return {@code True} if key belongs to node with index idx.
*/
protected boolean belongs(String key, int idx) {
return context(idx).cache().affinity().isPrimaryOrBackup(context(idx).localNode(), key);
}
/**
* Executes regular peek or peek from swap.
*
* @param cache Cache projection.
* @param key Key.
* @return Value.
*/
@Nullable protected <K, V> V peek(IgniteCache<K, V> cache, K key) {
return cache.localPeek(key);
}
/**
* @param cache Cache.
* @param key Key.
* @return {@code True} if cache contains given key.
* @throws Exception If failed.
*/
@SuppressWarnings("unchecked")
protected boolean containsKey(IgniteCache cache, Object key) throws Exception {
return cache.containsKey(key);
}
/**
* Filters cache entry projections leaving only ones with keys containing 'key'.
*/
protected static IgnitePredicate<Cache.Entry<String, Integer>> entryKeyFilter =
new P1<Cache.Entry<String, Integer>>() {
@Override public boolean apply(Cache.Entry<String, Integer> entry) {
return entry.getKey().contains("key");
}
};
/**
* Filters cache entry projections leaving only ones with keys not containing 'key'.
*/
protected static IgnitePredicate<Cache.Entry<String, Integer>> entryKeyFilterInv =
new P1<Cache.Entry<String, Integer>>() {
@Override public boolean apply(Cache.Entry<String, Integer> entry) {
return !entry.getKey().contains("key");
}
};
/**
* Filters cache entry projections leaving only ones with values less than 50.
*/
protected static final IgnitePredicate<Cache.Entry<String, Integer>> lt50 =
new P1<Cache.Entry<String, Integer>>() {
@Override public boolean apply(Cache.Entry<String, Integer> entry) {
Integer i = entry.getValue();
return i != null && i < 50;
}
};
/**
* Filters cache entry projections leaving only ones with values greater or equal than 100.
*/
protected static final IgnitePredicate<Cache.Entry<String, Integer>> gte100 =
new P1<Cache.Entry<String, Integer>>() {
@Override public boolean apply(Cache.Entry<String, Integer> entry) {
Integer i = entry.getValue();
return i != null && i >= 100;
}
@Override public String toString() {
return "gte100";
}
};
/**
* Filters cache entry projections leaving only ones with values greater or equal than 200.
*/
protected static final IgnitePredicate<Cache.Entry<String, Integer>> gte200 =
new P1<Cache.Entry<String, Integer>>() {
@Override public boolean apply(Cache.Entry<String, Integer> entry) {
Integer i = entry.getValue();
return i != null && i >= 200;
}
@Override public String toString() {
return "gte200";
}
};
/**
* {@link org.apache.ignite.lang.IgniteInClosure} for calculating sum.
*/
protected static final class SumVisitor implements CI1<Cache.Entry<String, Integer>> {
/** */
private final AtomicInteger sum;
/**
* @param sum {@link AtomicInteger} instance for accumulating sum.
*/
public SumVisitor(AtomicInteger sum) {
this.sum = sum;
}
/** {@inheritDoc} */
@Override public void apply(Cache.Entry<String, Integer> entry) {
if (entry.getValue() != null) {
Integer i = entry.getValue();
assert i != null : "Value cannot be null for entry: " + entry;
sum.addAndGet(i);
}
}
}
/**
* {@link org.apache.ignite.lang.IgniteReducer} for calculating sum.
*/
protected static final class SumReducer implements R1<Cache.Entry<String, Integer>, Integer> {
/** */
private int sum;
/** */
public SumReducer() {
// no-op
}
/** {@inheritDoc} */
@Override public boolean collect(Cache.Entry<String, Integer> entry) {
if (entry.getValue() != null) {
Integer i = entry.getValue();
assert i != null;
sum += i;
}
return true;
}
/** {@inheritDoc} */
@Override public Integer reduce() {
return sum;
}
}
/** */
protected enum ResourceType {
/** */
IGNITE_INSTANCE,
/** */
CACHE_NAME,
/** */
SPRING_APPLICATION_CONTEXT,
/** */
LOGGER,
/** */
SERVICE,
/** */
SPRING_BEAN,
}
/**
*
*/
protected static class ResourceInfoSet {
/** */
int val;
/** */
public ResourceInfoSet() {
this(0);
}
/** */
public ResourceInfoSet(int val) {
this.val = val;
}
/**
* @param val Value.
*/
public static ResourceInfoSet valueOf(int val) {
return new ResourceInfoSet(val);
}
/** */
public int getValue() {
return val;
}
/**
* @param type Type.
* @param injected Injected.
*/
public ResourceInfoSet set(ResourceType type, boolean injected) {
int mask = 1 << type.ordinal();
if (injected)
val |= mask;
else
val &= ~mask;
return this;
}
/**
* @see {@link #set(ResourceType, boolean)}
*/
public ResourceInfoSet set(ResourceType type, Object toCheck) {
return set(type, toCheck != null);
}
/**
* @return collection of not injected resources
*/
public Collection<ResourceType> notInjected(Collection<ResourceType> exp) {
ArrayList<ResourceType> res = null;
for (ResourceType type : exp) {
int mask = 1 << type.ordinal();
if ((this.val & mask) == 0) {
if (res == null)
res = new ArrayList<>();
res.add(type);
}
}
return res == null ? Collections.<ResourceType>emptyList() : res;
}
}
/**
*
*/
protected abstract static class ResourceInjectionEntryProcessorBase<K, V>
implements EntryProcessor<K, V, Integer>, Serializable {
/** */
protected transient ResourceInfoSet infoSet;
/** {@inheritDoc} */
@Override public Integer process(MutableEntry<K, V> e, Object... args) {
return infoSet == null ? null : infoSet.getValue();
}
/** */
protected void checkSet() {
if (infoSet == null)
infoSet = new ResourceInfoSet();
}
}
}