blob: a3ce7b5c9c075cfe66d2fb7218791015b5cdaea6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.integration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriter;
import javax.cache.integration.CompletionListener;
import javax.cache.integration.CompletionListenerFuture;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.junit.Before;
import org.junit.Test;
/**
* Test for {@link Cache#loadAll(Set, boolean, CompletionListener)}.
*/
public abstract class IgniteCacheLoadAllAbstractTest extends IgniteCacheAbstractTest {
/** */
private volatile boolean writeThrough = true;
/** */
private static ConcurrentHashMap<Object, Object> storeMap;
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
CacheConfiguration ccfg = super.cacheConfiguration(igniteInstanceName);
ccfg.setWriteThrough(writeThrough);
ccfg.setCacheLoaderFactory(new CacheLoaderFactory());
ccfg.setCacheWriterFactory(new CacheWriterFactory());
return ccfg;
}
/** */
@Before
public void beforeIgniteCacheLoadAllAbstractTest() {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
super.beforeTest();
storeMap = new ConcurrentHashMap<>();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
storeMap = null;
}
/**
* @throws Exception If failed.
*/
@Test
public void testLoadAll() throws Exception {
IgniteCache<Integer, String> cache0 = jcache(0);
// Put some data in cache, it also should be put in store.
final int KEYS = 10;
for (int i = 0; i < KEYS; i++)
cache0.put(i, String.valueOf(i));
// Restart nodes with write-through disabled so that data in store does not change.
stopAllGrids();
writeThrough = false;
startGrids();
cache0 = jcache(0);
Set<Integer> keys = new HashSet<>();
Map<Integer, String> expVals = new HashMap<>();
for (int i = 0; i < KEYS / 2; i++) {
keys.add(i);
expVals.put(i, String.valueOf(i));
}
for (int i = KEYS + 1000; i < KEYS + 1010; i++)
keys.add(i);
CompletionListenerFuture fut = new CompletionListenerFuture();
log.info("Load1.");
cache0.loadAll(keys, false, fut);
fut.get();
checkValues(KEYS, expVals);
HashMap<Integer, String> expChangedVals = new HashMap<>();
for (int i = 0; i < KEYS / 2; i++) {
String val = "changed";
cache0.put(i, val);
expChangedVals.put(i, val);
}
checkValues(KEYS, expChangedVals);
fut = new CompletionListenerFuture();
log.info("Load2.");
cache0.loadAll(keys, false, fut);
fut.get();
checkValues(KEYS, expChangedVals);
log.info("Load3.");
fut = new CompletionListenerFuture();
cache0.loadAll(keys, true, fut);
fut.get();
checkValues(KEYS, expVals);
for (int i = 0; i < KEYS; i++) {
keys.add(i);
expVals.put(i, String.valueOf(i));
}
fut = new CompletionListenerFuture();
log.info("Load4.");
cache0.loadAll(keys, false, fut);
fut.get();
checkValues(KEYS, expVals);
}
/**
* @param keys Keys to check.
* @param expVals Expected values.
*/
private void checkValues(int keys, Map<Integer, String> expVals) {
Affinity<Object> aff = grid(0).affinity(DEFAULT_CACHE_NAME);
for (int i = 0; i < gridCount(); i++) {
ClusterNode node = ignite(i).cluster().localNode();
IgniteCache<Integer, String> cache = jcache(i);
for (int key = 0; key < keys; key++) {
String expVal = expVals.get(key);
if (aff.isPrimaryOrBackup(node, key)) {
assertEquals(expVal, cache.localPeek(key));
assertEquals(expVal, cache.get(key));
}
else {
assertNull(cache.localPeek(key));
if (!expVals.containsKey(key))
assertNull(cache.get(key));
}
}
for (int key = keys + 1000; i < keys + 1010; i++) {
assertNull(cache.localPeek(key));
assertNull(cache.get(key));
}
}
}
/**
*
*/
private static class CacheLoaderFactory implements Factory<CacheLoader> {
/** {@inheritDoc} */
@Override public CacheLoader create() {
return new CacheLoader<Object, Object>() {
@Override public Object load(Object key) throws CacheLoaderException {
return storeMap.get(key);
}
@Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException {
Map<Object, Object> loaded = new HashMap<>();
for (Object key : keys) {
Object val = storeMap.get(key);
if (val != null)
loaded.put(key, val);
}
return loaded;
}
};
}
}
/**
*
*/
private static class CacheWriterFactory implements Factory<CacheWriter> {
/** {@inheritDoc} */
@Override public CacheWriter create() {
return new CacheWriter<Object, Object>() {
@Override public void write(Cache.Entry<?, ?> e) {
storeMap.put(e.getKey(), e.getValue());
}
@Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) {
for (Cache.Entry<?, ?> e : entries)
write(e);
}
@Override public void delete(Object key) {
storeMap.remove(key);
}
@Override public void deleteAll(Collection<?> keys) {
for (Object key : keys)
delete(key);
}
};
}
}
}